o
    NDi9                     @   s  d dl Z d dlmZ d dlmZ d dlZd dlZd dlZd dlZd dl	m
Z
mZmZ d dlZd dlZd dlZd dlZzd dlmZ W n eyQ   d dlmZ Y nw d dlmZ zd dlZW n eyi   edw d dlmZmZmZmZmZ eeZi a d	d
 Z!G dd deZ"G dd de#Z$G dd dej%Z&G dd de&Z'G dd de&Z(G dd de#Z)G dd de#Z*da+e ,ee!t+ G dd deej%Z-dS )    N)deque)partial)LockThreadEvent)WeakSet)DependencyExceptionzUnable to import asyncore module.  Note that this module has been removed in Python 3.12 so when using the driver with this version (or anything newer) you will need to use one of the other event loop implementations.)
ConnectionConnectionShutdownNONBLOCKINGTimerTimerManagerc                 C   s   | r|    d S d S N)_cleanup)loop r   U/var/www/Datamplify/venv/lib/python3.10/site-packages/cassandra/io/asyncorereactor.pyr   1   s   r   c                   @   s&   e Zd Zdd Zdd ZdddZdS )	WaitableTimerc                 C   s&   t | || || _t | _d | _d S r   )r   __init__callbackr   eventfinal_exception)selftimeoutr   r   r   r   r   7   s   
zWaitableTimer.__init__c              
   C   s^   zt | |}|r| j  W dS W dS  ty. } z|| _| j  W Y d }~dS d }~ww )NTF)r   finishr   set	Exceptionr   )r   time_nowfinisheder   r   r   r   >   s   

zWaitableTimer.finishNc                 C   s   | j | | jr| jd S r   )r   waitr   r   r   r   r   r   r    K   s   zWaitableTimer.waitr   )__name__
__module____qualname__r   r   r    r   r   r   r   r   6   s    r   c                   @   s.   e Zd Zdd Zdd Zdd Zd
dd	ZdS )_PipeWrapperc                 C   s
   || _ d S r   fd)r   r'   r   r   r   r   S   s   
z_PipeWrapper.__init__c                 C      | j S r   r&   r   r   r   r   filenoV      z_PipeWrapper.filenoc                 C   s   t | j d S r   )oscloser'   r)   r   r   r   r-   Y   s   z_PipeWrapper.closeNc                 C   s"   |t jkr|t jkr|sdS t Nr   )socket
SOL_SOCKETSO_ERRORNotImplementedError)r   leveloptnamebuflenr   r   r   
getsockopt\   s   z_PipeWrapper.getsockoptr   )r"   r#   r$   r   r*   r-   r6   r   r   r   r   r%   Q   s
    r%   c                   @   ,   e Zd Zdd Zdd Zdd Zdd Zd	S )
_AsyncoreDispatcherc                 C   s$   t jj| td | | d| _d S )N)mapF)asyncore
dispatcherr   _dispatcher_map
set_socket	_notified)r   r/   r   r   r   r   e   s   

z_AsyncoreDispatcher.__init__c                 C      dS NFr   r)   r   r   r   writablek      z_AsyncoreDispatcher.writablec                 C   s4   | j rJ |   | j sJ | d | j rJ d S )N皙?)r>   notify_loopr   r)   r   r   r   validaten   s
   


z_AsyncoreDispatcher.validatec                 C      t j|dtdd d S )NT   r   use_pollr9   countr:   r   r<   r!   r   r   r   r   u      z_AsyncoreDispatcher.loopN)r"   r#   r$   r   rA   rE   r   r   r   r   r   r8   c   s
    r8   c                   @   r7   )
_AsyncorePipeDispatcherc                 C   s&   t  \| _| _t| t| j d S r   )r,   piperead_fdwrite_fdr8   r   r%   r)   r   r   r   r   {   s   z _AsyncorePipeDispatcher.__init__c                 C   r?   r@   r   r)   r   r   r   rA      rB   z _AsyncorePipeDispatcher.writablec                 C   s8   t t| jddkr	 t t| jddksd| _d S )Ni   F)lenr,   readrO   r>   r)   r   r   r   handle_read   s   
z#_AsyncorePipeDispatcher.handle_readc                 C   s"   | j sd| _ t| jd d S d S )NT   x)r>   r,   writerP   r)   r   r   r   rD      s   z#_AsyncorePipeDispatcher.notify_loopN)r"   r#   r$   r   rA   rS   rD   r   r   r   r   rM   y   s
    rM   c                   @   s4   e Zd ZdZdZdd Zdd Zdd Zd	d
 ZdS )_AsyncoreUDPDispatchera  
    Experimental alternate dispatcher for avoiding busy wait in the asyncore loop. It is not used by default because
    it relies on local port binding.
    Port scanning is not implemented, so multiple clients on one host will collide. This address would need to be set per
    instance, or this could be specialized to scan until an address is found.

    To use::

        from cassandra.io.asyncorereactor import _AsyncoreUDPDispatcher, AsyncoreLoop
        AsyncoreLoop._loop_dispatch_class = _AsyncoreUDPDispatcher

    )	localhosti'  c                 C   s>   t  t jt j| _| j| j | jd t| | j d S r.   )	r/   AF_INET
SOCK_DGRAM_socketbindbind_addresssetblockingr8   r   r)   r   r   r   r      s   z_AsyncoreUDPDispatcher.__init__c              
   C   sf   z| j d}|r|d r| j d}|r|d sW n tjy- } zW Y d }~nd }~ww d| _d S )NrG   F)rZ   recvfromr/   errorr>   )r   dr   r   r   r   rS      s   
z"_AsyncoreUDPDispatcher.handle_readc                 C   s$   | j sd| _ | jd| j d S d S )NT    )r>   rZ   sendtor\   r)   r   r   r   rD      s   z"_AsyncoreUDPDispatcher.notify_loopc                 C   rF   )NFrG   rH   rK   r!   r   r   r   r      rL   z_AsyncoreUDPDispatcher.loopN)	r"   r#   r$   __doc__r\   r   rS   rD   r   r   r   r   r   rV      s    	rV   c                   @   s2   e Zd ZdZ	 dd Zdd Zdd Zdd	 Zd
S )_BusyWaitDispatchergMbP?c                 C      d S r   r   r)   r   r   r   rD      rB   z_BusyWaitDispatcher.notify_loopc                 C   s0   t std || j }tj| jdt |d d S )Ng{Gzt?TrH   )r<   timesleepmax_write_latencyr:   r   )r   r   rJ   r   r   r   r      s   

z_BusyWaitDispatcher.loopc                 C   re   r   r   r)   r   r   r   rE      rB   z_BusyWaitDispatcher.validatec                 C   re   r   r   r)   r   r   r   r-      rB   z_BusyWaitDispatcher.closeN)r"   r#   r$   rh   rD   r   rE   r-   r   r   r   r   rd      s    rd   c                   @   sZ   e Zd ZdZejdkreneZdd Z	dd Z
dd Zd	d
 Zdd Zdd Zdd ZdS )AsyncoreLooprC   ntc                 C   s   t  | _t | _d| _d| _d | _t | _	z| 
 }|  td| j
 W n ty>   td| j
 |  t }Y nw || _d S )NFzValidated loop dispatch with %szKFailed validating loop dispatch with %s. Using busy wait execution instead.)r,   getpid_pidr   
_loop_lock_started	_shutdown_threadr   _timers_loop_dispatch_classrE   logdebugr   	exceptionr-   rd   _loop_dispatcher)r   r;   r   r   r   r      s    


zAsyncoreLoop.__init__c                 C   s|   d}d}z| j d}|r| jsd| _d}W |r| j   n	|r&| j   w w |r<t| jdd| _d| j_| j  d S d S )NFT$asyncore_cassandra_driver_event_loop)targetname)	rm   acquirern   releaser   	_run_looprp   daemonstart)r   should_startdid_acquirer   r   r   maybe_start   s&   

zAsyncoreLoop.maybe_startc                 C   s   | j   d S r   )rv   rD   r)   r   r   r   	wake_loop   s   zAsyncoreLoop.wake_loopc                 C   s   t d | j9 | js7z| j| j | j  W n t	y3 } z| j
d|d W Y d }~nd }~ww | jrd| _W d    n1 sDw   Y  | 
d d S )NzStarting asyncore event loopz(Asyncore event loop stopped unexpectedly)exc_infoFzAsyncore event loop ended)rs   rt   rm   ro   rv   r   timer_resolutionrq   service_timeoutsr   _maybe_log_debugrn   )r   excr   r   r   r|      s   

zAsyncoreLoop._run_loopc                 O   s,   zt j|i | W d S  ty   Y d S w r   )rs   rt   r   )r   argskwargsr   r   r   r     s
   zAsyncoreLoop._maybe_log_debugc                 C   s   | j | |   d S r   )rq   	add_timerr   )r   timerr   r   r   r     s   zAsyncoreLoop.add_timerc                 C   s   d| _ | jsd S td | jjdd | j rtd td tt	 D ]}|| j
ur4|  q)| j  | j
  td d S )NTz(Waiting for event loop thread to join...g      ?)r   zrEvent loop thread could not be joined, so shutdown may not be clean. Please call Cluster.shutdown() to avoid this.zEvent loop thread was joinedzDispatchers were closed)ro   rp   rs   rt   joinis_alivewarningtupler<   valuesrv   r-   rq   r   )r   connr   r   r   r     s"   





zAsyncoreLoop._cleanupN)r"   r#   r$   r   r,   ry   rM   rd   rr   r   r   r   r|   r   r   r   r   r   r   r   ri      s    	ri   c                   @   s   e Zd ZdZdZdZedd Zedd Zedd Z	d	d
 Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd ZdS )AsyncoreConnectionz
    An implementation of :class:`.Connection` that uses the ``asyncore``
    module in the Python standard library for its event loop.
    Fc                 C   s@   t st a d S t }t j|krtd |   t a d S d S )Nz8Detected fork, clearing and reinitializing reactor state)_global_loopri   r,   rk   rl   rs   rt   handle_fork)clscurrent_pidr   r   r   initialize_reactorA  s   



z%AsyncoreConnection.initialize_reactorc                 C   s   i a trt  d ad S d S r   )r<   r   r   )r   r   r   r   r   M  s
   zAsyncoreConnection.handle_forkc                 C   s   t ||}t| |S r   )r   r   r   )r   r   r   r   r   r   r   create_timerU  s   

zAsyncoreConnection.create_timerc                 O   s   t j| g|R i | t | _t | _|   t  tdt	t
jj| | jtd}t| ||d  d| _d| _|   d S )Nr   )r   r   connect_timeoutT)r	   r   r   r   
deque_lock_connect_socketr   r   r   r   r:   r;   rZ   r<   r   r    	_writable	_readable_send_options_message)r   r   r   init_handlerr   r   r   r   [  s    
zAsyncoreConnection.__init__c                 C   s   | j  | jr	 W d    d S d| _W d    n1 sw   Y  tdt| | j d| _d| _| dt	t
jj|  td| j | jse| td| j  | j s^td| j | _| j  d S d S )NTzClosing connection (%s) to %sFr   zClosed socket to %szConnection to %s was closed)lock	is_closedrs   rt   idendpointr   r   r   r   r:   r;   r-   
is_defuncterror_all_requestsr
   connected_eventis_set
last_errorr   r)   r   r   r   r-   s  s&   
zAsyncoreConnection.closec                 C   s   |  t d  d S )NrG   )defunctsysr   r)   r   r   r   handle_error  rL   zAsyncoreConnection.handle_errorc                 C   s   t d|  |   d S )NzConnection %s closed by server)rs   rt   r-   r)   r   r   r   handle_close  s   zAsyncoreConnection.handle_closec                 C   sR  	 | j # z| j }W n ty   d| _Y W d    d S w W d    n1 s*w   Y  z
| |}d| _W nE tjy~ } z8|j	d t
v sT|j	d tjtjfv rn| j  | j| W d    n1 shw   Y  n| | W Y d }~d S d }~ww |t|k r| j  | j||d   W d    n1 sw   Y  |dkrd S q)NTFr   )r   r   popleft
IndexErrorr   sendr   r/   r_   r   r   sslSSL_ERROR_WANT_READSSL_ERROR_WANT_WRITE
appendleftr   rQ   )r   next_msgsenterrr   r   r   handle_write  s@   


	zAsyncoreConnection.handle_writec              
   C   s   z	 |  | j}| j| t|| jk rnqW n] tjyu } zPt|tj	rK|j
d tjtjfv r?| j s>W Y d }~d S n,| | W Y d }~d S |j
d tv r_| j s^W Y d }~d S n| | W Y d }~d S W Y d }~nd }~ww | j r|   | js| jsd| _d S d S d S d S )NTr   F)recvin_buffer_size_iobufrU   rQ   r/   r_   
isinstancer   SSLErrorr   r   r   tellr   r   process_io_buffer	_requestsis_control_connectionr   )r   bufr   r   r   r   rS     s<   





zAsyncoreConnection.handle_readc                 C   s   | j }t||kr"g }tdt||D ]}|||||   qn|g}| j | j| d| _W d    n1 s<w   Y  t	  d S )Nr   T)
out_buffer_sizerQ   rangeappendr   r   extendr   r   r   )r   datasabschunksir   r   r   push  s   zAsyncoreConnection.pushc                 C   r(   r   )r   r)   r   r   r   rA     r+   zAsyncoreConnection.writablec                 C   s    | j p| js	| jo| jp| j S r   )r   r   _continuous_paging_sessionsr   r   r)   r   r   r   readable  s    zAsyncoreConnection.readableN)r"   r#   r$   rc   r   r   classmethodr   r   r   r   r-   r   r   r   rS   r   rA   r   r   r   r   r   r   8  s&    


r   ).atexitcollectionsr   	functoolsr   loggingr,   r/   r   	threadingr   r   r   rf   weakrefr   r   ImportErrorcassandra.util	cassandrar   r:   ModuleNotFoundErrorcassandra.connectionr	   r
   r   r   r   	getLoggerr"   rs   r<   r   r   objectr%   r;   r8   rM   rV   rd   ri   r   registerr   r   r   r   r   <module>   sN   
'h