o
    RDiK=                     @   s   d dl Z d dlmZ ddlmZ e eZG dd deZG dd deZ	G d	d
 d
eZ
G dd deZG dd deZG dd deZG dd deZG dd deZG dd deZG dd deZG dd deZdS )    N)web   )BaseApiHandlerc                   @   s   e Zd Zdd Zdd ZdS )ControlHandlerc                 C   s   |o|| j jv S )N)applicationworkersself
workername r   K/var/www/Datamplify/venv/lib/python3.10/site-packages/flower/api/control.py	is_worker   s   zControlHandler.is_workerc              	   C   sB   |D ]}z||  ddW   S  ty   Y qw td| dS )z$extracts error message from responseerrorzUnknown reasonz(Failed to extract error reason from '%s')getKeyErrorloggerr   )r	   r
   responseresr   r   r   error_reason   s   zControlHandler.error_reasonN)__name__
__module____qualname__r   r   r   r   r   r   r   
   s    r   c                   @      e Zd Zejdd ZdS )WorkerShutDownc                 C   sR   |  |stdd| dtd| | jjjd|gd | t	dd d	S )
a  
Shut down a worker

**Example request**:

.. sourcecode:: http

  POST /api/worker/shutdown/celery@worker2 HTTP/1.1
  Content-Length: 0
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 29
  Content-Type: application/json; charset=UTF-8

  {
      "message": "Shutting down!"
  }

:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown worker
          Unknown worker ''zShutting down '%s' workershutdown)destinationzShutting down!messageN)
r   r   	HTTPErrorr   infocappcontrol	broadcastwritedictr   r   r   r   post   s
   
zWorkerShutDown.postNr   r   r   r   authenticatedr(   r   r   r   r   r          r   c                   @   r   )WorkerPoolRestartc                 C   s   |  |stdd| dtd| | jjjdddi|gdd	}|r;d
|d | v r;| t	d| dd dS t
| | d | ||}| d| d|  dS )ac  
Restart worker's pool

**Example request**:

.. sourcecode:: http

  POST /api/worker/pool/restart/celery@worker2 HTTP/1.1
  Content-Length: 0
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 56
  Content-Type: application/json; charset=UTF-8

  {
      "message": "Restarting 'celery@worker2' worker's pool"
  }

:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 403: pool restart is not enabled (see CELERYD_POOL_RESTARTS)
:statuscode 404: unknown worker
        r   r   r   zRestarting '%s' worker's poolpool_restartreloadFT	argumentsr   replyokr   zRestarting 'z' worker's poolr     zFailed to restart the 'z' pool: N)r   r   r!   r   r"   r#   r$   r%   r&   r'   r   
set_statusr   )r	   r
   r   reasonr   r   r   r(   A   s   


zWorkerPoolRestart.postNr)   r   r   r   r   r,   @   r+   r,   c                   @   r   )WorkerPoolGrowc                 C      |  |stdd| d| jddtd}td|| | jjj	|d|gd	}|rCd
|d | v rC| 
td| d| d dS t| | d | ||}| 
d| d|  dS )as  
Grow worker's pool

**Example request**:

.. sourcecode:: http

  POST /api/worker/pool/grow/celery@worker2?n=3 HTTP/1.1
  Content-Length: 0
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 58
  Content-Type: application/json; charset=UTF-8

  {
      "message": "Growing 'celery@worker2' worker's pool by 3"
  }

:query n: number of pool processes to grow, default is 1
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 403: failed to grow
:statuscode 404: unknown worker
        r   r   r   nr   defaulttypez"Growing '%s' worker's pool by '%s'Tr8   r1   r   r2   r   z	Growing '' worker's pool by r   r3   zFailed to grow '' worker's pool: N)r   r   r!   get_argumentintr   r"   r#   r$   	pool_growr&   r'   r   r4   r   r	   r
   r8   r   r5   r   r   r   r(   q      
! 

zWorkerPoolGrow.postNr)   r   r   r   r   r6   p   r+   r6   c                   @   r   )WorkerPoolShrinkc                 C   r7   )ay  
Shrink worker's pool

**Example request**:

.. sourcecode:: http

  POST /api/worker/pool/shrink/celery@worker2 HTTP/1.1
  Content-Length: 0
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 60
  Content-Type: application/json; charset=UTF-8

  {
      "message": "Shrinking 'celery@worker2' worker's pool by 1"
  }

:query n: number of pool processes to shrink, default is 1
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 403: failed to shrink
:statuscode 404: unknown worker
        r   r   r   r8   r   r9   z$Shrinking '%s' worker's pool by '%s'Tr<   r2   r   zShrinking 'r=   r   r3   zFailed to shrink 'r>   N)r   r   r!   r?   r@   r   r"   r#   r$   pool_shrinkr&   r'   r   r4   r   rB   r   r   r   r(      rC   zWorkerPoolShrink.postNr)   r   r   r   r   rD      r+   rD   c                   @   r   )WorkerPoolAutoscalec                 C   s   |  |stdd| d| jdtd}| jdtd}td|||f | jjj	d||d	|gd
d}|rMd|d | v rM| 
td| dd dS t| | d | ||}| 
d| d|  dS )a  
Autoscale worker pool

**Example request**:

.. sourcecode:: http

  POST /api/worker/pool/autoscale/celery@worker2?min=3&max=10 HTTP/1.1
  Content-Length: 0
  Content-Type: application/x-www-form-urlencoded; charset=utf-8
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 66
  Content-Type: application/json; charset=UTF-8

  {
      "message": "Autoscaling 'celery@worker2' worker (min=3, max=10)"
  }

:query min: minimum number of pool processes
:query max: maximum number of pool processes
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 403: autoscaling is not enabled (see CELERYD_AUTOSCALER)
:statuscode 404: unknown worker
        r   r   r   min)r;   maxzAutoscaling '%s' worker by '%s'	autoscale)rG   rH   Tr/   r2   r   zAutoscaling 'z' worker (min={min}, max={max})r   r3   zFailed to autoscale '
' worker: N)r   r   r!   r?   r@   r   r"   r#   r$   r%   r&   r'   r   r4   r   )r	   r
   rG   rH   r   r5   r   r   r   r(      s"   
#


zWorkerPoolAutoscale.postNr)   r   r   r   r   rF      r+   rF   c                   @   r   )WorkerQueueAddConsumerc                 C      |  |stdd| d| d}td|| | jjjdd|i|gdd}|rCd	|d
 | v rC| 	t
|d
 | d	 d dS t| | d | ||}| 	d| d| d|  dS )a  
Start consuming from a queue

**Example request**:

.. sourcecode:: http

  POST /api/worker/queue/add-consumer/celery@worker2?queue=sample-queue
  Content-Length: 0
  Content-Type: application/x-www-form-urlencoded; charset=utf-8
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 40
  Content-Type: application/json; charset=UTF-8

  {
      "message": "add consumer sample-queue"
  }

:query queue: the name of a new queue
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 403: failed to add consumer
:statuscode 404: unknown worker
        r   r   r   queuez#Adding consumer '%s' to worker '%s'add_consumerTr/   r2   r   r   r3   zFailed to add 'z' consumer to 'rJ   Nr   r   r!   r?   r   r"   r#   r$   r%   r&   r'   r   r4   r   r	   r
   rM   r   r5   r   r   r   r(         
!
 

 zWorkerQueueAddConsumer.postNr)   r   r   r   r   rK     r+   rK   c                   @   r   )WorkerQueueCancelConsumerc                 C   rL   )a  
Stop consuming from a queue

**Example request**:

.. sourcecode:: http

  POST /api/worker/queue/cancel-consumer/celery@worker2?queue=sample-queue
  Content-Length: 0
  Content-Type: application/x-www-form-urlencoded; charset=utf-8
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 52
  Content-Type: application/json; charset=UTF-8

  {
      "message": "no longer consuming from sample-queue"
  }

:query queue: the name of queue
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 403: failed to cancel consumer
:statuscode 404: unknown worker
        r   r   r   rM   z(Canceling consumer '%s' from worker '%s'cancel_consumerTr/   r2   r   r   r3   zFailed to cancel 'z' consumer from 'rJ   NrO   rP   r   r   r   r(   E  rQ   zWorkerQueueCancelConsumer.postNr)   r   r   r   r   rR   D  r+   rR   c                   @   r   )
TaskRevokec                 C   s\   t d| | jddtd}| jddtd}| jjj|||d | t	d| d	d
 dS )a  
Revoke a task

**Example request**:

.. sourcecode:: http

  POST /api/task/revoke/1480b55c-b8b2-462c-985e-24af3e9158f9?terminate=true
  Content-Length: 0
  Content-Type: application/x-www-form-urlencoded; charset=utf-8
  Host: localhost:5555

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 61
  Content-Type: application/json; charset=UTF-8

  {
      "message": "Revoked '1480b55c-b8b2-462c-985e-24af3e9158f9'"
  }

:query terminate: terminate the task if it is running
:query signal: name of signal to send to process if terminate (default: 'SIGTERM')
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
        zRevoking task '%s'	terminateFr9   signalSIGTERM)rU   rV   z	Revoked 'r   r   N)
r   r"   r?   boolstrr#   r$   revoker&   r'   )r	   taskidrU   rV   r   r   r   r(   z  s
    zTaskRevoke.postNr)   r   r   r   r   rT   y  r+   rT   c                   @   r   )
TaskTimoutc                 C   s  |  d}| j ddtd}| j ddtd}|| jjvr%tdd| d|dur8| |s8tdd	| dtd
||| |durG|gnd}| jj	j
|d|||d}|rod|d | v ro| t|d | d d dS t| | d | ||}| d| d dS )a  
Change soft and hard time limits for a task

**Example request**:

.. sourcecode:: http

    POST /api/task/timeout/tasks.sleep HTTP/1.1
    Content-Length: 44
    Content-Type: application/x-www-form-urlencoded; charset=utf-8
    Host: localhost:5555

    soft=30&hard=100&workername=celery%40worker1

**Example response**:

.. sourcecode:: http

    HTTP/1.1 200 OK
    Content-Length: 46
    Content-Type: application/json; charset=UTF-8

    {
        "message": "time limits set successfully"
    }

:query workername: worker name
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown task/worker
        r
   hardNr9   softr   Unknown task 'r   r   z'Setting timeouts for '%s' task (%s, %s)T)r1   r]   r^   r   r2   r   r   r3   zFailed to set timeouts: ')r?   floatr#   tasksr   r!   r   r   r"   r$   
time_limitr&   r'   r   r4   r   )r	   tasknamer
   r]   r^   r   r   r5   r   r   r   r(     s*   
" 

zTaskTimout.postNr)   r   r   r   r   r\     r+   r\   c                   @   r   )TaskRateLimitc                 C   s   |  d}|  d}|| jjvrtdd| d|dur-| |s-tdd| dtd|| |dur;|gnd}| jjj	||d	|d
}|rbd|d | v rb| 
t|d | d d dS t| | d | ||}| 
d| d dS )a  
Change rate limit for a task

**Example request**:

.. sourcecode:: http

    POST /api/task/rate-limit/tasks.sleep HTTP/1.1
    Content-Length: 41
    Content-Type: application/x-www-form-urlencoded; charset=utf-8
    Host: localhost:5555

    ratelimit=200&workername=celery%40worker1

**Example response**:

.. sourcecode:: http

  HTTP/1.1 200 OK
  Content-Length: 61
  Content-Type: application/json; charset=UTF-8

  {
      "message": "new rate limit set successfully"
  }

:query workername: worker name
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown task/worker
        r
   	ratelimitr   r_   r   Nr   z%Setting '%s' rate limit for '%s' taskT)r1   r   r2   r   r   r3   zFailed to set rate limit: ')r?   r#   ra   r   r!   r   r   r"   r$   
rate_limitr&   r'   r   r4   r   )r	   rc   r
   re   r   r   r5   r   r   r   r(     s&   
"
 

zTaskRateLimit.postNr)   r   r   r   r   rd     r+   rd   )loggingtornador    r   	getLoggerr   r   r   r   r,   r6   rD   rF   rK   rR   rT   r\   rd   r   r   r   r   <module>   s    
'033955(<