Hendrik Langer 4 years ago
parent
commit
996d6546c7
  1. 119
      raspberry/roberto/camera/camera_gstreamer_webrtc.py

119
raspberry/roberto/camera/camera_gstreamer_webrtc.py

@ -66,14 +66,30 @@ if platform == 'raspberry':
queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv. queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
''' '''
PIPELINE_DESC = '''
videotestsrc is-live=true ! vp8enc deadline=1 ! rtpvp8pay pt=96 ! tee name=video-tee !
queue ! fakesink sync=true
audiotestsrc wave=ticks is-live=true ! opusenc ! rtpopuspay pt=97 ! tee name=audio-tee !
queue ! fakesink sync=true
'''
PEER_BIN_DESC = '''
queue name=video-queue ! webrtcbin.
queue name=audio-queue ! webrtcbin.
webrtcbin name=webrtcbin
'''
class WebRTCCamera(Thread): class WebRTCCamera(Thread):
def __init__(self): def __init__(self):
self.pipe = None
self.sid = None self.sid = None
self._num_clients = 0 self._num_clients = 0
self.connected = False self.connected = False
self.server = 'ws://localhost:5000' self.server = 'ws://localhost:5000'
self._queue = Queue(maxsize=10) self._queue = Queue(maxsize=10)
self._peers = {} self._peers = {}
self.video_tee = None
self.audio_tee = None
Thread.__init__(self) Thread.__init__(self)
@ -88,11 +104,16 @@ class WebRTCCamera(Thread):
while True: while True:
item = self._queue.get() item = self._queue.get()
if item['job'] == "connect_client": if item['job'] == "connect_client":
self.start_pipeline(item['sid']) if not self.pipe:
self.start_global_pipeline()
self.start_client_pipeline(item['sid'])
self._num_clients += 1 self._num_clients += 1
elif item['job'] == "disconnect_client": elif item['job'] == "disconnect_client":
self.close_pipeline(item['sid']) self.close_client_pipeline(item['sid'])
self._num_clients -= 1 self._num_clients -= 1
if self._num_clients == 0:
print("last client left. stopping global pipeline")
self.close_global_pipeline()
else: else:
print("unknown job: %s" % item['job']) print("unknown job: %s" % item['job'])
self._queue.task_done() self._queue.task_done()
@ -120,30 +141,84 @@ class WebRTCCamera(Thread):
def disconnect_client(self, sid, room): def disconnect_client(self, sid, room):
self._queue.put({'job':'disconnect_client', 'sid':sid}) self._queue.put({'job':'disconnect_client', 'sid':sid})
def start_pipeline(self, client_sid): def start_global_pipeline(self):
print("STARTING PIPELINE") print("STARTING PIPELINE")
self._peers[client_sid] = {} self.pipe = Gst.parse_launch(PIPELINE_DESC)
pipe = Gst.parse_launch(PIPELINE_DESC)
self._peers[client_sid]['pipe'] = pipe
webrtc = pipe.get_by_name('sendrecv')
self._peers[client_sid]['webrtc'] = webrtc
webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, client_sid)
webrtc.connect('on-ice-candidate', self.on_ice_candidate, client_sid)
rtpbin = webrtc.get_by_name('rtpbin')
rtpbin.set_property("latency", 40)
if platform == 'raspberry': if platform == 'raspberry':
rpicamsrc = pipe.get_by_name ("rpicamsrc") rpicamsrc = self.pipe.get_by_name ("rpicamsrc")
#rpicamsrc.set_property("annotation-mode", 1) #rpicamsrc.set_property("annotation-mode", 1)
#rpicamsrc.set_property("annotation-text", "Saturation %d" % (saturation)) #rpicamsrc.set_property("annotation-text", "Saturation %d" % (saturation))
pipe.set_state(Gst.State.PLAYING) self.video_tee = self.pipe.get_by_name('video-tee')
self.audio_tee = self.pipe.get_by_name('audio-tee')
#video_mixer = self.pipe.get_by_name('video-mixer')
#audio_mixer = self.pipe.get_by_name('audio-mixer')
bus = self.pipe.get_bus()
self.pipe.set_state(Gst.State.PLAYING)
def close_global_pipeline(self):
self.pipe.set_state(Gst.State.NULL)
self.pipe = None
def start_client_pipeline(self, client_sid):
print("starting client pipeline")
self._peers[client_sid] = {}
peer_bin = Gst.parse_bin_from_description(PEER_BIN_DESC, False)
webrtcbin = peer_bin.get_by_name("webrtcbin")
webrtcbin.set_property("stun-server", "stun://stun.l.google.com:19302")
webrtcbin.set_property("bundle-policy", "max-bundle")
#webrtcbin.set_property("latency", 40)
rtpbin = webrtcbin.get_by_name('rtpbin')
rtpbin.set_property("latency", 40)
audio_queue = peer_bin.get_by_name("audio-queue")
audio_sink_pad = Gst.GhostPad.new("audio_sink", audio_queue.get_static_pad("sink"))
peer_bin.add_pad(audio_sink_pad)
video_queue = peer_bin.get_by_name("video-queue")
video_sink_pad = Gst.GhostPad.new("video_sink", video_queue.get_static_pad("sink"))
peer_bin.add_pad(video_sink_pad)
self._peers[client_sid]['bin'] = peer_bin
self._peers[client_sid]['webrtcbin'] = webrtcbin
self.pipe.add(peer_bin)
webrtcbin.connect('on-negotiation-needed', self.on_negotiation_needed, client_sid)
webrtcbin.connect('on-ice-candidate', self.on_ice_candidate, client_sid)
#webrtcbin.connect_pad_added "on_incoming_stream"
# connect_pad_added for audio_src and video_src (app.audio_mixer.get_request_pad('sink_%u')
# connect unlinked
audio_src_pad = self.audio_tee.get_request_pad('src_%u')
audio_src_pad.link(audio_sink_pad)
video_src_pad = self.video_tee.get_request_pad('src_%u')
video_src_pad.link(video_sink_pad)
peer_bin.sync_state_with_parent()
#self.pipe.sync_children_states()
#webrtc.sync_children_states()
def close_client_pipeline(self, client_sid):
webrtcbin = self._peers[client_sid]['webrtcbin']
peer_bin = self._peers[client_sid]['bin']
audio_sinkpad = peer_bin.get_static_pad("audio_sink")
video_sinkpad = peer_bin.get_static_pad("video_sink")
audio_sinkpad.get_peer().unlink(audio_sinkpad)
video_sinkpad.get_peer().unlink(video_sinkpad)
self.pipe.remove(peer_bin)
peer_bin.set_state(Gst.State.NULL)
def close_pipeline(self, client_sid): self._peers[client_sid]['bin'] = None
self._peers[client_sid]['pipe'].set_state(Gst.State.NULL) self._peers[client_sid]['webrtcbin'] = None
self._peers[client_sid]['pipe'] = None
self._peers[client_sid]['webrtc'] = None
def handle_sdp_answer(self, sdp, client_sid): def handle_sdp_answer(self, sdp, client_sid):
print("handle_sdp_answer") print("handle_sdp_answer")
@ -152,7 +227,7 @@ class WebRTCCamera(Thread):
GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg) GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg) answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
promise = Gst.Promise.new() promise = Gst.Promise.new()
self._peers[client_sid]['webrtc'].emit('set-remote-description', answer, promise) self._peers[client_sid]['webrtcbin'].emit('set-remote-description', answer, promise)
promise.interrupt() promise.interrupt()
def handle_sdp_offer(self, sdp, client_sid): def handle_sdp_offer(self, sdp, client_sid):
@ -165,7 +240,7 @@ class WebRTCCamera(Thread):
print("candidate string is empty") print("candidate string is empty")
return return
sdpmlineindex = ice['sdpMLineIndex'] sdpmlineindex = ice['sdpMLineIndex']
self._peers[client_sid]['webrtc'].emit('add-ice-candidate', sdpmlineindex, candidate) self._peers[client_sid]['webrtcbin'].emit('add-ice-candidate', sdpmlineindex, candidate)
def on_negotiation_needed(self, receiver_entry, client_sid): def on_negotiation_needed(self, receiver_entry, client_sid):
print("on_negotiation_needed [%s]" % client_sid) print("on_negotiation_needed [%s]" % client_sid)
@ -184,7 +259,7 @@ class WebRTCCamera(Thread):
reply = promise.get_reply() reply = promise.get_reply()
offer = reply['offer'] offer = reply['offer']
promise = Gst.Promise.new() promise = Gst.Promise.new()
self._peers[client_sid]['webrtc'].emit('set-local-description', offer, promise) self._peers[client_sid]['webrtcbin'].emit('set-local-description', offer, promise)
promise.interrupt() promise.interrupt()
text = offer.sdp.as_text() text = offer.sdp.as_text()
print ('Sending offer:\n%s' % text) print ('Sending offer:\n%s' % text)

Loading…
Cancel
Save