o
    NDi                     @   s  d Z ddlmZ ddlZddlZddlZddlmZmZm	Z	 ddl
Z
zddl
mZ W n ey9   ddlmZ Y nw ddlmZ ddlmZmZmZ ddlmZ eeZG d	d
 d
eZeG dd deZG dd deZG dd deZG dd deZdZ dZ!G dd deZ"dS )z)
Connection pooling and host management.
    )total_orderingN)LockRLock	Condition)WeakSet)AuthenticationFailed)ConnectionExceptionEndPointDefaultEndPoint)HostDistancec                   @   s   e Zd ZdZdS )NoConnectionsAvailablezb
    All existing connections to a given host are busy, or there are
    no open connections.
    N)__name__
__module____qualname____doc__ r   r   G/var/www/Datamplify/venv/lib/python3.10/site-packages/cassandra/pool.pyr   %   s    r   c                   @   s   e Zd ZdZdZ	 dZ	 dZ	 dZ	 dZ	 dZ		 dZ
	 dZ	 dZ	 dZ	 dZ	 dZ	 dZ	 dZ	 dZdZdZdZdZd"ddZedd Zedd	 Zed
d Zdd Zdd Zdd Zdd Zdd Z dd Z!dd Z"dd Z#dd Z$dd Z%d d! Z&dS )#Hostz-
    Represents a single Cassandra node.
    NFc                 C   s`   |d u rt d|d u rt dt|tr|nt|| _|| | _|| _| || t | _	d S )Nzendpoint may not be Nonez)conviction_policy_factory may not be None)

ValueError
isinstancer	   r
   endpointconviction_policyhost_idset_location_infor   lock)selfr   conviction_policy_factory
datacenterrackr   r   r   r   __init__   s   
zHost.__init__c                 C   s   | j jS )zv
        The IP address of the endpoint. This is the RPC address the driver uses when connecting to the node.
        )r   addressr   r   r   r   r       s   zHost.addressc                 C      | j S )z! The datacenter the node is in.  )_datacenterr!   r   r   r   r         zHost.datacenterc                 C   r"   )z The rack the node is in.  )_rackr!   r   r   r   r      r$   z	Host.rackc                 C   s   || _ || _dS )z
        Sets the datacenter and rack for this node. Intended for internal
        use (by the control connection, which periodically checks the
        ring topology) only.
        N)r#   r%   )r   r   r   r   r   r   r      s   
zHost.set_location_infoc                 C   s(   | j s
td| j | j  d| _ d S )NzHost %s is now marked upT)is_uplogdebugr   r   resetr!   r   r   r   set_up   s   

zHost.set_upc                 C   
   d| _ d S )NF)r&   r!   r   r   r   set_down      
zHost.set_downc                 C   s   | j |S N)r   add_failure)r   connection_excr   r   r   signal_connection_failure      zHost.signal_connection_failurec                 C   s
   | j d uS r.   )_reconnection_handlerr!   r   r   r   is_currently_reconnecting   r-   zHost.is_currently_reconnectingc                 C   s:   | j  | j}|| _|W  d   S 1 sw   Y  dS )zv
        Atomically replaces the reconnection handler for this
        host.  Intended for internal use only.
        N)r   r3   )r   new_handleroldr   r   r    get_and_set_reconnection_handler   s
   $z%Host.get_and_set_reconnection_handlerc                 C   s"   t |tr| j|jkS | jj|kS r.   )r   r   r   r    r   otherr   r   r   __eq__   s   
zHost.__eq__c                 C   
   t | jS r.   )hashr   r!   r   r   r   __hash__   r-   zHost.__hash__c                 C   s   | j |j k S r.   )r   r8   r   r   r   __lt__   r2   zHost.__lt__c                 C   r;   r.   )strr   r!   r   r   r   __str__   r-   zHost.__str__c                 C   s*   | j r	d| j f nd}d| jj| j|f S )Nz %s z
<%s: %s%s>)r#   	__class__r   r   )r   dcr   r   r   __repr__   s   zHost.__repr__)NNN)'r   r   r   r   r   broadcast_addressbroadcast_portbroadcast_rpc_addressbroadcast_rpc_portlisten_addresslisten_portr   r&   release_versionr   dse_versiondse_workloaddse_workloadsr#   r%   r3   r   _currently_handling_node_upr   propertyr    r   r   r   r*   r,   r1   r4   r7   r:   r=   r>   r@   rD   r   r   r   r   r   -   sj    

	



	
r   c                   @   sL   e Zd ZdZdZdd Zdd Zdd Zd	d
 Zdd Z	dd Z
dd ZdS )_ReconnectionHandlerz^
    Abstract class for attempting reconnections with a given
    schedule and scheduler.
    Fc                 O   s"   || _ || _|| _|| _|| _d S r.   )	schedulerschedulecallbackcallback_argscallback_kwargs)r   rR   rS   rT   rU   rV   r   r   r   r     s
   
z_ReconnectionHandler.__init__c                 C   s2   | j r
td d S t| j}| j|| j d S )Nz2Reconnection handler was cancelled before starting)
_cancelledr'   r(   nextrS   rR   run)r   first_delayr   r   r   start  s
   

z_ReconnectionHandler.startc                 C   s   | j rd S d }zaz|  }W n= tyK } z1zt| j}W n ty(   d }Y nw | ||rA|d u r9td n| j	|| j
 W Y d }~nd }~ww | j s^| | | j| ji | j W |rg|  d S d S |rp|  w w )NzSWill not continue to retry reconnection attempts due to an exhausted retry schedule)rW   try_reconnect	ExceptionrX   rS   StopIterationon_exceptionr'   warningrR   rY   on_reconnectionrT   rU   rV   close)r   connexc
next_delayr   r   r   rY     s8   

z_ReconnectionHandler.runc                 C   r+   NT)rW   r!   r   r   r   cancel.  r-   z_ReconnectionHandler.cancelc                 C   s   t  )z
        Subclasses must implement this method.  It should attempt to
        open a new Connection and return it; if a failure occurs, an
        Exception should be raised.
        )NotImplementedErrorr!   r   r   r   r\   1  s   z"_ReconnectionHandler.try_reconnectc                 C   s   dS )zk
        Called when a new Connection is successfully opened.  Nothing is
        done by default.
        Nr   r   
connectionr   r   r   ra   9  s   z$_ReconnectionHandler.on_reconnectionc                 C   s   t |trdS dS )a  
        Called when an Exception is raised when trying to connect.
        `exc` is the Exception that was raised and `next_delay` is the
        number of seconds (as a float) that the handler will wait before
        attempting to connect again.

        Subclasses should return :const:`False` if no more attempts to
        connection should be made, :const:`True` otherwise.  The default
        behavior is to always retry unless the error is an
        :exc:`.AuthenticationFailed` instance.
        FT)r   r   r   rd   re   r   r   r   r_   @  s   
z!_ReconnectionHandler.on_exceptionN)r   r   r   r   rW   r   r[   rY   rg   r\   ra   r_   r   r   r   r   rQ      s    rQ   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
_HostReconnectionHandlerc                 O   s:   t j| g|R i | || _|| _|| _|| _|| _d S r.   )rQ   r   is_host_additionon_addon_uphostconnection_factory)r   rp   rq   rm   rn   ro   argskwargsr   r   r   r   T  s   
z!_HostReconnectionHandler.__init__c                 C   s   |   S r.   )rq   r!   r   r   r   r\   \  s   z&_HostReconnectionHandler.try_reconnectc                 C   s4   t d| j | jr| | j d S | | j d S )NzBSuccessful reconnection to %s, marking node up if it isn't already)r'   inforp   rm   rn   ro   ri   r   r   r   ra   _  s   z(_HostReconnectionHandler.on_reconnectionc                 C   s2   t |trdS td| j|| tjddd dS )NFzGError attempting to reconnect to %s, scheduling retry in %s seconds: %szReconnection error detailsT)exc_info)r   r   r'   r`   rp   r(   rk   r   r   r   r_   f  s   
z%_HostReconnectionHandler.on_exceptionN)r   r   r   r   r\   ra   r_   r   r   r   r   rl   R  s
    rl   c                   @   s   e Zd ZdZdZdZdZdZdZdZ	dZ
dZdd Zdd Zdd	 Zdd
dZdd Zdd Zdd Zdd Zdd Zdd Zedd ZdS )HostConnectionz
    When using v3 of the native protocol, this is used instead of a connection
    pool per host (HostConnectionPool) due to the increased in-flight capacity
    of individual connections.
    NFc                 C   s   || _ || _t|| _t | _t| j| _d| _	t
 | _|tjkr+td| j  d S |tjkr=|jjs=td| j  d S td| j  |jj|j| jd| _|j| _| jr]| j| j td| j  d S )NFz)Not opening connection to ignored host %sz(Not opening connection to remote host %sz#Initializing connection for host %son_orphaned_stream_releasedz,Finished initializing connection for host %s)rp   host_distanceweakrefproxy_sessionr   _lockr   _stream_available_condition_is_replacingset_trashr   IGNOREDr'   r(   REMOTEclusterconnect_to_remote_hostsrq   r   rx   _connectionkeyspace	_keyspaceset_keyspace_blocking)r   rp   ry   sessionr   r   r   r     s&   
zHostConnection.__init__c                 C   s.   | j rtd| jf | j| j}|st |S )NPool for %s is shutdown)is_shutdownr   rp   r   r   r   rc   r   r   r   _get_connection  s   zHostConnection._get_connectionc                 C   sF  |   }|jr/| j | js d| _| j| j| td| j	 W d    n1 s*w   Y  t

 }|}	 |j( |jr@|jsZ|j|jk rZ| jd7  _|| fW  d    S W d    n1 sdw   Y  |d ur~|t

  | }|dk r~	 td| j |jr|jr|   }n| j| W d    n1 sw   Y  q6)NTzAConnection to host %s reached orphaned stream limit, replacing...   r   z$All request IDs are currently in use)r   orphaned_threshold_reachedr}   r   r|   submit_replacer'   r(   rp   timer   	is_closed	in_flightmax_request_idget_request_idr~   waitr   )r   timeoutrc   r[   	remainingr   r   r   borrow_connection  sD   	
 
z HostConnection.borrow_connectionc              	   C   s  |s4|j  | jd8  _W d    n1 sw   Y  | j | j  W d    n1 s/w   Y  |js:|jr|jrB| jsBd S d}|js`t	dt
|| j | jjj| j|jdd}d|_| jrq|sqd}| jjj| jdd |ry|   d S d | _| j  | jr	 W d    d S d| _| j| j| W d    d S 1 sw   Y  d S || jv r
|j M |jt|jkr| j! || jv r| j| t	dt
|| j |  W d    n1 sw   Y  W d    d S W d    d S W d    d S 1 sw   Y  d S d S )Nr   FWDefunct or closed connection (%s) returned to pool, potentially marking host %s as downrm   T%Closing trashed connection (%s) to %s)r   r   r~   notify
is_defunctr   signaled_errorshutdown_on_errorr'   r(   idrp   r|   r   r1   
last_erroron_downshutdownr   r}   r   r   r   r   lenorphaned_request_idsremoverb   )r   rj   stream_was_orphanedis_downr   r   r   return_connection  sj   


"



z HostConnection.return_connectionc                 C   s6   | j  | j   W d   dS 1 sw   Y  dS zt
        Called when a response for an orphaned stream (timed out on the client
        side) was received.
        N)r~   r   r!   r   r   r   rx     s   "z*HostConnection.on_orphaned_stream_releasedc              	   C   s^  | j  | jr	 W d    d S W d    n1 sw   Y  tdt|| j z| jjj| jj	| j
d}| jr?|| j || _W n ty_   td| jj	f  | j| j| Y d S w |jB | j & |jr~|jt|jkrx|  n| j| d| _| j  W d    n1 sw   Y  W d    d S W d    d S 1 sw   Y  d S )NReplacing connection (%s) to %srw   z!Failed reconnecting %s. Retrying.F)r}   r   r'   r(   r   rp   r|   r   rq   r   rx   r   r   r   r]   r`   r   r   r   r   r   r   r   rb   r   addr   r~   r   )r   rj   rc   r   r   r   r     s8   

"zHostConnection._replacec                 C   s   | j  | jr	 W d    d S d| _| j  W d    n1 s"w   Y  | jr2| j  d | _d }| j  | jrB| j}t | _W d    n1 sLw   Y  |d ur_| jD ]}|  qXd S d S rf   )r}   r   r~   
notify_allr   rb   r   r   )r   trash_connsrc   r   r   r   r     s,   


zHostConnection.shutdownc                    s6   j sjsd S  fdd}|_j|| d S )Nc                    s&    |  |s	g n|g} | d S r.   )r   )rc   errorerrorsrT   r   r   r   $connection_finished_setting_keyspace+  s   
zXHostConnection._set_keyspace_for_all_conns.<locals>.connection_finished_setting_keyspace)r   r   r   set_keyspace_async)r   r   rT   r   r   r   r   _set_keyspace_for_all_conns'  s
   z*HostConnection._set_keyspace_for_all_connsc                 C   s   | j }|r|gS g S r.   )r   )r   cr   r   r   get_connections3  s   zHostConnection.get_connectionsc                 C   sN   | j }|r|js|jsdnd}|r|jgng }|r|jgng }| j|||dS )Nr   r   r   
open_count
in_flightsorphan_requests)r   r   r   r   r   r   )r   rj   r   r   r   r   r   r   	get_state7  s   zHostConnection.get_statec                 C   s   | j }|r|js|jsdS dS )Nr   r   )r   r   r   ri   r   r   r   r   ?  s   zHostConnection.open_countF)r   r   r   r   rp   ry   r   r   r|   r   r}   r   r   r   r   r   rx   r   r   r   r   r   rP   r   r   r   r   r   rv   p  s,    

+rv   r   
   c                   @   s   e Zd ZdZdZdZdZdZdZdZ	dZ
dd Zdd Zd	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zd+ddZdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* ZdS ),HostConnectionPoolzK
    Used to pool connections to a host for v1 and v2 native protocol.
    NFr   c                    s    _ |_t_t _t _t	
dj  j|} fddt|D _j_jrBjD ]}|j q9t _t _|_t	
dj  d S )Nz,Initializing new connection pool for host %sc                    s    g | ]}j j jjd qS )rw   )r   rq   r   rx   ).0irp   r   r   r   r   
<listcomp>`  s    z/HostConnectionPool.__init__.<locals>.<listcomp>z5Finished initializing new connection pool for host %s)rp   ry   rz   r{   r|   r   r}   r   _conn_available_conditionr'   r(   r   get_core_connections_per_hostrange_connectionsr   r   r   r   r   r   _next_trash_allowed_atr   )r   rp   ry   r   
core_connsrc   r   r   r   r   V  s$   

zHostConnectionPool.__init__c                 C   st  | j rtd| jf | j| j}|sZtd| j | jj| j	}| j
' |t| j| j  }t|D ]}|  jd7  _| j| j q3W d    n1 sNw   Y  | |}|S | jj| j	}| jj| j	}t|dd d}	d }
d}|	j |	j|	jk r|	 jd7  _|	 }
nd}W d    n1 sw   Y  |r| |\}	}
|	j|krt| j|k r|   |	|
fS )	Nr   z-Detected empty pool, opening core conns to %sr   c                 S   r"   r.   r   r   r   r   r   <lambda>      z6HostConnectionPool.borrow_connection.<locals>.<lambda>keyFT)r   r   rp   r   r'   r(   r|   r   r   ry   r}   r   _scheduled_for_creationr   r   _create_new_connection_wait_for_connget_max_requests_per_connectionget_max_connections_per_hostminr   r   r   r   _maybe_spawn_new_connection)r   r   connsr   	to_creater   rc   max_reqs	max_conns
least_busy
request_idneed_to_waitr   r   r   r   m  sD   


z$HostConnectionPool.borrow_connectionc                 C   s   | j 1 | jtkr	 W d    d S | j| jj| jkr&	 W d    d S |  jd7  _W d    n1 s7w   Y  t	d| j
 | j| j d S )Nr   z4Submitting task for creation of new Connection to %s)r}   r   _MAX_SIMULTANEOUS_CREATIONr   r|   r   r   ry   r'   r(   rp   r   r   r!   r   r   r   r     s   
z.HostConnectionPool._maybe_spawn_new_connectionc                 C   s   zOz|    W n* ttjfy$ } ztd| j| W Y d }~nd }~w ty1   td Y nw W | j	 |  j
d8  _
W d    d S 1 sIw   Y  d S | j	 |  j
d8  _
W d    w 1 sew   Y  w )Nz)Failed to create new connection to %s: %sz,Unexpectedly failed to create new connectionr   )_add_conn_if_under_maxr   socketr   r'   r`   rp   r]   	exceptionr}   r   )r   rd   r   r   r   r     s   *z)HostConnectionPool._create_new_connectionc                 C   s  | j j| j}| j) | jr	 W d    dS | j|kr&	 W d    dS |  jd7  _W d    n1 s7w   Y  td| j	 zN| j jj
| j	j| jd}| jrZ|| j j t t | _| j | jd d  |g }|| _W d    n1 s|w   Y  tdt|| j	 |   W dS  ttjfy } z8td| j	| | j |  jd8  _W d    n1 sw   Y  | j jj| j	|ddr|   W Y d }~dS d }~w ty   | j |  jd8  _W d    Y dS 1 sw   Y  Y dS w )	NTr   z'Going to open new connection to host %srw   zFAdded new connection (%s) to pool for host %s, signaling availablilityz4Failed to add new connection to pool for host %s: %sFr   )r|   r   r   ry   r}   r   r   r'   r(   rp   rq   r   rx   r   r   r   r   _MIN_TRASH_INTERVALr   r   r   _signal_available_connr   r   r   r`   r1   r   r   )r   r   rc   new_connectionsrd   r   r   r   r     sT   
	

z)HostConnectionPool._add_conn_if_under_maxc                 C   s8   | j  | j | W d    d S 1 sw   Y  d S r.   )r   r   )r   r   r   r   r   _await_available_conn  s   "z(HostConnectionPool._await_available_connc                 C   6   | j  | j   W d    d S 1 sw   Y  d S r.   )r   r   r!   r   r   r   r        "z)HostConnectionPool._signal_available_connc                 C   r   r.   )r   r   r!   r   r   r   _signal_all_available_conn  r   z-HostConnectionPool._signal_all_available_connc                 C   s   t   }|}|dkr\| | | jrtd| j}|rPt|dd d}|j" |j|jk rA| jd7  _||	 fW  d    S W d    n1 sKw   Y  |t   |  }|dks
t
 )Nr   zPool is shutdownc                 S   r"   r.   r   r   r   r   r   r     r   z3HostConnectionPool._wait_for_conn.<locals>.<lambda>r   r   )r   r   r   r   r   r   r   r   r   r   r   )r   r   r[   r   r   r   r   r   r   r     s&   

z!HostConnectionPool._wait_for_connc              	   C   s  |j  |s| jd8  _|j}W d    n1 sw   Y  |js%|jrR|jsPtdt|| j | j	j
j| j|jdd}d|_|rI|   d S | | d S d S || jv r|j B |jdkr| j || jv ro| j| W d    n1 syw   Y  tdt|| j |  W d    d S W d    d S 1 sw   Y  d S | j	j
| j}| j	j
| j}t| j|kr||krt | jkr| | d S |   d S )Nr   r   Fr   Tr   r   )r   r   r   r   r   r'   r(   r   rp   r|   r   r1   r   r   r   r   r}   r   rb   r   ry   get_min_requests_per_connectionr   r   r   r   _maybe_trash_connectionr   )r   rj   r   r   r   r   min_reqsr   r   r   r     sR   







z$HostConnectionPool.return_connectionc                 C   s   |    dS r   )r   r!   r   r   r   rx   +  s   z.HostConnectionPool.on_orphaned_stream_releasedc              	   C   s4  | j j| j}d}| jn || jvr	 W d    d S | j|krtd}|  jd8  _| jd d  }|| || _|j* |j	dkr_t
dt|| j |  	 W d    W d    d S W d    n1 siw   Y  | j| W d    n1 s~w   Y  |rt t | _t
dt|| j d S d S )NFTr   r   z7Skipping trash and closing unused connection (%s) to %szTrashed connection (%s) to %s)r|   r   r   ry   r}   r   r   r   r   r   r'   r(   r   rp   rb   r   r   r   r   r   )r   rj   r   	did_trashr   r   r   r   r   2  s8   



z*HostConnectionPool._maybe_trash_connectionc                 C   s   d}| j % || jv r#| jd d  }|| || _|  jd8  _d}W d    n1 s-w   Y  |rKtdt|| j |  | j	
| j d S tdt|| j |  d S )NFr   Tr   zClosing connection (%s) to %s)r}   r   r   r   r'   r(   r   rp   rb   r|   r   _retrying_replace)r   rj   should_replacer   r   r   r   r   N  s    

zHostConnectionPool._replacec                 C   sZ   d}z|   }W n ty   td| j Y nw |s+td| j | j| j d S d S )NFz!Failed replacing connection to %sz,Failed replacing connection to %s. Retrying.)	r   r]   r'   r   rp   r(   r|   r   r   )r   replacedr   r   r   r   `  s   z$HostConnectionPool._retrying_replacec                 C   s   | j  | jr	 W d    d S d| _W d    n1 sw   Y  |   | jD ]}|  |  jd8  _q)| jD ]}|  q:d S )NTr   )r}   r   r   r   rb   r   r   r   r   r   r   r   j  s   


zHostConnectionPool.shutdownc                 C   s   | j rd S | jj| j}| j( |t| j| j  }t	|D ]}|  jd7  _| j
| j qW d    d S 1 s;w   Y  d S )Nr   )r   r|   r   r   ry   r}   r   r   r   r   r   r   )r   r   r   r   r   r   r   ensure_core_connectionsy  s   "z*HostConnectionPool.ensure_core_connectionsc                    sT   t jg s  dS  fdd}|_jD ]}||| qdS )z
        Asynchronously sets the keyspace for all connections.  When all
        connections have been set, `callback` will be called with two
        arguments: this pool, and a list of any errors that occurred.
        Nc                    s8    |  |  |r| s  d S d S r.   )r   r   append)rc   r   rT   r   remaining_callbacksr   r   r   r     s   


z\HostConnectionPool._set_keyspace_for_all_conns.<locals>.connection_finished_setting_keyspace)r   r   r   r   )r   r   rT   r   rc   r   r   r   r     s   

	
z.HostConnectionPool._set_keyspace_for_all_connsc                 C   r"   r.   )r   r!   r   r   r   r     s   z"HostConnectionPool.get_connectionsc                 C   s2   dd | j D }dd | j D }| j| j||dS )Nc                 S      g | ]}|j qS r   r   r   r   r   r   r   r         z0HostConnectionPool.get_state.<locals>.<listcomp>c                 S   r   r   )r   r   r   r   r   r     r   r   )r   r   r   )r   r   r   r   r   r   r     s
   zHostConnectionPool.get_stater   )r   r   r   r   rp   ry   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rx   r   r   r   r   r   r   r   r   r   r   r   r   r   H  s8    8$
'
r   )#r   	functoolsr   loggingr   r   	threadingr   r   r   rz   r   ImportErrorcassandra.util	cassandrar   cassandra.connectionr   r	   r
   cassandra.policiesr   	getLoggerr   r'   r]   r   objectr   rQ   rl   rv   r   r   r   r   r   r   r   <module>   s6   
 LY U