Robot
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

434 lines
17 KiB

import os
import sys
import time
import traceback
from threading import Thread
from queue import Queue
import socketio
sio = socketio.Client()
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
gi.require_version('GstWebRTC', '1.0')
from gi.repository import GstWebRTC
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp
import json
# Runtime platform detection: 'raspberry' when the RPi.GPIO package can be
# imported, 'generic' otherwise.  RuntimeError is caught too because RPi.GPIO
# raises it when imported on non-Pi hardware.
# NOTE: this module-level name shadows the stdlib ``platform`` module.
platform = None
try:
    import RPi.GPIO as gpio
except (ImportError, RuntimeError):
    platform = 'generic'
else:
    platform = 'raspberry'
#PIPELINE_DESC = '''
#webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
# videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay !
# queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
# audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc !rtpopuspay !
# queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
#'''
#PIPELINE_DESC = '''
#webrtcbin name=webrtcbin stun-server=stun://stun://stun.l.google.com:19302
#rpicamsrc bitrate=600000 annotation-mode=12 preview=false ! video/x-h264,profile=constrained-baseline,width=640,height=360,level=3.0 ! queue max-size-time=100000000 ! h264parse !
#rtph264pay config-interval=-1 name=payloader !
#application/x-rtp,media=video,encoding-name=H264,payload=96 ! sendrecv.
#'''
#PIPELINE_DESC = '''
#v4l2src ! queue ! vp8enc ! rtpvp8pay !
# application/x-rtp,media=video,encoding-name=VP8,payload=96 !
# webrtcbin name=sendrecv
#'''
#PIPELINE_DESC = '''
#webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302 latency=40
# v4l2src device=/dev/video0 ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay !
# queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
# audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc !rtpopuspay !
# queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
#'''
# rpicamsrc name=src preview=0 fullscreen=0 ! h264parse ! omxh264dec ! glimagesink sync=0
#if platform == 'raspberry':
# PIPELINE_DESC = '''
# webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302 latency=40
# rpicamsrc bitrate=1000000 keyframe-interval=25 rotation=180 exposure-mode=sports annotation-mode=12 annotation-text-size=10 preview=false name=rpicamsrc ! video/x-h264,profile=constrained-baseline,width=1280,height=720,framerate=49/1,level=3.0 ! queue max-size-time=100000000 ! h264parse !
# rtph264pay config-interval=-1 name=payloader !
# application/x-rtp,media=video,encoding-name=H264,payload=96 ! sendrecv.
# audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
# queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
# '''
#PIPELINE_DESC = '''
# videotestsrc is-live=true ! vp8enc deadline=1 ! rtpvp8pay pt=96 ! tee name=video-tee !
# queue ! fakesink sync=true
# audiotestsrc wave=ticks is-live=true ! opusenc ! rtpopuspay pt=97 ! tee name=audio-tee !
# queue ! fakesink sync=true
# '''
#PEER_BIN_DESC = '''
# queue name=video-queue ! webrtcbin.
# queue name=audio-queue ! webrtcbin.
# webrtcbin name=webrtcbin
# '''
#############
# Default (generic PC) source pipeline: V4L2 webcam encoded to VP8 RTP, plus a
# red-noise test audio track encoded to Opus RTP.  Each encoded stream ends in
# a tee ('video-tee' / 'audio-tee') so per-client webrtcbin branches can be
# attached/detached at runtime; the fakesink branch keeps data flowing through
# the tee while no client is connected.
PIPELINE_DESC = '''
v4l2src device=/dev/video0 ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay ! queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! tee name=video-tee !
queue ! fakesink sync=true
audiotestsrc wave=red-noise is-live=true ! opusenc ! rtpopuspay ! queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! tee name=audio-tee !
queue ! fakesink sync=true
'''
# Raspberry Pi variant: hardware-encoded H.264 from rpicamsrc instead of VP8.
PIPELINE_DESC_RASPBERRY = '''
rpicamsrc bitrate=2000000 keyframe-interval=20 rotation=180 exposure-mode=sports annotation-mode=12 annotation-text-size=10 preview=false name=rpicamsrc ! video/x-h264,profile=constrained-baseline,width=1280,height=720,framerate=49/1,level=3.0 ! queue max-size-time=100000000 ! h264parse !
rtph264pay config-interval=-1 name=payloader !
application/x-rtp,media=video,encoding-name=H264,payload=96 ! tee name=video-tee !
queue ! fakesink sync=true
audiotestsrc wave=red-noise is-live=true ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=97 ! tee name=audio-tee !
queue ! fakesink sync=true
'''
# Per-client branch, parsed with Gst.parse_bin_from_description(): two queues
# feeding one webrtcbin.  The queue sink pads are exposed as ghost pads and
# linked to request pads of the global tees (see start_client_pipeline()).
PEER_BIN_DESC = '''
queue name=video-queue ! webrtcbin.
queue name=audio-queue ! webrtcbin.
webrtcbin name=webrtcbin bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302 latency=40
'''
class WebRTCCamera(Thread):
    """Background thread that publishes the camera's audio/video to browser
    peers over WebRTC.

    One global GStreamer pipeline produces encoded RTP streams that end in
    'video-tee' / 'audio-tee'.  For each signalling client a small peer bin
    (queues + webrtcbin, see PEER_BIN_DESC) is attached to those tees, and
    SDP/ICE negotiation is relayed over the socket.io '/webrtc' namespace.
    """

    def __init__(self):
        self.pipe = None         # global Gst pipeline, created on first client
        self.sid = None          # our own socket.io session id
        self._num_clients = 0    # number of currently attached peer bins
        self.connected = False   # True once the signalling socket is up
        self.server = 'ws://127.0.0.1:5000'  # signalling server URL
        self._queue = Queue(maxsize=10)      # connect/disconnect jobs for run()
        self._peers = {}         # client sid -> {'bin': ..., 'webrtcbin': ...}
        self.video_tee = None    # 'video-tee' element of the global pipeline
        self.audio_tee = None    # 'audio-tee' element of the global pipeline
        Thread.__init__(self)
        Gst.init(None)
        if not check_plugins():
            sys.exit(1)

    def run(self):
        # Worker loop: consume jobs queued by connect_client()/disconnect_client()
        # so all pipeline manipulation happens on this single thread.
        print("Starting WebRTCCamera background thread")
        self.connect()
        while True:
            try:
                item = self._queue.get()
                if item['sid'] == self.sid:
                    # Job refers to our own signalling connection - ignore.
                    # NOTE(review): this path skips task_done() for the item.
                    continue
                if item['job'] == "connect_client":
                    if not self.pipe:
                        self.start_global_pipeline()
                    self.start_client_pipeline(item['sid'])
                    self._num_clients += 1
                elif item['job'] == "disconnect_client":
                    self.close_client_pipeline(item['sid'])
                    self._num_clients -= 1
                    if self._num_clients == 0:
                        print("last client left. stopping global pipeline")
                        self.close_global_pipeline()
                    else:
                        print("%d clients left" % self._num_clients)
                else:
                    print("unknown job: %s" % item['job'])
                self._queue.task_done()
            except:
                # Broad catch keeps the worker thread alive after any error.
                e = sys.exc_info()[0]
                print("Exception in camera task: %s" % e )
                #print(sys.exc_info()[2])
                print(traceback.format_exc())
                self.disconnect()

    def connect(self):
        # Connect the shared socket.io client to the signalling server and
        # remember our session id so we can filter out our own messages.
        while not sio.sid:
            print("connecting camera websocket..")
            sio.connect(self.server, namespaces=['/webrtc'], transports=['websocket'])
            time.sleep(1)
        print('camera sid is', sio.sid)
        self.sid = sio.sid
        self.connected = True

    def disconnect(self):
        # Tear down the signalling connection.
        self.connected = False
        sio.disconnect()

    def connect_client(self, sid, room):
        # Entry point for the web app when a browser joins.  Starts the worker
        # thread on first use and queues the attach job ('room' is unused).
        if not self.is_alive():
            self.start()
        self._queue.put({'job':'connect_client', 'sid':sid})

    def disconnect_client(self, sid, room):
        # Entry point for the web app when a client leaves ('room' is unused).
        if sid == self.sid:
            print("CAMERA DISCONNECTED")
            return
        self._queue.put({'job':'disconnect_client', 'sid':sid})

    def start_global_pipeline(self):
        # Parse and start the shared source pipeline for this platform.
        print("STARTING PIPELINE")
        desc = PIPELINE_DESC
        if platform == 'raspberry':
            desc = PIPELINE_DESC_RASPBERRY
        self.pipe = Gst.parse_launch(desc)
        if platform == 'raspberry':
            rpicamsrc = self.pipe.get_by_name ("rpicamsrc")
            #rpicamsrc.set_property("annotation-mode", 1)
            #rpicamsrc.set_property("annotation-text", "Saturation %d" % (saturation))
        # Keep references to the tees; peer bins link to their request pads.
        self.video_tee = self.pipe.get_by_name('video-tee')
        self.audio_tee = self.pipe.get_by_name('audio-tee')
        #video_mixer = self.pipe.get_by_name('video-mixer')
        #audio_mixer = self.pipe.get_by_name('audio-mixer')
        #bus = self.pipe.get_bus()
        #bus.add_signal_watch()
        #bus.connect('message', self.bus_message)
        ##pollfd = self.bus.get_pollfd()
        ##asyncio.get_event_loop().add_reader(pollfd.fd, self.poll_cb)
        self.pipe.set_state(Gst.State.PLAYING)
        print("GLOBAL PIPELINE RUNNING")

    def close_global_pipeline(self):
        # Stop and drop the shared pipeline (called when no clients remain).
        print("CLOSING PIPELINE")
        #bus = self.pipe.get_bus()
        ##bus.disconnect('message')
        #bus.remove_signal_watch()
        self.pipe.set_state(Gst.State.NULL)
        self.pipe = None

    def start_client_pipeline(self, client_sid):
        # Attach a peer bin (queues + webrtcbin) for one client to the running
        # global pipeline and let webrtcbin start WebRTC negotiation.
        print("starting client pipeline for client [%s]" % client_sid)
        self._peers[client_sid] = {}
        peer_bin = Gst.parse_bin_from_description(PEER_BIN_DESC, False)
        webrtcbin = peer_bin.get_by_name("webrtcbin")
        # webrtcbin.set_property("stun-server", "stun://stun.l.google.com:19302")
        # webrtcbin.set_property("bundle-policy", "max-bundle")
        ##webrtcbin.set_property("latency", 40)
        #rtpbin = webrtcbin.get_by_name('rtpbin')
        #rtpbin.set_property("latency", 40)
        # Expose the bin's internal queue sinks as ghost pads so the global
        # tees can link to the bin from outside.
        audio_queue = peer_bin.get_by_name("audio-queue")
        audio_sink_pad = Gst.GhostPad.new("audio_sink", audio_queue.get_static_pad("sink"))
        peer_bin.add_pad(audio_sink_pad)
        video_queue = peer_bin.get_by_name("video-queue")
        video_sink_pad = Gst.GhostPad.new("video_sink", video_queue.get_static_pad("sink"))
        peer_bin.add_pad(video_sink_pad)
        self._peers[client_sid]['bin'] = peer_bin
        self._peers[client_sid]['webrtcbin'] = webrtcbin
        self.pipe.add(peer_bin)
        webrtcbin.connect('on-negotiation-needed', self.on_negotiation_needed, client_sid)
        webrtcbin.connect('on-ice-candidate', self.on_ice_candidate, client_sid)
        #webrtcbin.connect_pad_added "on_incoming_stream"
        # connect_pad_added for audio_src and video_src (app.audio_mixer.get_request_pad('sink_%u')
        # connect unlinked
        # Block dataflow on the fresh tee branches while they are linked into
        # the live pipeline, then release the probes once the bin is in sync.
        audio_src_pad = self.audio_tee.get_request_pad('src_%u')
        audio_block = audio_src_pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, self.probe_block)
        audio_src_pad.link(audio_sink_pad)
        video_src_pad = self.video_tee.get_request_pad('src_%u')
        video_block = video_src_pad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, self.probe_block)
        video_src_pad.link(video_sink_pad)
        peer_bin.sync_state_with_parent()
        #test
        #self.pipe.sync_children_states()
        #webrtc.sync_children_states()
        #debug
        #video_caps = video_src_pad.get_current_caps()
        #if not video_caps:
        #    video_caps = video_src_pad.query_caps()
        #print(video_caps)
        audio_src_pad.remove_probe(audio_block)
        video_src_pad.remove_probe(video_block)

    def probe_block(self, pad, info):
        # Pad-probe callback used while (un)linking tee branches; returning OK
        # leaves the blocking probe installed until remove_probe() is called.
        print("blocked")
        return Gst.PadProbeReturn.OK

    # def bus_message(self, bus, msg):
    #     t = msg.type
    #     print("BUS MESSGE")
    #     if t == gst.MessageType.ERROR:
    #         busError, detail = msg.parse_error()
    #         print("gstreamer bus message error: %s" % busError)

    def close_client_pipeline(self, client_sid):
        # Detach and destroy the peer bin of one client: block the tees,
        # unlink and release their request pads, then remove the bin.
        print("closing client pipeline")
        webrtcbin = self._peers[client_sid]['webrtcbin']
        peer_bin = self._peers[client_sid]['bin']
        audio_tee_sinkpad = self.audio_tee.get_static_pad("sink")
        audio_block = audio_tee_sinkpad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, self.probe_block)
        video_tee_sinkpad = self.video_tee.get_static_pad("sink")
        video_block = video_tee_sinkpad.add_probe(Gst.PadProbeType.BLOCK_DOWNSTREAM, self.probe_block)
        audio_sinkpad = peer_bin.get_static_pad("audio_sink")
        audio_tee_srcpad = audio_sinkpad.get_peer()
        audio_tee_srcpad.unlink(audio_sinkpad)
        self.audio_tee.release_request_pad(audio_tee_srcpad)
        audio_tee_sinkpad.remove_probe(audio_block)
        video_sinkpad = peer_bin.get_static_pad("video_sink")
        video_tee_srcpad = video_sinkpad.get_peer()
        video_tee_srcpad.unlink(video_sinkpad)
        self.video_tee.release_request_pad(video_tee_srcpad)
        video_tee_sinkpad.remove_probe(video_block)
        self.pipe.remove(peer_bin)
        peer_bin.set_state(Gst.State.NULL)
        self._peers[client_sid]['bin'] = None
        self._peers[client_sid]['webrtcbin'] = None
        del self._peers[client_sid]

    def handle_sdp_answer(self, sdp, client_sid):
        # Apply the browser's SDP answer to that client's webrtcbin.
        print("handle_sdp_answer")
        #print ('Received answer:\n%s' % sdp)
        res, sdpmsg = GstSdp.SDPMessage.new()
        GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
        answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
        promise = Gst.Promise.new()
        self._peers[client_sid]['webrtcbin'].emit('set-remote-description', answer, promise)
        promise.interrupt()

    def handle_sdp_offer(self, sdp, client_sid):
        # Browser-initiated offers are not supported; the camera always offers.
        pass

    def handle_ice(self, ice, client_sid):
        # Feed a remote ICE candidate into that client's webrtcbin.
        # 'ice' is a dict with 'candidate' and 'sdpMLineIndex' keys.
        print("handle_ice")
        candidate = ice['candidate']
        if not candidate or candidate == '':
            print("candidate string is empty")
            return
        sdpmlineindex = ice['sdpMLineIndex']
        self._peers[client_sid]['webrtcbin'].emit('add-ice-candidate', sdpmlineindex, candidate)

    def on_negotiation_needed(self, receiver_entry, client_sid):
        # webrtcbin 'on-negotiation-needed' signal: ask it to create an offer;
        # on_offer_created() runs when the promise is fulfilled.
        print("on_negotiation_needed [%s]" % client_sid)
        promise = Gst.Promise.new_with_change_func(self.on_offer_created, receiver_entry, client_sid, None)
        receiver_entry.emit('create-offer', None, promise)

    def on_ice_candidate(self, _, mlineindex, candidate, client_sid):
        # webrtcbin 'on-ice-candidate' signal: forward the local candidate to
        # the browser through the signalling server.
        print("on_ice_candidate [%s]" % client_sid)
        icemsg = json.dumps({'type': 'ice', 'data': {'candidate': candidate, 'sdpMLineIndex': mlineindex}, 'sid': client_sid})
        sio.emit('message', data=icemsg, namespace='/webrtc')

    # Offer created by our pipeline, to be sent to the peer
    def on_offer_created(self, promise, _, client_sid, __):
        print("on_offer_created [%s]" % client_sid)
        promise.wait()
        reply = promise.get_reply()
        offer = reply['offer']
        # Set the offer locally, then send its SDP text to the browser.
        promise = Gst.Promise.new()
        self._peers[client_sid]['webrtcbin'].emit('set-local-description', offer, promise)
        promise.interrupt()
        text = offer.sdp.as_text()
        print ('Sending offer:\n%s' % text)
        msg = json.dumps({'type': 'sdp', 'data': {'type': 'offer', 'sdp': text}, 'sid': client_sid})
        sio.emit('message', data=msg, namespace='/webrtc')
        print("test")  # leftover debug marker
@sio.on('message', namespace='/webrtc')
def webrtc_message(data):
    """Dispatch an incoming '/webrtc' signalling message to the camera.

    Expected shape: ``{'type': 'sdp'|'ice', 'data': {...}, 'from_sid': str}``.
    Malformed messages and messages originating from the camera's own
    session id are logged and dropped.
    """
    # Imported lazily to avoid a circular import at module load time.
    from roberto import webrtccamera
    # print('WebRTCCamera got Message: {}'.format(data))
    if not data:
        print("data malformed")
        return
    if 'type' not in data:
        print("Received message without type field")
        return
    if 'data' not in data:
        print("Received message without data field")
        return
    if 'from_sid' not in data:
        print("Received message without from_sid field")
        return
    if data['from_sid'] == webrtccamera.sid:
        # The server echoes our own messages back; ignore them.
        print("Received message from self")
        return
    client_sid = data['from_sid']
    if data['type'] == 'sdp':
        if 'type' not in data['data']:
            print("Received SDP message without type field")
            return
        if 'sdp' not in data['data']:
            print("Received SDP message without SDP string")
            return
        if data['data']['type'] == 'answer':
            print("Received SDP answer:")
            print(data['data']['sdp'])
            webrtccamera.handle_sdp_answer(data['data']['sdp'], client_sid)
        elif data['data']['type'] == 'offer':
            print("Received SDP offer:")
            print(data['data']['sdp'])
            webrtccamera.handle_sdp_offer(data['data']['sdp'], client_sid)
        else:
            print("Expected SDP message type \"answer\", got \"%s\"" % data['data']['type'])
    elif data['type'] == 'ice':
        if 'sdpMLineIndex' not in data['data']:
            print("Received ICE message without mline index")
            return
        if 'candidate' in data['data']:
            print("Received ICE candidate with mline index %u" % data['data']['sdpMLineIndex'])
            print(data['data']['candidate'])
            webrtccamera.handle_ice(data['data'], client_sid)
        else:
            print("Received ICE message without ICE candidate string")
            return
    else:
        print("Unknown message \"%s\", ignoring" % data['data'])
@sio.event
def connected():
    # NOTE(review): python-socketio dispatches @sio.event handlers by function
    # name, and the standard connection event is 'connect', not 'connected' —
    # this handler most likely never fires.  Confirm and rename if so.
    print("I'm connected!")
@sio.event
def connect_error():
    # Fired when the socket.io connection attempt fails.
    # NOTE(review): recent python-socketio versions pass an error payload to
    # 'connect_error' handlers; a zero-argument handler may fail on dispatch —
    # confirm against the installed python-socketio version.
    print("The connection failed!")
@sio.event
def disconnected():
    # NOTE(review): the standard socket.io event name is 'disconnect', not
    # 'disconnected' — this handler most likely never fires.  Confirm/rename.
    print("I'm disconnected!")
def check_plugins():
    """Return True if all GStreamer plugins needed for WebRTC streaming are
    available in the registry; otherwise print the missing ones and return
    False."""
    needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
              "rtpmanager", "videotestsrc", "audiotestsrc"]
    # Hoist the registry lookup out of the loop and use a comprehension
    # instead of filter(lambda ...).
    registry = Gst.Registry.get()
    missing = [plugin for plugin in needed if registry.find_plugin(plugin) is None]
    if missing:
        print('Missing gstreamer plugins:', missing)
        return False
    return True