o
    RDiC                     @   sZ  d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ ddlmZ ddlmZ ddlmZ eeZG dd deZG dd deZG dd deZ G dd deZ!G dd deZ"G dd deZ#G dd deZ$G dd  d eZ%G d!d" d"eZ&G d#d$ d$eZ'dS )%    N)OrderedDict)datetime)states)DisabledBackend)AbortableAsyncResult)AsyncResult)web)json_decode)IOLoop)	HTTPError   )tasks)Broker   )BaseApiHandlerc                   @   sD   e Zd ZdZdd Zedd Zdd Zdd	 Zd
d Z	dd Z
dS )BaseTaskHandlerz%Y-%m-%d %H:%M:%S.%fc              
   C   s   z| j j}|rt|ni }W n ty" } ztdt||d }~ww t|ts-tdd|dg }|di }t|t	t
fsEtdd|||fS )N  zinvalid optionsargskwargszargs must be an array)requestbodyr	   
ValueErrorr   str
isinstancedictpoplisttuple)selfr   optionser   r    r!   I/var/www/Datamplify/venv/lib/python3.10/site-packages/flower/api/tasks.pyget_task_args   s   



zBaseTaskHandler.get_task_argsc                 C   s   t | jt S N)r   backendr   )resultr!   r!   r"   backend_configured+   s   z"BaseTaskHandler.backend_configuredc                 K   s   |  | d S r$   )
set_status)r   status_coder   r!   r!   r"   write_error/   s   zBaseTaskHandler.write_errorc                 C   sD   |j tjkr|| |j|jd d S |d| |ji d S )N)r&   	tracebackr&   )stater   FAILUREupdatesafe_resultr&   r+   )r   responser&   r!   r!   r"   update_response_result2   s
   z&BaseTaskHandler.update_response_resultc                 C   s   d|v rt |d | j|d< d|v rt|d |d< d|v r@|d }zt|}W n ty9   t || j}Y nw ||d< d S d S )Neta	countdownexpires)r   strptimeDATE_FORMATfloatr   )r   r   r4   r!   r!   r"   normalize_options9   s   
z!BaseTaskHandler.normalize_optionsc                 C   s,   zt | W |S  ty   t| Y S w )zreturns json encodable result)jsondumps	TypeErrorrepr)r   r&   r!   r!   r"   r/   G   s   zBaseTaskHandler.safe_resultN)__name__
__module____qualname__r6   r#   staticmethodr'   r*   r1   r8   r/   r!   r!   r!   r"   r      s    
r   c                   @   s"   e Zd Zejdd Zdd ZdS )	TaskApplyc           	   
      s   |   \}}}td||| z| jj| }W n ty. } z
tdd| d|d}~ww z| | W n tyH } ztdd|d}~ww |j	d
||d|}d	|j
i}t d| j||I dH }| | dS )a#  
Execute a task by name and wait results

**Example request**:

.. sourcecode:: http

  POST /api/task/apply/tasks.add HTTP/1.1
  Accept: application/json
  Accept-Encoding: gzip, deflate, compress
  Content-Length: 16
  Content-Type: application/json; charset=utf-8
  Host: localhost:5555

  {
      "args": [1, 2]
  }

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 71
  Content-Type: application/json; charset=UTF-8

  {
      "state": "SUCCESS",
      "task-id": "c60be250-fe52-48df-befb-ac66174076e6",
      "result": 3
  }

:query args: a list of arguments
:query kwargs: a dictionary of arguments
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown task
        'Invoking a task '%s' with '%s' and '%s'  Unknown task ''Nr   Invalid optionr   r   task-idr!   )r#   loggerdebugcappr   KeyErrorr   r8   r   apply_asynctask_idr
   currentrun_in_executorwait_resultswrite	r   tasknamer   r   r   taskexcr&   r0   r!   r!   r"   postQ   s.   )


zTaskApply.postc                 C   s4   |j dd | || | |r|j|jd |S )NF)	propagater,   )getr1   r'   r.   r,   )r   r&   r0   r!   r!   r"   rQ      s
   
zTaskApply.wait_resultsN)r=   r>   r?   r   authenticatedrW   rQ   r!   r!   r!   r"   rA   P   s    
=rA   c                   @      e Zd Zejdd ZdS )TaskAsyncApplyc           	   
   C   s   |   \}}}td||| z| jj| }W n ty- } z
tdd| d|d}~ww z| | W n tyG } ztdd|d}~ww |j	d||d|}d	|j
i}| |rd|j|jd
 | | dS )ac  
Execute a task

**Example request**:

.. sourcecode:: http

  POST /api/task/async-apply/tasks.add HTTP/1.1
  Accept: application/json
  Accept-Encoding: gzip, deflate, compress
  Content-Length: 16
  Content-Type: application/json; charset=utf-8
  Host: localhost:5555

  {
      "args": [1, 2]
  }

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 71
  Content-Type: application/json; charset=UTF-8
  Date: Sun, 13 Apr 2014 15:55:00 GMT

  {
      "state": "PENDING",
      "task-id": "abc300c7-2922-4069-97b6-a635cc2ac47c"
  }

:query args: a list of arguments
:query kwargs: a dictionary of arguments
:query options: a dictionary of `apply_async` keyword arguments
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown task
        rB   rC   rD   rE   Nr   rF   rG   rH   rY   r!   )r#   rI   rJ   rK   r   rL   r   r8   r   rM   rN   r'   r.   r,   rR   rS   r!   r!   r"   rW      s*   *

zTaskAsyncApply.postNr=   r>   r?   r   r[   rW   r!   r!   r!   r"   r]      s    r]   c                   @   r\   )TaskSendc                 C   sh   |   \}}}td||| | jj|f||d|}d|ji}| |r-|j|jd | 	| dS )a"  
Execute a task by name (doesn't require task sources)

**Example request**:

.. sourcecode:: http

  POST /api/task/send-task/tasks.add HTTP/1.1
  Accept: application/json
  Accept-Encoding: gzip, deflate, compress
  Content-Length: 16
  Content-Type: application/json; charset=utf-8
  Host: localhost:5555

  {
      "args": [1, 2]
  }

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 71
  Content-Type: application/json; charset=UTF-8

  {
      "state": "SUCCESS",
      "task-id": "c60be250-fe52-48df-befb-ac66174076e6"
  }

:query args: a list of arguments
:query kwargs: a dictionary of arguments
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown task
        z%Invoking task '%s' with '%s' and '%s'rG   rH   rY   N)
r#   rI   rJ   rK   	send_taskrN   r'   r.   r,   rR   )r   rT   r   r   r   r&   r0   r!   r!   r"   rW      s   (

zTaskSend.postNr^   r!   r!   r!   r"   r_          r_   c                   @   r\   )
TaskResultc                 C   s   |  dd}|durt|nd}t|}| |std||jd}|r3|j|dd | || n
| r=| || | 	| dS )a  
Get a task result

**Example request**:

.. sourcecode:: http

  GET /api/task/result/c60be250-fe52-48df-befb-ac66174076e6 HTTP/1.1
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 84
  Content-Type: application/json; charset=UTF-8

  {
      "result": 3,
      "state": "SUCCESS",
      "task-id": "c60be250-fe52-48df-befb-ac66174076e6"
  }

:query timeout: how long to wait, in seconds, before the operation times out
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 503: result backend is not configured
        timeoutN  )rH   r,   F)rc   rX   )
get_argumentr7   r   r'   r   r,   rZ   r1   readyrR   )r   taskidrc   r&   r0   r!   r!   r"   rZ     s    
zTaskResult.getNr=   r>   r?   r   r[   rZ   r!   r!   r!   r"   rb     ra   rb   c                   @   r\   )	TaskAbortc                 C   sJ   t d| t|}| |std|  | td| dd dS )a)  
Abort a running task

**Example request**:

.. sourcecode:: http

  POST /api/task/abort/c60be250-fe52-48df-befb-ac66174076e6 HTTP/1.1
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 61
  Content-Type: application/json; charset=UTF-8

  {
      "message": "Aborted '1480b55c-b8b2-462c-985e-24af3e9158f9'"
  }

:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 503: result backend is not configured
        zAborting task '%s'rd   z	Aborted 'rE   )messageN)rI   infor   r'   r   abortrR   r   )r   rg   r&   r!   r!   r"   rW   @  s   
zTaskAbort.postNr^   r!   r!   r!   r"   ri   ?  ra   ri   c                   @   r\   )GetQueueLengthsc                    sv   | j }d}|jdkr|jjr|jj}t|j jdd|| jjj	| jjj
d}||  I dH }| d|i dS )aH  
Return length of all active queues

**Example request**:

.. sourcecode:: http

  GET /api/queues/length
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 94
  Content-Type: application/json; charset=UTF-8

  {
      "active_queues": [
          {"name": "celery", "messages": 0},
          {"name": "video-queue", "messages": 5}
      ]
  }

:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 503: result backend is not configured
        NamqpT)include_password)http_apibroker_optionsbroker_use_sslactive_queues)application	transportr   
broker_apir   rK   
connectionas_uriconfbroker_transport_optionsrr   queuesget_active_queue_namesrR   )r   apprp   brokerr{   r!   r!   r"   rZ   i  s    
zGetQueueLengths.getNrh   r!   r!   r!   r"   rm   h  ra   rm   c                   @   r\   )	ListTasksc                 C   s2  | j }| dd}| jddtd}| dd}| dd}| dd}| d	d}| d
d}| dd}	| dd}
|o@t|}t|d}|dkrL|nd}|dkrT|nd}|dkr\|nd}g }tj|j|||	||||||
d
D ]\}}t|}|dd}|dur|j	|d< |
||f qp| t| dS )aU
  
List tasks

**Example request**:

.. sourcecode:: http

  GET /api/tasks HTTP/1.1
  Host: localhost:5555
  User-Agent: HTTPie/0.8.0

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 1109
  Content-Type: application/json; charset=UTF-8
  Etag: "b2478118015c8b825f7b88ce6b660e5449746c37"
  Server: TornadoServer/3.1.1

  {
      "e42ceb2d-8730-47b5-8b4d-8e0d2a1ef7c9": {
          "args": "[3, 4]",
          "client": null,
          "clock": 1079,
          "eta": null,
          "exception": null,
          "exchange": null,
          "expires": null,
          "failed": null,
          "kwargs": "{}",
          "name": "tasks.add",
          "received": 1398505411.107885,
          "result": "'7'",
          "retried": null,
          "retries": 0,
          "revoked": null,
          "routing_key": null,
          "runtime": 0.01610181899741292,
          "sent": null,
          "started": 1398505411.108985,
          "state": "SUCCESS",
          "succeeded": 1398505411.124802,
          "timestamp": 1398505411.124802,
          "traceback": null,
          "uuid": "e42ceb2d-8730-47b5-8b4d-8e0d2a1ef7c9",
          "worker": "celery@worker1"
      },
      "f67ea225-ae9e-42a8-90b0-5de0b24507e0": {
          "args": "[1, 2]",
          "client": null,
          "clock": 1042,
          "eta": null,
          "exception": null,
          "exchange": null,
          "expires": null,
          "failed": null,
          "kwargs": "{}",
          "name": "tasks.add",
          "received": 1398505395.327208,
          "result": "'3'",
          "retried": null,
          "retries": 0,
          "revoked": null,
          "routing_key": null,
          "runtime": 0.012884548006695695,
          "sent": null,
          "started": 1398505395.3289,
          "state": "SUCCESS",
          "succeeded": 1398505395.341089,
          "timestamp": 1398505395.341089,
          "traceback": null,
          "uuid": "f67ea225-ae9e-42a8-90b0-5de0b24507e0",
          "worker": "celery@worker1"
      }
  }

:query limit: maximum number of tasks
:query offset: skip first n tasks
:query sort_by: sort tasks by attribute (name, state, received, started)
:query workername: filter task by workername
:query taskname: filter tasks by taskname
:query state: filter tasks by state
:query received_start: filter tasks by received date (must be greater than) format %Y-%m-%d %H:%M
:query received_end: filter tasks by received date (must be less than) format %Y-%m-%d %H:%M
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
        limitNoffsetr   )defaulttype
workernamerT   r,   received_startreceived_endsort_bysearchAll)	r   r   r   r   workerr,   r   r   r   r   )rt   re   intmaxr   
iter_taskseventsas_dictr   hostnameappendrR   r   )r   r}   r   r   r   r   r,   r   r   r   r   r&   rN   rU   r!   r!   r"   rZ     s:   \


zListTasks.getNrh   r!   r!   r!   r"   r     ra   r   c                   @   r\   )ListTaskTypesc                 C   s(   | j jj }i }||d< | | dS )a  
List (seen) task types

**Example request**:

.. sourcecode:: http

  GET /api/task/types HTTP/1.1
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 44
  Content-Type: application/json; charset=UTF-8

  {
      "task-types": [
          "tasks.add",
          "tasks.sleep"
      ]
  }

:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
        z
task-typesN)rt   r   r,   
task_typesrR   )r   seen_task_typesr0   r!   r!   r"   rZ     s   zListTaskTypes.getNrh   r!   r!   r!   r"   r     ra   r   c                   @   r\   )TaskInfoc                 C   sR   t | jj|}|stdd| d| }|jdur"|jj|d< | | dS )a  
Get a task info

**Example request**:

.. sourcecode:: http

  GET /api/task/info/91396550-c228-4111-9da4-9d88cfd5ddc6 HTTP/1.1
  Accept: */*
  Accept-Encoding: gzip, deflate, compress
  Host: localhost:5555


**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 575
  Content-Type: application/json; charset=UTF-8

  {
      "args": "[2, 2]",
      "client": null,
      "clock": 25,
      "eta": null,
      "exception": null,
      "exchange": null,
      "expires": null,
      "failed": null,
      "kwargs": "{}",
      "name": "tasks.add",
      "received": 1400806241.970742,
      "result": "'4'",
      "retried": null,
      "retries": null,
      "revoked": null,
      "routing_key": null,
      "runtime": 2.0037889280356467,
      "sent": null,
      "started": 1400806241.972624,
      "state": "SUCCESS",
      "succeeded": 1400806243.975336,
      "task-id": "91396550-c228-4111-9da4-9d88cfd5ddc6",
      "timestamp": 1400806243.975336,
      "traceback": null,
      "worker": "celery@worker1"
  }

:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown task
        rC   rD   rE   Nr   )	r   get_task_by_idrt   r   r   r   r   r   rR   )r   rg   rU   r0   r!   r!   r"   rZ   =  s   9
zTaskInfo.getNrh   r!   r!   r!   r"   r   <  ra   r   )(r9   loggingcollectionsr   r   celeryr   celery.backends.baser   celery.contrib.abortabler   celery.resultr   tornador   tornado.escaper	   tornado.ioloopr
   tornado.webr   utilsr   utils.brokerr    r   	getLoggerr=   rI   r   rA   r]   r_   rb   ri   rm   r   r   r   r!   r!   r!   r"   <module>   s4    
:IA41)/~'