Skip to content

AsyncConnection

Bases: object

Low level asynchronous client for talking to a Redis Server.

Manages connection lifecycle, non-blocking network streams via asyncio, protocol serialization, authentication, and async reads/writes.

Source code in pyredis/connection/async_connection.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
class AsyncConnection(object):
    """
    Low level asynchronous client for talking to a Redis Server.

    Manages connection lifecycle, non-blocking network streams via asyncio,
    protocol serialization, authentication, and async reads/writes.

    The socket is opened lazily by the first read() or write(). Once
    close() has run (explicitly, or after a connection failure) the
    instance stays closed: later reads/writes raise PyRedisConnError.
    """

    def __init__(
        self,
        host=None,
        port=6379,
        unix_sock=None,
        database=None,
        password=None,
        encoding=None,
        conn_timeout=2,
        read_only=False,
        read_timeout=2,
        sentinel=False,
        username=None,
    ):
        """
        Initialize asynchronous connection parameters.

        No network activity happens here; the connection is opened on the
        first read() or write().

        Args:
            host: Redis server hostname or IP.
            port: Redis server port number.
            unix_sock: Path to Unix domain socket.
            database: Database index to select.
            password: Password for authentication.
            encoding: Optional string encoding for automatic decoding.
            conn_timeout: Async connection timeout in seconds.
            read_only: Flag indicating if the connection is read-only.
            read_timeout: Async socket read timeout in seconds.
            sentinel: Flag indicating if this is a Sentinel connection.
            username: Username for ACL authentication.

        Raises:
            PyRedisError: If both or neither of host and unix_sock are given.
        """
        # Exactly one transport (TCP host or Unix socket) must be configured.
        if bool(host) == bool(unix_sock):
            raise PyRedisError("Either host or unix_sock has to be provided")
        self._closed = False
        self._conn_timeout = conn_timeout
        self._read_only = read_only
        self._read_timeout = read_timeout
        self._encoding = encoding
        self._reader_parser = None
        self._sentinel = sentinel
        self._writer_func = pyredis.connection.writer
        self._reader = None
        self._writer = None
        self.host = host
        self.port = port
        self.unix_sock = unix_sock
        self.password = password
        self.username = username
        self.database = database

    async def _setup_command(self, *args):
        """Send one handshake command and consume its reply.

        If the server answers with an error reply, the connection is closed
        and the ReplyError is re-raised to the caller.
        """
        await self.write(*args)
        try:
            await self.read()
        except ReplyError:
            await self.close()
            raise

    async def _authenticate(self):
        """Send AUTH when a password is configured.

        Uses the two-argument form (username + password) when a username is
        set, otherwise the password-only form. Does nothing without a
        password.
        """
        if not self.password:
            return
        if self.username:
            await self._setup_command("AUTH", self.username, self.password)
        else:
            await self._setup_command("AUTH", self.password)

    async def _connect(self):
        """Open the stream, create the reply parser and run the handshake.

        Raises:
            PyRedisConnError: If the connection was already closed, or the
                socket could not be opened within conn_timeout.
        """
        if self._closed:
            raise PyRedisConnError("Connection Gone")
        # Name the endpoint actually used, so Unix-socket failures do not
        # report a meaningless "None:<port>".
        if self.host:
            target = f"{self.host}:{self.port}"
        else:
            target = self.unix_sock
        try:
            if self.host:
                reader, writer = await asyncio.wait_for(
                    asyncio.open_connection(
                        host=self.host,
                        port=self.port
                    ),
                    timeout=self._conn_timeout
                )
            else:
                reader, writer = await asyncio.wait_for(
                    asyncio.open_unix_connection(
                        path=self.unix_sock
                    ),
                    timeout=self._conn_timeout
                )
        except (
            ConnectionAbortedError,
            ConnectionRefusedError,
            OverflowError,
            asyncio.TimeoutError,
            OSError,
        ) as err:
            await self.close()
            raise PyRedisConnError(
                f"Could not Connect to {target}: {err}"
            ) from err
        self._reader = reader
        self._writer = writer
        if self._encoding:
            self._reader_parser = pyredis.connection.Reader(
                encoding=self._encoding
            )
        else:
            self._reader_parser = pyredis.connection.Reader()
        await self._authenticate()
        # Sentinel connections skip database selection and READONLY.
        if not self._sentinel:
            await self._setdb()
            await self._set_read_only()

    async def _setdb(self):
        """Send SELECT for the configured database, if any (never on Sentinel)."""
        if self._sentinel or self.database is None:
            return
        await self._setup_command("SELECT", self.database)

    async def _set_read_only(self):
        """Send READONLY when the connection was created with read_only=True."""
        if self._read_only:
            await self._setup_command("READONLY")

    async def close(self):
        """
        Asynchronously close the socket writer and clean up connection resources.

        Errors raised while waiting for the writer to finish closing are
        ignored. Afterwards the connection is marked closed.
        """
        if self._writer:
            self._writer.close()
            try:
                await self._writer.wait_closed()
            except Exception:
                # Best effort: the stream is being discarded anyway.
                pass
        self._reader = None
        self._writer = None
        self._reader_parser = None
        self._closed = True

    @property
    def closed(self):
        """
        Check if the connection has been closed.

        Returns:
            True if closed, False otherwise.
        """
        return self._closed

    async def read(self, close_on_timeout=True, raise_on_result_err=True):
        """
        Asynchronously read and parse a reply from the Redis server.

        Connects first if there is no active writer.

        Args:
            close_on_timeout: If True, closes the connection on read timeout.
            raise_on_result_err: If True, raises exceptions returned as replies.

        Returns:
            Parsed Redis reply (e.g. string, integer, list, dict, or None).

        Raises:
            PyRedisConnReadTimeout: If no data arrives within read_timeout.
            PyRedisConnError: If the peer resets the connection.
            PyRedisConnClosed: If the stream ends before a reply is complete.
        """
        if not self._writer:
            await self._connect()
        while True:
            # False from the parser is treated as "no complete reply yet";
            # any other value (including None) is a finished reply.
            result = self._reader_parser.gets()
            if result is not False:
                if raise_on_result_err and isinstance(result, Exception):
                    raise result
                return result
            try:
                data = await asyncio.wait_for(
                    self._reader.read(1500),
                    timeout=self._read_timeout
                )
            except asyncio.TimeoutError as err:
                if close_on_timeout:
                    await self.close()
                raise PyRedisConnReadTimeout(
                    "Connection timeout while reading"
                ) from err
            except ConnectionResetError as err:
                await self.close()
                raise PyRedisConnError("Connection reset by peer") from err
            if not data:
                # An empty read means the stream reached EOF.
                await self.close()
                raise PyRedisConnClosed("Connection went away while reading")
            self._reader_parser.feed(data)

    async def write(self, *args):
        """
        Asynchronously serialize and send a command to the Redis server.

        Connects first if there is no active writer.

        Args:
            *args: Command name and positional arguments.

        Raises:
            PyRedisConnError: If the connection is lost while writing; the
                connection is closed before the error is raised.
        """
        if not self._writer:
            await self._connect()
        data = self._writer_func(*args)
        try:
            self._writer.write(data)
            await self._writer.drain()
        except (BrokenPipeError, ConnectionResetError) as err:
            # drain() can raise ConnectionResetError once the transport has
            # lost the connection, so it is handled like a broken pipe.
            await self.close()
            raise PyRedisConnError(
                f"Connection lost while writing: {err}"
            ) from err

closed property

Check if the connection has been closed.

Returns:

Type Description

True if closed, False otherwise.

database = database instance-attribute

host = host instance-attribute

password = password instance-attribute

port = port instance-attribute

unix_sock = unix_sock instance-attribute

username = username instance-attribute

__init__(host=None, port=6379, unix_sock=None, database=None, password=None, encoding=None, conn_timeout=2, read_only=False, read_timeout=2, sentinel=False, username=None)

Initialize asynchronous connection parameters.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `host` | | Redis server hostname or IP. | `None` |
| `port` | | Redis server port number. | `6379` |
| `unix_sock` | | Path to Unix domain socket. | `None` |
| `database` | | Database index to select. | `None` |
| `password` | | Password for authentication. | `None` |
| `encoding` | | Optional string encoding for automatic decoding. | `None` |
| `conn_timeout` | | Async connection timeout in seconds. | `2` |
| `read_only` | | Flag indicating if the connection is read-only. | `False` |
| `read_timeout` | | Async socket read timeout in seconds. | `2` |
| `sentinel` | | Flag indicating if this is a Sentinel connection. | `False` |
| `username` | | Username for ACL authentication. | `None` |
Source code in pyredis/connection/async_connection.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(
    self,
    host=None,
    port=6379,
    unix_sock=None,
    database=None,
    password=None,
    encoding=None,
    conn_timeout=2,
    read_only=False,
    read_timeout=2,
    sentinel=False,
    username=None,
):
    """
    Initialize asynchronous connection parameters.

    Only stores configuration and resets stream state; no socket is
    opened here.

    Args:
        host: Redis server hostname or IP.
        port: Redis server port number.
        unix_sock: Path to Unix domain socket.
        database: Database index to select.
        password: Password for authentication.
        encoding: Optional string encoding for automatic decoding.
        conn_timeout: Async connection timeout in seconds.
        read_only: Flag indicating if the connection is read-only.
        read_timeout: Async socket read timeout in seconds.
        sentinel: Flag indicating if this is a Sentinel connection.
        username: Username for ACL authentication.

    Raises:
        PyRedisError: If both or neither of host and unix_sock are given.
    """
    # Exactly one transport (TCP host or Unix socket) must be configured.
    if bool(host) == bool(unix_sock):
        raise PyRedisError("Either host or unix_sock has to be provided")
    self._closed = False
    self._conn_timeout = conn_timeout
    self._read_only = read_only
    self._read_timeout = read_timeout
    self._encoding = encoding
    self._reader_parser = None
    self._sentinel = sentinel
    self._writer_func = pyredis.connection.writer
    self._reader = None
    self._writer = None
    self.host = host
    self.port = port
    self.unix_sock = unix_sock
    self.password = password
    self.username = username
    self.database = database

close() async

Asynchronously close the socket writer and clean up connection resources.

Source code in pyredis/connection/async_connection.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
async def close(self):
    """
    Shut down the stream writer (if any) and reset connection state.

    Failures while waiting for the writer to finish closing are ignored.
    The reader, writer and reply parser references are dropped and the
    connection is flagged as closed.
    """
    active_writer = self._writer
    if active_writer:
        active_writer.close()
        try:
            await active_writer.wait_closed()
        except Exception:
            # Best effort: the stream is being discarded anyway.
            pass
    self._reader = self._writer = self._reader_parser = None
    self._closed = True

read(close_on_timeout=True, raise_on_result_err=True) async

Asynchronously read and parse a reply from the Redis server.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `close_on_timeout` | | If True, closes the connection on read timeout. | `True` |
| `raise_on_result_err` | | If True, raises exceptions returned as replies. | `True` |

Returns:

Type Description

Parsed Redis reply (e.g. string, integer, list, dict, or None).

Source code in pyredis/connection/async_connection.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
async def read(self, close_on_timeout=True, raise_on_result_err=True):
    """
    Fetch the next complete reply from the Redis server.

    Connects first when there is no active writer, then keeps pulling
    chunks from the stream into the parser until it yields a reply.

    Args:
        close_on_timeout: If True, closes the connection on read timeout.
        raise_on_result_err: If True, raises exceptions returned as replies.

    Returns:
        Parsed Redis reply (e.g. string, integer, list, dict, or None).
    """
    if not self._writer:
        await self._connect()

    # False from the parser is treated as "no complete reply buffered yet".
    reply = self._reader_parser.gets()
    while reply is False:
        try:
            chunk = await asyncio.wait_for(
                self._reader.read(1500), timeout=self._read_timeout
            )
        except asyncio.TimeoutError:
            if close_on_timeout:
                await self.close()
            raise PyRedisConnReadTimeout("Connection timeout while reading")
        except ConnectionResetError:
            await self.close()
            raise PyRedisConnError("Connection reset by peer")
        if not chunk:
            # An empty read means the stream ended.
            await self.close()
            raise PyRedisConnClosed("Connection went away while reading")
        self._reader_parser.feed(chunk)
        reply = self._reader_parser.gets()

    if raise_on_result_err and isinstance(reply, Exception):
        raise reply
    return reply

write(*args) async

Asynchronously serialize and send a command to the Redis server.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `*args` | | Command name and positional arguments. | `()` |
Source code in pyredis/connection/async_connection.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
async def write(self, *args):
    """
    Asynchronously serialize and send a command to the Redis server.

    Connects first if there is no active writer.

    Args:
        *args: Command name and positional arguments.

    Raises:
        PyRedisConnError: If the connection is lost while writing; the
            connection is closed before the error is raised.
    """
    if not self._writer:
        await self._connect()
    data = self._writer_func(*args)
    try:
        self._writer.write(data)
        await self._writer.drain()
    except (BrokenPipeError, ConnectionResetError) as err:
        # drain() can raise ConnectionResetError once the transport has
        # lost the connection, so it is handled like a broken pipe instead
        # of escaping untranslated with the dead writer still attached.
        await self.close()
        raise PyRedisConnError(
            f"Connection lost while writing: {err}"
        ) from err