Browse Source

bring client_sid into methods

split-pipe
Hendrik Langer 4 years ago
parent
commit
304347a43b
  1. 2
      raspberry/requirements.py
  2. 94
      raspberry/roberto/camera/camera_gstreamer_webrtc.py
  3. 12
      raspberry/roberto/views/websocket/routes.py

2
raspberry/requirements.py

@ -2,7 +2,7 @@ python3-flask
python3-flask-sqlalchemy python3-flask-sqlalchemy
python3-flask-login python3-flask-login
python3-flask-socketio python3-flask-socketio
python3-socketio-client python3-socketio
python3-serial python3-serial
python3-opencv python3-opencv
python3-picamera python3-picamera

94
raspberry/roberto/camera/camera_gstreamer_webrtc.py

@ -47,16 +47,16 @@ webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.googl
queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv. queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
''' '''
# rpicamsrc name=src preview=0 fullscreen=0 ! h264parse ! omxh264dec ! glimagesink sync=0
class WebRTCCamera(Thread): class WebRTCCamera(Thread):
def __init__(self): def __init__(self):
self.pipe = None
self.webrtc = None
self.sid = None self.sid = None
self.current_client_sid = None self._num_clients = 0
self.num_clients = 0
self.connected = False self.connected = False
self.server = 'ws://localhost:5000' self.server = 'ws://localhost:5000'
self._queue = Queue(maxsize=10) self._queue = Queue(maxsize=10)
self._peers = {}
Thread.__init__(self) Thread.__init__(self)
@ -71,13 +71,11 @@ class WebRTCCamera(Thread):
while True: while True:
item = self._queue.get() item = self._queue.get()
if item['job'] == "connect_client": if item['job'] == "connect_client":
self.current_client_sid = item['sid']
self.start_pipeline(item['sid']) self.start_pipeline(item['sid'])
self.num_clients += 1 self._num_clients += 1
elif item['job'] == "disconnect_client": elif item['job'] == "disconnect_client":
self.current_client_sid = item['sid']
self.close_pipeline(item['sid']) self.close_pipeline(item['sid'])
self.num_clients -= 1 self._num_clients -= 1
else: else:
print("unknown job: %s" % item['job']) print("unknown job: %s" % item['job'])
self._queue.task_done() self._queue.task_done()
@ -105,63 +103,70 @@ class WebRTCCamera(Thread):
def disconnect_client(self, sid, room): def disconnect_client(self, sid, room):
self._queue.put({'job':'disconnect_client', 'sid':sid}) self._queue.put({'job':'disconnect_client', 'sid':sid})
def start_pipeline(self, client_sid): # ok def start_pipeline(self, client_sid):
print("STARTING PIPELINE") print("STARTING PIPELINE")
self.pipe = Gst.parse_launch(PIPELINE_DESC) self._peers[client_sid] = {}
self.webrtc = self.pipe.get_by_name('sendrecv') pipe = Gst.parse_launch(PIPELINE_DESC)
self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed) self._peers[client_sid]['pipe'] = pipe
self.webrtc.connect('on-ice-candidate', self.on_ice_candidate) webrtc = pipe.get_by_name('sendrecv')
self.pipe.set_state(Gst.State.PLAYING) self._peers[client_sid]['webrtc'] = webrtc
webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, client_sid)
webrtc.connect('on-ice-candidate', self.on_ice_candidate, client_sid)
pipe.set_state(Gst.State.PLAYING)
def close_pipeline(self, client_sid): def close_pipeline(self, client_sid):
self.pipe.set_state(Gst.State.NULL) self._peers[client_sid]['pipe'].set_state(Gst.State.NULL)
self.pipe = None self._peers[client_sid]['pipe'] = None
self.webrtc = None self._peers[client_sid]['webrtc'] = None
def handle_sdp_answer(self, sdp): # ok def handle_sdp_answer(self, sdp, client_sid):
print("handle_sdp_answer") print("handle_sdp_answer")
#print ('Received answer:\n%s' % sdp) #print ('Received answer:\n%s' % sdp)
res, sdpmsg = GstSdp.SDPMessage.new() res, sdpmsg = GstSdp.SDPMessage.new()
GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg) GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg) answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
promise = Gst.Promise.new() promise = Gst.Promise.new()
self.webrtc.emit('set-remote-description', answer, promise) self._peers[client_sid]['webrtc'].emit('set-remote-description', answer, promise)
promise.interrupt() promise.interrupt()
def handle_ice(self, ice): # ok def handle_sdp_offer(self, sdp, client_sid):
pass
def handle_ice(self, ice, client_sid):
print("handle_ice") print("handle_ice")
candidate = ice['candidate'] candidate = ice['candidate']
sdpmlineindex = ice['sdpMLineIndex'] sdpmlineindex = ice['sdpMLineIndex']
self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate) self._peers[client_sid]['webrtc'].emit('add-ice-candidate', sdpmlineindex, candidate)
def on_negotiation_needed(self, receiver_entry): # ok def on_negotiation_needed(self, receiver_entry, client_sid):
print("on_negotiation_needed") print("on_negotiation_needed [%s]" % client_sid)
promise = Gst.Promise.new_with_change_func(self.on_offer_created, receiver_entry, None) promise = Gst.Promise.new_with_change_func(self.on_offer_created, receiver_entry, client_sid, None)
receiver_entry.emit('create-offer', None, promise) receiver_entry.emit('create-offer', None, promise)
def on_ice_candidate(self, _, mlineindex, candidate): # ok def on_ice_candidate(self, _, mlineindex, candidate, client_sid):
print("on_ice_candidate") print("on_ice_candidate [%s]" % client_sid)
icemsg = json.dumps({'type': 'ice', 'data': {'candidate': candidate, 'sdpMLineIndex': mlineindex}}) icemsg = json.dumps({'type': 'ice', 'data': {'candidate': candidate, 'sdpMLineIndex': mlineindex}, 'sid': client_sid})
sio.emit('message', data=icemsg, namespace='/webrtc') sio.emit('message', data=icemsg, namespace='/webrtc')
def on_offer_created(self, promise, _, __): # ok # Offer created by our pipeline, to be sent to the peer
print("on_offer_created") def on_offer_created(self, promise, _, client_sid, __):
print("on_offer_created [%s]" % client_sid)
promise.wait() promise.wait()
reply = promise.get_reply() reply = promise.get_reply()
offer = reply['offer'] offer = reply['offer']
promise = Gst.Promise.new() promise = Gst.Promise.new()
self.webrtc.emit('set-local-description', offer, promise) self._peers[client_sid]['webrtc'].emit('set-local-description', offer, promise)
promise.interrupt() promise.interrupt()
text = offer.sdp.as_text() text = offer.sdp.as_text()
print ('Sending offer:\n%s' % text) print ('Sending offer:\n%s' % text)
msg = json.dumps({'type': 'sdp', 'data': {'type': 'offer', 'sdp': text}}) msg = json.dumps({'type': 'sdp', 'data': {'type': 'offer', 'sdp': text}, 'sid': client_sid})
sio.emit('message', data=msg, namespace='/webrtc') sio.emit('message', data=msg, namespace='/webrtc')
print("test") print("test")
@sio.on('message', namespace='/webrtc') # ok @sio.on('message', namespace='/webrtc')
def webrtc_message(data): def webrtc_message(data):
from roberto import webrtccamera from roberto import webrtccamera
# print('Message from {}: {}'.format(sid, data)) # print('WebRTCCamera got Message: {}'.format(data))
if not data: if not data:
print("data malformed") print("data malformed")
return return
@ -171,18 +176,29 @@ class WebRTCCamera(Thread):
if not 'data' in data: if not 'data' in data:
print("Received message without data field") print("Received message without data field")
return return
if not 'from_sid' in data:
print("Received message without from_sid field")
return
if data['from_sid'] == webrtccamera.sid:
print("Received message from self")
return
client_sid = data['from_sid']
if data['type'] == 'sdp': if data['type'] == 'sdp':
if not 'type' in data['data']: if not 'type' in data['data']:
print("Received SDP message without type field") print("Received SDP message without type field")
return return
if not 'sdp' in data['data']:
print("Received SDP message without SDP string")
return
if data['data']['type'] == 'answer': if data['data']['type'] == 'answer':
if 'sdp' in data['data']: print("Received SDP answer:")
print("Received SDP:")
print(data['data']['sdp']) print(data['data']['sdp'])
webrtccamera.handle_sdp_answer(data['data']['sdp']) webrtccamera.handle_sdp_answer(data['data']['sdp'], client_sid)
else: elif data['data']['type'] == 'offer':
print("Received SDP message without SDP string") print("Received SDP offer:")
print(data['data']['sdp'])
webrtccamera.handle_sdp_offer(data['data']['sdp'], client_sid)
else: else:
print("Expected SDP message type \"answer\", got \"%s\"" % data['data']['type']) print("Expected SDP message type \"answer\", got \"%s\"" % data['data']['type'])
@ -193,7 +209,7 @@ class WebRTCCamera(Thread):
if 'candidate' in data['data']: if 'candidate' in data['data']:
print("Received ICE candidate with mline index %u" % data['data']['sdpMLineIndex']) print("Received ICE candidate with mline index %u" % data['data']['sdpMLineIndex'])
print(data['data']['candidate']) print(data['data']['candidate'])
webrtccamera.handle_ice(data['data']) webrtccamera.handle_ice(data['data'], client_sid)
else: else:
print("Received ICE message without ICE candidate string") print("Received ICE message without ICE candidate string")
return return

12
raspberry/roberto/views/websocket/routes.py

@ -2,6 +2,7 @@ from . import websocket_blueprint
from flask import current_app, render_template, request from flask import current_app, render_template, request
from flask_socketio import SocketIO, emit, join_room, leave_room from flask_socketio import SocketIO, emit, join_room, leave_room
import json
from roberto import socketio from roberto import socketio
@ -67,6 +68,17 @@ def camera():
def webrtc_message(data): def webrtc_message(data):
sid = request.sid sid = request.sid
print('Message from {}: {}'.format(sid, data)) print('Message from {}: {}'.format(sid, data))
if isinstance(data, str):
try:
jsonData = json.loads(data)
jsonData['from_sid'] = sid
data = json.dumps(jsonData)
except json.JSONDecodeError:
print("could not decode json")
elif isinstance(data, dict):
data['from_sid'] = sid
else:
print("unknown message type")
socketio.emit('message', data=data, room=ROOM, namespace='/webrtc', skip_sid=sid) socketio.emit('message', data=data, room=ROOM, namespace='/webrtc', skip_sid=sid)

Loading…
Cancel
Save