Source code for celery_redis_sentinel.transport

# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals

from kombu.transport.redis import Channel, Transport
from kombu.utils import cached_property

from .redis_sentinel import CelerySentinelConnectionPool, get_redis_via_sentinel


class SentinelChannel(Channel):
    """
    Redis channel that obtains its connection pool through Redis Sentinel.

    .. note::
        The sentinel settings are read from the broker transport options,
        so they must be supplied via ``BROKER_TRANSPORT_OPTIONS``.
        Here is a sample configuration::

            BROKER_TRANSPORT_OPTIONS = {
                'sentinels': [('192.168.1.1', 26379),
                              ('192.168.1.2', 26379),
                              ('192.168.1.3', 26379)],
                'service_name': 'master',
                'socket_timeout': 0.1,
            }
    """

    # Extend the parent's transport option names with the sentinel-specific
    # ones that ``sentinel_pool`` reads from ``self``.
    from_transport_options = Channel.from_transport_options + (
        'sentinels', 'service_name', 'socket_timeout',
    )

    @cached_property
    def sentinel_pool(self):
        """
        Build (once) and return the sentinel-backed connection pool.

        As a side effect, the hostname and port stored on
        ``self.connection.client`` are overwritten with the master address
        reported by the pool, so that celery logs the node it is actually
        connected to.

        Returns
        -------
        CelerySentinelConnectionPool
            Connection pool instance connected to redis sentinel
        """
        conn_kwargs = self._connparams()
        conn_kwargs['sentinels'] = self.sentinels
        conn_kwargs['service_name'] = self.service_name
        conn_kwargs['socket_timeout'] = self.socket_timeout

        redis_client = get_redis_via_sentinel(
            redis_class=self.Client,
            connection_pool_class=CelerySentinelConnectionPool,
            **conn_kwargs
        )
        sentinel_pool = redis_client.connection_pool
        master_host, master_port = sentinel_pool.get_master_address()

        # Point the connection details at the resolved master so that
        # celery correctly logs where it connects.
        client = self.connection.client
        client.hostname = master_host
        client.port = master_port

        return sentinel_pool

    def _get_pool(self, *args, **kwargs):
        """
        Return the cached sentinel connection pool.

        Any positional or keyword arguments are accepted and ignored.
        """
        return self.sentinel_pool
class SentinelTransport(Transport):
    """
    Redis transport whose channel class is :class:`SentinelChannel`,
    giving it support for Redis Sentinel.
    """

    # Channel implementation used by this transport.
    Channel = SentinelChannel