From b935b9ee131a41730e066ddb74dcbff7caeb0094 Mon Sep 17 00:00:00 2001 From: Hendrik Langer Date: Wed, 5 Aug 2020 13:28:12 +0200 Subject: [PATCH] fix lots of errors --- raspberry/requirements.py | 1 + .../roberto/camera/camera_gstreamer_webrtc.py | 98 +++++++++++++------ 2 files changed, 68 insertions(+), 31 deletions(-) diff --git a/raspberry/requirements.py b/raspberry/requirements.py index 4f316b0..d6beb15 100755 --- a/raspberry/requirements.py +++ b/raspberry/requirements.py @@ -3,6 +3,7 @@ python3-flask-sqlalchemy python3-flask-login python3-flask-socketio python3-socketio +python3-websocket # python3-socketio-client python3-serial python3-opencv python3-picamera diff --git a/raspberry/roberto/camera/camera_gstreamer_webrtc.py b/raspberry/roberto/camera/camera_gstreamer_webrtc.py index 634175d..11608cb 100644 --- a/raspberry/roberto/camera/camera_gstreamer_webrtc.py +++ b/raspberry/roberto/camera/camera_gstreamer_webrtc.py @@ -1,5 +1,7 @@ import os import sys +import time +import traceback from threading import Thread from queue import Queue @@ -82,9 +84,9 @@ except (ImportError, RuntimeError): ############# PIPELINE_DESC = ''' - v4l2src device=/dev/video0 ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay pt=96 ! tee name=video-tee ! + v4l2src device=/dev/video0 ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay ! queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! tee name=video-tee ! queue ! fakesink sync=true - audiotestsrc wave=red-noise is-live=true ! opusenc ! rtpopuspay pt=97 ! tee name=audio-tee ! + audiotestsrc wave=red-noise is-live=true ! opusenc ! rtpopuspay ! queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! tee name=audio-tee ! queue ! fakesink sync=true ''' @@ -101,7 +103,7 @@ PIPELINE_DESC_RASPBERRY = ''' PEER_BIN_DESC = ''' queue name=video-queue ! webrtcbin. queue name=audio-queue ! webrtcbin. - webrtcbin name=webrtcbin + webrtcbin name=webrtcbin bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302 latency=40 ''' @@ -111,7 +113,7 @@ class WebRTCCamera(Thread): self.sid = None self._num_clients = 0 self.connected = False - self.server = 'ws://localhost:5000' + self.server = 'ws://127.0.0.1:5000' self._queue = Queue(maxsize=10) self._peers = {} self.video_tee = None @@ -128,28 +130,41 @@ class WebRTCCamera(Thread): self.connect() while True: - item = self._queue.get() - if item['job'] == "connect_client": - if not self.pipe: - self.start_global_pipeline() - self.start_client_pipeline(item['sid']) - self._num_clients += 1 - elif item['job'] == "disconnect_client": - self.close_client_pipeline(item['sid']) - self._num_clients -= 1 - if self._num_clients == 0: - print("last client left. stopping global pipeline") - self.close_global_pipeline() - else: - print("unknown job: %s" % item['job']) - self._queue.task_done() + try: + item = self._queue.get() + if item['sid'] == self.sid: + continue + if item['job'] == "connect_client": + if not self.pipe: + self.start_global_pipeline() + self.start_client_pipeline(item['sid']) + self._num_clients += 1 + elif item['job'] == "disconnect_client": + self.close_client_pipeline(item['sid']) + self._num_clients -= 1 + if self._num_clients == 0: + print("last client left. stopping global pipeline") + self.close_global_pipeline() + else: + print("%d clients left" % self._num_clients) + else: + print("unknown job: %s" % item['job']) + self._queue.task_done() + except: + e = sys.exc_info()[0] + print("Exception in camera task: %s" % e ) + #print(sys.exc_info()[2]) + print(traceback.format_exc()) self.disconnect() def connect(self): - sio.connect(self.server, namespaces=['/webrtc'], transports=['websocket']) - print('my sid is', sio.sid) + while not sio.sid: + print("connecting camera websocket..") + sio.connect(self.server, namespaces=['/webrtc'], transports=['websocket']) + time.sleep(1) + print('camera sid is', sio.sid) self.sid = sio.sid self.connected = True @@ -160,8 +175,6 @@ class WebRTCCamera(Thread): def connect_client(self, sid, room): if not self.is_alive(): self.start() - if sid == self.sid: - return self._queue.put({'job':'connect_client', 'sid':sid}) def disconnect_client(self, sid, room): @@ -187,26 +200,35 @@ class WebRTCCamera(Thread): #video_mixer = self.pipe.get_by_name('video-mixer') #audio_mixer = self.pipe.get_by_name('audio-mixer') - bus = self.pipe.get_bus() + #bus = self.pipe.get_bus() + #bus.add_signal_watch() + #bus.connect('message', self.bus_message) + ##pollfd = self.bus.get_pollfd() + ##asyncio.get_event_loop().add_reader(pollfd.fd, self.poll_cb) self.pipe.set_state(Gst.State.PLAYING) print("GLOBAL PIPELINE RUNNING") def close_global_pipeline(self): + print("CLOSING PIPELINE") + #bus = self.pipe.get_bus() + ##bus.disconnect('message') + #bus.remove_signal_watch() + self.pipe.set_state(Gst.State.NULL) self.pipe = None def start_client_pipeline(self, client_sid): - print("starting client pipeline") + print("starting client pipeline for client [%s]" % client_sid) self._peers[client_sid] = {} peer_bin = Gst.parse_bin_from_description(PEER_BIN_DESC, False) webrtcbin = peer_bin.get_by_name("webrtcbin") - webrtcbin.set_property("stun-server", "stun://stun.l.google.com:19302") - webrtcbin.set_property("bundle-policy", "max-bundle") +# webrtcbin.set_property("stun-server", "stun://stun.l.google.com:19302") +# webrtcbin.set_property("bundle-policy", "max-bundle") - #webrtcbin.set_property("latency", 40) - rtpbin = webrtcbin.get_by_name('rtpbin') - rtpbin.set_property("latency", 40) + ##webrtcbin.set_property("latency", 40) + #rtpbin = webrtcbin.get_by_name('rtpbin') + #rtpbin.set_property("latency", 40) audio_queue = peer_bin.get_by_name("audio-queue") audio_sink_pad = Gst.GhostPad.new("audio_sink", audio_queue.get_static_pad("sink")) @@ -215,7 +237,7 @@ class WebRTCCamera(Thread): video_queue = peer_bin.get_by_name("video-queue") video_sink_pad = Gst.GhostPad.new("video_sink", video_queue.get_static_pad("sink")) peer_bin.add_pad(video_sink_pad) - + self._peers[client_sid]['bin'] = peer_bin self._peers[client_sid]['webrtcbin'] = webrtcbin @@ -237,9 +259,16 @@ class WebRTCCamera(Thread): video_src_pad.link(video_sink_pad) peer_bin.sync_state_with_parent() + #test #self.pipe.sync_children_states() #webrtc.sync_children_states() + #debug + #video_caps = video_src_pad.get_current_caps() + #if not video_caps: + # video_caps = video_src_pad.query_caps() + #print(video_caps) + audio_src_pad.remove_probe(audio_block) video_src_pad.remove_probe(video_block) @@ -247,6 +276,13 @@ class WebRTCCamera(Thread): print("blocked") return Gst.PadProbeReturn.OK +# def bus_message(self, bus, msg): +# t = msg.type +# print("BUS MESSGE") +# if t == gst.MessageType.ERROR: +# busError, detail = msg.parse_error() +# print("gstreamer bus message error: %s" % busError) + def close_client_pipeline(self, client_sid): print("closing client pipeline") webrtcbin = self._peers[client_sid]['webrtcbin']