Browse Source

split pipe

split-pipe
Hendrik Langer 4 years ago
parent
commit
96d81d8c43
  1. 68
      raspberry/roberto/camera/camera_gstreamer_webrtc.py

68
raspberry/roberto/camera/camera_gstreamer_webrtc.py

@ -39,18 +39,31 @@ import json
# webrtcbin name=sendrecv # webrtcbin name=sendrecv
#''' #'''
#PIPELINE_DESC = '''
#webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
# v4l2src device=/dev/video0 ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay !
# queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
# audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc !rtpopuspay !
# queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
#'''
PIPELINE_DESC = ''' PIPELINE_DESC = '''
webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302 tee name=audiotee ! queue ! fakesink
v4l2src device=/dev/video0 ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay ! audiotestsrc is-live=true wave=red-noise ! queue ! opusenc ! rtpopuspay !
queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv. queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! audiotee.
audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc !rtpopuspay !
queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
''' '''
#PIPELINE_DESC = '''
#tee name=audiotee ! queue ! fakesink
# v4l2src device=/dev/video0 ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay !
# queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! audiotee.
#'''
# rpicamsrc name=src preview=0 fullscreen=0 ! h264parse ! omxh264dec ! glimagesink sync=0 # rpicamsrc name=src preview=0 fullscreen=0 ! h264parse ! omxh264dec ! glimagesink sync=0
class WebRTCCamera(Thread): class WebRTCCamera(Thread):
def __init__(self): def __init__(self):
self.pipe = None
self.sid = None self.sid = None
self._num_clients = 0 self._num_clients = 0
self.connected = False self.connected = False
@ -71,11 +84,16 @@ class WebRTCCamera(Thread):
while True: while True:
item = self._queue.get() item = self._queue.get()
if item['job'] == "connect_client": if item['job'] == "connect_client":
self.start_pipeline(item['sid']) if not self.pipe:
self.start_global_pipeline()
self.start_client_pipeline(item['sid'])
self._num_clients += 1 self._num_clients += 1
elif item['job'] == "disconnect_client": elif item['job'] == "disconnect_client":
self.close_pipeline(item['sid']) self.close_client_pipeline(item['sid'])
self._num_clients -= 1 self._num_clients -= 1
if self._num_clients == 0:
print("last client left. stopping global pipeline")
self.stop_global_pipeline()
else: else:
print("unknown job: %s" % item['job']) print("unknown job: %s" % item['job'])
self._queue.task_done() self._queue.task_done()
@ -103,18 +121,42 @@ class WebRTCCamera(Thread):
def disconnect_client(self, sid, room): def disconnect_client(self, sid, room):
self._queue.put({'job':'disconnect_client', 'sid':sid}) self._queue.put({'job':'disconnect_client', 'sid':sid})
def start_pipeline(self, client_sid): def start_global_pipeline(self):
print("STARTING PIPELINE") print("STARTING PIPELINE")
self.pipe = Gst.parse_launch(PIPELINE_DESC)
self.pipe.set_state(Gst.State.PLAYING)
def stop_global_pipeline(self):
self.pipe.set_state(Gst.State.NULL)
self.pipe = None
def start_client_pipeline(self, client_sid):
print("starting client pipeline")
self._peers[client_sid] = {} self._peers[client_sid] = {}
pipe = Gst.parse_launch(PIPELINE_DESC) q = Gst.ElementFactory.make('queue')
self._peers[client_sid]['pipe'] = pipe webrtc = Gst.ElementFactory.make('webrtcbin', client_sid)
webrtc = pipe.get_by_name('sendrecv')
self._peers[client_sid]['webrtc'] = webrtc self._peers[client_sid]['webrtc'] = webrtc
self.pipe.add(q, webrtc)
# q.link(webrtc.get_request_pad('sink_%u'))
q.link(webrtc)
tee = self.pipe.get_by_name('audiotee')
# tee.link(q.get_static_pad('sink'))
tee.link(q)
self._peers[client_sid]['pipe'] = q
webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, client_sid) webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, client_sid)
webrtc.connect('on-ice-candidate', self.on_ice_candidate, client_sid) webrtc.connect('on-ice-candidate', self.on_ice_candidate, client_sid)
pipe.set_state(Gst.State.PLAYING) self.pipe.sync_children_states()
webrtc.sync_children_states()
def close_client_pipeline(self, client_sid):
webrtc = self._peers[client_sid]['webrtc']
self.pipe.remove(webrtc)
self.pipe.remove(self._peers[client_sid]['pipe'])
tee = self.pipe.get_by_name('audiotee')
#tee.release_request_pad(webrtc.get_request_pad('sink_%u'))
def close_pipeline(self, client_sid):
self._peers[client_sid]['pipe'].set_state(Gst.State.NULL) self._peers[client_sid]['pipe'].set_state(Gst.State.NULL)
self._peers[client_sid]['pipe'] = None self._peers[client_sid]['pipe'] = None
self._peers[client_sid]['webrtc'] = None self._peers[client_sid]['webrtc'] = None

Loading…
Cancel
Save