Robot
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

112 lines
3.1 KiB

from . import websocket_blueprint
from flask import current_app, render_template, request
from flask_socketio import SocketIO, emit, join_room, leave_room
import json
from roberto import socketio
################
#### routes ####
################
@socketio.on('axes')
def gamepad_axes(axes_data):
print('GAMEPAD axes')
print(axes_data)
from roberto import serial
x = applyDeadZone( axes_data['0'], 0.15 )
y = applyDeadZone( axes_data['1'], 0.15 )
r = applyDeadZone( axes_data['3'], 0.15 )
m0 = +y - x - r
m1 = +y + x - r
m2 = -y + x - r
m3 = -y - x - r
m0 *= 4000
m1 *= 4000
m2 *= 4000
m3 *= 4000
command = 'S '+str(int(m0))+','+str(int(m1))+','+str(int(m2))+','+str(int(m3))+'\n'
serial.write(bytes(command, "utf8"))
def applyDeadZone(value, threshold):
new_value = ( abs(value) - threshold ) / ( 1 - threshold )
if new_value < 0:
return 0
if value < 0:
new_value = new_value * -1
return new_value
@socketio.on('text')
def display_text(text):
print("display_text()")
print(text['data'])
if isinstance(text['data'], str):
if len(text['data']) < 250:
from roberto import serial
command = 'T '+str(text['data'])
serial.write(bytes(command, "utf8"))
# https://pfertyk.me/2020/03/webrtc-a-working-example/
from roberto import webrtccamera
ROOM = 'default'
@websocket_blueprint.route('/camera')
def camera():
return render_template('camera.html', room=ROOM)
@socketio.on('message') # ok
def webrtc_message(data):
sid = request.sid
print('Message from {}: {}'.format(sid, data))
if isinstance(data, str):
try:
jsonData = json.loads(data)
jsonData['from_sid'] = sid
data = json.dumps(jsonData)
except json.JSONDecodeError:
print("could not decode json")
elif isinstance(data, dict):
data['from_sid'] = sid
else:
print("unknown message type")
socketio.emit('message', data=data, room=ROOM, skip_sid=sid)
@socketio.on('get_stats')
def get_stats(data):
sid = request.sid
print("client %s requesting stats" % sid)
from roberto import serial
if sid != webrtccamera.sid:
serial.add_callback('B', display_battery, sid)
@socketio.on('disconnect')
def disconnect():
sid = request.sid
print("Received Disconnect message from %s" % sid)
leave_room(ROOM)
webrtccamera.disconnect_client(sid, ROOM)
from roberto import serial
serial.del_callback('B', display_battery, sid)
@socketio.on('connect')
def connect():
sid = request.sid
print("Received Connect message from %s" % sid)
join_room(ROOM)
webrtccamera.connect_client(sid, ROOM)
@socketio.on_error_default
def default_error_handler(e):
print(request.event["message"]) # "my error event"
print(request.event["args"]) # (data,)
keys = request.keys()
keys.sort()
for key in request:
print('%s: %s' % (key, repr(request[key])))
def display_battery(val, sid=ROOM):
socketio.emit('battery', data=val, room=sid)
#socketio.emit('battery', data=val)