Browse Source

rework everything so the camera is a websocket client (working)

split-pipe
Hendrik Langer 5 years ago
parent
commit
b8be306c31
  1. 1
      raspberry/requirements.py
  2. 2
      raspberry/roberto/__init__.py
  3. 163
      raspberry/roberto/camera/camera_gstreamer_webrtc.py
  4. 30
      raspberry/roberto/views/websocket/routes.py
  5. 211
      raspberry/roberto/views/websocket/templates/camera.html

1
raspberry/requirements.py

@ -2,6 +2,7 @@ python3-flask
python3-flask-sqlalchemy
python3-flask-login
python3-flask-socketio
python3-socketio-client
python3-serial
python3-opencv
python3-picamera

2
raspberry/roberto/__init__.py

@ -19,7 +19,7 @@ socketio = SocketIO()
from roberto.camera.camera_opencv import Camera
camera = Camera()
from roberto.camera.camera_gstreamer_webrtc import WebRTCCamera
webrtccamera = WebRTCCamera(1000, 1001)
webrtccamera = WebRTCCamera()
from roberto.Serial import Serial
serial = Serial()

163
raspberry/roberto/camera/camera_gstreamer_webrtc.py

@ -1,6 +1,12 @@
import os
import sys
from threading import Thread
import socketio
sio = socketio.Client()
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
@ -19,41 +25,42 @@ webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.googl
queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
'''
class WebRTCCamera:
def __init__(self, id_, peer_id, server=None):
self.id_ = id_
self.conn = None
class WebRTCCamera(Thread):
def __init__(self):
self.pipe = None
self.webrtc = None
self.socketio = None
self.peer_id = peer_id
self.room = 'default'
self.sid = 'gstwebrtc1000'
self.server = server or 'wss://localhost:5000/webrtc'
self.server = 'ws://localhost:5000'
self.connected = False
self.exitFlag = False
Thread.__init__(self)
Gst.init(None)
if not check_plugins():
sys.exit(1)
def connect(self, socketio, room, sid):
self.socketio = socketio
self.room = room
self.sid = sid
def run(self):
self.connect()
while not self.exitFlag:
sio.sleep(1)
def connect(self):
sio.connect(self.server, namespaces=['/webrtc'], transports=['websocket'])
print('my sid is', sio.sid)
sio.emit('message', 'starting camera', namespace='/webrtc')
self.start_pipeline()
self.connected = True
def disconnect(self):
self.connected = False
self.close_pipeline()
self.socketio = None
sio.disconnect()
def start_pipeline(self):
def start_pipeline(self): # ok
self.pipe = Gst.parse_launch(PIPELINE_DESC)
self.webrtc = self.pipe.get_by_name('sendrecv')
self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
self.webrtc.connect('pad-added', self.on_incoming_stream)
self.webrtc.connect('on-ice-candidate', self.on_ice_candidate)
self.pipe.set_state(Gst.State.PLAYING)
def close_pipeline(self):
@ -61,9 +68,9 @@ class WebRTCCamera:
self.pipe = None
self.webrtc = None
def handle_sdp_answer(self, sdp):
def handle_sdp_answer(self, sdp): # ok
print("handle_sdp_answer")
print ('Received answer:\n%s' % sdp)
#print ('Received answer:\n%s' % sdp)
res, sdpmsg = GstSdp.SDPMessage.new()
GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
@ -71,65 +78,23 @@ class WebRTCCamera:
self.webrtc.emit('set-remote-description', answer, promise)
promise.interrupt()
def handle_ice(self, ice):
def handle_ice(self, ice): # ok
print("handle_ice")
candidate = ice['candidate']
sdpmlineindex = ice['sdpMLineIndex']
self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
def on_negotiation_needed(self, element):
def on_negotiation_needed(self, receiver_entry): # ok
print("on_negotiation_needed")
promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
element.emit('create-offer', None, promise)
def send_ice_candidate_message(self, _, mlineindex, candidate):
print("send_ice_candidate_message")
icemsg = json.dumps({'type': 'candidate', 'candidate': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
self.socketio.emit('data', type='candidate', data=icemsg, room=self.room, namespace='/webrtc', skip_sid=self.sid)
promise = Gst.Promise.new_with_change_func(self.on_offer_created, receiver_entry, None)
receiver_entry.emit('create-offer', None, promise)
def on_incoming_stream(self, _, pad):
print("on_incoming_stream")
if pad.direction != Gst.PadDirection.SRC:
return
def on_ice_candidate(self, _, mlineindex, candidate): # ok
print("on_ice_candidate")
icemsg = json.dumps({'type': 'ice', 'data': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
sio.emit('message', data=icemsg, namespace='/webrtc')
decodebin = Gst.ElementFactory.make('decodebin')
decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
self.pipe.add(decodebin)
decodebin.sync_state_with_parent()
self.webrtc.link(decodebin)
def on_incoming_decodebin_stream(self, _, pad):
print("on_incoming_decodebin_stream")
if not pad.has_current_caps():
print (pad, 'has no caps, ignoring')
return
caps = pad.get_current_caps()
assert (len(caps))
s = caps[0]
name = s.get_name()
if name.startswith('video'):
q = Gst.ElementFactory.make('queue')
conv = Gst.ElementFactory.make('videoconvert')
sink = Gst.ElementFactory.make('autovideosink')
self.pipe.add(q, conv, sink)
self.pipe.sync_children_states()
pad.link(q.get_static_pad('sink'))
q.link(conv)
conv.link(sink)
elif name.startswith('audio'):
q = Gst.ElementFactory.make('queue')
conv = Gst.ElementFactory.make('audioconvert')
resample = Gst.ElementFactory.make('audioresample')
sink = Gst.ElementFactory.make('autoaudiosink')
self.pipe.add(q, conv, resample, sink)
self.pipe.sync_children_states()
pad.link(q.get_static_pad('sink'))
q.link(conv)
conv.link(resample)
resample.link(sink)
def on_offer_created(self, promise, _, __):
def on_offer_created(self, promise, _, __): # ok
print("on_offer_created")
promise.wait()
reply = promise.get_reply()
@ -139,9 +104,63 @@ class WebRTCCamera:
promise.interrupt()
text = offer.sdp.as_text()
print ('Sending offer:\n%s' % text)
msg = json.dumps({'type': 'offer', 'sdp': text})
self.socketio.emit('data', type='offer', data=msg, room=self.room, namespace='/webrtc', skip_sid=self.sid)
msg = json.dumps({'type': 'sdp', 'data': {'type': 'offer', 'sdp': text}})
sio.emit('message', data=msg, namespace='/webrtc')
print("test")
@sio.on('message', namespace='/webrtc') # ok
def webrtc_message(data):
from roberto import webrtccamera
# print('Message from {}: {}'.format(sid, data))
if not data:
print("data malformed")
return
if not 'type' in data:
print("Received message without type field")
return
if not 'data' in data:
print("Received message without data field")
return
if data['type'] == 'sdp':
if not 'type' in data['data']:
print("Received SDP message without type field")
return
if data['data']['type'] == 'answer':
if 'sdp' in data['data']:
print("Received SDP:")
print(data['data']['sdp'])
webrtccamera.handle_sdp_answer(data['data']['sdp'])
else:
print("Received SDP message without SDP string")
else:
print("Expected SDP message type \"answer\", got \"%s\"" % data['data']['type'])
elif data['type'] == 'ice':
if not 'sdpMLineIndex' in data['data']:
print("Received ICE message without mline index")
return
if 'candidate' in data['data']:
print("Received ICE candidate with mline index %u" % data['data']['sdpMLineIndex'])
print(data['data']['candidate'])
webrtccamera.handle_ice(data['data'])
else:
print("Received ICE message without ICE candidate string")
return
else:
print("Unknown message \"%s\", ignoring" % data['data'])
@sio.event
def connected():
print("I'm connected!")
@sio.event
def connect_error():
print("The connection failed!")
@sio.event
def disconnected():
print("I'm disconnected!")
def check_plugins():
needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",

30
raspberry/roberto/views/websocket/routes.py

@ -63,43 +63,27 @@ ROOM = 'default'
def camera():
return render_template('camera.html', room='default')
@socketio.on('data', namespace='/webrtc')
@socketio.on('message', namespace='/webrtc') # ok
def webrtc_message(data):
sid = request.sid
print('Message from {}: {}'.format(sid, data))
socketio.emit('data', data=data, room=ROOM, namespace='/webrtc', skip_sid=sid)
if 'type' in data:
if data['type'] == 'offer':
print("OFFER")
elif data['type'] == 'answer':
print("ANSWER")
if 'sdp' in data:
webrtccamera.handle_sdp_answer(data['sdp'])
elif data['type'] == 'candidate':
print("CANDIDATE")
if 'candidiate' in data:
webrtccamera.handle_ice(data)
else:
print("got unknown data message!")
socketio.emit('message', data=data, namespace='/webrtc', skip_sid=sid)
@socketio.on('disconnect', namespace='/webrtc')
def disconnect():
sid = request.sid
print("Received Disconnect message from %s" % sid)
leave_room(ROOM)
# if webrtccamera.connected:
# webrtccamera.disconnect()
if webrtccamera.connected:
webrtccamera.disconnect()
@socketio.on('connect', namespace='/webrtc')
def connect():
sid = request.sid
print("Received Connect message from %s" % sid)
join_room(ROOM)
if not webrtccamera.connected:
webrtccamera.connect(socketio, ROOM, 'gstwebrtc1000')
#socketio.emit('ready', room=ROOM, namespace='/webrtc', skip_sid=sid)
socketio.emit('ready', room=ROOM, namespace='/webrtc')
webrtccamera.start()
socketio.emit('message', 'test2', namespace='/webrtc')
@socketio.on_error_default
def default_error_handler(e):

211
raspberry/roberto/views/websocket/templates/camera.html

@ -14,144 +14,109 @@
</head>
<body>
<div id="videos">
<video id="stream" autoplay playsinline>Your browser does not support video</video>
<div id="video">
<video id="stream" autoplay playsinline muted>Your browser does not support video</video>
</div>
<div id="chatlog"></div>
<textarea id="text" name="text"></textarea>
<button id="sendText" onclick="sendData()">Send</button>
<script type="text/javascript">
'use strict';
const SIGNALING_SERVER_URL = 'wss://' + document.domain + ':' + location.port + '/webrtc';
const PC_CONFIG = {};
var room = '{{room}}';
var html5VideoElement;
var wsUrl = "wss://" + window.location.hostname + ":" + window.location.port + "/webrtc";
var socket = io.connect(wsUrl, { autoConnect: false, transports: [ 'websocket' ] });
var webrtcPeerConnection;
var webrtcConfiguration;
var reportError;
var socket = io.connect(SIGNALING_SERVER_URL, { autoConnect: false, transports: [ 'websocket' ] });
function onLocalDescription(desc) {
console.log("Local description: " + JSON.stringify(desc));
webrtcPeerConnection.setLocalDescription(desc).then(function() {
socket.emit('message', { type: "sdp", "data": webrtcPeerConnection.localDescription });
}).catch(reportError);
}
function onIncomingSDP(sdp) {
console.log("Incoming SDP: " + JSON.stringify(sdp));
webrtcPeerConnection.setRemoteDescription(sdp).catch(reportError);
webrtcPeerConnection.createAnswer().then(onLocalDescription).catch(reportError);
}
function onIncomingICE(ice) {
var candidate = new RTCIceCandidate(ice);
console.log("Incoming ICE: " + JSON.stringify(ice));
webrtcPeerConnection.addIceCandidate(candidate).catch(reportError);
}
function onAddRemoteStream(event) {
html5VideoElement.srcObject = event.streams[0];
}
function onIceCandidate(event) {
if (event.candidate == null)
return;
console.log("Sending ICE candidate out: " + JSON.stringify(event.candidate));
socket.emit('message', { "type": "ice", "data": event.candidate });
}
socket.on('connect', function(){
console.log("Connected...!", socket.connected)
});
const video = document.querySelector("#stream");
socket.on('message', (data) => {
console.log("got message: ",data);
var msg;
try {
msg = JSON.parse(data);
} catch (e) {
console.log("ERROR parsing message");
return;
}
if (!webrtcPeerConnection) {
webrtcPeerConnection = new RTCPeerConnection(webrtcConfiguration);
webrtcPeerConnection.ontrack = onAddRemoteStream;
webrtcPeerConnection.onicecandidate = onIceCandidate;
}
switch (msg.type) {
case "sdp": onIncomingSDP(msg.data); break;
case "ice": onIncomingICE(msg.data); break;
default: break;
}
});
socket.on('data', (data) => {
console.log('Data received: ',data);
var json_data;
if (typeof data === 'string') {
json_data = JSON.parse(data);
} else {
json_data = data;
}
handleSignalingData(json_data);
});
socket.on('ready', () => {
console.log('Ready');
createPeerConnection();
sendOffer();
});
function playStream(videoElement, hostname, port, path, configuration, reportErrorCB) {
var l = window.location;
var wsHost = (hostname != undefined) ? hostname : l.hostname;
var wsPort = (port != undefined) ? port : l.port;
var wsPath = (path != undefined) ? path : "webrtc";
if (wsPort)
wsPort = ":" + wsPort;
var wsUrl = "wss://" + wsHost + wsPort + "/" + wsPath;
html5VideoElement = videoElement;
webrtcConfiguration = configuration;
reportError = (reportErrorCB != undefined) ? reportErrorCB : function(text) {};
socket.connect();
}
window.onload = function() {
var vidstream = document.getElementById("stream");
var config = { 'iceServers': [{ 'urls': 'stun:stun.l.google.com:19302' }] };
playStream(vidstream, null, null, null, config, function (errmsg) { console.error(errmsg); });
};
let sendData = (data) => {
socket.emit('data', data);
};
// WebRTC methods
let pc;
let localStream;
let remoteStreamElement = document.querySelector('#stream');
let getLocalStream = () => {
navigator.mediaDevices.getUserMedia({ audio: true, video: true })
.then((stream) => {
console.log('Stream found');
localStream = stream;
// Connect after making sure that local stream is availble
socket.connect();
})
.catch(error => {
console.error('Stream not found: ', error);
});
}
let createPeerConnection = () => {
try {
pc = new RTCPeerConnection(PC_CONFIG);
pc.onicecandidate = onIceCandidate;
pc.onaddstream = onAddStream;
pc.addStream(localStream);
console.log('PeerConnection created');
} catch (error) {
console.error('PeerConnection failed: ', error);
}
};
let sendOffer = () => {
console.log('Send offer');
pc.createOffer().then(
setAndSendLocalDescription,
(error) => { console.error('Send offer failed: ', error); }
);
};
let sendAnswer = () => {
console.log('Send answer');
pc.createAnswer().then(
setAndSendLocalDescription,
(error) => { console.error('Send answer failed: ', error); }
);
};
let setAndSendLocalDescription = (sessionDescription) => {
pc.setLocalDescription(sessionDescription);
console.log('Local description set');
sendData(sessionDescription);
};
let onIceCandidate = (event) => {
if (event.candidate) {
console.log('ICE candidate');
sendData({
type: 'candidate',
candidate: event.candidate
});
}
};
let onAddStream = (event) => {
console.log('Add stream');
remoteStreamElement.srcObject = event.stream;
};
let handleSignalingData = (data) => {
switch (data.type) {
case 'offer':
createPeerConnection();
pc.setRemoteDescription(new RTCSessionDescription(data));
sendAnswer();
break;
case 'answer':
pc.setRemoteDescription(new RTCSessionDescription(data));
break;
case 'candidate':
pc.addIceCandidate(new RTCIceCandidate(data.candidate));
break;
default:
console.log("got unknown message of type: ", data.type);
break;
}
};
function foo() {
if (pc) {
var sig_state = pc.signalingState;
console.log("signaling state: ", sig_state);
}
}
// Start connection
getLocalStream();
setInterval(foo, 5000);
</script>
</body>

Loading…
Cancel
Save