#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
test_caninterface_bcm
----------------------------------
Tests for `caninterface_bcm` module.
Notes:
A virtual CAN interface 'vcan' must be enabled for this test. See enable_virtual_can_bus().
Must be run as sudo.
"""
import subprocess
import sys
import time
import unittest
assert sys.version_info >= (3, 4, 0), "Python version 3.4 or later required!"
from can4python import canframe
from can4python import caninterface_bcm
from can4python import exceptions
try:
from .test_caninterface_raw import VIRTUAL_CAN_BUS_NAME, enable_virtual_can_bus, disable_virtual_can_bus
except SystemError: # When running this file directly
from test_caninterface_raw import VIRTUAL_CAN_BUS_NAME, enable_virtual_can_bus, disable_virtual_can_bus
[docs]class TestSocketCanBcmInterface(unittest.TestCase):
# Scaffolding #
NUMBER_OF_LOOPS = 10000
FRAME_SENDER_SPACING_MILLISECONDS = 0.1
FRAME_ID_RECEIVE = 4
FRAME_ID_SEND = 1
FRAME_NUMBER_OF_DATABYTES = 8
NONEXISTING_CAN_BUS_NAME = "vcan8"
NONEXISTING_FRAME_ID = 22
[docs] def setUp(self):
enable_virtual_can_bus()
self.interface = caninterface_bcm.SocketCanBcmInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0)
self.simulated_can_process = None
[docs] def tearDown(self):
self.interface.close()
try:
self.simulated_can_process.terminate()
except (AttributeError, ProcessLookupError) as _:
pass
enable_virtual_can_bus()
[docs] def start_can_frame_sender(self, interval_milliseconds=FRAME_SENDER_SPACING_MILLISECONDS):
"""Send CAN frames using the cangen command. Unlimited number of frames."""
self.simulated_can_process = subprocess.Popen(["cangen", VIRTUAL_CAN_BUS_NAME,
"-I", str(self.FRAME_ID_RECEIVE),
"-L", str(self.FRAME_NUMBER_OF_DATABYTES),
"-D", "i",
"-g", str(interval_milliseconds)],
shell=False, stderr=subprocess.STDOUT)
# Creation etc #
[docs] def testConstructor(self):
a = caninterface_bcm.SocketCanBcmInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0)
self.assertEqual(a.interfacename, VIRTUAL_CAN_BUS_NAME)
a.close()
self.assertEqual(a.interfacename, VIRTUAL_CAN_BUS_NAME)
b = caninterface_bcm.SocketCanBcmInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0)
self.assertEqual(b.interfacename, VIRTUAL_CAN_BUS_NAME)
b.close()
c = caninterface_bcm.SocketCanBcmInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0)
self.assertEqual(c.interfacename, VIRTUAL_CAN_BUS_NAME)
c.close()
d = caninterface_bcm.SocketCanBcmInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0)
self.assertEqual(d.interfacename, VIRTUAL_CAN_BUS_NAME)
d.close()
[docs] def testConstructorWrongValue(self):
self.assertRaises(exceptions.CanException, caninterface_bcm.SocketCanBcmInterface, VIRTUAL_CAN_BUS_NAME, -1)
self.assertRaises(exceptions.CanException, caninterface_bcm.SocketCanBcmInterface, VIRTUAL_CAN_BUS_NAME, -1.0)
[docs] def testConstructorWrongType(self):
self.assertRaises(exceptions.CanException, caninterface_bcm.SocketCanBcmInterface, VIRTUAL_CAN_BUS_NAME, "ABC")
[docs] def testConstructorSeveralInterfaces(self):
a = caninterface_bcm.SocketCanBcmInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0)
self.assertEqual(a.interfacename, VIRTUAL_CAN_BUS_NAME)
b = caninterface_bcm.SocketCanBcmInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0)
self.assertEqual(b.interfacename, VIRTUAL_CAN_BUS_NAME)
c = caninterface_bcm.SocketCanBcmInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0)
self.assertEqual(c.interfacename, VIRTUAL_CAN_BUS_NAME)
a.close()
b.close()
c.close()
[docs] def testCreateNonExistingBus(self):
self.assertRaises(exceptions.CanException,
caninterface_bcm.SocketCanBcmInterface,
self.NONEXISTING_CAN_BUS_NAME, timeout=1.0)
self.assertRaises(exceptions.CanException, caninterface_bcm.SocketCanBcmInterface, 1, 1.0)
[docs] def testWriteToInterfacenameAttribute(self):
self.assertRaises(AttributeError, setattr, self.interface, 'interfacename', 'can0')
[docs] def testRepr(self):
result = repr(self.interface)
known_result = "SocketCan BCM interface: {}".format(VIRTUAL_CAN_BUS_NAME.strip())
self.assertEqual(result.strip(), known_result.strip())
# Receive #
[docs] def testSetupReception(self):
self.interface.setup_reception(self.FRAME_ID_RECEIVE)
self.interface.setup_reception(self.FRAME_ID_RECEIVE, min_interval=None)
self.interface.setup_reception(self.FRAME_ID_RECEIVE, min_interval=0) # 0 and None should be the same
[docs] def testSetupReceptionWrongValue(self):
self.assertRaises(exceptions.CanException, self.interface.setup_reception, -1)
self.assertRaises(exceptions.CanException, self.interface.setup_reception, 0x007, "ABC")
self.assertRaises(exceptions.CanException, self.interface.setup_reception, 0x007, 'standard', -1.0)
self.assertRaises(exceptions.CanException, self.interface.setup_reception, 0x007, 'standard', 0.05, b"\x00")
self.assertRaises(exceptions.CanException, self.interface.setup_reception, 0x007, 'standard', 0.05, "ABC")
[docs] def testSetupReceptionWrongType(self):
self.assertRaises(exceptions.CanException, self.interface.setup_reception, "ABC")
self.assertRaises(exceptions.CanException, self.interface.setup_reception, 0x007, 1)
self.assertRaises(TypeError, self.interface.setup_reception, 0x007, 'standard', "ABC")
self.assertRaises(TypeError, self.interface.setup_reception, 0x007, 'standard', 0.05, 1)
[docs] def testReceiveData(self):
self.start_can_frame_sender()
self.interface.setup_reception(self.FRAME_ID_RECEIVE)
received_frame = self.interface.recv_next_frame()
self.assertEqual(len(received_frame), self.FRAME_NUMBER_OF_DATABYTES)
[docs] def testReceiveStoppedReception(self):
self.start_can_frame_sender()
self.interface.setup_reception(self.FRAME_ID_RECEIVE)
self.interface.stop_reception(self.FRAME_ID_RECEIVE)
self.assertRaises(exceptions.CanTimeoutException, self.interface.recv_next_frame)
[docs] def testReceiveWrongId(self):
self.start_can_frame_sender()
self.interface.setup_reception(self.NONEXISTING_FRAME_ID)
self.assertRaises(exceptions.CanTimeoutException, self.interface.recv_next_frame)
[docs] def testReceiveSpeed(self):
self.start_can_frame_sender()
self.interface.setup_reception(self.FRAME_ID_RECEIVE)
time.sleep(0.1)
starttime = time.time()
for i in range(self.NUMBER_OF_LOOPS):
self.interface.recv_next_frame()
execution_time = time.time() - starttime
time_per_loop_ms = 1000 * execution_time / self.NUMBER_OF_LOOPS
outputstring = "\n --> Received {} frames in {:.1f} s ({:.1f} ms per frame). Frame sender spacing {:.1f} ms.\n".\
format(self.NUMBER_OF_LOOPS, execution_time, time_per_loop_ms, self.FRAME_SENDER_SPACING_MILLISECONDS)
print(outputstring)
[docs] def testReceiveSpeedThrottled(self):
THROTTLE_INTERVAL_MILLISECONDS = 20
THROTTLE_LOOPS = 100
ALLOWED_RELATIVE_ERROR = 0.1
nominal_time = THROTTLE_INTERVAL_MILLISECONDS * THROTTLE_LOOPS / 1000 # seconds
self.start_can_frame_sender()
self.interface.setup_reception(self.FRAME_ID_RECEIVE, min_interval=THROTTLE_INTERVAL_MILLISECONDS)
time.sleep(0.1)
starttime = time.time()
for i in range(THROTTLE_LOOPS):
self.interface.recv_next_frame()
execution_time = time.time() - starttime
self.assertLess(abs(execution_time - nominal_time), ALLOWED_RELATIVE_ERROR * nominal_time)
[docs] def testReceiveDataChanged(self):
"""Verify that data change filtering works, by measuring time to receive a small number of frames from a larger data flow."""
SENDER_INTERVAL_MILLISECONDS = 1
DATA_CHANGED_LOOPS = 20
DATA_MASK = b"\x80\x00\x00\x00\x00\x00\x00\x00" # One frame out of 128
ALLOWED_RELATIVE_ERROR = 2.0
nominal_time = SENDER_INTERVAL_MILLISECONDS * 128 * DATA_CHANGED_LOOPS / 1000 # seconds
self.start_can_frame_sender(SENDER_INTERVAL_MILLISECONDS)
self.interface.setup_reception(self.FRAME_ID_RECEIVE, data_mask=DATA_MASK)
time.sleep(0.1)
starttime = time.time()
for i in range(DATA_CHANGED_LOOPS):
self.interface.recv_next_frame()
execution_time = time.time() - starttime
allowed_error = ALLOWED_RELATIVE_ERROR * nominal_time
time_error = abs(execution_time - nominal_time)
outputstring = "\n --> Received {} frames in {:.1f} s, nominal time {:.1f} s. Error {:.1f} s (allowed {:.1f} s).\n".\
format(DATA_CHANGED_LOOPS, execution_time, nominal_time, time_error, allowed_error)
print(outputstring)
self.assertLess(time_error, allowed_error)
[docs] def testReceiveDataChangedShorterDlcThanMask(self):
"""Verify that filtering works also when the input frame is shorter (3 bytes) than the data mask (8 bytes)."""
SENDER_INTERVAL_MILLISECONDS = 1
DATA_CHANGED_LOOPS = 20
DATA_MASK = b"\x80\x00\x00\x00\x00\x00\x00\x00" # One frame out of 128
ALLOWED_RELATIVE_ERROR = 2.0
nominal_time = SENDER_INTERVAL_MILLISECONDS * DATA_CHANGED_LOOPS * 128 / 1000
self.interface.setup_reception(self.FRAME_ID_RECEIVE, data_mask=DATA_MASK)
time.sleep(0.1)
self.simulated_can_process = subprocess.Popen(["cangen", VIRTUAL_CAN_BUS_NAME,
"-I", str(self.FRAME_ID_RECEIVE),
"-L", "3",
"-D", "i",
"-g", str(SENDER_INTERVAL_MILLISECONDS)],
shell=False, stderr=subprocess.STDOUT)
starttime = time.time()
for i in range(DATA_CHANGED_LOOPS):
self.interface.recv_next_frame()
execution_time = time.time() - starttime
allowed_error = ALLOWED_RELATIVE_ERROR * nominal_time
time_error = abs(execution_time - nominal_time)
outputstring = "\n --> Received {} frames in {:.1f} s, nominal time {:.1f} s. Error {:.1f} s (allowed {:.1f} s).\n".\
format(DATA_CHANGED_LOOPS, execution_time, nominal_time, time_error, allowed_error)
print(outputstring)
self.assertLess(time_error, allowed_error)
[docs] def testReceiveDlcChanged(self):
"""Verify that we are receiving frames where each frame is longer (or no data) than the previous."""
SENDER_INTERVAL_MILLISECONDS = 1
NUMBER_OF_DLC_FRAMES = 30
DATA_MASK = b"\x00\x00\x00\x00\x00\x00\x00\x00" # Do not look at data changes
self.interface.setup_reception(self.FRAME_ID_RECEIVE, data_mask=DATA_MASK)
self.simulated_can_process = subprocess.Popen(["cangen", VIRTUAL_CAN_BUS_NAME,
"-I", str(self.FRAME_ID_RECEIVE),
"-L", "i",
"-D", "0102030405060708",
"-n", str(NUMBER_OF_DLC_FRAMES),
"-g", str(SENDER_INTERVAL_MILLISECONDS)],
shell=False, stderr=subprocess.STDOUT)
number_of_received_frames = 0
while True:
try:
self.interface.recv_next_frame()
number_of_received_frames += 1
except exceptions.CanTimeoutException:
break
self.assertEqual(number_of_received_frames, NUMBER_OF_DLC_FRAMES)
[docs] def testReceiveNoData(self):
self.assertRaises(exceptions.CanTimeoutException, self.interface.recv_next_frame)
[docs] def testReceiveClosedBus(self):
disable_virtual_can_bus()
self.assertRaises(exceptions.CanException, self.interface.recv_next_frame)
[docs] def testReceiveClosedInterface(self):
self.interface.close()
self.assertRaises(exceptions.CanException, self.interface.recv_next_frame)
# Send #
[docs] def testSendSingleFrameWrongType(self):
self.assertRaises(exceptions.CanException, self.interface.send_frame, 1)
self.assertRaises(exceptions.CanException, self.interface.send_frame, "ABC")
[docs] def testSendSingleFrame(self):
self.simulated_can_process = subprocess.Popen(['candump', VIRTUAL_CAN_BUS_NAME, '-n', '1'], shell=False,
universal_newlines=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
time.sleep(0.1)
frame = canframe.CanFrame.from_empty_bytes(self.FRAME_ID_SEND, self.FRAME_NUMBER_OF_DATABYTES)
self.interface.send_frame(frame)
print("\nWaiting for a subprocess to receive a single CAN frame, sent via 'send_frame'.")
out, err = self.simulated_can_process.communicate()
print("Confirmed CAN frame.")
known_result = "[8] 00 00 00 00 00 00 00 00"
self.assertIn(known_result, out)
[docs] def testSetupPeriodicSendWrongValue(self):
frame_zeros = canframe.CanFrame(self.FRAME_ID_SEND, b'\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertRaises(exceptions.CanException, self.interface.setup_periodic_send, frame_zeros, -1)
def testSetupPeriodicSendWrongType(self):
frame_zeros = canframe.CanFrame(self.FRAME_ID_SEND, b'\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertRaises(exceptions.CanException, self.interface.setup_periodic_send, 1)
self.assertRaises(exceptions.CanException, self.interface.setup_periodic_send, frame_zeros, "ABC")
[docs] def testSetupPeriodicSendWrongType(self):
frame_zeros = canframe.CanFrame(self.FRAME_ID_SEND, b'\x00\x00\x00\x00\x00\x00\x00\x00')
self.assertRaises(exceptions.CanException, self.interface.setup_periodic_send, 1)
self.assertRaises(exceptions.CanException, self.interface.setup_periodic_send, frame_zeros, "ABC")
[docs] def testSendPeriodicAndChangeFrameAndStop(self):
"""Start periodic CAN transmission, update frame data (not interval), and finally stop transmission."""
RESULT_ALLOWED_DIFFERENCE = 5 # Number of missed frames, or when stopping sending too late.
TRANSMISSION_INTERVAL_MILLISECONDS = 20
MEASUREMENT_TIME = 1 # seconds
self.simulated_can_process = subprocess.Popen(['candump', VIRTUAL_CAN_BUS_NAME], shell=False,
universal_newlines=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
time.sleep(0.1)
frame_zeros = canframe.CanFrame(self.FRAME_ID_SEND, b'\x00\x00\x00\x00\x00\x00\x00\x00')
frame_ones = canframe.CanFrame(self.FRAME_ID_SEND, b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF')
print("\nSetting up periodic CAN frame transmission.")
self.interface.setup_periodic_send(frame_zeros, TRANSMISSION_INTERVAL_MILLISECONDS)
time.sleep(MEASUREMENT_TIME)
self.interface.setup_periodic_send(frame_ones, restart_timer=False)
time.sleep(MEASUREMENT_TIME)
self.interface.stop_periodic_send(self.FRAME_ID_SEND)
time.sleep(MEASUREMENT_TIME)
self.simulated_can_process.terminate()
out, err = self.simulated_can_process.communicate()
print("Periodic CAN frame transmission done.")
projected_number_of_frames = MEASUREMENT_TIME * 1000 / TRANSMISSION_INTERVAL_MILLISECONDS
number_of_frames_zeros = out.count(" {:03.0f} [8] 00 00 00 00 00 00 00 00".format(self.FRAME_ID_SEND))
self.assertGreaterEqual(number_of_frames_zeros, projected_number_of_frames - RESULT_ALLOWED_DIFFERENCE)
self.assertLessEqual(number_of_frames_zeros, projected_number_of_frames + RESULT_ALLOWED_DIFFERENCE)
number_of_frames_ones = out.count(" {:03.0f} [8] FF FF FF FF FF FF FF FF".format(self.FRAME_ID_SEND))
self.assertGreaterEqual(number_of_frames_ones, projected_number_of_frames - RESULT_ALLOWED_DIFFERENCE)
self.assertLessEqual(number_of_frames_ones, projected_number_of_frames + RESULT_ALLOWED_DIFFERENCE)
[docs] def testSendPeriodicAndChangeFrameAndStopExtended(self):
"""Start periodic CAN transmission, update frame data (not interval), and finally stop transmission."""
RESULT_ALLOWED_DIFFERENCE = 5 # Number of missed frames, or when stopping sending too late.
TRANSMISSION_INTERVAL_MILLISECONDS = 20
MEASUREMENT_TIME = 1 # seconds
self.simulated_can_process = subprocess.Popen(['candump', VIRTUAL_CAN_BUS_NAME], shell=False,
universal_newlines=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
time.sleep(0.1)
frame_zeros = canframe.CanFrame(self.FRAME_ID_SEND, b'\x00\x00\x00\x00\x00\x00\x00\x00', 'extended')
frame_ones = canframe.CanFrame(self.FRAME_ID_SEND, b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF', 'extended')
self.interface.setup_periodic_send(frame_zeros, TRANSMISSION_INTERVAL_MILLISECONDS)
time.sleep(MEASUREMENT_TIME)
self.interface.setup_periodic_send(frame_ones, restart_timer=False)
time.sleep(MEASUREMENT_TIME)
self.interface.stop_periodic_send(self.FRAME_ID_SEND, 'extended')
time.sleep(MEASUREMENT_TIME)
self.simulated_can_process.terminate()
out, err = self.simulated_can_process.communicate()
projected_number_of_frames = MEASUREMENT_TIME * 1000 / TRANSMISSION_INTERVAL_MILLISECONDS
number_of_frames_zeros = out.count(" {:08.0f} [8] 00 00 00 00 00 00 00 00".format(self.FRAME_ID_SEND))
self.assertGreaterEqual(number_of_frames_zeros, projected_number_of_frames - RESULT_ALLOWED_DIFFERENCE)
self.assertLessEqual(number_of_frames_zeros, projected_number_of_frames + RESULT_ALLOWED_DIFFERENCE)
number_of_frames_ones = out.count(" {:08.0f} [8] FF FF FF FF FF FF FF FF".format(self.FRAME_ID_SEND))
self.assertGreaterEqual(number_of_frames_ones, projected_number_of_frames - RESULT_ALLOWED_DIFFERENCE)
self.assertLessEqual(number_of_frames_ones, projected_number_of_frames + RESULT_ALLOWED_DIFFERENCE)
[docs] def testStopNonexistingPeriodicTask(self):
self.assertRaises(exceptions.CanException, self.interface.stop_periodic_send, self.FRAME_ID_SEND)
TRANSMISSION_INTERVAL_MILLISECONDS = 20
frame_zeros = canframe.CanFrame(self.FRAME_ID_SEND, b'\x00\x00\x00\x00\x00\x00\x00\x00', 'standard')
self.interface.setup_periodic_send(frame_zeros, TRANSMISSION_INTERVAL_MILLISECONDS)
self.assertRaises(exceptions.CanException, self.interface.stop_periodic_send, self.FRAME_ID_SEND, 'extended')
[docs] def testSendClosedBus(self):
disable_virtual_can_bus()
frame = canframe.CanFrame.from_empty_bytes(self.FRAME_ID_SEND, self.FRAME_NUMBER_OF_DATABYTES)
self.assertRaises(exceptions.CanException, self.interface.send_frame, frame)
[docs] def testSendClosedInterface(self):
frame = canframe.CanFrame.from_empty_bytes(self.FRAME_ID_SEND, self.FRAME_NUMBER_OF_DATABYTES)
self.interface.close()
self.assertRaises(exceptions.CanException, self.interface.send_frame, frame)
if __name__ == '__main__':
# Run all tests #
unittest.main()
# Run a single test #
# suite = unittest.TestSuite()
# suite.addTest(TestSocketCanBcmInterface("testReceiveDlcChanged"))
# unittest.TextTestRunner(verbosity=2).run(suite)