Skip to main content
Communicate with Bluetooth Low Energy (BLE) devices for scanning, connecting, reading/writing characteristics, and subscribing to notifications.

Import

from lager.ble import Client, Central, do_nothing, notify_handler, waiter

Classes

| Class | Description |
| --- | --- |
| Central | BLE central role for scanning and initiating connections |
| Client | BLE client for GATT operations on a connected device |

Functions

| Function | Description |
| --- | --- |
| do_nothing() | No-op notification handler |
| notify_handler() | Event-based notification handler |
| waiter() | Async wait helper |

Central Class

The Central class provides BLE scanning and connection initiation.

Central(loop=None)

Create a BLE central instance.
from lager.ble import Central

central = Central()
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| loop | asyncio.EventLoop | Event loop (optional, uses default) |

scan(scan_time=5.0, name=None, address=None)

Scan for nearby BLE devices.
central = Central()

# Scan for all devices
devices = central.scan(scan_time=5.0)
for device in devices:
    print(f"{device.name}: {device.address}")

# Scan for specific device name
devices = central.scan(name="MyDevice")

# Scan for specific MAC address
devices = central.scan(address="AA:BB:CC:DD:EE:FF")
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| scan_time | float | Scan duration in seconds (default: 5.0) |
| name | str | Filter by device name (optional) |
| address | str | Filter by MAC address (optional) |
Returns: list - List of discovered BLE devices

connect(address)

Connect to a BLE device by address.
central = Central()
client = central.connect("AA:BB:CC:DD:EE:FF")
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| address | str | MAC address of the device |
Returns: Client - Connected BLE client

pair(address)

Pair with a BLE device.
central = Central()
client = central.pair("AA:BB:CC:DD:EE:FF")

Client Class

The Client class provides GATT operations on a connected BLE device.

Creating a Client

from lager.ble import Client, Central
from bleak import BleakClient
import asyncio

# Method 1: Using Central
central = Central()
client = central.connect("AA:BB:CC:DD:EE:FF")

# Method 2: Direct creation with context manager
loop = asyncio.get_event_loop()
with Client(BleakClient("AA:BB:CC:DD:EE:FF"), loop=loop) as client:
    # Use client
    pass

connect()

Establish connection to the BLE device.
client.connect()

disconnect()

Disconnect from the BLE device.
client.disconnect()

pair()

Pair with the connected device.
client.pair()

get_services()

Discover and retrieve all GATT services.
services = client.get_services()
for service in services:
    print(f"Service: {service.uuid}")
    for char in service.characteristics:
        print(f"  Characteristic: {char.uuid}")
Returns: BleakGATTServiceCollection - Collection of discovered services

has_characteristic(uuid)

Check if a characteristic exists on the device.
if client.has_characteristic("00002a19-0000-1000-8000-00805f9b34fb"):
    print("Battery level characteristic found")
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| uuid | str | Characteristic UUID |
Returns: bool - True if characteristic exists

read_gatt_char(char_specifier)

Read a characteristic value.
# Read by UUID
data = client.read_gatt_char("00002a19-0000-1000-8000-00805f9b34fb")
print(f"Battery level: {data[0]}%")

# Read by handle
data = client.read_gatt_char(0x0012)
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| char_specifier | str or int | UUID string or handle number |
Returns: bytearray - Characteristic value

write_gatt_char(char_specifier, data)

Write a value to a characteristic.
# Write bytes
client.write_gatt_char("characteristic-uuid", b'\x01\x02\x03')

# Write string
client.write_gatt_char("characteristic-uuid", "hello".encode('utf-8'))
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| char_specifier | str or int | UUID string or handle number |
| data | bytes | Data to write |

start_notify(char_specifier, callback=do_nothing, max_messages=None, timeout=None)

Subscribe to characteristic notifications.
from lager.ble import do_nothing

# Simple notification subscription
def my_callback(handle, data):
    """Print the payload of one notification as a hex string.

    ``handle`` identifies the notifying characteristic and is not used here;
    ``data`` is the raw notification payload.
    """
    payload_hex = data.hex()
    print(f"Received: {payload_hex}")

timed_out, messages = client.start_notify(
    "characteristic-uuid",
    callback=my_callback,
    max_messages=10,
    timeout=30.0
)

if timed_out:
    print("Timed out waiting for notifications")
else:
    print(f"Received {len(messages)} messages")
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| char_specifier | str or int | Characteristic UUID or handle |
| callback | callable | Function called for each notification |
| max_messages | int | Stop after receiving this many messages |
| timeout | float | Timeout in seconds |
Returns: tuple[bool, list] - (timed_out, messages) where timed_out is True if timeout occurred

stop_notify(char_specifier)

Unsubscribe from characteristic notifications.
client.stop_notify("characteristic-uuid")

sleep(timeout)

Sleep for a duration (async-safe).
client.sleep(1.0)  # Sleep for 1 second

Helper Functions

do_nothing(handle, data)

A no-op notification handler.
from lager.ble import do_nothing

# Use when you only care about collecting messages
timed_out, messages = client.start_notify(
    "uuid",
    callback=do_nothing,
    max_messages=5
)

notify_handler(evt, messages, callback, max_messages, handle, data)

Internal notification handler that collects messages and signals completion.

waiter(event, timeout)

Async wait helper for notification events.

Examples

Scan for Devices

from lager.ble import Central

central = Central()

# Discover all nearby BLE devices
print("Scanning for BLE devices...")
devices = central.scan(scan_time=10.0)

for device in devices:
    name = device.name or "Unknown"
    print(f"  {name}: {device.address}")

Connect and Read Characteristic

from lager.ble import Client, Central
from bleak import BleakClient
import asyncio

# Standard BLE UUIDs (Battery Service 0x180F, Battery Level characteristic 0x2A19)
# NOTE: only BATTERY_LEVEL is used below; BATTERY_SERVICE is listed for reference.
BATTERY_SERVICE = "0000180f-0000-1000-8000-00805f9b34fb"
BATTERY_LEVEL = "00002a19-0000-1000-8000-00805f9b34fb"

loop = asyncio.get_event_loop()

with Client(BleakClient("AA:BB:CC:DD:EE:FF"), loop=loop) as client:
    # Check that the Battery Level characteristic exists before reading it
    if client.has_characteristic(BATTERY_LEVEL):
        data = client.read_gatt_char(BATTERY_LEVEL)
        # The first byte of the value is printed as the percentage
        print(f"Battery level: {data[0]}%")

Subscribe to Notifications

from lager.ble import Client, Central
from bleak import BleakClient
import asyncio

NOTIFY_UUID = "your-characteristic-uuid"

def handle_notification(handle, data):
    """Print one notification: the handle it came from and its payload in hex."""
    payload_hex = data.hex()
    print(f"Notification from {handle}: {payload_hex}")

loop = asyncio.get_event_loop()

with Client(BleakClient("AA:BB:CC:DD:EE:FF"), loop=loop) as client:
    # Subscribe and wait for 10 messages or 30 seconds
    timed_out, messages = client.start_notify(
        NOTIFY_UUID,
        callback=handle_notification,
        max_messages=10,
        timeout=30.0
    )

    if timed_out:
        print(f"Timeout - received {len(messages)} messages")
    else:
        print(f"Received all {len(messages)} messages")

    # Process collected messages
    for msg in messages:
        print(f"  {msg.hex()}")

    client.stop_notify(NOTIFY_UUID)

Write Command and Read Response

from lager.ble import Client
from bleak import BleakClient
import asyncio

WRITE_UUID = "write-characteristic-uuid"
READ_UUID = "read-characteristic-uuid"

loop = asyncio.get_event_loop()

with Client(BleakClient("AA:BB:CC:DD:EE:FF"), loop=loop) as client:
    # Send command
    command = b'\x01\x02\x03'
    client.write_gatt_char(WRITE_UUID, command)

    # Wait for processing
    client.sleep(0.1)

    # Read response
    response = client.read_gatt_char(READ_UUID)
    print(f"Response: {response.hex()}")

Device Firmware Version Check

from lager.ble import Client, Central
from bleak import BleakClient
import asyncio

# Standard Device Information Service UUIDs
DEVICE_INFO_SERVICE = "0000180a-0000-1000-8000-00805f9b34fb"
FIRMWARE_REVISION = "00002a26-0000-1000-8000-00805f9b34fb"
MANUFACTURER_NAME = "00002a29-0000-1000-8000-00805f9b34fb"

def check_device_info(address):
    """Print the manufacturer name and firmware revision of a BLE device.

    Opens a Client on ``address`` and, for each of the two characteristics,
    reads and prints its value as UTF-8 text only if the device exposes it.
    """
    event_loop = asyncio.get_event_loop()

    with Client(BleakClient(address), loop=event_loop) as dut:
        # Manufacturer name: probe first, then read and decode
        has_manufacturer = dut.has_characteristic(MANUFACTURER_NAME)
        if has_manufacturer:
            manufacturer_raw = dut.read_gatt_char(MANUFACTURER_NAME)
            manufacturer = manufacturer_raw.decode('utf-8')
            print(f"Manufacturer: {manufacturer}")

        # Firmware revision: same probe-then-read sequence
        has_firmware = dut.has_characteristic(FIRMWARE_REVISION)
        if has_firmware:
            firmware_raw = dut.read_gatt_char(FIRMWARE_REVISION)
            firmware = firmware_raw.decode('utf-8')
            print(f"Firmware: {firmware}")

# First scan to find device
central = Central()
devices = central.scan(name="MyDevice")
if devices:
    check_device_info(devices[0].address)

BLE Production Test

from lager.ble import Client, Central
from bleak import BleakClient
import asyncio

DEVICE_NAME = "DUT_BLE"
TEST_CHAR = "test-characteristic-uuid"

def ble_production_test():
    """Run a scan / connect / read / write / read-back check against the DUT.

    Scans for a device named ``DEVICE_NAME``, connects to the first match,
    reads ``TEST_CHAR``, writes the byte 0x55 to it, and reads it back to
    confirm the write took effect. Each failure prints a ``FAIL:`` line and
    success prints a ``PASS:`` line.

    Returns:
        bool: True if every step passed, False otherwise.
    """
    central = Central()
    loop = asyncio.get_event_loop()

    # Step 1: Scan for DUT
    print("Scanning for DUT...")
    devices = central.scan(name=DEVICE_NAME, scan_time=10.0)

    if not devices:
        print("FAIL: DUT not found")
        return False

    address = devices[0].address
    print(f"Found DUT at {address}")

    # Step 2: Connect and test
    try:
        with Client(BleakClient(address), loop=loop) as client:
            # Test read
            data = client.read_gatt_char(TEST_CHAR)
            if not data:
                print("FAIL: Empty response")
                return False

            # Test write
            client.write_gatt_char(TEST_CHAR, b'\x55')
            client.sleep(0.1)

            # Verify write. An empty read-back is a verification failure too;
            # guard it here so it is reported as such instead of surfacing
            # as an IndexError through the generic handler below.
            data = client.read_gatt_char(TEST_CHAR)
            if not data or data[0] != 0x55:
                print("FAIL: Write verification failed")
                return False

            print("PASS: BLE test complete")
            return True

    except Exception as e:
        # Top-level boundary of the test: any connection or GATT error is a
        # test failure, reported with its message rather than a traceback.
        print(f"FAIL: {e}")
        return False

# Run test
ble_production_test()

Hardware Requirements

| Requirement | Description |
| --- | --- |
| BLE Hardware | Bluetooth 4.0+ adapter on gateway |
| Permissions | May require root/sudo for BLE operations |

Dependencies

The BLE module uses Bleak as the underlying BLE library, which provides cross-platform BLE support.

Notes

  • BLE operations are synchronous wrappers around async Bleak operations
  • The Client class supports context manager (with statement) for automatic cleanup
  • Notification callbacks receive (handle, data) parameters
  • Use max_messages and timeout together to control notification collection
  • MAC addresses are typically in format AA:BB:CC:DD:EE:FF
  • Some BLE operations may require pairing before they work
  • Signal strength (RSSI) is available on scanned device objects