o
    EDit                     @   s  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m	Z	 d dl
mZ zd dlZW n ey;   dZY nw ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlm Z  ddl!m"Z" ddl#m$Z$m%Z%m&Z& ddl'm(Z( ddl)m*Z*m+Z+ ddl,m-Z- ddl.m/Z/m0Z0 ddl1m2Z2m3Z3 ddl4m5Z5 ddl6m7Z7 ddl8m9Z9m:Z: ddl;m<Z< e =e>Z?G dd de@ZAG dd  d e@ZBG d!d" d"e@ZCdS )#    N)deque)contextmanager)platform)time)urlparse   )defines)errors)RowOrientedBlock)BlockStreamProfileInfo)BufferedSocketReader)BufferedSocketWriter)
ClientInfo)get_compressor_cls)Context)	log_block)Progress)CompressionClientPacketTypesServerPacketTypes)QueryProcessingStage)read_binary_strread_binary_uint64)read_exception)write_settingsSettingsFlags)BlockInputStreamBlockOutputStream)	threading)escape_params)write_varintread_varint)write_binary_strc                       s   e Zd Z fddZ  ZS )Packetc                    s6   d | _ d | _d | _d | _d | _d | _tt|   d S N)	typeblock	exceptionprogressprofile_infomultistring_messagesuperr#   __init__self	__class__ U/var/www/Datamplify/venv/lib/python3.10/site-packages/clickhouse_driver/connection.pyr,   )   s   zPacket.__init__)__name__
__module____qualname__r,   __classcell__r1   r1   r/   r2   r#   (   s    r#   c                       s4   e Zd Z fddZdd Zdd Zdd Z  ZS )	
ServerInfoc	           	         sH   || _ || _|| _|| _|| _|| _d | _|| _|| _t	t
|   d S r$   )nameversion_majorversion_minorversion_patchrevisiontimezonesession_timezonedisplay_nameused_revisionr+   r7   r,   )	r.   r8   r9   r:   r;   r<   r=   r?   r@   r/   r1   r2   r,   5   s   zServerInfo.__init__c                 C   s   | j p| jS r$   )r>   r=   r-   r1   r1   r2   get_timezoneC   s   zServerInfo.get_timezonec                 C   s   | j | j| jfS r$   )r9   r:   r;   r-   r1   r1   r2   version_tupleF      zServerInfo.version_tuplec                 C   sb   d| j | j| jf }d| jfd|fd| jfd| jfd| jfd| jfg}dd	d
 |D }d| S )Nz%s.%s.%sr8   versionr<   zused revisionr=   r?   z, c                 s   s     | ]\}}d  ||V  qdS )z{}={}N)format).0keyvaluer1   r1   r2   	<genexpr>V   s    z&ServerInfo.__repr__.<locals>.<genexpr>z<ServerInfo(%s)>)	r9   r:   r;   r8   r<   r@   r=   r?   join)r.   rD   itemsparamsr1   r1   r2   __repr__I   s   	zServerInfo.__repr__)r3   r4   r5   r,   rA   rB   rM   r6   r1   r1   r/   r2   r7   4   s
    r7   c                       s`  e Zd ZdZdejejejejej	ej
ejejddddddddddddddddf fdd	Zdd Zd	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* ZdDd+d,Z d-d. Z!d/d0 Z"d1d2 Z#d3d4 Z$dEd6d7Z%dFd8d9Z&d:d; Z'dGd<d=Z(e)d>d? Z*d@dA Z+dBdC Z,  Z-S )H
Connectiona  
    Represents connection between client and ClickHouse server.

    :param host: host with running ClickHouse server.
    :param port: port ClickHouse server is bound to.
                 Defaults to ``9000`` if connection is not secured and
                 to ``9440`` if connection is secured.
    :param database: database connect to. Defaults to ``'default'``.
    :param user: database user. Defaults to ``'default'``.
    :param password: user's password. Defaults to ``''`` (no password).
    :param client_name: this name will appear in server logs.
                        Defaults to ``'python-driver'``.
    :param connect_timeout: timeout for establishing connection.
                            Defaults to ``10`` seconds.
    :param send_receive_timeout: timeout for sending and receiving data.
                                 Defaults to ``300`` seconds.
    :param sync_request_timeout: timeout for server ping.
                                 Defaults to ``5`` seconds.
    :param compress_block_size: size of compressed block to send.
                                Defaults to ``1048576``.
    :param compression: specifies whether or not use compression.
                        Defaults to ``False``. Possible choices:

                            * ``True`` is equivalent to ``'lz4'``.
                            * ``'lz4'``.
                            * ``'lz4hc'`` high-compression variant of
                              ``'lz4'``.
                            * ``'zstd'``.

    :param secure: establish secure connection. Defaults to ``False``.
    :param verify: specifies whether a certificate is required and whether it
                   will be validated after connection.
                   Defaults to ``True``.
    :param ssl_version: see :func:`ssl.wrap_socket` docs.
    :param ca_certs: see :func:`ssl.wrap_socket` docs.
    :param ciphers: see :func:`ssl.wrap_socket` docs.
    :param keyfile: see :func:`ssl.wrap_socket` docs.
    :param keypass: see :func:`ssl.wrap_socket` docs.
    :param certfile: see :func:`ssl.wrap_socket` docs.
    :param check_hostname: see :func:`ssl.wrap_socket` docs.
                           Defaults to ``True``.
    :param server_hostname: Hostname to use in SSL Wrapper construction.
                            Defaults to `None` which will send the passed
                            host param during SSL initialization. This param
                            may be used when connecting over an SSH tunnel
                            to correctly identify the desired server via SNI.
    :param alt_hosts: list of alternative hosts for connection.
                      Example: alt_hosts=host1:port1,host2:port2.
    :param settings_is_important: ``False`` means unknown settings will be
                                  ignored, ``True`` means that the query will
                                  fail with UNKNOWN_SETTING error.
                                  Defaults to ``False``.
    :param tcp_keepalive: enables `TCP keepalive <https://tldp.org/HOWTO/
                          TCP-Keepalive-HOWTO/overview.html>`_ on established
                          connection. If is set to ``True``` system keepalive
                          settings are used. You can also specify custom
                          keepalive setting with tuple:
                          ``(idle_time_sec, interval_sec, probes)``.
                          Defaults to ``False``.
    :param client_revision: can be used for client version downgrading.
                          Defaults to ``None``.
    :param disable_reconnect: disable automatic reconnect in case of
                              failed ``ping``, helpful when every reconnect
                              need to be caught in calling code.
                              Defaults to ``False``.
    NFTc                    s  |rt j}nt j}t||p|fg| _|r/|dD ]}td| }| j|j|j	p+|f q|| _
|| _|| _t jd | | _|| _|| _|	| _|| _|| _t|pTt jt j| _|| _|| _|| _td url|pkt }i }|d urv||d< |d ur~||d< |d ur||d< |d ur||d< |d ur||d< |d ur||d	< || _| jr|nd
| _|| _|du rd}|d
u rtj | _!d | _"d | _#ntj$| _!t%|| _"|
| _#d | _&d | _'d | _(d
| _)d | _*d | _+t, | _-d | _.d | _/d | _0t12 | _3d
| _4t5t6| 7  d S )N,zclickhouse:// ssl_versionca_certscipherskeyfilekeypasscertfileFTlz4)8r   DEFAULT_SECURE_PORTDEFAULT_PORTr   hostssplitr   appendhostnameportdatabaseuserpassword	DBMS_NAMEclient_nameconnect_timeoutsend_receive_timeoutsync_request_timeoutsettings_is_importanttcp_keepaliveminCLIENT_REVISIONclient_revisiondisable_reconnectsecure_socketverify_certcertifiwheressl_optionscheck_hostnameserver_hostnamer   DISABLEDcompressioncompressor_clscompress_block_sizeENABLEDr   socketfinfout	connectedclient_trace_contextserver_infor   contextblock_in	block_outblock_in_rawr   Lock_lockis_query_executingr+   rN   r,   )r.   hostr^   r_   r`   ra   rc   rd   re   rf   rw   ru   secureverifyrQ   rR   rS   rT   rU   rV   rr   rs   	alt_hostsrg   rh   rk   rl   default_porturlrq   r/   r1   r2   r,      s~   

zConnection.__init__c                 C   s<   | j rd| jr	dnd| j| j| j| jf nd}d|| jf S )Nz%s://%s:***@%s:%s/%sclickhouses
clickhousez(not connected)z$<Connection(dsn=%s, compression=%s)>)r|   rm   r`   r   r^   r_   ru   )r.   dsnr1   r1   r2   rM     s   zConnection.__repr__c                 C   s   d | j| jS )Nz{}:{})rE   r   r^   r-   r1   r1   r2   get_description  rC   zConnection.get_descriptionc                 C   sL   |    | js|   d S |  s$| jrtdtd |   d S d S )Nz-Connection was closed, reconnect is disabled.z$Connection was closed, reconnecting.)	check_query_executionr|   connectpingrl   r	   NetworkErrorloggerwarningr-   r1   r1   r2   force_connect  s   
zConnection.force_connectc                 C   s   i }| j r| jrtj}ntj}| j }||d< d}t||dtj	D ]Q}|\}}}	}
}d}z)t|||	}|
| j | j rN| |}|j|| jpK|d}|| |W   S  tjyt } z|}|durj|  W Y d}~q#d}~ww |dur{|td)zp
        Acts like socket.create_connection, but wraps socket with SSL
        if connection is secure.
        	cert_reqsNr   )rs   z!getaddrinfo returns an empty list)rm   rn   sslCERT_REQUIRED	CERT_NONErq   copyry   getaddrinfoSOCK_STREAM
settimeoutrd   _create_ssl_contextwrap_socketrs   r   errorclose)r.   r   r^   rq   r   errresafsocktypeproto	canonnamesasockssl_context_r1   r1   r2   _create_socket!  s<   





zConnection._create_socketc                 C   s   t jj}|dt j}t |}| j|_d|v r ||d  n|dt jkr-|	| d|v r8|
|d  d|v rA|d |_d|v rY|d}|d}|j|d ||d |S )	NrQ   rR   r   rS   rV   rT   rU   )rT   ra   )r   PurposeSERVER_AUTHgetPROTOCOL_TLS_CLIENT
SSLContextrr   load_verify_locationsr   load_default_certsset_ciphersverify_modeload_cert_chain)r.   rq   purposerD   r   rT   rU   r1   r1   r2   r   J  s(   




zConnection._create_ssl_contextc                 C   s   |  ||| _d| _||| _| _| j| j | jtjtj	d | j
r)|   t| jtj| _t| jtj| _|   |   | jj}|tjkrN|   |  | _t| j| j| _|  | _d S )NTr   )r   ry   r|   r   r^   r   re   
setsockoptIPPROTO_TCPTCP_NODELAYrh   _set_keepaliver   r   BUFFER_SIZErz   r   r{   
send_helloreceive_hellor~   r@   'DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUMsend_addendumget_block_in_streamr   r   r   r   get_block_out_streamr   )r.   r   r^   r<   r1   r1   r2   _init_connectione  s"   

zConnection._init_connectionc                 C   s   | j t jt jd t| jtsd S | j\}}}tdks tdkr@| j t jt j	| | j t jt j
| | j t jt j| d S tdkrQd}| j t j|| d S d S )Nr   linuxwin32darwin   )ry   r   
SOL_SOCKETSO_KEEPALIVE
isinstancerh   tupler   r   TCP_KEEPIDLETCP_KEEPINTVLTCP_KEEPCNT)r.   idle_time_secinterval_secprobesTCP_KEEPALIVEr1   r1   r2   r   ~  s(   


zConnection._set_keepalivec                 C   s$   |j r|j d nd}|d|| S )NrP    z({}:{}))strerrorrE   )r.   er   r^   r   r1   r1   r2   _format_connection_error  s   z#Connection._format_connection_errorc                 C   s2  | j r|   td| j| j d }tt| jD ]w}| jd \}}td|| z	| 	||W   S  t
jy^ } z|   tjd||dd | |||}t|}W Y d }~n0d }~w t
jy } z|   tjd||dd | |||}t|}W Y d }~nd }~ww | jd q|d ur|d S )Nz"Connecting. Database: %s. User: %sr   zConnecting to %s:%szFailed to connect to %s:%sT)exc_info)r|   
disconnectr   debugr_   r`   rangelenrZ   r   ry   timeoutr   r   r	   SocketTimeoutErrorr   r   rotate)r.   r   ir   r^   r   err_strr1   r1   r2   r     s@   
zConnection.connectc                 C   sL   d | _ d | _d | _d | _d | _d| _d | _d | _d | _d | _	d | _
d| _d S )NF)r   r^   ry   rz   r{   r|   r}   r~   r   r   r   r   r-   r1   r1   r2   reset_state  s   
zConnection.reset_statec              
   C   st   | j r,z	| jtj W n tjy% } ztd| W Y d}~nd}~ww | j  n| jr4| j  |   dS )zk
        Closes connection between server and client.
        Frees resources: e.g. closes socket.
        zError on socket shutdown: %sN)	r|   ry   shutdown	SHUT_RDWRr   r   r   r   r   )r.   r   r1   r1   r2   r     s   
zConnection.disconnectc                 C   s~   t tj| j t| j| j t tj| j t tj| j t | j	| j t| j
| j t| j| j t| j| j | j  d S r$   )r    r   HELLOr{   r"   rc   r   CLIENT_VERSION_MAJORCLIENT_VERSION_MINORrk   r_   r`   ra   flushr-   r1   r1   r2   r     s   zConnection.send_helloc              	   C   sD  t | j}|tjkrt| j}t | j}t | j}t | j}t| j|}d }|tjkr0t| j}d}|tj	kr<t| j}|}	|tj
krHt | j}	|tjkrct | j}
t|
D ]}t| j t| j qV|tjkrmt| j t||||	||||| _| j| j_td||||	| d S |tjkr|  | d|}|   t|)Nr   z5Connected to %s server version %s.%s.%s, revision: %szHello or Exception)r!   rz   r   r   r   ri   rk   r   &DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE*DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME$DBMS_MIN_REVISION_WITH_VERSION_PATCH8DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULESr   ,DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2r   r7   r~   r   r   r   	EXCEPTIONreceive_exceptionunexpected_packet_messager   r	   UnexpectedPacketFromServerError)r.   packet_typeserver_nameserver_version_majorserver_version_minorserver_revisionr@   server_timezoneserver_display_nameserver_version_patch
rules_size_imessager1   r1   r2   r     sf   














zConnection.receive_helloc                 C   s.   | j j}|tjkrt| jjd | j d S d S )N	quota_key)r~   r@   r   (DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEYr"   r   client_settingsr{   )r.   r<   r1   r1   r2   r   5  s   
zConnection.send_addendumc                 C   s  | j sd S | j}| |m z6ttj| j | j  t| j	}|t
jkr3|   t| j	}|t
jks%|t
jkrC| d|}t|W n- tjyM     t jtfyq } ztd|  | W Y d }~W d    dS d }~ww W d    dS 1 s}w   Y  dS )NPongzError on %s ping: %sFT)ry   rf   timeout_setterr    r   PINGr{   r   r!   rz   r   PROGRESSreceive_progressPONGr   r	   r   Errorr   EOFErrorr   r   r   )r.   r   r   msgr   r1   r1   r2   r   =  s>   









zConnection.pingc                 C   s  t  }t| j |_}|tjkr| jdd|_|S |tjkr%| 	 |_
|S |tjkr1|  |_|S |tjkr=|  |_|S |tjkrI|  |_|S |tjkrU|  |_|S |tjkrh| jdd|_t|j |S |tjkrsd| _	 |S |tjkr| ||_|S |tjkr|  |_|S |tjkr|  |_|S |tjkr| jdd|_|S |tjkrt| j}|rt d| || j!_"|S d#|| $ }| %  t&'|)NT)may_be_use_numpyF)may_be_compressedzServer timezone changed to %sz Unknown packet {} from server {})(r#   r!   rz   r%   r   DATAreceive_datar&   r   r   r'   r  r  r(   PROFILE_INFOreceive_profile_infor)   TOTALSEXTREMESLOGr   END_OF_STREAMr   TABLE_COLUMNSreceive_multistring_messager*   
PART_UUIDSREAD_TASK_REQUESTPROFILE_EVENTSTIMEZONE_UPDATEr   r   infor~   r>   rE   r   r   r	   UnknownPacketFromServerError)r.   packetr   r=   r   r1   r1   r2   receive_packet^  sj   
4

1

.

+

(

%

!








	
zConnection.receive_packetc                 C   s.   | j rddlm} || j| jS t| j| jS )Nr   )CompressedBlockInputStream)ru   streams.compressedr  rz   r   r   )r.   r  r1   r1   r2   r     s   zConnection.get_block_in_streamc                 C   s6   | j rddlm} || j| j| j| jS t| j| jS )Nr   )CompressedBlockOutputStream)ru   r  r   rv   rw   r{   r   r   )r.   r   r1   r1   r2   r     s   zConnection.get_block_out_streamc                 C   sD   | j j}|tjkrt| j |r| jn| j}|sdnd }|j|dS )NF)	use_numpy)	r~   r@   r   'DBMS_MIN_REVISION_WITH_TEMPORARY_TABLESr   rz   r   r   read)r.   r  r
  r<   readerr!  r1   r1   r2   r    s   

zConnection.receive_datac                 C   s
   t | jS r$   )r   rz   r-   r1   r1   r2   r     s   
zConnection.receive_exceptionc                 C   s   t  }|| j| j |S r$   )r   r#  r~   rz   )r.   r(   r1   r1   r2   r    s   zConnection.receive_progressc                 C   s   t  }|| j |S r$   )r   r#  rz   )r.   r)   r1   r1   r2   r    s   zConnection.receive_profile_infoc                    s    t |} fddt|D S )Nc                    s   g | ]}t  jqS r1   )r   rz   )rF   r   r-   r1   r2   
<listcomp>      z:Connection.receive_multistring_message.<locals>.<listcomp>)r   strings_in_messager   )r.   r   numr1   r-   r2   r    s   
z&Connection.receive_multistring_messager   c                 C   sV   t  }ttj| j | jj}|tjkrt	|| j | j
| td|t  |  d S )NzBlock "%s" send time: %f)r   r    r   r  r{   r~   r@   r   r"  r"   r   writer   r   )r.   r&   
table_namestartr<   r1   r1   r2   	send_data  s   
zConnection.send_datac           	      C   s6  | j s|   ttj| j t|pd| j | jj}|t	j
kr5t| j| j| jd}tjj|_||| j |t	jk}d}| jrD|tjO }t| jj| j|| |t	jkrYtd| j ttj| j t| j| j t|| j |t	jkr| jjd rt|p|i | jdd}ni }t|| jdtj  t!"d| | j#  d S )Nr   )rk   r   server_side_paramsT)
for_serverz	Query: %s)$r|   r   r    r   QUERYr{   r"   r~   r@   r   "DBMS_MIN_REVISION_WITH_CLIENT_INFOr   rc   r   rk   	QueryKindINITIAL_QUERY
query_kindr)  5DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGSrg   r   	IMPORTANTr   settings)DBMS_MIN_REVISION_WITH_INTERSERVER_SECRETr   COMPLETEru   )DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERSr   r   CUSTOMr   r   r   )	r.   queryquery_idrL   r<   client_infosettings_as_stringssettings_flagsescapedr1   r1   r2   
send_query  sF   





zConnection.send_queryc                 C   s   t tj| j | j  d S r$   )r    r   CANCELr{   r   r-   r1   r1   r2   send_cancel  s   zConnection.send_cancelc                    s   |pg D ]G}|d st d|d |d  t}| jjd r9ddlm} dd	 |d D } fd
d	|D  |}||d  |d}| j||d d q| t  d S )N	structurezEmpty table "{}" structurer8   datar!  r   )NumpyColumnOrientedBlockc                 S   s   g | ]}|d  qS )r   r1   )rF   xr1   r1   r2   r%    s    z3Connection.send_external_tables.<locals>.<listcomp>c                    s   g | ]} | j qS r1   )values)rF   columnrE  r1   r2   r%    r&  )types_check)r*  )
ValueErrorrE   r
   r   r   numpy.blockrF  r,  )r.   tablesrK  table	block_clsrF  columnsr&   r1   rJ  r2   send_external_tables  s"   
zConnection.send_external_tablesc                 c   s.    | j  }| j | d V  | j | d S r$   )ry   
gettimeoutr   )r.   new_timeoutold_timeoutr1   r1   r2   r  !  s
   
zConnection.timeout_setterc                 C   s   t |}d|  ||S )Nz6Unexpected packet from server {} (expected {}, got {}))r   to_strrE   r   )r.   expectedr   r1   r1   r2   r   *  s   
z$Connection.unexpected_packet_messagec                 C   s0   | j jdd | jrt d| _| j   d S )NF)blockingT)r   acquirer   r	   PartiallyConsumedQueryErrorreleaser-   r1   r1   r2   r   2  s
   z Connection.check_query_execution)TF)r   )NN)F).r3   r4   r5   __doc__r   DEFAULT_DATABASEDEFAULT_USERDEFAULT_PASSWORDCLIENT_NAME DBMS_DEFAULT_CONNECT_TIMEOUT_SECDBMS_DEFAULT_TIMEOUT_SEC%DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SECDEFAULT_COMPRESS_BLOCK_SIZEr,   rM   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r   r   r  r   r  r  r  r,  rA  rC  rR  r   r  r   r   r6   r1   r1   r/   r2   rN   Z   sh    Di)%;!<



0

rN   )Dloggingry   r   collectionsr   
contextlibr   sysr   r   urllib.parser   ro   ImportErrorr   r   r	   r&   r
   blockstreamprofileinfor   bufferedreaderr   bufferedwriterr   
clientinfor   ru   r   r   r   logr   r(   r   protocolr   r   r   queryprocessingstager   r$  r   r   readhelpersr   settings.writerr   r   streams.nativer   r   util.compatr   util.escaper   varintr    r!   writerr"   	getLoggerr3   r   objectr#   r7   rN   r1   r1   r1   r2   <module>   sL    
&