from lager.core import can_up, can_down
import can
import time
class ECUTester:
"""Simple ECU communication tester."""
def __init__(self, interface=0, bitrate=500000):
self.interface = interface
self.bitrate = bitrate
self.bus = None
def __enter__(self):
can_up(self.interface, self.bitrate)
self.bus = can.interface.Bus(
channel=f'can{self.interface}',
bustype='socketcan'
)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.bus:
self.bus.shutdown()
can_down(self.interface)
def send_request(self, request_id, response_id, data, timeout=1.0):
"""Send request and wait for response."""
# Send request
msg = can.Message(
arbitration_id=request_id,
data=data,
is_extended_id=False
)
self.bus.send(msg)
# Wait for response
start = time.time()
while time.time() - start < timeout:
response = self.bus.recv(timeout=0.1)
if response and response.arbitration_id == response_id:
return response.data
return None
def read_pid(self, pid, mode=0x01):
"""Read OBD-II PID."""
# OBD-II request format: [length, mode, PID, ...]
request_data = [0x02, mode, pid, 0x00, 0x00, 0x00, 0x00, 0x00]
# Standard OBD-II IDs
REQUEST_ID = 0x7DF # Broadcast request
RESPONSE_ID = 0x7E8 # First ECU response
response = self.send_request(REQUEST_ID, RESPONSE_ID, request_data)
if response:
return list(response)
return None
# Usage
with ECUTester(interface=0, bitrate=500000) as ecu:
# Read engine RPM (PID 0x0C)
response = ecu.read_pid(0x0C)
if response and len(response) >= 4:
rpm = ((response[3] * 256) + response[4]) / 4
print(f"Engine RPM: {rpm}")
# Read vehicle speed (PID 0x0D)
response = ecu.read_pid(0x0D)
if response and len(response) >= 3:
speed = response[3]
print(f"Vehicle Speed: {speed} km/h")