import time import threading import serial class Serial(object): _callbacks = dict() def __init__(self, devicename='/dev/ttyUSB0', baudrate=115200): self.devicename = devicename self.alive = None try: self.serialport = serial.Serial(devicename, baudrate, timeout=3.0) self.serialport.flushInput() self.serialport.write(b'\n') self.alive = True print("Arduino connection established..") except serial.SerialException as e: print('could not open port {!r}: {}\n'.format(devicename, e)) # except: # print("Failed to open serial device: " + self.serialport.name) self.receiver_thread = threading.Thread(target=self.reader, name='rx') self.receiver_thread.daemon = True self.receiver_thread.start() def __del__(self): self.alive = False self.serialport.close() def write(self, buf): if not self.alive: print("Serial port not connected") return try: if isinstance(buf, str): buf = bytes(buf, "utf8") self.serialport.flushInput() self.serialport.write(buf) #self.serialport.write(b'\n') # or "\r\n" ? print("sent " + str(buf)) except: print("Error writing to: " + self.serialport.name + " Data: " + str(buf)) def reader(self): try: while self.alive and self.serialport: data = self.serialport.read(self.serialport.in_waiting or 1) if data: print(data) command = data[0] argument = data[2:].strip() if command.isalnum() and command in self._callbacks: cb = self._callbacks[command] if cb: cb(argument) except serial.SerialException: pass #self.alive = False #raise def add_callback(cmd, callback): self._callbacks[cmd] = callback