# -*- coding: utf-8 -*-
import J2534
import re
from J2534.Define import *
from J2534.Error import ERR_BUFFER_EMPTY
from pynput.keyboard import Key, Listener
import threading as th
action_prot=ProtocolID.ISO9141
pattern0="X:"
pattern1="Y:"
def test_cmd(RxCmd):
fo = open(r"D:\obdii_project\KW82_simulate\me7.req")
reply = 0
line = fo.readline()
while line:
m = re.match(pattern0, line)
if(m):
src = line[2:].replace(" ", "").upper().strip()
#print(RxCmd, src)
if(RxCmd == src):
reply = 1
line = fo.readline()
if(1 == reply):
break;
fo.close()
if(1 == reply):
return bytearray.fromhex(line[2:])
else:
return None
def on_release(key):
if key == Key.esc:
# Stop listener
return False
keep_going = True
def key_capture_thread():
global keep_going
input()
keep_going = False
# print list off tools installed on your computer...
print(J2534.toolLIST())
# use pretty list above to choose the index of the tool you will be using...
J2534.setDevice(6)
# open connection to tool and grab divice id...
# ret will return 0 which means no errors detected...
ret, deviceID = J2534.ptOpen()
# connect channel with correct comm protocol and baudrate...
# also returns channel id which gets passed to ecm filter...
ret, channelID = J2534.ptConnect(deviceID, action_prot, 0x200, 9600)
# start iso1941 pass block filter...
J2534.ptStartEcmFilter(channelID, action_prot, [0, 0, 0], [0, 0, 0], 0, 0, 0)
J2534.SetConfig(channelID,
[[Parameter.P4_MIN, 10],
[Parameter.PARITY, 2], # 0(NO_PARITY)/1(ODD_PARITY)/2(EVEN_PARITY)
[Parameter.LOOPBACK, 1],
[Parameter.TIDLE, 0]])
# set transmit data structure
Tx = J2534.ptTxMsg(action_prot, 0, 0)
# set recv data structure...
Rx = J2534.ptRxMsg(action_prot, 0, 0)
# transmit data string, 28, 00, 62 is how chrysler pings ecu
# which is also returns first digit of vin number...
Tx.SetDataString([0x6c, 0x10, 0xf0, 0x01, 0x00])
# transmit message that was set with Tx.SetDataString([0x28, 0x00, 0x62]) will return 0 if successful...
J2534.ptWtiteMsgs(channelID, Tx, 1, 500)
th.Thread(target=key_capture_thread, args=(), name='key_capture_thread', daemon=True).start()
# read message will return 0 if no errors detected
while keep_going:
ret = J2534.ptReadMsgs(channelID, Rx, 1, 500)
if(ERR_BUFFER_EMPTY != ret):
# print data that was recv...
RxHexData = Rx.DumpData()
TxHexData = test_cmd(RxHexData)
if(None != TxHexData):
print(TxHexData)
Tx.SetDataString(TxHexData)
J2534.ptWtiteMsgs(channelID, Tx, 1, 500)
# with Listener(on_release=on_release) as listener:
# listener.join()
# break
# close channel...
J2534.ptDisconnect(channelID)
# close tool connect...
J2534.ptClose(deviceID)