Source code for celery_redis_sentinel.backend

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

from celery.backends.redis import RedisBackend
from kombu.utils import cached_property
from redis import Redis

from .redis_sentinel import EnsuredRedisMixin, get_redis_via_sentinel


[docs]class RedisSentinelBackend(RedisBackend): """ Redis results backend with support for Redis Sentinel .. note:: In order to correctly configure the sentinel, this backend expects an additional backend celery configuration to be present - ``CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS``. Here is are sample transport options:: CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = { 'sentinels': [('192.168.1.1', 26379), ('192.168.1.2', 26379), ('192.168.1.3', 26379)], 'service_name': 'master', 'socket_timeout': 0.1, } """ def __init__(self, transport_options=None, *args, **kwargs): super(RedisSentinelBackend, self).__init__(*args, **kwargs) _get = self.app.conf.get self.transport_options = transport_options or _get('CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS') or {} self.sentinels = self.transport_options['sentinels'] self.service_name = self.transport_options['service_name'] self.socket_timeout = self.transport_options.get('socket_timeout', 0.1) @cached_property
[docs] def client(self): """ Cached property for getting ``Redis`` client to be used to interact with redis. Returned client also subclasses from :class:`.EnsuredRedisMixin` which ensures that all redis commands are executed with retry logic in case of sentinel failover. Returns ------- Redis Redis client connected to Sentinel via Sentinel connection pool """ params = self.connparams params.update({ 'sentinels': self.sentinels, 'service_name': self.service_name, 'socket_timeout': self.socket_timeout, }) return get_redis_via_sentinel( redis_class=type(str('Redis'), (EnsuredRedisMixin, Redis), {}), **params )