diff --git a/raspberry/requirements.py b/raspberry/requirements.py index 894461e..4dc99d3 100755 --- a/raspberry/requirements.py +++ b/raspberry/requirements.py @@ -2,7 +2,7 @@ python3-flask python3-flask-sqlalchemy python3-flask-login python3-flask-socketio -python3-socketio-client +python3-socketio python3-serial python3-opencv python3-picamera diff --git a/raspberry/roberto/camera/camera_gstreamer_webrtc.py b/raspberry/roberto/camera/camera_gstreamer_webrtc.py index c24b8e7..4e9ce0c 100644 --- a/raspberry/roberto/camera/camera_gstreamer_webrtc.py +++ b/raspberry/roberto/camera/camera_gstreamer_webrtc.py @@ -47,16 +47,16 @@ webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.googl queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv. ''' +# rpicamsrc name=src preview=0 fullscreen=0 ! h264parse ! omxh264dec ! glimagesink sync=0 + class WebRTCCamera(Thread): def __init__(self): - self.pipe = None - self.webrtc = None self.sid = None - self.current_client_sid = None - self.num_clients = 0 + self._num_clients = 0 self.connected = False self.server = 'ws://localhost:5000' self._queue = Queue(maxsize=10) + self._peers = {} Thread.__init__(self) @@ -71,13 +71,11 @@ class WebRTCCamera(Thread): while True: item = self._queue.get() if item['job'] == "connect_client": - self.current_client_sid = item['sid'] self.start_pipeline(item['sid']) - self.num_clients += 1 + self._num_clients += 1 elif item['job'] == "disconnect_client": - self.current_client_sid = item['sid'] self.close_pipeline(item['sid']) - self.num_clients -= 1 + self._num_clients -= 1 else: print("unknown job: %s" % item['job']) self._queue.task_done() @@ -105,63 +103,70 @@ class WebRTCCamera(Thread): def disconnect_client(self, sid, room): self._queue.put({'job':'disconnect_client', 'sid':sid}) - def start_pipeline(self, client_sid): # ok + def start_pipeline(self, client_sid): print("STARTING PIPELINE") - self.pipe = Gst.parse_launch(PIPELINE_DESC) - self.webrtc = self.pipe.get_by_name('sendrecv') - self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed) - self.webrtc.connect('on-ice-candidate', self.on_ice_candidate) - self.pipe.set_state(Gst.State.PLAYING) + self._peers[client_sid] = {} + pipe = Gst.parse_launch(PIPELINE_DESC) + self._peers[client_sid]['pipe'] = pipe + webrtc = pipe.get_by_name('sendrecv') + self._peers[client_sid]['webrtc'] = webrtc + webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, client_sid) + webrtc.connect('on-ice-candidate', self.on_ice_candidate, client_sid) + pipe.set_state(Gst.State.PLAYING) def close_pipeline(self, client_sid): - self.pipe.set_state(Gst.State.NULL) - self.pipe = None - self.webrtc = None + self._peers[client_sid]['pipe'].set_state(Gst.State.NULL) + self._peers[client_sid]['pipe'] = None + self._peers[client_sid]['webrtc'] = None - def handle_sdp_answer(self, sdp): # ok + def handle_sdp_answer(self, sdp, client_sid): print("handle_sdp_answer") #print ('Received answer:\n%s' % sdp) res, sdpmsg = GstSdp.SDPMessage.new() GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg) answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg) promise = Gst.Promise.new() - self.webrtc.emit('set-remote-description', answer, promise) + self._peers[client_sid]['webrtc'].emit('set-remote-description', answer, promise) promise.interrupt() - def handle_ice(self, ice): # ok + def handle_sdp_offer(self, sdp, client_sid): + pass + + def handle_ice(self, ice, client_sid): print("handle_ice") candidate = ice['candidate'] sdpmlineindex = ice['sdpMLineIndex'] - self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate) + self._peers[client_sid]['webrtc'].emit('add-ice-candidate', sdpmlineindex, candidate) - def on_negotiation_needed(self, receiver_entry): # ok - print("on_negotiation_needed") - promise = Gst.Promise.new_with_change_func(self.on_offer_created, receiver_entry, None) + def on_negotiation_needed(self, receiver_entry, client_sid): + print("on_negotiation_needed [%s]" % client_sid) + promise = Gst.Promise.new_with_change_func(self.on_offer_created, receiver_entry, client_sid, None) receiver_entry.emit('create-offer', None, promise) - def on_ice_candidate(self, _, mlineindex, candidate): # ok - print("on_ice_candidate") - icemsg = json.dumps({'type': 'ice', 'data': {'candidate': candidate, 'sdpMLineIndex': mlineindex}}) + def on_ice_candidate(self, _, mlineindex, candidate, client_sid): + print("on_ice_candidate [%s]" % client_sid) + icemsg = json.dumps({'type': 'ice', 'data': {'candidate': candidate, 'sdpMLineIndex': mlineindex}, 'sid': client_sid}) sio.emit('message', data=icemsg, namespace='/webrtc') - def on_offer_created(self, promise, _, __): # ok - print("on_offer_created") + # Offer created by our pipeline, to be sent to the peer + def on_offer_created(self, promise, _, client_sid, __): + print("on_offer_created [%s]" % client_sid) promise.wait() reply = promise.get_reply() offer = reply['offer'] promise = Gst.Promise.new() - self.webrtc.emit('set-local-description', offer, promise) + self._peers[client_sid]['webrtc'].emit('set-local-description', offer, promise) promise.interrupt() text = offer.sdp.as_text() print ('Sending offer:\n%s' % text) - msg = json.dumps({'type': 'sdp', 'data': {'type': 'offer', 'sdp': text}}) + msg = json.dumps({'type': 'sdp', 'data': {'type': 'offer', 'sdp': text}, 'sid': client_sid}) sio.emit('message', data=msg, namespace='/webrtc') print("test") - @sio.on('message', namespace='/webrtc') # ok + @sio.on('message', namespace='/webrtc') def webrtc_message(data): from roberto import webrtccamera -# print('Message from {}: {}'.format(sid, data)) +# print('WebRTCCamera got Message: {}'.format(data)) if not data: print("data malformed") return @@ -171,18 +176,29 @@ class WebRTCCamera(Thread): if not 'data' in data: print("Received message without data field") return + if not 'from_sid' in data: + print("Received message without from_sid field") + return + if data['from_sid'] == webrtccamera.sid: + print("Received message from self") + return + client_sid = data['from_sid'] if data['type'] == 'sdp': if not 'type' in data['data']: print("Received SDP message without type field") return + if not 'sdp' in data['data']: + print("Received SDP message without SDP string") + return if data['data']['type'] == 'answer': - if 'sdp' in data['data']: - print("Received SDP:") - print(data['data']['sdp']) - webrtccamera.handle_sdp_answer(data['data']['sdp']) - else: - print("Received SDP message without SDP string") + print("Received SDP answer:") + print(data['data']['sdp']) + webrtccamera.handle_sdp_answer(data['data']['sdp'], client_sid) + elif data['data']['type'] == 'offer': + print("Received SDP offer:") + print(data['data']['sdp']) + webrtccamera.handle_sdp_offer(data['data']['sdp'], client_sid) else: print("Expected SDP message type \"answer\", got \"%s\"" % data['data']['type']) @@ -193,7 +209,7 @@ class WebRTCCamera(Thread): if 'candidate' in data['data']: print("Received ICE candidate with mline index %u" % data['data']['sdpMLineIndex']) print(data['data']['candidate']) - webrtccamera.handle_ice(data['data']) + webrtccamera.handle_ice(data['data'], client_sid) else: print("Received ICE message without ICE candidate string") return diff --git a/raspberry/roberto/views/websocket/routes.py b/raspberry/roberto/views/websocket/routes.py index 796faf7..d2e0bce 100644 --- a/raspberry/roberto/views/websocket/routes.py +++ b/raspberry/roberto/views/websocket/routes.py @@ -2,6 +2,7 @@ from . import websocket_blueprint from flask import current_app, render_template, request from flask_socketio import SocketIO, emit, join_room, leave_room +import json from roberto import socketio @@ -67,6 +68,17 @@ def camera(): def webrtc_message(data): sid = request.sid print('Message from {}: {}'.format(sid, data)) + if isinstance(data, str): + try: + jsonData = json.loads(data) + jsonData['from_sid'] = sid + data = json.dumps(jsonData) + except json.JSONDecodeError: + print("could not decode json") + elif isinstance(data, dict): + data['from_sid'] = sid + else: + print("unknown message type") socketio.emit('message', data=data, room=ROOM, namespace='/webrtc', skip_sid=sid)