o
    NDi~                 	   @   s  d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl
mZmZmZ ddlmZ dd	lmZmZmZ dd
lmZmZmZ ddlZddlZddlmZ ddlmZ ddlZddlZddlZddl Z ddl!Z!ddl"m#Z#m$Z$m%Z%m&Z& ddl'Z'ddl(Z(ddl(m)Z) ddl*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3 ddl4m5Z5m6Z6 ddl7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z=m>Z>m?Z?m@Z@mAZA ddlBmCZC ddlDmEZE ddlFmGZGmHZHmIZImJZJmKZKmLZLmMZMmNZNmOZOmPZPmQZQmRZRmSZSmTZTmUZUmVZVmWZWmXZXmYZYmZZZm[Z[ ddl\m]Z]m^Z^m_Z_m`Z` ddlambZbmcZcmdZdmeZemfZfmgZgmhZhmiZimjZjmkZkmlZl ddlmmnZnmoZompZpmqZqmrZrmsZs ddltmuZumvZvmwZwmxZxmyZymzZzm{Z{m|Z|m}Z}m~Z~mZmZ ddlmZ ddlmZ ddlmZmZ ddlmZ ddlmZ ddlmZmZmZmZmZmZmZmZmZmZ ddlmZmZ dd lmZ zdd!lmZ W n ey   dZY nw zdd"lmZ W n eefy   dZY nw zdd#l(mZ W n ey   dd#lmZ Y nw d$d% Zd&d' Zd(d) Zd*d+ Zd,d- Zd.d/ Zd0d1 ZeeZeeeefZeeedg f\ZZes
e3d2eeZd3d4 d5Zd6Zd7Zd8Zd9Zd7Zed:Ze ZG d;d< d<eZd=d> Zd?d@ Ze ZdAdB ZdCdD ZdEdF Zeeġ dGdH ZG dIdJ dJeZdKdL ZdMdN ZG dOdP dPeZG dQdR dReʃZG dSdT dTe˃ZG dUdV dVeZe Z	 e Z	 e Z	 e Z	 G dWdX dXeZG dYdZ dZeZG d[d\ d\eZG d]d^ d^eZG d_d` d`eoZdadb Zdcdd ZG dedf dfeZdgdh ZG didj dje%Zdkdl ZG dmdn dneZG dodp dpeZG dqdr dreZdS )szf
This module houses the main classes you will interact with,
:class:`.Cluster` and :class:`.Session`.
    )absolute_importN)hexlify)defaultdict)Mapping)ThreadPoolExecutorFIRST_COMPLETEDwait)copy)partialreducewraps)groupbycountchain)warn)random)LockRLockThreadEvent)WeakValueDictionary)	ConsistencyLevelAuthenticationFailedOperationTimedOutUnsupportedOperationSchemaTargetTypeDriverExceptionProtocolVersionUnresolvableContactPointsDependencyException)_proxy_execute_keyPlainTextAuthProvider)
ConnectionExceptionConnectionShutdownConnectionHeartbeatProtocolVersionUnsupportedEndPointDefaultEndPointDefaultEndPointFactoryContinuousPagingStateSniEndPointFactoryConnectionBusy)UserType)Encoder)QueryMessageResultMessageErrorMessageReadTimeoutErrorMessageWriteTimeoutErrorMessageUnavailableErrorMessageOverloadedErrorMessagePrepareMessageExecuteMessagePreparedQueryNotFoundIsBootstrappingErrorMessageTruncateErrorServerErrorBatchMessageRESULT_KIND_PREPAREDRESULT_KIND_SET_KEYSPACERESULT_KIND_ROWSRESULT_KIND_SCHEMA_CHANGEProtocolHandlerRESULT_KIND_VOIDProtocolException)Metadataprotect_namemurmur3	_NodeInfo)TokenAwarePolicyDCAwareRoundRobinPolicySimpleConvictionPolicyExponentialReconnectionPolicyHostDistanceRetryPolicyIdentityTranslatorNoSpeculativeExecutionPlanNoSpeculativeExecutionPolicyDefaultLoadBalancingPolicyNeverRetryPolicy)Host_ReconnectionHandler_HostReconnectionHandlerHostConnectionPoolHostConnectionNoConnectionsAvailable)SimpleStatementPreparedStatementBoundStatementBatchStatementbind_params
QueryTraceTraceUnavailablenamed_tuple_factorydict_factorytuple_factoryFETCH_SIZE_UNSETHostTargetingStatement)
int64_pack)MonotonicTimestampGenerator)%_resolve_contact_points_to_string_mapVersion)MonitorReporter)version_supports_insights)
graph_object_row_factoryGraphOptionsGraphSON1SerializerGraphProtocolGraphSON2SerializerGraphStatementSimpleGraphStatementgraph_graphson2_row_factorygraph_graphson3_row_factoryGraphSON3Serializer)_request_timeout_key_GraphSONContextRowFactory)cloud)TwistedConnectionEventletConnection)WeakSetc                  C   s$   dt jvrdS dd l} tj| jju S )Nzgevent.monkeyFr   )sysmodulesgevent.socketsocket)gevent r   J/var/www/Datamplify/venv/lib/python3.10/site-packages/cassandra/cluster.py_is_gevent_monkey_patchedr   s   
r   c                  C      t  rddlm}  | d fS dS )Nr   GeventConnectionNN)r   cassandra.io.geventreactorr   r   r   r   r   _try_gevent_importx      r   c                  C   s:   dt jvrdS zdd l} | jdW S  ty   Y dS w )Nzeventlet.patcherFr   r~   )r{   r|   eventlet.patcherpatcheris_monkey_patchedAttributeError)eventletr   r   r   _is_eventlet_monkey_patched   s   
r   c                  C   r   )Nr   rx   r   )r   cassandra.io.eventletreactorry   rx   r   r   r   _try_eventlet_import   r   r   c               
   C   D   zddl m}  | d fW S  ty! } z
d |fW  Y d }~S d }~ww )Nr   )LibevConnection)cassandra.io.libevreactorr   r   )r   er   r   r   _try_libev_import      
r   c               
   C   r   )Nr   )AsyncoreConnection)cassandra.io.asyncorereactorr   r   )r   r   r   r   r   _try_asyncore_import   r   r   c                 C   s4   | \}}|r| S | \}}|r| | |p||fS N)append)val	import_fnrvexcsimport_resultexcr   r   r   _connection_reduce_fn   s   

r   z)Unable to load a default connection class utf8   d            z6.8.0c                   @   s   e Zd ZdZdZ	 dd ZdS )NoHostAvailablez
    Raised when an operation is attempted but all connections are
    busy, defunct, closed, or resulted in errors when used.
    Nc                 C   s   t | || || _d S r   )	Exception__init__errors)selfmessager   r   r   r   r      s   
zNoHostAvailable.__init__)__name__
__module____qualname____doc__r   r   r   r   r   r   r      s
    r   c                 C   s"   |   }|rtjd|d dS dS )z Helper for run_in_executor() zFailed to run task on executorexc_infoN)	exceptionlogdebugfuturer   r   r   r   _future_completed   s   r   c                    s   t   fdd}|S )zH
    A decorator to run the given method in the ThreadPoolExecutor.
    c                    sV   | j rd S z| jj | g|R i |}|t W d S  ty*   td Y d S w )Nz!Failed to submit task to executor)is_shutdownexecutorsubmitadd_done_callbackr   r   r   r   )r   argskwargsr   fr   r   new_f   s   zrun_in_executor.<locals>.new_f)r   )r   r   r   r   r   run_in_executor   s   
r   c                 C      t |  d S r   )_clusters_for_shutdownaddclusterr   r   r   _register_cluster_shutdown      r   c                 C   r   r   )r   discardr   r   r   r   _discard_cluster_shutdown   r   r   c                  C   s   t  } | D ]}|  qd S r   )r   r	   shutdown)clustersr   r   r   r   _shutdown_clusters   s   
r   c                   C   s   t d ur	tt S t S r   )rE   rG   rH   r   r   r   r   default_lbp_factory  s   
r   c                   @   sP   e Zd ZG dd deZdZ	 dZ	 dZ	 dZ	 ej	dddfddZ
dd	 ZdS )
ContinuousPagingOptionsc                   @   s   e Zd ZdZdZdS )z"ContinuousPagingOptions.PagingUnitr   r   N)r   r   r   BYTESROWSr   r   r   r   
PagingUnit  s    r   Nr      c                 C   s,   || _ || _|| _|dk rtd|| _d S )Nr   z;ContinuousPagingOptions.max_queue_size must be 2 or greater)	page_unit	max_pagesmax_pages_per_second
ValueErrormax_queue_size)r   r   r   r   r   r   r   r   r   (  s   
z ContinuousPagingOptions.__init__c                 C   s   | j tjjkS r   )r   r   r   r   r   r   r   r   page_unit_bytes0  r   z'ContinuousPagingOptions.page_unit_bytes)r   r   r   objectr   r   r   r   r   r   r   r   r   r   r   r   r     s    r   c              	   C   s@   zt | |t jt jW S  t jy   td| | Y dS w )z
    A helper function that wraps socket.getaddrinfo and returns None
    when it fails to, e.g. resolve one of the hostnames. Used to address
    PYTHON-895.
    z,Could not resolve hostname "{}" with port {}N)r~   getaddrinfo	AF_UNSPECSOCK_STREAMgaierrorr   r   format)contact_pointportr   r   r   _addrinfo_or_none4  s   r   c                 C   s.   t dtdtdtdi}| |v r||  S d| f S )NEXEC_PROFILE_DEFAULTEXEC_PROFILE_GRAPH_DEFAULT!EXEC_PROFILE_GRAPH_SYSTEM_DEFAULT$EXEC_PROFILE_GRAPH_ANALYTICS_DEFAULTz"%s")r   r   r   r   )namedefault_profilesr   r   r   _execution_profile_to_stringC  s   
r   c                   @   sd   e Zd ZdZ	 dZ	 ejZ	 dZ	 dZ		 e
eZ	 dZ	 dZ	 dZdZededdeddfddZdS )ExecutionProfileN      $@Fc	           	      C   s   |t u rd| _t | _nd| _|| _|t u rd| _tj| _nd| _|| _|p(t | _	|d ur7t
|s7td|| _|| _|| _|pDt | _|| _d S )NFTzaserial_consistency_level must be either ConsistencyLevel.SERIAL or ConsistencyLevel.LOCAL_SERIAL.)_NOT_SET_load_balancing_policy_explicitr   load_balancing_policy_consistency_level_explicitr   	LOCAL_ONEconsistency_levelrL   retry_policy	is_serialr   serial_consistency_levelrequest_timeoutrow_factoryrO   speculative_execution_policycontinuous_paging_options)	r   r   r   r   r   r   r   r   r   r   r   r   r     s(   


zExecutionProfile.__init__)r   r   r   r   r   r   r   r   r   r   staticmethodr_   r   r   r   r   r   r   r   r   r   r   r   r   Q  s0    r   c                       s4   e Zd ZdZ	 ededdddef fdd	Z  ZS )GraphExecutionProfileNg      >@c	           	   	      s>   |pt  }tt| j|||||||d |ptddd| _dS )ay  
        Default execution profile for graph execution.

        See :class:`.ExecutionProfile` for base attributes. Note that if not explicitly set,
        the row_factory and graph_options.graph_protocol are resolved during the query execution.
        These options will resolve to graph_graphson3_row_factory and GraphProtocol.GRAPHSON_3_0
        for the core graph engine (DSE 6.8+), otherwise graph_object_row_factory and GraphProtocol.GRAPHSON_1_0

        In addition to default parameters shown in the signature, this profile also defaults ``retry_policy`` to
        :class:`cassandra.policies.NeverRetryPolicy`.
        )r      g   gremlin-groovy)graph_sourcegraph_languageN)rQ   superr   r   rk   graph_options)	r   r   r   r   r   r   r   r  r   	__class__r   r   r     s   
zGraphExecutionProfile.__init__)r   r   r   r  r   r   __classcell__r   r   r  r   r     s    
r   c                       s,   e Zd Zddeddddf fdd	Z  ZS )GraphAnalyticsExecutionProfileNg     u"Ac              	      sF   |pt t }|ptdd}tt| ||||||| | j  dS )aR  
        Execution profile with timeout and load balancing appropriate for graph analytics queries.

        See also :class:`~.GraphExecutionPolicy`.

        In addition to default parameters shown in the signature, this profile also defaults ``retry_policy`` to
        :class:`cassandra.policies.NeverRetryPolicy`, and ``load_balancing_policy`` to one that targets the current Spark
        master.

        Note: The graph_options.graph_source is set automatically to b'a' (analytics)
        when using GraphAnalyticsExecutionProfile. This is mandatory to target analytics nodes.
        r  )r  N)rP   r   rk   r  r
  r   r  set_source_analytics)r   r   r   r   r   r   r   r  r  r   r   r     s   z'GraphAnalyticsExecutionProfile.__init__)r   r   r   r   r   r	  r   r   r  r   r
    s    r
  c                   @   s`   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zedd ZdS )ProfileManagerc                 C   s   t  | _d S r   )dictprofilesr   r   r   r   r        zProfileManager.__init__c                 C   s&   dd | j  D }tdd |D S )Nc                 s   s    | ]
\}}|j s|V  qd S r   )r   ).0profile_nameprofiler   r   r   	<genexpr>  s
    zAProfileManager._profiles_without_explicit_lbps.<locals>.<genexpr>c                 s   s     | ]}|t u r
d n|V  qdS )r   N)r   )r  nr   r   r   r    s
    
)r  itemstuple)r   namesr   r   r   _profiles_without_explicit_lbps  s   z.ProfileManager._profiles_without_explicit_lbpsc                    sB   t  fdd| j D }tj|v rtjS tj|v rtjS tjS )Nc                 3   s    | ]	}|j  V  qd S r   )r   distance)r  phostr   r   r    s    z*ProfileManager.distance.<locals>.<genexpr>)setr  valuesrK   LOCALREMOTEIGNORED)r   r  	distancesr   r  r   r    s   zProfileManager.distancec                 C   s"   | j  D ]	}|j|| qd S r   )r  r  r   populate)r   r   hostsr  r   r   r   r#    s   zProfileManager.populatec                 C   s   | j  D ]}|j  qd S r   )r  r  r   check_supported)r   r  r   r   r   r%    s   zProfileManager.check_supportedc                 C       | j  D ]}|j| qd S r   )r  r  r   on_upr   r  r  r   r   r   r'       zProfileManager.on_upc                 C   r&  r   )r  r  r   on_downr(  r   r   r   r*    r)  zProfileManager.on_downc                 C   r&  r   )r  r  r   on_addr(  r   r   r   r+    r)  zProfileManager.on_addc                 C   r&  r   )r  r  r   	on_remover(  r   r   r   r,     r)  zProfileManager.on_removec                 C   s
   | j t S )zc
        internal-only; no checks are done because this entry is populated on cluster init
        )r  r   r   r   r   r   default$  s   
zProfileManager.defaultN)r   r   r   r   r  r  r#  r%  r'  r*  r+  r,  propertyr-  r   r   r   r   r    s    	r  c                   @      e Zd ZdZdZdZdS )_ConfigModer   r   r   N)r   r   r   UNCOMMITTEDLEGACYPROFILESr   r   r   r   r0  N      r0  c                )   @   s  e Zd ZdZdgZ	 dZdZ	 dZ	 ej	Z
	 dZdZ	 dZ	 dZdZedd Zejd	d ZdZed
d Zejdd Zedd ZeddZ	 e Zedd Zejdd ZeZ	 e Z	 dZ	 dZ 	 dZ!	 dZ"	 dZ#	 dZ$	 dZ%	 dZ&	 e'Z(	 dZ)	 dZ*	 dZ+	 dZ,	 dZ-	 dZ.	 dZ/	 dZ0	 dZ1	 dZ2	 dZ3	 dZ4	 dZ5	 dZ6	 dZ7	 dZ8	 dZ9	 edd Z:e:jdd Z:edd Z;e;jdd Z;dZ<	 dZ=e>j?Z@dZAdZBdZCdZDdZEdZFdZGdZHdZIdZJdZKdZL	 dZMdZNeOddddddddddddeOdddddddddddddddddddddddddddf)d d!ZPd"d# ZQd$d% ZRdd&d'ZSd(d) ZTd*d+ ZUd,d- ZVd.d/ ZWd0d1 ZXd2d3 ZYd4d5 ZZd6d7 Z[d8d9 Z\d:d; Z]d<d= Z^d>d? Z_dd@dAZ`dBdC ZadDdE ZbdFdG ZcdHdI ZddJdK ZedLdM ZfdNdO ZgdPdQ ZhdRdS ZidTdU ZjdVdW ZkelddXdYZmddZd[Zndd\d]Zod^d_ Zpdd`daZqddbdcZrddde Zsdfdg Ztdhdi Zuedjdk Zvdldm Zwexdndo Zyexdpdq Zzdrds Z{ddtduZ|ddvdwZ}ddxdyZ~ddzd{Zdd|d}Zdd~dZdddZdddZdd ZedddZdd Zdd ZdS )Clusteraf  
    The main class to use when interacting with a Cassandra cluster.
    Typically, one instance of this class will be created for each
    separate Cassandra cluster that your application interacts with.

    Example usage::

        >>> from cassandra.cluster import Cluster
        >>> cluster = Cluster(['192.168.1.1', '192.168.1.2'])
        >>> session = cluster.connect()
        >>> session.execute("CREATE KEYSPACE ...")
        >>> ...
        >>> cluster.shutdown()

    ``Cluster`` and ``Session`` also provide context management functions
    which implicitly handle shutdown when leaving scope.
    	127.0.0.1NiR#  FTc                 C      | j S )a  
        When :attr:`~.Cluster.protocol_version` is 2 or higher, this should
        be an instance of a subclass of :class:`~cassandra.auth.AuthProvider`,
        such as :class:`~.PlainTextAuthProvider`.

        When :attr:`~.Cluster.protocol_version` is 1, this should be
        a function that accepts one argument, the IP address of a node,
        and returns a dict of credentials for that node.

        When not using authentication, this should be left as :const:`None`.
        )_auth_providerr   r   r   r   auth_provider     zCluster.auth_providerc                 C   s`   |s|| _ d S z|j| _W n ty*   | jdkrtdt|s%td|| _Y nw || _ d S )Nr   zaauth_provider must implement the cassandra.auth.AuthProvider interface when protocol_version >= 2z9auth_provider must be callable when protocol_version == 1)r8  new_authenticator_auth_provider_callabler   protocol_version	TypeErrorcallable)r   valuer   r   r   r9    s   


c                 C   r7  )a/  
        An instance of :class:`.policies.LoadBalancingPolicy` or
        one of its subclasses.

        .. versionchanged:: 2.6.0

        Defaults to :class:`~.TokenAwarePolicy` (:class:`~.DCAwareRoundRobinPolicy`).
        when using CPython (where the murmur3 extension is available). :class:`~.DCAwareRoundRobinPolicy`
        otherwise. Default local DC will be chosen from contact points.

        **Please see** :class:`~.DCAwareRoundRobinPolicy` **for a discussion on default behavior with respect to
        DC locality and remote nodes.**
        )_load_balancing_policyr   r   r   r   r     s   zCluster.load_balancing_policyc                 C   &   | j tjkr
td|| _tj| _ d S )NzkCannot set Cluster.load_balancing_policy while using Configuration Profiles. Set this in a profile instead.)_config_moder0  r3  r   rA  r2  )r   lbpr   r   r   r        c                 C   s
   | j jjS r   )profile_managerr-  r   r   r   r   r   _default_load_balancing_policy  s   
z&Cluster._default_load_balancing_policyg      ?g     @c                 C   r7  )z
        A default :class:`.policies.RetryPolicy` instance to use for all
        :class:`.Statement` objects which do not have a :attr:`~.Statement.retry_policy`
        explicitly set.
        )_default_retry_policyr   r   r   r   default_retry_policy  s   zCluster.default_retry_policyc                 C   rB  )NzjCannot set Cluster.default_retry_policy while using Configuration Profiles. Set this in a profile instead.)rC  r0  r3  r   rH  r2  )r   policyr   r   r   rI    rE  
          @   r   r   r   c                 C      | j jS )a  
        Flag indicating whether internal schema metadata is updated.

        When disabled, the driver does not populate Cluster.metadata.keyspaces on connect, or on schema change events. This
        can be used to speed initial connection, and reduce load on client and server during operation. Turning this off
        gives away token aware request routing, and programmatic inspection of the metadata model.
        )control_connection_schema_meta_enabledr   r   r   r   schema_metadata_enabled  s   	zCluster.schema_metadata_enabledc                 C      t || j_d S r   )boolrO  rP  r   enabledr   r   r   rQ  '     c                 C   rN  )a  
        Flag indicating whether internal token metadata is updated.

        When disabled, the driver does not query node token information on connect, or on topology change events. This
        can be used to speed initial connection, and reduce load on client and server during operation. It is most useful
        in large clusters using vnodes, where the token map can be expensive to compute. Turning this off
        gives away token aware request routing, and programmatic inspection of the token ring.
        )rO  _token_meta_enabledr   r   r   r   token_metadata_enabled+  s   
zCluster.token_metadata_enabledc                 C   rR  r   )rS  rO  rW  rT  r   r   r   rX  7  rV  c*           4         s  |
dur|
| _ |(dur^|(| _|tus s|!s|rtdto#t| j t}*to+t| j t}+tj|(|*p2|+d},|,j	}!ddi}|du rM|,j
rM|,jrMt|,j
|,j}t|,j|,j  fdd|,jD }|dur|tu rmd| _d	g}nd| _t|trytd
d|v rtd|| _|| _|)dur|)| _ pt| jd| _| j|  g }-dd | jD D ]}.|-t|.tr|.n|.|f qdd | jD | _dd | jD | _t|-}/| j t!t"dd |/# D   | j$dd |/% D  |r| jst&| j|| _'|tur|| _(d| _)|| _*| | _+|| _,|dur t|t-rtd|| _.nt/ | _0|dur6t|t-r3td|| _1|durHt|t-rEtd|| _2|durYt3|sVtd|| _4|durkt|t-rhtd|| _5|dur}t3|sytd|| _6nt7 | _6t8 | _9t:| j.| j2t;j<t;j=d| j9j>t?< |s|r|rtdt@jA| _BtCdtD n3| j9j>}0|r|0$| t@jE| _BtF| j9jGj.}1|0HtItJ|1d |0HtKtJ|1dd |0HtLtM|1d | jr| js| jBt@jEu r| j9N }2|2r tOPdjQ||2d  n|du rtOPd!jQ||d" |	| _R|r|!stCd#tD || _S|!| _	|| _T|| _U|| _V|| _W|| _X|| _Y|| _Z|| _[|| _\|| _]|| _^|| __|%| _`|&| _atb | _ctd | _etf | _gth | _id| _jtk | _ltd | _mtnto| _ptqjrtstqjttsi| _utqjrtvtqjttvi| _wtqjrtxtqjttyi| _ztqjrt{tqjtt|i| _}| j~|d$| _t| j| _t | _| jRrd%d&lm}3 |3t| | _t| | jW| jZ| j[| j\||| _j|'du rt | _|#dur|#| _|$dur|$| _dS dS )'a  
        ``executor_threads`` defines the number of threads in a pool for handling asynchronous tasks such as
        extablishing connection pools or refreshing metadata.

        Any of the mutable Cluster attributes may be set as keyword arguments to the constructor.
        Nzmcontact_points, endpoint_factory, ssl_context, and ssl_options cannot be specified with a cloud configuration)create_pyopenssl_contextcheck_hostnameTc                    s   g | ]}  |qS r   )create_from_sni)r  host_idendpoint_factoryr   r   
<listcomp>  s    z$Cluster.__init__.<locals>.<listcomp>Fr6  zUcontact_points should not be a string, it should be a sequence (e.g. list) of stringszDcontact_points should not contain None (it can resolve to localhost))r   c                 S   s   g | ]	}t |ts|qS r   
isinstancer&   r  cpr   r   r   r_        c                 S      g | ]	}t |tr|qS r   r`  rb  r   r   r   r_    rd  c                 S   s$   i | ]}t |d j|j|jdqS z{ip}:{port})ipr   )reprr   addressr   r  epr   r   r   
<dictcomp>  s    z$Cluster.__init__.<locals>.<dictcomp>c                 S   s"   g | ]}|d urdd |D qS )Nc                 S   s"   g | ]\}}|d urt ||qS r   )r'   r  rg  r   r   r   r   r_    s   " z/Cluster.__init__.<locals>.<listcomp>.<listcomp>r   )r  xsr   r   r   r_    s    
c                 S   s(   i | ]\}}|d ur|dd |D qS )Nc                 S   s   g | ]\}}d j ||dqS rf  )r   rm  r   r   r   r_    s    z/Cluster.__init__.<locals>.<dictcomp>.<listcomp>r   )r  keyr@  r   r   r   rl    s    zSload_balancing_policy should not be a class, it should be an instance of that classzQreconnection_policy should not be a class, it should be an instance of that classzRdefault_retry_policy should not be a class, it should be an instance of that classz*conviction_policy_factory must be callablezPaddress_translator should not be a class, it should be an instance of that classz$timestamp_generator must be callable)r   r   zClusters constructed with execution_profiles should not specify legacy parameters load_balancing_policy or default_retry_policy. Configure this in a profile instead.zVLegacy execution parameters will be removed in 4.0. Consider using execution profiles.)r   g     f@)r   r   a  Cluster.__init__ called with contact_points specified, but load-balancing policies are not specified in some ExecutionProfiles. In the next major version, this will raise an error; please specify a load-balancing policy. (contact_points = {cp}, EPs without explicit LBPs = {eps}))rc  epszCluster.__init__ called with contact_points specified, but no load_balancing_policy. In the next major version, this will raise an error; please specify a load-balancing policy. (contact_points = {cp}, lbp = {lbp}))rc  rD  zUsing ssl_options without ssl_context is deprecated and will result in an error in the next major release. Please use ssl_context to prepare for that release.)max_workersr   )Metrics)connection_classrv   r   r   rw   
issubclassry   dscloudget_cloud_configssl_contextusernamepasswordr!   r*   sni_hostsni_porthost_ids_contact_points_explicitra  strr>  contact_pointsr   column_encryption_policyr(   r^  	configurer   r  endpoints_resolved_endpoint_map_for_insightsrf   extendlistr   r  updater  r   compressionr=  _protocol_version_explicitallow_beta_protocol_version
no_compactr9  typer   r   rA  reconnection_policyrI  r?  conviction_policy_factoryaddress_translatortimestamp_generatorre   r  rF  r   Session_default_timeout_row_factoryr  r   r0  r2  rC  r   DeprecationWarningr3  rP   r-  
setdefaultr   r   r   r   r
  r  r   warningr   metrics_enabledssl_optionssockoptscql_versionmax_schema_agreement_waitcontrol_connection_timeoutidle_heartbeat_intervalidle_heartbeat_timeoutschema_event_refresh_windowtopology_event_refresh_windowstatus_event_refresh_windowconnect_timeoutprepare_on_all_hostsreprepare_on_upmonitor_reporting_enabledmonitor_reporting_intervalr  
_listenersr   _listener_lockrz   sessionsrC   metadatarO  r   _prepared_statements_prepared_statement_lockr   r  _user_typesrK   r  DEFAULT_MIN_REQUESTSr   _min_requests_per_connectionDEFAULT_MAX_REQUESTS_max_requests_per_connection&DEFAULT_MIN_CONNECTIONS_PER_LOCAL_HOST'DEFAULT_MIN_CONNECTIONS_PER_REMOTE_HOST_core_connections_per_host&DEFAULT_MAX_CONNECTIONS_PER_LOCAL_HOST'DEFAULT_MAX_CONNECTIONS_PER_REMOTE_HOST_max_connections_per_host_create_thread_pool_executorr   
_Scheduler	schedulerr   _lockcassandra.metricsrr  weakrefproxymetricsControlConnectionuuiduuid4	client_idapplication_nameapplication_version)4r   r  r   r  r9  r   r  rI  r  r  rs  r  r  r  r=  executor_threadsr  r  r  r  r  r  rQ  rX  r  r  r  r  execution_profilesr  r  r  r  rw  r^  r  r  r  r  r  rv   r  uses_twisteduses_eventletcloud_configraw_contact_pointsrc  strs_resolved_mapr  rD  default_lbp_profilesrr  r   r]  r   r   Y  sz  0
























zCluster.__init__c                 K   s   t }tjd dkrCtjd dkrCzddlm} t| j|}W n   |d	i | Y S |rCz
ddlm} |}W n t	yB   t	dw |d	i |S )
a  
        Create a ThreadPoolExecutor for the cluster. In most cases, the built-in
        `concurrent.futures.ThreadPoolExecutor` is used.

        Python 3.7+ and Eventlet cause the `concurrent.futures.ThreadPoolExecutor`
        to hang indefinitely. In that case, the user needs to have the `futurist`
        package so we can use the `futurist.GreenThreadPoolExecutor` class instead.

        :param kwargs: All keyword args are passed to the ThreadPoolExecutor constructor.
        :return: A ThreadPoolExecutor instance.
        r      r      rx   )GreenThreadPoolExecutora2  Python 3.7+ and Eventlet cause the `concurrent.futures.ThreadPoolExecutor` to hang indefinitely. If you want to use the Eventlet reactor, you need to install the `futurist` package to allow the driver to use the GreenThreadPoolExecutor. See https://github.com/eventlet/eventlet/issues/508 for more details.Nr   )
r   r{   version_infor   ry   rt  rs  futuristr  ImportError)r   r   	tpe_classry   is_eventletr  r   r   r   r    s"   	z$Cluster._create_thread_pool_executorc                 C   sX   | j dk rtd| j || || j| |< t| jD ]	}|||| qt|| dS )a  
        Registers a class to use to represent a particular user-defined type.
        Query parameters for this user-defined type will be assumed to be
        instances of `klass`.  Result sets for this user-defined type will
        be instances of `klass`.  If no class is registered for a user-defined
        type, a namedtuple will be used for result sets, and non-prepared
        statements may not encode parameters for this type correctly.

        `keyspace` is the name of the keyspace that the UDT is defined in.

        `user_type` is the string name of the UDT to register the mapping
        for.

        `klass` should be a class with attributes whose names match the
        fields of the user-defined type.  The constructor must accepts kwargs
        for each of the fields in the UDT.

        This method should only be called after the type has been created
        within Cassandra.

        Example::

            cluster = Cluster(protocol_version=3)
            session = cluster.connect()
            session.set_keyspace('mykeyspace')
            session.execute("CREATE TYPE address (street text, zipcode int)")
            session.execute("CREATE TABLE users (id int PRIMARY KEY, location address)")

            # create a class to map to the "address" UDT
            class Address(object):

                def __init__(self, street, zipcode):
                    self.street = street
                    self.zipcode = zipcode

            cluster.register_user_type('mykeyspace', 'address', Address)

            # insert a row using an instance of Address
            session.execute("INSERT INTO users (id, location) VALUES (%s, %s)",
                            (0, Address("123 Main St.", 78723)))

            # results will include Address instances
            results = session.execute("SELECT * FROM users")
            row = results[0]
            print(row.id, row.location.street, row.location.zipcode)

        r  zUser Type serialization is only supported in native protocol version 3+ (%d in use). CQL encoding for simple statements will still work, but named tuples will be returned when reading type %s.%s.N)	r=  r   r  r  r  r  user_type_registeredr,   evict_udt_class)r   keyspace	user_typeklasssessionr   r   r   register_user_type  s   
0zCluster.register_user_typec           
      C   s   t |ts	td| jtjkrtd|| jjv r td	|| j
o&|j }|r6tdj	t|| |d || jj|< |j| | j  tdd | j D ]}|j| qPt }t| jD ]}| | ||  qat||\}}	|	r}tdd	S )
a}  
        Adds an :class:`.ExecutionProfile` to the cluster. This makes it available for use by ``name`` in :meth:`.Session.execute`
        and :meth:`.Session.execute_async`. This method will raise if the profile already exists.

        Normally profiles will be injected at cluster initialization via ``Cluster(execution_profiles)``. This method
        provides a way of adding them dynamically.

        Adding a new profile updates the connection pools according to the specified ``load_balancing_policy``. By default,
        this method will wait up to five seconds for the pool creation to complete, so the profile can be used immediately
        upon return. This behavior can be controlled using ``pool_wait_timeout`` (see
        `concurrent.futures.wait <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait>`_
        for timeout semantics).
        z/profile must be an instance of ExecutionProfilezHCannot add execution profiles when legacy parameters are set explicitly.zProfile {} already existsa  Tried to add an ExecutionProfile with name {name}. {self} was explicitly configured with contact_points, but {ep} was not explicitly configured with a load_balancing_policy. In the next major version, trying to add an ExecutionProfile without an explicitly configured LBP to a cluster with explicitly configured contact_points will raise an exception; please specify a load-balancing policy in the ExecutionProfile.)r   r   rk  c                 S   r7  r   )is_up)hr   r   r   <lambda>	      z/Cluster.add_execution_profile.<locals>.<lambda>z=Failed to create all new connection pools in the %ss timeout.N)ra  r   r>  rC  r0  r2  r   rF  r  r   r}  r   r   r  r   r   r#  r  	all_hostsfilterr'  r  r  r  _set_default_dbaas_consistencyr  update_created_poolswait_futuresr   )
r   r   r  pool_wait_timeoutcontact_points_but_no_lbpr  futuresr  _not_doner   r   r   add_execution_profile  s8   

zCluster.add_execution_profilec                 C   
   | j | S r   )r  r   host_distancer   r   r   get_min_requests_per_connection     
z'Cluster.get_min_requests_per_connectionc                 C   sR   | j dkr	td|dk s|dks|| j| kr"td| j| f || j|< dS )a.  
        Sets a threshold for concurrent requests per connection, below which
        connections will be considered for disposal (down to core connections;
        see :meth:`~Cluster.set_core_connections_per_host`).

        Pertains to connection pool management in protocol versions {1,2}.
        r  z`Cluster.set_min_requests_per_connection() only has an effect when using protocol_version 1 or 2.r   ~   zUmin_requests must be 0-126 and less than the max_requests for this host_distance (%d)N)r=  r   r  r   r  )r   r  min_requestsr   r   r   set_min_requests_per_connection     

z'Cluster.set_min_requests_per_connectionc                 C   r  r   )r  r  r   r   r   get_max_requests_per_connection(  r  z'Cluster.get_max_requests_per_connectionc                 C   sR   | j dkr	td|dk s|dks|| j| kr"td| j| f || j|< dS )a(  
        Sets a threshold for concurrent requests per connection, above which new
        connections will be created to a host (up to max connections;
        see :meth:`~Cluster.set_max_connections_per_host`).

        Pertains to connection pool management in protocol versions {1,2}.
        r  z`Cluster.set_max_requests_per_connection() only has an effect when using protocol_version 1 or 2.r      zXmax_requests must be 1-127 and greater than the min_requests for this host_distance (%d)N)r=  r   r  r   r  )r   r  max_requestsr   r   r   set_max_requests_per_connection+  r  z'Cluster.set_max_requests_per_connectionc                 C   r  )an  
        Gets the minimum number of connections per Session that will be opened
        for each host with :class:`~.HostDistance` equal to `host_distance`.
        The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for
        :attr:`~HostDistance.REMOTE`.

        This property is ignored if :attr:`~.Cluster.protocol_version` is
        3 or higher.
        )r  r  r   r   r   get_core_connections_per_host=     

z%Cluster.get_core_connections_per_hostc                 C   s>   | j dkr	td| j| }|| j|< ||k r|   dS dS )a  
        Sets the minimum number of connections per Session that will be opened
        for each host with :class:`~.HostDistance` equal to `host_distance`.
        The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for
        :attr:`~HostDistance.REMOTE`.

        Protocol version 1 and 2 are limited in the number of concurrent
        requests they can send per connection. The driver implements connection
        pooling to support higher levels of concurrency.

        If :attr:`~.Cluster.protocol_version` is set to 3 or higher, this
        is not supported (there is always one connection per host, unless
        the host is remote and :attr:`connect_to_remote_hosts` is :const:`False`)
        and using this will result in an :exc:`~.UnsupportedOperation`.
        r  z^Cluster.set_core_connections_per_host() only has an effect when using protocol_version 1 or 2.N)r=  r   r  _ensure_core_connections)r   r  core_connectionsoldr   r   r   set_core_connections_per_hostI  s   


z%Cluster.set_core_connections_per_hostc                 C   r  )an  
        Gets the maximum number of connections per Session that will be opened
        for each host with :class:`~.HostDistance` equal to `host_distance`.
        The default is 8 for :attr:`~HostDistance.LOCAL` and 2 for
        :attr:`~HostDistance.REMOTE`.

        This property is ignored if :attr:`~.Cluster.protocol_version` is
        3 or higher.
        )r  r  r   r   r   get_max_connections_per_hostb  r  z$Cluster.get_max_connections_per_hostc                 C   s    | j dkr	td|| j|< dS )a=  
        Sets the maximum number of connections per Session that will be opened
        for each host with :class:`~.HostDistance` equal to `host_distance`.
        The default is 2 for :attr:`~HostDistance.LOCAL` and 1 for
        :attr:`~HostDistance.REMOTE`.

        If :attr:`~.Cluster.protocol_version` is set to 3 or higher, this
        is not supported (there is always one connection per host, unless
        the host is remote and :attr:`connect_to_remote_hosts` is :const:`False`)
        and using this will result in an :exc:`~.UnsupportedOperation`.
        r  z]Cluster.set_max_connections_per_host() only has an effect when using protocol_version 1 or 2.N)r=  r   r  )r   r  max_connectionsr   r   r   set_max_connections_per_hostn  s
   
z$Cluster.set_max_connections_per_hostc                 O   s*   |  ||}| jj|| jg|R i |S )zv
        Called to create a new connection with proper configuration.
        Intended for internal use only.
        )_make_connection_kwargsrs  factoryr  )r   endpointr   r   r   r   r   connection_factory  s   zCluster.connection_factoryc                 O   s0   |  |j|}t| jj|j| jg|R i |S r   )r  r  r
   rs  r  r  )r   r  r   r   r   r   r   _make_connection_factory  s   "z Cluster._make_connection_factoryc                 C   s   | j r|d|  |j |d| j |d| j |d| j |d| j |d| j |d| j |d| j	 |d	| j
 |d
| j |d| j |S )Nauthenticatorr   r  r  r  rw  r  r=  user_type_mapr  r  )r<  r  ri  r   r  r  r  rw  r  r=  r  r  r  )r   r  kwargs_dictr   r   r   r    s   zCluster._make_connection_kwargsc                 C   sT   | j r
td|f t|}|tjk rtdtjf td| j|| || _d S )NzXProtocolError returned from server while using explicitly set client protocol_version %dzECannot downgrade protocol version below minimum supported version: %da  Downgrading core protocol version from %d to %d for %s. To avoid this, it is best practice to explicitly set Cluster(protocol_version) to the version supported by your cluster. http://datastax.github.io/python-driver/api/cassandra/cluster.html#cassandra.cluster.Cluster.protocol_version)r  r   r   get_lower_supportedMIN_SUPPORTEDr   r  r=  )r   host_endpointprevious_versionnew_versionr   r   r   protocol_downgrade  s   



zCluster.protocol_downgradec           	   	   C   s  | j  | jrtd| jstd| j| j | j	  t
|  | jD ]}| j|dd\}}|r?|  | jD ]}|| q7q#| jt| | j  | jt| | j  z&| j  | jD ]}| j|}|ry| j|tjkryd|_qctd W n ty   td |     w | j!  | j"rt#| j"| j$| j%d| _&d	| _W d   n1 sw   Y  | '|}|rt(|j) | *| |S )
a  
        Creates and returns a new :class:`~.Session` object.

        If `keyspace` is specified, that keyspace will be the default keyspace for
        operations on the ``Session``.

        `wait_for_all_pools` specifies whether this call should wait for all connection pools to be
        established or attempted. Default is `False`, which means it will return when the first
        successful connection is established. Remaining pools are added asynchronously.
        zCluster is already shut downz?Connecting to cluster, contact points: %s; protocol version: %sF)signalNzControl connection createdz<Control connection failed to connect, shutting down Cluster:timeoutT)+r  r   r   	_is_setupr   r   r  r=  rs  initialize_reactorr   r  add_hostset_up	listenersr+  rF  r#  r  r  r  r  r   rO  connectget_hostr  rK   r!  r  r   r   r   r%  r  r$   get_connection_holdersr  _idle_heartbeat_new_sessionr  _initial_connect_futuresr  )	r   r  wait_for_all_poolsr  r  newlistenerr  r  r   r   r   r    sd   







0

zCluster.connectc                 C   s:   |j jjr| jj D ]	}|jstj|_	qtj|_
d S d S r   )r   r  dbaasrF  r  r  r   r   LOCAL_QUORUMr   _default_consistency_level)r   r  r  r   r   r   r    s   
z&Cluster._set_default_dbaas_consistencyc                 C   s2   g }t | jD ]	}||  q|| j |S r   )r  r  r  	get_poolsr   rO  )r   holderssr   r   r   r    s
   zCluster.get_connection_holdersc                 C   s   | j  | jr	 W d   dS d| _W d   n1 sw   Y  | jr*| j  | j  | j  t| jD ]}|  q9| j	  t
|  dS )a.  
        Closes all sessions and connection associated with this Cluster.
        To ensure all connections are properly closed, **you should always
        call shutdown() on a Cluster instance when you are done with it**.

        Once shutdown, a Cluster should not be used for any purpose.
        NT)r  r   r  stopr  r   rO  r  r  r   r   )r   r  r   r   r   r     s   




zCluster.shutdownc                 C      | S r   r   r   r   r   r   	__enter__     zCluster.__enter__c                 G      |    d S r   r   r   r   r   r   r   __exit__  r  zCluster.__exit__c                 C   s,   t | | j |}| | | j| |S r   )r  r  r  _session_register_user_typesr  r   )r   r  r  r   r   r   r    s   
zCluster._new_sessionc                 C   s8   | j  D ]\}}| D ]\}}|||| qqd S r   )r  r  r  )r   r  r  type_mapudt_namer  r   r   r   r.  #  s
   z$Cluster._session_register_user_typesc                 C   sD   | j | | j| t| jD ]}|| q| j|dd d S NFis_host_addition)rF  r*  rO  r  r  remove_pool_start_reconnector)r   r  r  r   r   r   _cleanup_failed_on_up_handling(  s
   z&Cluster._cleanup_failed_on_up_handlingc           	      C   s  |9 | | z	||  W n ty( } z|| W Y d }~nd }~ww |r4	 W d    d S W d    n1 s>w   Y  zdd |D D ])}tjd||d | |  W |j d|_W d    d S 1 snw   Y  d S t	|st
d| | | W |j d|_W d    d S 1 sw   Y  d S td| |  | jD ]}|| qW |j d|_W d    n1 sw   Y  n|j d|_W d    w 1 sw   Y  w t| jD ]}|  qd S )Nc                 S   re  r   ra  r   r  r   r   r   r   r_  >  rd  z3Cluster._on_up_future_completed.<locals>.<listcomp>z,Unexpected failure while marking node %s up:r   F<Connection pool could not be created, not marking node %s upz(Connection pools established for node %s)r   r   resultr   r   errorr6  lock_currently_handling_node_upallr   infor  r  r'  r  r  r  )	r   r  r  resultsr<  finished_futurer   r  r  r   r   r   _on_up_future_completed0  sV   

"
"
 
zCluster._on_up_future_completedc           
      C   sV  | j rdS td| |j/ |jr!td| 	 W d   dS |jr3td| 	 W d   dS d|_W d   n1 s@w   Y  td| d}t }ztd| |d}|rhtd	| |	  | j
|tjkr|| | td
| t| jD ]}|| qtd| | j
| td| | j| td| t }g }t| j||||}t| jD ]}|j|dd}	|	durd}|	| ||	 qW n4 ty	   td| |D ]}	|		  q| | |j d|_W d    1 sw   Y   w |s)|j |  d|_W d   |S 1 s$w   Y  |S )1
        Intended for internal use only.
        Nz9Waiting to acquire lock for handling up status of node %sz7Another thread is already handling up status of node %szHost %s was already marked upTz'Starting to handle up status of node %sFz@Host %s may be up; will prepare queries and open connection poolz;Now that host %s is up, cancelling the reconnection handlerz(Done preparing all queries for host %s, z8Signalling to load balancing policies that host %s is upz3Signalling to control connection that host %s is upz3Attempting to open new connection pools for host %sr2  z4Unexpected failure handling node %s being marked up:)r   r   r   r<  r=  r  r  r?   get_and_set_reconnection_handlercancelrF  r  rK   r!  _prepare_all_queriesr  r  r4  r'  rO  r   r
   rB  add_or_renew_poolr   r   r   r   r6  r  )
r   r  have_futurer  reconnectorr  futures_lockfutures_resultscallbackr   r   r   r   r'  U  s   	








zCluster.on_upc                 C   s   | j |tjkrd S | j }| |}t|||| j| j	| j
||jd d	}||}|r7td| |  td| |  d S )Nnew_handlerz-Old host reconnector found for %s, cancellingz Starting reconnector for host %s)rF  r  rK   r!  r  new_scheduler  rT   r+  r'  r  rD  r   r   rE  start)r   r  r3  scheduleconn_factoryrI  old_reconnectorr   r   r   r5    s   



zCluster._start_reconnectorc           
      C   s,  | j rdS |jT |j}| jr@| j|tjkr@d}t| j	D ]}|
 }||}|r4||d dkO }q|r@	 W d   dS |  |sH|rL| rU	 W d   dS W d   n1 s_w   Y  td| | j| | j| t| j	D ]}|| q{| jD ]}	|	| q| || dS )rC  NF
open_countr   zHost %s has been marked down)r   r<  r  _discount_down_eventsrF  r  rK   r!  r  r  get_pool_stategetset_downis_currently_reconnectingr   r  r*  rO  r  r5  )
r   r  r3  expect_host_to_be_downwas_up	connectedr  pool_states
pool_stater  r   r   r   r*    s:   

zCluster.on_downc                    s   j rd S td j}|tjkr! td j j	| |tjkrBtd j
dd d S t g t   fdd}d}tjD ]}|jdd	}|d urtd} | || q[|s~
 d S d S )
Nz,Handling new host %r and notifying listenersz&Done preparing queries for new host %rzeNot adding connection pool for new host %r because the load balancing policy has marked it as IGNOREDF)r  c                    s   9   |  z	|   W n ty( } z| W Y d }~nd }~ww  r4	 W d    d S W d    n1 s>w   Y  td dd D D ]}tjd|d  d S tsitd d S 	 d S )Nz,All futures have completed for added host %sc                 S   re  r   r7  r8  r   r   r   r_    rd  z<Cluster.on_add.<locals>.future_completed.<locals>.<listcomp>z:Unexpected failure while adding node %s, will not mark up:r   r9  )
r   r   r:  r   r   r   r;  r>  r  _finalize_addr   r  rJ  rK  r  r   r   r   future_completed  s*   
z(Cluster.on_add.<locals>.future_completedTr2  )r   r   r   rF  r  rK   r!  rF  r+  rO  r_  r   r  r  r  rG  r   r   )r   r  refresh_nodesr  ra  rH  r  r   r   r`  r   r+    s<   




zCluster.on_addc                 C   s>   |r|   | jD ]}|| q	t| jD ]}|  qd S r   )r  r  r+  r  r  r  )r   r  r  r  r  r   r   r   r_    s   

zCluster._finalize_addc                 C   s   | j rd S td| |  | j| t| jD ]}|| q| jD ]}|| q%| j	| |
d }|r@|  d S d S )NzRemoving host %s)r   r   r   rX  rF  r,  r  r  r  rO  rD  rE  )r   r  r  r  reconnection_handlerr   r   r   r,  "  s   

zCluster.on_removec                 C   s    | |}|r| ||| |S r   )signal_connection_failurer*  )r   r  connection_excr3  rZ  is_downr   r   r   rd  3  s   
z!Cluster.signal_connection_failurec                 C   sD   | j t|| j||\}}|r|rtd| | || ||fS )a  
        Called when adding initial contact points and when the control
        connection subsequently discovers a new node.
        Returns a Host instance, and a flag indicating whether it was new in
        the metadata.
        Intended for internal use only.
        z New Cassandra host %r discovered)r  add_or_return_hostrR   r  r   r?  r+  )r   r  
datacenterrackr  rb  r  r  r   r   r   r  9  s
   zCluster.add_hostc                 C   s2   |r| j |rtd| | | dS dS dS )z
        Called when the control connection observes that a node has left the
        ring.  Intended for internal use only.
        zCassandra host %s removedN)r  remove_hostr   r?  r,  r   r  r   r   r   rj  H  s   zCluster.remove_hostc                 C   8   | j  | j| W d   dS 1 sw   Y  dS )z
        Adds a :class:`cassandra.policies.HostStateListener` subclass instance to
        the list of listeners to be notified when a host is added, removed,
        marked up, or marked down.
        N)r  r  r   r   r  r   r   r   register_listenerQ  s   "zCluster.register_listenerc                 C   rl  )z  Removes a registered listener. N)r  r  removerm  r   r   r   unregister_listenerZ  s   "zCluster.unregister_listenerc                 C   s4   | j  | j W  d    S 1 sw   Y  d S r   )r  r  r	   r   r   r   r   r  _  s   $zCluster.listenersc                 C   s0   t | jD ]}t |j D ]}|  qqdS )z
        If any host has fewer than the configured number of core connections
        open, attempt to open connections until that number is met.
        N)r  r  _poolsr  ensure_core_connections)r   r  poolr   r   r   r  d  s
   
z Cluster._ensure_core_connectionsc                 C   sH   t ||||fr | stdtdd |||fD dkr"tdd S d S )NzZkeyspace is required to refresh specific sub-entity {table, usertype, function, aggregate}c                 s   s    | ]}|rd V  qdS )r   Nr   )r  r   r   r   r   r  r      z3Cluster._validate_refresh_schema.<locals>.<genexpr>r   z={table, usertype, function, aggregate} are mutually exclusive)anyr   sumr  tableusertypefunction	aggregater   r   r   _validate_refresh_schemam  s   z Cluster._validate_refresh_schemac                 C   s6   |rt jS |r
t jS |rt jS |rt jS | rt jS d S r   )r   	AGGREGATEFUNCTIONTYPETABLEKEYSPACErw  r   r   r   _target_type_from_refresh_argsu  s   z&Cluster._target_type_from_refresh_argsc                 C   s*   | j j}|r	|jnd}|r| j|S dS )z?
        Returns the control connection host metadata.
        N)rO  _connectionr  r  r  )r   
connectionr  r   r   r   get_control_connection_host  s   z#Cluster.get_control_connection_hostc                 C   s   | j j|ddstddS )a  
        Synchronously refresh all schema metadata.

        By default, the timeout for this operation is governed by :attr:`~.Cluster.max_schema_agreement_wait`
        and :attr:`~.Cluster.control_connection_timeout`.

        Passing max_schema_agreement_wait here overrides :attr:`~.Cluster.max_schema_agreement_wait`.

        Setting max_schema_agreement_wait <= 0 will bypass schema agreement and refresh schema immediately.

        An Exception is raised if schema refresh fails for any reason.
        T)schema_agreement_waitforcez7Schema metadata was not refreshed. See log for details.N)rO  refresh_schemar   )r   r  r   r   r   refresh_schema_metadata  s   zCluster.refresh_schema_metadatac                 C   s"   | j jtj||ddstddS )aN  
        Synchronously refresh keyspace metadata. This applies to keyspace-level information such as replication
        and durability settings. It does not refresh tables, types, etc. contained in the keyspace.

        See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
        T)target_typer  r  r  z9Keyspace metadata was not refreshed. See log for details.N)rO  r  r   r  r   )r   r  r  r   r   r   refresh_keyspace_metadata  s
   z!Cluster.refresh_keyspace_metadatac                 C   $   | j jtj|||ddstddS )z
        Synchronously refresh table metadata. This applies to a table, and any triggers or indexes attached
        to the table.

        See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
        Tr  r  rx  r  r  z6Table metadata was not refreshed. See log for details.NrO  r  r   r  r   )r   r  rx  r  r   r   r   refresh_table_metadata  s
   zCluster.refresh_table_metadatac                 C   r  )z
        Synchronously refresh materialized view metadata.

        See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
        Tr  z5View metadata was not refreshed. See log for details.Nr  )r   r  viewr  r   r   r   "refresh_materialized_view_metadata  
   z*Cluster.refresh_materialized_view_metadatac                 C   r  )z
        Synchronously refresh user defined type metadata.

        See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
        T)r  r  r  r  r  z:User Type metadata was not refreshed. See log for details.N)rO  r  r   r  r   )r   r  r  r  r   r   r   refresh_user_type_metadata  r  z"Cluster.refresh_user_type_metadatac                 C   r  )z
        Synchronously refresh user defined function metadata.

        ``function`` is a :class:`cassandra.UserFunctionDescriptor`.

        See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
        T)r  r  rz  r  r  z>User Function metadata was not refreshed. See log for details.N)rO  r  r   r~  r   )r   r  rz  r  r   r   r   refresh_user_function_metadata  
   z&Cluster.refresh_user_function_metadatac                 C   r  )a  
        Synchronously refresh user defined aggregate metadata.

        ``aggregate`` is a :class:`cassandra.UserAggregateDescriptor`.

        See :meth:`~.Cluster.refresh_schema_metadata` for description of ``max_schema_agreement_wait`` behavior
        T)r  r  r{  r  r  z?User Aggregate metadata was not refreshed. See log for details.N)rO  r  r   r}  r   )r   r  r{  r  r   r   r   refresh_user_aggregate_metadata  r  z'Cluster.refresh_user_aggregate_metadatac                 C   s   | j |s
tddS )a  
        Synchronously refresh the node list and token metadata

        `force_token_rebuild` can be used to rebuild the token map metadata, even if no new nodes are discovered.

        An Exception is raised if node refresh fails for any reason.
        z1Node list was not refreshed. See log for details.N)rO  refresh_node_list_and_token_mapr   r   force_token_rebuildr   r   r   rb    s   zCluster.refresh_nodesc                 C   s   t dt || _|| _dS )a  
        *Deprecated:* set :attr:`~.Cluster.schema_metadata_enabled` :attr:`~.Cluster.token_metadata_enabled` instead

        Sets a flag to enable (True) or disable (False) all metadata refresh queries.
        This applies to both schema and node topology.

        Disabling this is useful to minimize refreshes during multiple changes.

        Meta refresh must be enabled for the driver to become aware of any cluster
        topology changes or schema updates.
        zCluster.set_meta_refresh_enabled is deprecated and will be removed in 4.0. Set Cluster.schema_metadata_enabled and Cluster.token_metadata_enabled instead.N)r   r  rQ  rX  rT  r   r   r   set_meta_refresh_enabled  s
   
z Cluster.set_meta_refresh_enabledc           
         sR   |D ]$} fdd|D }|j |ddd}|D ]\}}	|s%td||	 qqd S )Nc                    s$   g | ]}t |j r|jnd dqS )Nqueryr  )r5   query_stringr  )r  r%  set_keyspacer   r   r_    s
    
z(Cluster._send_chunks.<locals>.<listcomp>g      @Fr  fail_on_errorz?Got unexpected response when preparing statement on host %s: %r)wait_for_responsesr   r   )
clsr  r  chunksr  ks_chunkmessages	responsessuccessresponser   r  r   _send_chunks  s   
zCluster._send_chunksc           
   
   C   s  | j r| jsd S td| d }zzt| |j}t| j  }t	| j
rFg }tdt|dD ]}||||d   | |||d q/n8t|dd D ]0\}}|d urZ|| t|}g }tdt|dD ]}||||d   qh| ||| qMtd| W nB ty } ztd|| W Y d }~n.d }~w ttjfy }	 ztd	||	 W Y d }	~	nd }	~	w ty   td
| Y nw W |r|  d S d S |r|  w w )Nz7Preparing all known prepared statements against host %sr   rK  Tc                 S   r7  r   )r  )r%  r   r   r   r  	  r  z.Cluster._prepare_all_queries.<locals>.<lambda>z<Done preparing all known prepared statements against host %sz9Timed out trying to prepare all statements on host %s: %sz5Error trying to prepare all statements on host %s: %rz1Error trying to prepare all statements on host %s)r  r  r   r   r  r  r  r  r   uses_keyspace_flagr=  rangelenr   r  r   set_keyspace_blockingr   r  r"   r~   r;  r   r   close)
r   r  r  
statementsr  ir  ks_statementsr  r   r   r   r   rF  	  sJ   

zCluster._prepare_all_queriesc                 C   s6   | j  || j|< W d    d S 1 sw   Y  d S r   )r  r  )r   query_idprepared_statementr   r   r   add_prepared,	  s   "zCluster.add_prepared)r   NFFT)NNTTr   )r   r   r   r   r  r}  r   r  r   DSE_V2r=  r  r  r  r8  r<  r.  r9  setterrA  r   rG  rJ   r  rL   rH  rI  rI   r  rM   r  connect_to_remote_hostsr  r  r  rw  r  r  r  DefaultConnectionrs  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  rv   r  rQ  rX  r^  rF  r0  r1  rC  r  rO  r  r   r   r  r  r  r  r  rU  r  r  r  r   r   r  r  r  r  r  r  r  r  r  r  r   r  r  r  r  r  r  r  r   r(  r-  r  r.  r6  rB  r'  r5  r   r*  r+  r_  r,  rd  r  rj  rn  rp  r  r  r   r|  r  r  r  r  r  r  r  r  r  rb  r  classmethodr  rF  r  r   r   r   r   r5  T  s   









	






  .$
:0
C%H
&
;

		
	











&r5  c                   @   sN  e Zd ZdZdZdZdZdZdZdZ	e
eZedd Zejdd ZdZedd	 Zejd
d	 ZejZedd Zejdd ZdZedd Zejdd ZdZ	 dZ	 dZ	 dZ	 dZ	 eZ	 dZ	 dZdZ dZ!dZ"dZ#dZ$d\ddZ%de&dde'dddfddZ(ddde&e'dddfddZ)dde*dfddZ+dde*dfddZ,dd Z-d d! Z.d"d# Z/d$d% Z0d&d' Z1d(d) Z2e'ddfd*d+Z3d,d- Z4d.d/ Z5d0d1 Z6d2d3 Z7d4d5 Z8d6d7 Z9d]d8d9Z:d\d:d;Z;d<d= Z<d>d? Z=d@dA Z>dBdC Z?dDdE Z@dFdG ZAdHdI ZBdJdK ZCdLdM ZDdNdO ZEdPdQ ZFdRdS ZGdTdU ZHdVdW ZIdXdY ZJdZd[ ZKdS )^r  a  
    A collection of connection pools for each host in the cluster.
    Instances of this class should not be created directly, only
    using :meth:`.Cluster.connect()`.

    Queries and statements can be executed through ``Session`` instances
    using the :meth:`~.Session.execute()` and :meth:`~.Session.execute_async()`
    methods.

    Example usage::

        >>> session = cluster.connect()
        >>> session.set_keyspace("mykeyspace")
        >>> session.execute("SELECT * FROM mycf")

    NFc                 C   r7  )a  
        The format to return row results in.  By default, each
        returned row will be a named tuple.  You can alternatively
        use any of the following:

        - :func:`cassandra.query.tuple_factory` - return a result row as a tuple
        - :func:`cassandra.query.named_tuple_factory` - return a result row as a named tuple
        - :func:`cassandra.query.dict_factory` - return a result row as a dict
        - :func:`cassandra.query.ordered_dict_factory` - return a result row as an OrderedDict

        )r  r   r   r   r   r   K	  r:  zSession.row_factoryc                 C      |  d| d S )Nr   _validate_set_legacy_config)r   rfr   r   r   r   Z	  rV  r   c                 C   r7  )a  
        A default timeout, measured in seconds, for queries executed through
        :meth:`.execute()` or :meth:`.execute_async()`.  This default may be
        overridden with the `timeout` parameter for either of those methods.

        Setting this to :const:`None` will cause no timeouts to be set by default.

        Please see :meth:`.ResponseFuture.result` for details on the scope and
        effect of this timeout.

        .. versionadded:: 2.0.0
        )r  r   r   r   r   default_timeout`	     zSession.default_timeoutc                 C   r  )Nr  r  )r   r  r   r   r   r  p	  rV  c                 C   r7  )a  
        *Deprecated:* use execution profiles instead
        The default :class:`~ConsistencyLevel` for operations executed through
        this session.  This default may be overridden by setting the
        :attr:`~.Statement.consistency_level` on individual statements.

        .. versionadded:: 1.2.0

        .. versionchanged:: 3.0.0

            default changed from ONE to LOCAL_ONE
        )r"  r   r   r   r   default_consistency_levelv	  r  z!Session.default_consistency_levelc                 C   s   t dt | d| dS )z>
        *Deprecated:* use execution profiles instead
        zSetting the consistency level at the session level will be removed in 4.0. Consider using execution profiles and setting the desired consistency level to the EXEC_PROFILE_DEFAULT profile.r  N)r   r  r  r   clr   r   r   r  	  s   c                 C   r7  )a9  
        The default :class:`~ConsistencyLevel` for serial phase of  conditional updates executed through
        this session.  This default may be overridden by setting the
        :attr:`~.Statement.serial_consistency_level` on individual statements.

        Only valid for ``protocol_version >= 2``.
        )!_default_serial_consistency_levelr   r   r   r    default_serial_consistency_level	  s   	z(Session.default_serial_consistency_levelc                 C   s*   |d urt |std| d| d S )Nzidefault_serial_consistency_level must be either ConsistencyLevel.SERIAL or ConsistencyLevel.LOCAL_SERIAL.r  )r   r   r   r  r  r   r   r   r  	  s
   rL  i  Tc           
      C   s  || _ || _|| _t | _i | _|j| _|j| _	g | _
| j j| _t | _t | _|D ]}| j|dd}|r;| j| q*t| jtd}|jrdtdd |jD sdt|jtd}|jrdtdd |jD rPtdd | jD sd}| jrz|d| j 7 }t|d	d
 |D t | _|  | _| j jd urztt | jd t!fd| j ji| _"W n t#y   t$%d Y nw | j j&r| j ' }|ot(|j)}	|	rt*| j j+| d| _,n|rt$-dj.|j/|d t$-d.| j j0| j d S )NFr2  )return_whenc                 s       | ]}|  V  qd S r   r:  r8  r   r   r   r  $
  rt  z#Session.__init__.<locals>.<genexpr>c                 s   r  r   r  r8  r   r   r   r  '
  rt   Unable to connect to any serversz using keyspace '%s'c                 S      g | ]}|j qS r   )ri  )r  r  r   r   r   r_  +
      z$Session.__init__.<locals>.<listcomp>z-ProtocolHandlerr  z2Unable to set column encryption policy for session)interval_secr  zsNot starting MonitorReporter thread for Insights; not supported by server version {v} on ControlConnection host {c})vcz3Started Session with client_id {} and session_id {})1r   r$  r  r   r  rq  rF  _profile_managerr  _metrics_request_init_callbacksr=  _protocol_versionr-   encoderr  r  rG  r   r  r   r  ru  doner   r  r  
session_id_check_graph_paging_available_graph_paging_availabler  r  r~  r@   client_protocol_handlerr   r   r?  r  r  ri   dse_versionrh   r  _monitor_reporterr   r   release_versionr  )
r   r   r$  r  r  r   r  msgcc_hostvalid_insights_versionr   r   r   r   
  sh   







zSession.__init__c
           
      C   s   |  |||||||||		 S )a  
        Execute the given query and synchronously wait for the response.

        If an error is encountered while executing the query, an Exception
        will be raised.

        `query` may be a query string or an instance of :class:`cassandra.query.Statement`.

        `parameters` may be a sequence or dict of parameters to bind.  If a
        sequence is used, ``%s`` should be used the placeholder for each
        argument.  If a dict is used, ``%(name)s`` style placeholders must
        be used.

        `timeout` should specify a floating-point timeout (in seconds) after
        which an :exc:`.OperationTimedOut` exception will be raised if the query
        has not completed.  If not set, the timeout defaults to the request_timeout of the selected ``execution_profile``.
        If set to :const:`None`, there is no timeout. Please see :meth:`.ResponseFuture.result` for details on
        the scope and effect of this timeout.

        If `trace` is set to :const:`True`, the query will be sent with tracing enabled.
        The trace details can be obtained using the returned :class:`.ResultSet` object.

        `custom_payload` is a :ref:`custom_payload` dict to be passed to the server.
        If `query` is a Statement with its own custom_payload. The message payload
        will be a union of the two, with the values specified here taking precedence.

        `execution_profile` is the execution profile to use for this request. It can be a key to a profile configured
        via :meth:`Cluster.add_execution_profile` or an instance (from :meth:`Session.execution_profile_clone_update`,
        for example

        `paging_state` is an optional paging state, reused from a previous :class:`ResultSet`.

        `host` is the :class:`cassandra.pool.Host` that should handle the query. If the host specified is down or
        not yet connected, the query will fail with :class:`NoHostAvailable`. Using this is
        discouraged except in a few cases, e.g., querying node-local tables and applying schema changes.

        `execute_as` the user that will be used on the server to execute the request. This is only available
        on a DSE cluster.
        )execute_asyncr:  )
r   r  
parametersr  tracecustom_payloadexecution_profilepaging_stater  
execute_asr   r   r   executeJ
  s   +zSession.executec
              
   C   sR   |r|ni }|	r|	  |t< | ||||||||}
| j|
_| |
 |
  |
S )av  
        Execute the given query and return a :class:`~.ResponseFuture` object
        which callbacks may be attached to for asynchronous response
        delivery.  You may also call :meth:`~.ResponseFuture.result()`
        on the :class:`.ResponseFuture` to synchronously block for results at
        any time.

        See :meth:`Session.execute` for parameter definitions.

        Example usage::

            >>> session = cluster.connect()
            >>> future = session.execute_async("SELECT * FROM mycf")

            >>> def log_results(results):
            ...     for row in results:
            ...         log.info("Results: %s", row)

            >>> def log_error(exc):
            >>>     log.error("Operation failed: %s", exc)

            >>> future.add_callbacks(log_results, log_error)

        Async execution with blocking wait for results::

            >>> future = session.execute_async("SELECT * FROM mycf")
            >>> # do other stuff...

            >>> try:
            ...     results = future.result()
            ... except Exception:
            ...     log.exception("Operation failed:")

        )encoder    _create_response_futurer  _protocol_handler_on_requestsend_request)r   r  r  r  r  r  r  r  r  r  r   r   r   r   r  w
  s   %

zSession.execute_asyncc                 C   s   |  ||||| S )a  
        Executes a Gremlin query string or GraphStatement synchronously,
        and returns a ResultSet from this execution.

        `parameters` is dict of named parameters to bind. The values must be
        JSON-serializable.

        `execution_profile`: Selects an execution profile for the request.

        `execute_as` the user that will be used on the server to execute the request.
        )execute_graph_asyncr:  )r   r  r  r  r  r  r   r   r   execute_graph
  s   zSession.execute_graphc           	      C   s&  | j jtju rtdt|tst|}| |}t	|ds"td| 
| zt|jtr5|| j |_W n	 ty?   Y nw | | d}|rQ| j||jd}|j }|r^| |t< tt|jd |t< | j|d||t|d}||j_| j|_|jjrt|j t!r| "| |S |#  |S )a
  
        Execute the graph query and return a :class:`ResponseFuture`
        object which callbacks may be attached to for asynchronous response delivery. You may also call ``ResponseFuture.result()`` to synchronously block for
        results at any time.
        zCannot execute graph queries using Cluster legacy parameters. Consider using Execution profiles: https://docs.datastax.com/en/developer/python-driver/latest/execution_profiles/#execution-profilesr  zeExecution profile for graph queries must derive from GraphExecutionProfile, and provide graph_optionsN)r  i  )r  r  r  r  r  )$r   rC  r0  r2  r   ra  ro   rp   execution_profile_clone_updatehasattr"_resolve_execution_profile_optionsrt  r   ru   r>  _maybe_set_graph_paging_transform_paramsr  get_options_mapr  r    rd   intr   rt   r  r   r   query_paramsr  r  is_analytics_sourcer   rP   _target_analytics_masterr  )	r   r  r  r  r  r  graph_parametersr  r   r   r   r   r  
  sJ   







zSession.execute_graph_asyncc                 C   s*   |j }|j tu r| jrt nd }||_ d S r   )r   r   r  r   )r   r  graph_pagingr   r   r   r  
  s   

zSession._maybe_set_graph_pagingc                 C   sN   t | jsdS | jj D ]}|jdu r dS t|j}|tk r$ dS qdS )zZVerify if we can enable graph paging. This executed only once when the session is created.FNT)	r    has_continuous_paging_next_pagesr  r   r  r  r  rg   _GRAPH_PAGING_MIN_DSE_VERSION)r   r  versionr   r   r   r  
  s   

z%Session._check_graph_paging_availablec                 C   s   |j jdur|jdurdS |j }d}|jr0|jd}|| jjjv r0| jjj| }|jdkr0d}|r8t	j
}t}n|jtjkrDtj}t}nt	j}t}|jdu rQ||_|jdu r[||_dS dS )a  
        Determine the GraphSON protocol and row factory for a graph query. This is useful
        to configure automatically the execution profile when executing a query on a
        core graph.

        If `graph_protocol` is not explicitly specified, the following rules apply:
        - Default to GraphProtocol.GRAPHSON_1_0, or GRAPHSON_2_0 if the `graph_language` is not gremlin-groovy.
        - If `graph_options.graph_name` is specified and is a Core graph, set GraphSON_3_0.
        If `row_factory` is not explicitly specified, the following rules apply:
        - Default to graph_object_row_factory.
        - If `graph_options.graph_name` is specified and is a Core graph, set graph_graphson3_row_factory.
        NFutf-8CoreT)r  graph_protocolr   
graph_namedecoder   r  	keyspacesgraph_enginerm   GRAPHSON_3_0rr   r  rk   DEFAULT_GRAPH_LANGUAGEDEFAULT_GRAPH_PROTOCOLrj   GRAPHSON_2_0rq   )r   r  r  is_core_graphr   ks_metadatar  r   r   r   r   r    s0   




z*Session._resolve_execution_profile_optionsc                 C   sx   t |ts	tdt}|jtjkrt }n|jtjkr.| j	|j
r&|j
dnd d}t|}||}t|dgS )NzHThe parameters must be a dictionary. Unnamed parameters are not allowed.r   )r   r  )ra  r  r   rl   r  rm   r
  rn   r  r   r  r  rs   	serializejsondumpsr  )r   r  r  
serializercontextserialized_parametersr   r   r   r  3  s   

zSession._transform_paramsc                 C   sP   |   | jdd dd |jd}t|_|  | j}||f}|j||||d d S )Nz,CALL DseClientTool.getAnalyticsGraphServer()F)r  r  r  r  )rL  callback_argserrbackerrback_args)_start_timerr  r  ra   r   r  _on_analytics_master_resultadd_callbacks)r   r   master_query_futurecbr   r   r   r   r  F  s   z Session._target_analytics_masterc                 C   s   z,|  d }|d d }|d}|dkr|d | }t|j|}|j| j||_W n ty<   t	j
ddd Y nw | |j d S )Nr   location:zFailed querying analytics master (request might not be routed optimally). Make sure the session is connecting to a graph analytics datacenter.Tr   )r:  rfindrc   r  _load_balancermake_query_planr  
query_planr   r   r   r   r  )r   r  master_futurequery_futurerowaddrdelimiter_indextargeted_queryr   r   r   r  R  s   

z#Session._on_analytics_master_resultc	                 C   s  d}	t |trt|}n
t |tr||}| jjtjkrU|t	ur%t
d|tu r,| j}|jdur4|jn| j}
|jdur?|jn| j}|jpH| jj}| j}| jj}d}d}n4| |}|tu ra|j}|jduri|jn|j}
|jdurt|jn|j}|j}|jp|j}|j}|j}|j}|j}|tu r| jdkr| j}n| jdkrd}t }| jdkr| jr| j }nd}t !| j}|r|rt"|j#}nd}t |tr|j$}t %| jr|j&nd}|rt'||| j(}t)||
||||||}nWt |t*r|j+}	t,|	j-|j.|
||||t/|	j0||	j1d
}n9t |t2r4| jdk rt3dt %| jr&|j&nd}t4|j5|j6|
|||}nt |t7rEt)|j8|
|||||}||_9|:|j; |:| | jj<|_<|j=rj|rj|>|j&pg| j&|nd}t?| |||| j@|	|||||||dS )	z@ Returns the ResponseFuture before calling send_request() on it Nz?Cannot specify execution_profile while using legacy parameters.r   r   r  )	skip_metar   result_metadata_idzBatchStatement execution is only supported with protocol version 2 or higher (supported in Cassandra 2.0 and higher).  Consider setting Cluster.protocol_version to 2 to support this operation.)	r  r  r   r   load_balancer
start_timespeculative_execution_plancontinuous_paging_stater  )Ara  r~  rX   rY   bindr   rC  r0  r2  r   r   r   r  r   r  r   r  r   rI  r   r   _maybe_get_execution_profiler   r   r   
fetch_sizerb   r  default_fetch_sizetimeuse_client_timestampr  r   r  r)   r   r  r  r  r\   r  r.   rZ   r  r6   r  r  rS  result_metadatar(  r[   r   r;   
batch_type_statements_and_parametersro   r  tracingupdate_custom_payloadr  r  is_idempotentnew_planResponseFuturer  )r   r  r  r  r  r  r  r  r  r  r  	serial_clr   r   r   spec_exec_policyr   r/  r*  	timestamp supports_continuous_paging_stater,  r  statement_keyspacer   spec_exec_planr   r   r   r  a  s   











(zSession._create_response_futurec              	   C   sP   | j jj}z|| W S  ty'   dd | D }tdt|d|f w )z
        Returns the execution profile associated with the provided ``name``.

        :param name: The name (or key) of the execution profile.
        c                 S   s   g | ]}t |qS r   )r   rj  r   r   r   r_    s    z1Session.get_execution_profile.<locals>.<listcomp>z6Invalid execution_profile: %s; valid profiles are: %s.z, )r   rF  r  KeyErrorkeysr   r   join)r   r   r  rp  r   r   r   get_execution_profile  s   

zSession.get_execution_profilec                 C   s   t |tr|S | |S r   )ra  r   rD  )r   rk  r   r   r   r.    s   z$Session._maybe_get_execution_profilec                 K   s0   t | |}| D ]
\}}t||| q|S )a  
        Returns a clone of the ``ep`` profile.  ``kwargs`` can be specified to update attributes
        of the returned profile.

        This is a shallow clone, so any objects referenced by the profile are shared. This means Load Balancing Policy
        is maintained by inclusion in the active profiles. It also means updating any other rich objects will be seen
        by the active profile. In cases where this is not desirable, be sure to replace the instance instead of manipulating
        the shared object.
        )r	   r.  r  setattr)r   rk  r   cloneattrr@  r   r   r   r    s   
z&Session.execution_profile_clone_updatec                 O      | j |||f dS )a  
        Adds a callback with arguments to be called when any request is created.

        It will be invoked as `fn(response_future, *args, **kwargs)` after each client request is created,
        and before the request is sent. This can be used to create extensions by adding result callbacks to the
        response future.

        `response_future` is the :class:`.ResponseFuture` for the request.

        Note that the init callback is done on the client thread creating the request, so you may need to consider
        synchronization if you have multiple threads. Any callbacks added to the response future will be executed
        on the event loop thread, so the normal advice about minimizing cycles and avoiding blocking apply (see Note in
        :meth:`.ResponseFuture.add_callbacks`.

        See `this example <https://github.com/datastax/python-driver/blob/master/examples/request_init_listener.py>`_ in the
        source tree for an example.
        N)r  r   r   fnr   r   r   r   r   add_request_init_listener  s   z!Session.add_request_init_listenerc                 O   rH  )zz
        Removes a callback and arguments from the list.

        See :meth:`.Session.add_request_init_listener`.
        N)r  ro  rI  r   r   r   remove_request_init_listener   s   z$Session.remove_request_init_listenerc                 C   s,   | j D ]\}}}||g|R i | qd S r   )r  )r   response_futurerJ  r   r   r   r   r   r    s   zSession._on_requestc           
      C   s   t ||d}t| |d| jd}z|  |  }W n ty(   td  w |r-|nd}t	
|j|j|j| jj||| j|j|j| jj
}|j|_| j|j| | jjru|j}	z| |j|	| W |S  tyt   td Y |S w |S )aE  
        Prepares a query string, returning a :class:`~cassandra.query.PreparedStatement`
        instance which can be used as follows::

            >>> session = cluster.connect("mykeyspace")
            >>> query = "INSERT INTO users (id, name, age) VALUES (?, ?, ?)"
            >>> prepared = session.prepare(query)
            >>> session.execute(prepared, (user.id, user.name, user.age))

        Or you may bind values to the prepared statement ahead of time::

            >>> prepared = session.prepare(query)
            >>> bound_stmt = prepared.bind((user.id, user.name, user.age))
            >>> session.execute(bound_stmt)

        Of course, prepared statements may (and should) be reused::

            >>> prepared = session.prepare(query)
            >>> for user in users:
            ...     bound = prepared.bind((user.id, user.name, user.age))
            ...     session.execute(bound)

        Alternatively, if :attr:`~.Cluster.protocol_version` is 5 or higher
        (requires Cassandra 4.0+), the keyspace can be specified as a
        parameter. This will allow you to avoid specifying the keyspace in the
        query without specifying a keyspace in :meth:`~.Cluster.connect`. It
        even will let you prepare and use statements against a keyspace other
        than the one originally specified on connection:

            >>> analyticskeyspace_prepared = session.prepare(
            ...     "INSERT INTO user_activity id, last_activity VALUES (?, ?)",
            ...     keyspace="analyticskeyspace")  # note the different keyspace

        **Important**: PreparedStatements should be prepared only once.
        Preparing the same query more than once will likely affect performance.

        `custom_payload` is a key value map to be passed along with the prepare
        message. See :ref:`custom_payload`.
        r  N)r  r  zError preparing query:z#Error preparing query on all hosts:)r5   r:  r  r  r:  oner   r   r   rY   from_messager  bind_metadata
pk_indexesr   r  r  column_metadatar(  r  r  r  r  _current_hostr  )
r   r  r  r  r   r   r  prepared_keyspacer  r  r   r   r   prepare  s4   (
zSession.preparec              	   C   s   g }t | j D ]C}||krL|jrLt| t||dd| j}z||}W n ty4   t	
d| Y q	w |du rEt	d||j| q	|||f q	|D ]\}}z|  W qO tyh   t	
d| Y qOw dS )z|
        Prepare the given query on all hosts, excluding ``excluded_host``.
        Intended for internal use only.
        r  Nz"Error preparing query for host %s:z'Failed to prepare query for host %s: %r)r  rq  rB  r  r:  r5   r  _queryr   r   r   r   _errorsrW  r   r:  )r   r  excluded_hostr  r  r  r   
request_idr   r   r   r  N  s4   zSession.prepare_on_all_hostsc                 C   s   | j  | jr	 W d   dS d| _W d   n1 sw   Y  | jD ]}|  q%t| j | jr9| j  t| j	 D ]}|
  q@dS )z
        Close all connections.  ``Session`` instances should not be used
        for any purpose after being shutdown.
        NT)r  r   r  rE  r  r  r&  r  rq  r  r   )r   r   rs  r   r   r   r   p  s   
	



zSession.shutdownc                 C   r'  r   r   r   r   r   r   r(    r)  zSession.__enter__c                 G   r*  r   r+  r,  r   r   r   r-    r  zSession.__exit__c                 C   s   z|    W d S    Y d S r   r+  r   r   r   r   __del__  s   zSession.__del__c                    s6   j   tjkrdS  fdd}|S )z(
        For internal use only.
        Nc               
      s  zj dkrt} nt} W nG ty7 } ztt|d}j| W Y d }~dS d }~w ty[ } zt	j
d|d jj|dd W Y d }~dS d }~ww j}jb | jjkrj  t g   fdd	}| j| jj  r rt	
d
  j |   j  	 W d    dS j  | jjksl| j< W d    n1 sw   Y  t	d |r|  dS )Nr  )r  Fz1Failed to create connection pool for new host %s:r   T)rZ  c                    s     |   d S r   )r  r  )rs  r   errors_returnedset_keyspace_eventr   r   rL    s   
zJSession.add_or_renew_pool.<locals>.run_add_or_renew_pool.<locals>.callbackzJFailed setting keyspace for pool after keyspace changed during connect: %sz!Added pool for host %s to session)r  rV   rU   r   r"   r~  r   rd  r   r   r  rq  rW  r  	_keyspacer  releaser   _set_keyspace_for_all_connsr   r  is_setr*  r   acquirer   )new_poolauth_excconn_excpreviousrL  r  r  r3  r   r[  r   run_add_or_renew_pool  sX   
	


z8Session.add_or_renew_pool.<locals>.run_add_or_renew_pool)r  r  rK   r!  r   )r   r  r3  rh  r   rg  r   rG    s
   

0zSession.add_or_renew_poolc                 C   s.   | j |d }|rtd| | |jS d S )NzRemoved connection pool for %r)rq  popr   r   r   r   )r   r  rs  r   r   r   r4    s
   zSession.remove_poolc                 C   s   t  }| jj D ]@}| j|}| j|}d}|r|jr/|t	j
kr.|jdv r.| |d}n||jkrB|t	j
kr?| |}n||_|rI|| q	|S )a  
        When the set of live nodes change, the loadbalancer will change its
        mind on host distances. It might change it on the node that came/left
        but also on other nodes (for instance, if a node dies, another
        previously ignored node may be now considered).

        This method ensures that all hosts for which a pool should exist
        have one, and hosts that shouldn't don't.

        For internal use only.
        N)TNF)r  r   r  r  r  r  rq  rW  r   rK   r!  r  rG  r  r4  r   )r   r  r  r  rs  r   r   r   r   r    s"   



zSession.update_created_poolsc                    s(     |}|r| fdd dS dS )z{
        Called by the parent Cluster instance when a node is marked down.
        Only intended for internal use.
        c                    s      S r   )r  r   r   r   r   r    s    z!Session.on_down.<locals>.<lambda>N)r4  r   )r   r  r   r   r   r   r*    s   
zSession.on_downc                 C   s   |  | dS 
 Internal N)r*  rk  r   r   r   r,    s   zSession.on_removec                 C   s   |  dt|f  dS )z
        Set the default keyspace for all queries made through this Session.
        This operation blocks until complete.
        zUSE %sN)r  rD   )r   r  r   r   r   r    s   zSession.set_keyspacec                    s   | j  || _t| j W d   n1 sw   Y  i s'  dS  fdd}t| j D ]}||| q6dS )a  
        Asynchronously sets the keyspace on all pools.  When all
        pools have set all of their connections, `callback` will be
        called with a dictionary of all errors that occurred, keyed
        by the `Host` that they occurred against.
        Nc                    s,    |  |r|| j< s | d S d S r   )ro  r  )rs  host_errorsrL  r   remaining_callbacksr   r   pool_finished_setting_keyspace  s   

zKSession._set_keyspace_for_all_pools.<locals>.pool_finished_setting_keyspace)r  r  r  rq  r  r  r`  )r   r  rL  ro  rs  r   rm  r   _set_keyspace_for_all_pools  s   z#Session._set_keyspace_for_all_poolsc                    s   z	j jj| }W n ty   td|f w z|j| }W n ty.   td||f w |j  fdd}|jj|< dS )z
        Called by the parent Cluster instance when the user registers a new
        mapping from a user-defined type to a class.  Intended for internal
        use only.
        zCKeyspace %s does not exist or has not been discovered by the driverz*User type %s does not exist in keyspace %sc                    s   dd  fddD  S )Nz{ %s }z , c              	   3   s,    | ]}d | j t|df V  qdS )z%s : %sN)r  cql_encode_all_typesgetattr)r  
field_name)r   r   r   r   r  >  s    z?Session.user_type_registered.<locals>.encode.<locals>.<genexpr>)rC  r   field_namesr   rt  r   r  =  s   
z,Session.user_type_registered.<locals>.encodeN)	r   r  r  rA  UserTypeDoesNotExist
user_typesrv  r  mapping)r   r  r  r  ks_meta	type_metar  r   ru  r   r  )  s"   
zSession.user_type_registeredc                 O   s&   | j s| jjj|g|R i |S dS rj  )r   r   r   r   rI  r   r   r   r   E  s   zSession.submitc                 C   s   t dd t| j D S )Nc                 s   s     | ]\}}||  fV  qd S r   )	get_state)r  r  rs  r   r   r   r  K      z)Session.get_pool_state.<locals>.<genexpr>)r  r  rq  r  r   r   r   r   rV  J  s   zSession.get_pool_statec                 C   
   | j  S r   )rq  r  r   r   r   r   r#  M  r  zSession.get_poolsc                 C   s:   | j jtjkrtd|f t| d| | tj| j _d S )NzXCannot set Session.%s while using Configuration Profiles. Set this in a profile instead.r  )r   rC  r0  r3  r   rE  r2  )r   	attr_namer@  r   r   r   r  P  s   z#Session._validate_set_legacy_configr   r   )Lr   r   r   r   r   r$  r  r   r  r  r   r_   r  r.  r   r  r  r  r   r   r"  r  r  r  max_trace_waitr0  r2  r  r  r@   r  r  rq  r  r  r  r  r   r   r   r  r  r   r  r  r  r  r  r  r  r  r  rD  r.  r  rK  rL  r  rU  r  r   r(  r-  rZ  rG  r4  r  r*  r,  r  rp  r  r   rV  r#  r  r   r   r   r   r  1	  s    





	


			

<
-
18.
k

B"	:!	r  c                   @      e Zd ZdZdS )rw  zj
    An attempt was made to use a user-defined type that does not exist.

    .. versionadded:: 2.1.0
    Nr   r   r   r   r   r   r   r   rw  W  s    rw  c                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )_ControlReconnectionHandler
    Internal
    c                 O   s(   t j| g|R i | t|| _d S r   )rS   r   r  r  rO  )r   rO  r   r   r   r   r   r   e  s   z$_ControlReconnectionHandler.__init__c                 C   r~  r   )rO  _reconnect_internalr   r   r   r   try_reconnecti  r  z)_ControlReconnectionHandler.try_reconnectc                 C   s   | j | d S r   )rO  _set_new_connectionr   r  r   r   r   on_reconnectionl  s   z+_ControlReconnectionHandler.on_reconnectionc                 C   s   t |trdS td| dS )NFz0Error trying to reconnect control connection: %rT)ra  r   r   r   )r   r   
next_delayr   r   r   on_exceptiono  s   
z(_ControlReconnectionHandler.on_exceptionN)r   r   r   r   r   r  r  r  r   r   r   r   r  `  s    r  c                 O   s*   |  }|du r	dS t |||i | dS )zZ
    A callback handler for the ControlConnection that tolerates
    weak references.
    Nrr  )obj_weakrefmethod_namer   r   objr   r   r   _watch_callbackx  s   r  c                 C   s$   z|    W dS  ty   Y dS w )z
    Called when the ControlConnection object is about to be finalized.
    This clears watchers on the underlying Connection object.
    N)control_conn_disposedReferenceError)connexpiring_weakrefr   r   r   _clear_watcher  s
   r  c                   @   s  e Zd ZdZdZdZdZdZdZdZ	dZ
d	Zd
ZdZedZG dd deZdZdZdZdZdZdZdZdZdZeZ		dPddZdd Zdd Zdd Z dd Z!dd Z"dd Z#d d! Z$d"d# Z%d$d% Z&dQd&d'Z'dRd(d)Z(dQd*d+Z)		dSd,d-Z*e+d.d/ Z,d0d1 Z-d2d3 Z.d4d5 Z/d6d7 Z0d8d9 Z1d:d; Z2dTd<d=Z3d>d? Z4dUd@dAZ5dBdC Z6dDdE Z7dFdG Z8dVdHdIZ9dJdK Z:dLdM Z;dNdO Z<dS )Wr  r  zSELECT * FROM system.peerszvSELECT host_id, peer, data_center, rack, rpc_address, {nt_col_name}, release_version, schema_version FROM system.peersz,SELECT * FROM system.local WHERE key='local'zSELECT host_id, cluster_name, data_center, rack, partitioner, release_version, schema_version FROM system.local WHERE key='local'z6SELECT rpc_address FROM system.local WHERE key='local'zESELECT peer, host_id, {nt_col_name}, schema_version FROM system.peersz9SELECT schema_version FROM system.local WHERE key='local'zSELECT * FROM system.peers_v2zSELECT host_id, peer, peer_port, data_center, rack, native_address, native_port, release_version, schema_version FROM system.peers_v2zaSELECT host_id, peer, peer_port, native_address, native_port, schema_version FROM system.peers_v2z6.0.0c                   @   r/  )z ControlConnection.PeersQueryTypezinternal Enum for _peers_queryr   r   N)r   r   r   r   PEERSPEERS_SCHEMAr   r   r   r   PeersQueryType  r4  r  FNTc                 C   s^   t || _d | _|| _|| _|| _|| _|| _|| _	t
 | _t | _d | _t
 | _i | _d S r   )r  r  _clusterr  _timeout_schema_event_refresh_window_topology_event_refresh_window_status_event_refresh_windowrP  rW  r   r  r   _schema_agreement_lock_reconnection_handler_reconnection_lock_event_schedule_times)r   r   r  r  r  r  schema_meta_enabledtoken_meta_enabledr   r   r   r     s   
zControlConnection.__init__c                 C   s:   | j rd S | jj| _| |   | jjtj	k| jj
_d S r   )_is_shutdownr  r=  r  r  r  r  _product_typeru  DATASTAX_CLOUD_PRODUCT_TYPEr  r   r   r   r   r   r    s
   
zControlConnection.connectc                 C   sT   | j  | j}|| _W d   n1 sw   Y  |r(td|| |  dS dS )zM
        Replace existing connection (if there is one) and close it.
        NzA[control connection] Closing old connection %r, replacing with %r)r  r  r   r   r  )r   r  r  r   r   r   r    s   z%ControlConnection._set_new_connectionc                 C   s   i }| j jtjkr| j jn| j j}| D ]\}z| |W   S  tyI } z||t	|j
< tjd|dd | j j||dd W Y d}~n%d}~w tyi } z||t	|j
< tjd|dd W Y d}~nd}~ww | jrqtdqtd|)	a  
        Tries to connect to each host in the query plan until one succeeds
        or every attempt fails. If successful, a new Connection will be
        returned.  Otherwise, :exc:`NoHostAvailable` will be raised
        with an "errors" arg that is a dict mapping host addresses
        to the exception that was raised when an attempt was made to open
        a connection to that host.
        z,[control connection] Error connecting to %s:Tr   Fr2  Nz=[control connection] Reconnection in progress during shutdownr  )r  rC  r0  r2  r   rG  r  _try_connectr"   r~  r  r   r  rd  r   r  r   r   )r   r   rD  r  r   r   r   r   r    s,   	
z%ControlConnection._reconnect_internalc              
   C   s  t d| 	 z| jj|jdd}| jr|  tdW n@ ty8 } z| j	|j|j
 W Y d}~n)d}~w ty\ } z| jjsQ|jrQ| j	|j| jj n W Y d}~nd}~ww qt d| t| ttt|}z}|jtt|dtt|dtt|d	d
| jd | | jj|}| jr| jn| j}t|tjd}t|tjd}|j ||| jdd\\}	}
\}}|s||	sd| _!| | jj|}t|tjd}|j"|| jd}
|
|f}| j#||d | j$||dd W |S  t%y   |   w )z~
        Creates a new Connection, registers for pushed events, and refreshes
        node/token and schema metadata.
        z1[control connection] Opening new connection to %sT)is_control_connectionzReconnecting during shutdownNzk[control connection] Established new connection %r, registering watchers and refreshing schema and topology_handle_topology_change_handle_status_change_handle_schema_change)TOPOLOGY_CHANGESTATUS_CHANGESCHEMA_CHANGE)register_timeoutr  r   Fr  r  )preloaded_results)r  r  )&r   r   r  r  r  r  r  r   r%   r  startup_versionrB   r  is_beta_protocol_errorr=  r  refr
   r  r  register_watchersr  r  _get_peers_queryr  r  rW  _SELECT_LOCAL_SELECT_LOCAL_NO_TOKENSr.   r   ONEr  _uses_peers_v2wait_for_response _refresh_node_list_and_token_map_refresh_schemar   )r   r  r  r   self_weakref	sel_peers	sel_localpeers_querylocal_querypeers_successpeers_resultlocal_successlocal_resultshared_resultsr   r   r   r    sn   



zControlConnection._try_connectc                 C   s   | j rd S | | j d S r   )r  _submit
_reconnectr   r   r   r   	reconnectJ  s   zControlConnection.reconnectc                 C   s   t d z
| |   W d S  tyN   | jj }| j$ | j	r(| j	
  t| | jj|| jd d| _	| j	  W d    Y d S 1 sFw   Y  Y d S  ty\   t jddd  w )Nz,[control connection] Attempting to reconnectrM  z'[control connection] error reconnectingTr   )r   r   r  r  r   r  r  rO  r  r  rE  r  r  !_get_and_set_reconnection_handlerrP  r   )r   rQ  r   r   r   r  P  s&   


&zControlConnection._reconnectc                 C   s:   | j  | j}|| _|W  d   S 1 sw   Y  dS )z
        Called by the _ControlReconnectionHandler when a new connection
        is successfully created.  Clears out the _reconnection_handler on
        this ControlConnection.
        N)r  r  )r   rN  r  r   r   r   r  j  s
   $z3ControlConnection._get_and_set_reconnection_handlerc                 O   s:   z| j js| j jj|i |W S W d S  ty   Y d S w r   )r  r   r   r   r  )r   r   r   r   r   r   r  u  s   zControlConnection._submitc                 C   s   | j  | jr| j  W d    n1 sw   Y  | j0 | jr+	 W d    d S d| _td | jrF| j  d | _W d    d S W d    d S 1 sQw   Y  d S )NTz Shutting down control connection)	r  r  rE  r  r  r   r   r  r  r   r   r   r   r   }  s"   


"zControlConnection.shutdownc                 K   sd   z| j r| j| j fd|i|W S W dS  ty   Y dS  ty1   tjddd |   Y dS w )Nr  z,[control connection] Error refreshing schemaTr   F)r  r  r  r   r   r   _signal_error)r   r  r   r   r   r   r    s   
z ControlConnection.refresh_schemac                 K   sd   | j jrdS | j|||d}| js|std dS |s#td dS | j jj|| jfi | dS )NF)r  	wait_timezP[control connection] Skipping schema refresh because schema metadata is disabledz7Skipping schema refresh due to lack of schema agreementT)	r  r   wait_for_schema_agreementrP  r   r   r  refreshr  )r   r  r  r  r  r   agreedr   r   r   r    s   


z!ControlConnection._refresh_schemac                 C   s`   z| j r| j| j |d W dS W dS  ty   Y dS  ty/   tjddd |   Y dS w )Nr  Tz=[control connection] Error refreshing node list and token mapr   F)r  r  r  r   r   r   r  r  r   r   r   r    s   
z1ControlConnection.refresh_node_list_and_token_mapc                 C   sd  |rt d |d }|d }n6tj}| | jj|}| js't d | j}nt d | j	}t
||d}	t
||d}
|j|	|
| jd\}}t|j|j}d }i }t }|jr0||j t|j|j}|d }|d }|| jj_|d	}|d
}| jj|j}|r0|d}|d}| ||| |d|_|d|_|d|_t||_t||_ t!||_"t#||_$|j"d u r| jr|jj%|_"|jj&|_$n8t
| j'tjd}|j(|| jdd\}}|rt|j|j}t!|d |_"t#|d |_$n
|jj%|_"|jj&|_$|d|_)|d|_*|d|_+|d|_,|r0|r0|||< |p9| jjj-d u }|D ]}| .|sPt /dt!|  q<| jj01|}||v rht /d||d q<|| | jj|}|d}|d}|d u rt d| | jj2|||ddd\}}d}n	|| |||O }|d|_t||_t||_ t!||_"t#||_$|d|_)|d|_*|d|_+|d|_,|d
d }|r|r| jr|||< q<| jj3 D ]}|jj%|jkr|j|vrd}t d| | j4| qt d |r.|r0t d | jj5|| d S d S d S )NzO[control connection] Refreshing node list and token map using preloaded resultsr   r   z;[control connection] Refreshing node list without token mapz7[control connection] Refreshing node list and token mapr  r  cluster_namepartitionertokensdata_centerri  r\  listen_addresslisten_portFr  r  r  workload	workloadsz2Found an invalid row for peer (%s). Ignoring host.zCFound multiple hosts with the same endpoint (%s). Excluding peer %speerz5[control connection] Found new host to connect to: %sT)r  rb  zB[control connection] Removing host not found in peers metadata: %rz0[control connection] Finished fetching ring infozA[control connection] Rebuilding token map due to topology changes)6r   r   r   r  r  r  r  rW  r  r  r.   r  r  r`   column_namesparsed_rowsr  r   r  r  r  r  rW  r  _update_location_infor\  r  r  rF   get_broadcast_addressbroadcast_addressget_broadcast_portbroadcast_portget_broadcast_rpc_addressbroadcast_rpc_addressget_broadcast_rpc_portbroadcast_rpc_portri  r   #_SELECT_LOCAL_NO_TOKENS_RPC_ADDRESSr  r  r  dse_workloaddse_workloadsr  _is_valid_peerr  r^  creater  r  rj  rebuild_token_map)r   r  r  r  r  r  r  r  r  r  r  r  	token_mapfound_hosts
local_rows	local_rowr  r  r  rh  ri  local_rpc_address_queryr  local_rpc_address_resultr#  should_rebuild_token_mapr  r  old_hostr   r   r   r    s   




















z2ControlConnection._refresh_node_list_and_token_mapc                 C   s>   t t| o| do| do| dod| vp| dS )Nr\  r  ri  r  )rS  rF   r  rW  )r#  r   r   r   r  ?  s   z ControlConnection._is_valid_peerc                 C   sD   |j |kr|j|krdS | jj| ||| | jj| dS )NFT)rh  ri  r  rF  r*  set_location_infor'  )r   r  rh  ri  r   r   r   r  E  s   z'ControlConnection._update_location_infoc                 C   sR   | j |d}| j }||kr|d }|| }n	t | }|| }|| j |< |S )Nr   {Gz?)r  rW  _timer1  r   )r   
event_typedelay_window	next_timenow	this_timedelayr   r   r   _delay_for_event_typeQ  s   



z'ControlConnection._delay_for_event_typec                 C   s   |r|j s|   dS dS )z
        Used to mitigate refreshes for nodes that are already known.
        Some versions of the server send superfluous NEW_NODE messages in addition to UP events.
        N)r  r  rk  r   r   r   _refresh_nodes_if_not_up_  s   
z*ControlConnection._refresh_nodes_if_not_upc                 C   s   |d }|d \}}| j j||}|dks|dkr4| jdkr2| d| j}| j j|| j| d S d S |dkrE| j jd| j j| d S d S )Nchange_typeri  NEW_NODE
MOVED_NODEr   topology_changeREMOVED_NODE)	r  r  r  r  r  r  schedule_uniquer  rj  r   eventr   r$  r   r  r  r   r   r   r  g  s   
z)ControlConnection._handle_topology_changec                 C   s   |d }|d \}}| j j||}|dkr9| d| j}|d u r,| j j|| j d S | j j|| j j| d S |dkrK|d urM| j j	|dd d S d S d S )Nr   ri  UPstatus_changeDOWNFr2  )
r  r  r  r  r  r  r  r  r'  r*  r  r   r   r   r  r  s   z'ControlConnection._handle_status_changec                 C   s:   | j dk rd S | d| j }| jjj|| jfi | d S )Nr   schema_change)r  r  r  r  r  r  )r   r  r  r   r   r   r    s   
z'ControlConnection._handle_schema_changec                 C   s
  |d ur|n| j j}|dkrdS | j | jr 	 W d    d S |s%| j}|rItd |d }|d }| |||j}|d u rI	 W d    dS td | j	
 }d}	tj}
d }| | jj|}|	|k rt||
d}t| j|
d}zt| j||	 }|j|||d\}}W n7 ty } ztd| | j	
 | }	W Y d }~qbd }~w ty   | jrtd	 Y W d    d S  w | |||j}|d u r	 W d    dS td
 | j	d | j	
 | }	|	|k sftd|j| 	 W d    dS 1 sw   Y  d S )Nr   TzM[control connection] Attempting to use preloaded results for schema agreementr   z1[control connection] Waiting for schema agreementr  r  zU[control connection] Timed out waiting for response during schema agreement check: %szC[control connection] Aborting wait for schema match due to shutdownz5[control connection] Schemas mismatched, trying againg?z.Node %s is reporting a schema disagreement: %sF)r  r  r  r  r  r   r   _get_schema_mismatchesr  r  r1  r   r  r  r  r  r.   _SELECT_SCHEMA_LOCALminr  r  r   r#   sleepr  )r   r  r  r  total_timeoutr  r  schema_mismatchesrP  elapsedr  select_peers_queryr  r  r  r   r   r   r    st   



(
.$z+ControlConnection.wait_for_schema_agreementc           
      C   s   t |j|j}tt}|jr&t |j|jd }|dr&||d | |D ]&}|d}|s2q(| jj	|}| jj
|}	|	rN|	jdurN|| | q(t|dkr\td d S tdd | D S )Nr   schema_versionFr   z"[control connection] Schemas matchc                 s   s     | ]\}}|t |fV  qd S r   r  )r  r  nodesr   r   r   r    r}  z;ControlConnection._get_schema_mismatches.<locals>.<genexpr>)r`   r  r  r   r  rW  r   r  r^  r  r  r  r  r  r   r   r  r  )
r   r  r  local_addressversionsr  r#  
schema_verr  r  r   r   r   r    s&   


z(ControlConnection._get_schema_mismatchesc                 C   s   || j j| j jfvrtd| | jr)|| j jkr$| jr| jn| j}|S | j}|S || j jkr7| jr7| j	}|S || j jkr@| j
n| j}| jj|jj}| jj|jj}|o]t|| jk}|rh|jdd}|S |rr|jdd}|S | j	}|S )a.  
        Determine the peers query to use.

        :param peers_query_type: Should be one of PeersQueryType enum.

        If _uses_peers_v2 is True, return the proper peers_v2 query (no templating).
        Else, apply the logic below to choose the peers v1 address column name:

        Given a connection:

        - find the server product version running on the connection's host,
        - use that to choose the column name for the transport address (see APOLLO-1130), and
        - use that column name in the provided peers query template.
        zInvalid peers query type: %snative_transport_address)nt_col_namerpc_address)r  r  r  r   r  rW  _SELECT_PEERS_V2_SELECT_PEERS_NO_TOKENS_V2_SELECT_SCHEMA_PEERS_V2_SELECT_PEERS_SELECT_SCHEMA_PEERS_TEMPLATE _SELECT_PEERS_NO_TOKENS_TEMPLATEr  r  r  r  r  r  rg   #_MINIMUM_NATIVE_ADDRESS_DSE_VERSIONr   )r   peers_query_typer  r  query_templatehost_release_versionhost_dse_versionuses_native_address_queryr   r   r   r    s6   z"ControlConnection._get_peers_queryc                 C   s   | j : | jr	 W d    d S | jr6| jjr6| jj| jj}|r6| jj|| jj	dd 	 W d    d S W d    n1 s@w   Y  | 
  d S r1  )r  r  r  
is_defunctr  r  r  r  rd  
last_errorr  rk  r   r   r   r    s   
 zControlConnection._signal_errorc                 C   s   d S r   r   rk  r   r   r   r'  %  r)  zControlConnection.on_upc                 C   sD   | j }|r|j|jkr| jd u r td| |   d S d S d S d S )Nz[[control connection] Control connection host (%s) is considered down, starting reconnection)r  r  r  r   r   r  )r   r  r  r   r   r   r*  (  s   
zControlConnection.on_downc                 C   s   |r
| j dd d S d S )NTr  )r  )r   r  rb  r   r   r   r+  2  s   zControlConnection.on_addc                 C   s>   | j }|r|j|jkrtd| |   d S | jdd d S )NzP[control connection] Control connection host (%s) is being removed. ReconnectingTr  )r  r  r   r   r  r  )r   r  r  r   r   r   r,  6  s
   zControlConnection.on_removec                 C   s   t | dd }|r|gS g S )Nr  r  )r   r  r   r   r   get_connections?  s   z!ControlConnection.get_connectionsc                 C   s*   || j u r|js|jr|   d S d S d S r   )r  r)  	is_closedr  r  r   r   r   return_connectionC  s   z#ControlConnection.return_connection)TTr  )NNFr  )NNNr   r  )=r   r   r   r   r   r"  r  r  r  r!  r  r  r  r  rg   r#  r   r  r  r  r  r  r  r  rP  rW  r  r1  r  r   r  r  r  r  r  r  r  r  r   r  r  r  r  r   r  r  r  r  r  r  r  r  r  r  r  r'  r*  r+  r,  r+  r-  r   r   r   r   r    sz    
	D



 

@
-

	r  c                 C   s2   z	| j s|   W n	 ty   Y nw |  d S r   )r   r   r  rC  )r  threadr   r   r   _stop_schedulerH  s   r/  c                   @   sT   e Zd ZdZdZdZdZdd Zdd Zdd Z	d	d
 Z
dd Zdd Zdd ZdS )r  NFc                 C   s@   t  | _t | _t | _|| _tj	| dd d| _
|   d S )NzTask Scheduler)r   T)queuePriorityQueue_queuer  _scheduled_tasksr   _count	_executorr   r   daemonrP  )r   r   r   r   r   r   Y  s   
z_Scheduler.__init__c                 C   s@   zt d W n	 ty   Y nw d| _| jd |   d S )NzShutting down Cluster SchedulerT)r   r   N)r   r   r   r   r2  
put_nowaitrC  r   r   r   r   r   c  s   z_Scheduler.shutdownc                 O   s   |  |||t| f d S r   )_insert_taskr  r  )r   r  rJ  r   r   r   r   r   rQ  m  s   z_Scheduler.schedulec                 O   s<   ||t | f}|| jvr| || d S td| d S )Nz7Ignoring schedule_unique for already-scheduled task: %r)r  r  r3  r8  r   r   )r   r  rJ  r   r   taskr   r   r   r  p  s   
z_Scheduler.schedule_uniquec                 C   sJ   | j st | }| j| | j|t| j|f d S t	d| d S )Nz*Ignoring scheduled task after shutdown: %r)
r   r1  r3  r   r2  r7  nextr4  r   r   )r   r  r9  run_atr   r   r   r8  w  s
   z_Scheduler._insert_taskc                 C   s   	 | j rd S zO	 | jjdd d\}}}| j r |rtd W d S |t krI| j| |\}}}t|}| j	j
|g|R i |}|| j n
| j|||f nqW n
 tjy_   Y nw td q)NT)blockr  z6Not executing scheduled task due to Scheduler shutdowng?)r   r2  rW  r   r   r1  r3  r   r  r5  r   r   _log_if_failedr7  r0  Emptyr  )r   r;  r  r9  rJ  r   r   r   r   r   r   run  s2   


z_Scheduler.runc                 C   s"   |  }|rtjd|d d S d S )NzBAn internally scheduled tasked failed with an unhandled exception:r   )r   r   r  )r   r   r   r   r   r   r=    s   
z_Scheduler._log_if_failed)r   r   r   r2  r3  r5  r   r   r   rQ  r  r8  r?  r=  r   r   r   r   r  R  s    

r  c              	   K   s   z<zt d| | j|fi ||_W n ty,   t d |jj| jfi | Y n	w W |	d  d S W |	d  d S |	d  w )Nz2Refreshing schema in response to schema change. %sz9Exception refreshing schema in response to schema change:)
r   r   r  is_schema_agreedr   r   r  r   r  _set_final_result)control_connrM  r  r   r   r   r   refresh_schema_and_set_result  s   
rC  c                	   @   s  e Zd ZdZdZ	 dZ	 dZ	 dZ	 dZ	 dZ	dZ
dZdZdZdZdZeZdZdZdZdZdZdZdZdZdZdZdZdZdZdZdZ e!Z"e# Z$dZ%dZ&dZ'dZ(dde) ddddddf	ddZ*e+dd	 Z,d
d Z-dd Z.dIddZ/dd Z0dd Z1dJddZ2dKddZ3e+dd Z4e+dd Z5e+dd Z6dd Z7d d! Z8d"d# Z9d$d% Z:d&d' Z;d(d) Z<d*d+ Z=d,d- Z>d.d/ Z?d0d1 Z@d2d3 ZAd4d5 ZBd6d7 ZCdeDjEfd8d9ZFdeDjEfd:d;ZGd<d= ZHd>d? ZId@dA ZJ		dLdCdDZKdEdF ZLdGdH ZMeMZNdS )Mr:  a  
    An asynchronous response delivery mechanism that is returned from calls
    to :meth:`.Session.execute_async()`.

    There are two ways for results to be delivered:
     - Synchronously, by calling :meth:`.result()`
     - Asynchronously, by attaching callback and errback functions via
       :meth:`.add_callback()`, :meth:`.add_errback()`, and
       :meth:`.add_callbacks()`.
    NTr   Fc                 C   s   || _ |p|j| _|	p|jj| _|| _|| _|| _|| _|| _	|| _
t | _|
p+t | _|| _|p4| j| _|   t | _i | _g | _g | _g | _|   || _d S r   )r  r   r   rG  r  r   r  r  _retry_policyr  r  r   _callback_lockr1  _start_time_host_spec_execution_plan_make_query_planr   _eventrW  
_callbacks	_errbacksattempted_hostsr  _continuous_paging_state)r   r  r   r  r  r  r  r   r   r)  r*  r+  r,  r  r   r   r   r     s*   
zResponseFuture.__init__c                 C   s"   | j d u rd S | j| j  t  S r   )r  rF  r1  r   r   r   r   _time_remaining  s   
zResponseFuture._time_remainingc                 C   s|   | j d u r:| j| j}|dkr'| jd u s| j|kr'| jjj|| j	| _ d S | jd ur<| jjj| j| j
| _ d S d S d S )Nr   )_timerrH  next_executionrS  rO  r  r   rs  create_timer_on_speculative_execute_on_timeout)r   
spec_delayr   r   r   r    s   

zResponseFuture._start_timerc                 C   s   | j r
| j   d S d S r   )rP  rE  r   r   r   r   _cancel_timer   s   zResponseFuture._cancel_timerc                 C   st  | j du r|dk r| jjjdt| j|d d| _dS | j durz
| j j	| j
 W n tyD   d}|di}| t|| j Y dS w | jj| j}|r|js| j j | j j| j
 t| j j| j jkrmd| j _W d   n1 sww   Y  |j| j dd	 | j}|s| jr| jrt| jjnd
}|di}n| jjjj }|rt|jnd}|di}| t|| j dS )a<  
        Called when the request associated with this ResponseFuture times out.

        This function may reschedule itself. The ``_attempts`` parameter tracks
        the number of times this has happened. This parameter should only be
        set in those cases, where ``_on_timeout`` reschedules itself.
        Nr  r  r   )	_attemptszConnection defunct by heartbeatz<Client request timeout. See Session.execute[_async](timeout)T)stream_was_orphanedzno host queried before timeoutunknownzRequest timed out while waiting for schema agreement. See Session.execute[_async](timeout) and Cluster.max_schema_agreement_wait.)r  r  r   rs  rR  r
   rT  rP  	_requestsri  _req_idrA  _set_final_exceptionr   rS  rq  rW  r   r<  orphaned_request_idsr   r  orphaned_thresholdorphaned_threshold_reachedr-  rW  r@  r~  r  rO  )r   rW  ro  r   rs  r  r  r   r   r   rT  $  sB   	




zResponseFuture._on_timeoutc                 C   sl   d | _ | j s4| js| jjjd| j| _ d S | j	d ur(| j	dkr(| 
  d S | jdd |   d S d S )Nr  r   F)error_no_hosts)rP  rJ  ra  rM  r  r   rs  rR  rS  rO  rT  r  r  r   r   r   r   rS  [  s   
	

z&ResponseFuture._on_speculative_executec                 C   s2   | j r
| j g| _d S t| j| jj| j| _d S r   )rG  r   iterr  r  r  r  r  r   r   r   r   rI  q  s   zResponseFuture._make_query_planc                 C   sp   | j D ]'}| |}|dur|| _ dS | jdur*t | j | jkr*|    dS q|r6| td| j	 dS )rk  NTz2Unable to complete the operation against any hostsF)
r   rV  r[  r  r1  rF  rT  r\  r   rW  )r   r`  r  req_idr   r   r   r  }  s   

zResponseFuture.send_requestc           	   
   C   s  |d u r| j }| jj|}|std| j|< d S |jr%td| j|< d S || _d }z9|jdd\}}|| _	| j
r=| j
jng }|d u rKt| j|||}|j|||| jj| jj|d| _| j| |W S  ty } ztd| || j|< W Y d }~d S d }~w ty } ztd| || j|< W Y d }~d S d }~w ty } z,tjd|d	d
 || j|< | jd ur| j  |r|| W Y d }~d S W Y d }~d S d }~ww )Nz$Host has been marked down or removedzPool is shutdownrL  r  )r  r  decoderr3  zDAll connections for host %s are at capacity, moving to the next hostz7Connection for host %s is busy, moving to the next hostzError querying host %sTr   )r   r  rq  rW  r"   rW  r   rS  borrow_connectionr  r  r3  r
   _set_resultsend_msgr  encode_messagedecode_messagerequest_encoded_sizerM  r   rW   r   r   r+   r   r  on_connection_errorr-  )	r   r  r   r  rs  r  rY  result_metar   r   r   r   rV    s\   
	



zResponseFuture._queryc                 C   s
   | j duS )z
        Returns :const:`True` if there are more pages left in the
        query results, :const:`False` otherwise.  This should only
        be checked after the first page has been returned.

        .. versionadded:: 2.0.0
        N)_paging_stater   r   r   r   has_more_pages  s   
	zResponseFuture.has_more_pagesc                 C      | j  s	td| jS )a  
        Warnings returned from the server, if any. This will only be
        set for protocol_version 4+.

        Warnings may be returned for such things as oversized batches,
        or too many tombstones in slice queries.

        Ensure the future is complete before trying to access this property
        (call :meth:`.result()`, or after callback is invoked).
        Otherwise it may throw if the response has not been received.
        z?warnings cannot be retrieved before ResponseFuture is finalized)rJ  ra  r   	_warningsr   r   r   r   warnings     
zResponseFuture.warningsc                 C   rn  )a  
        The custom payload returned from the server, if any. This will only be
        set by Cassandra servers implementing a custom QueryHandler, and only
        for protocol_version 4+.

        Ensure the future is complete before trying to access this property
        (call :meth:`.result()`, or after callback is invoked).
        Otherwise it may throw if the response has not been received.

        :return: :ref:`custom_payload`.
        zEcustom_payload cannot be retrieved before ResponseFuture is finalized)rJ  ra  r   _custom_payloadr   r   r   r   r    rq  zResponseFuture.custom_payloadc                 C   sH   | j st |   | j | j_| j  t| _d| _	| 
  |   dS )aU  
        If there are more pages left in the query result, this asynchronously
        starts fetching the next page.  If there are no pages left, :exc:`.QueryExhausted`
        is raised.  Also see :attr:`.has_more_pages`.

        This should only be called after the first page has been returned.

        .. versionadded:: 2.0.0
        N)rl  QueryExhaustedrI  r   r  rJ  clearr   _final_result_final_exceptionr  r  r   r   r   r   start_fetching_next_page  s   


z'ResponseFuture.start_fetching_next_pagec                 C   s>   t | jj| j|||}| j|||d}|d u r|   d S d S )N)r  )r
   r  r   _execute_after_preparerV  r  )r   prepare_messager  r  rs  r  rY  r   r   r   
_reprepare   s
   zResponseFuture._repreparec              
   C   sF  z|| _ |r|| t|dd }|r$| jsg | _| jt|| j t|dd | _t|dd | _t	|t
r|jtkrRt| dd }|rO||j| j W d S W d S |jtkrnd| _| jjt| jjj| |fi |j W d S |jtkr|j| _|j| _|j| _t| jdd r| || W d S |  | !|j|j" W d S |jt#kr|  d  W d S |  | W d S t	|t$r| j%}t	|t&r| j'd ur| j'(  |j(| j)fd| j*i|j+}n-t	|t,r| j'd ur| j'-  |j-| j)fd| j*i|j+}nt	|t.r| j'd ur
| j'/  |j/| j)fd| j*i|j+}nt	|t0t1t2t3frIt45d||j6 | j'd ur6| j'7  t| jd	d }	|j8| j)|	|| j*d
}nt	|t9r| j:rg| j:j;}
|
|j+ksfJ d|j+|
f n|j+}
z	| jjj<|
 }W n* t=y   | j:st4>d|
?d | @| Y W d S | j:}|| jjj<|
< Y nw | jAjB}|jB}tCD| jjjEs|r||kr| @tFd||f  W d S t4Gd||jH tCD| jjjEr|jBnd }tI|jH|d}| j| jJ|||| W d S tK|dr| @|L  W d S | @| W d S | M||| W d S t	|tNrK| j'd ur"| j'O  t	|tPs.| jAQ| t| jd	d }	| j%j8| j)|	|| j*d
}| M||| W d S t	|tRritK|dra| @|L  W d S | @| W d S d|f }tN||}| S  | jAQ| | @| W d S  tRy } zt4Td | @| W Y d }~d S d }~ww )Ntrace_idrp  r  r  Fr   	retry_numzHost %s error: %s.r   )r;  r|  zFGot different query ID in server response (%s) than we had before (%s)z2Tried to execute unknown prepared statement: id=%shexzdThe Session's current keyspace (%s) does not match the keyspace the statement was prepared with (%s)z@Re-preparing unrecognized prepared statement against host %s: %sr  to_exceptionzGot unexpected message: %rz=Unexpected exception while handling result in ResponseFuture:)Ucoordinator_hostr-  rr  _query_tracesr   r]   r  ro  rr  ra  r/   kindr=   rp  new_keyspace_set_keyspace_completedr?   r@  r   rC  r   rO  schema_change_eventr>   r  rl  r  
_col_namescolumn_types
_col_typesr   (_handle_continuous_paging_first_responserA  r   r  rA   r0   rD  r1   r  on_read_timeoutr  _query_retriesr?  r2   on_write_timeoutr3   on_unavailabler4   r8   r9   r:   r   r  summaryon_other_erroron_request_errorr7   r  r  r  rA  r;  r  r\  r  r  r   r  r=  r   r   r  r5   rz  r  r~  _handle_retry_decisionr"   rj  r#   defunctr   rV  r   )r   r  r  rs  r  r{  r  r   retryr  r  r  current_keyspacerT  ry  r  r   r   r   r   re    s:  

























zResponseFuture._set_resultc                 C   s<   | |j| jj| j| j| _| j| | | j	  d S r   )
new_continuous_paging_session	stream_idr  rh  r   rN  _continuous_paging_session
on_messagerA  r@  )r   r  r  r   r   r   r    s   z7ResponseFuture._handle_continuous_paging_first_responsec                 C   s*   |s	|  d  d S | td|f  d S )Nz'Failed to set keyspace on all hosts: %s)rA  r\  r"   )r   r   r   r   r   r    s
   
z&ResponseFuture._set_keyspace_completedc                 C   s>  |r| | | jrdS t|tra|jtkrT| jrC| jj|jkr3| t	dj
t| jjt|jd |j| j_|j}|durC|| j_| |}|du rR|   dS dS | td||f  dS t|tr{t|drt| |  dS | | dS t|trtd|| || j|< |   dS | td||f  dS )z
        Handle the response to our attempt to prepare a statement.
        If it succeeded, run the original query again against the same host.
        NzID mismatch while trying to reprepare (expected {expected}, got {got}). This prepared statement won't work anymore. This usually happens when you run a 'USE...' query after the statement was prepared.)expectedgotz?Got unexpected response when preparing statement on host %s: %sr~  z8Connection error when preparing statement on host %s: %szDGot unexpected response type when preparing statement on host %s: %s)r-  rv  ra  r/   r  r<   r  r  r\  r   r   r   rR  r3  r(  rV  r  r"   r0   r  r~  r   r   rW  )r   r  r  rs  r  new_metadata_idrY  r   r   r   rx    sT   










z%ResponseFuture._execute_after_preparec                       |    | jd ur| jjt | j  | j  | _t fdd| j	D }W d    n1 s2w   Y  | j
  |D ]}|  q>d S )Nc                 3   .    | ]\}}}t | g|R i |V  qd S r   r
   r  rJ  r   r   r  r   r   r    
    
z3ResponseFuture._set_final_result.<locals>.<genexpr>)rV  r  request_timeraddValuer1  rF  rE  ru  r  rK  rJ  r  r   r  to_callcallback_partialr   r  r   rA    s   


z ResponseFuture._set_final_resultc                    r  )Nc                 3   r  r   r  r  r  r   r   r    r  z6ResponseFuture._set_final_exception.<locals>.<genexpr>)rV  r  r  r  r1  rF  rE  rv  r  rL  rJ  r  r  r   r  r   r\    s   



z#ResponseFuture._set_final_exceptionc                 C   s   dd }|\}}|t jt jfv r$|  jd7  _|t jk}| ||| n|t ju r1| || n| jd ur;| j  | 	d  ||| j
|< d S )Nc                 S   s   t | dr	|  S | S )Nr~  )r  r~  r  r   r   r   exception_from_response  s   
zFResponseFuture._handle_retry_decision.<locals>.exception_from_responser   )rL   RETRYRETRY_NEXT_HOSTr  _retryRETHROWr\  r  	on_ignorerA  rW  )r   retry_decisionr  r  r  
retry_typeconsistencyreuser   r   r   r    s   




z%ResponseFuture._handle_retry_decisionc                 C   sD   | j rd S | jd ur| j  |d ur|| j_| j| j|| d S r   )rv  r  on_retryr   r   r  r   _retry_task)r   reuse_connectionr   r  r   r   r   r  %  s   

zResponseFuture._retryc                 C   s,   | j rd S |r| |d urd S |   d S r   )rv  rV  r  )r   r  r  r   r   r   r  3  s
   zResponseFuture._retry_taskc                 C   s&   | j   | jturt| | jS | j)a  
        Return the final result or raise an Exception if errors were
        encountered.  If the final result or error has not been set
        yet, this method will block until it is set, or the timeout
        set for the request expires.

        Timeout is specified in the Session request execution functions.
        If the timeout is exceeded, an :exc:`cassandra.OperationTimedOut` will be raised.
        This is a client-side timeout. For more information
        about server-side coordinator timeouts, see :class:`.policies.RetryPolicy`.

        Example usage::

            >>> future = session.execute_async("SELECT * FROM mycf")
            >>> # do other stuff...

            >>> try:
            ...     rows = future.result()
            ...     for row in rows:
            ...         ... # process results
            ... except Exception:
            ...     log.exception("Operation failed:")

        )rJ  r   ru  r   	ResultSetrv  r   r   r   r   r:  ?  s   

zResponseFuture.resultc                 C   s   dd | j D S )zt
        Returns the trace session ids for this future, if tracing was enabled (does not fetch trace data).
        c                 S   r  r   )r{  )r  r  r   r   r   r_  b  r  z6ResponseFuture.get_query_trace_ids.<locals>.<listcomp>)r  r   r   r   r   get_query_trace_ids^  s   z"ResponseFuture.get_query_trace_idsc                 C   s>   | j tu r| jdu rtd| jr| t| jd ||S dS )aO  
        Fetches and returns the query trace of the last response, or `None` if tracing was
        not enabled.

        Note that this may raise an exception if there are problems retrieving the trace
        details from Cassandra. If the trace is not available after `max_wait`,
        :exc:`cassandra.query.TraceUnavailable` will be raised.

        If the ResponseFuture is not done (async execution) and you try to retrieve the trace,
        :exc:`cassandra.query.TraceUnavailable` will be raised.

        `query_cl` is the consistency level used to poll the trace tables.
        NzDTrace information was not available. The ResponseFuture is not done.r   )ru  r   rv  r^   r  _get_query_tracer  )r   max_waitquery_clr   r   r   get_query_traced  s   zResponseFuture.get_query_tracec                    s*   j r fddttj D S g S )z
        Fetches and returns the query traces for all query pages, if tracing was enabled.

        See note in :meth:`~.get_query_trace` regarding possible exceptions.
        c                    s   g | ]	} | qS r   )r  )r  r  max_wait_perr  r   r   r   r_    rd  z7ResponseFuture.get_all_query_traces.<locals>.<listcomp>)r  r  r  )r   r  r  r   r  r   get_all_query_tracesy  s    z#ResponseFuture.get_all_query_tracesc                 C   s"   | j | }|js|j||d |S )N)r  r  )r  eventsr#  )r   r  r  r  r  r   r   r   r    s   
zResponseFuture._get_query_tracec                 O   sj   d}| j  | j|||f | jturd}W d   n1 s w   Y  |r3|| jg|R i | | S )ay  
        Attaches a callback function to be called when the final results arrive.

        By default, `fn` will be called with the results as the first and only
        argument.  If `*args` or `**kwargs` are supplied, they will be passed
        through as additional positional or keyword arguments to `fn`.

        If an error is hit while executing the operation, a callback attached
        here will not be called.  Use :meth:`.add_errback()` or :meth:`add_callbacks()`
        if you wish to handle that case.

        If the final result has already been seen when this method is called,
        the callback will be called immediately (before this method returns).

        Note: in the case that the result is not available when the callback is added,
        the callback is executed by IO event thread. This means that the callback
        should not block or attempt further synchronous requests, because no further
        IO will be processed until the callback returns.

        **Important**: if the callback you attach results in an exception being
        raised, **the exception will be ignored**, so please ensure your
        callback handles all error cases that you care about.

        Usage example::

            >>> session = cluster.connect("mykeyspace")

            >>> def handle_results(rows, start_time, should_log=False):
            ...     if should_log:
            ...         log.info("Total time: %f", time.time() - start_time)
            ...     ...

            >>> future = session.execute_async("SELECT * FROM users")
            >>> future.add_callback(handle_results, time.time(), should_log=True)

        FTN)rE  rK  r   ru  r   r   rJ  r   r   run_nowr   r   r   add_callback  s   %
zResponseFuture.add_callbackc                 O   sf   d}| j  | j|||f | jrd}W d   n1 sw   Y  |r1|| jg|R i | | S )z
        Like :meth:`.add_callback()`, but handles error cases.
        An Exception instance will be passed as the first positional argument
        to `fn`.
        FTN)rE  rL  r   rv  r  r   r   r   add_errback  s   zResponseFuture.add_errbackr   c                 C   s<   | j |g|R i |pi  | j|g|R i |pi  dS )a  
        A convenient combination of :meth:`.add_callback()` and
        :meth:`.add_errback()`.

        Example usage::

            >>> session = cluster.connect()
            >>> query = "SELECT * FROM mycf"
            >>> future = session.execute_async(query)

            >>> def log_results(results, level='debug'):
            ...     for row in results:
            ...         log.log(level, "Result: %s", row)

            >>> def log_error(exc, query):
            ...     log.error("Query '%s' failed: %s", query, exc)

            >>> future.add_callbacks(
            ...     callback=log_results, callback_kwargs={'level': 'info'},
            ...     errback=log_error, errback_args=(query,))

        N)r  r  )r   rL  r  r  callback_kwargsr  errback_kwargsr   r   r   r    s    zResponseFuture.add_callbacksc                 C   s8   | j  g | _g | _W d    d S 1 sw   Y  d S r   )rE  rK  rL  r   r   r   r   clear_callbacks  s   "zResponseFuture.clear_callbacksc                 C   s.   | j tu rdn| j }d| j| j|| j| jf S )Nz(no result yet)zU<ResponseFuture: query='%s' request_id=%s result=%s exception=%s coordinator_host=%s>)ru  r   r  r[  rv  r  r   r:  r   r   r   __str__  s   zResponseFuture.__str__)r   r  r   )r   Nr   N)Or   r   r   r   r  r@  ri  r  rM  r  r   r   r  rD  r  r[  r   ru  r  r  rv  r  rK  rL  rS  r  r  rF  r  rl  rr  ro  rP  r@   r  rN   rH  _continuous_paging_optionsr  rG  _warned_timeoutrL   r   r.  rO  r  rV  rT  rS  rI  r  rV  rm  rp  r  rw  rz  re  r  r  rx  rA  r\  r  r  r  r:  r  r   r   r  r  r  r  r  r  r  r  __repr__r   r   r   r   r:    s    




7

.



 6
1
r:  c                   @   r  )rs  z
    Raised when :meth:`.ResponseFuture.start_fetching_next_page()` is called and
    there are no more pages.  You can check :attr:`.ResponseFuture.has_more_pages`
    before calling to avoid this.

    .. versionadded:: 2.0.0
    Nr  r   r   r   r   rs    s    rs  c                   @   s   e Zd ZdZdd Zedd Zedd Zdd	 Zd
d Z	dd Z
dd ZeZdd Zdd Zdd Zdd Zdd Zdd Zdd ZeZd)dd Zd)d!d"Zd#d$ Zed%d& Zed'd( ZdS )*r  a  
    An iterator over the rows from a query result. Also supplies basic equality
    and indexing methods for backward-compatability. These methods materialize
    the entire result set (loading all pages), and should only be used if the
    total result size is understood. Warnings are emitted when paged results
    are materialized in this fashion.

    You can treat this as a normal iterator over rows::

        >>> from cassandra.query import SimpleStatement
        >>> statement = SimpleStatement("SELECT * FROM users", fetch_size=10)
        >>> for user_row in session.execute(statement):
        ...     process_user(user_row)

    Whenever there are no more rows in the current page, the next page will
    be fetched transparently.  However, note that it *is* possible for
    an :class:`Exception` to be raised while fetching the next page, just
    like you might see on a normal call to ``session.execute()``.
    c                 C   s0   || _ |j| _|j| _| | d | _d| _d S r  )rM  r  r  r  r  _set_current_rows
_page_iter
_list_mode)r   rM  initial_responser   r   r   r     s   

zResultSet.__init__c                 C   rN  )zQ
        True if the last response indicated more pages; False otherwise
        )rM  rm  r   r   r   r   rm    s   zResultSet.has_more_pagesc                 C   s
   | j pg S )zx
        The list of current page rows. May be empty if the result was empty,
        or this is the last page.
        )_current_rowsr   r   r   r   current_rows#  s   
zResultSet.current_rowsc                 C   s   t | S )z
        Returns all the remaining rows as a list. This is basically
        a convenient shortcut to `list(result_set)`.

        This function is not recommended for queries that return a large number of elements.
        r  r   r   r   r   r>  +  s   zResultSet.allc                 C   sB   d}| j rz| j d }W |S  ty   tt| j }Y |S w |S )a'  
        Return a single row of the results or None if empty. This is basically
        a shortcut to `result_set.current_rows[0]` and should only be used when
        you know a query returns a single row. Consider using an iterator if the
        ResultSet contains more than one row.
        Nr   )r  r>  r:  ra  )r   r#  r   r   r   rN  4  s   zResultSet.onec                 C   s    | j rt| jS t| j| _| S r   )r  ra  r  r  r   r   r   r   __iter__D  s   
zResultSet.__iter__c                 C   sd   zt | jW S  ty   | jjs| jsg | _ Y nw | jjs-|   t	| j| _|   S t | jS r   )
r:  r  StopIterationrM  rm  r  r  r  fetch_next_pagera  r   r   r   r   r:  J  s   
zResultSet.nextc                 C   s2   | j jr| j   | j  }|j| _dS g | _dS )a  
        Manually, synchronously fetch the next page. Supplied for manually retrieving pages
        and inspecting :meth:`~.current_page`. It is not necessary to call this when iterating
        through results; paging happens implicitly in iteration.
        N)rM  rm  rw  r:  r  r  r   r   r   r  a  s
   


zResultSet.fetch_next_pagec                 C   s^   t |tr|r|g| _d S g | _d S z
t| || _W d S  ty.   |r(|gng | _Y d S w r   )ra  r   r  ra  r>  r  r   r   r   r  n  s   
zResultSet._set_current_rowsc                 C   s   t | | _d | _d S r   )r  r  r  r   r   r   r   
_fetch_allx     

zResultSet._fetch_allc                 C   sB   | j rd S | jrtd| | jjrtd| |   d| _ d S )Nz.Cannot use %s when results have been iterated.zFUsing %s on paged results causes entire result set to be materialized.T)r  r  RuntimeErrorrM  rm  r   r  r  )r   operatorr   r   r   _enter_list_mode|  s   
zResultSet._enter_list_modec                 C   s   |  d | j|kS )Nzequality operator)r  r  )r   otherr   r   r   __eq__  r  zResultSet.__eq__c                 C   s&   |dkr	t dt | d | j| S )Nr   zfResultSet indexing support will be removed in 4.0. Consider using ResultSet.one() to get a single row.zindex operator)r   r  r  r  )r   r  r   r   r   __getitem__  s   

zResultSet.__getitem__c                 C   s
   t | jS r   )rS  r  r   r   r   r   __nonzero__  r  zResultSet.__nonzero__Nc                 C      | j |S )z
        Gets the last query trace from the associated future.
        See :meth:`.ResponseFuture.get_query_trace` for details.
        )rM  r  )r   max_wait_secr   r   r   r       zResultSet.get_query_tracec                 C   r  )z
        Gets all query traces from the associated future.
        See :meth:`.ResponseFuture.get_all_query_traces` for details.
        )rM  r  )r   max_wait_sec_perr   r   r   r    r  zResultSet.get_all_query_tracesc                 C   s*   z	| j j  W d S  ty   tdw )NzkAttempted to cancel paging with no active session. This is only for requests with ContinuousdPagingOptions.)rM  r  rE  r   r   r   r   r   r   cancel_continuous_paging  s
   z"ResultSet.cancel_continuous_pagingc                 C   s   | j jtttfvrtd| j jf t| j jt}|r)| j	r%| j	d dkr)td|s;t
| jdkr;tdt
| j | jd }t|trI|d S |d S )aq  
        For LWT results, returns whether the transaction was applied.

        Result is indeterminate if called on a result that was not an LWT request or on
        a :class:`.query.BatchStatement` containing LWT. In the latter case either all the batch
        succeeds or fails.

        Only valid when one of the of the internal row factories is in use.
        z/Cannot determine LWT result with row factory %sr   z	[applied]z)No LWT were present in the BatchStatementr   z4LWT result should have exactly one row. This has %d.)rM  r   r_   r`   ra   r  ra  r  r[   r  r  r  r  )r   is_batch_statementr#  r   r   r   was_applied  s   

zResultSet.was_appliedc                 C   rN  )a  
        Server paging state of the query. Can be `None` if the query was not paged.

        The driver treats paging state as opaque, but it may contain primary key data, so applications may want to
        avoid sending this to untrusted parties.
        )rM  rl  r   r   r   r   r    s   zResultSet.paging_stater   )r   r   r   r   r   r.  rm  r  r>  rN  r  r:  __next__r  r  r  r  r  r  r  __bool__r  r  r  r  r  r   r   r   r   r    s6    

	




r  )r   
__future__r   atexitbinasciir   collectionsr   collections.abcr   concurrent.futuresr   r   r   r  r	   	functoolsr
   r   r   	itertoolsr   r   r   r  loggingrp  r   r   rer0  r~   r{   r1  	threadingr   r   r   r   r  r  r   	cassandrar   r   r   r   r   r   r   r   r   cassandra.authr    r!   cassandra.connectionr"   r#   r$   r%   r&   r'   r(   r)   r*   r+   cassandra.cqltypesr,   cassandra.encoderr-   cassandra.protocolr.   r/   r0   r1   r2   r3   r4   r5   r6   r7   r8   r9   r:   r;   r<   r=   r>   r?   r@   rA   rB   cassandra.metadatarC   rD   rE   rF   cassandra.policiesrG   rH   rI   rJ   rK   rL   rM   rN   rO   rP   rQ   cassandra.poolrR   rS   rT   rU   rV   rW   cassandra.queryrX   rY   rZ   r[   r\   r]   r^   r_   r`   ra   rb   rc   cassandra.marshalrd   cassandra.timestampsre   cassandra.utilrf   rg   $cassandra.datastax.insights.reporterrh    cassandra.datastax.insights.utilri   cassandra.datastax.graphrj   rk   rl   rm   rn   ro   rp   rq   rr   rs   cassandra.datastax.graph.queryrt   ru   cassandra.datastaxrv   ru  cassandra.io.twistedreactorrw   r  r   ry   r   rz   r   r   r   r   r   r   r   	getLoggerr   r   conn_fns
conn_classr   r  r  r  r  r  r  r  r  r  r   r   r   r   r   r   r  r   r   r   r   registerr   r   r   r   r   r   r
  r  r   r   r   r   r0  r5  r  rw  r  r  r  r  r/  r  rC  r:  rs  r  r   r   r   r   <module>   s(  ,0\4 80





)i#4             j        .	     ?
O      M