Merge master

This commit is contained in:
2020-05-21 17:35:33 -04:00
26 changed files with 7490 additions and 16 deletions

View File

@ -0,0 +1,59 @@
from redis.client import Redis, StrictRedis
from redis.connection import (
BlockingConnectionPool,
ConnectionPool,
Connection,
SSLConnection,
UnixDomainSocketConnection
)
from redis.utils import from_url
from redis.exceptions import (
AuthenticationError,
AuthenticationWrongNumberOfArgsError,
BusyLoadingError,
ChildDeadlockedError,
ConnectionError,
DataError,
InvalidResponse,
PubSubError,
ReadOnlyError,
RedisError,
ResponseError,
TimeoutError,
WatchError
)
def int_or_str(value):
try:
return int(value)
except ValueError:
return value
__version__ = '3.5.2'
VERSION = tuple(map(int_or_str, __version__.split('.')))
__all__ = [
'AuthenticationError',
'AuthenticationWrongNumberOfArgsError',
'BlockingConnectionPool',
'BusyLoadingError',
'ChildDeadlockedError',
'Connection',
'ConnectionError',
'ConnectionPool',
'DataError',
'from_url',
'InvalidResponse',
'PubSubError',
'ReadOnlyError',
'Redis',
'RedisError',
'ResponseError',
'SSLConnection',
'StrictRedis',
'TimeoutError',
'UnixDomainSocketConnection',
'WatchError',
]

View File

@ -0,0 +1,188 @@
"""Internal module for Python 2 backwards compatibility."""
# flake8: noqa
import errno
import socket
import sys
def sendall(sock, *args, **kwargs):
return sock.sendall(*args, **kwargs)
def shutdown(sock, *args, **kwargs):
return sock.shutdown(*args, **kwargs)
def ssl_wrap_socket(context, sock, *args, **kwargs):
return context.wrap_socket(sock, *args, **kwargs)
# For Python older than 3.5, retry EINTR.
if sys.version_info[0] < 3 or (sys.version_info[0] == 3 and
sys.version_info[1] < 5):
# Adapted from https://bugs.python.org/review/23863/patch/14532/54418
import time
# Wrapper for handling interruptable system calls.
def _retryable_call(s, func, *args, **kwargs):
# Some modules (SSL) use the _fileobject wrapper directly and
# implement a smaller portion of the socket interface, thus we
# need to let them continue to do so.
timeout, deadline = None, 0.0
attempted = False
try:
timeout = s.gettimeout()
except AttributeError:
pass
if timeout:
deadline = time.time() + timeout
try:
while True:
if attempted and timeout:
now = time.time()
if now >= deadline:
raise socket.error(errno.EWOULDBLOCK, "timed out")
else:
# Overwrite the timeout on the socket object
# to take into account elapsed time.
s.settimeout(deadline - now)
try:
attempted = True
return func(*args, **kwargs)
except socket.error as e:
if e.args[0] == errno.EINTR:
continue
raise
finally:
# Set the existing timeout back for future
# calls.
if timeout:
s.settimeout(timeout)
def recv(sock, *args, **kwargs):
return _retryable_call(sock, sock.recv, *args, **kwargs)
def recv_into(sock, *args, **kwargs):
return _retryable_call(sock, sock.recv_into, *args, **kwargs)
else: # Python 3.5 and above automatically retry EINTR
def recv(sock, *args, **kwargs):
return sock.recv(*args, **kwargs)
def recv_into(sock, *args, **kwargs):
return sock.recv_into(*args, **kwargs)
if sys.version_info[0] < 3:
# In Python 3, the ssl module raises socket.timeout whereas it raises
# SSLError in Python 2. For compatibility between versions, ensure
# socket.timeout is raised for both.
import functools
try:
from ssl import SSLError as _SSLError
except ImportError:
class _SSLError(Exception):
"""A replacement in case ssl.SSLError is not available."""
pass
_EXPECTED_SSL_TIMEOUT_MESSAGES = (
"The handshake operation timed out",
"The read operation timed out",
"The write operation timed out",
)
def _handle_ssl_timeout(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except _SSLError as e:
message = len(e.args) == 1 and unicode(e.args[0]) or ''
if any(x in message for x in _EXPECTED_SSL_TIMEOUT_MESSAGES):
# Raise socket.timeout for compatibility with Python 3.
raise socket.timeout(*e.args)
raise
return wrapper
recv = _handle_ssl_timeout(recv)
recv_into = _handle_ssl_timeout(recv_into)
sendall = _handle_ssl_timeout(sendall)
shutdown = _handle_ssl_timeout(shutdown)
ssl_wrap_socket = _handle_ssl_timeout(ssl_wrap_socket)
if sys.version_info[0] < 3:
from urllib import unquote
from urlparse import parse_qs, urlparse
from itertools import imap, izip
from string import letters as ascii_letters
from Queue import Queue
# special unicode handling for python2 to avoid UnicodeDecodeError
def safe_unicode(obj, *args):
""" return the unicode representation of obj """
try:
return unicode(obj, *args)
except UnicodeDecodeError:
# obj is byte string
ascii_text = str(obj).encode('string_escape')
return unicode(ascii_text)
def iteritems(x):
return x.iteritems()
def iterkeys(x):
return x.iterkeys()
def itervalues(x):
return x.itervalues()
def nativestr(x):
return x if isinstance(x, str) else x.encode('utf-8', 'replace')
def next(x):
return x.next()
unichr = unichr
xrange = xrange
basestring = basestring
unicode = unicode
long = long
BlockingIOError = socket.error
else:
from urllib.parse import parse_qs, unquote, urlparse
from string import ascii_letters
from queue import Queue
def iteritems(x):
return iter(x.items())
def iterkeys(x):
return iter(x.keys())
def itervalues(x):
return iter(x.values())
def nativestr(x):
return x if isinstance(x, str) else x.decode('utf-8', 'replace')
def safe_unicode(value):
if isinstance(value, bytes):
value = value.decode('utf-8', 'replace')
return str(value)
next = next
unichr = chr
imap = map
izip = zip
xrange = range
basestring = str
unicode = str
long = int
BlockingIOError = BlockingIOError
try: # Python 3
from queue import LifoQueue, Empty, Full
except ImportError: # Python 2
from Queue import LifoQueue, Empty, Full

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,82 @@
"Core exceptions raised by the Redis client"
class RedisError(Exception):
pass
class ConnectionError(RedisError):
pass
class TimeoutError(RedisError):
pass
class AuthenticationError(ConnectionError):
pass
class BusyLoadingError(ConnectionError):
pass
class InvalidResponse(RedisError):
pass
class ResponseError(RedisError):
pass
class DataError(RedisError):
pass
class PubSubError(RedisError):
pass
class WatchError(RedisError):
pass
class NoScriptError(ResponseError):
pass
class ExecAbortError(ResponseError):
pass
class ReadOnlyError(ResponseError):
pass
class NoPermissionError(ResponseError):
pass
class LockError(RedisError, ValueError):
"Errors acquiring or releasing a lock"
# NOTE: For backwards compatability, this class derives from ValueError.
# This was originally chosen to behave like threading.Lock.
pass
class LockNotOwnedError(LockError):
"Error trying to extend or release a lock that is (no longer) owned"
pass
class ChildDeadlockedError(Exception):
"Error indicating that a child process is deadlocked after a fork()"
pass
class AuthenticationWrongNumberOfArgsError(ResponseError):
"""
An error to indicate that the wrong number of args
were sent to the AUTH command
"""
pass

View File

@ -0,0 +1,293 @@
import threading
import time as mod_time
import uuid
from redis.exceptions import LockError, LockNotOwnedError
from redis.utils import dummy
class Lock(object):
"""
A shared, distributed Lock. Using Redis for locking allows the Lock
to be shared across processes and/or machines.
It's left to the user to resolve deadlock issues and make sure
multiple clients play nicely together.
"""
lua_release = None
lua_extend = None
lua_reacquire = None
# KEYS[1] - lock name
# ARGV[1] - token
# return 1 if the lock was released, otherwise 0
LUA_RELEASE_SCRIPT = """
local token = redis.call('get', KEYS[1])
if not token or token ~= ARGV[1] then
return 0
end
redis.call('del', KEYS[1])
return 1
"""
# KEYS[1] - lock name
# ARGV[1] - token
# ARGV[2] - additional milliseconds
# ARGV[3] - "0" if the additional time should be added to the lock's
# existing ttl or "1" if the existing ttl should be replaced
# return 1 if the locks time was extended, otherwise 0
LUA_EXTEND_SCRIPT = """
local token = redis.call('get', KEYS[1])
if not token or token ~= ARGV[1] then
return 0
end
local expiration = redis.call('pttl', KEYS[1])
if not expiration then
expiration = 0
end
if expiration < 0 then
return 0
end
local newttl = ARGV[2]
if ARGV[3] == "0" then
newttl = ARGV[2] + expiration
end
redis.call('pexpire', KEYS[1], newttl)
return 1
"""
# KEYS[1] - lock name
# ARGV[1] - token
# ARGV[2] - milliseconds
# return 1 if the locks time was reacquired, otherwise 0
LUA_REACQUIRE_SCRIPT = """
local token = redis.call('get', KEYS[1])
if not token or token ~= ARGV[1] then
return 0
end
redis.call('pexpire', KEYS[1], ARGV[2])
return 1
"""
def __init__(self, redis, name, timeout=None, sleep=0.1,
blocking=True, blocking_timeout=None, thread_local=True):
"""
Create a new Lock instance named ``name`` using the Redis client
supplied by ``redis``.
``timeout`` indicates a maximum life for the lock.
By default, it will remain locked until release() is called.
``timeout`` can be specified as a float or integer, both representing
the number of seconds to wait.
``sleep`` indicates the amount of time to sleep per loop iteration
when the lock is in blocking mode and another client is currently
holding the lock.
``blocking`` indicates whether calling ``acquire`` should block until
the lock has been acquired or to fail immediately, causing ``acquire``
to return False and the lock not being acquired. Defaults to True.
Note this value can be overridden by passing a ``blocking``
argument to ``acquire``.
``blocking_timeout`` indicates the maximum amount of time in seconds to
spend trying to acquire the lock. A value of ``None`` indicates
continue trying forever. ``blocking_timeout`` can be specified as a
float or integer, both representing the number of seconds to wait.
``thread_local`` indicates whether the lock token is placed in
thread-local storage. By default, the token is placed in thread local
storage so that a thread only sees its token, not a token set by
another thread. Consider the following timeline:
time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
thread-1 sets the token to "abc"
time: 1, thread-2 blocks trying to acquire `my-lock` using the
Lock instance.
time: 5, thread-1 has not yet completed. redis expires the lock
key.
time: 5, thread-2 acquired `my-lock` now that it's available.
thread-2 sets the token to "xyz"
time: 6, thread-1 finishes its work and calls release(). if the
token is *not* stored in thread local storage, then
thread-1 would see the token value as "xyz" and would be
able to successfully release the thread-2's lock.
In some use cases it's necessary to disable thread local storage. For
example, if you have code where one thread acquires a lock and passes
that lock instance to a worker thread to release later. If thread
local storage isn't disabled in this case, the worker thread won't see
the token set by the thread that acquired the lock. Our assumption
is that these cases aren't common and as such default to using
thread local storage.
"""
self.redis = redis
self.name = name
self.timeout = timeout
self.sleep = sleep
self.blocking = blocking
self.blocking_timeout = blocking_timeout
self.thread_local = bool(thread_local)
self.local = threading.local() if self.thread_local else dummy()
self.local.token = None
self.register_scripts()
def register_scripts(self):
cls = self.__class__
client = self.redis
if cls.lua_release is None:
cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT)
if cls.lua_extend is None:
cls.lua_extend = client.register_script(cls.LUA_EXTEND_SCRIPT)
if cls.lua_reacquire is None:
cls.lua_reacquire = \
client.register_script(cls.LUA_REACQUIRE_SCRIPT)
def __enter__(self):
# force blocking, as otherwise the user would have to check whether
# the lock was actually acquired or not.
if self.acquire(blocking=True):
return self
raise LockError("Unable to acquire lock within the time specified")
def __exit__(self, exc_type, exc_value, traceback):
self.release()
def acquire(self, blocking=None, blocking_timeout=None, token=None):
"""
Use Redis to hold a shared, distributed lock named ``name``.
Returns True once the lock is acquired.
If ``blocking`` is False, always return immediately. If the lock
was acquired, return True, otherwise return False.
``blocking_timeout`` specifies the maximum number of seconds to
wait trying to acquire the lock.
``token`` specifies the token value to be used. If provided, token
must be a bytes object or a string that can be encoded to a bytes
object with the default encoding. If a token isn't specified, a UUID
will be generated.
"""
sleep = self.sleep
if token is None:
token = uuid.uuid1().hex.encode()
else:
encoder = self.redis.connection_pool.get_encoder()
token = encoder.encode(token)
if blocking is None:
blocking = self.blocking
if blocking_timeout is None:
blocking_timeout = self.blocking_timeout
stop_trying_at = None
if blocking_timeout is not None:
stop_trying_at = mod_time.time() + blocking_timeout
while True:
if self.do_acquire(token):
self.local.token = token
return True
if not blocking:
return False
next_try_at = mod_time.time() + sleep
if stop_trying_at is not None and next_try_at > stop_trying_at:
return False
mod_time.sleep(sleep)
def do_acquire(self, token):
if self.timeout:
# convert to milliseconds
timeout = int(self.timeout * 1000)
else:
timeout = None
if self.redis.set(self.name, token, nx=True, px=timeout):
return True
return False
def locked(self):
"""
Returns True if this key is locked by any process, otherwise False.
"""
return self.redis.get(self.name) is not None
def owned(self):
"""
Returns True if this key is locked by this lock, otherwise False.
"""
stored_token = self.redis.get(self.name)
# need to always compare bytes to bytes
# TODO: this can be simplified when the context manager is finished
if stored_token and not isinstance(stored_token, bytes):
encoder = self.redis.connection_pool.get_encoder()
stored_token = encoder.encode(stored_token)
return self.local.token is not None and \
stored_token == self.local.token
def release(self):
"Releases the already acquired lock"
expected_token = self.local.token
if expected_token is None:
raise LockError("Cannot release an unlocked lock")
self.local.token = None
self.do_release(expected_token)
def do_release(self, expected_token):
if not bool(self.lua_release(keys=[self.name],
args=[expected_token],
client=self.redis)):
raise LockNotOwnedError("Cannot release a lock"
" that's no longer owned")
def extend(self, additional_time, replace_ttl=False):
"""
Adds more time to an already acquired lock.
``additional_time`` can be specified as an integer or a float, both
representing the number of seconds to add.
``replace_ttl`` if False (the default), add `additional_time` to
the lock's existing ttl. If True, replace the lock's ttl with
`additional_time`.
"""
if self.local.token is None:
raise LockError("Cannot extend an unlocked lock")
if self.timeout is None:
raise LockError("Cannot extend a lock with no timeout")
return self.do_extend(additional_time, replace_ttl)
def do_extend(self, additional_time, replace_ttl):
additional_time = int(additional_time * 1000)
if not bool(
self.lua_extend(
keys=[self.name],
args=[
self.local.token,
additional_time,
replace_ttl and "1" or "0"
],
client=self.redis,
)
):
raise LockNotOwnedError(
"Cannot extend a lock that's" " no longer owned"
)
return True
def reacquire(self):
"""
Resets a TTL of an already acquired lock back to a timeout value.
"""
if self.local.token is None:
raise LockError("Cannot reacquire an unlocked lock")
if self.timeout is None:
raise LockError("Cannot reacquire a lock with no timeout")
return self.do_reacquire()
def do_reacquire(self):
timeout = int(self.timeout * 1000)
if not bool(self.lua_reacquire(keys=[self.name],
args=[self.local.token, timeout],
client=self.redis)):
raise LockNotOwnedError("Cannot reacquire a lock that's"
" no longer owned")
return True

View File

@ -0,0 +1,286 @@
import random
import weakref
from redis.client import Redis
from redis.connection import ConnectionPool, Connection
from redis.exceptions import (ConnectionError, ResponseError, ReadOnlyError,
TimeoutError)
from redis._compat import iteritems, nativestr, xrange
class MasterNotFoundError(ConnectionError):
pass
class SlaveNotFoundError(ConnectionError):
pass
class SentinelManagedConnection(Connection):
def __init__(self, **kwargs):
self.connection_pool = kwargs.pop('connection_pool')
super(SentinelManagedConnection, self).__init__(**kwargs)
def __repr__(self):
pool = self.connection_pool
s = '%s<service=%s%%s>' % (type(self).__name__, pool.service_name)
if self.host:
host_info = ',host=%s,port=%s' % (self.host, self.port)
s = s % host_info
return s
def connect_to(self, address):
self.host, self.port = address
super(SentinelManagedConnection, self).connect()
if self.connection_pool.check_connection:
self.send_command('PING')
if nativestr(self.read_response()) != 'PONG':
raise ConnectionError('PING failed')
def connect(self):
if self._sock:
return # already connected
if self.connection_pool.is_master:
self.connect_to(self.connection_pool.get_master_address())
else:
for slave in self.connection_pool.rotate_slaves():
try:
return self.connect_to(slave)
except ConnectionError:
continue
raise SlaveNotFoundError # Never be here
def read_response(self):
try:
return super(SentinelManagedConnection, self).read_response()
except ReadOnlyError:
if self.connection_pool.is_master:
# When talking to a master, a ReadOnlyError when likely
# indicates that the previous master that we're still connected
# to has been demoted to a slave and there's a new master.
# calling disconnect will force the connection to re-query
# sentinel during the next connect() attempt.
self.disconnect()
raise ConnectionError('The previous master is now a slave')
raise
class SentinelConnectionPool(ConnectionPool):
"""
Sentinel backed connection pool.
If ``check_connection`` flag is set to True, SentinelManagedConnection
sends a PING command right after establishing the connection.
"""
def __init__(self, service_name, sentinel_manager, **kwargs):
kwargs['connection_class'] = kwargs.get(
'connection_class', SentinelManagedConnection)
self.is_master = kwargs.pop('is_master', True)
self.check_connection = kwargs.pop('check_connection', False)
super(SentinelConnectionPool, self).__init__(**kwargs)
self.connection_kwargs['connection_pool'] = weakref.proxy(self)
self.service_name = service_name
self.sentinel_manager = sentinel_manager
def __repr__(self):
return "%s<service=%s(%s)" % (
type(self).__name__,
self.service_name,
self.is_master and 'master' or 'slave',
)
def reset(self):
super(SentinelConnectionPool, self).reset()
self.master_address = None
self.slave_rr_counter = None
def get_master_address(self):
master_address = self.sentinel_manager.discover_master(
self.service_name)
if self.is_master:
if self.master_address is None:
self.master_address = master_address
elif master_address != self.master_address:
# Master address changed, disconnect all clients in this pool
self.disconnect()
return master_address
def rotate_slaves(self):
"Round-robin slave balancer"
slaves = self.sentinel_manager.discover_slaves(self.service_name)
if slaves:
if self.slave_rr_counter is None:
self.slave_rr_counter = random.randint(0, len(slaves) - 1)
for _ in xrange(len(slaves)):
self.slave_rr_counter = (
self.slave_rr_counter + 1) % len(slaves)
slave = slaves[self.slave_rr_counter]
yield slave
# Fallback to the master connection
try:
yield self.get_master_address()
except MasterNotFoundError:
pass
raise SlaveNotFoundError('No slave found for %r' % (self.service_name))
class Sentinel(object):
"""
Redis Sentinel cluster client
>>> from redis.sentinel import Sentinel
>>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
>>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
>>> master.set('foo', 'bar')
>>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
>>> slave.get('foo')
b'bar'
``sentinels`` is a list of sentinel nodes. Each node is represented by
a pair (hostname, port).
``min_other_sentinels`` defined a minimum number of peers for a sentinel.
When querying a sentinel, if it doesn't meet this threshold, responses
from that sentinel won't be considered valid.
``sentinel_kwargs`` is a dictionary of connection arguments used when
connecting to sentinel instances. Any argument that can be passed to
a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
not specified, any socket_timeout and socket_keepalive options specified
in ``connection_kwargs`` will be used.
``connection_kwargs`` are keyword arguments that will be used when
establishing a connection to a Redis server.
"""
def __init__(self, sentinels, min_other_sentinels=0, sentinel_kwargs=None,
**connection_kwargs):
# if sentinel_kwargs isn't defined, use the socket_* options from
# connection_kwargs
if sentinel_kwargs is None:
sentinel_kwargs = {
k: v
for k, v in iteritems(connection_kwargs)
if k.startswith('socket_')
}
self.sentinel_kwargs = sentinel_kwargs
self.sentinels = [Redis(hostname, port, **self.sentinel_kwargs)
for hostname, port in sentinels]
self.min_other_sentinels = min_other_sentinels
self.connection_kwargs = connection_kwargs
def __repr__(self):
sentinel_addresses = []
for sentinel in self.sentinels:
sentinel_addresses.append('%s:%s' % (
sentinel.connection_pool.connection_kwargs['host'],
sentinel.connection_pool.connection_kwargs['port'],
))
return '%s<sentinels=[%s]>' % (
type(self).__name__,
','.join(sentinel_addresses))
def check_master_state(self, state, service_name):
if not state['is_master'] or state['is_sdown'] or state['is_odown']:
return False
# Check if our sentinel doesn't see other nodes
if state['num-other-sentinels'] < self.min_other_sentinels:
return False
return True
def discover_master(self, service_name):
"""
Asks sentinel servers for the Redis master's address corresponding
to the service labeled ``service_name``.
Returns a pair (address, port) or raises MasterNotFoundError if no
master is found.
"""
for sentinel_no, sentinel in enumerate(self.sentinels):
try:
masters = sentinel.sentinel_masters()
except (ConnectionError, TimeoutError):
continue
state = masters.get(service_name)
if state and self.check_master_state(state, service_name):
# Put this sentinel at the top of the list
self.sentinels[0], self.sentinels[sentinel_no] = (
sentinel, self.sentinels[0])
return state['ip'], state['port']
raise MasterNotFoundError("No master found for %r" % (service_name,))
def filter_slaves(self, slaves):
"Remove slaves that are in an ODOWN or SDOWN state"
slaves_alive = []
for slave in slaves:
if slave['is_odown'] or slave['is_sdown']:
continue
slaves_alive.append((slave['ip'], slave['port']))
return slaves_alive
def discover_slaves(self, service_name):
"Returns a list of alive slaves for service ``service_name``"
for sentinel in self.sentinels:
try:
slaves = sentinel.sentinel_slaves(service_name)
except (ConnectionError, ResponseError, TimeoutError):
continue
slaves = self.filter_slaves(slaves)
if slaves:
return slaves
return []
def master_for(self, service_name, redis_class=Redis,
connection_pool_class=SentinelConnectionPool, **kwargs):
"""
Returns a redis client instance for the ``service_name`` master.
A SentinelConnectionPool class is used to retrive the master's
address before establishing a new connection.
NOTE: If the master's address has changed, any cached connections to
the old master are closed.
By default clients will be a redis.Redis instance. Specify a
different class to the ``redis_class`` argument if you desire
something different.
The ``connection_pool_class`` specifies the connection pool to use.
The SentinelConnectionPool will be used by default.
All other keyword arguments are merged with any connection_kwargs
passed to this class and passed to the connection pool as keyword
arguments to be used to initialize Redis connections.
"""
kwargs['is_master'] = True
connection_kwargs = dict(self.connection_kwargs)
connection_kwargs.update(kwargs)
return redis_class(connection_pool=connection_pool_class(
service_name, self, **connection_kwargs))
def slave_for(self, service_name, redis_class=Redis,
connection_pool_class=SentinelConnectionPool, **kwargs):
"""
Returns redis client instance for the ``service_name`` slave(s).
A SentinelConnectionPool class is used to retrive the slave's
address before establishing a new connection.
By default clients will be a redis.Redis instance. Specify a
different class to the ``redis_class`` argument if you desire
something different.
The ``connection_pool_class`` specifies the connection pool to use.
The SentinelConnectionPool will be used by default.
All other keyword arguments are merged with any connection_kwargs
passed to this class and passed to the connection pool as keyword
arguments to be used to initialize Redis connections.
"""
kwargs['is_master'] = False
connection_kwargs = dict(self.connection_kwargs)
connection_kwargs.update(kwargs)
return redis_class(connection_pool=connection_pool_class(
service_name, self, **connection_kwargs))

View File

@ -0,0 +1,33 @@
from contextlib import contextmanager
try:
import hiredis # noqa
HIREDIS_AVAILABLE = True
except ImportError:
HIREDIS_AVAILABLE = False
def from_url(url, db=None, **kwargs):
"""
Returns an active Redis client generated from the given database URL.
Will attempt to extract the database id from the path url fragment, if
none is provided.
"""
from redis.client import Redis
return Redis.from_url(url, db, **kwargs)
@contextmanager
def pipeline(redis_obj):
p = redis_obj.pipeline()
yield p
p.execute()
class dummy(object):
"""
Instances of this class can be used as an attribute container.
"""
pass