Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:illuusio:python3
python-can
python-can_axiomtek_ax9290_hack.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File python-can_axiomtek_ax9290_hack.patch of Package python-can
diff -urN python-can-3.3.4/can/interfaces/serial/serial_can.py python-can-3.3.4-patch/can/interfaces/serial/serial_can.py --- python-can-3.3.4/can/interfaces/serial/serial_can.py 2020-10-04 23:05:14.000000000 +0300 +++ python-can-3.3.4-patch/can/interfaces/serial/serial_can.py 2021-01-14 13:46:54.865536708 +0200 @@ -11,6 +11,8 @@ import logging import struct +import binascii +import time from can import BusABC, Message @@ -33,7 +35,7 @@ """ - def __init__(self, channel, baudrate=115200, timeout=0.1, rtscts=False, + def __init__(self, channel, baudrate=115200, timeout=0.5, rtscts=False, *args, **kwargs): """ :param str channel: @@ -57,8 +59,9 @@ raise ValueError("Must specify a serial port.") self.channel_info = "Serial interface: " + channel + self.send_tx = 0 self.ser = serial.serial_for_url( - channel, baudrate=baudrate, timeout=timeout, rtscts=rtscts) + channel, baudrate=baudrate, timeout=(timeout / 1000), rtscts=rtscts) super(SerialBus, self).__init__(channel=channel, *args, **kwargs) @@ -86,25 +89,44 @@ used instead. """ - try: - timestamp = struct.pack('<I', int(msg.timestamp * 1000)) - except struct.error: - raise ValueError('Timestamp is out of range') - try: - a_id = struct.pack('<I', msg.arbitration_id) - except struct.error: - raise ValueError('Arbitration Id is out of range') + # @t send CAN A message + # + # Example string is like + # @t10001000110021133 byte_msg = bytearray() - byte_msg.append(0xAA) - for i in range(0, 4): - byte_msg.append(timestamp[i]) - byte_msg.append(msg.dlc) - for i in range(0, 4): - byte_msg.append(a_id[i]) - for i in range(0, msg.dlc): - byte_msg.append(msg.data[i]) - byte_msg.append(0xBB) + + # CAN port number, only 1 is allowed for AX92903-series. + byte_msg.extend("@t1".encode()) + # Transmission intervals in hexadecimal.. Four byte milliseconds + byte_msg.extend("0001".encode()) + # Repeat counter in hexadecimal. Range: 0000~FFFF (0000 = Send once) + byte_msg.extend("0001".encode()) + byte_msg.extend('{:03x}'.format(msg.arbitration_id).encode()) + byte_msg.extend('{:1x}'.format(msg.dlc).encode()) + byte_msg.extend(binascii.hexlify(msg.data)) + + # If we want to change the default active (Command) Mode to + # frame transmission (Data mode), the only is using command + # string “+++” to make it. + # + # The string “+++” is defined by the STM chip which uses + # their firmware to convert the signal from the UART to CAN Bus. + self.ser.write(str.encode('+++')) + time.sleep(0.1) + + # Somewho Tx buffer is full.. tell to empty it + # ERROR05 CAN Tx buffer is full + if self.send_tx == 1: + # print("Empty buffer") + self.ser.write(str.encode('@Tx')) + self.send_tx = 0 + self.ser.write(byte_msg) + self.ser.write(str.encode('\r\n')) + + # Then get back to operate mode + self.ser.write(str.encode('@S3\r\n')) + time.sleep(0.8) def _recv_internal(self, timeout): """ @@ -129,28 +151,53 @@ try: # ser.read can return an empty string # or raise a SerialException - rx_byte = self.ser.read() + error = 0 + while True: + rx_byte = self.ser.read() + if rx_byte == b'' or ord(rx_byte) == ord('@'): + break + elif ord(rx_byte) == ord('#'): + rxd_byte = self.ser.read() + if rxd_byte and ord(rxd_byte) == ord('O'): + rxd_byte = self.ser.read(3) + elif rxd_byte and ord(rxd_byte) == ord('E'): + rxd_byte = self.ser.read(8) + # ERROR05 CAN Tx buffer is full. This should not + # happend but it does + if rxd_byte[5] == ord('5'): + # print("Request empty") + self.send_tx = 1 except serial.SerialException: return None, False - if rx_byte and ord(rx_byte) == 0xAA: - s = bytearray(self.ser.read(4)) - timestamp = (struct.unpack('<I', s))[0] - dlc = ord(self.ser.read()) - - s = bytearray(self.ser.read(4)) - arb_id = (struct.unpack('<I', s))[0] - - data = self.ser.read(dlc) - - rxd_byte = ord(self.ser.read()) - if rxd_byte == 0xBB: - # received message data okay - msg = Message(timestamp=timestamp/1000, - arbitration_id=arb_id, - dlc=dlc, - data=data) - return msg, False + if rx_byte and ord(rx_byte) == ord('@'): + rx_byte = ord(self.ser.read()) + + if rx_byte == ord('F'): + s = bytearray(self.ser.read(4)) + timestamp = time.time() + + ext = int(self.ser.read()) + + s = bytearray(self.ser.read(3)) + arb_id = int(s, 16) + + s = bytearray(self.ser.read(2)) + dlc = int(s) + + orig_data = self.ser.read(dlc * 2) + data = bytearray.fromhex(orig_data.decode('utf-8')) + + rxd_byte = ord(self.ser.read()) + if rxd_byte == ord('\r'): + # received message data okay + msg = Message(timestamp=timestamp, + arbitration_id=arb_id, + dlc=dlc, + data=data, + channel=self.channel_info, + is_extended_id=ext) + return msg, False else: return None, False @@ -160,3 +207,4 @@ return self.ser.fileno() # Return an invalid file descriptor on Windows return -1 +
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor