o
    CihN                     @   s  d dl mZ d dlmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZmZmZmZmZmZmZ d dlmZmZmZmZ d dlmZ d dlmZ d dlmZ d dlmZ d dl mZ! d d	l"m#Z# d d
l$m%Z% d dl&m'Z' d dl(m(Z( d dl)m*Z* d dl+m,Z, d dl-Z-d dl.Z.d dl/Z/d dl0Z0d dl1Z1d dl2Z2d dl(m3Z3 d dl4Z5d dl6m7Z7 d dl8m9Z9m:Z: d dl;m<Z< d dl=m>Z> G dd deZG dd deZ?G dd deZ@G dd deZAG dd deZBdS )    )APIView)Responsestatus)csrf_exempt)transaction)generate_user_unique_codefile_save_1s3UUIDEncoderCustomPaginator
SSHConnectdecode_value)Create_FLowUpdate_FlowBoardServer_fileList_server_file)models)token_function)swagger_auto_schema)airflow_token)datetime)settings)utcN)timezone)Path)require_permissionCustomIsAuthenticated)method_decorator)OAuth2Authenticationc                   @   sf   e Zd ZegZegZeZe	ede
edeejdd ZeZe	ede
eddd ZdS )		FlowBoard)request_bodyzflowboard.createc                 C   sx  |j j}| j|jd}|jddr|jd }|jd }|jd }tjjj	|d}t
jjj||d r<td	d
itjdS t|d}||d< ||d< |j|d< ||d< tjtjdt|}	tj|	dd tj|	| d}
t|
d}tj||dtd W d   n1 sw   Y  | d}t |d|dd}
t
jjj!|||
d |d}td|jdtj"dS td	ditj#dS )zG
        To Create a FlowBoard and save and Create a DAG on it
        dataTraise_exception	flow_name	flow_plandrawflowid)Flow_name__exactuser_idmessageFlow name already Existsr   FLOWdag_idusernamer,   r    )exist_ok.jsonw   )indentclsNutf-8 file_url)Flow_id	Flow_nameDrawFlowr,   zSaved SucessFullyr-   Flow_Board_idSerializer Error)$userr*   serializer_classr#   is_validvalidated_dataauth_modelsUserProfileobjectsget
flow_modelr    filterexistsr   r   HTTP_406_NOT_ACCEPTABLEr   r1   ospathjoinr   
config_dirstrmakedirsopenjsondumpr   readdecoder	   createHTTP_200_OKHTTP_400_BAD_REQUEST)selfrequestr,   
serializerr&   flowr(   rA   r;   configs_dir	file_pathf	file_datar*    rc   &/var/www/Datamplify/FlowBoard/views.pypost#   s<   




zFlowBoard.postzflowboard.editc                 C   sv  |j }|j}|g}t|dr|jr||jj | j|jd}|jddr2|jd }|jd }|jd }|jd }	t	j
jj|d	}tjjj||d
j|d	 rZtdditjdS tjjj||d rotjjj||d}
n	tdditjdS |
j|d< ||d< t|
jdr|
jjn|
j}ddlm} t|
dd }|rt|dr||j|d< nd|d< ||d< tj dt| }tj !||
j d}tj !||
j d}|}t"|d}t#j$||dt%dd W d    n1 sw   Y  t&|| |
j'(dd }|	) *d}t+|d|
jdd | }tjjj|d	j,||d! t-.t/d"}td#t|d$tj0dS tdd%itj1dS )&N
created_byr"   Tr$   r*   r&   r'   r(   r)   )r+   user_id__inr-   r.   r   r*   rg   Data Flow Not Createdr0   r   )	force_strr,   r1   r9   /FlowBoard/r3   r4   r5   F)r6   r7   ensure_ascii
FlowBoard/   r8   r    Datamplify/FlowBoard/r:   )r<   r=   
updated_atzupdated SucessFullyr>   r@   )2rA   r*   hasattrrf   appendserializer_class1r#   rC   rD   rE   rF   rG   rH   rI   r    rJ   excluderK   r   r   rL   HTTP_404_NOT_FOUNDr;   r,   django.utils.encodingrj   getattrr1   r   rP   rQ   rM   rN   rO   rS   rT   rU   r   renamer=   splitrV   rW   r	   updater   nowr   rY   rZ   )r[   r\   rA   r,   accessible_user_idsr]   r*   r&   r^   r(   	Flow_dataflow_owner_idrj   user_objr_   r`   new_file_pathr#   ra   datasrc_keyrb   updated_datarc   rc   rd   putL   sX   




zFlowBoard.putN)__name__
__module____qualname__r   authentication_classesr   permission_classesr   rB   r   r   r   r   r   atomicre   r   rs   r   rc   rc   rc   rd   r       s    
$
r    c                   @   sL   e Zd ZegZegZee	 e
eddd Ze
eddd ZdS )FlowOperationflowboard.viewc                 C   s  |j }|j}|g}t|dr|jr||jj tjjj||d	 rtjjj
||d}|j}t|jdr9|jjn|j}tj dt| }	tj|	| d}
t|
d}t|}W d    n1 sew   Y  t
|j}| }td|||||jdtjd	S td
ditjd	S )Nrf   rh   r*   rk   r3   rsuccess)r-   r*   r;   r'   r(   r&   r   r-   zFlow Plan Not Found)rA   r*   rq   rf   rr   rI   r    rG   rJ   rK   rH   r;   r,   r   rP   rQ   rM   rN   rO   rS   rT   loadrequestsr=   r   r<   r   rY   ru   )r[   r\   r*   rA   r,   r|   r#   r0   r~   r_   r`   ra   dag_jsontransformationtransformations_flowrc   rc   rd   rH      s6   
zFlowOperation.getzflowboard.deletec                 C   sv  |j }|j}|g}t|dr|jr||jj tjjj||d	 rtjjj
||d}t }tj d|j }dd| i}	tj||	d}
t|jdrP|jjn|j}tj dt| }tj||j d	}tj	|rst| |jd
d }tjtjdt| d tjjj||d  tjjj|jd  tj jj|jd  t!ddit"j#dS t!ddit"j$dS )Nrf   rh   z/api/v2/dags/AuthorizationzBearer )headersr*   rk   r3   rm   rn   ro   )BucketKey)r*   r,   )	source_idr-   zDeleted Successfullyr   ri   )%rA   r*   rq   rf   rr   rI   r    rG   rJ   rK   rH   r   r   airflow_hostr;   r   deleter,   rP   rQ   rM   rN   rO   remover=   ry   r
   delete_objectAWS_STORAGE_BUCKET_NAME
mon_models
RunHistoryscheduler_modelsScheduler   r   rY   ru   )r[   r\   r*   rA   r,   r|   	flow_data	air_tokenurlr   responser~   r_   r`   r   rc   rc   rd   r      s0   

zFlowOperation.deleteN)r   r   r   r   r   r   r   r   r   r   r   r   rH   r   rc   rc   rc   rd   r      s    

r   c                   @   s0   e Zd ZegZegZeee	ddd Z
dS )	Flow_Listr   c              	   C   s*  ddl m} |j}|j}t }|j|jd}|j|j|j	}|jdd}|g}	|j
r3|	|j
 zt|}tt||j}W n ttfyT   tddidd	 Y S w tjjj|	d
d}
|rh|
j|d}
|
 }||| }|d | }|
dddddd|||  }t|||||dtjd	S )Nr   )ceilrn   searchr9   errorzInvalid pagination parametersi  r   )rg   z-updated_at)Flow_name__icontainsr*   r<   r;   
created_atrp   r,   )r#   total_pagestotal_recordspage_number	page_size)mathr   rA   r*   r   query_paramsrH   page_query_parampage_size_query_paramr   created_by_idrr   intminmax_page_size
ValueError	TypeErrorr   rI   r    rG   rJ   order_bycountvaluesr   rY   )r[   r\   r   rA   r,   	paginatorr   r   r   r|   r   r   r   offsetr#   rc   rc   rd   rH      sL   
zFlow_List.getN)r   r   r   r   r   r   r   r   r   r   rH   rc   rc   rc   rd   r      s    
r   c                   @   <   e Zd ZegZegZeZe	e
dee dd ZdS )ListoutServerfilesconnection.viewc              
   C   s(  | j |jd}|jddr|jd }|j}|j}|g}t|dr+|jr+||jj |jd }|jd }|s@t	dd	it
jd
S ztjjj||d}	tjjj||	jd}
W n   t	ddit
jd
 Y S t|
jj |
j|
jt|
j|
j}|d dkrt|d  t	ddit
jd
S |d }|pd}g }zPz|| W n ty   t	ddit
jd
 Y W S w ||}|D ]}|j }|j!}t"#|rq| $d|  r|| q|%  t	d|it
j&d
W S  t'y
 } z|%  t	ddt(| it
j)d
W  Y d }~S d }~ww t	ddit
jd
S )Nr"   Tr$   rN   rf   typeconn_idr-   connection is requiredr   rh   rg   r*   #Connection not found for this user.r      Failed to connect to serversftp_client/zPath does not exist..fileszError reading directory: r@   )*rB   r#   rC   rD   rA   r*   rq   rf   rr   r   r   rZ   conn_modelsConnectionsrG   rH   Remote_file_connectionstable_idru   r   server_typenamelowerhostnamer1   r   passwordportprintlistdirIOErrorlistdir_attrst_modefilenamestatS_ISDIRendswithcloserY   	ExceptionrQ   HTTP_500_INTERNAL_SERVER_ERROR)r[   r\   r]   rN   rA   r,   r|   r   r   
datasource
connectionssh_connectionsftpremote_pathr   entriesentrymoder   erc   rc   rd   re   (  sd   



$


(zListoutServerfiles.postN)r   r   r   r   r   r   r   r   rB   r   r   r   r   r   re   rc   rc   rc   rd   r   #      
r   c                   @   r   )Server_file_schemar   c              	   C   s  | j |jd}|jddr|jd }|jd }|j}|j}|g}t|dr/|jr/||jj |s:t	ddit
jd	S ztjjj||d
}tjjj||jd}	W n   t	ddit
jd	 Y S t|	jj |	j|	jt|	j|	j}
|
d dkrt	ddit
jd	S |
d }g }||dI}tj|dd}tj dd}|!d| |"d# }t$|}|j%}dd |& D }|||d t	d||d|	j'|dt
j(d	W  d    S 1 sw   Y  d S t	ddit
jd	S )Nr"   Tr$   rN   r   rf   r-   r   r   rh   r   r   r   r   r   r   r   
   )nrowsz:memory:)databaseremote_filez$describe SELECT * FROM 'remote_file'c                 S   s&   g | ]\}}|d  |d   dqS )column_namecolumn_type)coldtype)r   ).0_rowrc   rc   rd   
<listcomp>  s    z+Server_file_schema.post.<locals>.<listcomp>)tablescolumnsr   remote)r-   r   database_nameschemaconnection_namer*   r@   ))rB   r#   rC   rD   rA   r*   rq   rf   rr   r   r   rZ   r   r   rG   rH   r   r   ru   r   r   r   r   r   r1   r   r   r   rS   pdread_csvduckdbconnectregisterquerydfr   stemiterrowsr   rY   )r[   r\   r]   rN   r   rA   r,   r|   r   r   r   sshr   ra   r#   connr   r`   	file_namer   rc   rc   rd   re   r  sd   


$$zServer_file_schema.postN)r   r   r   r   r   r   r   r   rB   r   r   r   r   r   re   rc   rc   rc   rd   r   m  r   r   )Crest_framework.viewsr   rest_framework.responser   rest_frameworkr   django.views.decorators.csrfr   	django.dbr   Service.utilsr   r	   r
   r   r   r   r   FlowBoard.serializersr   r   r   r   r    r   rI   r   r   authenticationrE   Monitorr   Tasks_Schedulerr   authentication.utilsr   drf_yasg.utilsr   Monitor.utilsr   r   
Datamplifyr   pytzr   rM   rT   r   uuidr   r  r   pandasr   pathlibr   authentication.permissionsr   r   django.utils.decoratorsr   &oauth2_provider.contrib.rest_frameworkr   r   r   r   r   rc   rc   rc   rd   <module>   s<    $0gJTJ