from xml.dom import minidom
from . import kvamemolibxml
# XML Values
TRIG_LOG_ALL = 'TRIG_LOG_ALL'
TRIG_ON_EVENT = 'TRIG_ON_EVENT'
TRIG_SCRIPTED = 'TRIG_SCRIPTED'
[docs]class kvFilter(object):
def __init__(self):
self.msgStop = []
self.msgPass = []
[docs] def add(self, object):
if isinstance(object, kvFilterMsgStop):
self.addMsgStop(object)
else:
raise Exception("ERROR: (kvFilter) Can not add object of type %s!"
% type(object).__name__)
[docs] def addMsgStop(self, msgStop):
self.msgStop.append(msgStop)
[docs]class kvFilterMsgStop(object):
def __init__(self, protocol="NONE", msgid=None, dlc=None, msgid_min=None,
channel=0, can_ext="NO"):
self.msgid = msgid
if msgid_min is None:
msgid_min = msgid
self.msgid_min = msgid_min
self.dlc = dlc
self.protocol = protocol
self.channel = channel
self.can_ext = can_ext
[docs]class kvTrigger(object):
def __init__(self, logmode=TRIG_LOG_ALL, fifomode=True):
self.logmode = logmode
self.fifomode = fifomode
self.trigVar = []
self.statement = []
[docs] def add(self, obj):
obj.addToTriggerList(self)
[docs] def addStatement(self, trigStatement):
self.statement.append(trigStatement)
[docs] def getXmlTriggers(self, document):
xmlTriggers = document.createElement('TRIGGERS')
for obj in self.trigVar:
xmlTrigger = obj.getXml(document)
xmlTriggers.appendChild(xmlTrigger)
return xmlTriggers
[docs] def getXmlStatements(self, document):
xmlStatements = document.createElement('STATEMENTS')
for obj in self.statement:
xmlStatement = obj.getXml(document)
xmlStatements.appendChild(xmlStatement)
return xmlStatements
[docs]class kvScript(object):
def __init__(self, filename, path=''):
self.filename = filename
self.path = path
[docs]class kvTrigVarStartup(object):
def __init__(self, name="trigger_startup_0"):
self.name = name
[docs] def addToTriggerList(self, triggerList):
triggerList.trigVar.append(self)
[docs] def getXml(self, document):
xmlTrigger = document.createElement('TRIGGER_STARTUP')
xmlTrigger.setAttribute('name', self.name)
return xmlTrigger
[docs]class kvTrigVarDiskFull(object):
def __init__(self, name="trigger_diskfull_0"):
self.name = name
[docs] def addToTriggerList(self, triggerList):
triggerList.trigVar.append(self)
[docs] def getXml(self, document):
xmlTrigger = document.createElement('TRIGGER_DISK_FULL')
xmlTrigger.setAttribute('name', self.name)
return xmlTrigger
[docs]class kvTrigVarTimer(object):
def __init__(self, name="trigger_timer_0", offset=600, repeat=False,
channel=0, timeout=0):
self.name = name
self.offset = offset
self.repeat = False
self.channel = channel
self.timeout = timeout
[docs] def addToTriggerList(self, triggerList):
triggerList.trigVar.append(self)
[docs] def getXml(self, document):
xmlTrigger = document.createElement('TRIGGER_TIMER')
xmlTrigger.setAttribute('name', self.name)
xmlTrigger.setAttribute('offset', str(self.offset))
xmlTrigger.setAttribute('repeat', "YES" if self.repeat else "NO")
xmlTrigger.setAttribute('timeout', str(self.timeout))
return xmlTrigger
[docs]class kvTrigVarMsgId(object):
def __init__(self, name="trigger_msg_id_0", channel=0, timeout=0, msgid=0,
msgid_min=None, protocol="NONE", msg_field=None,
can_ext="NO", can_fd="NO"):
self.name = name
self.channel = channel
self.timeout = timeout
self.msgid = msgid
self.msgid_min = msgid_min
self.protocol = protocol
self.msg_field = msg_field
self.can_ext = can_ext
self.can_fd = can_fd
if self.msgid_min is None:
self.msgid_min = self.msgid
[docs] def addToTriggerList(self, triggerList):
triggerList.trigVar.append(self)
[docs] def getXml(self, document):
xmlTrigger = document.createElement('TRIGGER_MSG_ID')
xmlTrigger.setAttribute('name', self.name)
xmlTrigger.setAttribute('channel', str(self.channel))
xmlTrigger.setAttribute('timeout', str(self.timeout))
xmlTrigger.setAttribute('msgid', str(self.msgid))
xmlTrigger.setAttribute('msgid_min', str(self.msgid_min))
xmlTrigger.setAttribute('protocol', str(self.protocol))
if self.msg_field is not None:
xmlTrigger.setAttribute('msg_field', str(self.msg_field))
xmlTrigger.setAttribute('can_ext', str(self.can_ext))
xmlTrigger.setAttribute('can_fd', str(self.can_fd))
return xmlTrigger
[docs]class kvTrigVarMsgDlc(object):
def __init__(self, name="trigger_msg_dlc_0", channel=0, timeout=0, dlc=0,
dlc_min=None, can_fd="NO"):
self.name = name
self.channel = channel
self.timeout = timeout
self.dlc = dlc
self.dlc_min = dlc_min
self.can_fd = can_fd
if self.dlc_min is None:
self.dlc_min = self.dlc
[docs] def addToTriggerList(self, triggerList):
triggerList.trigVar.append(self)
[docs] def getXml(self, document):
xmlTrigger = document.createElement('TRIGGER_MSG_DLC')
xmlTrigger.setAttribute('name', self.name)
xmlTrigger.setAttribute('channel', str(self.channel))
xmlTrigger.setAttribute('timeout', str(self.timeout))
xmlTrigger.setAttribute('dlc', str(self.dlc))
xmlTrigger.setAttribute('dlc_min', str(self.dlc_min))
xmlTrigger.setAttribute('can_fd', str(self.can_fd))
return xmlTrigger
[docs]class kvTrigVarMsgErrorFrame(object):
def __init__(self, name="trigger_msg_errorframe_0", channel=0, timeout=0):
self.name = name
self.channel = channel
self.timeout = timeout
[docs] def addToTriggerList(self, triggerList):
triggerList.trigVar.append(self)
[docs] def getXml(self, document):
xmlTrigger = document.createElement('TRIGGER_MSG_ERROR_FRAME')
xmlTrigger.setAttribute('name', self.name)
xmlTrigger.setAttribute('channel', str(self.channel))
xmlTrigger.setAttribute('timeout', str(self.timeout))
return xmlTrigger
[docs]class kvTrigVarSigVal(object):
[docs] class condition(object):
ON_DATA_EQUAL_TO = "ON_DATA_EQUAL_TO"
ON_DATA_NOT_EQUAL_TO = "ON_DATA_NOT_EQUAL_TO"
ON_DATA_LARGER_THAN = "ON_DATA_LARGER_THAN"
ON_DATA_SMALLER_THAN = "ON_DATA_SMALLER_THAN"
ON_DATA_CHANGE_TO = "ON_DATA_CHANGE_TO"
ON_DATA_CHANGE_FROM = "ON_DATA_CHANGE_FROM"
ON_DATA_CHANGE = "ON_DATA_CHANGE"
ON_DATA_LARGER_THAN_OR_EQUAL = "ON_DATA_LARGER_THAN_OR_EQUAL"
ON_DATA_SMALLER_THAN_OR_EQUAL = "ON_DATA_SMALLER_THAN_OR_EQUAL"
[docs] class byteorder(object):
INTEL = "LITTLE_ENDIAN"
LITTLE_ENDIAN = "LITTLE_ENDIAN"
MOTOROLA = "BIG_ENDIAN"
BIG_ENDIAN = "BIG_ENDIAN"
def __init__(self, name="trigger_sigval_0", channel=0, timeout=0, msgid=0,
dlc=8, startbit=0, length=8, datatype="UNSIGNED",
byteorder="BIG_ENDIAN", protocol="NONE", msg_field=None,
data=0, data_min=None, condition=condition.ON_DATA_EQUAL_TO,
can_ext="NO", can_fd="NO"):
self.name = name
self.channel = channel
self.timeout = timeout
self.msgid = msgid
self.dlc = dlc
self.startbit = startbit
self.length = length
self.datatype = datatype
self.byteorder = byteorder
self.data = data
self.data_min = data_min
self.protocol = protocol
self.msg_field = msg_field
self.can_ext = can_ext
self.can_fd = can_fd
self.condition = condition
if self.data_min is None:
self.data_min = self.data
[docs] def addToTriggerList(self, triggerList):
triggerList.trigVar.append(self)
[docs] def getXml(self, document):
xmlTrigger = document.createElement('TRIGGER_SIGVAL')
xmlTrigger.setAttribute('name', self.name)
xmlTrigger.setAttribute('channel', str(self.channel))
xmlTrigger.setAttribute('timeout', str(self.timeout))
xmlTrigger.setAttribute('msgid', str(self.msgid))
xmlTrigger.setAttribute('protocol', str(self.protocol))
if self.msg_field is not None:
xmlTrigger.setAttribute('msg_field', str(self.msg_field))
xmlTrigger.setAttribute('startbit', str(self.startbit))
xmlTrigger.setAttribute('length', str(self.length))
xmlTrigger.setAttribute('dlc', str(self.dlc))
xmlTrigger.setAttribute('datatype', str(self.datatype))
xmlTrigger.setAttribute('byteorder', str(self.byteorder))
xmlTrigger.setAttribute('data', str(self.data))
xmlTrigger.setAttribute('data_min', str(self.data_min))
xmlTrigger.setAttribute('can_ext', str(self.can_ext))
xmlTrigger.setAttribute('can_fd', str(self.can_fd))
xmlTrigger.setAttribute('condition', str(self.condition))
return xmlTrigger
[docs]class kvTrigStatement(object):
def __init__(self, expression, preTrigger=0, postTrigger=0):
self.preTrigger = preTrigger
self.postTrigger = postTrigger
self.expression = expression
self.actions = []
[docs] def add(self, obj):
if isinstance(obj, kvTrigAction):
self.actions.append(obj)
else:
raise Exception(
"ERROR: (kvTrigStatement) Can not add object of type {0}!".format(
type(obj).__name__))
[docs] def addToTriggerList(self, triggerList):
triggerList.statement.append(self)
[docs] def getXml(self, document):
xmlStatement = document.createElement('STATEMENT')
xmlStatement.setAttribute('pretrigger', str(self.preTrigger))
xmlStatement.setAttribute('posttrigger', str(self.postTrigger))
xmlExpression = document.createElement('EXPRESSION')
text = document.createTextNode(str(self.expression))
xmlExpression.appendChild(text)
xmlStatement.appendChild(xmlExpression)
xmlActions = document.createElement('ACTIONS')
xmlStatement.appendChild(xmlActions)
for obj in self.actions:
xmlAction = obj.getXml(document)
xmlActions.appendChild(xmlAction)
return xmlStatement
[docs]class kvTrigAction(object):
[docs] class function:
START_LOG = 'ACTION_START_LOG'
STOP_LOG = 'ACTION_STOP_LOG'
STOP_LOG_COMPLETELY = 'ACTION_STOP_LOG_COMPLETELY'
ACTIVATE_AUTO_TRANSMIT_LIST = 'ACTION_ACTIVATE_AUTO_TRANSMIT_LIST'
DEACTIVATE_AUTO_TRANSMIT_LIST = 'ACTION_DEACTIVATE_AUTO_TRANSMIT_LIST'
EXTERNAL_PULSE = 'ACTION_EXTERNAL_PULSE'
def __init__(self, function=function.START_LOG, name=None, duration=0):
self.function = function
self.name = name
self.duration = duration
[docs] def getXml(self, document):
xmlAction = document.createElement(self.function)
if (self.function is kvTrigAction.function.ACTIVATE_AUTO_TRANSMIT_LIST or
self.function is kvTrigAction.function.DEACTIVATE_AUTO_TRANSMIT_LIST):
xmlAction.setAttribute('name', str(self.name))
if self.function is kvTrigAction.function.EXTERNAL_PULSE:
xmlAction.setAttribute('duration', str(self.duration))
return xmlAction
[docs]class kvMessage(object):
def __init__(self, name, msgid, data, dlc=None, can_ext=False,
can_fd=False, can_fd_brs=False, error_frame=False,
remote_frame=False):
self.name = name
self.msgid = msgid
self.data = data
self.dlc = dlc
self.can_ext = can_ext
self.can_fd = can_fd
self.can_fd_brs = can_fd_brs
self.error_frame = error_frame
self.remote_frame = remote_frame
[docs] def getXml(self, document):
xmlStatement = document.createElement('MESSAGE')
xmlStatement.setAttribute('name', self.name)
xmlStatement.setAttribute('msgid', str(self.msgid))
data_len = len(self.data)
if data_len > 8:
data_len = 8
if self.dlc is None:
self.dlc = data_len
xmlStatement.setAttribute('dlc', str(self.dlc))
for d in range(0, data_len):
xmlStatement.setAttribute('b' + str(d), str(self.data[d]))
xmlStatement.setAttribute('can_ext', "YES" if self.can_ext else "NO")
xmlStatement.setAttribute('can_fd', "YES" if self.can_fd else "NO")
xmlStatement.setAttribute('can_fd_brs',
"YES" if self.can_fd_brs else "NO")
xmlStatement.setAttribute('error_frame',
"YES" if self.error_frame else "NO")
xmlStatement.setAttribute('remote_frame',
"YES" if self.remote_frame else "NO")
return xmlStatement
[docs]class kvTransmitList(object):
def __init__(self, name, msg_delay=0, cycle_delay=0, cyclic=False,
autostart=False):
self.name = name
self.msg_delay = msg_delay
self.cycle_delay = cycle_delay
self.cyclic = cyclic
self.messages = []
[docs] def add(self, obj):
if isinstance(obj, kvTransmitMessage):
self.messages.append(obj)
else:
raise Exception(
"ERROR: (kvTransmitList) Can not add object of type {0}!".format(type(obj).__name__))
[docs] def getXml(self, document):
xmlTransmitList = document.createElement('TRANSMIT_LIST')
xmlTransmitList.setAttribute('name', self.name)
xmlTransmitList.setAttribute('msg_delay', str(self.msg_delay))
xmlTransmitList.setAttribute('cycle_delay', str(self.cycle_delay))
xmlTransmitList.setAttribute('cyclic', "YES" if self.cyclic else "NO")
if self.messages is None:
return xmlTransmitList
for message in self.messages:
xmlTransmitMessage = message.getXml(document)
xmlTransmitList.appendChild(xmlTransmitMessage)
return xmlTransmitList
[docs]class kvTransmitMessage(object):
def __init__(self, name, channel=0):
self.name = name
self.channel = channel
[docs] def getXml(self, document):
xmlTransmitList = document.createElement('TRANSMIT_MESSAGE')
xmlTransmitList.setAttribute('name', self.name)
xmlTransmitList.setAttribute('channel', str(self.channel))
return xmlTransmitList
[docs]class ValidationResult(object):
def __init__(self, severity, status, text):
self.severity = severity
self.status = status
self.text = text
def __str__(self):
return "[%s %d]: %s" % (self.severity, self.status, self.text)
[docs]class kvMemoConfig(object):
def __init__(self, version="2.0", binary_version="6.0", afterburner=0,
log_all=False, fifo_mode="NO", param_lif=None,
param_xml=None):
if param_lif is not None:
self.parseLif(param_lif)
elif param_xml is not None:
self.parseXml(param_xml)
else:
imp = minidom.DOMImplementation()
doctype = imp.createDocumentType(qualifiedName="KVASER",
publicId="", systemId="")
self.document = imp.createDocument(None, 'KVASER', doctype)
root = self.document.documentElement
self.document.appendChild(root)
comment = self.document.createComment("Created with memoConfig.py")
root.appendChild(comment)
child = self.document.createElement('VERSION')
text = self.document.createTextNode(version)
child.appendChild(text)
root.appendChild(child)
child = self.document.createElement('BINARY_VERSION')
text = self.document.createTextNode(binary_version)
child.appendChild(text)
root.appendChild(child)
xmlSettings = self.document.createElement('SETTINGS')
root.appendChild(xmlSettings)
xmlMode = self.document.createElement('MODE')
xmlMode.setAttribute('log_all', "YES" if log_all else "NO")
xmlMode.setAttribute('fifo_mode', fifo_mode)
xmlSettings.appendChild(xmlMode)
xmlCanpower = self.document.createElement('CANPOWER')
xmlCanpower.setAttribute('timeout', str(afterburner))
xmlSettings.appendChild(xmlCanpower)
[docs] def addBusparams(self, rateParam, channel=0, silent=False,
rateParamFd=None, iso=None):
child = self.document.getElementsByTagName('CAN_BUS')
if not child:
child = self.document.createElement('CAN_BUS')
self.document.documentElement.appendChild(child)
else:
child = child[0]
newchild = self.document.createElement('PARAMETERS')
newchild.setAttribute('channel', str(channel))
newchild.setAttribute('bitrate', str(rateParam.freq))
newchild.setAttribute('tseg1', str(rateParam.tseg1))
newchild.setAttribute('tseg2', str(rateParam.tseg2))
newchild.setAttribute('sjw', str(rateParam.sjw))
newchild.setAttribute('silent', "YES" if silent else "NO")
if rateParamFd is not None:
newchild.setAttribute('bitrate_brs', str(rateParamFd.freq))
newchild.setAttribute('tseg1_brs', str(rateParamFd.tseg1))
newchild.setAttribute('tseg2_brs', str(rateParamFd.tseg2))
newchild.setAttribute('sjw_brs', str(rateParamFd.sjw))
if iso is None:
iso = True
if iso is not None:
newchild.setAttribute('iso', "YES" if iso else "NO")
child.appendChild(newchild)
[docs] def add(self, obj):
if isinstance(obj, kvTrigger):
self.addTrigger(obj)
elif isinstance(obj, kvFilter):
self.addFilter(obj)
elif isinstance(obj, kvMessage):
self.addMessage(obj)
elif isinstance(obj, kvTransmitList):
self.addTransmitList(obj)
else:
raise Exception(
"ERROR: (kvMemoConfig) Can not add object of type {0}!".format(
type(obj).__name__))
def _addFilterAttribute(self, xmlFilterMsg, filter):
xmlFilterMsg.setAttribute('msgid', str(filter.msgid))
xmlFilterMsg.setAttribute('msgid_min', str(filter.msgid_min))
xmlFilterMsg.setAttribute('protocol', str(filter.protocol))
xmlFilterMsg.setAttribute('can_ext', str(filter.can_ext))
xmlChannel = self.document.createElement('CHANNEL')
text = self.document.createTextNode(str(filter.channel))
xmlChannel.appendChild(text)
xmlFilterMsg.appendChild(xmlChannel)
[docs] def addFilter(self, filter):
xmlFilterBlock = self.document.createElement('FILTERS')
self.document.documentElement.appendChild(xmlFilterBlock)
if filter is None:
return
for obj in filter.msgStop:
xmlFilterMsg = self.document.createElement('MESSAGE_STOP')
xmlFilterBlock.appendChild(xmlFilterMsg)
self._addFilterAttribute(xmlFilterMsg, obj)
for obj in filter.msgPass:
xmlFilterMsg = self.document.createElement('MESSAGE_PASS')
xmlFilterBlock.appendChild(xmlFilterMsg)
self._addFilterAttribute(xmlFilterMsg, obj)
[docs] def addTrigger(self, trigger):
xmlTriggerBlock = self.document.createElement('TRIGGERBLOCK')
self.document.documentElement.appendChild(xmlTriggerBlock)
if trigger is None:
return
xmlTriggers = trigger.getXmlTriggers(self.document)
xmlTriggerBlock.appendChild(xmlTriggers)
xmlStatements = trigger.getXmlStatements(self.document)
xmlTriggerBlock.appendChild(xmlStatements)
[docs] def addMessage(self, message):
xmlBlocks = self.document.getElementsByTagName('MESSAGES')
if len(xmlBlocks):
xmlMessageBlock = xmlBlocks[0]
else:
xmlMessageBlock = self.document.createElement('MESSAGES')
self.document.documentElement.appendChild(xmlMessageBlock)
if message is None:
return
xmlMessage = message.getXml(self.document)
xmlMessageBlock.appendChild(xmlMessage)
[docs] def addTransmitList(self, transmitList):
xmlBlocks = self.document.getElementsByTagName('TRANSMIT_LISTS')
if len(xmlBlocks):
xmlTranmitListBlock = xmlBlocks[0]
else:
xmlTranmitListBlock = self.document.createElement('TRANSMIT_LISTS')
self.document.documentElement.appendChild(xmlTranmitListBlock)
if transmitList is None:
return
xmlTranmitList = transmitList.getXml(self.document)
xmlTranmitListBlock.appendChild(xmlTranmitList)
[docs] def addScript(self, script, channel=0):
xmlScripts = self.document.createElement('SCRIPTS')
self.document.documentElement.appendChild(xmlScripts)
xmlScript = self.document.createElement('SCRIPT')
xmlScript.setAttribute('primary', "YES")
xmlScript.setAttribute('default_channel', str(channel))
xmlScripts.appendChild(xmlScript)
elementScript = self.document.createElement('FILENAME')
text = self.document.createTextNode(script.filename)
elementScript.appendChild(text)
xmlScript.appendChild(elementScript)
elementPath = self.document.createElement('PATH')
text = self.document.createTextNode(script.path)
elementPath.appendChild(text)
xmlScript.appendChild(elementPath)
[docs] def parseLif(self, conf_lif):
conf_xml = kvamemolibxml.kvaBufferToXml(conf_lif)
self.document = minidom.parseString(conf_xml)
[docs] def parseXml(self, conf_xml):
self.document = minidom.parseString(conf_xml)
[docs] def validate(self):
result = []
(countErr, countWarn) = kvamemolibxml.kvaXmlValidate(self.document.toxml())
while countErr > 0:
(status, text) = kvamemolibxml.xmlGetValidationError()
if status == 0:
break
result.append(ValidationResult(severity='error', status=status,
text=text))
while countWarn > 0:
(status, text) = kvamemolibxml.xmlGetValidationWarning()
if status == 0:
break
result.append(ValidationResult(severity='warning', status=status,
text=text))
return result
[docs] def toXml(self):
return self.document.toxml()
[docs] def toLif(self):
conf_lif = kvamemolibxml.kvaXmlToBuffer(self.document.toxml())
return conf_lif
if __name__ == '__main__':
import canlib
cl = canlib.canlib()
rate = cl.translateBaud(freq=canlib.canBITRATE_1M)
print(rate)
print("- Manually creating configuration -----------------")
memoConfig = kvMemoConfig(afterburner=10000)
memoConfig.addBusparams(channel=0, rateParam=rate)
memoConfig.addBusparams(channel=1, rateParam=rate)
trigger = kvTrigger(logmode=TRIG_ON_EVENT, fifomode=False)
trigVarTimer = kvTrigVarTimer(name="trigger_timer_0", offset=10)
trigger.add(trigVarTimer)
trigVarTimer = kvTrigVarTimer(name="trigger_timer_1", offset=20)
trigger.add(trigVarTimer)
trigStatement = kvTrigStatement(expression="trigger_timer_0")
trigger.add(trigStatement)
trigAction = kvTrigAction(function=kvTrigAction.function.START_LOG)
trigStatement.add(trigAction)
trigStatement = kvTrigStatement(expression="trigger_timer_1")
trigger.add(trigStatement)
trigAction = kvTrigAction(function=kvTrigAction.function.START_LOG)
trigStatement.add(trigAction)
memoConfig.addTrigger(trigger)
script = kvScript("test_script.txe")
memoConfig.addScript(script, channel=0)
outfile = open("firstTry.xml", 'w')
outfile.write(memoConfig.toXml())
outfile.close()
print("- Converting conf.lif to xml -----------------")
infile = open("conf.lif", 'rb')
conf_lif = infile.read()
infile.close()
memoConfig2 = kvMemoConfig(param_lif=conf_lif)
outfile = open("conf.xml", 'w')
outfile.write(memoConfig2.toXml())
outfile.close()
print("- Writing manual configuration to firstTry.lif -----------------")
outfile = open("firstTry.lif", 'wb')
outfile.write(memoConfig.toLif())
outfile.close()
print("- Converting firstTry.lif to xml -----------------")
infile = open("firstTry.lif", 'rb')
conf_lif = infile.read()
infile.close()
memoConfig2 = kvMemoConfig(param_lif=conf_lif)
outfile = open("firstTry2.xml", 'w')
outfile.write(memoConfig2.toXml())
outfile.close()