celery_redis_sentinel.redis_sentinel module

class celery_redis_sentinel.redis_sentinel.CelerySentinelConnectionPool(service_name, sentinel_manager, **kwargs)[source]

Bases: redis.sentinel.SentinelConnectionPool

Redis Sentinel connection pool to be exclusively used with celery broker.

Why?

The reason we need this is because celery broker needs to completely loose connection with redis node in order to detect the connection failure hence release the sockets which it polls for redis responses. If the connection failure is not detected, celery will continue polling those sockets even though it will never receive any responses. That is precisely what happens when trying to use regular SentinelConnectionPool with celery. That pool does a too-good of a job of failing over. What happens is that it would fail in celery callback trying to read the socket response (e.g. in Channel._brpop_read()) which will disconnect the connection. Then on the next iteration of the celery event-loop (e.g. in Channel._brpop_start()), the connection pool will find a new master and connect to it instead. That sounds like expected behavior except celery will not be aware of the change and hence continue polling on the previous socket which at that point will never receive any responses. The solution for this is to cripple the connection pool by not allowing it to switch masters. This will force it to keep trying to connect to the same master during failover which will obviously raise ConnectionError. That is good because in that case celery will fail at the start of the event-loop iteration (e.g. in Channel._brpop_start()) which celery picks up and will cause it to start reconnection logic to establish connection with broker again. When it does that a new Transport is instantiated with new Channel which will create a new connection pool hence this crippled connection pool will not have any further impact on celery. It is just merely crippled in order for celery to notice connection errors with redis sentinel master node.

get_master_address()[source]

Crippled implementation of getting master address which only finds master address once after which point it returns same master address.

Please refer to CelerySentinelConnectionPool for explanation why this is necessary.

class celery_redis_sentinel.redis_sentinel.EnsuredRedisMixin[source]

Bases: object

Mixin to be used for Redis or its subclasses which uses ensure_redis_call() that each command is executed with retry logic.

execute_command(*args, **options)[source]

Same as super implementation except its wrapped with ensure_redis_call()

class celery_redis_sentinel.redis_sentinel.ShortLivedSentinel(sentinels, *args, **kwargs)[source]

Bases: redis.sentinel.Sentinel

Custom Sentinel implementation which uses ShortLivedStrictRedis to query sentinel for information.

That assures that sentinel connection is being opened when sentinel is being queried. After the query is successful, sentinel connection is closed.

class celery_redis_sentinel.redis_sentinel.ShortLivedStrictRedis(host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding='utf-8', encoding_errors='strict', charset=None, errors=None, decode_responses=False, retry_on_timeout=False, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None, ssl_ca_certs=None, max_connections=None)[source]

Bases: redis.client.StrictRedis

Custom StrictRedis which disconnects from redis after sending any command to redis.

This is really useful in 2 scenarios:

  1. Connections made to redis server via firewall. When firewall closes the connection redis will not always notice that and so will not release the connection. As a result connections maxlimit eventually will be reached as more clients will be connecting.
  2. When the connection is used to make only very few queries. In sentinel case, sentinel is only used once to query the master address. There is no need to maintain the connection after master address is found and so the connection can be closed.
execute_command(*args, **options)[source]

In addition to executing redis command, this method closes the redis connection

celery_redis_sentinel.redis_sentinel.ensure_redis_call(f, *args, **kwargs)[source]

Helper for executing any callable with retry-logic for when redis is timing out or is experiencing connection errors while redis sentinel failover is in progress.

The retries are attempted in exponential ease-off (1, 2, 4, ... sec).

Note

This helper is a blocking function. It waits between retries in a blocking fashion.

Note

In between reties this function prints a helpful error message. The reason why print is used rather then lets say a logger is because celery has a configuration at what level stdout should be logged as - CELERY_REDIRECT_STDOUTS_LEVEL

Parameters:
  • f (callable) – The callable to be executed
  • attempts (int, optional) – Number of attempts to make with exponential ease-off. By default 5 is used which means the the wait time before last retry will be 16 seconds. Also in that case total wait time is 31 seconds.
  • args (tuple) – Any arguments to be passed to f when calling it
  • kwargs (dict) – Any keyword arguments to be passed to f when calling it
celery_redis_sentinel.redis_sentinel.get_redis_via_sentinel(db, sentinels, service_name, socket_timeout=0.1, redis_class=<class 'redis.client.StrictRedis'>, sentinel_class=<class 'celery_redis_sentinel.redis_sentinel.ShortLivedSentinel'>, connection_pool_class=<class 'redis.sentinel.SentinelConnectionPool'>, **kwargs)[source]

Helper function for getting Redis instance via sentinel with sentinel connection pool

Parameters:
  • db (int, str) – Redis DB to which connection should be established to. In most cases this should be 0 DB.
  • sentinels (list) – List of tuples of all sentinel nodes within the sentinel cluster. Each tuple should be of format (ip, port).
  • service_name (str) – Name of the sentinel service_name.
  • socket_timeout (float, optional) – Socket timeout value. By default 0.1 is used.
  • redis_class (type, optional) – Class to be used for the Redis being returned. By default StrictRedis is used. This is useful since sometimes Redis needs to be used instead of StrictRedis. For example Channel implementation in kombu uses Redis instead of StrictRedis.
  • sentinel_class (type, optional) – Class to be used for the Sentinel being used to return redis client. By default ShortLivedSentinel is used.
  • connection_pool_class (type, optional) – Class to be used for the connection pool. By default SentinelConnectionPool is used.
Returns:

Connected Redis instance with sentinel connection pool

Return type:

Redis