o
    CiL&                     @   s  d dl mZ d dlmZ d dlmZmZ d dlmZ d dl	m
Z
 d dlmZ d dlZd dlmZ d d	lmZ d(ddZG dd deZG dd deZd)ddZG dd deZd*ddZG dd deZd+ddZG dd deZd d! ZG d"d# d#eZd$d% ZG d&d' d'eZdS ),    )APIView)OAuth2Authentication)require_permissionCustomIsAuthenticated)csrf_exempt)settings)ResponseNstatusairflow_tokenFc                 C   s   t | d}dd| iS )Nforce_refreshAuthorizationzBearer r   )r   token r   +/var/www/Datamplify/Monitor/airflow_apis.pyget_headers   s   

r   c                   @   $   e Zd ZdZegZegZdd ZdS )AirflowDagRunStatusViewzK
    Proxy endpoint:
    GET /api/airflow/dags/<dag_id>/runs/?limit=10
    c              
   C   s   |j dd}tj d| }|dd}z)tj||t dd}|jdkr0tj||tdd	dd}|  t|	 |jd
W S  tj
jy\ } ztdt|itjd
W  Y d }~S d }~ww )Nlimit
   z	/ui/grid/
-run_after)r   order_byparamsheaderstimeout  Tr   r	   errorquery_paramsgetr   airflow_hostrequestsr   status_coderaise_for_statusr   json
exceptionsRequestExceptionstrr
   HTTP_502_BAD_GATEWAY)selfrequestdag_idr   airflow_urlr   responseer   r   r   r"      s8   

zAirflowDagRunStatusView.getN	__name__
__module____qualname____doc__r   authentication_classesr   permission_classesr"   r   r   r   r   r      
    r   c                   @   r   )AirflowRecentDagRunsViewzI
    Proxy endpoint:
    GET /api/airflow/dags/<dag_id>/recent-runs/
    c              
   C   s   |j d}tj d}d|i}z)tj||t dd}|jdkr,tj||tdddd}|  t|	 |jd	W S  tj
jyX } ztd
t|itjd	W  Y d }~S d }~ww )Nr.   z/ui/dags/recent_dag_runsdag_idsr   r   r   Tr   r	   r   r    )r,   r-   r.   r/   r   r0   r1   r   r   r   r"   C   s6   

zAirflowRecentDagRunsView.getNr2   r   r   r   r   r:   ;   r9   r:   c           
      C   s   |d | }t j d|  d}||d}|r||d< |r ||d< |r&||d< tj||t dd	}	|	jd
krAtj||tdddd	}	|	  |	 S )N   /api/v2/dags/z/dagRuns)r   offsetr   staterun_type   r   r   Tr   r   r#   r$   r"   r   r%   r&   r'   )
r.   r   current_pager?   r@   r   r>   urlr   r0   r   r   r   get_dag_runse   s4   
rE   c                   @   r   )AirflowDagRunsViewz2
    GET /api/airflow/dags/<dag_id>/dag-runs/
    c           
   
   C   s   z4t |jdd}t |jdd}|jd}|jd}|jdd}t||||||d	}t|W S  tyQ }	 ztd
t|	itjdW  Y d }	~	S d }	~	ww )Nr   r   pager<   r?   r@   r   z-start_date)r.   r   rC   r?   r@   r   r   r	   )	intr!   r"   rE   r   	Exceptionr*   r
   r+   )
r,   r-   r.   r   rC   r?   r@   r   datar1   r   r   r   r"      s,   
	
zAirflowDagRunsView.getNr2   r   r   r   r   rF      
    rF      c                 C   sd   t j d|  d}|d|d}tj||t dd}|jdkr*tj||tdd	dd}|  | S )
Nr=   z/dagRuns/~/taskInstancesr   )task_idr   r   rA   r   r   Tr   rB   )r.   rM   r   rD   r   r0   r   r   r   get_task_instances   s.   
rN   c                   @   r   )AirflowTaskInstancesViewzF
    GET /api/airflow/dags/<dag_id>/runs/<run_id>/task-instances/
    c              
   C   s   z'|j d}t|j dd}|stdditjdW S t|||d}t|W S  tyD } ztdt|itj	dW  Y d }~S d }~ww )NrM   r   rL   r   ztask_id is requiredr	   )r.   rM   r   )
r!   r"   rH   r   r
   HTTP_400_BAD_REQUESTrN   rI   r*   r+   )r,   r-   r.   rM   r   rJ   r1   r   r   r   r"      s*   

zAirflowTaskInstancesView.getNr2   r   r   r   r   rO      rK   rO   r<   c              	   C   sp   t j d|  d| d| d| 	}d|i}tj||t dd}|jdkr0tj||td	d
dd}|  | S )Nr=   	/dagRuns/z/taskInstances/z/logs/	map_index   r   r   Tr   rB   )r.   run_idrM   
try_numberrS   rD   r   r0   r   r   r   get_task_instance_logs   s6   
rW   c                   @   r   )AirflowTaskLogsViewzL
    GET /api/airflow/dags/<dag_id>/runs/<run_id>/tasks/<task_id>/logs/
    c           	   
   C   sz   zt |jdd}t |jdd}t|||||}t|W S  ty< } ztdt|itjdW  Y d }~S d }~ww )NrV   r<   rS   rQ   r   r	   )	rH   r!   r"   rW   r   rI   r*   r
   r+   )	r,   r-   r.   rU   rM   rV   rS   rJ   r1   r   r   r   r"     s$   

zAirflowTaskLogsView.getNr2   r   r   r   r   rX     rK   rX   c                 C   sZ   t j d|  d| d}tj|t dd}|jdkr%tj|tdddd}|  | S )	Nr=   rR   z/taskInstancesrA   r   r   r   Tr   rB   )r.   rU   rD   r0   r   r   r   get_task_instances_of_run+  s&   
rZ   c                   @   r   )AirflowTaskInstancesOfRunView=
    GET /api/airflow/dags/<dag_id>/runs/<run_id>/tasks/
    c              
   C   sP   z
t ||}t|W S  ty' } ztdt|itjdW  Y d }~S d }~ww Nr   r	   )rZ   r   rI   r*   r
   r+   )r,   r-   r.   rU   rJ   r1   r   r   r   r"   I  s   


z!AirflowTaskInstancesOfRunView.getNr2   r   r   r   r   r[   B  rK   r[   c                 C   sT   t j d|  d}tj|t dd}|jdkr"tj|tdddd}|  | S )Nr=   z/detailsrT   rY   r   Tr   rB   )r.   rD   r0   r   r   r   get_dag_detailsU  s"   
r^   c                   @   r   )AirflowDAGDetailsr\   c              
   C   sN   z	t |}t|W S  ty& } ztdt|itjdW  Y d }~S d }~ww r]   )r^   r   rI   r*   r
   r+   )r,   r-   r.   rJ   r1   r   r   r   r"   r  s   

zAirflowDAGDetails.getNr2   r   r   r   r   r_   k  rK   r_   )F)NNN)rL   )r<   rQ   )rest_framework.viewsr   &oauth2_provider.contrib.rest_frameworkr   authentication.permissionsr   r   django.views.decorators.csrfr   
Datamplifyr   rest_framework.responser   r$   rest_frameworkr
   Monitor.utilsr   r   r   r:   rE   rF   rN   rO   rW   rX   rZ   r[   r^   r_   r   r   r   r   <module>   s,    
*
*&
" 
"