So, I found some time to get my Python code. The code looks not really nice and still needs improvement.
#!/usr/bin/env python
import sys
import Tkinter
import tkMessageBox
import threading
import Queue
import serial
from collections import deque
def main(argv=None):
if argv is None:
argv = sys.argv
commandList = []
inputCmds = deque()
outputCmds = deque()
inputQ = Queue.Queue()
outputQ = Queue.Queue()
manager = CommunicationManager(inputQ,outputQ)
gui = GuiClass(inputQ,outputQ,manager,commandList,inputCmds, outputCmds)
gui.go()
return 0
class GuiClass(object):
def __init__(self,inputQ,outputQ,commManager,commandList,inputCmds,outputCmds):
self.inputQ = inputQ
self.outputQ = outputQ
self.manager = commManager
self.commandList = commandList
self.inputCmds = inputCmds
self.outputCmds = outputCmds
self.currentCmd = ""
self.root = Tkinter.Tk()
text_frame = Tkinter.Frame(self.root)
entry_frame = Tkinter.Frame(self.root)
scrollbar = Tkinter.Scrollbar(text_frame)
self.text = Tkinter.Text(text_frame,height=5,width=80,wrap=Tkinter.WORD,yscrollcommand=scrollbar.set)
scrollbar.pack(side=Tkinter.RIGHT,fill=Tkinter.Y)
self.text.pack(fill=Tkinter.BOTH,expand=True)
scrollbar.config(command=self.text.yview)
portLabel = Tkinter.Label(entry_frame, text="Arduino Port: ")
self.arduinoFile = Tkinter.StringVar()
portEntry = Tkinter.Entry(entry_frame, textvariable=self.arduinoFile)
portButton = Tkinter.Button(entry_frame, text="Connect", command=self.connect)
portLabel.pack(side=Tkinter.LEFT)
portEntry.pack(side=Tkinter.LEFT,fill=Tkinter.X,expand=True)
portButton.pack(side=Tkinter.LEFT)
newTestButton = Tkinter.Button(self.root,text="New Command",command=self.newTest)
nextTest = Tkinter.Button(self.root, text="Next commands", command=self.nextcommands)
quitButton = Tkinter.Button(self.root,text="Quit",command=self.root.quit)
entry_frame.pack(fill=Tkinter.X)
text_frame.pack(fill=Tkinter.BOTH,expand=True)
newTestButton.pack(fill=Tkinter.X)
nextTest.pack(fill=Tkinter.X)
quitButton.pack(fill=Tkinter.X)
self.text.configure(state=Tkinter.DISABLED)
self.firstline=True
commands1 = ["DND", "DED"]
commands2 = ["DND", "DFI100", "DFI2000", "DFI", "DED"]
commands3 = ["DND", "DFI5000", "DED"]
commandList.append(commands1)
commandList.append(commands2)
commandList.append(commands3)
# Menubar
menubar = Tkinter.Menu(self.root)
filemenu = Tkinter.Menu(menubar,tearoff=False)
#filemenu.add_command(label="Open...",command=self.openport)
filemenu.add_separator()
filemenu.add_command(label="Quit",command=self.root.quit)
helpmenu = Tkinter.Menu(menubar,tearoff=False)
helpmenu.add_command(label="About",command=self.about)
menubar.add_cascade(label="File",menu=filemenu)
menubar.add_cascade(label="Help",menu=helpmenu)
self.root.config(menu=menubar)
self.root.title("Python Test")
def go(self):
self.root.after(100, self.periodic_check)
self.root.mainloop()
def connect(self):
filename = self.arduinoFile.get()
self.manager.connect(filename)
self.root.after(2000,self.check_connection)
def check_connection(self):
self.outputQ.put("CHK")
def periodic_check(self):
try:
inp = self.inputQ.get_nowait()
tokens = inp.split()
if len(tokens)>=1:
if tokens[0]=="ACK1":
self.writeline("Arduino Connected")
elif tokens[0]=="DND1":
self.inputCmds.append("DND")
elif tokens[0]=="DFI1":
self.inputCmds.append("DFI")
elif tokens[0]=="DED1":
self.inputCmds.append("DED")
elif tokens[0]=="AGP1":
self.writeline("GET POSITION")
elif tokens[0]=="ASP1":
self.writeline("SET POSITION")
else:
self.writeline("Unrecognized response:")
self.writeline(inp)
except Queue.Empty:
pass
# first time
if (len(self.outputCmds) > 0 and self.currentCmd == ""):
self.currentCmd = self.outputCmds[0]
self.outputQ.put(self.outputCmds.popleft())
self.writeline("send: "+self.currentCmd)
if (len(self.inputCmds) > 0):
if (self.inputCmds[0] == self.currentCmd):
self.writeline("acknowledgement: "+ self.currentCmd)
try:
self.inputCmds.popleft()
except:
pass
if (len(self.outputCmds) > 0):
self.currentCmd = self.outputCmds[0]
self.outputQ.put(self.outputCmds.popleft())
self.writeline("send: "+self.currentCmd)
elif (len(self.outputCmds) == 0 and len(self.inputCmds) == 0):
self.currentCmd = ""
self.writeline("Queue empty.")
self.root.after(100, self.periodic_check)
def writeline(self,text):
self.text.configure(state=Tkinter.NORMAL)
if self.firstline:
self.firstline=False
else:
self.text.insert(Tkinter.END,"\n")
self.text.insert(Tkinter.END,text)
self.text.yview(Tkinter.END)
self.text.configure(state=Tkinter.DISABLED)
def newTest(self):
self.outputCmds.append("DND1002001203")
def nextTest(self):
self.writeline("Starting sending new commands")
try:
mycommands = self.commandList.pop(0)
for myCommand in mycommands:
self.outputCmds.append(myCommand)
except:
self.writeline("No commands available!")
def about(self):
tkMessageBox.showinfo("About",__doc__,parent=self.root)
return
class CommunicationManager(object):
def __init__(self,inputQ,outputQ):
self.inputQ=inputQ
self.outputQ=outputQ
self.serialPort=None
self.inputThread = None
self.outputThread = None
self.keepRunning = True
self.activeConnection = False
def runInput(self):
while self.keepRunning:
try:
inputline = self.serialPort.readline()
self.inputQ.put(inputline.rstrip())
except:
# This area is reached on connection closing
pass
return
def runOutput(self):
while self.keepRunning:
try:
outputline = self.outputQ.get()
self.serialPort.write("%s\n"%outputline)
except:
# This area is reached on connection closing
pass
return
def connect(self,filename):
if self.activeConnection:
self.close()
self.activeConnection = False
try:
self.serialPort = serial.Serial(filename,9600,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_TWO,
bytesize=serial.EIGHTBITS)
except serial.SerialException:
self.inputQ.put("ERROR Unable to Connect to Serial Port")
return
self.keepRunning=True
self.inputThread = threading.Thread(target=self.runInput)
self.inputThread.daemon=True
self.inputThread.start()
self.outputThread = threading.Thread(target=self.runOutput)
self.outputThread.daemon=True
self.outputThread.start()
self.activeConnection = True
def close(self):
self.keepRunning=False;
self.serialPort.close()
self.outputQ.put("TERMINATE") # This does not get sent, but stops the outputQ from blocking.
self.inputThread.join()
self.outputThread.join()
self.inputQ.put("IOHALT")
if __name__ == "__main__":
sys.exit(main())
I suppose that the problem is linked to the self.serialPort.write("%s\n"%outputline) part where I send out the commands to the Arduino.
In the serial monitor everything works fine, but sending out commands from Python only works when they have only 3 characters and no value passed.
Do you have any idea? Thanks in advance,
Manuel