Python SignalR Client

Real-time .NET backend,
Python frontend.

A full-featured Python client for ASP.NET Core SignalR. Connect with WebSockets, Server-Sent Events, or Long Polling — with JWT auth, MessagePack, and automatic reconnection.

pip install signalrcore

Everything you need

A complete implementation of the SignalR protocol for Python

Multiple Transports

WebSockets, Server-Sent Events, and Long Polling all supported. Automatic negotiation picks the best available transport.

Authentication

Token factory pattern for seamless JWT-based auth. Compatible with any identity provider — pass a callable that returns your token.

Auto Reconnect

Configurable reconnection strategies with raw intervals or exponential backoff. Max attempts and keep-alive intervals keep your app resilient.

Server Streaming

Subscribe to real-time data streams from the server using a clean observer pattern. Client-to-server streaming also supported via Subject.

Binary & Text Encoding

MessagePack binary encoding alongside standard JSON. Choose the right format for performance or compatibility.

Azure Compatible

Works with Azure SignalR Service and serverless functions. Deploy to any Azure-hosted environment without extra configuration.

Quick start

Get connected in minutes

import logging
from signalrcore.hub_connection_builder import HubConnectionBuilder

hub_connection = HubConnectionBuilder()\
    .with_url("wss://my-app.com/chathub")\
    .configure_logging(logging.DEBUG)\
    .with_automatic_reconnect({
        "type": "raw",
        "keep_alive_interval": 10,
        "reconnect_interval": 5,
        "max_attempts": 5
    }).build()

# Lifecycle hooks
hub_connection.on_open(lambda: print("Connection opened"))
hub_connection.on_close(lambda: print("Connection closed"))
hub_connection.on_error(lambda data: print(f"Error: {data.error}"))

# Handle incoming messages from the server
hub_connection.on("ReceiveMessage", lambda data: print(f"Message: {data}"))

hub_connection.start()

# Send a message to the server hub method
hub_connection.send("SendMessage", ["Alice", "Hello from Python!"])
import logging
import requests
from signalrcore.hub_connection_builder import HubConnectionBuilder

def get_token():
    """Fetch a JWT from your auth endpoint."""
    response = requests.post(
        "https://my-app.com/api/auth/token",
        json={"username": "alice", "password": "secret"},
        verify=False
    )
    if response.status_code == 200:
        return response.json()["token"]
    raise requests.exceptions.ConnectionError()

hub_connection = HubConnectionBuilder()\
    .with_url("wss://my-app.com/securehub", options={
        "access_token_factory": get_token,
        "headers": {
            "mycustomheader": "mycustomheadervalue"
        }
    })\
    .configure_logging(logging.DEBUG)\
    .with_automatic_reconnect({
        "type": "raw",
        "keep_alive_interval": 10,
        "reconnect_interval": 5,
        "max_attempts": 5
    }).build()

hub_connection.on_open(lambda: print("Authenticated connection ready"))
hub_connection.on("SecureData", lambda data: print(f"Data: {data}"))

hub_connection.start()
from signalrcore.hub_connection_builder import HubConnectionBuilder
from signalrcore.subject import Subject

hub_connection = HubConnectionBuilder()\
    .with_url("wss://my-app.com/streaminghub")\
    .build()

hub_connection.start()

# --- Server-to-client streaming ---
# "Counter" streams `count` items with `delay` ms between each
hub_connection.stream(
    "Counter",
    [10, 500]   # count=10, delay=500ms
).subscribe({
    "next":     lambda item: print(f"  received: {item}"),
    "complete": lambda: print("Stream complete!"),
    "error":    lambda err: print(f"Stream error: {err}")
})

# --- Client-to-server streaming ---
subject = Subject()
hub_connection.send("UploadStream", subject)

for i in range(5):
    subject.next(f"chunk-{i}")

subject.complete()