mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-28 22:41:02 +00:00
5010ee872d
The GstWebRTC API web interface defaults to receiving an SDP offer and generating an answer, but this can be overridden by entering "offer options" before clicking to open the remote stream. The Python webrtcsink-custom-signaller.py example failed in this mode as it was coded to only generate an offer and receive an answer. Fix this by implementing support for receiving an offer and sending an answer. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1883>
213 lines
8 KiB
Python
213 lines
8 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
# Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
|
|
# 2022 Nirbheek Chauhan <nirbheek@centricular.com>
|
|
# 2024 Sebastian Dröge <sebastian@centricular.com>
|
|
#
|
|
# Demo application that shows how to implement a custom signaller around
|
|
# webrtcsink from Python
|
|
|
|
from websockets.version import version as wsv
|
|
import random
|
|
import ssl
|
|
import websockets
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
|
|
import gi
|
|
gi.require_version('Gst', '1.0')
|
|
from gi.repository import Gst # NOQA
|
|
gi.require_version('GstSdp', '1.0')
|
|
from gi.repository import GstSdp # NOQA
|
|
gi.require_version('GstWebRTC', '1.0')
|
|
from gi.repository import GstWebRTC # NOQA
|
|
|
|
# Ensure that gst-python is installed
|
|
try:
|
|
from gi.overrides import Gst as _
|
|
except ImportError:
|
|
print('gstreamer-python binding overrides aren\'t available, please install them')
|
|
raise
|
|
|
|
class WebRTCClient:
|
|
def __init__(self, loop, server):
|
|
self.conn = None
|
|
self.pipe = None
|
|
self.webrtc = None
|
|
self.event_loop = loop
|
|
self.server = server
|
|
|
|
self.pipe = Gst.parse_launch('webrtcsink name=sink do-fec=false videotestsrc is-live=true ! video/x-raw,width=800,height=600 ! sink. audiotestsrc is-live=true ! sink.')
|
|
bus = self.pipe.get_bus()
|
|
self.event_loop.add_reader(bus.get_pollfd().fd, self.on_bus_poll_cb, bus)
|
|
self.webrtcsink = self.pipe.get_by_name('sink')
|
|
self.signaller = self.webrtcsink.get_property('signaller')
|
|
self.signaller.connect('start', self.signaller_on_start)
|
|
self.signaller.connect('stop', self.signaller_on_stop)
|
|
self.signaller.connect('send-session-description', self.signaller_on_send_session_description)
|
|
self.signaller.connect('send-ice', self.signaller_on_send_ice)
|
|
self.signaller.connect('end-session', self.signaller_on_end_session)
|
|
self.signaller.connect('consumer-added', self.signaller_on_consumer_added)
|
|
self.signaller.connect('consumer-removed', self.signaller_on_consumer_removed)
|
|
self.signaller.connect('webrtcbin-ready', self.signaller_on_webrtcbin_ready)
|
|
self.pipe.set_state(Gst.State.PLAYING)
|
|
|
|
async def send(self, msg):
|
|
assert self.conn
|
|
print(f'>>> {msg}')
|
|
await self.conn.send(json.dumps(msg))
|
|
|
|
async def connect(self):
|
|
print(f'connecting to {self.server}')
|
|
self.conn = await websockets.connect(self.server)
|
|
assert self.conn
|
|
async for message in self.conn:
|
|
await self.handle_json(message)
|
|
self.close_pipeline()
|
|
|
|
def send_soon(self, msg):
|
|
asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
|
|
|
|
def on_bus_poll_cb(self, bus):
|
|
def remove_bus_poll():
|
|
self.event_loop.remove_reader(bus.get_pollfd().fd)
|
|
self.event_loop.stop()
|
|
while bus.peek():
|
|
msg = bus.pop()
|
|
if msg.type == Gst.MessageType.ERROR:
|
|
err = msg.parse_error()
|
|
print("ERROR:", err.gerror, err.debug)
|
|
remove_bus_poll()
|
|
break
|
|
elif msg.type == Gst.MessageType.EOS:
|
|
remove_bus_poll()
|
|
break
|
|
elif msg.type == Gst.MessageType.LATENCY:
|
|
self.pipe.recalculate_latency()
|
|
|
|
async def handle_json(self, message):
|
|
try:
|
|
msg = json.loads(message)
|
|
except json.decoder.JSONDecoderError:
|
|
print('Failed to parse JSON message, this might be a bug')
|
|
raise
|
|
print(f'<<< {msg}')
|
|
|
|
if msg['type'] == 'welcome':
|
|
self.peer_id = msg['peerId']
|
|
print(f'Got peer ID {self.peer_id} assigned')
|
|
meta = self.signaller_emit_request_meta()
|
|
await self.send({'type': 'setPeerStatus', 'roles': ['producer'], 'meta': meta})
|
|
elif msg['type'] == 'sessionStarted':
|
|
pass
|
|
elif msg['type'] == 'startSession':
|
|
offer = None
|
|
if msg['offer']:
|
|
_, sdpmsg = GstSdp.SDPMessage.new()
|
|
GstSdp.sdp_message_parse_buffer(bytes(msg['offer'].encode()), sdpmsg)
|
|
offer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.OFFER, sdpmsg)
|
|
self.signaller_emit_session_requested(msg['sessionId'], msg['peerId'], offer)
|
|
elif msg['type'] == 'endSession':
|
|
self.signaller_emit_session_ended(msg['sessionId'])
|
|
elif msg['type'] == 'peer':
|
|
if 'sdp' in msg:
|
|
sdp = msg['sdp']
|
|
assert sdp['type'] == 'answer'
|
|
self.signaller_emit_session_description(msg['sessionId'], sdp['sdp'])
|
|
elif 'ice' in msg:
|
|
ice = msg['ice']
|
|
self.signaller_emit_handle_ice(msg['sessionId'], ice['sdpMLineIndex'], None, ice['candidate'])
|
|
else:
|
|
print('unknown peer message')
|
|
pass
|
|
elif msg['type'] == 'error':
|
|
self.signaller_emit_error(f'Error message from server {msg['details']}')
|
|
else:
|
|
print('unknown message type')
|
|
|
|
def close_pipeline(self):
|
|
if self.pipe:
|
|
self.pipe.set_state(Gst.State.NULL)
|
|
self.pipe = None
|
|
self.webrtcsink = None
|
|
self.signaller = None
|
|
|
|
async def stop(self):
|
|
if self.conn:
|
|
await self.conn.close()
|
|
self.conn = None
|
|
|
|
# Signal handlers that are called from webrtcsink
|
|
def signaller_on_start(self, _):
|
|
print('starting')
|
|
asyncio.run_coroutine_threadsafe(self.connect(), self.event_loop)
|
|
return True
|
|
|
|
def signaller_on_stop(self, _):
|
|
print('stopping')
|
|
self.event_loop.stop()
|
|
return True
|
|
|
|
def signaller_on_send_session_description(self, _, session_id, offer):
|
|
typ = 'offer'
|
|
if offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
|
|
typ = 'answer'
|
|
sdp = offer.sdp.as_text()
|
|
self.send_soon({'type': 'peer', 'sessionId': session_id, 'sdp': { 'type': typ, 'sdp': sdp }})
|
|
return True
|
|
|
|
def signaller_on_send_ice(self, _, session_id, candidate, sdp_m_line_index, sdp_mid):
|
|
self.send_soon({'type': 'peer', 'sessionId': session_id, 'ice': {'candidate': candidate, 'sdpMLineIndex': sdp_m_line_index}})
|
|
return True
|
|
|
|
def signaller_on_end_session(self, _, session_id):
|
|
self.send_soon({'type': 'endSession', 'sessionId': session_id})
|
|
return True
|
|
|
|
def signaller_on_consumer_added(self, _, peer_id, webrtcbin):
|
|
pass
|
|
|
|
def signaller_on_consumer_removed(self, _, peer_id, webrtcbin):
|
|
pass
|
|
|
|
def signaller_on_webrtcbin_ready(self, _, peer_id, webrtcbin):
|
|
pass
|
|
|
|
# Signals we have to emit to notify webrtcsink
|
|
def signaller_emit_error(self, error):
|
|
self.signaller.emit('error', error)
|
|
|
|
def signaller_emit_request_meta(self):
|
|
meta = self.signaller.emit('request-meta')
|
|
return meta
|
|
|
|
def signaller_emit_session_requested(self, session_id, peer_id, offer):
|
|
self.signaller.emit('session-requested', session_id, peer_id, offer)
|
|
|
|
def signaller_emit_session_description(self, session_id, sdp):
|
|
res, sdp = GstSdp.SDPMessage.new_from_text(sdp)
|
|
answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdp)
|
|
self.signaller.emit('session-description', session_id, answer)
|
|
|
|
def signaller_emit_handle_ice(self, session_id, sdp_m_line_index, sdp_mid, candidate):
|
|
self.signaller.emit('handle-ice', session_id, sdp_m_line_index, sdp_mid, candidate)
|
|
|
|
def signaller_emit_session_ended(self, session_id):
|
|
return self.signaller.emit('session-ended', session_id)
|
|
|
|
def signaller_emit_shutdown(self):
|
|
self.signaller.emit('shutdown')
|
|
|
|
if __name__ == '__main__':
|
|
Gst.init(None)
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('--server', default='ws://127.0.0.1:8443',
|
|
help='Signalling server to connect to, eg "ws://127.0.0.1:8443"')
|
|
args = parser.parse_args()
|
|
loop = asyncio.new_event_loop()
|
|
c = WebRTCClient(loop, args.server)
|
|
res = loop.run_forever()
|
|
sys.exit(res)
|