Source code for tests.test_caninterface_raw

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
test_caninterface_raw
----------------------------------

Tests for `caninterface_raw` module.


Notes:
  A virtual CAN interface 'vcan' must be enabled for this test. See enable_virtual_can_bus().
  Must be run as sudo.

"""

import subprocess
import sys
import time
import unittest

assert sys.version_info >= (3, 3, 0), "Python version 3.3 or later required!"

from can4python import exceptions
from can4python import canframe
from can4python import caninterface_raw

VIRTUAL_CAN_BUS_NAME = "vcan0"


[docs]def enable_virtual_can_bus(): try: subprocess.check_output(["modprobe", "vcan"]) except: raise exceptions.CanException("Could not modprobe vcan. Are you sure you are running as sudo?") try: subprocess.check_output(["ip", "link", "add", "dev", VIRTUAL_CAN_BUS_NAME, "type", "vcan"], stderr=subprocess.STDOUT) except subprocess.CalledProcessError: pass try: subprocess.check_output(["ifconfig", VIRTUAL_CAN_BUS_NAME, "up"]) except subprocess.CalledProcessError: raise exceptions.CanException("Could not enable {}. Are you sure you are running as sudo?".format( VIRTUAL_CAN_BUS_NAME))
[docs]def disable_virtual_can_bus(): subprocess.check_output(["ifconfig", VIRTUAL_CAN_BUS_NAME, "down"])
[docs]class TestSocketCanRawInterface(unittest.TestCase): # Scaffolding # NUMBER_OF_LOOPS = 10000 FRAME_SENDER_SPACING_MILLISECONDS = 0.1 FRAME_ID_RECEIVE = 4 FRAME_ID_SEND = 1 FRAME_NUMBER_OF_DATABYTES = 8 NONEXISTING_CAN_BUS_NAME = "vcan8"
[docs] def setUp(self): enable_virtual_can_bus() self.interface = caninterface_raw.SocketCanRawInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0) self.simulated_can_process = None
[docs] def tearDown(self): self.interface.close() try: self.simulated_can_process.terminate() except (AttributeError, ProcessLookupError) as _: pass enable_virtual_can_bus()
[docs] def start_can_frame_sender(self): """Send CAN frames using the cangen command.""" self.simulated_can_process = subprocess.Popen(["cangen", VIRTUAL_CAN_BUS_NAME, "-I", str(self.FRAME_ID_RECEIVE), "-L", str(self.FRAME_NUMBER_OF_DATABYTES), "-D", "i", "-g", str(self.FRAME_SENDER_SPACING_MILLISECONDS)], shell=False, stderr=subprocess.STDOUT)
# Creation etc #
[docs] def testConstructor(self): a = caninterface_raw.SocketCanRawInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0) self.assertEqual(a.interfacename, VIRTUAL_CAN_BUS_NAME) a.close() self.assertEqual(a.interfacename, VIRTUAL_CAN_BUS_NAME) b = caninterface_raw.SocketCanRawInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0) self.assertEqual(b.interfacename, VIRTUAL_CAN_BUS_NAME) b.close() c = caninterface_raw.SocketCanRawInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0) self.assertEqual(c.interfacename, VIRTUAL_CAN_BUS_NAME) c.close() d = caninterface_raw.SocketCanRawInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0) self.assertEqual(d.interfacename, VIRTUAL_CAN_BUS_NAME) d.close()
[docs] def testConstructorWrongValue(self): self.assertRaises(exceptions.CanException, caninterface_raw.SocketCanRawInterface, VIRTUAL_CAN_BUS_NAME, -1) self.assertRaises(exceptions.CanException, caninterface_raw.SocketCanRawInterface, VIRTUAL_CAN_BUS_NAME, -1.0)
[docs] def testConstructorWrongType(self): self.assertRaises(exceptions.CanException, caninterface_raw.SocketCanRawInterface, 1, 1.0) self.assertRaises(exceptions.CanException, caninterface_raw.SocketCanRawInterface, VIRTUAL_CAN_BUS_NAME, "ABC")
[docs] def testConstructorSeveralInterfaces(self): a = caninterface_raw.SocketCanRawInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0) self.assertEqual(a.interfacename, VIRTUAL_CAN_BUS_NAME) b = caninterface_raw.SocketCanRawInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0) self.assertEqual(b.interfacename, VIRTUAL_CAN_BUS_NAME) c = caninterface_raw.SocketCanRawInterface(VIRTUAL_CAN_BUS_NAME, timeout=1.0) self.assertEqual(c.interfacename, VIRTUAL_CAN_BUS_NAME) a.close() b.close() c.close()
[docs] def testCreateNonExistingBus(self): self.assertRaises(exceptions.CanException, caninterface_raw.SocketCanRawInterface, self.NONEXISTING_CAN_BUS_NAME)
[docs] def testWriteToInterfacenameAttribute(self): self.assertRaises(AttributeError, setattr, self.interface, 'interfacename', 'can0')
[docs] def testRepr(self): result = repr(self.interface) known_result = "SocketCan raw interface: {}".format(VIRTUAL_CAN_BUS_NAME.strip()) self.assertEqual(result.strip(), known_result.strip())
# Receive #
[docs] def testReceiveData(self): self.start_can_frame_sender() received_frame = self.interface.recv_next_frame() self.assertEqual(len(received_frame.frame_data), self.FRAME_NUMBER_OF_DATABYTES)
[docs] def testReceiveSpeed(self): self.start_can_frame_sender() starttime = time.time() for i in range(self.NUMBER_OF_LOOPS): self.interface.recv_next_frame() execution_time = time.time() - starttime time_per_loop_ms = 1000 * execution_time / self.NUMBER_OF_LOOPS outputstring = "\n\n --> Received {} frames in {:.1f} s ({:.1f} ms per frame). Frame sender spacing {:.1f} ms.\n".\ format(self.NUMBER_OF_LOOPS, execution_time, time_per_loop_ms, self.FRAME_SENDER_SPACING_MILLISECONDS) print(outputstring)
[docs] def testReceiveNoData(self): self.assertRaises(exceptions.CanTimeoutException, self.interface.recv_next_frame)
[docs] def testReceiveClosedBus(self): disable_virtual_can_bus() self.assertRaises(exceptions.CanException, self.interface.recv_next_frame)
[docs] def testReceiveClosedInterface(self): self.interface.close() self.assertRaises(exceptions.CanException, self.interface.recv_next_frame)
# Send #
[docs] def testSend(self): self.simulated_can_process = subprocess.Popen(['candump', VIRTUAL_CAN_BUS_NAME, '-n', '1'], shell=False, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) time.sleep(0.1) frame = canframe.CanFrame.from_empty_bytes(self.FRAME_ID_SEND, self.FRAME_NUMBER_OF_DATABYTES) self.interface.send_frame(frame) print("\nWaiting for testfile to send_frame CAN frame ") out, err = self.simulated_can_process.communicate() print("Confirmed CAN frame") known_result = "[8] 00 00 00 00 00 00 00 00" self.assertIn(known_result, out)
[docs] def testSendClosedBus(self): disable_virtual_can_bus() frame = canframe.CanFrame.from_empty_bytes(self.FRAME_ID_SEND, self.FRAME_NUMBER_OF_DATABYTES) self.assertRaises(exceptions.CanException, self.interface.send_frame, frame)
[docs] def testSendClosedInterface(self): frame = canframe.CanFrame.from_empty_bytes(self.FRAME_ID_SEND, self.FRAME_NUMBER_OF_DATABYTES) self.interface.close() self.assertRaises(exceptions.CanException, self.interface.send_frame, frame)
# Set filters #
[docs] def testTooFewTooManyFiltersDefined(self): # Will skip setting any filters. self.interface.set_receive_filters([]) self.interface.set_receive_filters(list(range(200)))
if __name__ == '__main__': # Run all tests # unittest.main() # Run a single test # # suite = unittest.TestSuite() # suite.addTest(TestSocketCanInterfaceRaw("testConstructor")) # unittest.TextTestRunner(verbosity=2).run(suite)