o
    NDi                     @   sv   d dl Z d dlZ d dlmZ d dl mZ d dlZ d dlZd dlZd dlm	Z	m
Z
mZmZ eeZG dd de	ZdS )    N)Queue)socket)
ConnectionConnectionShutdownTimerTimerManagerc                   @   s   e Zd ZdZdZdZejZej	Z
dZdZdZedd Zedd Zedd Zd	d
 Zdd Zdd Zdd Zdd Zdd ZdS )GeventConnectionz
    An implementation of :class:`.Connection` that utilizes ``gevent``.

    This implementation assumes all gevent monkey patching is active. It is not tested with partial patching.
    Nc                 C   s0   | j st | _ t| j| _tj | _d S d S N)	_timersr   geventspawnservice_timeouts_timeout_watchereventEvent
_new_timer)cls r   S/var/www/Datamplify/venv/lib/python3.10/site-packages/cassandra/io/geventreactor.pyinitialize_reactor/   s
   z#GeventConnection.initialize_reactorc                 C   s$   t ||}| j| | j  |S r	   )r   r
   	add_timerr   set)r   timeoutcallbacktimerr   r   r   create_timer6   s   

zGeventConnection.create_timerc                 C   sD   | j }| j}	 | }|rt|t  dnd}|| |  q)NTr   i'  )r
   r   r   maxtimewaitclear)r   timer_managertimer_eventnext_end
sleep_timer   r   r   r   =   s   
z!GeventConnection.service_timeoutsc                 O   sP   t j| g|R i | t | _|   t| j| _t| j	| _
|   d S r	   )r   __init__r   _write_queue_connect_socketr   r   handle_read_read_watcherhandle_write_write_watcher_send_options_message)selfargskwargsr   r   r   r$   G   s   zGeventConnection.__init__c                 C   s   | j  | jr	 W d    d S d| _W d    n1 sw   Y  tdt| | jf  | jr8| jjdd | jrB| jjdd | j	rJ| j	
  td| jf  | jsg| td| j  | j  d S d S )NTzClosing connection (%s) to %sF)blockzClosed socket to %szConnection to %s was closed)lock	is_closedlogdebugidendpointr(   killr*   _socketclose
is_defuncterror_all_requestsr   connected_eventr   r,   r   r   r   r8   R   s(   
zGeventConnection.closec                 C   s   t d |   d S )Nzconnection closed by server)r2   r3   r8   r<   r   r   r   handle_closeg   s   
zGeventConnection.handle_closec              
   C   s`   	 z| j  }| j| W n  tjy. } ztd| | | | W Y d }~d S d }~ww q)NTzException in send for %s: %s)	r%   getr7   sendallr   errorr2   r3   defunct)r,   next_msgerrr   r   r   r)   k   s   

zGeventConnection.handle_writec              
   C   s   	 z| j | j}| j| W n  tjy0 } ztd| | | 	| W Y d }~d S d }~ww |r=| j
 r=|   ntd|  |   d S q)NTzException in read for %s: %szConnection %s closed by server)r7   recvin_buffer_size_iobufwriter   r@   r2   r3   rA   tellprocess_io_bufferr8   )r,   bufrC   r   r   r   r'   u   s    

zGeventConnection.handle_readc                 C   s8   | j }tdt||D ]}| j||||   qd S )Nr   )out_buffer_sizerangelenr%   put)r,   data
chunk_sizeir   r   r   push   s   zGeventConnection.push)__name__
__module____qualname____doc__r(   r*   r   r   _socket_implssl	_ssl_implr
   r   r   classmethodr   r   r   r$   r8   r=   r)   r'   rR   r   r   r   r   r      s*    


	
r   )r   gevent.eventgevent.queuer   r   
gevent.sslloggingr   cassandra.connectionr   r   r   r   	getLoggerrS   r2   r   r   r   r   r   <module>   s   
