Skip to content

BasePool

Bases: object

Base connection pool for synchronous Redis clients.

Manages a pool of free and used connections, handling acquisition, release, and automatic scaling up to the configured pool limit.

Source code in pyredis/pool/base.py
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
class BasePool(object):
    """
    Base connection pool for synchronous Redis clients.

    Idle connections are kept in ``_pool_free`` and checked-out connections in
    ``_pool_used``. Connections are created on demand through ``_connect()``,
    which this base class leaves unimplemented, until ``pool_size``
    connections are checked out. All pool bookkeeping happens while holding
    ``_lock``.
    """

    def __init__(
        self,
        database=0,
        password=None,
        encoding=None,
        conn_timeout=2,
        read_timeout=2,
        pool_size=16,
        lock=None,
        username=None,
    ):
        """
        Initialize connection pool parameters.

        Args:
            database: Database index to select.
            password: Password for authentication.
            encoding: Optional string encoding for automatic decoding.
            conn_timeout: Socket connection timeout in seconds.
            read_timeout: Socket read timeout in seconds.
            pool_size: Maximum number of connections allowed in the pool.
            lock: Object providing ``acquire()`` and ``release()``; a new
                ``threading.Lock`` is created when omitted.
            username: Username for ACL authentication.
        """

        self._conn_timeout = conn_timeout
        self._read_timeout = read_timeout
        if lock is None:
            self._lock = threading.Lock()
        else:
            self._lock = lock
        self._pool_free = set()
        self._pool_used = set()
        self._database = database
        self._password = password
        self._encoding = encoding
        self._pool_size = pool_size
        self._close_on_err = False
        self._cluster = False
        self._username = username

    @property
    def conn_timeout(self):
        """Socket connection timeout in seconds."""
        return self._conn_timeout

    @property
    def read_timeout(self):
        """Socket read timeout in seconds."""
        return self._read_timeout

    @property
    def database(self):
        """Database index selected on connections."""
        return self._database

    @property
    def password(self):
        """Authentication password."""
        return self._password

    @property
    def encoding(self):
        """Optional encoding used for automatic string decoding."""
        return self._encoding

    @property
    def pool_size(self):
        """Maximum number of connections allowed in the pool."""
        return self._pool_size

    @pool_size.setter
    def pool_size(self, size):
        # Take the lock *before* entering the try block: if acquiring fails,
        # the finally clause must not release a lock this call never held.
        self._lock.acquire()
        try:
            self._pool_size = size
            current_size = len(self._pool_free) + len(self._pool_used)
            # Only idle connections can be closed here; checked-out ones are
            # trimmed later, when they come back through release().
            while current_size > size and self._pool_free:
                self._pool_free.pop().close()
                current_size -= 1
        finally:
            self._lock.release()

    @property
    def close_on_err(self):
        """Whether to close all idle connections when a connection closes on error."""
        return self._close_on_err

    @property
    def username(self):
        """ACL authentication username."""
        return self._username

    def _connect(self):
        """Create a new connection; must be provided by subclasses."""
        raise NotImplementedError

    def acquire(self):
        """
        Acquire a connection from the pool.

        Reuses an idle connection when one exists; otherwise opens a new one
        through ``_connect()`` as long as fewer than ``pool_size`` connections
        are checked out.

        Returns:
            A Connection instance, now tracked as checked out.

        Raises:
            PyRedisError: If no idle connection is available and ``pool_size``
                connections are already checked out.
        """
        # Acquire outside the try so a failed acquire is never followed by a
        # release of a lock that was not taken.
        self._lock.acquire()
        try:
            if self._pool_free:
                client = self._pool_free.pop()
            elif len(self._pool_used) < self.pool_size:
                client = self._connect()
            else:
                # Raised outside any ``except`` handler so the error is not
                # chained to an unrelated internal KeyError.
                raise PyRedisError(
                    f"Max connections {self.pool_size} exhausted"
                )
            self._pool_used.add(client)
            return client
        finally:
            self._lock.release()

    def release(self, conn):
        """
        Release a connection back to the pool.

        An open connection is returned to the idle set, or closed when the
        pool currently holds more than ``pool_size`` connections. A closed
        connection is dropped; if ``close_on_err`` is set, every idle
        connection is closed as well and both tracking sets are reset. A
        connection that is not tracked as checked out is simply closed.

        Args:
            conn: The Connection instance to return.
        """
        self._lock.acquire()
        try:
            # Measured while ``conn`` still counts as checked out.
            current_size = len(self._pool_free) + len(self._pool_used)
            try:
                self._pool_used.remove(conn)
            except KeyError:
                # Not tracked as checked out (for example after the tracking
                # sets were reset below): just close it.
                conn.close()
                return
            if conn.closed:
                if self.close_on_err:
                    for idle in self._pool_free:
                        idle.close()
                    self._pool_free = set()
                    self._pool_used = set()
            elif current_size > self.pool_size:
                # The pool was shrunk while this connection was in use.
                conn.close()
            else:
                self._pool_free.add(conn)
        finally:
            self._lock.release()

    def execute(self, *args, **kwargs):
        """
        Acquire a connection, execute a command, and release it back to the pool.

        The connection is released even when the command raises.

        Args:
            *args: Command name and positional arguments.
            **kwargs: Execution options (e.g. shard_key, sock).

        Returns:
            Parsed Redis reply.
        """
        conn = self.acquire()
        try:
            return conn.execute(
                *args,
                **kwargs
            )
        finally:
            self.release(conn)

close_on_err property

Whether to close all idle connections when a connection closes on error.

conn_timeout property

Socket connection timeout in seconds.

database property

Database index selected on connections.

encoding property

Optional encoding used for automatic string decoding.

password property

Authentication password.

pool_size property writable

Maximum number of connections allowed in the pool.

read_timeout property

Socket read timeout in seconds.

username property

ACL authentication username.

__init__(database=0, password=None, encoding=None, conn_timeout=2, read_timeout=2, pool_size=16, lock=None, username=None)

Initialize connection pool parameters.

Parameters:

Name Type Description Default
database

Database index to select.

0
password

Password for authentication.

None
encoding

Optional string encoding for automatic decoding.

None
conn_timeout

Socket connection timeout in seconds.

2
read_timeout

Socket read timeout in seconds.

2
pool_size

Maximum number of connections allowed in the pool.

16
lock

Threading lock for synchronization.

None
username

Username for ACL authentication.

None
Source code in pyredis/pool/base.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def __init__(
    self,
    database=0,
    password=None,
    encoding=None,
    conn_timeout=2,
    read_timeout=2,
    pool_size=16,
    lock=None,
    username=None,
):
    """
    Store the pool configuration and set up empty connection tracking.

    Args:
        database: Database index to select.
        password: Password for authentication.
        encoding: Optional string encoding for automatic decoding.
        conn_timeout: Socket connection timeout in seconds.
        read_timeout: Socket read timeout in seconds.
        pool_size: Maximum number of connections allowed in the pool.
        lock: Lock used for synchronization; a ``threading.Lock`` is
            created when none is supplied.
        username: Username for ACL authentication.
    """
    # Connection settings.
    self._database = database
    self._username = username
    self._password = password
    self._encoding = encoding
    self._conn_timeout = conn_timeout
    self._read_timeout = read_timeout

    # Pool bookkeeping: idle and checked-out connections, guarded by the lock.
    self._pool_size = pool_size
    self._pool_free = set()
    self._pool_used = set()
    self._lock = threading.Lock() if lock is None else lock

    # Behaviour flags, both off by default.
    self._close_on_err = False
    self._cluster = False

acquire()

Acquire a connection from the pool.

Reuses an idle connection or establishes a new one if the pool size limit has not been reached.

Returns:

Type Description

A Connection instance.

Raises:

Type Description
PyRedisError

If no idle connection is available and `pool_size` connections are already checked out.

Source code in pyredis/pool/base.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def acquire(self):
    """
    Acquire a connection from the pool.

    Reuses an idle connection when one exists; otherwise opens a new one
    through ``_connect()`` as long as fewer than ``pool_size`` connections
    are checked out.

    Returns:
        A Connection instance, now tracked as checked out.

    Raises:
        PyRedisError: If no idle connection is available and ``pool_size``
            connections are already checked out.
    """
    # Acquire outside the try so a failed acquire is never followed by a
    # release of a lock that was not taken.
    self._lock.acquire()
    try:
        if self._pool_free:
            client = self._pool_free.pop()
        elif len(self._pool_used) < self.pool_size:
            client = self._connect()
        else:
            # Raised outside any ``except`` handler so the error is not
            # chained to an unrelated internal KeyError.
            raise PyRedisError(
                f"Max connections {self.pool_size} exhausted"
            )
        self._pool_used.add(client)
        return client
    finally:
        self._lock.release()

execute(*args, **kwargs)

Acquire a connection, execute a command, and release it back to the pool.

Parameters:

Name Type Description Default
*args

Command name and positional arguments.

()
**kwargs

Execution options (e.g. shard_key, sock).

{}

Returns:

Type Description

Parsed Redis reply.

Source code in pyredis/pool/base.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def execute(self, *args, **kwargs):
    """
    Run one command on a pooled connection.

    A connection is checked out with ``acquire()``, the command is forwarded
    to its ``execute`` method, and the connection is handed back through
    ``release()`` whether or not the command raised.

    Args:
        *args: Command name and positional arguments.
        **kwargs: Execution options (e.g. shard_key, sock).

    Returns:
        Parsed Redis reply.
    """
    connection = self.acquire()
    try:
        reply = connection.execute(*args, **kwargs)
    finally:
        # Always give the connection back, even on failure.
        self.release(connection)
    return reply

release(conn)

Release a connection back to the pool.

Parameters:

Name Type Description Default
conn

The Connection instance to return.

required
Source code in pyredis/pool/base.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def release(self, conn):
    """
    Release a connection back to the pool.

    An open connection is returned to the idle set, or closed when the pool
    currently holds more than ``pool_size`` connections. A closed connection
    is dropped; if ``close_on_err`` is set, every idle connection is closed
    as well and both tracking sets are reset. A connection that is not
    tracked as checked out is simply closed.

    Args:
        conn: The Connection instance to return.
    """
    # Acquire outside the try so a failed acquire is never followed by a
    # release of a lock that was not taken.
    self._lock.acquire()
    try:
        # Measured while ``conn`` still counts as checked out.
        current_size = len(self._pool_free) + len(self._pool_used)
        try:
            self._pool_used.remove(conn)
        except KeyError:
            # Not tracked as checked out (for example after the tracking
            # sets were reset below): just close it.
            conn.close()
            return
        if conn.closed:
            if self.close_on_err:
                for idle in self._pool_free:
                    idle.close()
                self._pool_free = set()
                self._pool_used = set()
        elif current_size > self.pool_size:
            # The pool was shrunk while this connection was in use.
            conn.close()
        else:
            self._pool_free.add(conn)
    finally:
        self._lock.release()