diff --git a/raspberry/roberto/camera/camera_gstreamer_webrtc.py b/raspberry/roberto/camera/camera_gstreamer_webrtc.py index ca61ca4..4a7b631 100644 --- a/raspberry/roberto/camera/camera_gstreamer_webrtc.py +++ b/raspberry/roberto/camera/camera_gstreamer_webrtc.py @@ -66,14 +66,30 @@ if platform == 'raspberry': queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv. ''' +PIPELINE_DESC = ''' + videotestsrc is-live=true ! vp8enc deadline=1 ! rtpvp8pay pt=96 ! tee name=video-tee ! + queue ! fakesink sync=true + audiotestsrc wave=ticks is-live=true ! opusenc ! rtpopuspay pt=97 ! tee name=audio-tee ! + queue ! fakesink sync=true + ''' + +PEER_BIN_DESC = ''' + queue name=video-queue ! webrtcbin. + queue name=audio-queue ! webrtcbin. + webrtcbin name=webrtcbin + ''' + class WebRTCCamera(Thread): def __init__(self): + self.pipe = None self.sid = None self._num_clients = 0 self.connected = False self.server = 'ws://localhost:5000' self._queue = Queue(maxsize=10) self._peers = {} + self.video_tee = None + self.audio_tee = None Thread.__init__(self) @@ -88,11 +104,16 @@ class WebRTCCamera(Thread): while True: item = self._queue.get() if item['job'] == "connect_client": - self.start_pipeline(item['sid']) + if not self.pipe: + self.start_global_pipeline() + self.start_client_pipeline(item['sid']) self._num_clients += 1 elif item['job'] == "disconnect_client": - self.close_pipeline(item['sid']) + self.close_client_pipeline(item['sid']) self._num_clients -= 1 + if self._num_clients == 0: + print("last client left. stopping global pipeline") + self.close_global_pipeline() else: print("unknown job: %s" % item['job']) self._queue.task_done() @@ -120,30 +141,84 @@ class WebRTCCamera(Thread): def disconnect_client(self, sid, room): self._queue.put({'job':'disconnect_client', 'sid':sid}) - def start_pipeline(self, client_sid): + def start_global_pipeline(self): print("STARTING PIPELINE") - self._peers[client_sid] = {} - pipe = Gst.parse_launch(PIPELINE_DESC) - self._peers[client_sid]['pipe'] = pipe - webrtc = pipe.get_by_name('sendrecv') - self._peers[client_sid]['webrtc'] = webrtc - webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, client_sid) - webrtc.connect('on-ice-candidate', self.on_ice_candidate, client_sid) - - rtpbin = webrtc.get_by_name('rtpbin') - rtpbin.set_property("latency", 40) + self.pipe = Gst.parse_launch(PIPELINE_DESC) if platform == 'raspberry': - rpicamsrc = pipe.get_by_name ("rpicamsrc") + rpicamsrc = self.pipe.get_by_name ("rpicamsrc") #rpicamsrc.set_property("annotation-mode", 1) #rpicamsrc.set_property("annotation-text", "Saturation %d" % (saturation)) - pipe.set_state(Gst.State.PLAYING) + self.video_tee = self.pipe.get_by_name('video-tee') + self.audio_tee = self.pipe.get_by_name('audio-tee') + #video_mixer = self.pipe.get_by_name('video-mixer') + #audio_mixer = self.pipe.get_by_name('audio-mixer') + + bus = self.pipe.get_bus() + + self.pipe.set_state(Gst.State.PLAYING) + + def close_global_pipeline(self): + self.pipe.set_state(Gst.State.NULL) + self.pipe = None + + def start_client_pipeline(self, client_sid): + print("starting client pipeline") + self._peers[client_sid] = {} + peer_bin = Gst.parse_bin_from_description(PEER_BIN_DESC, False) + webrtcbin = peer_bin.get_by_name("webrtcbin") + webrtcbin.set_property("stun-server", "stun://stun.l.google.com:19302") + webrtcbin.set_property("bundle-policy", "max-bundle") + + #webrtcbin.set_property("latency", 40) + rtpbin = webrtcbin.get_by_name('rtpbin') + rtpbin.set_property("latency", 40) + + audio_queue = peer_bin.get_by_name("audio-queue") + audio_sink_pad = Gst.GhostPad.new("audio_sink", audio_queue.get_static_pad("sink")) + peer_bin.add_pad(audio_sink_pad) + + video_queue = peer_bin.get_by_name("video-queue") + video_sink_pad = Gst.GhostPad.new("video_sink", video_queue.get_static_pad("sink")) + peer_bin.add_pad(video_sink_pad) + + self._peers[client_sid]['bin'] = peer_bin + self._peers[client_sid]['webrtcbin'] = webrtcbin + + self.pipe.add(peer_bin) + + webrtcbin.connect('on-negotiation-needed', self.on_negotiation_needed, client_sid) + webrtcbin.connect('on-ice-candidate', self.on_ice_candidate, client_sid) + + #webrtcbin.connect_pad_added "on_incoming_stream" + # connect_pad_added for audio_src and video_src (app.audio_mixer.get_request_pad('sink_%u') + # connect unlinked + + audio_src_pad = self.audio_tee.get_request_pad('src_%u') + audio_src_pad.link(audio_sink_pad) + + video_src_pad = self.video_tee.get_request_pad('src_%u') + video_src_pad.link(video_sink_pad) + + peer_bin.sync_state_with_parent() + #self.pipe.sync_children_states() + #webrtc.sync_children_states() + + def close_client_pipeline(self, client_sid): + webrtcbin = self._peers[client_sid]['webrtcbin'] + peer_bin = self._peers[client_sid]['bin'] + + audio_sinkpad = peer_bin.get_static_pad("audio_sink") + video_sinkpad = peer_bin.get_static_pad("video_sink") + audio_sinkpad.get_peer().unlink(audio_sinkpad) + video_sinkpad.get_peer().unlink(video_sinkpad) + + self.pipe.remove(peer_bin) + peer_bin.set_state(Gst.State.NULL) - def close_pipeline(self, client_sid): - self._peers[client_sid]['pipe'].set_state(Gst.State.NULL) - self._peers[client_sid]['pipe'] = None - self._peers[client_sid]['webrtc'] = None + self._peers[client_sid]['bin'] = None + self._peers[client_sid]['webrtcbin'] = None def handle_sdp_answer(self, sdp, client_sid): print("handle_sdp_answer") @@ -152,7 +227,7 @@ class WebRTCCamera(Thread): GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg) answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg) promise = Gst.Promise.new() - self._peers[client_sid]['webrtc'].emit('set-remote-description', answer, promise) + self._peers[client_sid]['webrtcbin'].emit('set-remote-description', answer, promise) promise.interrupt() def handle_sdp_offer(self, sdp, client_sid): @@ -165,7 +240,7 @@ class WebRTCCamera(Thread): print("candidate string is empty") return sdpmlineindex = ice['sdpMLineIndex'] - self._peers[client_sid]['webrtc'].emit('add-ice-candidate', sdpmlineindex, candidate) + self._peers[client_sid]['webrtcbin'].emit('add-ice-candidate', sdpmlineindex, candidate) def on_negotiation_needed(self, receiver_entry, client_sid): print("on_negotiation_needed [%s]" % client_sid) @@ -184,7 +259,7 @@ class WebRTCCamera(Thread): reply = promise.get_reply() offer = reply['offer'] promise = Gst.Promise.new() - self._peers[client_sid]['webrtc'].emit('set-local-description', offer, promise) + self._peers[client_sid]['webrtcbin'].emit('set-local-description', offer, promise) promise.interrupt() text = offer.sdp.as_text() print ('Sending offer:\n%s' % text)