Robot
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

76 lines
2.6 KiB

import time
import threading
import serial
class Serial(object):
4 years ago
_callbacks = dict()
4 years ago
def __init__(self, devicename='/dev/ttyUSB0', baudrate=115200):
self.devicename = devicename
self.alive = None
try:
self.serialport = serial.Serial(devicename, baudrate, timeout=3.0)
self.serialport.flushInput()
4 years ago
self.serialport.write(b'\n')
self.alive = True
print("Arduino connection established..")
except serial.SerialException as e:
print('could not open port {!r}: {}\n'.format(devicename, e))
4 years ago
# except:
# print("Failed to open serial device: " + self.serialport.name)
self.receiver_thread = threading.Thread(target=self.reader, name='rx')
self.receiver_thread.daemon = True
self.receiver_thread.start()
def __del__(self):
self.alive = False
self.serialport.close()
def write(self, buf):
if not self.alive:
print("Serial port not connected")
return
try:
if isinstance(buf, str):
buf = bytes(buf, "utf8")
self.serialport.flushInput()
4 years ago
self.serialport.write(buf)
#self.serialport.write(b'\n') # or "\r\n" ?
print("sent " + str(buf))
except:
print("Error writing to: " + self.serialport.name + " Data: " + str(buf))
def reader(self):
try:
while self.alive and self.serialport:
4 years ago
#data = self.serialport.read(self.serialport.in_waiting or 1)
data = self.serialport.readline()
if data:
print(data)
4 years ago
if isinstance(data, bytes):
data = data.decode(encoding='utf8')
4 years ago
command = data[0]
4 years ago
argument = data[2:].strip()
4 years ago
if command and command in self._callbacks:
callbacks = self._callbacks[command]
for cb, cb_arg in callbacks:
cb(argument, cb_arg)
4 years ago
else:
print("no callback found for command '%s'" % command)
4 years ago
except serial.SerialException:
4 years ago
pass
#self.alive = False
#raise
4 years ago
def add_callback(self, cmd, callback, arg):
if not cmd in self._callbacks:
self._callbacks[cmd] = []
self._callbacks[cmd].append( (callback, arg) )
def del_callback(self, cmd, callback, arg):
if cmd in self._callbacks:
self._callbacks[cmd].remove( (callback, arg) )