Robot
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

261 lines
9.9 KiB

import os
import sys
from threading import Thread
from queue import Queue
import socketio
sio = socketio.Client()
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
gi.require_version('GstWebRTC', '1.0')
from gi.repository import GstWebRTC
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp
import json
# Detect the host platform by probing for the Raspberry Pi GPIO package.
# RuntimeError is caught alongside ImportError, so either failure selects
# the generic configuration.
try:
    import RPi.GPIO as gpio
except (ImportError, RuntimeError):
    platform = 'generic'
else:
    platform = 'raspberry'
#PIPELINE_DESC = '''
#webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
# videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay !
# queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
# audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc !rtpopuspay !
# queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
#'''
#PIPELINE_DESC = '''
#webrtcbin name=webrtcbin stun-server=stun://stun.l.google.com:19302
#rpicamsrc bitrate=600000 annotation-mode=12 preview=false ! video/x-h264,profile=constrained-baseline,width=640,height=360,level=3.0 ! queue max-size-time=100000000 ! h264parse !
#rtph264pay config-interval=-1 name=payloader !
#application/x-rtp,media=video,encoding-name=H264,payload=96 ! sendrecv.
#'''
#PIPELINE_DESC = '''
#v4l2src ! queue ! vp8enc ! rtpvp8pay !
# application/x-rtp,media=video,encoding-name=VP8,payload=96 !
# webrtcbin name=sendrecv
#'''
# Pipeline used on non-Raspberry hosts: V4L2 camera encoded as VP8 plus a
# test audio source encoded as Opus, both fed into the "sendrecv" webrtcbin.
_GENERIC_PIPELINE_DESC = '''
webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
v4l2src device=/dev/video0 ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay !
queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc !rtpopuspay !
queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
'''

# Local preview pipeline kept for reference:
# rpicamsrc name=src preview=0 fullscreen=0 ! h264parse ! omxh264dec ! glimagesink sync=0

# Pipeline used on the Raspberry Pi: rpicamsrc delivers H.264 directly, so the
# video branch only parses and payloads it. Audio is the same test source.
_RASPBERRY_PIPELINE_DESC = '''
webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
rpicamsrc bitrate=2000000 keyframe-interval=25 exposure-mode=sports annotation-mode=12 preview=false name=rpicamsrc ! video/x-h264,profile=baseline,width=1280,height=720,framerate=49/1,level=3.0 ! queue max-size-time=100000000 ! h264parse !
rtph264pay config-interval=-1 name=payloader !
application/x-rtp,media=video,encoding-name=H264,payload=96 ! sendrecv.
audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
'''

# The description handed to Gst.parse_launch() for every new peer.
PIPELINE_DESC = (
    _RASPBERRY_PIPELINE_DESC if platform == 'raspberry' else _GENERIC_PIPELINE_DESC
)
class WebRTCCamera(Thread):
    """Background thread that streams the camera to WebRTC peers.

    Every remote client gets its own GStreamer pipeline built from
    ``PIPELINE_DESC``. Signalling (SDP offers/answers and ICE candidates) is
    exchanged through the module-level socket.io client ``sio`` on the
    ``/webrtc`` namespace. Client connect/disconnect requests are queued by
    ``connect_client`` / ``disconnect_client`` and executed by ``run``.
    """

    def __init__(self):
        """Initialise state and GStreamer, and verify the required plugins.

        Exits the process with status 1 when ``check_plugins()`` reports a
        missing plugin.
        """
        super().__init__()
        self.sid = None              # our own socket.io session id, set by connect()
        self._num_clients = 0        # number of peers with a started pipeline
        self.connected = False
        self.server = 'ws://localhost:5000'
        self._queue = Queue(maxsize=10)   # pending connect/disconnect jobs
        self._peers = {}             # client sid -> {'pipe': ..., 'webrtc': ...}
        Gst.init(None)
        if not check_plugins():
            sys.exit(1)

    def run(self):
        """Connect to the signalling server and process queued jobs forever.

        Each job is a dict with a ``'job'`` name and the client ``'sid'``.
        If a job raises, the exception still ends the thread, but the queue
        item is marked done and the signalling connection is closed first.
        """
        print("Starting WebRTCCamera background thread")
        self.connect()
        try:
            while True:
                item = self._queue.get()
                try:
                    job = item['job']
                    if job == "connect_client":
                        self.start_pipeline(item['sid'])
                        self._num_clients += 1
                    elif job == "disconnect_client":
                        # Only count down for peers we actually know, so a
                        # stray disconnect cannot drive the counter negative.
                        if item['sid'] in self._peers:
                            self.close_pipeline(item['sid'])
                            self._num_clients -= 1
                        else:
                            print("disconnect for unknown client [%s]" % item['sid'])
                    else:
                        print("unknown job: %s" % job)
                finally:
                    self._queue.task_done()
        finally:
            # Previously unreachable after the endless loop; now runs when
            # the loop is left through an exception.
            self.disconnect()

    def connect(self):
        """Connect ``sio`` to ``self.server`` and remember our session id."""
        sio.connect(self.server, namespaces=['/webrtc'], transports=['websocket'])
        print('my sid is', sio.sid)
        self.sid = sio.sid
        self.connected = True

    def disconnect(self):
        """Mark the camera as disconnected and close the socket.io client."""
        self.connected = False
        sio.disconnect()

    def connect_client(self, sid, room):
        """Queue a pipeline start for client *sid*, starting the thread if needed.

        Requests carrying our own session id are ignored. *room* is accepted
        but not used.
        """
        if not self.is_alive():
            self.start()
        # NOTE(review): right after start() self.sid may still be None because
        # connect() runs on the worker thread - confirm callers never pass our
        # own sid that early.
        if sid == self.sid:
            return
        self._queue.put({'job': 'connect_client', 'sid': sid})

    def disconnect_client(self, sid, room):
        """Queue a pipeline shutdown for client *sid*. *room* is not used."""
        self._queue.put({'job': 'disconnect_client', 'sid': sid})

    def _webrtc_for(self, client_sid):
        """Return the webrtcbin element for *client_sid*, or None if there is none."""
        peer = self._peers.get(client_sid)
        if not peer:
            return None
        return peer.get('webrtc')

    def start_pipeline(self, client_sid):
        """Build, register and start a pipeline for *client_sid*."""
        print("STARTING PIPELINE")
        pipe = Gst.parse_launch(PIPELINE_DESC)
        webrtc = pipe.get_by_name('sendrecv')
        # Register only once both objects exist, so a failed parse leaves no
        # half-initialised entry behind. This must happen before PLAYING,
        # because the signal callbacks look the peer up by sid.
        self._peers[client_sid] = {'pipe': pipe, 'webrtc': webrtc}
        webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, client_sid)
        webrtc.connect('on-ice-candidate', self.on_ice_candidate, client_sid)
        if platform == 'raspberry':
            rpicamsrc = pipe.get_by_name("rpicamsrc")
            #rpicamsrc.set_property("annotation-mode", 1)
            #rpicamsrc.set_property("annotation-text", "Saturation %d" % (saturation))
        pipe.set_state(Gst.State.PLAYING)

    def close_pipeline(self, client_sid):
        """Stop the pipeline of *client_sid* and forget the peer.

        Unknown clients are reported and ignored instead of raising KeyError.
        """
        peer = self._peers.pop(client_sid, None)
        if peer is None:
            print("close_pipeline: unknown client [%s]" % client_sid)
            return
        pipe = peer.get('pipe')
        if pipe is not None:
            pipe.set_state(Gst.State.NULL)

    def handle_sdp_answer(self, sdp, client_sid):
        """Apply the SDP answer text *sdp* from *client_sid* as remote description.

        Answers from unknown peers and SDP that fails to parse are reported
        and dropped.
        """
        print("handle_sdp_answer")
        #print ('Received answer:\n%s' % sdp)
        webrtc = self._webrtc_for(client_sid)
        if webrtc is None:
            print("handle_sdp_answer: no pipeline for client [%s]" % client_sid)
            return
        _, sdpmsg = GstSdp.SDPMessage.new()
        if GstSdp.sdp_message_parse_buffer(sdp.encode(), sdpmsg) != GstSdp.SDPResult.OK:
            print("handle_sdp_answer: could not parse SDP from [%s]" % client_sid)
            return
        answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
        promise = Gst.Promise.new()
        webrtc.emit('set-remote-description', answer, promise)
        promise.interrupt()

    def handle_sdp_offer(self, sdp, client_sid):
        """Handle an SDP offer from a peer. Not implemented; offers are ignored."""
        pass

    def handle_ice(self, ice, client_sid):
        """Add a remote ICE candidate for *client_sid*.

        *ice* is a mapping with the keys ``'candidate'`` and
        ``'sdpMLineIndex'``. Candidates for unknown peers are dropped.
        """
        print("handle_ice")
        webrtc = self._webrtc_for(client_sid)
        if webrtc is None:
            print("handle_ice: no pipeline for client [%s]" % client_sid)
            return
        candidate = ice['candidate']
        sdpmlineindex = ice['sdpMLineIndex']
        webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)

    def on_negotiation_needed(self, receiver_entry, client_sid):
        """Signal handler: ask the emitting webrtcbin to create an offer."""
        print("on_negotiation_needed [%s]" % client_sid)
        promise = Gst.Promise.new_with_change_func(self.on_offer_created, receiver_entry, client_sid, None)
        receiver_entry.emit('create-offer', None, promise)

    def on_ice_candidate(self, _, mlineindex, candidate, client_sid):
        """Signal handler: send a locally gathered ICE candidate to the peer."""
        print("on_ice_candidate [%s]" % client_sid)
        icemsg = json.dumps({'type': 'ice', 'data': {'candidate': candidate, 'sdpMLineIndex': mlineindex}, 'sid': client_sid})
        sio.emit('message', data=icemsg, namespace='/webrtc')

    # Offer created by our pipeline, to be sent to the peer
    def on_offer_created(self, promise, _, client_sid, __):
        """Promise callback: set the created offer locally and send it to the peer."""
        print("on_offer_created [%s]" % client_sid)
        promise.wait()
        reply = promise.get_reply()
        offer = reply['offer']
        webrtc = self._webrtc_for(client_sid)
        if webrtc is None:
            # The peer was closed while the offer was being created.
            print("on_offer_created: no pipeline for client [%s]" % client_sid)
            return
        local_promise = Gst.Promise.new()
        webrtc.emit('set-local-description', offer, local_promise)
        local_promise.interrupt()
        text = offer.sdp.as_text()
        print('Sending offer:\n%s' % text)
        msg = json.dumps({'type': 'sdp', 'data': {'type': 'offer', 'sdp': text}, 'sid': client_sid})
        sio.emit('message', data=msg, namespace='/webrtc')
        print("test")
@sio.on('message', namespace='/webrtc')
def webrtc_message(data):
    """Validate a signalling message and route it to the camera.

    *data* must contain ``'type'``, ``'data'`` and ``'from_sid'``. Messages
    of type ``'sdp'`` go to ``handle_sdp_answer`` / ``handle_sdp_offer`` and
    messages of type ``'ice'`` go to ``handle_ice``. Anything malformed,
    unknown, or sent by the camera itself is reported and dropped.
    """
    # Imported inside the handler rather than at module level
    # (presumably to avoid a circular import with the roberto package).
    from roberto import webrtccamera
    # print('WebRTCCamera got Message: {}'.format(data))

    if not data:
        print("data malformed")
        return
    for field in ('type', 'data', 'from_sid'):
        if field not in data:
            print("Received message without %s field" % field)
            return

    client_sid = data['from_sid']
    if client_sid == webrtccamera.sid:
        print("Received message from self")
        return

    kind = data['type']
    payload = data['data']

    if kind == 'sdp':
        if 'type' not in payload:
            print("Received SDP message without type field")
            return
        if 'sdp' not in payload:
            print("Received SDP message without SDP string")
            return
        sdp_type = payload['type']
        if sdp_type == 'answer':
            print("Received SDP answer:")
            print(payload['sdp'])
            webrtccamera.handle_sdp_answer(payload['sdp'], client_sid)
        elif sdp_type == 'offer':
            print("Received SDP offer:")
            print(payload['sdp'])
            webrtccamera.handle_sdp_offer(payload['sdp'], client_sid)
        else:
            print("Expected SDP message type \"answer\", got \"%s\"" % sdp_type)
        return

    if kind == 'ice':
        if 'sdpMLineIndex' not in payload:
            print("Received ICE message without mline index")
            return
        if 'candidate' not in payload:
            print("Received ICE message without ICE candidate string")
            return
        print("Received ICE candidate with mline index %u" % payload['sdpMLineIndex'])
        print(payload['candidate'])
        webrtccamera.handle_ice(payload, client_sid)
        return

    print("Unknown message \"%s\", ignoring" % payload)
# python-socketio's connection event is named "connect", and this client only
# joins the /webrtc namespace, so the handler is registered for that event.
# The original "connected" registration is kept so nothing is taken away.
@sio.on('connect', namespace='/webrtc')
@sio.event
def connected():
    """Log that the socket.io connection has been established."""
    print("I'm connected!")
# python-socketio calls connect_error handlers with one argument describing
# the failure; a zero-argument handler raises TypeError instead of logging.
# Also registered for /webrtc so a rejected namespace connection is reported.
@sio.on('connect_error', namespace='/webrtc')
@sio.event
def connect_error(data=None):
    """Log a failed connection attempt.

    *data* is the error information passed by the library; it is accepted
    but not used.
    """
    print("The connection failed!")
# python-socketio's disconnection event is named "disconnect", and this client
# only joins the /webrtc namespace, so the handler is registered for that
# event. The original "disconnected" registration is kept as well.
@sio.on('disconnect', namespace='/webrtc')
@sio.event
def disconnected(reason=None):
    """Log that the socket.io connection was closed.

    *reason* is accepted because newer python-socketio releases pass a
    disconnect reason; it is not used.
    """
    print("I'm disconnected!")
def check_plugins():
    """Check that every required GStreamer plugin is installed.

    Prints the names of the missing plugins, if any.

    Returns:
        True when all plugins are found in the registry, False otherwise.
    """
    required = ("opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
                "rtpmanager", "videotestsrc", "audiotestsrc")
    registry = Gst.Registry.get()
    absent = [name for name in required if registry.find_plugin(name) is None]
    if absent:
        print('Missing gstreamer plugins:', absent)
        return False
    return True