Communicate with Bluetooth Low Energy (BLE) devices for scanning, connecting, reading/writing characteristics, and subscribing to notifications.
Import
from lager.ble import Client, Central, noop_handler, notify_handler, waiter
Classes
| Class | Description |
|---|---|
| Central | BLE central role for scanning and initiating connections |
| Client | BLE client for GATT operations on a connected device |
Functions
| Function | Description |
|---|---|
| noop_handler() | No-op notification handler |
| notify_handler() | Event-based notification handler |
| waiter() | Async wait helper |
Central Class
The Central class provides BLE scanning and connection initiation.
Central(loop=None)
Create a BLE central instance.
from lager.ble import Central
central = Central()
Parameters:
| Parameter | Type | Description |
|---|---|---|
| loop | asyncio.EventLoop | Event loop (optional, uses default) |
scan(scan_time=5.0, name=None, address=None)
Scan for nearby BLE devices.
central = Central()
# Scan for all devices
devices = central.scan(scan_time=5.0)
for device in devices:
    print(f"{device.name}: {device.address}")
# Scan for specific device name
devices = central.scan(name="MyDevice")
# Scan for specific MAC address
devices = central.scan(address="AA:BB:CC:DD:EE:FF")
Parameters:
| Parameter | Type | Description |
|---|---|---|
| scan_time | float | Scan duration in seconds (default: 5.0) |
| name | str | Filter by device name (optional) |
| address | str | Filter by MAC address (optional) |
Returns: list - List of discovered BLE devices
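Scan results can be post-processed with ordinary list operations. The sketch below keeps only the devices whose name starts with a given prefix. It relies solely on the name and address attributes shown in the example above; devices_with_prefix is a hypothetical helper, not part of lager.ble, and the stand-in objects exist only so the snippet runs without hardware.

```python
from types import SimpleNamespace


def devices_with_prefix(devices, prefix):
    """Return the scanned devices whose name starts with `prefix`.

    Devices without a name (None or empty string) are skipped.
    """
    return [d for d in devices if d.name and d.name.startswith(prefix)]


# Stand-in objects that mimic the `name` and `address` attributes of
# scan results; real code would pass the list returned by central.scan().
fake_devices = [
    SimpleNamespace(name="DUT_001", address="AA:BB:CC:DD:EE:01"),
    SimpleNamespace(name=None, address="AA:BB:CC:DD:EE:02"),
    SimpleNamespace(name="Headphones", address="AA:BB:CC:DD:EE:03"),
]

matches = devices_with_prefix(fake_devices, "DUT_")
print([d.address for d in matches])
```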
connect(address)
Connect to a BLE device by address.
central = Central()
client = central.connect("AA:BB:CC:DD:EE:FF")
Parameters:
| Parameter | Type | Description |
|---|---|---|
| address | str | MAC address of the device |
Returns: Client - Connected BLE client
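Because connect() takes the address as a plain string, it can help to reject malformed input before starting a connection attempt. The following is a minimal sketch that assumes the colon-separated AA:BB:CC:DD:EE:FF form used throughout this page; is_mac_address is a hypothetical helper and is not provided by lager.ble.

```python
import re

# Six colon-separated pairs of hex digits, e.g. AA:BB:CC:DD:EE:FF.
_MAC_RE = re.compile(r"(?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}")


def is_mac_address(text):
    """Return True if `text` looks like a colon-separated MAC address."""
    return _MAC_RE.fullmatch(text) is not None


print(is_mac_address("AA:BB:CC:DD:EE:FF"))
print(is_mac_address("AA:BB:CC:DD:EE"))
```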
pair(address)
Pair with a BLE device.
central = Central()
client = central.pair("AA:BB:CC:DD:EE:FF")
Client Class
The Client class provides GATT operations on a connected BLE device.
Creating a Client
from lager.ble import Client, Central
from bleak import BleakClient
import asyncio
# Method 1: Using Central
central = Central()
client = central.connect("AA:BB:CC:DD:EE:FF")
# Method 2: Direct creation with context manager
loop = asyncio.get_event_loop()
with Client(BleakClient("AA:BB:CC:DD:EE:FF"), loop=loop) as client:
    # Use client
    pass
connect()
Establish connection to the BLE device.
disconnect()
Disconnect from the BLE device.
pair()
Pair with the connected device.
get_services()
Discover and retrieve all GATT services.
services = client.get_services()
for service in services:
    print(f"Service: {service.uuid}")
    for char in service.characteristics:
        print(f" Characteristic: {char.uuid}")
Returns: BleakGATTServiceCollection - Collection of discovered services
has_characteristic(uuid)
Check if a characteristic exists on the device.
if client.has_characteristic("00002a19-0000-1000-8000-00805f9b34fb"):
    print("Battery level characteristic found")
Parameters:
| Parameter | Type | Description |
|---|---|---|
| uuid | str | Characteristic UUID |
Returns: bool - True if characteristic exists
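The UUID in the example above is the 16-bit assigned number 0x2A19 (Battery Level) placed into the Bluetooth base UUID. A small helper can build such strings from the short form; expand_short_uuid is a hypothetical name used for illustration and is not part of lager.ble.

```python
# Every 16-bit assigned number maps to 0000xxxx-0000-1000-8000-00805f9b34fb.
BLUETOOTH_BASE_UUID_SUFFIX = "-0000-1000-8000-00805f9b34fb"


def expand_short_uuid(short_uuid):
    """Expand a 16-bit assigned number into its 128-bit UUID string."""
    if not 0 <= short_uuid <= 0xFFFF:
        raise ValueError("short UUID must fit in 16 bits")
    return f"0000{short_uuid:04x}{BLUETOOTH_BASE_UUID_SUFFIX}"


BATTERY_LEVEL = expand_short_uuid(0x2A19)
print(BATTERY_LEVEL)  # 00002a19-0000-1000-8000-00805f9b34fb
```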
read_gatt_char(char_specifier)
Read a characteristic value.
# Read by UUID
data = client.read_gatt_char("00002a19-0000-1000-8000-00805f9b34fb")
print(f"Battery level: {data[0]}%")
# Read by handle
data = client.read_gatt_char(0x0012)
Parameters:
| Parameter | Type | Description |
|---|---|---|
| char_specifier | str or int | UUID string or handle number |
Returns: bytearray - Characteristic value
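The returned bytearray is raw characteristic data, so the caller is responsible for decoding it. As a sketch, the standard Battery Level characteristic is a single unsigned byte holding a percentage from 0 to 100; decode_battery_level below is a hypothetical helper that validates this before returning the value.

```python
def decode_battery_level(data):
    """Decode a Battery Level value: one unsigned byte, 0-100 percent."""
    if len(data) != 1:
        raise ValueError(f"expected 1 byte, got {len(data)}")
    level = data[0]
    if level > 100:
        raise ValueError(f"battery level out of range: {level}")
    return level


# Stand-in for the value returned by client.read_gatt_char(...).
print(decode_battery_level(bytearray([0x5A])))  # 90
```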
write_gatt_char(char_specifier, data)
Write a value to a characteristic.
# Write bytes
client.write_gatt_char("characteristic-uuid", b'\x01\x02\x03')
# Write string
client.write_gatt_char("characteristic-uuid", "hello".encode('utf-8'))
Parameters:
| Parameter | Type | Description |
|---|---|---|
| char_specifier | str or int | UUID string or handle number |
| data | bytes | Data to write |
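Since data must be bytes, structured commands are usually packed before writing. The sketch below uses the standard struct module to build a payload; the layout (a one-byte opcode followed by a little-endian 16-bit value) is a made-up example, and build_command is a hypothetical helper. Your device's protocol defines the real format.

```python
import struct


def build_command(opcode, value):
    """Pack a one-byte opcode followed by a little-endian uint16 value."""
    return struct.pack("<BH", opcode, value)


payload = build_command(0x01, 1000)
print(payload.hex())  # 01e803

# Real code would then send it, for example:
# client.write_gatt_char("characteristic-uuid", payload)
```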
start_notify(char_specifier, callback=noop_handler, max_messages=None, timeout=None)
Subscribe to characteristic notifications.
from lager.ble import noop_handler
# Simple notification subscription
def my_callback(handle, data):
    print(f"Received: {data.hex()}")
timed_out, messages = client.start_notify(
    "characteristic-uuid",
    callback=my_callback,
    max_messages=10,
    timeout=30.0
)
if timed_out:
    print("Timed out waiting for notifications")
else:
    print(f"Received {len(messages)} messages")
Parameters:
| Parameter | Type | Description |
|---|---|---|
| char_specifier | str or int | Characteristic UUID or handle |
| callback | callable | Function called for each notification |
| max_messages | int | Stop after receiving this many messages |
| timeout | float | Timeout in seconds |
Returns: tuple[bool, list] - (timed_out, messages) where timed_out is True if timeout occurred
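A callback can also decode each notification as it arrives instead of only printing it. The sketch below builds such a callback with a closure. It assumes, purely for illustration, that each notification carries one little-endian signed 16-bit sample; make_sample_recorder is a hypothetical helper, and the notifications are simulated so the snippet runs without a device.

```python
import struct


def make_sample_recorder(samples):
    """Return a (handle, data) callback that decodes values into `samples`.

    Assumes each notification starts with a little-endian int16.
    """
    def on_notify(handle, data):
        (value,) = struct.unpack("<h", data[:2])
        samples.append(value)
    return on_notify


samples = []
callback = make_sample_recorder(samples)

# Simulate two notifications; real code would pass `callback` as the
# callback argument of client.start_notify().
callback(0x0012, bytearray(b"\x10\x00"))
callback(0x0012, bytearray(b"\xff\xff"))
print(samples)  # [16, -1]
```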
stop_notify(char_specifier)
Unsubscribe from characteristic notifications.
client.stop_notify("characteristic-uuid")
sleep(timeout)
Sleep for a duration (async-safe).
client.sleep(1.0) # Sleep for 1 second
Helper Functions
noop_handler(handle, data)
A no-op notification handler.
from lager.ble import noop_handler
# Use when you only care about collecting messages
timed_out, messages = client.start_notify(
    "uuid",
    callback=noop_handler,
    max_messages=5
)
notify_handler(evt, messages, callback, max_messages, handle, data)
Internal notification handler that collects messages and signals completion.
waiter(event, timeout)
Async wait helper for notification events.
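To illustrate how an event-based handler and a wait helper can cooperate, here is a minimal sketch built on asyncio. It is not the lager.ble implementation: collecting_handler and wait_for_event are hypothetical names, the argument order simply mirrors the signatures listed above, and the notifications are simulated.

```python
import asyncio
from functools import partial


def collecting_handler(evt, messages, callback, max_messages, handle, data):
    """Record one notification and set `evt` once enough have arrived."""
    messages.append(data)
    callback(handle, data)
    if max_messages is not None and len(messages) >= max_messages:
        evt.set()


async def wait_for_event(evt, timeout):
    """Return True if the wait timed out, False if `evt` was set in time."""
    try:
        await asyncio.wait_for(evt.wait(), timeout)
        return False
    except asyncio.TimeoutError:
        return True


async def demo():
    evt = asyncio.Event()
    messages = []
    # Bind the bookkeeping arguments so the result takes (handle, data).
    handler = partial(collecting_handler, evt, messages, lambda h, d: None, 2)
    handler(1, b"\x01")
    handler(1, b"\x02")
    timed_out = await wait_for_event(evt, timeout=0.1)
    return timed_out, messages


print(asyncio.run(demo()))  # (False, [b'\x01', b'\x02'])
```

Binding the first four arguments with functools.partial leaves a two-argument callable, which matches the (handle, data) shape that notification callbacks receive.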
Examples
Scan for Devices
from lager.ble import Central
central = Central()
# Discover all nearby BLE devices
print("Scanning for BLE devices...")
devices = central.scan(scan_time=10.0)
for device in devices:
    name = device.name or "Unknown"
    print(f" {name}: {device.address}")
Connect and Read Characteristic
from lager.ble import Client, Central
from bleak import BleakClient
import asyncio
# Standard BLE UUIDs
BATTERY_SERVICE = "0000180f-0000-1000-8000-00805f9b34fb"
BATTERY_LEVEL = "00002a19-0000-1000-8000-00805f9b34fb"
loop = asyncio.get_event_loop()
with Client(BleakClient("AA:BB:CC:DD:EE:FF"), loop=loop) as client:
    # Check if the battery level characteristic exists
    if client.has_characteristic(BATTERY_LEVEL):
        data = client.read_gatt_char(BATTERY_LEVEL)
        print(f"Battery level: {data[0]}%")
Subscribe to Notifications
from lager.ble import Client, Central
from bleak import BleakClient
import asyncio
NOTIFY_UUID = "your-characteristic-uuid"
def handle_notification(handle, data):
    print(f"Notification from {handle}: {data.hex()}")
loop = asyncio.get_event_loop()
with Client(BleakClient("AA:BB:CC:DD:EE:FF"), loop=loop) as client:
    # Subscribe and wait for 10 messages or 30 seconds
    timed_out, messages = client.start_notify(
        NOTIFY_UUID,
        callback=handle_notification,
        max_messages=10,
        timeout=30.0
    )
    if timed_out:
        print(f"Timeout - received {len(messages)} messages")
    else:
        print(f"Received all {len(messages)} messages")
    # Process collected messages
    for msg in messages:
        print(f" {msg.hex()}")
    client.stop_notify(NOTIFY_UUID)
Write Command and Read Response
from lager.ble import Client
from bleak import BleakClient
import asyncio
WRITE_UUID = "write-characteristic-uuid"
READ_UUID = "read-characteristic-uuid"
loop = asyncio.get_event_loop()
with Client(BleakClient("AA:BB:CC:DD:EE:FF"), loop=loop) as client:
    # Send command
    command = b'\x01\x02\x03'
    client.write_gatt_char(WRITE_UUID, command)
    # Wait for processing
    client.sleep(0.1)
    # Read response
    response = client.read_gatt_char(READ_UUID)
    print(f"Response: {response.hex()}")
Device Firmware Version Check
from lager.ble import Client, Central
from bleak import BleakClient
import asyncio
# Standard Device Information Service UUIDs
DEVICE_INFO_SERVICE = "0000180a-0000-1000-8000-00805f9b34fb"
FIRMWARE_REVISION = "00002a26-0000-1000-8000-00805f9b34fb"
MANUFACTURER_NAME = "00002a29-0000-1000-8000-00805f9b34fb"
def check_device_info(address):
    loop = asyncio.get_event_loop()
    with Client(BleakClient(address), loop=loop) as client:
        # Read manufacturer
        if client.has_characteristic(MANUFACTURER_NAME):
            data = client.read_gatt_char(MANUFACTURER_NAME)
            print(f"Manufacturer: {data.decode('utf-8')}")
        # Read firmware version
        if client.has_characteristic(FIRMWARE_REVISION):
            data = client.read_gatt_char(FIRMWARE_REVISION)
            print(f"Firmware: {data.decode('utf-8')}")
# First scan to find device
central = Central()
devices = central.scan(name="MyDevice")
if devices:
    check_device_info(devices[0].address)
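The example above only prints the firmware revision. To turn it into an actual check, the decoded string can be compared against a minimum version. The sketch below assumes the device reports a dotted numeric revision such as 1.10.0, which is a common convention but not guaranteed by the Device Information Service; parse_version and meets_minimum are hypothetical helpers.

```python
def parse_version(text):
    """Parse a dotted version such as '1.4.2' into a tuple of ints."""
    return tuple(int(part) for part in text.strip().split("."))


def meets_minimum(firmware_bytes, minimum):
    """Return True if the decoded firmware revision is at least `minimum`."""
    version = parse_version(bytes(firmware_bytes).decode("utf-8"))
    return version >= parse_version(minimum)


# Stand-in for the value returned by client.read_gatt_char(FIRMWARE_REVISION).
print(meets_minimum(bytearray(b"1.10.0"), "1.9.3"))  # True
```

Comparing integer tuples avoids the string-comparison pitfall where "1.10.0" would sort before "1.9.3".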
BLE Production Test
from lager.ble import Client, Central
from bleak import BleakClient
import asyncio
DEVICE_NAME = "DUT_BLE"
TEST_CHAR = "test-characteristic-uuid"
def ble_production_test():
    central = Central()
    loop = asyncio.get_event_loop()
    # Step 1: Scan for DUT
    print("Scanning for DUT...")
    devices = central.scan(name=DEVICE_NAME, scan_time=10.0)
    if not devices:
        print("FAIL: DUT not found")
        return False
    address = devices[0].address
    print(f"Found DUT at {address}")
    # Step 2: Connect and test
    try:
        with Client(BleakClient(address), loop=loop) as client:
            # Test read
            data = client.read_gatt_char(TEST_CHAR)
            if len(data) == 0:
                print("FAIL: Empty response")
                return False
            # Test write
            client.write_gatt_char(TEST_CHAR, b'\x55')
            client.sleep(0.1)
            # Verify write
            data = client.read_gatt_char(TEST_CHAR)
            if data[0] != 0x55:
                print("FAIL: Write verification failed")
                return False
            print("PASS: BLE test complete")
            return True
    except Exception as e:
        print(f"FAIL: {e}")
        return False
# Run test
ble_production_test()
Hardware Requirements
| Requirement | Description |
|---|---|
| BLE Hardware | Bluetooth 4.0+ adapter on Lager Box |
| Permissions | May require root/sudo for BLE operations |
Dependencies
The BLE module uses Bleak as the underlying BLE library, which provides cross-platform BLE support.
Notes
- BLE operations are synchronous wrappers around async Bleak operations
- The Client class supports the context manager protocol (with statement) for automatic cleanup
- Notification callbacks receive (handle, data) parameters
- Use max_messages and timeout together to control notification collection
- MAC addresses are typically in the format AA:BB:CC:DD:EE:FF
- Some BLE operations may require pairing before they work
- Signal strength (RSSI) is available on scanned device objects
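The first note describes a common pattern: a blocking method runs a coroutine to completion on an event loop and returns its result. The sketch below shows that general pattern with the standard library only. It is not the lager.ble implementation; SyncWrapper and fake_read are hypothetical stand-ins.

```python
import asyncio


class SyncWrapper:
    """Expose coroutine-based operations through blocking calls."""

    def __init__(self, loop=None):
        # Create a private loop when the caller does not supply one.
        self._loop = loop or asyncio.new_event_loop()

    def run(self, coro):
        """Block until `coro` finishes and return its result."""
        return self._loop.run_until_complete(coro)

    def close(self):
        """Release the event loop."""
        self._loop.close()


async def fake_read():
    # Stand-in for an awaitable BLE operation.
    await asyncio.sleep(0)
    return bytearray(b"\x64")


wrapper = SyncWrapper()
data = wrapper.run(fake_read())
wrapper.close()
print(data[0])  # 100
```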