Real-time .NET backend,
Python frontend.
A full-featured Python client for ASP.NET Core SignalR. Connect with WebSockets, Server-Sent Events, or Long Polling — with JWT auth, MessagePack, and automatic reconnection.
pip install signalrcore
Everything you need
A complete implementation of the SignalR protocol for Python
Multiple Transports
WebSockets, Server-Sent Events, and Long Polling all supported. Automatic negotiation picks the best available transport.
Authentication
Token factory pattern for seamless JWT-based auth. Compatible with any identity provider — pass a callable that returns your token.
Auto Reconnect
Configurable reconnection strategies with raw intervals or exponential backoff. Max attempts and keep-alive intervals keep your app resilient.
Server Streaming
Subscribe to real-time data streams from the server using a clean observer pattern. Client-to-server streaming also supported via Subject.
Binary & Text Encoding
MessagePack binary encoding alongside standard JSON. Choose the right format for performance or compatibility.
Azure Compatible
Works with Azure SignalR Service and serverless functions. Deploy to any Azure-hosted environment without extra configuration.
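The two reconnection styles named under Auto Reconnect differ only in how the wait between attempts is chosen. As a rough illustration in plain Python (this is not the library's internal code; the function names `raw_delays` and `exponential_delays` and the `factor` and `cap` parameters are hypothetical), a fixed schedule repeats one delay, while an exponential schedule multiplies the delay after each failed attempt up to a ceiling:

```python
from typing import List


def raw_delays(reconnect_interval: float, max_attempts: int) -> List[float]:
    """Fixed schedule: wait the same number of seconds before every attempt."""
    return [float(reconnect_interval)] * max_attempts


def exponential_delays(base: float, max_attempts: int,
                       factor: float = 2.0, cap: float = 60.0) -> List[float]:
    """Growing schedule: base, base*factor, base*factor**2, ... limited to cap.

    `factor` and `cap` are illustrative assumptions, not library settings.
    """
    return [min(cap, base * factor ** attempt) for attempt in range(max_attempts)]


if __name__ == "__main__":
    # Fixed 5-second wait, five attempts
    print(raw_delays(5, 5))
    # Starts at 1 second and doubles until it reaches the 60-second cap
    print(exponential_delays(1, 8))
```

Spacing retries out this way avoids hammering a server that is still restarting, and the cap keeps the longest wait bounded.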
Quick start
Get connected in minutes
import logging
from signalrcore.hub_connection_builder import HubConnectionBuilder
hub_connection = HubConnectionBuilder()\
    .with_url("wss://my-app.com/chathub")\
    .configure_logging(logging.DEBUG)\
    .with_automatic_reconnect({
        "type": "raw",
        "keep_alive_interval": 10,
        "reconnect_interval": 5,
        "max_attempts": 5
    }).build()
# Lifecycle hooks
hub_connection.on_open(lambda: print("Connection opened"))
hub_connection.on_close(lambda: print("Connection closed"))
hub_connection.on_error(lambda data: print(f"Error: {data.error}"))
# Handle incoming messages from the server
hub_connection.on("ReceiveMessage", lambda data: print(f"Message: {data}"))
hub_connection.start()
# Send a message to the server hub method
hub_connection.send("SendMessage", ["Alice", "Hello from Python!"])

import logging
import requests
from signalrcore.hub_connection_builder import HubConnectionBuilder
def get_token():
    """Fetch a JWT from your auth endpoint."""
    response = requests.post(
        "https://my-app.com/api/auth/token",
        json={"username": "alice", "password": "secret"},
        # verify=False skips TLS certificate checks; drop it outside local testing
        verify=False
    )
    if response.status_code == 200:
        return response.json()["token"]
    raise requests.exceptions.ConnectionError()
hub_connection = HubConnectionBuilder()\
    .with_url("wss://my-app.com/securehub", options={
        "access_token_factory": get_token,
        "headers": {
            "mycustomheader": "mycustomheadervalue"
        }
    })\
    .configure_logging(logging.DEBUG)\
    .with_automatic_reconnect({
        "type": "raw",
        "keep_alive_interval": 10,
        "reconnect_interval": 5,
        "max_attempts": 5
    }).build()
hub_connection.on_open(lambda: print("Authenticated connection ready"))
hub_connection.on("SecureData", lambda data: print(f"Data: {data}"))
hub_connection.start()

from signalrcore.hub_connection_builder import HubConnectionBuilder
from signalrcore.subject import Subject
hub_connection = HubConnectionBuilder()\
    .with_url("wss://my-app.com/streaminghub")\
    .build()
hub_connection.start()
# --- Server-to-client streaming ---
# "Counter" streams `count` items with `delay` ms between each
hub_connection.stream(
    "Counter",
    [10, 500]  # count=10, delay=500ms
).subscribe({
    "next": lambda item: print(f" received: {item}"),
    "complete": lambda: print("Stream complete!"),
    "error": lambda err: print(f"Stream error: {err}")
})
# --- Client-to-server streaming ---
subject = Subject()
hub_connection.send("UploadStream", subject)
for i in range(5):
    subject.next(f"chunk-{i}")
subject.complete()
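
Inline lambdas are enough for printing, but a script often needs to wait for a stream to finish and then use the items. One possible approach, sketched here with only the standard library, is to build the observer dictionary in a helper that collects items and signals completion through a `threading.Event`. The helper name `make_collecting_observer` is hypothetical; the `next`, `complete`, and `error` keys are the ones used in the streaming example above.

```python
import threading
from typing import Any, Callable, Dict, List, Tuple


def make_collecting_observer() -> Tuple[Dict[str, Callable], List[Any], List[Any], threading.Event]:
    """Return an observer dict plus the containers it fills.

    Hypothetical helper: items go into `items`, errors into `errors`,
    and `done` is set once the stream completes or fails.
    """
    items: List[Any] = []
    errors: List[Any] = []
    done = threading.Event()

    def on_next(item: Any) -> None:
        items.append(item)

    def on_complete() -> None:
        done.set()

    def on_error(err: Any) -> None:
        errors.append(err)
        done.set()  # unblock waiters on failure as well

    observer = {"next": on_next, "complete": on_complete, "error": on_error}
    return observer, items, errors, done


if __name__ == "__main__":
    observer, items, errors, done = make_collecting_observer()
    # Simulate a stream locally by invoking the callbacks by hand
    for i in range(3):
        observer["next"](i)
    observer["complete"]()
    done.wait(timeout=1)
    print(items, errors)
```

With a live connection, the same dictionary would be passed to `hub_connection.stream("Counter", [10, 500]).subscribe(observer)`, followed by `done.wait(timeout=...)` before reading `items`. This assumes the callbacks may fire on a background thread, which is why completion is signalled through an event and not a plain flag.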