Skip to content

Commit 15f6107

Browse files
committed
initial commit
1 parent 94d20a8 commit 15f6107

File tree

11 files changed

+1606
-0
lines changed

11 files changed

+1606
-0
lines changed

README.md

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
# Realtime Pub/Sub Client for Python
2+
3+
The `realtime-pubsub-client-python` is a Python client library for interacting with [Realtime Pub/Sub](https://realtime.21no.de) applications. It enables developers to manage real-time WebSocket connections, handle subscriptions, and process messages efficiently. The library provides a simple and flexible API to interact with realtime applications, supporting features like publishing/sending messages, subscribing to topics, handling acknowledgements, and waiting for replies with timeout support.
4+
5+
## Features
6+
7+
- **WebSocket Connection Management**: Seamlessly connect and disconnect from the Realtime Pub/Sub service with automatic reconnection support.
8+
- **Topic Subscription**: Subscribe and unsubscribe to topics for receiving messages.
9+
- **Topic Publishing**: [Publish](https://realtime.21no.de/documentation/#publishers) messages to specific topics with optional message types and compression.
10+
- **Message Sending**: [Send](https://realtime.21no.de/documentation/#websocket-inbound-messaging) messages to backend applications with optional message types and compression.
11+
- **Event Handling**: Handle incoming messages with custom event listeners.
12+
- **Acknowledgements and Replies**: Wait for gateway acknowledgements or replies to messages with timeout support.
13+
- **Error Handling**: Robust error handling and logging capabilities.
14+
- **Asynchronous Support**: Built using `asyncio` for efficient asynchronous programming.
15+
16+
## Installation
17+
18+
Install the `realtime-pubsub-client-python` library via pip:
19+
20+
```bash
21+
pip install realtime-pubsub-client-python
22+
```
23+
24+
## Getting Started
25+
26+
This guide will help you set up and use the `realtime-pubsub-client-python` library in your Python project.
27+
28+
### Connecting to the Server
29+
30+
First, import the `RealtimeClient` class and create a new instance with the required configuration:
31+
32+
```python
33+
import asyncio
34+
import logging
35+
import os
36+
from realtime_pubsub_client import RealtimeClient
37+
38+
APP_ID = 'your-app-id'
39+
40+
async def main():
41+
async def get_url():
42+
# replace with your access token retrieval strategy
43+
access_token = os.environ.get('ACCESS_TOKEN')
44+
app_id = os.environ.get('APP_ID')
45+
46+
# return the WebSocket URL with the access token
47+
return f"wss://genesis.r7.21no.de/apps/{app_id}?access_token={access_token}"
48+
49+
client_options = {
50+
'logger': logging.getLogger('RealtimeClient'),
51+
'websocket_options': {
52+
'urlProvider': get_url,
53+
},
54+
}
55+
client = RealtimeClient(client_options)
56+
57+
async def on_session_started(connection_info):
58+
print('Connection ID:', connection_info['id'])
59+
# Subscribe to topics here
60+
await client.subscribe_remote_topic('topic1')
61+
await client.subscribe_remote_topic('topic2')
62+
63+
client.on('session.started', on_session_started)
64+
65+
await client.connect()
66+
await client.wait_for('session.started')
67+
68+
asyncio.run(main())
69+
```
70+
71+
### Subscribing to Incoming Messages
72+
73+
You can handle messages for specific topics and message types:
74+
75+
> **Note**: The topic and message type are separated by a dot (`.`) in the event name.
76+
77+
```python
78+
def handle_message(message, reply_fn):
79+
# Message handling logic here
80+
print('Received message:', message['data']['payload'])
81+
82+
client.on('topic1.action1', handle_message)
83+
```
84+
85+
Wildcard subscriptions are also supported:
86+
87+
```python
88+
client.on('topic1.*', handle_message)
89+
```
90+
91+
### Publishing Messages
92+
93+
Publish messages to a topic:
94+
95+
```python
96+
await client.publish('topic1', 'Hello, world!', {
97+
'messageType': 'text-message',
98+
})
99+
```
100+
101+
### Responding to Incoming Messages
102+
103+
Set up event listeners to handle incoming messages:
104+
105+
```python
106+
async def handle_message(message, reply_fn):
107+
# Processing the message
108+
print('Received message:', message['data']['payload'])
109+
110+
# Sending a reply
111+
await reply_fn('Message received!', 'ok')
112+
113+
client.on('topic1.text-message', handle_message)
114+
```
115+
116+
### Waiting for Acknowledgements and Replies
117+
118+
- **`wait_for_ack(timeout=None)`**: Waits for an acknowledgement of the message, with an optional timeout in seconds.
119+
- **`wait_for_reply(timeout=None)`**: Waits for a reply to the message, with an optional timeout in seconds.
120+
121+
Wait for the Realtime Gateway acknowledgement after publishing a message:
122+
123+
```python
124+
waiter = await client.publish('secure/peer-to-peer1', 'Hi', {
125+
'messageType': 'greeting',
126+
})
127+
await waiter.wait_for_ack()
128+
```
129+
130+
Wait for the Realtime Gateway acknowledgement after sending a message:
131+
132+
```python
133+
waiter = await client.send({
134+
# Message payload
135+
}, {
136+
'messageType': 'create',
137+
})
138+
await waiter.wait_for_ack()
139+
```
140+
141+
Wait for a reply with a timeout:
142+
143+
```python
144+
waiter = await client.send({
145+
# Message payload
146+
}, {
147+
'messageType': 'create',
148+
})
149+
await waiter.wait_for_reply(timeout=5) # Wait for up to 5 seconds
150+
```
151+
152+
### Error Handling
153+
154+
Handle errors and disconnections:
155+
156+
```python
157+
def on_error(error):
158+
print('WebSocket error:', error)
159+
160+
def on_close(event):
161+
print('WebSocket closed:', event)
162+
163+
client.on('error', on_error)
164+
client.on('close', on_close)
165+
```
166+
167+
## API Reference
168+
169+
### RealtimeClient
170+
171+
#### Constructor
172+
173+
```python
174+
RealtimeClient(config)
175+
```
176+
177+
Creates a new `RealtimeClient` instance.
178+
179+
- **`config`**: Configuration options for the client.
180+
181+
#### Methods
182+
183+
- **`connect()`**: Connects the client to the WebSocket Messaging Gateway.
184+
185+
```python
186+
await client.connect()
187+
```
188+
189+
- **`disconnect()`**: Terminates the WebSocket connection.
190+
191+
```python
192+
await client.disconnect()
193+
```
194+
195+
- **`subscribe_remote_topic(topic)`**: [Subscribes](https://realtime.21no.de/documentation/#subscribers) the connection to a remote topic.
196+
197+
```python
198+
await client.subscribe_remote_topic(topic)
199+
```
200+
201+
- **`unsubscribe_remote_topic(topic)`**: [Unsubscribes](https://realtime.21no.de/documentation/#subscribers) the connection from a remote topic.
202+
203+
```python
204+
await client.unsubscribe_remote_topic(topic)
205+
```
206+
207+
- **`publish(topic, payload, options=None)`**: Publishes a message to a topic.
208+
209+
```python
210+
waiter = await client.publish(topic, payload, options)
211+
```
212+
213+
Returns a `WaitFor` instance to wait for acknowledgements or replies.
214+
215+
- **`send(payload, options=None)`**: Sends a message to the server.
216+
217+
```python
218+
waiter = await client.send(payload, options)
219+
```
220+
221+
Returns a `WaitFor` instance to wait for acknowledgements or replies.
222+
223+
- **`wait(ms)`**: Waits for a specified duration in milliseconds. Utility function for waiting in async functions.
224+
225+
```python
226+
await wait(ms)
227+
```
228+
229+
#### Events
230+
231+
- **`'session.started'`**: Emitted when the session starts.
232+
233+
```python
234+
client.on('session.started', on_session_started)
235+
```
236+
237+
- **`'error'`**: Emitted on WebSocket errors.
238+
239+
```python
240+
client.on('error', on_error)
241+
```
242+
243+
- **`'close'`**: Emitted when the WebSocket connection closes.
244+
245+
```python
246+
client.on('close', on_close)
247+
```
248+
249+
- **Custom Events**: Handle custom events based on topic and message type.
250+
251+
```python
252+
client.on('TOPIC_NAME.MESSAGE_TYPE', handle_message)
253+
```
254+
255+
> **Note**: Wildcard subscriptions are also supported.
256+
257+
## License
258+
259+
This library is licensed under the MIT License.
260+
261+
---
262+
263+
For more detailed examples and advanced configurations, please refer to the [documentation](https://realtime.21no.de/docs).
264+
265+
## Notes
266+
267+
- Ensure that you have an account and an app set up with [Realtime Pub/Sub](https://realtime.21no.de).
268+
- Customize the `urlProvider` or URL to retrieve the access token for connecting to your realtime application.
269+
- Implement the `get_auth_token` function according to your authentication mechanism.
270+
- Optionally use the `logger` option to integrate with your application's logging system.
271+
- Handle errors and disconnections gracefully to improve the robustness of your application.
272+
- Make sure to handle timeouts when waiting for replies to avoid hanging operations.
273+
274+
---
275+
276+
Feel free to contribute to this project by submitting issues or pull requests on [GitHub](https://github.com/BackendStack21/realtime-pubsub-client-python).

demos/publish_and_reply.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import os
2+
from dotenv import load_dotenv
3+
from src.client import *
4+
5+
# Load variables from .env into os.environ
6+
load_dotenv()
7+
8+
# Configure logging
9+
logging.basicConfig(level=logging.INFO)
10+
11+
12+
async def main():
13+
async def get_url():
14+
# replace with your access token retrieval strategy
15+
access_token = os.environ.get('ACCESS_TOKEN')
16+
app_id = os.environ.get('APP_ID')
17+
18+
# return the WebSocket URL with the access token
19+
return f"wss://genesis.r7.21no.de/apps/{app_id}?access_token={access_token}"
20+
21+
config = {
22+
'logger': logging.getLogger('RealtimeClient'),
23+
'websocket_options': {
24+
'urlProvider': get_url,
25+
}
26+
}
27+
client = RealtimeClient(config)
28+
29+
# Connect to the WebSocket server
30+
await client.connect()
31+
32+
# Define a message handler
33+
async def handle_session_started(message):
34+
print('Session started:', message)
35+
await client.subscribe_remote_topic('chat')
36+
37+
client.on('session.started', handle_session_started)
38+
39+
# Send a message
40+
wait_for = await client.send('Hello, world!', {
41+
'messageType': 'text-message'
42+
})
43+
await wait_for.wait_for_ack()
44+
45+
# Define a message handler
46+
def handle_message(message, reply_fn):
47+
print('New chat message arrived:', message)
48+
49+
reply_fn({
50+
'text': 'Hello, back!'
51+
})
52+
53+
# Subscribe to chat.text-message events
54+
client.on('chat.text-message', handle_message)
55+
56+
wait_for = await client.publish('chat', 'Hello out there!', {
57+
'messageType': 'text-message'
58+
})
59+
response = await wait_for.wait_for_reply()
60+
print('Reply:', response)
61+
62+
# Disconnect from the server
63+
await client.disconnect()
64+
65+
66+
# Run the main coroutine
67+
asyncio.run(main())

demos/rpc/client.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import os
2+
import time
3+
4+
from dotenv import load_dotenv
5+
from src.client import *
6+
7+
# Load variables from .env into os.environ
8+
load_dotenv()
9+
10+
# Configure logging
11+
logging.basicConfig(level=logging.DEBUG)
12+
13+
14+
async def main():
15+
async def get_url():
16+
# replace with your access token retrieval strategy
17+
access_token = os.environ.get('ACCESS_TOKEN')
18+
app_id = os.environ.get('APP_ID')
19+
20+
# return the WebSocket URL with the access token
21+
return f"wss://genesis.r7.21no.de/apps/{app_id}?access_token={access_token}"
22+
23+
config = {
24+
'logger': logging.getLogger('RealtimeClient'),
25+
'websocket_options': {
26+
'urlProvider': get_url,
27+
}
28+
}
29+
client = RealtimeClient(config)
30+
31+
# Connect to the WebSocket server
32+
await client.connect()
33+
34+
35+
# Define a message handler
36+
async def handle_session_started(message):
37+
client.logger.info('Requesting server time...')
38+
waiter = await client.send('', {
39+
'messageType': 'gettime'
40+
})
41+
42+
response, = await waiter.wait_for_reply(timeout=5)
43+
client.logger.info(f"Server time: {time.ctime(response['data']['time'])}")
44+
45+
await client.disconnect()
46+
47+
client.on('session.started', handle_session_started)
48+
49+
while client.ws and not client.ws.closed:
50+
await asyncio.sleep(1)
51+
52+
# Run the main coroutine
53+
asyncio.run(main())

0 commit comments

Comments
 (0)