o
    RDi,                     @  s*  d dl mZ d dlZd dlmZ d dlmZmZmZm	Z	 d dl
mZ d dlmZ d dlmZmZmZmZmZmZmZmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlm Z m!Z!m"Z"m#Z# ddl$m%Z% ddl&m'Z' erd dlm(Z( d dlm)Z) d dl*m+Z+ ee,Z-d'd#d$Z.G d%d& d&ee/ Z0dS )(    )annotationsN)deque)ALL_COMPLETEDFutureProcessPoolExecutorwaitThreadPoolExecutor)	getLogger)TYPE_CHECKINGAnyCallableDequeIterableIteratorLiteraloverload   )IterUnit)NotSupportedError)pandas)pyarrow)ArrowResultBatchDownloadMetricsJSONResultBatchResultBatch)TelemetryField)get_time_millis)	DataFrame)Table)SnowflakeCursorfirst_batch_iterIterator[tuple]unconsumed_batchesDeque[Future[Iterator[tuple]]]unfetched_batchesDeque[ResultBatch]finalCallable[[], None]prefetch_thread_numintuse_mpboolkwr   returnJIterator[dict | Exception] | Iterator[tuple | Exception] | Iterator[Table]c                 +  s     dd}|r!dfdd}dd	d
}	d fdd}
d d< nd fdd}ddd
}	d!dd}
|r| `}td | E dH  |rctd|d j  |j|	| fi  }|| |sDt|td\}}d}|rtd|  |
| 	 E dH  td|  |d7 }|soW d   n1 sw   Y  |  dS | }td t
tt|D ]}td|d j  ||j|	| fi   q| E dH  d}|r+td|  |rtd|d j  |j|	| fi  }|| | }|
|	 }td|  |E dH  td|  |d7 }|sW d   n	1 s6w   Y  |  dS )"a  Creates an iterator over some other iterators.

    Very similar to itertools.chain but we need some keywords to be propagated to
    ``_download`` functions later.

    We need this to have ResultChunks fall out of usage so that they can be garbage
    collected.

    Just like ``ResultBatch`` iterator, this might yield an ``Exception`` to allow users
    to continue iterating through the rest of the ``ResultBatch``.
    is_fetch_allFr.   r   c                        t  S N)r    r)   r3   W/var/www/Datamplify/venv/lib/python3.10/site-packages/snowflake/connector/result_set.pycreate_pool_executorA      z1result_set_iterator.<locals>.create_pool_executorbatchr   c                 S     | j S r2   )populate_datar8   r3   r3   r5   create_fetch_taskD      z.result_set_iterator.<locals>.create_fetch_taskfuture_resultc                   s   | j di  S )Nr3   create_iterr>   )r-   r3   r5   get_fetch_resultG   s   z-result_set_iterator.<locals>.get_fetch_resultN
connectionr	   c                     r1   r2   r   r3   r4   r3   r5   r6   M   r7   c                 S  r9   r2   r?   r;   r3   r3   r5   r<   P   r=   r   c                 S  s   | S r2   r3   rA   r3   r3   r5   rB   S      z,beginning to schedule result batch downloadsz%queuing download of result batch id: r   )return_whenr   z"user began consuming result batch z(user requesting to consume result batch z%user finished consuming result batch )r.   r   )r8   r   )r>   r   )r.   r	   )r>   r   )poploggerdebugidsubmitpopleftappendr   r   resultrangeminlen)r!   r#   r%   r'   r)   r+   r-   r0   r6   r<   rB   poolfuture_ibatch_iteratorr3   )r-   r)   r5   result_set_iterator)   s   














'rV   c                   @  s   e Zd ZdZd3ddZd4ddZd4ddZd4ddZd5ddZe	d6ddZ
e	d7ddZ
d8d9d dZ
d:d"d#Zd;d%d&Zd<d(d)Zd=d+d,Zd>d.d/Zd?d0d1Zd2S )@	ResultSeta  This class retrieves the results of a query with the historical strategy.

    It pre-downloads the first up to 4 ResultChunks (this doesn't include the 1st chunk
    as that is embedded in the response JSON from Snowflake) upon creating an Iterator
    on it.

    It also reports telemetry data about its ``ResultBatch``es once it's done iterating
    through them.

    Currently we do not support mixing multiple ``ResultBatch`` types and having
    different column definitions types per ``ResultBatch``.
    cursorr    result_chunks.list[JSONResultBatch] | list[ArrowResultBatch]r)   r*   r+   r,   r.   Nonec                 C  s   || _ || _|| _|| _d S r2   )batches_cursorr)   _use_mp)selfrX   rY   r)   r+   r3   r3   r5   __init__   s   
zResultSet.__init__c                 C  s   | j jdurt | j j }| j tj| |  }tjj	|v r,| j tj
|tjj	 tjj	|v rA| j tj|tjj	 dS dS )zReport all metrics totalled up.

        This includes TIME_CONSUME_LAST_RESULT, TIME_DOWNLOADING_CHUNKS and
        TIME_PARSING_CHUNKS in that order.
        N)r]   _first_chunk_timer   _log_telemetry_job_datar   TIME_CONSUME_LAST_RESULT_get_metricsr   downloadvalueTIME_DOWNLOADING_CHUNKSgetparseTIME_PARSING_CHUNKS)r_   time_consume_last_resultmetricsr3   r3   r5   _report_metrics   s$   zResultSet._report_metricsc                 C  s   |    dS )z;Used for any cleanup after the result set iterator is done.N)rm   r_   r3   r3   r5   _finish_iterating   s   zResultSet._finish_iteratingc                 C  s*   t | jd }|tkrtd| dd S )Nr   z Trying to use arrow fetching on z which is not ArrowResultChunk)typer\   r   r   )r_   	head_typer3   r3   r5   _can_create_arrow_iter   s   
z ResultSet._can_create_arrow_iterIterator[Table]c                 C  s   |    | jtjddS )zGFetches all the results as Arrow Tables, chunked by Snowflake back-end.arrow	iter_unit	structurerr   _create_iterr   
TABLE_UNITrn   r3   r3   r5   _fetch_arrow_batches   s   zResultSet._fetch_arrow_batchesforce_return_tableLiteral[False]Table | Nonec                 C     d S r2   r3   r_   r|   r3   r3   r5   _fetch_arrow_all   rD   zResultSet._fetch_arrow_allLiteral[True]r   c                 C  r   r2   r3   r   r3   r3   r5   r      rD   Fc                 C  s0   t |  }|rt|S |r| jd  S dS )z=Fetches a single Arrow Table from all of the ``ResultBatch``.r   N)listr{   paconcat_tablesr\   to_arrow)r_   r|   tablesr3   r3   r5   r      s   
Iterator[DataFrame]c                 K  s    |    | jdtjdd|S )zFetches Pandas dataframes in batches, where batch refers to Snowflake Chunk.

        Thus, the batch size (the number of rows in dataframe) is determined by
        Snowflake's back-end.
        r   ru   Nr3   rx   )r_   kwargsr3   r3   r5   _fetch_pandas_batches   s   zResultSet._fetch_pandas_batchesr   c                   sr   t ttjj  fddtD }t | jdddi}|r.tj|fddi|S | jd j	di S )	z"Fetches a single Pandas dataframe.c                   s    i | ]}| v r| |qS r3   )rF   ).0kconcat_argsr   r3   r5   
<dictcomp>   s     z/ResultSet._fetch_pandas_all.<locals>.<dictcomp>r0   Tignore_indexr   Nr3   )
r   inspect	signaturer   concat
parametersdictr   r\   	to_pandas)r_   r   concat_kwargs
dataframesr3   r   r5   _fetch_pandas_all   s   zResultSet._fetch_pandas_alldict[str, int]c                 C  s<   i }| j D ]}|j D ]\}}||d| ||< qq|S )z6Sum up all the chunks' metrics and show them together.r   )r\   _metricsitemsrh   )r_   overall_metricscnvr3   r3   r5   rd     s   
zResultSet._get_metricsr"   c                 C  s   |   S )z?Returns a new iterator through all batches with default values.)ry   rn   r3   r3   r5   __iter__  s   zResultSet.__iter__`Iterator[dict | Exception] | Iterator[tuple | Exception] | Iterator[Table] | Iterator[DataFrame]c                 K  s   | dd}| jj|d< | jd jd
i |}t }t| jdd }t|D ]\}}td|d  d|j	  q't
|||| j| jf|| jd	|S )zSet up a new iterator through all batches with first 5 chunks downloaded.

        This function is a helper function to ``__iter__`` and it was introduced for the
        cases where we need to propagate some values to later ``_download`` calls.
        r0   FrC   r   r   Nzresult batch z	 has id: )r0   r+   r3   )rF   r]   rC   r\   r@   r   	enumeraterG   rH   rI   rV   ro   r)   r^   )r_   r   r0   r!   r#   r%   numr8   r3   r3   r5   ry     s&   zResultSet._create_iterc                 C  s   d}| j D ]}||j7 }q|S )z1Returns the total rowcount of the ``ResultSet`` .r   )r\   rowcount)r_   totalpr3   r3   r5   total_row_index:  s   
zResultSet.total_row_indexN)
rX   r    rY   rZ   r)   r*   r+   r,   r.   r[   )r.   r[   )r.   rs   )r|   r}   r.   r~   )r|   r   r.   r   )F)r|   r,   r.   r~   )r.   r   )r.   r   )r.   r   )r.   r"   )r.   r   )r.   r*   )__name__
__module____qualname____doc__r`   rm   ro   rr   r{   r   r   r   r   rd   r   ry   r   r3   r3   r3   r5   rW      s$    










)rW   )r!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   r   r.   r/   )1
__future__r   r   collectionsr   concurrent.futuresr   r   r   r   concurrent.futures.threadr	   loggingr
   typingr   r   r   r   r   r   r   r   	constantsr   errorsr   optionsr   r   r   result_batchr   r   r   r   	telemetryr   	time_utilr   r   r   snowflake.connector.cursorr    r   rG   rV   r   rW   r3   r3   r3   r5   <module>   s*    (
l