Device Patterns¶
The framework provides six base device classes for different connection patterns. Choose the one that matches your device's communication method.
Separation of Concerns¶
The device layer has one job: manage the connection and store raw hardware state. It does not know about ucapi entities, attribute keys, or the Remote. When state changes, it calls push_update() to signal subscribers.
Entities subscribe to the device via subscribe_to_device(device) and translate raw state into ucapi attributes in their sync_state() method.
Device Entity
────────────────── ──────────────────────────────
self.power = "PLAYING" → subscribe_to_device(device)
self.volume = 42 → sync_state() called automatically
self.push_update() → self.update({Attributes.STATE: ..., Attributes.VOLUME: ...})
This separation means:
- The device can be used with any combination of entity types without changes
- Entities can be tested independently by injecting a mock device
- Adding a second entity type (e.g., a Remote alongside a MediaPlayer) requires zero changes to the device
StatelessHTTPDevice¶
For devices with REST APIs where each request creates a new HTTP session.
Good for: REST APIs, simple HTTP devices
You implement:
verify_connection()— Test device is reachable- Property accessors (
identifier,name,address,log_id) - Any methods to send commands or fetch state
Framework handles:
- Connection verification
- Error handling
Example¶
from ucapi_framework import StatelessHTTPDevice
import aiohttp
class MyRESTDevice(StatelessHTTPDevice):
def __init__(self, device_config, config_manager=None):
super().__init__(device_config, config_manager=config_manager)
# Raw device state
self.power: str = "OFF"
self.volume: int = 0
@property
def identifier(self) -> str:
return self._device_config.identifier
@property
def name(self) -> str:
return self._device_config.name
@property
def address(self) -> str:
return self._device_config.host
@property
def log_id(self) -> str:
return f"Device[{self.identifier}]"
async def verify_connection(self) -> None:
"""Verify device is reachable."""
url = f"http://{self.address}/api/status"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
async def fetch_state(self) -> None:
"""Fetch current state and notify subscribers."""
url = f"http://{self.address}/api/state"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
self.power = data["power"]
self.volume = data["volume"]
self.push_update() # Notify subscribed entities
async def send_command(self, command: str, params: dict | None = None) -> None:
"""Send command to device."""
url = f"http://{self.address}/api/command"
async with aiohttp.ClientSession() as session:
async with session.post(url, json={"command": command, **(params or {})}) as response:
response.raise_for_status()
PollingDevice¶
For devices that need periodic state checks.
Good for: Devices without push notifications, devices with changing state
You implement:
establish_connection()— Initial connection setuppoll_device()— Periodic state check; update raw state and callpush_update()- Property accessors
Framework handles:
- Polling loop with configurable interval
- Automatic reconnection on errors
- Task management and cleanup
Example¶
from ucapi_framework import PollingDevice
import aiohttp
class MyPollingDevice(PollingDevice):
def __init__(self, device_config, config_manager=None):
super().__init__(
device_config,
poll_interval=30, # Poll every 30 seconds
config_manager=config_manager,
)
self.power: str = "OFF"
self.volume: int = 0
@property
def identifier(self) -> str:
return self._device_config.identifier
@property
def name(self) -> str:
return self._device_config.name
@property
def address(self) -> str:
return self._device_config.host
@property
def log_id(self) -> str:
return f"Device[{self.identifier}]"
async def establish_connection(self) -> None:
"""Initial connection — fetch current state."""
await self._fetch_and_notify()
async def poll_device(self) -> None:
"""Called on each poll interval — update state and notify subscribers."""
await self._fetch_and_notify()
async def _fetch_and_notify(self) -> None:
url = f"http://{self.address}/api/state"
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
self.power = data["power"]
self.volume = data["volume"]
self.push_update() # Notify subscribed entities
WebSocketDevice¶
For devices with WebSocket APIs providing real-time updates.
Good for: Devices with WebSocket APIs, real-time updates
You implement:
create_websocket()— Establish WebSocket connectionclose_websocket()— Close WebSocket connectionreceive_message()— Receive a single message from the WebSockethandle_message()— Process received message; update raw state and callpush_update()- Property accessors
Framework handles:
- WebSocket lifecycle (connect, reconnect, disconnect)
- Exponential backoff on connection failures
- Ping/pong keepalive
- Message loop and error handling
Example¶
from ucapi_framework import WebSocketDevice
import websockets
import json
class MyWebSocketDevice(WebSocketDevice):
def __init__(self, device_config, config_manager=None):
super().__init__(
device_config,
reconnect=True,
ping_interval=30,
config_manager=config_manager,
)
self.power: str = "OFF"
self.volume: int = 0
self.source: str = ""
@property
def identifier(self) -> str:
return self._device_config.identifier
@property
def name(self) -> str:
return self._device_config.name
@property
def address(self) -> str:
return self._device_config.host
@property
def log_id(self) -> str:
return f"Device[{self.identifier}]"
async def create_websocket(self):
"""Establish WebSocket connection."""
uri = f"ws://{self.address}/ws"
return await websockets.connect(uri)
async def close_websocket(self) -> None:
"""Close WebSocket connection."""
if self._ws:
await self._ws.close()
async def receive_message(self):
"""Receive a message."""
return await self._ws.recv()
async def handle_message(self, message: str) -> None:
"""
Process a received message.
Update raw device state, then call push_update() to notify all
subscribed entities. Entities will call sync_state() to read the
new values and push updated attributes to the Remote.
"""
data = json.loads(message)
self.power = data.get("power", self.power)
self.volume = data.get("volume", self.volume)
self.source = data.get("source", self.source)
self.push_update()
async def establish_connection(self) -> None:
"""Called after WebSocket is connected — fetch initial state."""
# Optionally fetch full state before first push
self.push_update()
WebSocketPollingDevice¶
Hybrid device combining WebSocket for real-time updates with polling as a fallback.
Good for: Smart TVs, media players with WebSocket that may disconnect
You implement: Same as WebSocketDevice + PollingDevice
Framework handles:
- Runs both WebSocket and polling concurrently
- Continues polling if WebSocket fails
- Graceful degradation
Example¶
from ucapi_framework import WebSocketPollingDevice
class MyHybridDevice(WebSocketPollingDevice):
def __init__(self, device_config, config_manager=None):
super().__init__(
device_config,
poll_interval=30,
ping_interval=30,
keep_polling_on_disconnect=True,
config_manager=config_manager,
)
self.power: str = "OFF"
self.volume: int = 0
# Implement WebSocket methods (same as WebSocketDevice)
async def create_websocket(self): ...
async def close_websocket(self): ...
async def receive_message(self): ...
async def handle_message(self, message: str) -> None:
...
self.push_update()
# Implement Polling methods (same as PollingDevice)
async def establish_connection(self): ...
async def poll_device(self) -> None:
...
self.push_update()
PersistentConnectionDevice¶
For devices with persistent TCP connections or custom protocols.
Good for: Proprietary protocols, TCP connections, persistent sessions
You implement:
establish_connection()— Create persistent connectionclose_connection()— Close connectionmaintain_connection()— Keep connection alive (blocking receive loop)- Property accessors
Framework handles:
- Connection loop with automatic reconnection
- Exponential backoff on failures
- Task management
Example¶
from ucapi_framework import PersistentConnectionDevice
import asyncio
class MyTCPDevice(PersistentConnectionDevice):
def __init__(self, device_config, config_manager=None):
super().__init__(device_config, config_manager=config_manager)
self.power: str = "OFF"
@property
def identifier(self) -> str:
return self._device_config.identifier
@property
def name(self) -> str:
return self._device_config.name
@property
def address(self) -> str:
return self._device_config.host
@property
def log_id(self) -> str:
return f"Device[{self.identifier}]"
async def establish_connection(self):
"""Establish TCP connection."""
reader, writer = await asyncio.open_connection(self.address, 8080)
return {"reader": reader, "writer": writer}
async def close_connection(self) -> None:
"""Close TCP connection."""
if self._connection:
self._connection["writer"].close()
await self._connection["writer"].wait_closed()
async def maintain_connection(self) -> None:
"""Receive loop — called by framework, should block until disconnected."""
reader = self._connection["reader"]
while True:
data = await reader.readline()
if not data:
break
message = data.decode().strip()
self.power = message # Parse appropriately
self.push_update()
ExternalClientDevice¶
For devices using external client libraries that manage their own connections.
Good for: Z-Wave JS, Home Assistant WebSocket, MQTT clients, third-party APIs
You implement:
create_client()— Create the external client instanceconnect_client()— Connect and set up event handlersdisconnect_client()— Disconnect and remove event handlerscheck_client_connected()— Query actual client connection state- Property accessors
Framework handles:
- Watchdog polling to detect silent disconnections
- Automatic reconnection with configurable retries
- Early exit if client is already connected
- Task management and cleanup
Example¶
from ucapi_framework import ExternalClientDevice
class MyExternalDevice(ExternalClientDevice):
def __init__(self, device_config, config_manager=None):
super().__init__(
device_config,
enable_watchdog=True,
watchdog_interval=30,
reconnect_delay=5,
max_reconnect_attempts=3,
config_manager=config_manager,
)
self.power: str = "OFF"
self.volume: int = 0
@property
def identifier(self) -> str:
return self._device_config.identifier
@property
def name(self) -> str:
return self._device_config.name
@property
def address(self) -> str:
return self._device_config.host
@property
def log_id(self) -> str:
return f"Device[{self.identifier}]"
async def create_client(self):
from some_library import Client
return Client(self.address)
async def connect_client(self) -> None:
await self._client.connect()
self._client.on("state_changed", self._on_state_changed)
async def disconnect_client(self) -> None:
self._client.off("state_changed", self._on_state_changed)
await self._client.disconnect()
def check_client_connected(self) -> bool:
return self._client is not None and self._client.connected
def _on_state_changed(self, data: dict) -> None:
"""Handle state changes from the client library."""
self.power = data.get("power", self.power)
self.volume = data.get("volume", self.volume)
self.push_update() # Notify subscribed entities
Choosing a Pattern¶
| Pattern | Use Case | Complexity |
|---|---|---|
| StatelessHTTPDevice | REST APIs, no real-time updates | ⭐ Simple |
| PollingDevice | Need periodic state checks | ⭐⭐ Moderate |
| WebSocketDevice | WebSocket APIs, real-time | ⭐⭐⭐ Complex |
| WebSocketPollingDevice | Hybrid with fallback | ⭐⭐⭐⭐ Advanced |
| ExternalClientDevice | Third-party client libraries | ⭐⭐⭐ Moderate |
| PersistentConnectionDevice | Custom protocols, TCP | ⭐⭐⭐⭐ Advanced |
See the API Reference for complete documentation.