o
    ;Di O                     @   sb  d dl mZmZmZ d dlZd dlZd dlmZ d dlmZm	Z	m
Z
 d dlmZ d dlmZmZmZmZ d dlmZmZ d dlmZmZ d d	lmZmZmZmZmZ d2d
dZ		d3ddZd4ddZ G dd deZ!	d5ddZ"dd Z#dd Z$dd Z%dd Z&dd Z'd d! Z(d"d# Z)d$d% Z*d&d' Z+d(d) Z,d*d+ Z-d,d- Z.d.d/ Z/d0d1 Z0ee_e e_ dS )6    )absolute_importdivisionprint_functionN)OrderedDict)datetimedatetime)Decimal)izip_longest	text_typestring_typesPY3)read_source_from_argwrite_source_from_arg)skip	setheader)Tabledicts
fieldnamesiterpeekwrapc                 K   s   t | }td|||d|S )a
  Extract a table from the records of a avro file.

    The `source` argument (string or file-like or fastavro.reader) can either
    be  the path of the file, a file-like input stream or a instance from
    fastavro.reader.

    The `limit` and `skip` arguments can be used to limit the range of rows 
    to extract.

    The `sample` argument (int, optional) defines how many rows are inspected
    for discovering the field types and building a schema for the avro file 
    when the `schema` argument is not passed.

    The rows fields read from file can have scalar values like int, string,
    float, datetime, date and decimal but can also have compound types like 
    enum, :ref:`array <array_schema>`, map, union and record. 
    The fields types can also have recursive structures defined 
    in :ref:`complex schemas <complex_schema>`.

    Also types with :ref:`logical types <logical_schema>` types are read and 
    translated to coresponding python types: long timestamp-millis and 
    long timestamp-micros: datetime.datetime, int date: datetime.date, 
    bytes decimal and fixed decimal: Decimal, int time-millis and 
    long time-micros: datetime.time.

    Example usage for reading files::

        >>> # set up a Avro file to demonstrate with
        ...
        >>> schema1 = {
        ...     'doc': 'Some people records.',
        ...     'name': 'People',
        ...     'namespace': 'test',
        ...     'type': 'record',
        ...     'fields': [
        ...         {'name': 'name', 'type': 'string'},
        ...         {'name': 'friends', 'type': 'int'},
        ...         {'name': 'age', 'type': 'int'},
        ...     ]
        ... }
        ...
        >>> records1 = [
        ...     {'name': 'Bob', 'friends': 42, 'age': 33},
        ...     {'name': 'Jim', 'friends': 13, 'age': 69},
        ...     {'name': 'Joe', 'friends': 86, 'age': 17},
        ...     {'name': 'Ted', 'friends': 23, 'age': 51}
        ... ]
        ...
        >>> import fastavro
        >>> parsed_schema1 = fastavro.parse_schema(schema1)
        >>> with open('example.file1.avro', 'wb') as f1:
        ...     fastavro.writer(f1, parsed_schema1, records1)
        ...
        >>> # now demonstrate the use of fromavro()
        >>> import petl as etl
        >>> tbl1 = etl.fromavro('example.file1.avro')
        >>> tbl1
        +-------+---------+-----+
        | name  | friends | age |
        +=======+=========+=====+
        | 'Bob' |      42 |  33 |
        +-------+---------+-----+
        | 'Jim' |      13 |  69 |
        +-------+---------+-----+
        | 'Joe' |      86 |  17 |
        +-------+---------+-----+
        | 'Ted' |      23 |  51 |
        +-------+---------+-----+

    .. versionadded:: 1.4.0

    )sourcelimitskipsN )r   AvroView)r   r   r   	avro_argssource2r   r   E/var/www/Datamplify/venv/lib/python3.10/site-packages/petl/io/avro.pyfromavro   s   Jr   	   deflatec              	   K   s"   t | f|d||||d| dS )a  
    Write the table into a new avro file according to schema passed.

    This method assume that each column has values with the same type 
    for all rows of the source `table`.

    `Apache Avro`_ is a data
    serialization framework. It is used in data serialization (especially in
    Hadoop ecosystem), for dataexchange for databases (Redshift) and RPC 
    protocols (like in Kafka). It has libraries to support many languages and
    generally is faster and safer than text formats like Json, XML or CSV.

    The `target` argument is the file path for creating the avro file.
    Note that if a file already exists at the given location, it will be
    overwritten.

    The `schema` argument (dict) defines the rows field structure of the file.
    Check fastavro `documentation`_ and Avro schema `reference`_ for details.

    The `sample` argument (int, optional) defines how many rows are inspected
    for discovering the field types and building a schema for the avro file 
    when the `schema` argument is not passed.

    The `codec` argument (string, optional) sets the compression codec used to
    shrink data in the file. It can be 'null', 'deflate' (default), 'bzip2' or
    'snappy', 'zstandard', 'lz4', 'xz' (if installed)

    The `compression_level` argument (int, optional) sets the level of 
    compression to use with the specified codec (if the codec supports it)

    Additionally there are support for passing extra options in the 
    argument `**avro_args` that are fowarded directly to fastavro. Check the
    fastavro `documentation`_ for reference.

    The avro file format preserves type information, i.e., reading and writing
    is round-trippable for tables with non-string data values. However the
    conversion from Python value types to avro fields is not perfect. Use the
    `schema` argument to define proper type to the conversion.

    The following avro types are supported by the schema: null, boolean, 
    string, int, long, float, double, bytes, fixed, enum, 
    :ref:`array <array_schema>`, map, union, record, and recursive types 
    defined in :ref:`complex schemas <complex_schema>`.

    Also :ref:`logical types <logical_schema>` are supported and translated to 
    coresponding python types: long timestamp-millis, long timestamp-micros, int date, 
    bytes decimal, fixed decimal, string uuid, int time-millis, long time-micros.

    Example usage for writing files::

        >>> # set up a Avro file to demonstrate with
        >>> table2 = [['name', 'friends', 'age'],
        ...           ['Bob', 42, 33],
        ...           ['Jim', 13, 69],
        ...           ['Joe', 86, 17],
        ...           ['Ted', 23, 51]]
        ...
        >>> schema2 = {
        ...     'doc': 'Some people records.',
        ...     'name': 'People',
        ...     'namespace': 'test',
        ...     'type': 'record',
        ...     'fields': [
        ...         {'name': 'name', 'type': 'string'},
        ...         {'name': 'friends', 'type': 'int'},
        ...         {'name': 'age', 'type': 'int'},
        ...     ]
        ... }
        ...
        >>> # now demonstrate what writing with toavro()
        >>> import petl as etl
        >>> etl.toavro(table2, 'example.file2.avro', schema=schema2)
        ...
        >>> # this was what was saved above
        >>> tbl2 = etl.fromavro('example.file2.avro')
        >>> tbl2
        +-------+---------+-----+
        | name  | friends | age |
        +=======+=========+=====+
        | 'Bob' |      42 |  33 |
        +-------+---------+-----+
        | 'Jim' |      13 |  69 |
        +-------+---------+-----+
        | 'Joe' |      86 |  17 |
        +-------+---------+-----+
        | 'Ted' |      23 |  51 |
        +-------+---------+-----+

    .. versionadded:: 1.4.0

    .. _Apache Avro: https://avro.apache.org/docs/current/spec.html
    .. _reference: https://avro.apache.org/docs/current/spec.html#schemas
    .. _documentation : https://fastavro.readthedocs.io/en/latest/writer.html

    wb)targetmodeschemasamplecodeccompression_levelN_write_toavro)tabler#   r%   r&   r'   r(   r   r   r   r   toavroc   s   a
r,   c                 K   s   t | f|d||d| dS )aW  
    Append rows into a avro existing avro file or create a new one.

    The `target` argument can be either an existing avro file or the file 
    path for creating new one.

    The `schema` argument is checked against the schema of the existing file.
    So it must be the same schema as used by `toavro()` or the schema of the
    existing file.

    The `sample` argument (int, optional) defines how many rows are inspected
    for discovering the field types and building a schema for the avro file 
    when the `schema` argument is not passed.

    Additionally there are support for passing extra options in the 
    argument `**avro_args` that are fowarded directly to fastavro. Check the
    fastavro documentation for reference.

    See :meth:`petl.io.avro.toavro` method for more information and examples.

    .. versionadded:: 1.4.0

    za+b)r#   r$   r%   r&   Nr)   )r+   r#   r%   r&   r   r   r   r   
appendavro   s   
r-   c                   @   sH   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dS )r   z;Read rows from avro file with their types and logical typesc                 K   s"   || _ || _|| _|| _d | _d S N)r   r   r   r   avro_schema)selfr   r   r   r   r   r   r   __init__   s
   
zAvroView.__init__c                 C   s   | j S )z*gets the schema stored in avro file header)r/   )r0   r   r   r   get_avro_schema   s   zAvroView.get_avro_schemac                 c   sf    | j d"}| |}| |}|V  | ||D ]}|V  qW d    d S 1 s,w   Y  d S )Nrb)r   open_open_reader_decode_schema_read_rows_from)r0   source_fileavro_readerheaderrowr   r   r   __iter__   s   

"zAvroView.__iter__c                 C   s    ddl }|j|fi | j}|S )z@This could raise a error when the file is corrupt or is not avror   N)fastavroreaderr   )r0   r8   r=   r9   r   r   r   r5     s   zAvroView._open_readerc                 C   s6   |j | _| jdu rdS | jd }tdd |D }|S )z9extract the header from schema stored in avro file headerNNNfieldsc                 s   s    | ]}|d  V  qdS )nameNr   .0colr   r   r   	<genexpr>  s    z*AvroView._decode_schema.<locals>.<genexpr>)writer_schemar/   tuple)r0   r9   schema_fieldsr:   r   r   r   r6     s   

zAvroView._decode_schemac                 c   sf    d}| j d ur| j ntj}t|D ]\}}|| jk rq||kr# d S |d7 }| ||}|V  qd S )Nr      )r   sysmaxsize	enumerater   _map_row_from)r0   r9   r:   countmaximumirecordr;   r   r   r   r7     s   
zAvroView._read_rows_fromc                    s6   |du st rt  }|S t fdd|D }|S )aV  
        fastavro auto converts logical types defined in avro schema to 
        correspoding python types. E.g: 
        - avro type: long logicalType: timestamp-millis -> python datetime
        - avro type: int logicalType: date              -> python date
        - avro type: bytes logicalType: decimal         -> python Decimal
        Nc                 3   s    | ]}  |V  qd S r.   getrB   rQ   r   r   rE   0  s    z)AvroView._map_row_from.<locals>.<genexpr>)r   rG   values)r0   r:   rQ   rr   rT   r   rM   $  s
   zAvroView._map_row_fromN)__name__
__module____qualname____doc__r1   r2   r<   r5   r6   r7   rM   r   r   r   r   r      s    	r   c                 K   sX  | d u rd S |st | |\}}nt| |}trt|nt|}	t||d}
|
|x}ddlm} ddl	m
} ||}|d||||d|}d}|	D ]J}z|| |d }W qK tyw } zt|||||}tt| W Y d }~qKd }~w ty } zt|||||}tt| W Y d }~qKd }~ww |  W d    d S 1 sw   Y  d S )N)r$   r   )parse_schema)Writer)for%   r'   r(   rI   r   )_build_schema_from_values_fix_missing_headersr   r   _ordered_dict_iteratorr   r4   r=   r[   fastavro.writer\   write
ValueError_get_error_details_raise_error	TypeErrorflush)r+   r#   r$   r%   r&   r'   r(   r   table2rowstarget2target_filer[   r\   parsed_schemawriternumrQ   verrvmsgterrtmsgr   r   r   r*   4  sD   


"r*   c                 C   s>   t | |d \}}t|}t|d}t||}t|}||fS )NrI   )r   r   r    _build_schema_fields_from_values_build_schema_with)r+   r&   samplesrh   propspeekrH   schema_sourcer   r   r   r^   _  s   

r^   c                 C   s   dddd| d}|S )NrQ   outputavrozgenerated by petl)typerA   	namespacedocr@   r   )rH   r%   r   r   r   rt   i  s   rt   c                 C   sD   t  }d}t  }| D ]}t||||| d}q
dd | D }|S )NTFc                 S      g | ]}|qS r   r   rC   itemr   r   r   
<listcomp>      z4_build_schema_fields_from_values.<locals>.<listcomp>)r   _update_field_defs_fromrU   )rw   rv   previousfill_missingr@   r;   rH   r   r   r   rs   t  s   rs   c                 C   s   t | |D ]a\}}|d u r d S ||d }||d }d }	t|tr1t|||||\}
}}	nt|||\}
}|
d urG|d|
gd||< n|rR|ddgd||< |d ur\|||d < |	d urf|	||d < qd S )N_prec_propnull)rA   r{   string)r
   rS   
isinstancedict_get_definition_from_record_get_definition_from_type_of)rv   r;   r@   r   r   propvaldprevfprevfcurrtdefdcurrr   r   r   r     s&   
r   c                 C   s   d }d }t |trddd}||fS t |tr ddd}||fS t |tr.ddd}||fS t |trGt|||\}}}dd||d	}||fS t |trRd}||fS t |trct| ||\}}||fS t |t	rnd
}||fS t |t
ryd}||fS t |trd}||fS |d urd}||fS dS )Nlongztimestamp-millis)r{   logicalTypeintztime-millisr   bytesdecimal)r{   r   	precisionscalebooleandoubler   r?   )r   r   r   r   r	   _get_precision_from_decimalr   list_get_definition_from_arrayboolfloatr   )r   r   prevr   currr   r   r   r   r   r     sH   









	

r   c           	      C   sf   d }t |D ]}|d u rqt| ||\}}|d ur|}|d ur!|}q|d u r(dn|}d|d}||fS )Nr   array)r{   items)iterr   )	r   r   r   afieldr   field2curr2bfieldr   r   r   r   r     s   
r   c           	      C   sr   |d u rt  }|d u rt  }t| }t| }t||||| dd | D }d| d d|d}|||fS )Nc                 S   r~   r   r   r   r   r   r   r     r   z/_get_definition_from_record.<locals>.<listcomp>rQ   _recordrz   )r{   rA   r|   r@   )r   r   keysrU   r   )	r   r   r   r   r   rv   r;   rH   r   r   r   r   r     s   
r   c                 C   st   |d u r	d }}nt |\}}}}|d ur+|d|d}}t||t||}}t|d}||d} | ||fS )Nr   r   r      )r   r   )precision_and_scalerS   max)r   r   r   precr   _prec0scale0r   r   r   r     s   



r   c                 C   s   |   \}}}d}|D ]}|d | }qd}d| | }t|}| d }|d d }	|r1| }tttt|}
t|}|
||	|fS )Nr   
   rI   r   )as_tupler   
bit_lengthmathceillog10abs)numeric_valuesigndigitsexpnumberdigitdeltainumberbits_req	bytes_reqr   r   r   r   r   r     s   r   c                 C   sT   |du sd|vr
| S t | d\}}t|}t|}t|t|kr#|S t||}|S )z'add missing columns headers from schemaNr@      )r   r   _get_schema_header_nameslenr   )r+   r%   r&   rh   colsheaderstable3r   r   r   r_     s   
r_   c                 C   sh   t |}t|tr|t| g}n||g}t| }t| tr%d|  nd}d}	|	|||||f }
|
S )z6show last row when failed writing for throubleshootingz output: %s z/failed writing on row #%d: %s
%s
 schema: %s
%s)r   r   r   r   rU   r   lookr   )r#   rn   errrQ   r%   r   r+   exampledestprinteddetailsr   r   r   rd     s   
rd   c                 C   s(   |  d}|d u rg S dd |D }|S )Nr@   c                 S   s   g | ]}| d qS )rA   rR   )rC   fieldr   r   r   r     s    z,_get_schema_header_names.<locals>.<listcomp>rR   )r%   r@   r:   r   r   r   r     s
   
r   c                 C   sD   t  }|d }ztr| ||td W d}d}dS d}d}w )z8Works like raise Excetion(msg) from prev_exp in python3.r   z%raise ErrorType, new_message, tracebkN)rJ   exc_infor   with_tracebackexec)	ErrorTypenew_messageexinftracebkr   r   r   re   #  s   
re   c           	   
   c   s    t | }t|}dd |D }|D ],}t }t|D ]\}}z|| }W n ty0   d }Y nw |||f qt|V  qd S )Nc                 S   s   g | ]}t |qS r   )r   )rC   fr   r   r   r   4  s    z*_ordered_dict_iterator.<locals>.<listcomp>)r   nextr   rL   
IndexErrorappendr   )	r+   ithdrfldsr;   r   rP   r   vr   r   r   r`   1  s   r`   )Nr   )Nr    r!   N)Nr    )r!   N)1
__future__r   r   r   rJ   r   collectionsr   r   r   r   r   r	   petl.compatr
   r   r   r   petl.io.sourcesr   r   petl.transform.headersr   r   petl.util.baser   r   r   r   r   r   r,   r-   r   r*   r^   rt   rs   r   r   r   r   r   r   r_   rd   r   re   r`   r   r   r   r   <module>   sD   
Q

k$C
+

