287 lines
11 KiB
Python
287 lines
11 KiB
Python
|
import random
|
||
|
import weakref
|
||
|
|
||
|
from redis.client import Redis
|
||
|
from redis.connection import ConnectionPool, Connection
|
||
|
from redis.exceptions import (ConnectionError, ResponseError, ReadOnlyError,
|
||
|
TimeoutError)
|
||
|
from redis._compat import iteritems, nativestr, xrange
|
||
|
|
||
|
|
||
|
class MasterNotFoundError(ConnectionError):
    """Raised when no usable master can be found for a service name."""
|
||
|
|
||
|
|
||
|
class SlaveNotFoundError(ConnectionError):
    """Raised when no slave (nor fallback master) can be connected to."""
|
||
|
|
||
|
|
||
|
class SentinelManagedConnection(Connection):
    """
    Connection whose target address is looked up through its owning
    Sentinel-backed connection pool every time it (re)connects.
    """

    def __init__(self, **kwargs):
        # ``connection_pool`` is consumed here; it is not forwarded to the
        # base ``Connection`` constructor.
        self.connection_pool = kwargs.pop('connection_pool')
        super(SentinelManagedConnection, self).__init__(**kwargs)

    def __repr__(self):
        pool = self.connection_pool
        # Build the string in a single formatting pass.  The address part is
        # simply left empty while no host is known, so the result never
        # contains a stray ``%s`` placeholder and a ``%`` inside the service
        # name cannot break a second round of formatting.
        host_info = ''
        if self.host:
            host_info = ',host=%s,port=%s' % (self.host, self.port)
        return '%s<service=%s%s>' % (
            type(self).__name__, pool.service_name, host_info)

    def connect_to(self, address):
        """
        Connect to ``address``, a ``(host, port)`` pair.

        When the pool's ``check_connection`` flag is set, a PING is sent
        right after connecting and ``ConnectionError`` is raised unless the
        reply is PONG.
        """
        self.host, self.port = address
        super(SentinelManagedConnection, self).connect()
        if self.connection_pool.check_connection:
            self.send_command('PING')
            if nativestr(self.read_response()) != 'PONG':
                raise ConnectionError('PING failed')

    def connect(self):
        """
        Connect to the address currently advertised for this pool's role.

        A master pool connects to the address returned by the pool's
        ``get_master_address()``.  A slave pool tries each candidate yielded
        by the pool's ``rotate_slaves()`` in turn, skipping those that fail
        with ``ConnectionError``.
        """
        if self._sock:
            return  # already connected
        if self.connection_pool.is_master:
            self.connect_to(self.connection_pool.get_master_address())
        else:
            for slave in self.connection_pool.rotate_slaves():
                try:
                    return self.connect_to(slave)
                except ConnectionError:
                    continue
            raise SlaveNotFoundError  # Never be here

    def read_response(self):
        """
        Read a response, translating ``ReadOnlyError`` on a master pool into
        a ``ConnectionError`` after dropping the connection.
        """
        try:
            return super(SentinelManagedConnection, self).read_response()
        except ReadOnlyError:
            if self.connection_pool.is_master:
                # When talking to a master, a ReadOnlyError most likely
                # indicates that the previous master that we're still
                # connected to has been demoted to a slave and there's a new
                # master. Calling disconnect will force the connection to
                # re-query sentinel during the next connect() attempt.
                self.disconnect()
                raise ConnectionError('The previous master is now a slave')
            raise
|
||
|
|
||
|
|
||
|
class SentinelConnectionPool(ConnectionPool):
    """
    Sentinel backed connection pool.

    If ``check_connection`` flag is set to True, SentinelManagedConnection
    sends a PING command right after establishing the connection.
    """

    def __init__(self, service_name, sentinel_manager, **kwargs):
        """
        ``service_name`` is the name passed to the sentinel manager's
        discovery calls; ``sentinel_manager`` is the object providing
        ``discover_master`` and ``discover_slaves``.

        ``is_master`` (default True) and ``check_connection`` (default
        False) are taken out of ``kwargs``; everything else is forwarded to
        the base pool constructor.
        """
        kwargs['connection_class'] = kwargs.get(
            'connection_class', SentinelManagedConnection)
        self.is_master = kwargs.pop('is_master', True)
        self.check_connection = kwargs.pop('check_connection', False)
        # NOTE(review): ``master_address`` and ``slave_rr_counter`` are only
        # assigned in ``reset()``; this presumably relies on the base
        # constructor calling ``reset()`` -- confirm against ConnectionPool.
        super(SentinelConnectionPool, self).__init__(**kwargs)
        # Connections receive a weak proxy to this pool (presumably so that
        # they do not keep the pool alive -- confirm).
        self.connection_kwargs['connection_pool'] = weakref.proxy(self)
        self.service_name = service_name
        self.sentinel_manager = sentinel_manager

    def __repr__(self):
        return "%s<service=%s(%s)>" % (
            type(self).__name__,
            self.service_name,
            'master' if self.is_master else 'slave',
        )

    def reset(self):
        "Reset the base pool and forget the cached master / slave cursor"
        super(SentinelConnectionPool, self).reset()
        self.master_address = None
        self.slave_rr_counter = None

    def get_master_address(self):
        """
        Ask the sentinel manager for the current master address and return
        it.

        On a master pool the address is cached.  When a later lookup
        returns a different address, the new one is recorded and all
        connections in this pool are disconnected.
        """
        master_address = self.sentinel_manager.discover_master(
            self.service_name)
        if self.is_master and master_address != self.master_address:
            first_lookup = self.master_address is None
            # Always remember the latest address; otherwise every lookup
            # after a failover would compare against the stale master and
            # disconnect the whole pool again.
            self.master_address = master_address
            if not first_lookup:
                # Master address changed, disconnect all clients in this pool
                self.disconnect()
        return master_address

    def rotate_slaves(self):
        "Round-robin slave balancer"
        slaves = self.sentinel_manager.discover_slaves(self.service_name)
        if slaves:
            if self.slave_rr_counter is None:
                # Start from a random position in the slave list.
                self.slave_rr_counter = random.randint(0, len(slaves) - 1)
            for _ in xrange(len(slaves)):
                self.slave_rr_counter = (
                    self.slave_rr_counter + 1) % len(slaves)
                slave = slaves[self.slave_rr_counter]
                yield slave
        # Fallback to the master connection
        try:
            yield self.get_master_address()
        except MasterNotFoundError:
            pass
        # One-element tuple so the name is formatted as a single value.
        raise SlaveNotFoundError(
            'No slave found for %r' % (self.service_name,))
|
||
|
|
||
|
|
||
|
class Sentinel(object):
    """
    Redis Sentinel cluster client

    >>> from redis.sentinel import Sentinel
    >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
    >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
    >>> master.set('foo', 'bar')
    >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
    >>> slave.get('foo')
    b'bar'

    ``sentinels`` is a list of sentinel nodes. Each node is represented by
    a pair (hostname, port).

    ``min_other_sentinels`` defines a minimum number of peers for a sentinel.
    When querying a sentinel, if it doesn't meet this threshold, responses
    from that sentinel won't be considered valid.

    ``sentinel_kwargs`` is a dictionary of connection arguments used when
    connecting to sentinel instances. Any argument that can be passed to
    a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
    not specified, any ``socket_*`` options specified in
    ``connection_kwargs`` will be used.

    ``connection_kwargs`` are keyword arguments that will be used when
    establishing a connection to a Redis server.
    """

    def __init__(self, sentinels, min_other_sentinels=0, sentinel_kwargs=None,
                 **connection_kwargs):
        if sentinel_kwargs is None:
            # No explicit sentinel options: reuse only the ``socket_*``
            # entries of connection_kwargs.
            sentinel_kwargs = {
                option: value
                for option, value in iteritems(connection_kwargs)
                if option.startswith('socket_')
            }
        self.sentinel_kwargs = sentinel_kwargs
        self.min_other_sentinels = min_other_sentinels
        self.connection_kwargs = connection_kwargs

        # One client per sentinel node.
        self.sentinels = [
            Redis(node_host, node_port, **self.sentinel_kwargs)
            for node_host, node_port in sentinels
        ]

    def __repr__(self):
        addresses = [
            '%s:%s' % (
                client.connection_pool.connection_kwargs['host'],
                client.connection_pool.connection_kwargs['port'],
            )
            for client in self.sentinels
        ]
        return '%s<sentinels=[%s]>' % (
            type(self).__name__, ','.join(addresses))

    def check_master_state(self, state, service_name):
        """
        Return True when ``state`` describes a master that is not flagged
        SDOWN/ODOWN and was reported by a sentinel that sees at least
        ``min_other_sentinels`` peers. ``service_name`` is not used here.
        """
        is_usable = state['is_master'] and not (
            state['is_sdown'] or state['is_odown'])
        if not is_usable:
            return False
        # Reject answers from a sentinel that doesn't see enough other nodes
        return not (state['num-other-sentinels'] < self.min_other_sentinels)

    def discover_master(self, service_name):
        """
        Asks sentinel servers for the Redis master's address corresponding
        to the service labeled ``service_name``.

        Returns a pair (address, port) or raises MasterNotFoundError if no
        master is found.
        """
        for position, candidate in enumerate(self.sentinels):
            try:
                known_masters = candidate.sentinel_masters()
            except (ConnectionError, TimeoutError):
                continue
            state = known_masters.get(service_name)
            if not state or not self.check_master_state(state, service_name):
                continue
            # Put this sentinel at the top of the list
            self.sentinels[0], self.sentinels[position] = (
                candidate, self.sentinels[0])
            return state['ip'], state['port']
        raise MasterNotFoundError("No master found for %r" % (service_name,))

    def filter_slaves(self, slaves):
        "Remove slaves that are in an ODOWN or SDOWN state"
        return [
            (slave['ip'], slave['port'])
            for slave in slaves
            if not (slave['is_odown'] or slave['is_sdown'])
        ]

    def discover_slaves(self, service_name):
        "Returns a list of alive slaves for service ``service_name``"
        for candidate in self.sentinels:
            try:
                reported = candidate.sentinel_slaves(service_name)
            except (ConnectionError, ResponseError, TimeoutError):
                continue
            alive = self.filter_slaves(reported)
            if alive:
                return alive
        return []

    def master_for(self, service_name, redis_class=Redis,
                   connection_pool_class=SentinelConnectionPool, **kwargs):
        """
        Returns a redis client instance for the ``service_name`` master.

        A SentinelConnectionPool class is used to retrieve the master's
        address before establishing a new connection.

        NOTE: If the master's address has changed, any cached connections to
        the old master are closed.

        By default clients will be a redis.Redis instance. Specify a
        different class to the ``redis_class`` argument if you desire
        something different.

        The ``connection_pool_class`` specifies the connection pool to use.
        The SentinelConnectionPool will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        # Per-call kwargs take precedence over the instance-wide ones; the
        # role flag is always forced.
        pool_kwargs = dict(self.connection_kwargs, **kwargs)
        pool_kwargs['is_master'] = True
        pool = connection_pool_class(service_name, self, **pool_kwargs)
        return redis_class(connection_pool=pool)

    def slave_for(self, service_name, redis_class=Redis,
                  connection_pool_class=SentinelConnectionPool, **kwargs):
        """
        Returns redis client instance for the ``service_name`` slave(s).

        A SentinelConnectionPool class is used to retrieve the slave's
        address before establishing a new connection.

        By default clients will be a redis.Redis instance. Specify a
        different class to the ``redis_class`` argument if you desire
        something different.

        The ``connection_pool_class`` specifies the connection pool to use.
        The SentinelConnectionPool will be used by default.

        All other keyword arguments are merged with any connection_kwargs
        passed to this class and passed to the connection pool as keyword
        arguments to be used to initialize Redis connections.
        """
        # Per-call kwargs take precedence over the instance-wide ones; the
        # role flag is always forced.
        pool_kwargs = dict(self.connection_kwargs, **kwargs)
        pool_kwargs['is_master'] = False
        pool = connection_pool_class(service_name, self, **pool_kwargs)
        return redis_class(connection_pool=pool)
|