Skip to content

AsyncBasePool

Bases: object

Base connection pool for asynchronous Redis clients.

Manages a pool of free and used connections asynchronously, handling acquisition, release, and automatic scaling up to the configured pool limit.

Source code in pyredis/pool/async_base.py
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
class AsyncBasePool(object):
    """
    Base connection pool for asynchronous Redis clients.

    Manages a pool of free and used connections asynchronously, handling acquisition,
    release, and automatic scaling up to the configured pool limit.
    """

    def __init__(
        self,
        database=0,
        password=None,
        encoding=None,
        conn_timeout=2,
        read_timeout=2,
        pool_size=16,
        lock=None,
        username=None,
    ):
        """
        Initialize asynchronous connection pool parameters.

        Args:
            database: Database index to select.
            password: Password for authentication.
            encoding: Optional string encoding for automatic decoding.
            conn_timeout: Async connection timeout in seconds.
            read_timeout: Async read timeout in seconds.
            pool_size: Maximum number of connections allowed in the pool.
            lock: Asyncio lock for synchronization.
            username: Username for ACL authentication.
        """

        self._conn_timeout = conn_timeout
        self._read_timeout = read_timeout
        if lock is None:
            self._lock = asyncio.Lock()
        else:
            self._lock = lock
        self._pool_free = set()
        self._pool_used = set()
        self._database = database
        self._password = password
        self._encoding = encoding
        self._pool_size = pool_size
        self._close_on_err = False
        self._cluster = False
        self._username = username

    @property
    def conn_timeout(self):
        """Async connection timeout in seconds."""
        return self._conn_timeout

    @property
    def read_timeout(self):
        """Async read timeout in seconds."""
        return self._read_timeout

    @property
    def database(self):
        """Database index selected on connections."""
        return self._database

    @property
    def password(self):
        """Authentication password."""
        return self._password

    @property
    def encoding(self):
        """Optional string decoding encoding."""
        return self._encoding

    @property
    def pool_size(self):
        """Maximum number of connections allowed in the pool."""
        return self._pool_size

    @pool_size.setter
    def pool_size(self, size):
        self._pool_size = size
        current_size = len(self._pool_free) + len(self._pool_used)
        while current_size > size:
            try:
                client = self._pool_free.pop()
                asyncio.create_task(
                    client.close()
                )
                current_size -= 1
            except KeyError:
                break

    @property
    def close_on_err(self):
        """Whether to close all idle connections when a connection closes on error."""
        return self._close_on_err

    @property
    def username(self):
        """ACL authentication username."""
        return self._username

    def _connect(self):
        raise NotImplementedError

    async def acquire(self):
        """
        Asynchronously acquire a connection from the pool.

        Reuses an idle connection or establishes a new one if the pool size limit
        has not been reached.

        Returns:
            An AsyncConnection instance.

        Raises:
            PyRedisError: If the maximum pool size is exceeded.
        """
        async with self._lock:
            try:
                client = self._pool_free.pop()
                self._pool_used.add(client)
            except KeyError:
                if len(self._pool_used) < self.pool_size:
                    client = self._connect()
                    if asyncio.iscoroutine(client):
                        client = await client
                    self._pool_used.add(client)
                else:
                    raise PyRedisError(
                        f"Max connections {self.pool_size} exhausted"
                    )
            return client


    async def release(
        self,
        conn
    ):
        """
        Asynchronously release a connection back to the pool.

        Args:
            conn: The AsyncConnection instance to return.
        """
        async with self._lock:
            try:
                current_size = len(self._pool_free) + len(self._pool_used)
                self._pool_used.remove(conn)
                if conn.closed and self.close_on_err:
                    for c in self._pool_free:
                        await c.close()
                    self._pool_free = set()
                    self._pool_used = set()
                elif not conn.closed:
                    if current_size > self.pool_size:
                        await conn.close()
                    else:
                        self._pool_free.add(conn)
            except KeyError:
                await conn.close()


    async def execute(
        self,
        *args,
        **kwargs
    ):
        """
        Asynchronously acquire a connection, execute a command, and release it.

        Args:
            *args: Command name and positional arguments.
            **kwargs: Execution options (e.g. shard_key, sock).

        Returns:
            Parsed Redis reply.
        """
        conn = await self.acquire()
        try:
            return await conn.execute(
                *args,
                **kwargs
            )
        finally:
            await self.release(
                conn=conn
            )

close_on_err property

Whether to close all idle connections when a connection closes on error.

conn_timeout property

Async connection timeout in seconds.

database property

Database index selected on connections.

encoding property

Optional string decoding encoding.

password property

Authentication password.

pool_size property writable

Maximum number of connections allowed in the pool.

read_timeout property

Async read timeout in seconds.

username property

ACL authentication username.

__init__(database=0, password=None, encoding=None, conn_timeout=2, read_timeout=2, pool_size=16, lock=None, username=None)

Initialize asynchronous connection pool parameters.

Parameters:

Name Type Description Default
database

Database index to select.

0
password

Password for authentication.

None
encoding

Optional string encoding for automatic decoding.

None
conn_timeout

Async connection timeout in seconds.

2
read_timeout

Async read timeout in seconds.

2
pool_size

Maximum number of connections allowed in the pool.

16
lock

Asyncio lock for synchronization.

None
username

Username for ACL authentication.

None
Source code in pyredis/pool/async_base.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def __init__(
    self,
    database=0,
    password=None,
    encoding=None,
    conn_timeout=2,
    read_timeout=2,
    pool_size=16,
    lock=None,
    username=None,
):
    """
    Initialize asynchronous connection pool parameters.

    Args:
        database: Database index to select.
        password: Password for authentication.
        encoding: Optional string encoding for automatic decoding.
        conn_timeout: Async connection timeout in seconds.
        read_timeout: Async read timeout in seconds.
        pool_size: Maximum number of connections allowed in the pool.
        lock: Asyncio lock for synchronization.
        username: Username for ACL authentication.
    """

    self._conn_timeout = conn_timeout
    self._read_timeout = read_timeout
    if lock is None:
        self._lock = asyncio.Lock()
    else:
        self._lock = lock
    self._pool_free = set()
    self._pool_used = set()
    self._database = database
    self._password = password
    self._encoding = encoding
    self._pool_size = pool_size
    self._close_on_err = False
    self._cluster = False
    self._username = username

acquire() async

Asynchronously acquire a connection from the pool.

Reuses an idle connection or establishes a new one if the pool size limit has not been reached.

Returns:

Type Description

An AsyncConnection instance.

Raises:

Type Description
PyRedisError

If the maximum pool size is exceeded.

Source code in pyredis/pool/async_base.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
async def acquire(self):
    """
    Asynchronously acquire a connection from the pool.

    Reuses an idle connection or establishes a new one if the pool size limit
    has not been reached.

    Returns:
        An AsyncConnection instance.

    Raises:
        PyRedisError: If the maximum pool size is exceeded.
    """
    async with self._lock:
        try:
            client = self._pool_free.pop()
            self._pool_used.add(client)
        except KeyError:
            if len(self._pool_used) < self.pool_size:
                client = self._connect()
                if asyncio.iscoroutine(client):
                    client = await client
                self._pool_used.add(client)
            else:
                raise PyRedisError(
                    f"Max connections {self.pool_size} exhausted"
                )
        return client

execute(*args, **kwargs) async

Asynchronously acquire a connection, execute a command, and release it.

Parameters:

Name Type Description Default
*args

Command name and positional arguments.

()
**kwargs

Execution options (e.g. shard_key, sock).

{}

Returns:

Type Description

Parsed Redis reply.

Source code in pyredis/pool/async_base.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
async def execute(
    self,
    *args,
    **kwargs
):
    """
    Asynchronously acquire a connection, execute a command, and release it.

    Args:
        *args: Command name and positional arguments.
        **kwargs: Execution options (e.g. shard_key, sock).

    Returns:
        Parsed Redis reply.
    """
    conn = await self.acquire()
    try:
        return await conn.execute(
            *args,
            **kwargs
        )
    finally:
        await self.release(
            conn=conn
        )

release(conn) async

Asynchronously release a connection back to the pool.

Parameters:

Name Type Description Default
conn

The AsyncConnection instance to return.

required
Source code in pyredis/pool/async_base.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
async def release(
    self,
    conn
):
    """
    Asynchronously release a connection back to the pool.

    Args:
        conn: The AsyncConnection instance to return.
    """
    async with self._lock:
        try:
            current_size = len(self._pool_free) + len(self._pool_used)
            self._pool_used.remove(conn)
            if conn.closed and self.close_on_err:
                for c in self._pool_free:
                    await c.close()
                self._pool_free = set()
                self._pool_used = set()
            elif not conn.closed:
                if current_size > self.pool_size:
                    await conn.close()
                else:
                    self._pool_free.add(conn)
        except KeyError:
            await conn.close()