o
    NDi                     @   s   d dl mZmZ d dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
mZ eeZzej W n ey;   edw G dd deZG dd deZdS )	    )
ConnectionConnectionShutdownN)LockThread	get_identzgCannot use asyncioreactor without access to asyncio.run_coroutine_threadsafe (added in 3.4.6 and 3.5.1)c                   @   sH   e Zd ZdZedd Zdd Zedd Zdd	 Z	d
d Z
dd ZdS )AsyncioTimera  
    An ``asyncioreactor``-specific Timer. Similar to :class:`.connection.Timer,
    but with a slightly different API due to limitations in the underlying
    ``call_later`` interface. Not meant to be used with a
    :class:`.connection.TimerManager`.
    c                 C      t d)NzD{} is not compatible with TimerManager and does not implement .end()NotImplementedErrorself r   T/var/www/Datamplify/venv/lib/python3.10/site-packages/cassandra/io/asyncioreactor.pyend%   s   zAsyncioTimer.endc                 C   s"   | j ||d}tj||d| _d S )Ntimeoutcallbackloop)_call_delayed_coroasynciorun_coroutine_threadsafe_handle)r   r   r   r   delayedr   r   r   __init__*   s   zAsyncioTimer.__init__c                    s   t | I d H  | S N)r   sleepr   r   r   r   r   /   s   zAsyncioTimer._call_delayed_coroc                 C   s"   z| j |j k W S  ty   tw r   )r   AttributeErrorNotImplemented)r   otherr   r   r   __lt__4   s
   zAsyncioTimer.__lt__c                 C   s   | j   d S r   )r   cancelr   r   r   r   r!   :   s   zAsyncioTimer.cancelc                 C   r   )NzG{} is not compatible with TimerManager and does not implement .finish()r	   r   r   r   r   finish=   s   zAsyncioTimer.finishN)__name__
__module____qualname____doc__propertyr   r   staticmethodr   r    r!   r"   r   r   r   r   r      s    

r   c                   @   s~   e Zd ZdZdZe Ze Z	dZ
dZdZdd Zedd Zedd Zd	d
 Zdd Zdd Zdd Zdd Zdd ZdS )AsyncioConnectiona  
    An experimental implementation of :class:`.Connection` that uses the
    ``asyncio`` module in the Python standard library for its event loop.

    Note that it requires ``asyncio`` features that were only introduced in the
    3.4 line in 3.4.6, and in the 3.5 line in 3.5.1.
    Nc                 O   sx   t j| g|R i | |   | jd t | _t | _	tj
|  | jd| _tj
|  | jd| _|   d S Nr   r   )r   r   _connect_socket_socketsetblockingr   Queue_write_queuer   _write_queue_lockr   handle_read_loop_read_watcherhandle_write_write_watcher_send_options_message)r   argskwargsr   r   r   r   V   s   



zAsyncioConnection.__init__c                 C   s   | j = | jt krd | _| jd u rt | _t| j | js8t	| jj
ddd| _| j  W d    d S W d    d S 1 sCw   Y  d S )NTasyncio_thread)targetdaemonname)_lock_pidosgetpidr2   r   new_event_loopset_event_loop_loop_threadr   run_foreverstart)clsr   r   r   initialize_reactori   s   

"z$AsyncioConnection.initialize_reactorc                 C   s   t ||| jdS )Nr   )r   r2   )rF   r   r   r   r   r   create_timery   s   zAsyncioConnection.create_timerc                 C   s\   | j  | jr	 W d    d S d| _W d    n1 sw   Y  tj|  | jd d S )NTr   )lock	is_closedr   r   _closer2   r   r   r   r   close}   s   

zAsyncioConnection.closec                    s   t dt| | jf  | jr| j  | jr| j  | jr7| j	| j
  | j| j
  | j  t d| jf  | jsT| td| j  | j  d S d S )NzClosing connection (%s) to %szClosed socket to %szConnection to %s was closed)logdebugidendpointr5   r!   r3   r,   r2   remove_writerfilenoremove_readerrL   
is_defuncterror_all_requestsr   connected_eventsetr   r   r   r   rK      s"   


zAsyncioConnection._closec                 C   s   | j }t||kr"g }tdt||D ]}|||||   qn|g}| jjt kr9tj| 	|| j
d d S | j
| 	| d S r*   )out_buffer_sizelenrangeappendrC   identr   r   r   	_push_msgr2   create_task)r   data	buff_sizechunksir   r   r   push   s   
zAsyncioConnection.pushc                    sJ   | j I d H  |D ]}| j| q
W d    d S 1 sw   Y  d S r   )r0   r/   
put_nowait)r   ra   chunkr   r   r   r]      s   "zAsyncioConnection._push_msgc              
      s   	 z| j  I d H }|r| j| j|I d H  W n* tjy9 } ztd| | | 	| W Y d }~d S d }~w t
jyC   Y d S w q)NTzException in send for %s: %s)r/   getr2   sock_sendallr,   socketerrorrM   rN   defunctr   CancelledError)r   next_msgerrr   r   r   r4      s    
zAsyncioConnection.handle_writec              
      s   	 z| j | j| jI d H }| j| W n> tjtjfy+   t	
dI d H  Y q tjyJ } ztd| | | | W Y d }~d S d }~w t	jyT   Y d S w |ra| j ra|   ntd|  |   d S q)NTr   z'Exception during socket recv for %s: %szConnection %s closed by server)r2   	sock_recvr,   in_buffer_size_iobufwritesslSSLWantWriteErrorSSLWantReadErrorr   r   rh   ri   rM   rN   rj   rk   tellprocess_io_bufferrL   )r   bufrm   r   r   r   r1      s0   

zAsyncioConnection.handle_read)r#   r$   r%   r&   r2   r?   r@   r>   r   r=   rC   r/   r0   r   classmethodrG   rH   rL   rK   rc   r]   r4   r1   r   r   r   r   r)   D   s&    

r)   )cassandra.connectionr   r   r   loggingr?   rh   rr   	threadingr   r   r   	getLoggerr#   rM   r   r   ImportErrorobjectr   r)   r   r   r   r   <module>   s"    
	
'