o
    BDiH                     @   sR  d dl Z d dlmZ d dlmZ d dlZd dlmZ d dlmZ d dl	m
Z
mZmZ d dlmZmZmZ d dlmZmZmZ d d	lmZ d d
lmZ d dlmZmZ d dlmZ d dlmZm Z m!Z!m"Z" d dl#m$Z$ e$rd dl#m%Z% d dl#m&Z& d dl#m'Z' d dl#m(Z( d dl#m)Z) d dl#m*Z* d dl+m,Z,m-Z-m.Z.m/Z/ d dlm0Z0 e)de&de%f dZ1z&d dl2m3Z4 d dl5m6Z6 d dl7m8Z8 d dl9m:Z:m;Z;m<Z<m=Z= d dl>m?Z? W n e@y   edw e<e:e;fZAG d d! d!eZBd"d# ZCd$d% ZDd>d&d'ZEd(d) ZFG d*d+ d+ZGd,d- ZHd.d/ ZId0d1 ZJd2d3 ZKd4d5 ZLd6d7 ZMd8d9 ZNd:d; ZOd<d= ZPdS )?    N)Mappingwraps)isolation_scope)continue_trace)OP
SPANSTATUSSPANDATA)_check_minimum_versionIntegrationDidNotEnable)_patch_beat_apply_entry_patch_redbeat_apply_async_setup_celery_beat_signals)_now_seconds_since_epoch)ignore_logger)BAGGAGE_HEADER_NAMETransactionSource)Baggage)capture_internal_exceptionsensure_integration_enabledevent_from_exceptionreraise)TYPE_CHECKING)Any)Callable)List)Optional)TypeVar)Union)EventProcessorEventHintExcInfo)SpanF.)bound)VERSION)Task)task_has_custom)IgnoreRejectRetrySoftTimeLimitExceeded)ProducerzCelery not installedc                   @   s6   e Zd ZdZde Z			d
ddZedd	 ZdS )CeleryIntegrationceleryzauto.queue.TFNc                 C   s*   || _ || _|| _t  t  t| d S N)propagate_tracesmonitor_beat_tasksexclude_beat_tasksr   r   r   )selfr2   r3   r4    r6   `/var/www/Datamplify/venv/lib/python3.10/site-packages/sentry_sdk/integrations/celery/__init__.py__init__@   s   zCeleryIntegration.__init__c                   C   sD   t tt t  t  t  t  t  td td td d S )Nzcelery.worker.jobzcelery.app.tracezcelery.redirected)	r
   r/   CELERY_VERSION_patch_build_tracer_patch_task_apply_async_patch_celery_send_task_patch_worker_exit_patch_producer_publishr   r6   r6   r6   r7   
setup_onceO   s   
zCeleryIntegration.setup_once)TFN)__name__
__module____qualname__
identifieroriginr8   staticmethodr?   r6   r6   r6   r7   r/   <   s    

r/   c                 C   sZ   t    t }|jd ur|j|  W d    d S W d    d S 1 s&w   Y  d S r1   )r   
sentry_sdkget_current_scopespan
set_status)statusscoper6   r6   r7   _set_statuse   s   
"rL   c                 C   s   t  }|td u rd S t|d trtd d S td t| dr-t|d | jr-d S t	||j
dddd\}}t j||d	 d S )
N   abortedinternal_errorthrowsr0   F)typehandled)client_options	mechanism)hint)rF   
get_clientget_integrationr/   
isinstanceCELERY_CONTROL_FLOW_EXCEPTIONSrL   hasattrrP   r   optionscapture_event)taskexc_infoclienteventrU   r6   r6   r7   _capture_exceptionm   s   
ra   c                    s    fdd}|S )Nc                    s   t  ! | di }|d< | di }j d|d< W d    n1 s'w   Y  d|v r`t  % t|d d trPdd	td
g| d< W d    | S W d    | S 1 s[w   Y  | S )Ntagscelery_task_idextra)	task_nameargskwargsz
celery-jobr^   r   r0   r-   namefingerprint)r   
setdefaultrh   
issubclassr-   getattr)r`   rU   rb   rd   rf   rg   r]   uuidr6   r7   event_processor   s.   




z._make_event_processor.<locals>.event_processorr6   )r]   rn   rf   rg   requestro   r6   rm   r7   _make_event_processor   s   rq   c                 C   sT  |   }t  tt j|d}|r|ddt  i |dt i |r|t	}|t	}|p5|}|r`|r`t
|}t
|}	|	j|j ddd |	j|jfD |	_|	jdd	}|| |rk||t	< |d
i | |r|||d
 t	< | D ]\}
}|
dr||d
 |
< qW d   |S W d   |S 1 sw   Y  |S )z
    Updates the headers of the Celery task with the tracing information
    and eventually Sentry Crons monitoring information for beat tasks.
    )rH   z sentry-monitor-start-timestamp-sz%.9fsentry-task-enqueued-time,c                 S   s    g | ]}|d ur|dkr|qS )N r6   ).0xr6   r6   r7   
<listcomp>   s
    z/_update_celery_task_headers.<locals>.<listcomp>T)include_third_partyheaderszsentry-N)copyr   dictrF   get_isolation_scopeiter_trace_propagation_headersupdater   getr   r   from_incoming_headersentry_itemsjointhird_party_items	serializerj   items
startswith)original_headersrH   r3   updated_headersry   existing_baggagesentry_baggagecombined_baggageincomingcombinedkeyvaluer6   r6   r7   _update_celery_task_headers   sb   	







@
@@r   c                   @   s   e Zd Zdd Zdd ZdS )NoOpMgrc                 C      d S r1   r6   )r5   r6   r6   r7   	__enter__      zNoOpMgr.__enter__c                 C   r   r1   r6   )r5   exc_type	exc_value	tracebackr6   r6   r7   __exit__   r   zNoOpMgr.__exit__N)r@   rA   rB   r   r   r6   r6   r6   r7   r      s    r   c                    s   t   fdd}|S )Nc            	         s  t  t}|d u r | i |S |dpi }|d|j}|s) | i |S t| d tr6| d j	}nt
| dkrHt| d trH| d }nd}t  jdk}|s]t jtj|tjdnt }|}t|||j|d<  | i |W  d    S 1 s}w   Y  d S )Nry   zsentry-propagate-tracesr   rM   z<unknown Celery task>zcelery-beatoprh   rD   )rF   rV   rW   r/   r   popr2   rX   r(   rh   lenstrr|   _name
start_spanr   QUEUE_SUBMIT_CELERYrD   r   r   r3   )	rf   rg   integrationkwarg_headersr2   re   task_started_from_beatspan_mgrrH   fr6   r7   apply_async   s:   

$z#_wrap_task_run.<locals>.apply_asyncr   )r   r   r6   r   r7   _wrap_task_run   s   )r   c                    s$   t  tt  fdd}|S )Nc               	      sF  t  }d|_|  |tg| R i | d }t ' | d dp'i }t|tj	dt
jtjd}j|_|tj W d    n1 sHw   Y  |d u r_ | i |W  d    S tj|djt| d | d d	id
  | i |W  d    W  d    S 1 sw   Y  W d    d S 1 sw   Y  d S )Nr0      ry   zunknown celery task)r   rh   sourcerD   
celery_jobrM      )r]   rf   rg   )custom_sampling_context)r   r   clear_breadcrumbsadd_event_processorrq   r   r   r   r   QUEUE_TASK_CELERYr   TASKr/   rD   rh   rI   r   OKrF   start_transactionlist)rf   rg   rK   transactionry   r   r]   r6   r7   _inner/  sD   
"z_wrap_tracer.<locals>._inner)r   r   r/   r]   r   r   r6   r   r7   _wrap_tracer&  s   	(r   c                 C   s   t  > | jj}|r)|d}|ddkr1|dur9|tj| W d   dS W d   dS W d   dS W d   dS 1 sDw   Y  dS )z-Set "messaging.destination.name" tag for spanrouting_keyexchangert   N)r   rp   delivery_infor   set_datar	   MESSAGING_DESTINATION_NAME)r]   rH   r   r   r6   r6   r7   _set_messaging_destination_name\  s   
"r   c                    s   t t  fdd}|S )Nc                     s  zt jtjjtjd}t| d }t  j	j
d ur.dj	j
v r.t j	j
d }W d    n1 s8w   Y  |d urH|tj| t  |tjj	j W d    n1 s_w   Y  t  |tjj	j W d    n1 s{w   Y  t  |tjj jj W d    n1 sw   Y   | i |W  d    W S 1 sw   Y  W d S  ty   t }t  t| W d    n1 sw   Y  t|  Y d S w )Nr   rr   )rF   r   r   QUEUE_PROCESSrh   r/   rD   r   r   rp   ry   r   r   r   r	   !MESSAGING_MESSAGE_RECEIVE_LATENCYMESSAGING_MESSAGE_IDidMESSAGING_MESSAGE_RETRY_COUNTretriesMESSAGING_SYSTEMapp
connection	transportdriver_type	Exceptionsysr^   ra   r   )rf   rg   rH   latencyr^   r   r6   r7   r   t  sT   
	
(#z_wrap_task_call.<locals>._inner)r   r/   r   r6   r   r7   _wrap_task_calli  s   ,r   c                     s.   dd l m  m}  | j  fdd}|| _d S )Nr   c                    s`   t |dds!t|drt|t|jt|_nt||j|_d|_t| | |g|R i |S )N_sentry_is_patchedF__call__T)rl   r)   r   rQ   r   runr   r   )rh   r]   rf   rg   original_build_tracerr6   r7   sentry_build_tracer  s   
z0_patch_build_tracer.<locals>.sentry_build_tracer)celery.app.tracer   tracebuild_tracer)r   r   r6   r   r7   r:     s   
r:   c                   C   s   t tjt_d S r1   )r   r(   r   r6   r6   r6   r7   r;     s   r;   c                  C   s   ddl m}  t| j| _d S )Nr   Celery)r0   r   r   	send_taskr   r6   r6   r7   r<     s   r<   c                     s(   ddl m}  | j  fdd}|| _d S )Nr   )Workerc                     s   z/ | i |W t   t td ur t  W d    S W d    S 1 s*w   Y  S t   t td urHt  W d    w W d    w 1 sRw   Y  w r1   )r   rF   rV   rW   r/   flush)rf   rg   original_workloopr6   r7   sentry_workloop  s"   
&
z+_patch_worker_exit.<locals>.sentry_workloop)billiard.poolr   workloop)r   r   r6   r   r7   r=     s   
r=   c                     s&   t j tt  fdd} | t _d S )Nc           
   	      s  | di }t|tsi }| d}| d}| d}| d}| d}tjtj|tjdU}	|d ur<|		t
j| |dkrK|d urK|		t
j| |d urV|		t
j| t  |		t
j| jjj W d    n1 snw   Y   | g|R i |W  d    S 1 sw   Y  d S )	Nry   r]   r   r   r   r   r   rt   )r   rX   r   rF   r   r   QUEUE_PUBLISHr/   rD   r   r	   r   r   r   r   r   r   r   r   )
r5   rf   rg   kwargs_headersre   task_idr   r   r   rH   original_publishr6   r7   sentry_publish  s6   





$z/_patch_producer_publish.<locals>.sentry_publish)r.   publishr   r/   )r   r6   r   r7   r>     s   
)r>   r1   )Qr   collections.abcr   	functoolsr   rF   r   sentry_sdk.apir   sentry_sdk.constsr   r   r	   sentry_sdk.integrationsr
   r   r   #sentry_sdk.integrations.celery.beatr   r   r   $sentry_sdk.integrations.celery.utilsr   sentry_sdk.integrations.loggingr   sentry_sdk.tracingr   r   sentry_sdk.tracing_utilsr   sentry_sdk.utilsr   r   r   r   typingr   r   r   r   r   r   r   sentry_sdk._typesr    r!   r"   r#   r$   r%   r0   r'   r9   celery.app.taskr(   r   r)   celery.exceptionsr*   r+   r,   r-   kombur.   ImportErrorrY   r/   rL   ra   rq   r   r   r   r   r   r   r:   r;   r<   r=   r>   r6   r6   r6   r7   <module>   sd    
)
J
/6;