Skip to main content
Configure and control CAN bus interfaces for automotive and industrial communication testing.

Import

from lager.core import can_up, can_down

Overview

The CAN (Controller Area Network) module provides functions to configure CAN interfaces on the Lager Box for communication with automotive ECUs, industrial controllers, and other CAN-enabled devices.

Functions

FunctionDescription
can_up()Bring up CAN interface(s) at specified bitrate
can_down()Bring down CAN interface(s)

Function Reference

can_up(interfaces, bitrate)

Bring up one or more CAN interfaces with the specified bitrate.
from lager.core import can_up

# Bring up single interface
can_up(0, 500000)  # CAN0 at 500kbps

# Bring up multiple interfaces
can_up([0, 1], 250000)  # CAN0 and CAN1 at 250kbps
Parameters:
ParameterTypeDescription
interfacesint or listInterface number(s) (0, 1, etc.)
bitrateintCAN bitrate in bits per second
Returns: True on success Common Bitrates:
BitrateUse Case
125000Low-speed CAN, truck/trailer
250000J1939, heavy-duty vehicles
500000High-speed CAN, passenger vehicles
1000000CAN FD, high-performance

can_down(interfaces)

Bring down one or more CAN interfaces.
from lager.core import can_down

# Bring down single interface
can_down(0)

# Bring down multiple interfaces
can_down([0, 1])
Parameters:
ParameterTypeDescription
interfacesint or listInterface number(s) to bring down
Returns: True on success

Examples

Basic CAN Setup

from lager.core import can_up, can_down

# Configure CAN at 500kbps
can_up(0, 500000)
print("CAN0 configured at 500kbps")

# ... perform CAN operations ...

# Clean up
can_down(0)
print("CAN0 disabled")

Using python-can Library

After bringing up the interface, use the python-can library for communication:
from lager.core import can_up, can_down
import can

# Configure CAN interface
can_up(0, 500000)

try:
    # Create CAN bus instance
    bus = can.interface.Bus(channel='can0', bustype='socketcan')

    # Send a message
    msg = can.Message(
        arbitration_id=0x123,
        data=[0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88],
        is_extended_id=False
    )
    bus.send(msg)
    print(f"Sent: {msg}")

    # Receive messages
    print("Waiting for messages...")
    for _ in range(10):
        msg = bus.recv(timeout=1.0)
        if msg:
            print(f"Received: ID={msg.arbitration_id:03X} Data={msg.data.hex()}")

    bus.shutdown()

finally:
    can_down(0)

CAN Message Monitor

from lager.core import can_up, can_down
import can
import time

def monitor_can_traffic(interface=0, bitrate=500000, duration=30):
    """Monitor CAN bus traffic for a specified duration."""
    can_up(interface, bitrate)

    message_counts = {}

    try:
        bus = can.interface.Bus(channel=f'can{interface}', bustype='socketcan')
        start_time = time.time()

        print(f"Monitoring CAN{interface} at {bitrate}bps for {duration}s...")
        print("-" * 60)

        while time.time() - start_time < duration:
            msg = bus.recv(timeout=0.1)
            if msg:
                msg_id = msg.arbitration_id
                message_counts[msg_id] = message_counts.get(msg_id, 0) + 1
                print(f"[{time.time()-start_time:6.2f}s] "
                      f"ID: 0x{msg_id:03X}  Data: {msg.data.hex()}  "
                      f"DLC: {msg.dlc}")

        bus.shutdown()

    finally:
        can_down(interface)

    # Print summary
    print("\n" + "=" * 60)
    print("MESSAGE SUMMARY")
    print("=" * 60)
    for msg_id in sorted(message_counts.keys()):
        print(f"  0x{msg_id:03X}: {message_counts[msg_id]} messages")

    return message_counts

# Run monitor
monitor_can_traffic(interface=0, bitrate=500000, duration=10)

ECU Communication Test

from lager.core import can_up, can_down
import can
import time

class ECUTester:
    """Simple ECU communication tester."""

    def __init__(self, interface=0, bitrate=500000):
        self.interface = interface
        self.bitrate = bitrate
        self.bus = None

    def __enter__(self):
        can_up(self.interface, self.bitrate)
        self.bus = can.interface.Bus(
            channel=f'can{self.interface}',
            bustype='socketcan'
        )
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.bus:
            self.bus.shutdown()
        can_down(self.interface)

    def send_request(self, request_id, response_id, data, timeout=1.0):
        """Send request and wait for response."""
        # Send request
        msg = can.Message(
            arbitration_id=request_id,
            data=data,
            is_extended_id=False
        )
        self.bus.send(msg)

        # Wait for response
        start = time.time()
        while time.time() - start < timeout:
            response = self.bus.recv(timeout=0.1)
            if response and response.arbitration_id == response_id:
                return response.data
        return None

    def read_pid(self, pid, mode=0x01):
        """Read OBD-II PID."""
        # OBD-II request format: [length, mode, PID, ...]
        request_data = [0x02, mode, pid, 0x00, 0x00, 0x00, 0x00, 0x00]

        # Standard OBD-II IDs
        REQUEST_ID = 0x7DF  # Broadcast request
        RESPONSE_ID = 0x7E8  # First ECU response

        response = self.send_request(REQUEST_ID, RESPONSE_ID, request_data)
        if response:
            return list(response)
        return None

# Usage
with ECUTester(interface=0, bitrate=500000) as ecu:
    # Read engine RPM (PID 0x0C)
    response = ecu.read_pid(0x0C)
    if response and len(response) >= 4:
        rpm = ((response[3] * 256) + response[4]) / 4
        print(f"Engine RPM: {rpm}")

    # Read vehicle speed (PID 0x0D)
    response = ecu.read_pid(0x0D)
    if response and len(response) >= 3:
        speed = response[3]
        print(f"Vehicle Speed: {speed} km/h")

J1939 Communication

from lager.core import can_up, can_down
import can
import struct

def decode_j1939_id(can_id):
    """Decode J1939 29-bit identifier."""
    priority = (can_id >> 26) & 0x07
    reserved = (can_id >> 25) & 0x01
    data_page = (can_id >> 24) & 0x01
    pdu_format = (can_id >> 16) & 0xFF
    pdu_specific = (can_id >> 8) & 0xFF
    source_address = can_id & 0xFF

    # Determine PGN
    if pdu_format < 240:
        pgn = (data_page << 16) | (pdu_format << 8)
        destination = pdu_specific
    else:
        pgn = (data_page << 16) | (pdu_format << 8) | pdu_specific
        destination = 0xFF  # Broadcast

    return {
        'priority': priority,
        'pgn': pgn,
        'source': source_address,
        'destination': destination
    }

def monitor_j1939(interface=0, duration=30):
    """Monitor J1939 traffic."""
    can_up(interface, 250000)  # J1939 uses 250kbps

    try:
        bus = can.interface.Bus(
            channel=f'can{interface}',
            bustype='socketcan'
        )

        print(f"Monitoring J1939 traffic for {duration}s...")
        start = time.time()

        while time.time() - start < duration:
            msg = bus.recv(timeout=0.1)
            if msg and msg.is_extended_id:
                info = decode_j1939_id(msg.arbitration_id)
                print(f"PGN: {info['pgn']:05X}  "
                      f"Src: {info['source']:02X}  "
                      f"Pri: {info['priority']}  "
                      f"Data: {msg.data.hex()}")

        bus.shutdown()

    finally:
        can_down(interface)

monitor_j1939(interface=0, duration=10)

Production CAN Test

from lager.core import can_up, can_down
from lager.factory import Step, Failure, run
import can
import time

class CANLoopbackTest(Step):
    """Test CAN loopback (requires two CAN interfaces connected)."""

    def run(self):
        self.update_heading("CAN Loopback Test")

        # Configure both CAN interfaces
        can_up([0, 1], 500000)

        try:
            bus0 = can.interface.Bus(channel='can0', bustype='socketcan')
            bus1 = can.interface.Bus(channel='can1', bustype='socketcan')

            # Test data
            test_messages = [
                (0x100, [0x01, 0x02, 0x03, 0x04]),
                (0x200, [0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF]),
                (0x7FF, [0x55] * 8),
            ]

            for msg_id, data in test_messages:
                # Send from CAN0
                tx_msg = can.Message(arbitration_id=msg_id, data=data)
                bus0.send(tx_msg)

                # Receive on CAN1
                rx_msg = bus1.recv(timeout=1.0)

                if not rx_msg:
                    raise Failure(f"No response for ID 0x{msg_id:03X}")

                if rx_msg.arbitration_id != msg_id:
                    raise Failure(f"ID mismatch: sent 0x{msg_id:03X}, "
                                f"received 0x{rx_msg.arbitration_id:03X}")

                if list(rx_msg.data) != data:
                    raise Failure(f"Data mismatch for ID 0x{msg_id:03X}")

                self.log(f"  0x{msg_id:03X}: OK")

            bus0.shutdown()
            bus1.shutdown()

        finally:
            can_down([0, 1])

        self.log("CAN loopback test passed")
        return True

# Run test
run([CANLoopbackTest])

Hardware Setup

Physical Connection

Gateway CAN Interface
├── CANH ────────── CANH on DUT
├── CANL ────────── CANL on DUT
└── GND  ────────── GND on DUT

Note: 120Ω termination resistor required at each end of the bus

Typical CAN Hardware

InterfaceDescription
MCP2515SPI-based CAN controller
KvaserUSB-CAN adapters
SocketCANLinux CAN subsystem
CAN HATRaspberry Pi CAN interfaces

Troubleshooting

Common Issues

IssueSolution
”Device or resource busy”Interface already up, call can_down() first
No messages receivedCheck wiring, termination, and bitrate
Bus errorsVerify bitrate matches all devices
Timeout errorsCheck CAN transceiver power

Debug Commands

# Check CAN interface status (on Lager Box via SSH)
ip -details link show can0

# View CAN statistics
cat /sys/class/net/can0/statistics/*

# Monitor raw CAN traffic
candump can0

Notes

  • CAN interfaces use Linux SocketCAN subsystem
  • Interface numbers correspond to can0, can1, etc.
  • Bring down interfaces when not in use to release resources
  • Standard CAN supports up to 1Mbps, CAN FD supports higher
  • Always include proper termination (120Ω) on the bus
  • For CAN FD, use appropriate bitrate settings for arbitration and data phases