
    iS                        d dl Z d dlZd dlmZmZmZmZmZmZ d dl	m
Z
 d dlmZ d dlmZmZmZmZ d dlmZmZmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlm Z  d dl!m"Z"m#Z# d dl$m%Z% d dl$m&Z' d dl(m)Z)m*Z*m+Z+ d dl,m-Z- d dl.m/Z/m0Z0m1Z1 d dl2m3Z3  ej4        e5          Z6e3 G d de#e"                      Z7de%fdZ8 G d de#e"          Z9 G d d          Z:dS )    N)Any	AwaitableCallableListOptionalUnion)PubSubHandler)DefaultCommandExecutor)DEFAULT_GRACE_PERIODDatabaseConfigInitialHealthCheckMultiDbConfig)AsyncDatabaseDatabase	Databases)AsyncFailureDetector)HealthCheckHealthCheckPolicy)Retry)BackgroundScheduler)	NoBackoff)AsyncCoreCommandsAsyncRedisModuleCommands)CircuitBreaker)State)InitialHealthCheckFailedErrorNoValidDatabaseExceptionUnhealthyDatabaseException)GeoFailoverReason)ChannelT
EncodableTKeyT)experimentalc                   p   e Zd ZdZdefdZd.dZd Zd Zd	 Z	de
fd
ZdeddfdZ	 d/dedefdZdedefdZdefdZdedefdZdefdZdefdZd Zd Zddddded geeee         f         f         d!ed"e e!         d#ed$e e         f
d%Z"d& Z#de$e%ef         fd'Z&d( Z'dedefd)Z(d*e)d+e*d,e*fd-Z+dS )0MultiDBClientz
    Client that operates on multiple logical Redis databases.
    Should be used in Client-side geographic failover database setups.
    configc           
      `   |                                 | _        |j        s|                                n|j        | _        |j        | _        |j                                        | _	        |j
        s|                                n|j
        | _        |j        |                                n|j        | _        | j                            | j                   |j        | _        |j        | _        |j        | _        | j                            t0          g           t3          | j        | j        | j        | j        |j        |j        | j        | j                  | _        d| _        t=          j                    | _         tC                      | _"        || _#        d | _$        g | _%        d | _&        d S )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF)'r)   
_databaseshealth_checksdefault_health_checks_health_checkshealth_check_interval_health_check_intervalhealth_check_policyvalue_health_check_policyr(   default_failure_detectors_failure_detectorsr+   default_failover_strategy_failover_strategyset_databasesr/   _auto_fallback_intervalr.   _event_dispatcherr*   _command_retryupdate_supported_errorsConnectionRefusedErrorr
   r,   r-   command_executorinitializedasyncioLock_hc_lockr   _bg_scheduler_config_recurring_hc_task	_hc_tasks_half_open_state_task)selfr&   s     n/Users/user/workspace/sujinbaek/cqa-test-app/venv/lib/python3.11/site-packages/redis/asyncio/multidb/client.py__init__zMultiDBClient.__init__)   s    **,, '&F((***% 	
 '-&B#&,,.. 	!
 +*F,,...) 	 '/ ,,...) 	
 	--do>>>'-'D$!'!8$2335K4LMMM 6"5o-"5$6!0!3#'#?	!
 	!
 	!
 !022"&%)"""    rM   returnc                 L   K   | j         s|                                  d {V  | S N)rD   
initializerM   s    rN   
__aenter__zMultiDBClient.__aenter__V   s8       	$//#########rP   c                 r  K   | j         r| j                                          | j        r| j                                         | j        D ]}|                                 | j                                         d {V  | j        j        r+| j        j        j        	                                 d {V  d S d S rS   )
rJ   cancelrL   rK   r8   closerC   active_databaseclientaclose)rM   hc_tasks     rN   r\   zMultiDBClient.aclose[   s      " 	-#**,,,% 	0&--///~ 	 	GNN '--/////////  0 	H'7>EEGGGGGGGGGGG	H 	HrP   c                 >   K   |                                   d {V  d S rS   r\   rM   exc_type	exc_value	tracebacks       rN   	__aexit__zMultiDBClient.__aexit__k   ,      kkmmrP   c                   K   |                                   d{V  t          j        | j                            | j        | j                            | _        d}| j        D ]N\  }}|j	        
                    | j                   |j	        j        t          j        k    r|s|| j        _        d}O|st#          d          d| _        dS )zT
        Perform initialization of databases to define their initial state.
        NFTz4Initial connection failed - no active database found)_perform_initial_health_checkrE   create_taskrH   run_recurring_asyncr5   _check_databases_healthrJ   r0   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDrC   _active_databaser   rD   )rM   is_active_db_founddatabaseweights       rN   rT   zMultiDBClient.initializen   s       00222222222 #*"522+, #
 #
 # $ 		* 		*Hf--d.TUUU %77@R7 :B%6%)"! 	*F    rP   c                     | j         S )zE
        Returns a sorted (by weight) list of all databases.
        )r0   rU   s    rN   get_databaseszMultiDBClient.get_databases   s     rP   rs   Nc                   K   d}| j         D ]\  }}||k    rd} n|st          d          |                     |           d{V  |j        j        t
          j        k    rP| j                             d          d         \  }}| j        	                    |t          j                   d{V  dS t          d          )zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r0   
ValueError_check_db_healthrk   rn   ro   rp   	get_top_nrC   set_active_databaser   MANUALr   )rM   rs   existsexisting_db_highest_weighted_dbs         rN   r}   z!MultiDBClient.set_active_database   s      "o 	 	NKh&& '  	PNOOO##H---------!W^33%)_%>%>q%A%A!%D"';;+2         F&?
 
 	
rP   Tskip_initial_health_checkc                 Z  K   |j                             dt          dt                                i           |j        r# | j        j        j        |j        fi |j         }ny|j        r[|j                            t          dt                                           | j        j                            |j                  }n | j        j        di |j         }|j	        |
                                n|j	        }t          |||j        |j                  }	 |                     |           d{V  n# t          $ r |s Y nw xY w| j                            d          d         \  }}| j                            ||j                   |                     ||           d{V  dS )	z
        Adds a new database to the database list.

        Args:
            config: DatabaseConfig object that contains the database configuration.
            skip_initial_health_check: If True, adds the database even if it is unhealthy.
        retryr   )retriesbackoff)connection_poolN)r[   rk   rt   health_check_urlry    )client_kwargsupdater   r   from_urlrI   client_class	from_pool	set_retryrk   default_circuit_breakerr   rt   r   r{   r   r0   r|   add_change_active_database)rM   r&   r   r[   rk   rs   r   highest_weights           rN   add_databasezMultiDBClient.add_database   s      	##WeAy{{.S.S.S$TUUU? 
	G7T\.7 #)#7 FF  	G&&uQ	'L'L'LMMM\.88 & 0 9  FF /T\.FF1EFFF ~% **,,, 	 =#4	
 
 
	''1111111111) 	 	 	,  	 /3o.G.G.J.J1.M+^Hho666**85HIIIIIIIIIIIs   D9 9E	E	new_databasehighest_weight_databasec                    K   |j         |j         k    rG|j        j        t          j        k    r/| j                            |t          j                   d {V  d S d S d S rS   )	rt   rk   rn   ro   rp   rC   r}   r   	AUTOMATIC)rM   r   r   s      rN   r   z%MultiDBClient._change_active_database   s       "9"@@@$*gn<<';;/9           A@<<rP   c                 "  K   | j                             |          }| j                             d          d         \  }}||k    rG|j        j        t
          j        k    r/| j                            |t          j
                   d{V  dS dS dS )z<
        Removes a database from the database list.
        ry   r   N)r0   remover|   rk   rn   ro   rp   rC   r}   r   r~   )rM   rs   rt   r   r   s        rN   remove_databasezMultiDBClient.remove_database   s       ''11.2o.G.G.J.J1.M+^ f$$#+1W^CC';;#%6%=           %$CCrP   rt   c                    K   d}| j         D ]\  }}||k    rd} n|st          d          | j                             d          d         \  }}| j                             ||           ||_        |                     ||           d{V  dS )z<
        Updates a database from the database list.
        NTrx   ry   r   )r0   rz   r|   update_weightrt   r   )rM   rs   rt   r   r   r   r   r   s           rN   update_database_weightz$MultiDBClient.update_database_weight   s       "o 	 	NKh&& '  	PNOOO.2o.G.G.J.J1.M+^%%h777 **85HIIIIIIIIIIIrP   failure_detectorc                 :    | j                             |           dS )z>
        Adds a new failure detector to the database.
        N)r:   append)rM   r   s     rN   add_failure_detectorz"MultiDBClient.add_failure_detector  s"     	&&'788888rP   healthcheckc                    K   | j         4 d{V  | j                            |           ddd          d{V  dS # 1 d{V swxY w Y   dS )z:
        Adds a new health check to the database.
        N)rG   r3   r   )rM   r   s     rN   add_health_checkzMultiDBClient.add_health_check  s       = 	4 	4 	4 	4 	4 	4 	4 	4&&{333	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4s   >
AAc                 x   K   | j         s|                                  d{V   | j        j        |i | d{V S )zB
        Executes a single command and return its result.
        N)rD   rT   rC   execute_commandrM   argsoptionss      rN   r   zMultiDBClient.execute_command  sb        	$//#########:T*:DLGLLLLLLLLLrP   c                      t          |           S )z:
        Enters into pipeline mode of the client.
        )PipelinerU   s    rN   pipelinezMultiDBClient.pipeline'  s     ~~rP   F
shard_hintvalue_from_callablewatch_delayfuncr   watchesr   r   r   c                   K   | j         s|                                  d{V   | j        j        |g|R |||d d{V S )z3
        Executes callable as transaction.
        Nr   )rD   rT   rC   execute_transaction)rM   r   r   r   r   r   s         rN   transactionzMultiDBClient.transaction-  s        	$//#########>T*>

 
 " 3#
 
 
 
 
 
 
 
 
 	
rP   c                 b   K   | j         s|                                  d{V  t          | fi |S )z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        N)rD   rT   PubSub)rM   kwargss     rN   pubsubzMultiDBClient.pubsubC  sK        	$//#########d%%f%%%rP   c                 "  K   i g | _         | j        D ]K\  }}t          j        |                     |                    }||<   | j                             |           Lt          j        | j         ddi d{V }fdt          | j         |          D             }|                                D ]]\  }}t          |t                    rC|j        }t          j        |j        _        t                               d|j                   d||<   ^|S )zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        return_exceptionsTNc                 (    i | ]\  }}|         |S r   r   ).0taskresult
task_to_dbs      rN   
<dictcomp>z9MultiDBClient._check_databases_health.<locals>.<dictcomp>^  s1     
 
 
)5vJtf
 
 
rP   z%Health check failed, due to exception)exc_infoF)rK   r0   rE   rh   r{   r   gatherzipitems
isinstancer   rs   ro   OPENrk   rn   loggerdebugoriginal_exception)	rM   rs   r   r   results
db_resultsr   unhealthy_dbr   s	           @rN   rj   z%MultiDBClient._check_databases_healthN  sG     
 46
? 	( 	(KHa&t'<'<X'F'FGGD'JtN!!$''''O$OOOOOOOO
 
 
 
9<T^W9U9U
 
 

 !+ 0 0 2 2 
	1 
	1Hf&"<== 	1%-4\$*;#6    
 ,1
<(rP   c                   K   |                                   d{V }d}| j        j        t          j        k    rd|                                v}n| j        j        t          j        k    r6t          |                                          t          |          dz  k    }n0| j        j        t          j	        k    rd|                                v }|st          d| j        j                   dS )zj
        Runs initial health check and evaluate healthiness based on initial_health_check_policy.
        NTF   z:Initial health check failed. Initial health check policy: )rj   rI   initial_health_check_policyr   ALL_AVAILABLEvaluesMAJORITY_AVAILABLEsumlenONE_AVAILABLEr   )rM   r   
is_healthys      rN   rg   z+MultiDBClient._perform_initial_health_checkp  s       4466666666
<37I7WWWgnn&6&66JJL4!45 5 W^^--..W1AAJJL48J8XXX!1!11J 	/wT\Muww  	 	rP   c                 "  K   | j                             | j        |           d{V }|s2|j        j        t
          j        k    rt
          j        |j        _        |S |r0|j        j        t
          j        k    rt
          j        |j        _        |S )zO
        Runs health checks on the given database until first failure.
        N)r8   executer3   rk   rn   ro   r   rp   )rM   rs   r   s      rN   r{   zMultiDBClient._check_db_health  s      
  4<<
 
 
 
 
 
 
 

  	4%55)0 & 	4H,2gnDD%,^H"rP   rk   	old_state	new_statec                    t          j                    }|t          j        k    r3t          j        |                     |j                            | _        d S |t          j        k    rT|t          j	        k    rDt                              d|j         d           |                    t          t          |           |t          j        k    r5|t          j        k    r't                              d|j         d           d S d S d S )Nz	Database z- is unreachable. Failover has been initiated.z is reachable again.)rE   get_running_loopro   	HALF_OPENrh   r{   rs   rL   rp   r   r   warning
call_laterr   _half_open_circuitinfo)rM   rk   r   r   loops        rN   rm   z/MultiDBClient._on_circuit_state_change_callback  s     '))))))0)<%%g&677* *D& F&&9+D+DNN[G,[[[   OO02DgNNN&&9+F+FKKJG$4JJJKKKKK '&+F+FrP   )rM   r%   rQ   r%   )T),__name__
__module____qualname____doc__r   rO   rV   r\   rd   rT   r   rv   r   r}   r   boolr   r   r   floatr   r   r   r   r   r   r   r   r   r   r   r"   r   strr   r   dictr   rj   rg   r{   r   ro   rm   r   rP   rN   r%   r%   "   s        
+*} +* +* +* +*Z   
H H H   "  "  " Hy    
- 
D 
 
 
 
8 IM/J /J$/JAE/J /J /J /Jb	)	DQ	 	 	 	m    J] JE J J J J&95I 9 9 9 94+ 4 4 4 4M M M   %)$)'+
 
 

|U3	#+>%??@
 
 SM	

 "
 e_
 
 
 
,	& 	& 	& tHdN/C        D  0}     $L%L29LFML L L L L LrP   r%   rk   c                 (    t           j        | _        d S rS   )ro   r   rn   )rk   s    rN   r   r     s    %GMMMrP   c                       e Zd ZdZdefdZddZd Zd Zd	 Z	de
fd
ZdefdZddZddZddZd Zdee         fdZdS )r   zG
    Pipeline implementation for multiple logical Redis databases.
    r[   c                 "    g | _         || _        d S rS   )_command_stack_client)rM   r[   s     rN   rO   zPipeline.__init__  s     rP   rM   rQ   c                 
   K   | S rS   r   rU   s    rN   rV   zPipeline.__aenter__        rP   c                    K   |                                   d {V  | j                            |||           d {V  d S rS   )resetr   rd   r`   s       rN   rd   zPipeline.__aexit__  sX      jjlll$$Xy)DDDDDDDDDDDrP   c                 N    |                                                                  S rS   )_async_self	__await__rU   s    rN   r   zPipeline.__await__  s     !!++---rP   c                 
   K   | S rS   r   rU   s    rN   r   zPipeline._async_self  r   rP   c                 *    t          | j                  S rS   )r   r   rU   s    rN   __len__zPipeline.__len__  s    4&'''rP   c                     dS )z1Pipeline instances should always evaluate to TrueTr   rU   s    rN   __bool__zPipeline.__bool__  s    trP   Nc                    K   g | _         d S rS   )r   rU   s    rN   r   zPipeline.reset  s       rP   c                 >   K   |                                   d{V  dS )zClose the pipelineN)r   rU   s    rN   r\   zPipeline.aclose  s,      jjllrP   c                 >    | j                             ||f           | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   r   r   s      rN   pipeline_execute_commandz!Pipeline.pipeline_execute_command  s$     	""D'?333rP   c                      | j         |i |S )zAdds a command to the stack)r  rM   r   r   s      rN   r   zPipeline.execute_command  s    ,t,d=f===rP   c                 @  K   | j         j        s| j                                          d{V  	 | j         j                            t          | j                             d{V 	 |                                  d{V  S # |                                  d{V  w xY w)z0Execute all the commands in the current pipelineN)r   rD   rT   rC   execute_pipelinetupler   r   rU   s    rN   r   zPipeline.execute  s      |' 	,,))+++++++++	6GGd)**         **,,$**,,s   6B B)rM   r   rQ   r   rQ   N)rQ   r   )r   r   r   r   r%   rO   rV   rd   r   r   intr   r   r   r   r\   r  r   r   r   r   r   rP   rN   r   r     s        }       E E E. . .  ( ( ( ( ($    ! ! ! !      > > >
tCy 
 
 
 
 
 
rP   r   c                       e Zd ZdZdefdZddZddZd Ze	de
fd	            Zd
efdZd
edefdZd
efdZd
edefdZd Z	 dde
dee         fdZddddeddfdZdS )r   z2
    PubSub object for multi database client.
    r[   c                 B    || _          | j         j        j        di | dS )zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        Nr   )r   rC   r   )rM   r[   r   s      rN   rO   zPubSub.__init__  s/     ,%,66v66666rP   rQ   c                 
   K   | S rS   r   rU   s    rN   rV   zPubSub.__aenter__  r   rP   Nc                 >   K   |                                   d {V  d S rS   r_   r`   s       rN   rd   zPubSub.__aexit__
  re   rP   c                 P   K   | j         j                            d           d {V S )Nr\   r   rC   execute_pubsub_methodrU   s    rN   r\   zPubSub.aclose  s1      \2HHRRRRRRRRRrP   c                 .    | j         j        j        j        S rS   )r   rC   active_pubsub
subscribedrU   s    rN   r  zPubSub.subscribed  s    |,:EErP   r   c                 B   K    | j         j        j        dg|R   d {V S )Nr   r  rM   r   s     rN   r   zPubSub.execute_command  sP      HT\2H
 $
 
 
 
 
 
 
 
 
 	
rP   r   c                 H   K    | j         j        j        dg|R i | d{V S )aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscribeNr  r  s      rN   r  zPubSub.psubscribe  s`       IT\2H

 
 
#)
 
 
 
 
 
 
 
 	
rP   c                 B   K    | j         j        j        dg|R   d{V S )zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscribeNr  r  s     rN   r  zPubSub.punsubscribe%  sS      
 IT\2H
!
 
 
 
 
 
 
 
 
 	
rP   c                 H   K    | j         j        j        dg|R i | d{V S )aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscribeNr  r  s      rN   r  zPubSub.subscribe.  s`       IT\2H

 
 
"(
 
 
 
 
 
 
 
 	
rP   c                 B   K    | j         j        j        dg|R   d{V S )zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscribeNr  r  s     rN   r  zPubSub.unsubscribe:  sS      
 IT\2H
 
 
 
 
 
 
 
 
 
 	
rP   F        ignore_subscribe_messagestimeoutc                 V   K   | j         j                            d||           d{V S )a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number or None to wait indefinitely.
        get_message)r  r   Nr  )rM   r  r   s      rN   r"  zPubSub.get_messageC  sR       \2HH&? I 
 
 
 
 
 
 
 
 	
rP   g      ?)exception_handlerpoll_timeoutr$  c                V   K   | j         j                            |||            d{V S )a  Process pub/sub messages using registered callbacks.

        This is the equivalent of :py:meth:`redis.PubSub.run_in_thread` in
        redis-py, but it is a coroutine. To launch it as a separate task, use
        ``asyncio.create_task``:

            >>> task = asyncio.create_task(pubsub.run())

        To shut it down, use asyncio cancellation:

            >>> task.cancel()
            >>> await task
        )
sleep_timer#  r   N)r   rC   execute_pubsub_run)rM   r#  r$  s      rN   runz
PubSub.runS  sR      & \2EE#7HQU F 
 
 
 
 
 
 
 
 	
rP   )rQ   r   r  )Fr  )r   r   r   r   r%   rO   rV   rd   r\   propertyr   r  r!   r   r    r	   r  r  r   r  r  r   r   r"  r(  r   rP   rN   r   r     s        	7} 	7 	7 	7 	7      S S S FD F F F XF
: 
 
 
 



h 

- 

 

 

 


 
 
 
 


X 

 

 

 

 


 
 
 SV
 
)-
@H
 
 
 
& !	
 
 
 	

 

 
 
 
 
 
rP   r   );rE   loggingtypingr   r   r   r   r   r   redis.asyncio.clientr	   &redis.asyncio.multidb.command_executorr
   redis.asyncio.multidb.configr   r   r   r   redis.asyncio.multidb.databaser   r   r   &redis.asyncio.multidb.failure_detectorr   !redis.asyncio.multidb.healthcheckr   r   redis.asyncio.retryr   redis.backgroundr   redis.backoffr   redis.commandsr   r   redis.multidb.circuitr   r   ro   redis.multidb.exceptionr   r   r   redis.observability.attributesr   redis.typingr    r!   r"   redis.utilsr#   	getLoggerr   r   r%   r   r   r   r   rP   rN   <module>r<     s     B B B B B B B B B B B B B B B B . . . . . . I I I I I I            N M M M M M M M M M G G G G G G L L L L L L L L % % % % % % 0 0 0 0 0 0 # # # # # # F F F F F F F F 0 0 0 0 0 0 2 2 2 2 2 2         
 = < < < < < 3 3 3 3 3 3 3 3 3 3 $ $ $ $ $ $		8	$	$ IL IL IL IL IL,.? IL IL ILX& & & & &A A A A A'): A A AHq
 q
 q
 q
 q
 q
 q
 q
 q
 q
rP   