Skip to main content
Stream video from webcams attached to the Lager Box for visual inspection, automated vision testing, and remote monitoring.

Import

from lager.webcam import (
    start_stream,
    stop_stream,
    get_stream_info,
    get_active_streams,
    rename_stream,
    WebcamService
)

# Or import directly from service module
from lager.webcam.service import WebcamStreamState  # For zoom control

Functions

| Function | Description |
| --- | --- |
| start_stream() | Start a webcam stream |
| stop_stream() | Stop a webcam stream |
| get_stream_info() | Get info about an active stream |
| get_active_streams() | List all active stream names |
| rename_stream() | Rename an active stream |
| WebcamService.get_stream_url() | Get just the URL for a stream |

Function Reference

start_stream(net_name, video_device, dut_ip)

Start a webcam video stream.
from lager.webcam import start_stream

result = start_stream(
    net_name='camera1',
    video_device='/dev/video0',
    dut_ip='100.91.127.26'
)

print(f"Stream URL: {result['url']}")
print(f"Port: {result['port']}")
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| net_name | str | Unique name for this stream |
| video_device | str | Video device path (e.g., /dev/video0) |
| dut_ip | str | Lager Box IP address for URL generation |
Returns: dict with keys:
  • url - Full stream URL (e.g., http://100.91.127.26:8081/)
  • port - Port number for the stream
  • already_running - Boolean indicating if stream was already active
Raises: RuntimeError if device is already in use or not found

stop_stream(net_name)

Stop an active webcam stream.
from lager.webcam import stop_stream

stopped = stop_stream('camera1')
if stopped:
    print("Stream stopped")
else:
    print("Stream was not running")
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| net_name | str | Name of the stream to stop |
Returns: bool - True if stopped successfully, False if not running

get_stream_info(net_name, dut_ip)

Get information about an active stream.
from lager.webcam import get_stream_info

info = get_stream_info('camera1', '100.91.127.26')
if info:
    print(f"URL: {info['url']}")
    print(f"Port: {info['port']}")
    print(f"Device: {info['video_device']}")
else:
    print("Stream not active")
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| net_name | str | Name of the stream |
| dut_ip | str | Lager Box IP address |
Returns: dict or None - Stream info dict or None if not running

get_active_streams()

Get a list of all active stream names.
from lager.webcam import get_active_streams

streams = get_active_streams()
print(f"Active streams: {streams}")
# ['camera1', 'camera2', 'microscope']
Returns: list[str] - List of active stream names

rename_stream(old_name, new_name)

Rename an active stream.
from lager.webcam import rename_stream

success = rename_stream('camera1', 'main_camera')
if success:
    print("Stream renamed")
else:
    print("Stream not found")
Parameters:
| Parameter | Type | Description |
| --- | --- | --- |
| old_name | str | Current stream name |
| new_name | str | New stream name |
Returns: bool - True if renamed, False if stream not found

WebcamService Class

For more control, use the WebcamService class directly.

WebcamService()

Create a webcam service instance.
from lager.webcam import WebcamService

service = WebcamService()

start_stream(net_name, video_device, dut_ip)

Start a stream (same as module function).
service = WebcamService()
result = service.start_stream('camera1', '/dev/video0', '100.91.127.26')

stop_stream(net_name)

Stop a stream (same as module function).
service = WebcamService()
service.stop_stream('camera1')

get_stream_url(net_name, dut_ip)

Get just the URL for a stream.
service = WebcamService()
url = service.get_stream_url('camera1', '100.91.127.26')
if url:
    print(f"Stream at: {url}")

cleanup_dead_streams()

Remove state for streams whose processes have died.
service = WebcamService()
service.cleanup_dead_streams()

Stream State Management

The webcam service manages stream state in /etc/lager/webcam_streams.json.

WebcamStreamState

Internal class for state management.
from lager.webcam.service import WebcamStreamState

state = WebcamStreamState()

# Get zoom level for a stream
zoom = state.get_zoom('camera1')

# Set zoom level (1.0 to 4.0)
state.set_zoom('camera1', 2.0)

# Get all stream info
all_streams = state.get_all_streams()

Stream Features

Zoom Control

Streams support digital zoom from 1.0x to 4.0x:
from lager.webcam.service import WebcamStreamState

state = WebcamStreamState()

# Get current zoom
zoom = state.get_zoom('camera1')
print(f"Current zoom: {zoom}x")

# Set zoom
state.set_zoom('camera1', 2.5)  # 2.5x zoom
Zoom can also be controlled via the web interface API:
  • POST /api/zoom/in - Increase zoom by 0.25x
  • POST /api/zoom/out - Decrease zoom by 0.25x
  • POST /api/zoom/reset - Reset to 1.0x
  • GET /api/zoom - Get current zoom level

FPS Monitoring

The stream reports real-time FPS:
  • GET /api/fps - Get current FPS

Examples

Start Multiple Cameras

from lager.webcam.service import start_stream, get_active_streams

# Lager Box IP
DUT_IP = '100.91.127.26'

# Start multiple camera streams
cameras = [
    ('overview', '/dev/video0'),
    ('microscope', '/dev/video2'),
    ('solder_station', '/dev/video4'),
]

for name, device in cameras:
    try:
        result = start_stream(name, device, DUT_IP)
        print(f"{name}: {result['url']}")
    except RuntimeError as e:
        print(f"{name}: Failed - {e}")

# List active streams
print(f"\nActive: {get_active_streams()}")

Stream Management Script

from lager.webcam.service import (
    start_stream, stop_stream, get_stream_info, get_active_streams
)

DUT_IP = '100.91.127.26'

def list_streams():
    """Print one line per active stream showing its URL and video device.

    Prints "No active streams" when nothing is running. Streams for which
    ``get_stream_info`` returns nothing are skipped silently.
    """
    active = get_active_streams()
    if active:
        for name in active:
            info = get_stream_info(name, DUT_IP)
            if not info:
                # No info available for this stream; skip it.
                continue
            print(f"  {name}: {info['url']} ({info['video_device']})")
    else:
        print("No active streams")

def start_camera(name, device):
    """Start a stream named ``name`` on ``device`` and print the outcome.

    A RuntimeError from ``start_stream`` is reported on stdout rather than
    propagated.
    """
    try:
        result = start_stream(name, device, DUT_IP)
    except RuntimeError as e:
        print(f"Failed to start '{name}': {e}")
        return

    if not result['already_running']:
        print(f"Started '{name}' at {result['url']}")
    else:
        print(f"Stream '{name}' was already running at {result['url']}")

def stop_camera(name):
    """Stop the stream called ``name`` and print whether it had been running."""
    was_running = stop_stream(name)
    if not was_running:
        print(f"Stream '{name}' was not running")
        return
    print(f"Stopped '{name}'")

def stop_all():
    """Stop all camera streams and report the result for each one.

    ``stop_stream`` returns False when the named stream is not running, so
    the message printed reflects what actually happened instead of always
    claiming the stream was stopped.
    """
    # Iterate over a copy so the loop does not depend on whether the
    # returned list is affected by streams being stopped along the way.
    for name in list(get_active_streams()):
        if stop_stream(name):
            print(f"Stopped '{name}'")
        else:
            print(f"Stream '{name}' was not running")

# Usage
list_streams()
start_camera('main', '/dev/video0')
list_streams()
stop_all()

Visual Inspection Test

from lager.webcam.service import start_stream, stop_stream, get_stream_info
import time

DUT_IP = '100.91.127.26'

def visual_inspection_test(camera_name, video_device, inspection_url_callback):
    """
    Start camera stream and wait for operator inspection.

    The stream is always stopped before this function returns or raises,
    so a failing callback or an interrupted wait does not leave the video
    device claimed by a stream nobody is watching.

    Args:
        camera_name: Stream name
        video_device: Video device path
        inspection_url_callback: Function to handle the stream URL

    Returns:
        bool: True if inspection passed

    Raises:
        RuntimeError: Propagated from start_stream when the stream cannot
            be started. Nothing needs stopping in that case.
    """
    # Start stream (outside the try: if this fails there is nothing to clean up)
    result = start_stream(camera_name, video_device, DUT_IP)

    try:
        stream_url = result['url']

        print(f"Visual inspection stream: {stream_url}")

        # Notify external system (could open browser, send to UI, etc.)
        inspection_url_callback(stream_url)

        # Wait for inspection (in real usage, this would wait for operator input)
        print("Waiting for visual inspection...")
        time.sleep(10)  # Placeholder
    finally:
        # Stop stream even if the callback raised or the wait was interrupted
        stop_stream(camera_name)

    # Return result (would come from operator in real usage)
    return True

# Usage
def handle_url(url):
    """Print a prompt telling the user to open ``url`` in a browser."""
    message = f"Open in browser: {url}"
    print(message)

result = visual_inspection_test('inspection_cam', '/dev/video0', handle_url)
print(f"Inspection result: {'PASS' if result else 'FAIL'}")

Camera Discovery

import os
from lager.webcam.service import start_stream, stop_stream

DUT_IP = '100.91.127.26'

def discover_cameras(max_devices=10):
    """Find all available video devices.

    Args:
        max_devices: Number of device indices to probe, starting at 0.
            The default of 10 checks /dev/video0 through /dev/video9;
            raise it on hosts that expose more video nodes.

    Returns:
        list[str]: Paths of the probed device nodes that exist, in
        ascending index order.
    """
    candidates = (f'/dev/video{i}' for i in range(max_devices))
    return [device for device in candidates if os.path.exists(device)]

def test_camera(device):
    """Check whether a stream can be started on ``device``.

    A temporary stream, named after the device path, is started and then
    stopped again.

    Returns:
        bool: True if the stream started and was stopped, False if a
        RuntimeError was raised along the way.
    """
    name = f"test_{device.replace('/', '_')}"
    ok = False
    try:
        result = start_stream(name, device, DUT_IP)
        print(f"{device}: OK - {result['url']}")
        stop_stream(name)
        ok = True
    except RuntimeError as e:
        print(f"{device}: FAIL - {e}")
    return ok

# Discover and test all cameras
print("Discovering cameras...")
devices = discover_cameras()
print(f"Found {len(devices)} video devices")

for device in devices:
    test_camera(device)

Web Interface

Each stream provides a web interface at its URL with:
  • Live MJPEG video stream
  • Zoom controls (+, -, Reset)
  • FPS display
  • Sidebar with links to other active streams

API Endpoints

| Endpoint | Method | Description |
| --- | --- | --- |
| / | GET | HTML page with video viewer |
| /stream | GET | Raw MJPEG video stream |
| /api/zoom | GET | Get current zoom level |
| /api/zoom/in | POST | Increase zoom |
| /api/zoom/out | POST | Decrease zoom |
| /api/zoom/reset | POST | Reset zoom to 1.0x |
| /api/fps | GET | Get current FPS |
| /api/streams | GET | List all active streams |
| /test | GET | Health check endpoint |

Hardware Requirements

| Requirement | Description |
| --- | --- |
| USB Webcams | UVC-compatible cameras |
| Video Devices | /dev/video* device files |
| OpenCV | Required for video capture |

Notes

  • Streams run on ports starting from 8081
  • Each stream uses a separate port, automatically allocated
  • Streams persist until explicitly stopped or the process dies
  • Dead stream processes are automatically cleaned up
  • Only one stream can use a video device at a time
  • Default resolution is 640x480 at 30 FPS
  • JPEG quality is set to 80 for bandwidth/quality balance
  • Streams are accessible via HTTP from any network the Lager Box is on
  • Zoom is digital (crop and scale), not optical