From b8be306c312339badfcc4548f1add77f29471f1b Mon Sep 17 00:00:00 2001 From: Hendrik Langer Date: Sat, 1 Aug 2020 03:39:20 +0200 Subject: [PATCH] rework everything so the camera is a websocket client (working) --- raspberry/requirements.py | 1 + raspberry/roberto/__init__.py | 2 +- .../roberto/camera/camera_gstreamer_webrtc.py | 163 ++++++++------ raspberry/roberto/views/websocket/routes.py | 30 +-- .../views/websocket/templates/camera.html | 211 ++++++++---------- 5 files changed, 188 insertions(+), 219 deletions(-) diff --git a/raspberry/requirements.py b/raspberry/requirements.py index 2c0cae7..894461e 100755 --- a/raspberry/requirements.py +++ b/raspberry/requirements.py @@ -2,6 +2,7 @@ python3-flask python3-flask-sqlalchemy python3-flask-login python3-flask-socketio +python3-socketio-client python3-serial python3-opencv python3-picamera diff --git a/raspberry/roberto/__init__.py b/raspberry/roberto/__init__.py index 73bcd6d..e9a4ca2 100644 --- a/raspberry/roberto/__init__.py +++ b/raspberry/roberto/__init__.py @@ -19,7 +19,7 @@ socketio = SocketIO() from roberto.camera.camera_opencv import Camera camera = Camera() from roberto.camera.camera_gstreamer_webrtc import WebRTCCamera -webrtccamera = WebRTCCamera(1000, 1001) +webrtccamera = WebRTCCamera() from roberto.Serial import Serial serial = Serial() diff --git a/raspberry/roberto/camera/camera_gstreamer_webrtc.py b/raspberry/roberto/camera/camera_gstreamer_webrtc.py index decc972..7776548 100644 --- a/raspberry/roberto/camera/camera_gstreamer_webrtc.py +++ b/raspberry/roberto/camera/camera_gstreamer_webrtc.py @@ -1,6 +1,12 @@ import os import sys +from threading import Thread + +import socketio + +sio = socketio.Client() + import gi gi.require_version('Gst', '1.0') from gi.repository import Gst @@ -19,41 +25,42 @@ webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.googl queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv. ''' -class WebRTCCamera: - def __init__(self, id_, peer_id, server=None): - self.id_ = id_ - self.conn = None +class WebRTCCamera(Thread): + def __init__(self): self.pipe = None self.webrtc = None - self.socketio = None - self.peer_id = peer_id - self.room = 'default' - self.sid = 'gstwebrtc1000' - self.server = server or 'wss://localhost:5000/webrtc' + self.server = 'ws://localhost:5000' self.connected = False + self.exitFlag = False + + Thread.__init__(self) Gst.init(None) if not check_plugins(): sys.exit(1) - def connect(self, socketio, room, sid): - self.socketio = socketio - self.room = room - self.sid = sid + def run(self): + self.connect() + while not self.exitFlag: + sio.sleep(1) + + def connect(self): + sio.connect(self.server, namespaces=['/webrtc'], transports=['websocket']) + print('my sid is', sio.sid) + sio.emit('message', 'starting camera', namespace='/webrtc') self.start_pipeline() self.connected = True def disconnect(self): self.connected = False self.close_pipeline() - self.socketio = None + sio.disconnect() - def start_pipeline(self): + def start_pipeline(self): # ok self.pipe = Gst.parse_launch(PIPELINE_DESC) self.webrtc = self.pipe.get_by_name('sendrecv') self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed) - self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message) - self.webrtc.connect('pad-added', self.on_incoming_stream) + self.webrtc.connect('on-ice-candidate', self.on_ice_candidate) self.pipe.set_state(Gst.State.PLAYING) def close_pipeline(self): @@ -61,9 +68,9 @@ class WebRTCCamera: self.pipe = None self.webrtc = None - def handle_sdp_answer(self, sdp): + def handle_sdp_answer(self, sdp): # ok print("handle_sdp_answer") - print ('Received answer:\n%s' % sdp) + #print ('Received answer:\n%s' % sdp) res, sdpmsg = GstSdp.SDPMessage.new() GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg) answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg) @@ -71,65 +78,23 @@ class WebRTCCamera: self.webrtc.emit('set-remote-description', answer, promise) promise.interrupt() - def handle_ice(self, ice): + def handle_ice(self, ice): # ok print("handle_ice") candidate = ice['candidate'] sdpmlineindex = ice['sdpMLineIndex'] self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate) - def on_negotiation_needed(self, element): + def on_negotiation_needed(self, receiver_entry): # ok print("on_negotiation_needed") - promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None) - element.emit('create-offer', None, promise) - - def send_ice_candidate_message(self, _, mlineindex, candidate): - print("send_ice_candidate_message") - icemsg = json.dumps({'type': 'candidate', 'candidate': {'candidate': candidate, 'sdpMLineIndex': mlineindex}}) - self.socketio.emit('data', type='candidate', data=icemsg, room=self.room, namespace='/webrtc', skip_sid=self.sid) + promise = Gst.Promise.new_with_change_func(self.on_offer_created, receiver_entry, None) + receiver_entry.emit('create-offer', None, promise) - def on_incoming_stream(self, _, pad): - print("on_incoming_stream") - if pad.direction != Gst.PadDirection.SRC: - return + def on_ice_candidate(self, _, mlineindex, candidate): # ok + print("on_ice_candidate") + icemsg = json.dumps({'type': 'ice', 'data': {'candidate': candidate, 'sdpMLineIndex': mlineindex}}) + sio.emit('message', data=icemsg, namespace='/webrtc') - decodebin = Gst.ElementFactory.make('decodebin') - decodebin.connect('pad-added', self.on_incoming_decodebin_stream) - self.pipe.add(decodebin) - decodebin.sync_state_with_parent() - self.webrtc.link(decodebin) - - def on_incoming_decodebin_stream(self, _, pad): - print("on_incoming_decodebin_stream") - if not pad.has_current_caps(): - print (pad, 'has no caps, ignoring') - return - - caps = pad.get_current_caps() - assert (len(caps)) - s = caps[0] - name = s.get_name() - if name.startswith('video'): - q = Gst.ElementFactory.make('queue') - conv = Gst.ElementFactory.make('videoconvert') - sink = Gst.ElementFactory.make('autovideosink') - self.pipe.add(q, conv, sink) - self.pipe.sync_children_states() - pad.link(q.get_static_pad('sink')) - q.link(conv) - conv.link(sink) - elif name.startswith('audio'): - q = Gst.ElementFactory.make('queue') - conv = Gst.ElementFactory.make('audioconvert') - resample = Gst.ElementFactory.make('audioresample') - sink = Gst.ElementFactory.make('autoaudiosink') - self.pipe.add(q, conv, resample, sink) - self.pipe.sync_children_states() - pad.link(q.get_static_pad('sink')) - q.link(conv) - conv.link(resample) - resample.link(sink) - - def on_offer_created(self, promise, _, __): + def on_offer_created(self, promise, _, __): # ok print("on_offer_created") promise.wait() reply = promise.get_reply() @@ -139,9 +104,63 @@ class WebRTCCamera: promise.interrupt() text = offer.sdp.as_text() print ('Sending offer:\n%s' % text) - msg = json.dumps({'type': 'offer', 'sdp': text}) - self.socketio.emit('data', type='offer', data=msg, room=self.room, namespace='/webrtc', skip_sid=self.sid) + msg = json.dumps({'type': 'sdp', 'data': {'type': 'offer', 'sdp': text}}) + sio.emit('message', data=msg, namespace='/webrtc') + print("test") + + @sio.on('message', namespace='/webrtc') # ok + def webrtc_message(data): + from roberto import webrtccamera +# print('Message from {}: {}'.format(sid, data)) + if not data: + print("data malformed") + return + if not 'type' in data: + print("Received message without type field") + return + if not 'data' in data: + print("Received message without data field") + return + if data['type'] == 'sdp': + if not 'type' in data['data']: + print("Received SDP message without type field") + return + if data['data']['type'] == 'answer': + if 'sdp' in data['data']: + print("Received SDP:") + print(data['data']['sdp']) + webrtccamera.handle_sdp_answer(data['data']['sdp']) + else: + print("Received SDP message without SDP string") + else: + print("Expected SDP message type \"answer\", got \"%s\"" % data['data']['type']) + + elif data['type'] == 'ice': + if not 'sdpMLineIndex' in data['data']: + print("Received ICE message without mline index") + return + if 'candidate' in data['data']: + print("Received ICE candidate with mline index %u" % data['data']['sdpMLineIndex']) + print(data['data']['candidate']) + webrtccamera.handle_ice(data['data']) + else: + print("Received ICE message without ICE candidate string") + return + else: + print("Unknown message \"%s\", ignoring" % data['data']) + + @sio.event + def connected(): + print("I'm connected!") + + @sio.event + def connect_error(): + print("The connection failed!") + + @sio.event + def disconnected(): + print("I'm disconnected!") def check_plugins(): needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp", diff --git a/raspberry/roberto/views/websocket/routes.py b/raspberry/roberto/views/websocket/routes.py index 6c91e8d..13016f6 100644 --- a/raspberry/roberto/views/websocket/routes.py +++ b/raspberry/roberto/views/websocket/routes.py @@ -63,43 +63,27 @@ ROOM = 'default' def camera(): return render_template('camera.html', room='default') -@socketio.on('data', namespace='/webrtc') +@socketio.on('message', namespace='/webrtc') # ok def webrtc_message(data): sid = request.sid print('Message from {}: {}'.format(sid, data)) - socketio.emit('data', data=data, room=ROOM, namespace='/webrtc', skip_sid=sid) - if 'type' in data: - if data['type'] == 'offer': - print("OFFER") - - elif data['type'] == 'answer': - print("ANSWER") - if 'sdp' in data: - webrtccamera.handle_sdp_answer(data['sdp']) - elif data['type'] == 'candidate': - print("CANDIDATE") - if 'candidiate' in data: - webrtccamera.handle_ice(data) - else: - print("got unknown data message!") + socketio.emit('message', data=data, namespace='/webrtc', skip_sid=sid) + @socketio.on('disconnect', namespace='/webrtc') def disconnect(): sid = request.sid print("Received Disconnect message from %s" % sid) - leave_room(ROOM) -# if webrtccamera.connected: -# webrtccamera.disconnect() + if webrtccamera.connected: + webrtccamera.disconnect() @socketio.on('connect', namespace='/webrtc') def connect(): sid = request.sid print("Received Connect message from %s" % sid) - join_room(ROOM) if not webrtccamera.connected: - webrtccamera.connect(socketio, ROOM, 'gstwebrtc1000') - #socketio.emit('ready', room=ROOM, namespace='/webrtc', skip_sid=sid) - socketio.emit('ready', room=ROOM, namespace='/webrtc') + webrtccamera.start() + socketio.emit('message', 'test2', namespace='/webrtc') @socketio.on_error_default def default_error_handler(e): diff --git a/raspberry/roberto/views/websocket/templates/camera.html b/raspberry/roberto/views/websocket/templates/camera.html index d01d0a9..6e0a9e3 100644 --- a/raspberry/roberto/views/websocket/templates/camera.html +++ b/raspberry/roberto/views/websocket/templates/camera.html @@ -14,144 +14,109 @@ -
- +
+
-
- -