Source code for canlib.kvDevice

from __future__ import print_function

import re
import time
import traceback

from . import kvMemoConfig
from . import canlib
from . import kvmlib


[docs]class kvDevice(): """Holds information about a Kvaser CAN device Args: ch (int): CANlib channel where the device is currently connected. flags (int): Optional flags such as canlib.canOPEN_EXCLUSIVE. canlibHnd : Optional canlib.canlib object, usually creates a new one. ean (str): EAN for the device, e.g. "73-30130-00778-9". Used instead of ch. serial (str): Optional serial number of the device, can be used in conjuntion with ean. cardChannel (int): Optional local channel on card, can be used in conjunction with ean. """ # ean: 73-30130-00671-3 # ean_hi: 73301 # ean_lo: 30006713
[docs] @staticmethod def ean2ean_hi(ean): eanCompact = re.sub('-', '', ean) match = re.match(r'(\d{5})(\d{8})', eanCompact) return int('0x%s' % match.group(1), 0)
[docs] @staticmethod def ean2ean_lo(ean): eanCompact = re.sub('-', '', ean) match = re.match(r'(\d{5})(\d{8})', eanCompact) return int('0x%s' % match.group(2), 0)
[docs] @staticmethod def ean_hi_lo2ean(ean_hi, ean_lo): return "%02x-%05x-%05x-%x" % (ean_hi >> 12, ((ean_hi & 0xfff) << 8) | (ean_lo >> 24), (ean_lo >> 4) & 0xfffff, ean_lo & 0xf)
[docs] @staticmethod def allDevices(reinitialize=True): devices = [] indx = -1 if reinitialize: canlib.reinitializeLibrary() for ch in range(canlib.getNumberOfChannels()): try: dev = kvDevice(ch=ch) except canlib.canError as e: if e.status == canlib.canERR_NOTFOUND: return devices else: raise e if indx == -1 or dev != devices[indx]: devices.append(dev) indx += 1 return devices
def __init__(self, ch=None, flags=0, canlibHnd=None, ean=None, serial=None, cardChannel=0): self._cardChannel = cardChannel if ch is None and (ean is not None or serial is not None): ch = self._findChannel(ean, serial, cardChannel) if ch is not None: self.channel = canlib.openChannel(ch, flags) self._loadInfo() self.close() else: self.channel = None self._loadInfo() self._ean = ean self._serial = serial self._channel = ch self.noOfChannels = None self.memo = None def _loadInfo(self): if self.channel is not None: self._card = self.cardNumber() self._name = self.name() self._ean = self.ean() self._serial = self.serial() self._fw = self.fw() self._driver = self.driverName() self._defaultHostname = self.defaultHostname() self._cardChannel = self.cardChannel() else: self.channel = None self._card = None self._name = None self._ean = None self._serial = None self._fw = None self._driver = None self._defaultHostname = None
[docs] def memoOpenEx(self): # Deprecated, use memoOpen() instead. self.memoOpen()
[docs] def memoOpen(self, deviceType=kvmlib.kvmDEVICE_MHYDRA_EXT): self.memo = kvmlib.kvmlib() self.memo.deviceOpen(memoNr=self._card, devicetype=deviceType)
[docs] def memoClose(self): self.memo.close() self.memo = None
[docs] def readConfig(self): memoWasClosed = False if self.memo is None: memoWasClosed = True self.memoOpen() self.config = kvMemoConfig.kvMemoConfig( param_lif=self.memo.kmfReadConfig()) if memoWasClosed: self.memoClose() return self.config
[docs] def writeConfig(self, config=None): if config is not None: self.config = config memoWasClosed = False if self.memo is None: memoWasClosed = True self.memoOpen() self.memo.writeConfig(self.config) if memoWasClosed: self.memoClose()
[docs] def memoReadEvents(self, fileIndx): self.memo.logFileMount(fileIndx) memoEvents = self.memo.logFileReadEvents() self.memo.logFileDismount() return memoEvents
[docs] def lastKnowncanlibChannel(self): return self._channel
[docs] def cardNumber(self): return self.channel.getChannelData_CardNumber()
[docs] def cardChannels(self): return self.noOfChannels
[docs] def cardChannel(self): """Read card channel number from device Returns: cardChannel (int): The local channel on card, first channel is number 0. """ return self.channel.getChannelData_Chan_No_On_Card()
[docs] def close(self): try: self.channel.close() except canlib.canError as e: # qqqmac Turn this into a debugger warning print("WARNING: Failed to close, ", e) traceback.print_stack(limit=2) self.channel = None
[docs] def driverName(self): return self.channel.getChannelData_DriverName()
[docs] def ean(self): return self.channel.getChannelData_EAN()
[docs] def fw(self): (major, minor, build) = self.channel.getChannelData_Firmware() return (major, minor, build)
[docs] def name(self): return self.channel.getChannelData_Name()
[docs] def serial(self): return self.channel.getChannelData_Serial()
[docs] def setModeVirtualLogger(self): # kvDeviceSetMode fails silently if it doesn't support the given device # mode self.channel.kvDeviceSetMode(canlib.kvDEVICE_MODE_LOGGER) if self.channel.kvDeviceGetMode() != canlib.kvDEVICE_MODE_LOGGER: raise Exception("ERROR: Could not set device in virtual logger" " mode. Is CAN power applied?.")
[docs] def setModeNormal(self): # kvDeviceSetMode fails silently if it doesn't support the given device # mode self.channel.kvDeviceSetMode(canlib.kvDEVICE_MODE_INTERFACE) if self.channel.kvDeviceGetMode() != canlib.kvDEVICE_MODE_INTERFACE: raise Exception("ERROR: Could not set device in normal mode.")
[docs] def defaultHostname(self): ean_part = '%x' % kvDevice.ean2ean_lo(self._ean) return 'kv-%s-%06d' % (ean_part[-5:], self._serial)
[docs] def hasScript(self): if (self._ean == '73-30130-00567-9' or self._ean == '73-30130-00778-9' or self._ean == '73-30130-00821-2' or self._ean == '73-30130-00819-9' or self._ean == '73-30130-00752-9' or self._ean == '73-30130-00779-6'): return True else: return False
[docs] def isLogger(self): if (self._ean == '73-30130-00567-9' or self._ean == '73-30130-00778-9' or self._ean == '73-30130-00821-2' or self._ean == '73-30130-00819-9'): return True else: return False
[docs] def open(self, flags=0, timeout=10, unloadCanlib=False): if self.channel is not None: self.close() startTime = time.time() while True: ch = self._findChannel(self._ean, self._serial, self._cardChannel, unloadCanlib=unloadCanlib) if ch is not None: self.channel = canlib.openChannel(ch, flags) self._channel = ch self._loadInfo() if self.channel is None: print('Waiting for device %s %s. Slept %ds (timeout:%d)' % ( self._ean, self._serial, time.time() - startTime, timeout)) time.sleep(2) if (not (self.channel is None)) or ( (time.time() - startTime) > timeout): break if self.channel is None: raise Exception("ERROR: Could not find device %s %s (timeout: %d" " s)." % (self._ean, self._serial, timeout))
[docs] def write(self, frame): """Write a CAN message to the bus. Args: frame (canlib.frame.Frame): The data frame to be sent """ self.channel.write(frame)
[docs] def writeWait(self, frame, timeout): """Write a CAN message to the bus. Args: message (canlib.frame.Frame): The data frame to send """ self.channel.writeWait(frame, timeout)
def _findChannel(self, wanted_ean, wanted_serial=None, card_channel=0, unloadCanlib=False): channel = None if unloadCanlib: canlib.reinitializeLibrary() else: canlib.initializeLibrary() for ch in range(canlib.getNumberOfChannels()): try: ean = canlib.getChannelData_EAN(ch) serial = canlib.getChannelData_Serial(ch) c_channel = canlib.getChannelData_Chan_No_On_Card(ch) except canlib.canError as e: if e.status == canlib.canERR_NOCARD: print('Card was removed') break else: raise e if (ean == wanted_ean or wanted_ean is None) and (c_channel == card_channel) and ( (serial == wanted_serial) or (wanted_serial is None)): channel = ch break return channel def _waitToDisappear(self, timeout=10): startTime = time.time() print('Wait for disappear...', end="") while self._findChannel(self._ean, self._serial, self._cardChannel, unloadCanlib=True) is not None: if time.time() - startTime > timeout: print("\nWARNING: Timeout (%s s) reached while waiting for" " device (ean:%s, sn:%s) to disappear!" % (timeout, self._ean, self._serial)) print('I will keep running and assume that I was too slow...') return time.sleep(1) print('.\n', end="") def __ne__(self, other): return not self.__eq__(other) def __eq__(self, other): if other is None: return False if self._ean == other._ean and self._serial == other._serial: return True else: if (self._serial is None or other._serial is None) and \ self._ean == other._ean: return True else: return False def __hash__(self): return hash("%s %s" % (self._ean, self._serial)) def __str__(self): text = 'Device: %s\n' % self._name text = text + 'EAN : %s\n' % self._ean text = text + 'S/N : %s\n' % self._serial if self._fw is not None: fwVersion = "v%d.%d.%d" % self._fw else: fwVersion = "None" text = text + 'FW : %s\n' % fwVersion text = text + 'Card : %s\n' % self._card text = text + 'Drv : %s\n' % self._driver text = text + 'Card channel : %s\n' % self._cardChannel text = text + 'Canlib channel: %s\n' % self._channel return text
if __name__ == '__main__': devices = kvDevice.allDevices() print("List all %d devices..." % (len(devices))) for dev in devices: print("\n", dev)