o
    NDi                    @   s  d dl mZ d dlmZmZ d dlZd dlmZmZm	Z	 d dl
mZmZ d dlZd dlZd dlZd dlZd dlZd dlmZmZmZmZ d dlZd dlZd dlZdejv r^d dlmZmZ nd dlmZmZ d d	lm Z m!Z!m"Z"m#Z# d d
l$m%Z% d dl&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7 d dl8m9Z9m:Z: d dl;m<Z< e=e>Z?e9 Z@dZAe< ZBzd dlCZCW n	 eDy   Y nBw zd dlCmEZF W n eDy   eCZFY nw zeFjG eFjH W n eIy   eDdjJeKeFdw dd ZLdd ZMeLeMfeBd< e9eLeMZAzd dlNZNW n
 eDy   Y nw dd ZHeNjGeHfeBd< dejd jOZPZQdZRd ZSdZTdZUeVdZWeVdZXG dd  d eYZZG d!d" d"eYZ[e	G d#d$ d$eZZ\G d%d& d&e[Z]e	G d'd( d(eZZ^G d)d* d*e[Z_e	G d+d, d,eZZ`G d-d. d.eYZaejbejcfZdG d/d0 d0eeZfG d1d2 d2efZgG d3d4 d4efZhG d5d6 d6eeZiG d7d8 d8eeZjG d9d: d:efZkG d;d< d<eYZlG d=d> d>eYZmd?d@ ZndAZoG dBdC dCeYZpG dDdE dEeYZqG dFdG dGeYZrG dHdI dIeYZsG dJdK dKeZtG dLdM dMeYZuG dNdO dOeYZvdS )P    )absolute_import)defaultdictdequeN)wrapspartialtotal_ordering)heappushheappop)ThreadEventRLock	Conditionzgevent.monkey)QueueEmpty)ConsistencyLevelAuthenticationFailedOperationTimedOutProtocolVersion)
int32_pack)ReadyMessageAuthenticateMessageOptionsMessageStartupMessageErrorMessageCredentialsMessageQueryMessageResultMessageProtocolHandlerInvalidRequestExceptionSupportedMessageAuthResponseMessageAuthChallengeMessageAuthSuccessMessageProtocolExceptionRegisterMessageReviseRequestMessage)SegmentCodecCrcException)OrderedDict)blockzlz4 not imported correctly. Imported object should have .compress and and .decompress attributes but does not. Please file a bug report on JIRA. (Imported object was {lz4_block}))	lz4_blockc                 C   s   t t| t| dd   S )N   )r   lenr*   compressbyts r0   M/var/www/Datamplify/venv/lib/python3.10/site-packages/cassandra/connection.pylz4_compressV   s   r2   c                 C   s    t | dd d | dd   S )N   r+   )r*   
decompressr.   r0   r0   r1   lz4_decompressZ   s    r6   lz4c                 C   s   | dkrdS t | S )N  )snappyr5   r.   r0   r0   r1   r5   g   s   
r5   r:   zDataStax Python Driver	cassandra      z>BbBiz>BhBic                   @   sH   e Zd ZdZedd Zedd Zedd Zedd	 Zd
d Z	dS )EndPointzD
    Represents the information to connect to a cassandra node.
    c                 C      t  )zq
        The IP address of the node. This is the RPC address the driver uses when connecting to the node
        NotImplementedErrorselfr0   r0   r1   address~      zEndPoint.addressc                 C   r?   )z'
        The port of the node.
        r@   rB   r0   r0   r1   port   rE   zEndPoint.portc                 C      dS )z8
        SSL options specific to this endpoint.
        Nr0   rB   r0   r0   r1   ssl_options   s   zEndPoint.ssl_optionsc                 C      t jS )z4
        The socket family of the endpoint.
        )socket	AF_UNSPECrB   r0   r0   r1   socket_family   rE   zEndPoint.socket_familyc                 C   r?   )zl
        Resolve the endpoint to an address/port. This is called
        only on socket connection.
        r@   rB   r0   r0   r1   resolve   rE   zEndPoint.resolveN)
__name__
__module____qualname____doc__propertyrD   rF   rH   rL   rM   r0   r0   r0   r1   r>   y   s    



r>   c                   @   s    e Zd ZdZdd Zdd ZdS )EndPointFactoryNc                 C   s
   || _ | S )zJ
        This is called by the cluster during its initialization.
        )cluster)rC   rT   r0   r0   r1   	configure   s   zEndPointFactory.configurec                 C   r?   )z=
        Create an EndPoint from a system.peers row.
        r@   )rC   rowr0   r0   r1   create   s   zEndPointFactory.create)rN   rO   rP   rT   rU   rW   r0   r0   r0   r1   rS      s    rS   c                   @   sb   e Zd ZdZdddZedd Zedd Zd	d
 Zdd Z	dd Z
dd Zdd Zdd ZdS )DefaultEndPointzN
    Default EndPoint implementation, basically just an address and port.
    R#  c                 C      || _ || _d S N_address_port)rC   rD   rF   r0   r0   r1   __init__      
zDefaultEndPoint.__init__c                 C      | j S r[   )r]   rB   r0   r0   r1   rD         zDefaultEndPoint.addressc                 C   ra   r[   r^   rB   r0   r0   r1   rF      rb   zDefaultEndPoint.portc                 C   s   | j | jfS r[   r\   rB   r0   r0   r1   rM         zDefaultEndPoint.resolvec                 C   s"   t |to| j|jko| j|jkS r[   )
isinstancerX   rD   rF   rC   otherr0   r0   r1   __eq__   s
   


zDefaultEndPoint.__eq__c                 C   s   t | j| jfS r[   )hashrD   rF   rB   r0   r0   r1   __hash__      zDefaultEndPoint.__hash__c                 C   s   | j | jf|j |jfk S r[   )rD   rF   rf   r0   r0   r1   __lt__      zDefaultEndPoint.__lt__c                 C   s   t d| j| jf S )Nz%s:%d)strrD   rF   rB   r0   r0   r1   __str__      zDefaultEndPoint.__str__c                 C   s   d| j j| j| jf S )Nz<%s: %s:%d>)	__class__rN   rD   rF   rB   r0   r0   r1   __repr__   s   zDefaultEndPoint.__repr__NrY   )rN   rO   rP   rQ   r_   rR   rD   rF   rM   rh   rj   rl   ro   rr   r0   r0   r0   r1   rX      s    


rX   c                   @   s$   e Zd ZdZ	 dddZdd ZdS )DefaultEndPointFactoryNc                 C   
   || _ d S r[   )rF   )rC   rF   r0   r0   r1   r_         
zDefaultEndPointFactory.__init__c                 C   sL   ddl m} ||}||}|d u r| jr| jnd}t| jj||S )Nr   )	_NodeInforY   )	cassandra.metadatarw   get_broadcast_rpc_addressget_broadcast_rpc_portrF   rX   rT   address_translator	translate)rC   rV   rw   addrrF   r0   r0   r1   rW      s   

zDefaultEndPointFactory.creater[   )rN   rO   rP   rF   r_   rW   r0   r0   r0   r1   rt      s
    
rt   c                   @   sn   e Zd ZdZdddZedd Zedd Zed	d
 Zdd Z	dd Z
dd Zdd Zdd Zdd ZdS )SniEndPointz"SNI Proxy EndPoint implementation.rY   c                 C   s,   || _ d| _d | _|| _|| _d|i| _d S )Nr   server_hostname)_proxy_address_index_resolved_addressr^   _server_name_ssl_options)rC   proxy_addressserver_namerF   r0   r0   r1   r_      s   zSniEndPoint.__init__c                 C   ra   r[   )r   rB   r0   r0   r1   rD      rb   zSniEndPoint.addressc                 C   ra   r[   rc   rB   r0   r0   r1   rF     rb   zSniEndPoint.portc                 C   ra   r[   )r   rB   r0   r0   r1   rH     rb   zSniEndPoint.ssl_optionsc                 C   s   zt | j| jt jt j}W n t jy"   td| j| jf   w t	dd |D | j
t|  | _|  j
d7  _
| j| jfS )Nz6Could not resolve sni proxy hostname "%s" with port %dc                 s   s    | ]	}|d  d V  qdS )r+   r   Nr0   ).0r}   r0   r0   r1   	<genexpr>  s    z&SniEndPoint.resolve.<locals>.<genexpr>   )rJ   getaddrinfor   r^   rK   SOCK_STREAMgaierrorlogdebugsortedr   r,   r   )rC   resolved_addressesr0   r0   r1   rM     s   
"zSniEndPoint.resolvec                 C   s.   t |to| j|jko| j|jko| j|jkS r[   )re   r~   rD   rF   r   rf   r0   r0   r1   rh     s   



zSniEndPoint.__eq__c                 C   s   t | j| j| jfS r[   )ri   rD   rF   r   rB   r0   r0   r1   rj      rp   zSniEndPoint.__hash__c                 C   s    | j | j| jf|j |j| jfk S r[   )rD   rF   r   rf   r0   r0   r1   rl   #  s   zSniEndPoint.__lt__c                 C   s   t d| j| j| jf S )Nz%s:%d:%s)rn   rD   rF   r   rB   r0   r0   r1   ro   '  rm   zSniEndPoint.__str__c                 C   s   d| j j| j| j| jf S )Nz<%s: %s:%d:%s>)rq   rN   rD   rF   r   rB   r0   r0   r1   rr   *  s   zSniEndPoint.__repr__Nrs   )rN   rO   rP   rQ   r_   rR   rD   rF   rH   rM   rh   rj   rl   ro   rr   r0   r0   r0   r1   r~      s    



r~   c                   @   $   e Zd Zdd Zdd Zdd ZdS )SniEndPointFactoryc                 C   rZ   r[   )r   r^   )rC   r   rF   r0   r0   r1   r_   1  r`   zSniEndPointFactory.__init__c                 C   s.   | d}|d u rtdt| jt|| jS )Nhost_idz$No host_id to create the SniEndPoint)get
ValueErrorr~   r   rn   r^   )rC   rV   r   r0   r0   r1   rW   5  s   
zSniEndPointFactory.createc                 C   s   t | j|| jS r[   )r~   r   r^   )rC   snir0   r0   r1   create_from_sni<  rk   z"SniEndPointFactory.create_from_sniN)rN   rO   rP   r_   rW   r   r0   r0   r0   r1   r   /  s    r   c                   @   sl   e Zd ZdZdd Zedd Zedd Zedd	 Zd
d Z	dd Z
dd Zdd Zdd Zdd ZdS )UnixSocketEndPointz.
    Unix Socket EndPoint implementation.
    c                 C   ru   r[   _unix_socket_path)rC   unix_socket_pathr0   r0   r1   r_   F  rv   zUnixSocketEndPoint.__init__c                 C   ra   r[   r   rB   r0   r0   r1   rD   I  rb   zUnixSocketEndPoint.addressc                 C      d S r[   r0   rB   r0   r0   r1   rF   M  s   zUnixSocketEndPoint.portc                 C   rI   r[   )rJ   AF_UNIXrB   r0   r0   r1   rL   Q  rb   z UnixSocketEndPoint.socket_familyc                 C   s
   | j d fS r[   )rD   rB   r0   r0   r1   rM   U  rv   zUnixSocketEndPoint.resolvec                 C   s   t |to
| j|jkS r[   )re   r   r   rf   r0   r0   r1   rh   X  s   

zUnixSocketEndPoint.__eq__c                 C   s
   t | jS r[   )ri   r   rB   r0   r0   r1   rj   \  rv   zUnixSocketEndPoint.__hash__c                 C      | j |j k S r[   r   rf   r0   r0   r1   rl   _  rd   zUnixSocketEndPoint.__lt__c                 C   s   t d| jf S )Nz%s)rn   r   rB   r0   r0   r1   ro   b  rk   zUnixSocketEndPoint.__str__c                 C   s   d| j j| jf S )Nz<%s: %s>)rq   rN   r   rB   r0   r0   r1   rr   e  s   zUnixSocketEndPoint.__repr__N)rN   rO   rP   rQ   r_   rR   rD   rF   rL   rM   rh   rj   rl   ro   rr   r0   r0   r0   r1   r   @  s    


r   c                   @   r   )_Framec                 C   s(   || _ || _|| _|| _|| _|| _d S r[   )versionflagsstreamopcodebody_offsetend_pos)rC   r   r   r   r   r   r   r0   r0   r1   r_   j  s   
z_Frame.__init__c                 C   sV   t |tr)| j|jko(| j|jko(| j|jko(| j|jko(| j|jko(| j|jkS tS r[   )	re   r   r   r   r   r   r   r   NotImplementedrf   r0   r0   r1   rh   r  s   





z_Frame.__eq__c              	   C   s&   d | j| j| j| j| j| j| j S )NzEver({0}); flags({1:04b}); stream({2}); op({3}); offset({4}); len({5}))formatr   r   r   r   r   r   rB   r0   r0   r1   ro   |  s   &z_Frame.__str__N)rN   rO   rP   r_   rh   ro   r0   r0   r0   r1   r   i  s    
r   c                   @   s&   e Zd ZdZdddZedd ZdS )ConnectionExceptionz
    An unrecoverable error was hit when attempting to use a connection,
    or the connection was already closed or defunct.
    Nc                 C   s   t | | || _d S r[   )	Exceptionr_   endpoint)rC   messager   r0   r0   r1   r_     s   
zConnectionException.__init__c                 C      | j jS r[   r   rD   rB   r0   r0   r1   host     zConnectionException.hostr[   )rN   rO   rP   rQ   r_   rR   r   r0   r0   r0   r1   r     s
    
r   c                   @      e Zd ZdZdS )ConnectionShutdownzQ
    Raised when a connection has been marked as defunct or has been closed.
    NrN   rO   rP   rQ   r0   r0   r0   r1   r         r   c                       s    e Zd ZdZ fddZ  ZS )ProtocolVersionUnsupportedzM
    Server rejected startup message due to unsupported protocol version
    c                    s(   d||f }t t| || || _d S )Nz&Unsupported protocol version on %s: %d)superr   r_   startup_version)rC   r   r   msgrq   r0   r1   r_     s   
z#ProtocolVersionUnsupported.__init__)rN   rO   rP   rQ   r_   __classcell__r0   r0   r   r1   r     s    r   c                   @   r   )ConnectionBusyz
    An attempt was made to send a message through a :class:`.Connection` that
    was already at the max number of in-flight operations.
    Nr   r0   r0   r0   r1   r     s    r   c                   @   r   )ProtocolErrorzL
    Communication did not match the protocol that this driver expects.
    Nr   r0   r0   r0   r1   r     r   r   c                   @      e Zd ZdS )CrcMismatchExceptionNrN   rO   rP   r0   r0   r0   r1   r         r   c                   @   s*   e Zd ZdZdZ	 dZ	 dZ	 dd ZdS )ContinuousPagingStatez_
     A class for specifying continuous paging state, only supported starting with DSE_V2.
    Nc                 C   s   || _ d| _|| _d S Nr   )num_pages_requestednum_pages_receivedmax_queue_size)rC   r   r0   r0   r1   r_     s   
zContinuousPagingState.__init__)rN   rO   rP   rQ   r   r   r   r_   r0   r0   r0   r1   r     s    r   c                   @   s\   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd ZdS )ContinuousPagingSessionc                 C   s>   || _ || _|| _|| _t | _d| _t | _|| _	d| _
d S NF)	stream_iddecoderrow_factory
connectionr   
_condition_stopr   _page_queue_statereleased)rC   r   r   r   r   stater0   r0   r1   r_     s   
z ContinuousPagingSession.__init__c                 C   s4   t |tr| | d S t |tr| | d S d S r[   )re   r   on_pager   on_errorrC   resultr0   r0   r1   
on_message  s
   

z"ContinuousPagingSession.on_messagec                 C   s   | j + | jr| j jd7  _| j|j|jd f |  j|jO  _| j 	  W d    n1 s1w   Y  |jr>d| _
d S d S )Nr   T)r   r   r   r   
appendleftcolumn_namesparsed_rowsr   continuous_paging_lastnotifyr   r   r0   r0   r1   r     s   
zContinuousPagingSession.on_pagec                 C   st   t |tr	| }td|| j | j | jd d |f d| _	| j
  W d    n1 s0w   Y  d| _d S )NzGot error %s for session %sT)re   r   to_exceptionr   r   r   r   r   r   r   r   r   )rC   errorr0   r0   r1   r     s   

z ContinuousPagingSession.on_errorc                 c   s    z^| j   	 | js| js| j jdd | js| jr| jrG| j \}}}|r*||   | j   | ||D ]}|V  q9| j   | js| jrKnqW z| j   W d S  t	y_   Y d S w z| j   W w  t	yp   Y w w )NT   timeout)
r   acquirer   r   waitpopmaybe_request_morereleaser   RuntimeError)rC   namesrowserrrV   r0   r0   r1   results  s>   


	zContinuousPagingSession.resultsc              	   C   sv   | j sd S | j j}| j j| j j }|t| j | }td| j| j	j
|| j j| j j| ||d kr9| | d S d S )NzYSession %s from %s, space in CP queue: %s, requested: %s, received: %s, num_in_flight: %s   )r   r   r   r   r,   r   r   r   r   r   r   update_next_pages)rC   r   num_in_flightspace_in_queuer0   r0   r1   r     s   z*ContinuousPagingSession.maybe_request_morec              
   C   s   z?| j  j|7  _td| j| jj | jj | jt	t	j
j| j|d| j | j W d    W d S 1 s8w   Y  W d S  tya } ztd| j| jj | | W Y d }~d S d }~ww )Nz,Updating backpressure for session %s from %s)
next_pageszLFailed to update backpressure for session %s from %s, connection is shutdown)r   r   r   r   r   r   r   locksend_msgr%   RevisionTypePAGING_BACKPRESSUREget_request_id_on_backpressure_responser   r   )rC   num_next_pagesexr0   r0   r1   r   !  s&   
&
z)ContinuousPagingSession.update_next_pagesc                 C   sR   t |trtd| j d S td| j| jjt|dr|	 n| | 
| d S )Nz'Paging session %s backpressure updated.z7Failed updating backpressure for session %s from %s: %sr   )re   r   r   r   r   r   r   r   hasattrr   r   rC   responser0   r0   r1   r   0  s   
z1ContinuousPagingSession._on_backpressure_responsec                 C   s   z2t d| j| jj | jj | jttjj	| j| j
 | j W d    n1 s,w   Y  W n tyE   t d| j| jj Y nw | j d| _| j  W d    d S 1 s]w   Y  d S )Nz#Canceling paging session %s from %sz;Failed to cancel session %s from %s, connection is shutdownT)r   r   r   r   r   r   r   r%   r   PAGING_CANCELr   _on_cancel_responser   r   r   r   rB   r0   r0   r1   cancel8  s(   

"zContinuousPagingSession.cancelc                 C   sL   t |trtd| j ntd| j| jjt|dr|	 n| d| _
d S )NzPaging session %s canceled.z1Failed canceling streaming session %s from %s: %sr   T)re   r   r   r   r   r   r   r   r   r   r   r   r0   r0   r1   r   H  s   

z+ContinuousPagingSession._on_cancel_responseN)rN   rO   rP   r_   r   r   r   r   r   r   r   r   r   r0   r0   r0   r1   r     s    r   c                    s   t   fdd}|S )Nc              
      sJ   z | g|R i |W S  t y$ } z| | W Y d }~d S d }~ww r[   )r   defunct)rC   argskwargsexcfr0   r1   wrapperS  s   z!defunct_on_error.<locals>.wrapper)r   )r  r  r0   r  r1   defunct_on_errorQ  s   r  z3.0.0c                   @   s   e Zd ZdZdZdZdZdZdd Ze	dd Z
e	dd	 Zd
d Ze	dd Ze	dd Zdd Zdd Zdd Zdd ZdS )_ConnectionIOBufferz
    Abstraction class to ease the use of the different connection io buffers. With
    protocol V5 and checksumming, the data is read, validated and copied to another
    cql frame buffer.
    NFc                 C   s   t  | _t|| _d S r[   )ioBytesIO
_io_bufferweakrefproxy_connection)rC   r   r0   r0   r1   r_   j  s   
z_ConnectionIOBuffer.__init__c                 C   ra   r[   )r  rB   r0   r0   r1   	io_buffern  rb   z_ConnectionIOBuffer.io_bufferc                 C   s   | j r| jS | jS r[   )is_checksumming_enabled_cql_frame_bufferr  rB   r0   r0   r1   cql_frame_bufferr  s   z$_ConnectionIOBuffer.cql_frame_bufferc                 C   s   |    t | _d S r[   )reset_io_bufferr  r  r  rB   r0   r0   r1   set_checksumming_bufferw  s   z+_ConnectionIOBuffer.set_checksumming_bufferc                 C   r   r[   )r  _is_checksumming_enabledrB   r0   r0   r1   r  {  r   z+_ConnectionIOBuffer.is_checksumming_enabledc                 C   ra   r[   )_segment_consumedrB   r0   r0   r1   has_consumed_segment  rb   z(_ConnectionIOBuffer.has_consumed_segmentc                 C   
   | j  S r[   )r  tellrB   r0   r0   r1   readable_io_bytes  rv   z%_ConnectionIOBuffer.readable_io_bytesc                 C   r  r[   )r  r  rB   r0   r0   r1   readable_cql_frame_bytes  rv   z,_ConnectionIOBuffer.readable_cql_frame_bytesc                 C   s$   t | j | _| jdd d S Nr   r   )r  r  r  readseekrB   r0   r0   r1   r    s   z#_ConnectionIOBuffer.reset_io_bufferc                 C   s6   | j rt| j | _| jdd d S |   d S r  )r  r  r  r  r  r  r  rB   r0   r0   r1   reset_cql_frame_buffer  s   z*_ConnectionIOBuffer.reset_cql_frame_buffer)rN   rO   rP   rQ   r  r  r  r  r_   rR   r  r  r  r  r  r  r  r  r  r0   r0   r0   r1   r  _  s(    



r  c                   @   s  e Zd ZdZdZdZdZdZej	Z
dZdZdZdZdZdZdZdZdZdZdZdZdZdZdZde d	 ZdZdZdZdZdZdZ dZ!dZ"dZ#dZ$dZ%e&Z'dZ(dZ)dZ*dZ+e,d
d Z-dddddddej	dddddddfddZ.e,dd Z/e,dd Z0e1dd Z2e1dd Z3e1dd Z4e1dd Z5dd Z6dd Z7d d! Z8d"d# Z9d$d% Z:d&d' Z;d(d) Z<d*d+ Z=d,d- Z>d.d/ Z?d0d1 Z@d2d3 ZAd4d5 ZBd6d7 ZCeDjEeDjFdfd8d9ZGddd:d;ZHd<d= ZIddd>d?ZJddd@dAZKdBdC ZLeMdDdE ZNeMdFdG ZOdHdI ZPeMdJdK ZQdLdM ZRdNdO ZSeMdPdQ ZTeMdRdS ZUeMdedTdUZVeMdfdVdWZWeMdXdY ZXdZd[ ZYd\d] ZZe,d^d_ Z[d`da Z\dbdc Z]e]Z^dS )g
Connectiond   i   NFTr   i   r3   r+   c                 C   r   r[   )r  r  rB   r0   r0   r1   _iobuf  s   zConnection._iobufz	127.0.0.1rY   c                 C   sb  t |tr|nt||| _|| _|r| ni | _|| _|| _|| _	|| _
|| _|	| _|
| _|| _|| _|| _tt| _i | _t| | _i | _d| _t | _|| _|r^| j| jjp[i  n	| jjrg| jj| _| jsr| jrr|  | _|dkrt| jd d| _td| j}t t!|| _"|d | _#nt| jd| _t t!| jd | _"| j| _#t$ | _%t& | _'d S )NTr3   r   i  i,  r<   )(re   r>   rX   r   authenticatorcopyrH   ssl_contextsockoptscompressioncql_versionprotocol_versionis_control_connectionuser_type_mapconnect_timeoutallow_beta_protocol_version
no_compactr   set_push_watchers	_requestsr  r  _continuous_paging_sessions_socket_writableorphaned_request_ids_on_orphaned_stream_releasedupdate_build_ssl_context_from_optionsminmax_in_flightmax_request_idr   rangerequest_idshighest_request_idr   r   r   connected_event)rC   r   rF   r   rH   r#  r$  r%  r&  r'  r(  r)  r*  r+  r"  on_orphaned_stream_releasedinitial_sizer0   r0   r1   r_     sH   




zConnection.__init__c                 C   r   r[   r   rB   r0   r0   r1   r   )  r   zConnection.hostc                 C   r   r[   )r   rF   rB   r0   r0   r1   rF   -  r   zConnection.portc                 C   rG   )z
        Called once by Cluster.connect().  This should be used by implementations
        to set up any resources that will be shared across connections.
        Nr0   clsr0   r0   r1   initialize_reactor1     zConnection.initialize_reactorc                 C   rG   )z{
        Called after a forking.  This should cleanup any remaining reactor state
        from the parent process.
        Nr0   r>  r0   r0   r1   handle_fork9  rA  zConnection.handle_forkc                 C   r?   r[   r@   )r?  r   callbackr0   r0   r1   create_timerA  rb   zConnection.create_timerc                 O   s   t   }||d< | |g|R i |}t   | }|j||  |jr0|jr-t||j|j|j s?|  t	d| |S )z
        A factory function which returns connections which have
        succeeded in connecting and are ready for service (or
        raises an exception otherwise).
        r)  z*Timed out creating connection (%s seconds))
timer;  r   
last_erroris_unsupported_proto_versionr   r&  is_setcloser   )r?  r   r   r   r   startconnelapsedr0   r0   r1   factoryE  s   
zConnection.factoryc           
         s   g d} fdd|D }| dd ptj}| dd ptj}tjt|d}t| dd|_t||_| d	d }| d
d }|rI|	|| | dd }|rV|
| | dd }	|	rc||	 |S )N)ssl_version	cert_reqscheck_hostnamekeyfilecertfileca_certsciphersc                    &   i | ]}| j v r| j |d qS r[   rH   r   r   krB   r0   r1   
<dictcomp>_     & z>Connection._build_ssl_context_from_options.<locals>.<dictcomp>rN  rO  )protocolrP  FrR  rQ  rS  rT  )r   sslPROTOCOL_TLS_CLIENTCERT_REQUIRED
SSLContextintboolrP  optionsload_cert_chainload_verify_locationsset_ciphers)
rC   ssl_context_opt_namesoptsrN  rO  rvrR  rQ  rS  rT  r0   rB   r1   r4  [  s$   


z*Connection._build_ssl_context_from_optionsc                    sP   g d} fdd|D } j jrd|vr jj}||d<  j j jfi |S )N)server_sidedo_handshake_on_connectsuppress_ragged_eofsr   c                    rU  r[   rV  rW  rB   r0   r1   rY  {  rZ  z8Connection._wrap_socket_from_context.<locals>.<dictcomp>r   )r"  rP  r   rD   wrap_socket_socket)rC   wrap_socket_opt_namesrg  r   r0   rB   r1   _wrap_socket_from_contextv  s   z$Connection._wrap_socket_from_contextc                 C   s   | j | d S r[   )rm  connect)rC   sockaddrr0   r0   r1   _initiate_connection  rk   zConnection._initiate_connectionc                 C   r   r[   r0   rB   r0   r0   r1   _validate_hostname  s   zConnection._validate_hostnamec                 C   sh   | j  \}}ttdr| j jtjkrtjtjdd |fgS t||| j jtj}|s2td| j f |S )Nr   r   z&getaddrinfo returned empty list for %s)	r   rM   r   rJ   rL   r   r   r   r   )rC   rD   rF   	addressesr0   r0   r1   _get_socket_addresses  s   z Connection._get_socket_addressesc           
      C   s  d }|   }|D ]V\}}}}}z/| j|||| _| jr!|  | _| j| j | | | jd  | j	r:| 
  d }W  n  tjy^ } z| jrR| j  d | _|}W Y d }~qd }~ww |rtt|jddd |D |jpp|f | jr| jD ]
}	| jj|	  qzd S d S )Nz&Tried connecting to %s. Last error: %sc                 S   s   g | ]}|d  qS )r+   r0   )r   ar0   r0   r1   
<listcomp>      z.Connection._connect_socket.<locals>.<listcomp>)ru  _socket_implrJ   rm  r"  ro  
settimeoutr)  rr  _check_hostnamers  r   rI  errnostrerrorr#  
setsockopt)
rC   sockerrrt  afsocktypeproto_rq  r   r   r0   r0   r1   _connect_socket  s<   




zConnection._connect_socketc                 C   s   | j r	| j | _d S d S r[   )_compressor
compressorrB   r0   r0   r1   _enable_compression  s   zConnection._enable_compressionc                 C   s4   | j   d| _| jrtnt| _tdt	|  d S )NTz2Enabling protocol checksumming on connection (%s).)
r  r  r  r  segment_codec_lz4segment_codec_no_compression_segment_codecr   r   idrB   r0   r0   r1   _enable_checksumming  s   
zConnection._enable_checksummingc                 C   r?   r[   r@   rB   r0   r0   r1   rI    s   zConnection.closec                 C   s   | j  | js
| jr	 W d    d S d| _W d    n1 s w   Y  t }t|r:tjdt| | j	|d ntdt| | j	| || _
|   | | | | | j  |S )NTz!Defuncting connection (%s) to %s:exc_infoz$Defuncting connection (%s) to %s: %s)r   
is_defunct	is_closedsysr  anyr   r   r  r   rF  rI  error_all_cp_sessionserror_all_requestsr;  r,  )rC   r   r  r0   r0   r1   r     s(   


zConnection.defunctc                 C   s,   t | j }|D ]
}| j| | q	d S r[   )listr/  keysr   )rC   r   
stream_idsr   r0   r0   r1   r    s   z Connection.error_all_cp_sessionsc                    s   j  ji _W d    n1 sw   Y  sd S tt|  fdd d \}}}| s;d S fdd}ttjk rN|  d S t|d}d|_	|
  d S )Nc              	      s<   z|   W d S  t y   tjdtjdd Y d S w )Nz]Ignoring unhandled exception while erroring requests for a failed connection (%s) to host %s:Tr  )r   r   warningr  r   cb)new_excrC   r0   r1   try_callback  s   z3Connection.error_all_requests.<locals>.try_callbackr   c                     s       D ]	\} }}|  qd S r[   )values)r  r  )requestsr  r0   r1   err_all_callbacks  s   
z8Connection.error_all_requests.<locals>.err_all_callbacks)targetT)r   r.  r   rn   popitemr,   r  CALLBACK_ERR_THREAD_THRESHOLDr
   daemonrJ  )rC   r   r  r  r  tr0   )r  r  rC   r  r1   r    s$   	

zConnection.error_all_requestsc                 C   sD   z| j  W S  ty!   | jd }|| jksJ || _| j Y S w )z>
        This must be called while self.lock is held.
        r   )r9  popleft
IndexErrorr:  r7  )rC   new_request_idr0   r0   r1   r     s   

zConnection.get_request_idc              	   C   sP   t d| | j|jg D ]}z||j W q ty%   t d Y qw d S )NzMessage pushed from server: %rz'Pushed event handler errored, ignoring:)r   r   r-  r   
event_type
event_argsr   	exception)rC   r   r  r0   r0   r1   handle_pushed  s   zConnection.handle_pushedc                 C   s   | j r
td| j | jrtd| j | jstd| j |||f| j|< |||| j| j| j	d}| j
rDt }| j|| | }| | t|S )NzConnection to %s is defunctzConnection to %s is closedzConnection %s is overloaded)r  r*  )r  r   r   r  r0  r   r.  r&  r  r*  r  r  r  r  encodegetvaluepushr,   )rC   r   
request_idr  encoderr   result_metadatabufferr0   r0   r1   r   #  s    
zConnection.send_msgc                 K   s   | j |fd|i|d S )Nr   r   )wait_for_responses)rC   r   r   r   r0   r0   r1   wait_for_response9  s   zConnection.wait_for_responsec              
      sh   j s jrtd f |d}|dd}t t||}d}	 t|| } j% t| j j	 d } fddt
|D }	  j	|7  _	W d	   n1 sTw   Y  t|	D ]\}
} |||
  |t|j||
 d
 q]||7 }|t|krn|d	ur|d8 }|dkrt td q#z||W S  ty     ty } z |  d	}~ww )a;  
        Returns a list of (success, response) tuples.  If success
        is False, response will be an Exception.  Otherwise, response
        will be the normal query response.

        If fail_on_error was left as True and one of the requests
        failed, the corresponding Exception will be raised.
        zConnection %s is already closedr   fail_on_errorTr   r   c                    s   g | ]}   qS r0   )r   )r   r  rB   r0   r1   rw  Q  rx  z1Connection.wait_for_responses.<locals>.<listcomp>N)index{Gz?g        )r  r  r   r   ResponseWaiterr,   r   r5  r7  	in_flightr8  	enumerater   r   got_responser   rE  sleepdeliverr   r   )rC   msgsr   r   r  waitermessages_sentneeded	availabler9  ir  r   r0   rB   r1   r  <  sH   	


zConnection.wait_for_responsesc                 C   s*   | j | | | jt|gd|d dS )z=
        Register a callback for a given event type.
        
event_listr   N)r-  addr  r$   )rC   r  rC  register_timeoutr0   r0   r1   register_watcherk  s
   

zConnection.register_watcherc                 C   s>   |  D ]\}}| j| | q| jt| d|d dS )zS
        Register multiple callback/event type pairs, expressed as a dict.
        r  r   N)itemsr-  r  r  r$   r  )rC   type_callback_dictr  r  rC  r0   r0   r1   register_watcherst  s   
zConnection.register_watchersc                 C   s   d| _ i | _d S r   )r'  r-  rB   r0   r0   r1   control_conn_disposed~  r`   z Connection.control_conn_disposedc           
      C   s   | j j }t|}|rN|d t@ }|tjvrtd| |dkr#tnt	}|j
d }||krN||d\}}}}	|	dk rBtd|	 t||||||	| | _|S )Nr   z?This version of the driver does not support protocol version %dr3   r   z!Received negative body length: %r)r  r  r  r,   PROTOCOL_VERSION_MASKr   SUPPORTED_VERSIONSr   frame_header_v3frame_header_v1_v2sizeunpack_fromr   _current_frame)
rC   bufposr   frame_headerheader_sizer   r   opbody_lenr0   r0   r1   _read_frame_header  s   

zConnection._read_frame_headerc              
   C   s   | j  }|| jjkrZz:| j jd | j| j j}||jkr5| j| j	|}d| j _
| j j|j nd| j _
| j jd W d S W d S  tyY } ztt|| jd }~ww d| j _
d S )Nr   TF)r  r  r  header_length_with_crcr  r  decode_headersegment_lengthdecoder  r  r  writepayloadr'   r   rn   r   )rC   readable_bytessegment_headersegmentr   r0   r0   r1   _process_segment_buffer  s"   

z"Connection._process_segment_bufferc                 C   s   	 | j r| j r|   | j  | j r| jjsd S | js#|  }n| j }| jr1|| jj	k r<| j r:| j r:q d S | j}| jj
|j | jj
|j	|j }| || | j  d | _qr[   )r  r  r  r  r  r  r  r  r  r   r  r  r   r  process_msgr  )rC   r  framer   r0   r0   r1   process_io_buffer  s(   



zConnection.process_io_bufferc                 C   s~  d| _ |j}|dk rd }tj}d }nv|| jv r%| j| }|j}|j}d }ncd}| j || jv r?|  j	d8  _	| j
| d}W d    n1 sIw   Y  |rW| jrW|   z| j|\}}}W n% ty   | j | j| W d    Y d S 1 sw   Y  Y d S w z||j| j||j|j|| j|}	W n* ty }
 ztd|| j  |d ur||
 | |
 W Y d }
~
d S d }
~
ww z0|dkrt|	trd|	jv rd| _n	t d| |	!  | |	 |d ur||	 n| "|	 W n ty   td Y nw |dkr=|| jv r | j| j#r| $| d S d S | j | j| W d    d S 1 s6w   Y  d S d S )	NTr   Fr   z6Error decoding response from Cassandra. %s; buffer: %rzunsupported protocol versionz/Closing connection %s due to protocol error: %sz#Callback handler errored, ignoring:)%msg_receivedr   r   decode_messager/  r   r   r   r1  r  remover2  r.  r   KeyErrorr9  appendr   r(  r   r   decompressorr   r   r  r  r  r   re   r#   r   rG  r   summary_msgr  r    remove_continuous_paging_session)rC   headerbodyr   rC  r   r  paging_sessionneed_notify_of_releaser   r   r0   r0   r1   r    s   











$zConnection.process_msgc                 C   s   t |||| |}|| j|< |S r[   )r   r/  )rC   r   r   r   r   sessionr0   r0   r1   new_continuous_paging_session  s   
z(Connection.new_continuous_paging_sessionc                 C   sj   z*| j | | j td| | j| W d    W d S 1 s#w   Y  W d S  ty4   Y d S w )Nz!Returning cp session stream id %s)r/  r   r   r   r   r9  r  r  )rC   r   r0   r0   r1   r    s   &z+Connection.remove_continuous_paging_sessionc                 C   s.   t dt| | j | t |  | j d S )Nz=Sending initial options message for new connection (%s) to %s)r   r   r  r   r   r   r   _handle_options_responserB   r0   r0   r1   _send_options_message  s   z Connection._send_options_messagec                 C   s  | j rd S t|tst|tr|td| td|f tdt| | j |j	}|j
d }|j
dd gd | _| jrM| j|vrLtd| j|f n|d | _d | _d }| jrtt t|@ }t|dkrttdt | nId }t| jtr| j|vrtd| j| jf | j}nt D ]
}||v r|} nq|d	krt| jrtd
| j d }n|| _t| \| _| _| j|| jd d S )Nz@Did not get expected SupportedMessage response; instead, got: %sz8Received options response on new connection (%s) from %sCOMPRESSIONPRODUCT_TYPEr   zVcql_version %r is not supported by remote (w/ native protocol). Supported versions: %rzdNo available compression types supported on both ends. locally supported: %r. remotely supported: %rzRThe requested compression type (%s) is not supported by the Cassandra server at %sr:   z~Snappy compression is not supported with protocol version %s and checksumming. Consider installing lz4. Disabling compression.)r+  )r  re   r   r   r   r   r   r  r   cql_versionsrb  r   _product_typer%  r   r  r$  r,  locally_supported_compressionsr  r,   rn   r   has_checksumming_supportr&  _compression_typer  _send_startup_messager+  )rC   options_responsesupported_cql_versionsremote_supported_compressionscompression_typeoverlaprX  r0   r0   r1   r  $  s|   










z#Connection._handle_options_responsec                 C   sb   t d|  ttd}|r||d< |rd|d< t| j|d}| j||  | jd t d|  d S )	NzSending StartupMessage on %s)DRIVER_NAMEDRIVER_VERSIONr  true
NO_COMPACT)
cqlversionrb  r  zSent StartupMessage on %s)	r   r   r  r  r   r%  r   r   _handle_startup_response)rC   r$  r+  rg  smr0   r0   r1   r  m  s   z Connection._send_startup_messagec                 C   s  | j rd S t|tr5| jrtd| jjj tdt	| | j
 |   t| jr.|   | j  d S t|trtdt	| | j
|j | jd u rXtd| jf  td|   t| jrf|   t| jtrtd|  t| jd}t| jdd	}| j||  |d
 d S td|  |j| j_| j }|d u rdn|}| t||  | j d S t|t rtdt	| | j
|!  |rtd| j
|! f t"d| j
|! f t|t#rtd| j
 |d}t|| t$||f )NzAn authentication challenge was not sent, this is suspicious because the driver expects authentication (configured authenticator = %s)z/Got ReadyMessage on new connection (%s) from %sz:Got AuthenticateMessage on new connection (%s) from %s: %szFailed to authenticate to %s. If you are trying to connect to a DSE cluster, consider using TransitionalModePlainTextAuthProvider if DSE authentication is configured with transitional modez"Remote end requires authenticationz-Sending credentials-based auth response on %s)credsT)did_authenticater  z&Sending SASL-based auth response on %sr9   8Received ErrorMessage on new connection (%s) from %s: %s Failed to authenticate to %s: %sz-Failed to initialize new connection to %s: %sz8Connection to %s was closed during the startup handshakez/Unexpected response during Connection setup: %r)%r  re   r   r   r   r  rq   rN   r   r  r   r  r   r  r&  r  r;  r,  r   r   r   r   dictr   r   r
  r   r   server_authenticator_classinitial_responser    _handle_auth_responser   r  r   r   r   )rC   startup_responser  cmrC  r  r   r0   r0   r1   r
  z  sr   






z#Connection._handle_startup_responsec                 C   s  | j rd S t|tr%td|  | j|j | jr| j| _	| j
  d S t|trM| j|j}t|d u r8dn|}td|  | ||  | j d S t|trjtdt| | j|  td| j| f t|trxtd| j |d}t|| j| t|| j|f )Nz(Connection %s successfully authenticatedr9   z"Responding to auth challenge on %sr  r  z=Connection to %s was closed during the authentication processz>Unexpected response during Connection authentication to %s: %r)r  re   r"   r   r   r   on_authentication_successtokenr  r  r;  r,  r!   evaluate_challenge	challenger    r   r   r  r   r  r   r  r   r   r   r   )rC   auth_responser   r   r0   r0   r1   r    s8   



z Connection._handle_auth_responsec              
   C   s   |r|| j kr	d S td|f tjd}z| |}W n+ ty* } z| d }~w tyE } ztd|f | j	}| 
| |d }~ww t|trP|| _ d S td|f | j	}| 
| |)NUSE "%s"queryconsistency_level"Problem while setting keyspace: %r)keyspacer   r   ONEr  r   r   r   r   r   r   re   r   )rC   r   r  r   irer   conn_excr0   r0   r1   set_keyspace_blocking  s2   




z Connection.set_keyspace_blockingc                    s   	 j  jjk r jd7  _	 W d   nW d   n1 s$w   Y  td qr6jkr= d dS tdf tjd} fdd}	 }
||| dS )	a  
        Use this in order to avoid deadlocking the event loop thread.
        When the operation completes, `callback` will be called with
        two arguments: this connection and an Exception if an error
        occurred, otherwise :const:`None`.

        This method will always increment :attr:`.in_flight` attribute, even if
        it doesn't need to make a request, just to maintain an
        ":attr:`.in_flight` is incremented" invariant.
        Tr   NgMbP?r  r  c                    s\   t | tr_ d  d S t | tr |   d S  td| f j d S )Nr  )re   r   r   r   r   r   r   r   )r   rC  r   rC   r0   r1   process_result  s   


z5Connection.set_keyspace_async.<locals>.process_result)r   r  r7  rE  r  r   r   r   r!  r   r   )rC   r   rC  r  r&  r  r0   r%  r1   set_keyspace_async  s&   


zConnection.set_keyspace_asyncc                 C   s   | j  S r[   r  rB   r0   r0   r1   is_idle%  r   zConnection.is_idlec                 C   
   d| _ d S r   r(  rB   r0   r0   r1   
reset_idle)  rv   zConnection.reset_idlec                 C   s4   d}| j rd}n| jrd}d| jjt| | j|f S )Nr9   z
 (defunct)z	 (closed)z<%s(%r) %s%s>)r  r  rq   rN   r  r   )rC   statusr0   r0   r1   ro   ,  s   zConnection.__str__r[   r   )F)_rN   rO   rP   r  in_buffer_sizeout_buffer_sizer%  r+  r   MAX_SUPPORTEDr&  r   r$  r  r  r  r   rH   r"  rF  r  r6  r9  r:  r1  orphaned_threshold_reachedorphaned_thresholdr  r  r   r(  r  rG  r'  signaled_errorr*  r  rm  rJ   ry  r{  r  r  r2  rR   r  r_   r   rF   classmethodr@  rB  rD  rM  r4  ro  rr  rs  ru  r  r  r  rI  r   r  r  r   r  r   encode_messager  r   r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r
  r  r$  r'  r)  r+  ro   rr   r0   r0   r0   r1   r    s    

<





#(

/
	


"
C	

H?
4
r  c                   @   s&   e Zd Zdd Zdd ZdddZdS )	r  c                 C   s0   || _ || _|| _d | _d g| | _t | _d S r[   )r   pendingr  r   	responsesr   event)rC   r   num_responsesr  r0   r0   r1   r_   9  s   zResponseWaiter.__init__c                 C   s   | j j | j  jd8  _W d    n1 sw   Y  t|tr>t|dr*| }| jr6|| _| j	
  nd|f| j|< n| jsId|f| j|< n|| j|< |  jd8  _| js_| j	
  d S d S )Nr   r   FT)r   r   r  re   r   r   r   r  r   r7  r,  r6  r5  )rC   r   r  r0   r0   r1   r  A  s"   



zResponseWaiter.got_responseNc                 C   s.   | j | | jr| j| j  st | jS )a  
        If fail_on_error was set to False, a list of (success, response)
        tuples will be returned.  If success is False, response will be
        an Exception.  Otherwise, response will be the normal query response.

        If fail_on_error was left as True and one of the requests
        failed, the corresponding Exception will be raised. Otherwise,
        the normal response will be returned.
        )r7  r   r   rH  r   r6  rC   r   r0   r0   r1   r  V  s   

zResponseWaiter.deliverr[   )rN   rO   rP   r_   r  r  r0   r0   r0   r1   r  7  s    r  c                   @   r   )HeartbeatFuturec                 C   s   d | _ t | _|| _|| _tdt||j |j	4 |j
|jk r4| j
d7  _
|t | | j ntd| _ | j  W d    d S W d    d S 1 sQw   Y  d S )Nz<Sending options message heartbeat on idle connection (%s) %sr   zIFailed to send heartbeat because connection 'in_flight' exceeds threshold)
_exceptionr   _eventr   ownerr   r   r  r   r   r  r7  r   r   r   _options_callbackr   r,  )rC   r   r=  r0   r0   r1   r_   j  s    

"zHeartbeatFuture.__init__c                 C   s:   | j | | j  r| jr| jd S td|f | jj)Nz-Connection heartbeat timeout after %s seconds)r<  r   rH  r;  r   r   r   r9  r0   r0   r1   r   y  s   
zHeartbeatFuture.waitc                 C   sT   t |trtdt| j| jj nt |tr|| _ntd|f | _| j	
  d S )Nz4Received options response on connection (%s) from %sz2Received unexpected response to OptionsMessage: %s)re   r   r   r   r  r   r   r   r;  r<  r,  r   r0   r0   r1   r>    s   

z!HeartbeatFuture._options_callbackN)rN   rO   rP   r_   r   r>  r0   r0   r0   r1   r:  i  s    r:  c                   @   s<   e Zd Zdd ZG dd deZdd Zdd Zd	d
 ZdS )ConnectionHeartbeatc                 C   s:   t j| dd || _|| _|| _t | _d| _|   d S )NzConnection heartbeat)nameT)	r
   r_   	_interval_timeout_get_connection_holdersr   _shutdown_eventr  rJ  )rC   interval_secget_connection_holdersr   r0   r0   r1   r_     s   zConnectionHeartbeat.__init__c                   @   r   )z%ConnectionHeartbeat.ShutdownExceptionNr   r0   r0   r0   r1   ShutdownException  r   rG  c                 C   sr  | j | j | j  s7t }g }g }zdd |  D D ]]\}}|D ]R}|   |jsh|jsh|j	rcz
|
t|| W q% tyb } ztdt||j |
|||f W Y d }~q%d }~ww |  q%tdt||j || q%|   q| j}t }|D ]^}	|   |	j}z%|	| |j | jd8  _W d    n1 sw   Y  |  W n& ty } ztdt||j |
|	j|	j|f W Y d }~nd }~ww | jt |  }q|D ]\}}}
|   |jsd|_||
 || qW n | jy   Y n ty   tjddd	 Y nw t | }| j t| j| d
 | j  rd S d S )Nc                 S   s   g | ]}|  |fqS r0   )get_connections)r   or0   r0   r1   rw    s    z+ConnectionHeartbeat.run.<locals>.<listcomp>z9Failed sending heartbeat message on connection (%s) to %sz6Cannot send heartbeat message on connection (%s) to %sr   z*Heartbeat failed for connection (%s) to %sTzFailed connection heartbeatr  r  )rD  r   rA  rH  rE  rC  _raise_if_stoppedr  r  r)  r  r:  r   r   r  r  r   r+  r   return_connectionrB  r   r   r  r=  r'  shutdown_on_errorr   rG  r   max)rC   
start_timefuturesfailed_connectionsconnectionsr=  r   er   r  r   rL  r0   r0   r1   run  sx   





 
zConnectionHeartbeat.runc                 C   s   | j   |   d S r[   )rD  r,  joinrB   r0   r0   r1   stop  s   
zConnectionHeartbeat.stopc                 C   s   | j  r	|  d S r[   )rD  rH  rG  rB   r0   r0   r1   rJ    s   
z%ConnectionHeartbeat._raise_if_stoppedN)	rN   rO   rP   r_   r   rG  rS  rU  rJ  r0   r0   r0   r1   r?    s    	>r?  c                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )TimerFc                 C   s   t   | | _|| _d S r[   )rE  endrC  )rC   r   rC  r0   r0   r1   r_     s   
zTimer.__init__c                 C   r   r[   )rW  rf   r0   r0   r1   rl     rd   zTimer.__lt__c                 C   r*  )NT)canceledrB   r0   r0   r1   r     rv   zTimer.cancelc                 C   s$   | j rdS || jkr|   dS dS )NTF)rX  rW  rC  )rC   time_nowr0   r0   r1   finish  s   
zTimer.finishN)rN   rO   rP   rX  r_   rl   r   rZ  r0   r0   r0   r1   rV    s    rV  c                   @   s0   e Zd Zdd Zdd Zdd Zedd Zd	S )
TimerManagerc                 C   s   g | _ g | _d S r[   )_queue_new_timersrB   r0   r0   r1   r_     r`   zTimerManager.__init__c                 C   s   | j |j|f dS )z?
        called from client thread with a Timer object
        N)r]  r  rW  )rC   timerr0   r0   r1   	add_timer  s   zTimerManager.add_timerc                 C   s   | j }| jr| j}|rt||  |s|rEt }|rGz|d d }||r-t| n|jW S W n ty@   t	
d Y nw |sdS dS dS )z
        run callbacks on all expired timers
        Called from the event thread
        :return: next end time, or None
        r   r   z,Exception while servicing timeout callback: N)r\  r]  r   r   rE  rZ  r	   rW  r   r   r  )rC   queue
new_timersnowr^  r0   r0   r1   service_timeouts  s*   

zTimerManager.service_timeoutsc                 C   s&   z| j d d W S  ty   Y d S w r   )r\  r  rB   r0   r0   r1   next_timeout   s
   zTimerManager.next_timeoutN)rN   rO   rP   r_   r_  rc  rR   rd  r0   r0   r0   r1   r[    s    r[  )w
__future__r   collectionsr   r   r|  	functoolsr   r   r   heapqr   r	   r  loggingrJ   structr  	threadingr
   r   r   r   rE  r\  r	  modulesgevent.queuer   r   r`  r;   r   r   r   r   cassandra.marshalr   cassandra.protocolr   r   r   r   r   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   cassandra.segmentr&   r'   cassandra.utilr(   	getLoggerrN   r   r  r  r  r7   ImportErrorr)   r*   r-   r5   AttributeErrorr   reprr2   r6   r:   __version__r  r  r  HEADER_DIRECTION_FROM_CLIENTHEADER_DIRECTION_TO_CLIENTHEADER_DIRECTION_MASKStructr  r  objectr>   rS   rX   rt   r~   r   r   r   EAGAINEWOULDBLOCKNONBLOCKINGr   r   r   r   r   r   r   r   r   r  DEFAULT_CQL_VERSIONr  r  r  r:  r?  rV  r[  r0   r0   r0   r1   <module>   s   
L




)%:(
 6       )2%U