o
    NDi(                  
   @   sF  d Z ddlZddlZddlZddlmZ ddlmZmZ ddl	Z	ddl
mZmZ ddlmZmZmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZmZmZmZ z
ddlmZ dZW n e yr Z! z
dZe!Z"W Y dZ![!ndZ![!ww e#e$Z%dd Z&G dd dej'Z(G dd de)Z*eeG dd de)Z+G dd deZ,dS )zV
Module that implements an event loop based on twisted
( https://twistedmatrix.com ).
    N)partial)ThreadLock)reactorprotocol)connectProtocolTCP4ClientEndpointSSL4ClientEndpoint)IOpenSSLClientConnectionCreator)Failure)implementer)
ConnectionConnectionShutdownTimerTimerManagerConnectionException)SSLTFc                 C   s&   z|     W d S  ty   Y d S w N)_cleanupReferenceError)cleanup_weakref r   T/var/www/Datamplify/venv/lib/python3.10/site-packages/cassandra/io/twistedreactor.pyr   *   s
   r   c                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )TwistedConnectionProtocolz[
    Twisted Protocol class for handling data received and connection
    made events.
    c                 C   s
   || _ d S r   )
connection)selfr   r   r   r   __init__7   s   
z"TwistedConnectionProtocol.__init__c                 C   s   | j j| | j   dS )z
        Callback function that is called when data has been received
        on the connection.

        Reaches back to the Connection object and queues the data for
        processing.
        N)r   _iobufwritehandle_readr   datar   r   r   dataReceived:   s   z&TwistedConnectionProtocol.dataReceivedc                 C   s   | j | j dS )z
        Callback function that is called when a connection has succeeded.

        Reaches back to the Connection object and confirms that the connection
        is ready.
        N)r   client_connection_made	transportr   r   r   r   connectionMadeE   s   z(TwistedConnectionProtocol.connectionMadec                 C   s   t d| | j|j d S )NzConnect lost: %s)logdebugr   defunctvalue)r   reasonr   r   r   connectionLostN   s   z(TwistedConnectionProtocol.connectionLostN)__name__
__module____qualname____doc__r   r"   r&   r,   r   r   r   r   r   1   s    	r   c                   @   sT   e Zd ZdZdZdZdZdd Zdd Zdd Z	dd	 Z
d
d Zdd Zdd ZdS )TwistedLoopNc                 C   s   t  | _t | _d S r   )r   _lockr   _timersr%   r   r   r   r   [   s   zTwistedLoop.__init__c                 C   s   | j 3 tjs.ttjdddid| _d| j_| j  t	t
tt|  W d    d S W d    d S 1 s9w   Y  d S )N#cassandra_driver_twisted_event_loopinstallSignalHandlersF)targetnamekwargsT)r2   r   runningr   run_threaddaemonstartatexitregisterr   r   weakrefrefr%   r   r   r   maybe_start_   s   
"zTwistedLoop.maybe_startc                 C   s   t jS r   )r   _stoppedr%   r   r   r   _reactor_stoppedi   s   zTwistedLoop._reactor_stoppedc                 C   sF   | j r!ttj | j jdd | j  rtd td d S d S )Ng      ?timeoutzrEvent loop thread could not be joined, so shutdown may not be clean. Please call Cluster.shutdown() to avoid this.zEvent loop thread was joined)	r;   r   callFromThreadstopjoinis_aliver'   warningr(   r%   r   r   r   r   l   s   

zTwistedLoop._cleanupc                 C   s    | j | t| j|j d S r   )r3   	add_timerr   rG   _schedule_timeoutend)r   timerr   r   r   rL   v   s   zTwistedLoop.add_timerc                 C   sh   |r2t |t  d}| jr%| j r%|| jk r#| j| || _d S d S t|| j| _|| _d S d S )Nr   )	maxtime_timeout_taskactive_timeoutresetr   	callLater_on_loop_timer)r   next_timeoutdelayr   r   r   rM   |   s   


zTwistedLoop._schedule_timeoutc                 C   s   | j   | | j j d S r   )r3   service_timeoutsrM   rX   r%   r   r   r   rW      s   
zTwistedLoop._on_loop_timer)r-   r.   r/   r2   r;   rR   rT   r   rB   rD   r   rL   rM   rW   r   r   r   r   r1   T   s    

r1   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
_SSLCreatorc                 C   s   || _ || _|| _|| _|r|| _nBttj| _d| jv r'| j| jd  d| jv r5| j	| jd  d| jv rC| j
| jd  d| jv rT| jj| jd | jd | j| j d S )Ncertfilekeyfileca_certs	cert_reqs)callback)endpointssl_optionscheck_hostnamerF   contextr   ContextTLSv1_METHODuse_certificate_fileuse_privatekey_fileload_verify_locations
set_verifyverify_callbackset_info_callbackinfo_callback)r   ra   ssl_contextrb   rc   rF   r   r   r   r      s&   



z_SSLCreator.__init__c                 C   s   |S r   r   )r   r   x509errnumerrdepthokr   r   r   rk      s   z_SSLCreator.verify_callbackc                 C   sT   |t j@ r$| jr&| jj|  jkr(| }|	t
td| j d S d S d S d S )NzHostname verification failed)r   SSL_CB_HANDSHAKE_DONErc   ra   addressget_peer_certificateget_subject
commonNameget_app_datafailVerificationr   r   )r   r   whereretr$   r   r   r   rm      s   
z_SSLCreator.info_callbackc                 C   sB   t | jd }|| | jrd| jv r|| jd d |S )Nserver_hostnameascii)r   r   rd   set_app_datarb   set_tlsext_host_nameencode)r   tlsProtocolr   r   r   r   clientConnectionForTLS   s
   
z"_SSLCreator.clientConnectionForTLSN)r-   r.   r/   r   rk   rm   r   r   r   r   r   r[      s
    r[   c                   @   sd   e Zd ZdZdZedd Zedd Zdd Zd	d
 Z	dd Z
dd Zdd Zdd Zdd ZdS )TwistedConnectionz]
    An implementation of :class:`.Connection` that utilizes the
    Twisted event loop.
    Nc                 C   s   | j s	t | _ d S d S r   )_loopr1   )clsr   r   r   initialize_reactor   s   z$TwistedConnection.initialize_reactorc                 C   s   t ||}| j| |S r   )r   r   rL   )r   rF   r`   rO   r   r   r   create_timer   s   
zTwistedConnection.create_timerc                 O   sD   t j| g|R i | d| _d| _d| _t| j | j	  dS )a  
        Initialization method.

        Note that we can't call reactor methods directly here because
        it's not thread-safe, so we schedule the reactor/connection
        stuff to be run from the event loop thread when it gets the
        chance.
        TN)
r   r   	is_closed	connectorr$   r   rG   add_connectionr   rB   )r   argsr8   r   r   r   r      s   	zTwistedConnection.__init__c                 C   s(   | j s| jrtstttd d S d S )NzO, pyOpenSSL must be installed to enable SSL support with the Twisted event loop)rn   rb   _HAS_SSLImportErrorstrimport_exceptionr%   r   r   r   _check_pyopenssl   s   z"TwistedConnection._check_pyopensslc                 C   s   | j  \}}| js| jr.|   t| j | jr| jnd| j| j| j}tt	|||| jd}n	t
t	||| jd}t|t|  dS )z\
        Convenience function to connect and store the resulting
        connector.
        N)sslContextFactoryrF   rE   )ra   resolvern   rb   r   r[   _check_hostnameconnect_timeoutr	   r   r   r   r   )r   hostportssl_connection_creatorra   r   r   r   r      s0   z TwistedConnection.add_connectionc                 C   s>   | j  d| _W d   n1 sw   Y  || _|   dS )z]
        Called by twisted protocol when a connection attempt has
        succeeded.
        FN)lockr   r$   _send_options_message)r   r$   r   r   r   r#     s
   z(TwistedConnection.client_connection_madec                 C   s   | j  | jr	 W d   dS d| _W d   n1 sw   Y  tdt| | j t| jj	j
 td| j | jsO| td| j  | j  dS dS )z8
        Disconnect and error-out all requests.
        NTzClosing connection (%s) to %szClosed socket to %szConnection to %s was closed)r   r   r'   r(   idra   r   rG   r$   r   
disconnect
is_defuncterror_all_requestsr   connected_eventsetr%   r   r   r   close  s   zTwistedConnection.closec                 C   s   |    dS )z3
        Process the incoming data buffer.
        N)process_io_bufferr%   r   r   r   r   #  s   zTwistedConnection.handle_readc                 C   s   t | jj| dS )a  
        This function is called when outgoing data should be queued
        for sending.

        Note that we can't call transport.write() directly because
        it is not thread-safe, so we schedule it to run from within
        the event loop when it gets the chance.
        N)r   rG   r$   r   r    r   r   r   push)  s   	zTwistedConnection.push)r-   r.   r/   r0   r   classmethodr   r   r   r   r   r#   r   r   r   r   r   r   r   r      s    

#
r   )-r0   r>   loggingrQ   	functoolsr   	threadingr   r   r@   twisted.internetr   r   twisted.internet.endpointsr   r   r	   twisted.internet.interfacesr
   twisted.python.failurer   zope.interfacer   cassandra.connectionr   r   r   r   r   OpenSSLr   r   r   er   	getLoggerr-   r'   r   Protocolr   objectr1   r[   r   r   r   r   r   <module>   s8   
#8)