o
    RDi"                     @   s&  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZmZm	Z	m
Z
 d dlmZmZ zd dlZW n ey=   dZY nw eeZG dd dZG dd deZG dd	 d	eZG d
d deZG dd deZG dd deZG dd deZG dd dZdd Zedkre e  dS dS )    N)quoteunquoteurljoinurlparse)
httpclientioloopc                   @      e Zd Zdd Zdd ZdS )
BrokerBasec                 O   sb   t |}|j| _|j| _|jdd  | _|j}|j}|r t|n|| _|r,t|| _d S || _d S )N   )	r   hostnamehostportpathvhostusernamepasswordr   )self
broker_url___purlr   r    r   L/var/www/Datamplify/venv/lib/python3.10/site-packages/flower/utils/broker.py__init__   s   zBrokerBase.__init__c                       t NNotImplementedErrorr   namesr   r   r   queues!      zBrokerBase.queuesN)__name__
__module____qualname__r   r    r   r   r   r   r	      s    r	   c                       s2   e Zd Zd fdd	Zdd Zedd Z  ZS )	RabbitMQNc              
      s   t  | |ptj | _| jpd| _| jpd| _| jdkr't	| jdp&dn| j| _| j
p/d| _
| jp5d| _|sOd| j
 d| j d| j d| j d	| j 
}z| | W n tye   td
| Y nw || _d S )N	localhosti8=  / guestzhttp://:@z/api/zInvalid broker api url: %s)superr   r   IOLoopinstanceio_loopr   r   r   r   r   r   validate_http_api
ValueErrorloggererrorhttp_api)r   r   r4   r/   r   	__class__r   r   r   &   s"    ,
zRabbitMQ.__init__c           
   
      s  t | jd| j }t| j}t|jpdp| j}t|jpdp"| j}t }z:z|j	|||ddddI d H }W n# t
jtjfy[ } ztd| g W  Y d }~W |  S d }~ww W |  n|  w |jdkr}t|j }	 fd	d
|	D S |  d S )Nzqueues/r(   g      ?g       @F)auth_usernameauth_passwordconnect_timeoutrequest_timeoutvalidate_certz'RabbitMQ management API call failed: %s   c                    s   g | ]
}|d   v r|qS )namer   .0xr   r   r   
<listcomp>N   s    z#RabbitMQ.queues.<locals>.<listcomp>)r   r4   r   r   r   r   r   r   AsyncHTTPClientfetchsocketr3   	HTTPErrorr2   closecodejsonloadsbodydecoderethrow)
r   r   urlapi_urlr   r   http_clientresponseeinfor   rA   r   r    :   s0   


zRabbitMQ.queuesc                 C   s&   t |}|jdvrtd|j d S )N)httphttpszInvalid http api schema: )r   schemer1   )clsr4   rN   r   r   r   r0   Q   s   
zRabbitMQ.validate_http_apir   )r"   r#   r$   r   r    classmethodr0   __classcell__r   r   r5   r   r%   %   s
    r%   c                       s8   e Zd ZdZg dZ fddZdd Zdd Z  ZS )		RedisBasez)r         	   c                    s\   t  | d | _tstd|di }|d| j| _|d| j| _|dd| _	d S )Nzredis library is requiredbroker_optionspriority_stepssepglobal_keyprefixr(   )
r,   r   redisImportErrorgetDEFAULT_PRIORITY_STEPSr_   DEFAULT_SEPr`   broker_prefix)r   r   r   kwargsr^   r5   r   r   r   \   s   zRedisBase.__init__c                 C   s4   || j vr	tddj|r|| j|f S |ddf S )NzPriority not in priority stepsz	{0}{1}{2}r(   )r_   r1   formatr`   )r   queueprir   r   r   
_q_for_prii   s   
"zRedisBase._q_for_pric                    sL   g }|D ]  fddj D }| tfdd|D d q|S )Nc                    s   g | ]}j  | qS r   )rg   rl   )r?   rk   r=   r   r   r   rB   r   s
    
z$RedisBase.queues.<locals>.<listcomp>c                 3   s    | ]	} j |V  qd S r   )rb   llenr>   r   r   r   	<genexpr>v   s    z#RedisBase.queues.<locals>.<genexpr>)r=   messages)r_   appendsum)r   r   queue_statspriority_namesr   rm   r   r    o   s   
zRedisBase.queues)	r"   r#   r$   rf   re   r   rl   r    rY   r   r   r5   r   rZ   X   s    rZ   c                       4   e Zd Z fddZdd Zdd Zdd Z  ZS )	Redisc                    sN   t  j|g|R i | | jpd| _| jpd| _| | j| _|  | _d S )Nr&   i  )r,   r   r   r   _prepare_virtual_hostr   _get_redis_clientrb   r   r   argsrh   r5   r   r   r   }   s
   zRedis.__init__c              
   C   sp   t |tjs6|r|dkrd}n|dr|dd  }zt|}W |S  ty5 } ztd| |d }~ww |S )Nr'   r   r
   z-Database is int between 0 and limit - 1, not 
isinstancenumbersIntegral
startswithintr1   r   r   excr   r   r   rx      s   

zRedis._prepare_virtual_hostc                 C   s   | j | j| j| j| jdS )N)r   r   dbr   r   )r   r   r   r   r   ro   r   r   r   _get_redis_client_args   s   zRedis._get_redis_client_argsc                 C   s   t jdi |  S )Nr   )rb   rw   r   ro   r   r   r   ry      s   zRedis._get_redis_client)r"   r#   r$   r   rx   r   ry   rY   r   r   r5   r   rw   {   s
    	rw   c                       rv   )	RedisSentinelc                    sh   t  j|g|R i | |di }| jpd| _| jpd| _| | j| _| || _| 	|| _
d S )Nr^   r&   ig  )r,   r   rd   r   r   rx   r   _prepare_master_namemaster_namery   rb   )r   r   r{   rh   r^   r5   r   r   r      s   zRedisSentinel.__init__c              
   C   sj   t |tjs3|r|dkrd}n|dr|dd  }zt|}W |S  ty2 } ztd|d }~ww |S )Nr'   r   r
   z4Database is int between 0 and limit - 1, not {vhost}r|   r   r   r   r   rx      s   


z#RedisSentinel._prepare_virtual_hostc              
   C   s2   z|d }W |S  t y } ztd|d }~ww )Nr   z+master_name is required for Sentinel broker)KeyErrorr1   )r   r^   r   r   r   r   r   r      s   

z"RedisSentinel._prepare_master_namec                 C   s@   | j |dd}tjj| j| jfgfi |}|| j}|S )Nsentinel_kwargs)r   r   )	r   rd   rb   sentinelSentinelr   r   
master_forr   )r   r^   connection_kwargsr   redis_clientr   r   r   ry      s   zRedisSentinel._get_redis_client)r"   r#   r$   r   rx   r   ry   rY   r   r   r5   r   r      s
    	r   c                       s   e Zd Z fddZ  ZS )RedisSocketc                    s6   t  j|g|R i | tjd| j | jd| _d S )Nr'   )unix_socket_pathr   )r,   r   rb   rw   r   r   rz   r5   r   r   r      s   zRedisSocket.__init__)r"   r#   r$   r   rY   r   r   r5   r   r      s    r   c                       s,   e Zd ZdZ fddZ fddZ  ZS )RedisSslz
    Redis SSL class offering connection to the broker over SSL.
    This does not currently support SSL settings through the url, only through
    the broker_use_ssl celery configuration.
    c                    s<   d|vrt d|di | _t j|g|R i | d S )Nbroker_use_sslz%rediss broker requires broker_use_ssl)r1   rd   r   r,   r   rz   r5   r   r   r      s   zRedisSsl.__init__c                    s.   t   }d|d< t| jtr|| j |S )NTssl)r,   r   r}   r   dictupdate)r   client_argsr5   r   r   r      s
   
zRedisSsl._get_redis_client_args)r"   r#   r$   __doc__r   r   rY   r   r   r5   r   r      s    r   c                   @   r   )Brokerc                 O   s   t |j}|dkrt|g|R i |S |dkr#t|g|R i |S |dkr2t|g|R i |S |dkrAt|g|R i |S |dkrPt|g|R i |S t)Namqprb   redisszredis+socketr   )r   rV   r%   rw   r   r   r   r   )rW   r   r{   rh   rV   r   r   r   __new__   s   
zBroker.__new__c                    r   r   r   r   r   r   r   r       r!   zBroker.queuesN)r"   r#   r$   r   r    r   r   r   r   r      s    r   c                     s   t tjdkrtjd nd} t tjdkrtjd nd}t tjdkr*tjd }nd}t| |d}||gI d H }|rCt| d S d S )Nr
   zamqp://   celeryr[   z'http://guest:guest@localhost:15672/api/)r4   )lensysargvr   r    print)r   
queue_namer4   brokerr    r   r   r   main   s   r   __main__)asynciorI   loggingr~   rE   r   urllib.parser   r   r   r   tornador   r   rb   rc   	getLoggerr"   r2   r	   r%   rZ   rw   r   r   r   r   r   runr   r   r   r   <module>   s4    
3#"*