webrtcsink: Add a custom signaller example in Python

This re-implements the default webrtcsink/src signalling protocol in
Python for demonstration purposes.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1569>
This commit is contained in:
Sebastian Dröge 2024-05-10 13:58:50 +03:00 committed by GStreamer Marge Bot
parent a719cbfcc6
commit 613ed56675

View file

@ -0,0 +1,206 @@
#!/usr/bin/env python3
#
# Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
# 2022 Nirbheek Chauhan <nirbheek@centricular.com>
# 2024 Sebastian Dröge <sebastian@centricular.com>
#
# Demo application that shows how to implement a custom signaller around
# webrtcsink from Python
from websockets.version import version as wsv
import random
import ssl
import websockets
import asyncio
import os
import sys
import json
import argparse
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst # NOQA
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp # NOQA
gi.require_version('GstWebRTC', '1.0')
from gi.repository import GstWebRTC # NOQA
# Ensure that gst-python is installed
try:
from gi.overrides import Gst as _
except ImportError:
print('gstreamer-python binding overrides aren\'t available, please install them')
raise
class WebRTCClient:
def __init__(self, loop, server):
self.conn = None
self.pipe = None
self.webrtc = None
self.event_loop = loop
self.server = server
self.pipe = Gst.parse_launch('webrtcsink name=sink do-fec=false videotestsrc is-live=true ! video/x-raw,width=800,height=600 ! sink. audiotestsrc is-live=true ! sink.')
bus = self.pipe.get_bus()
self.event_loop.add_reader(bus.get_pollfd().fd, self.on_bus_poll_cb, bus)
self.webrtcsink = self.pipe.get_by_name('sink')
self.signaller = self.webrtcsink.get_property('signaller')
self.signaller.connect('start', self.signaller_on_start)
self.signaller.connect('stop', self.signaller_on_stop)
self.signaller.connect('send-session-description', self.signaller_on_send_session_description)
self.signaller.connect('send-ice', self.signaller_on_send_ice)
self.signaller.connect('end-session', self.signaller_on_end_session)
self.signaller.connect('consumer-added', self.signaller_on_consumer_added)
self.signaller.connect('consumer-removed', self.signaller_on_consumer_removed)
self.signaller.connect('webrtcbin-ready', self.signaller_on_webrtcbin_ready)
self.pipe.set_state(Gst.State.PLAYING)
async def send(self, msg):
assert self.conn
print(f'>>> {msg}')
await self.conn.send(json.dumps(msg))
async def connect(self):
print(f'connecting to {self.server}')
self.conn = await websockets.connect(self.server)
assert self.conn
async for message in self.conn:
await self.handle_json(message)
self.close_pipeline()
def send_soon(self, msg):
asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
def on_bus_poll_cb(self, bus):
def remove_bus_poll():
self.event_loop.remove_reader(bus.get_pollfd().fd)
self.event_loop.stop()
while bus.peek():
msg = bus.pop()
if msg.type == Gst.MessageType.ERROR:
err = msg.parse_error()
print("ERROR:", err.gerror, err.debug)
remove_bus_poll()
break
elif msg.type == Gst.MessageType.EOS:
remove_bus_poll()
break
elif msg.type == Gst.MessageType.LATENCY:
self.pipe.recalculate_latency()
async def handle_json(self, message):
try:
msg = json.loads(message)
except json.decoder.JSONDecoderError:
print('Failed to parse JSON message, this might be a bug')
raise
print(f'<<< {msg}')
if msg['type'] == 'welcome':
self.peer_id = msg['peerId']
print(f'Got peer ID {self.peer_id} assigned')
meta = self.signaller_emit_request_meta()
await self.send({'type': 'setPeerStatus', 'roles': ['producer'], 'meta': meta})
elif msg['type'] == 'sessionStarted':
pass
elif msg['type'] == 'startSession':
self.signaller_emit_session_requested(msg['sessionId'], msg['peerId'], None)
elif msg['type'] == 'endSession':
self.signaller_emit_session_ended(msg['sessionId'])
elif msg['type'] == 'peer':
if 'sdp' in msg:
sdp = msg['sdp']
assert sdp['type'] == 'answer'
self.signaller_emit_session_description(msg['sessionId'], sdp['sdp'])
elif 'ice' in msg:
ice = msg['ice']
self.signaller_emit_handle_ice(msg['sessionId'], ice['sdpMLineIndex'], None, ice['candidate'])
else:
print('unknown peer message')
pass
elif msg['type'] == 'error':
self.signaller_emit_error(f'Error message from server {msg['details']}')
else:
print('unknown message type')
def close_pipeline(self):
if self.pipe:
self.pipe.set_state(Gst.State.NULL)
self.pipe = None
self.webrtcsink = None
self.signaller = None
async def stop(self):
if self.conn:
await self.conn.close()
self.conn = None
# Signal handlers that are called from webrtcsink
def signaller_on_start(self, _):
print('starting')
asyncio.run_coroutine_threadsafe(self.connect(), self.event_loop)
return True
def signaller_on_stop(self, _):
print('stopping')
self.event_loop.stop()
return True
def signaller_on_send_session_description(self, _, session_id, offer):
sdp = offer.sdp.as_text()
assert offer.type == GstWebRTC.WebRTCSDPType.OFFER
self.send_soon({'type': 'peer', 'sessionId': session_id, 'sdp': { 'type': 'offer', 'sdp': sdp }})
return True
def signaller_on_send_ice(self, _, session_id, candidate, sdp_m_line_index, sdp_mid):
self.send_soon({'type': 'peer', 'sessionId': session_id, 'ice': {'candidate': candidate, 'sdpMLineIndex': sdp_m_line_index}})
return True
def signaller_on_end_session(self, _, session_id):
self.send_soon({'type': 'endSession', 'sessionId': session_id})
return True
def signaller_on_consumer_added(self, _, peer_id, webrtcbin):
pass
def signaller_on_consumer_removed(self, _, peer_id, webrtcbin):
pass
def signaller_on_webrtcbin_ready(self, _, peer_id, webrtcbin):
pass
# Signals we have to emit to notify webrtcsink
def signaller_emit_error(self, error):
self.signaller.emit('error', error)
def signaller_emit_request_meta(self):
meta = self.signaller.emit('request-meta')
return meta
def signaller_emit_session_requested(self, session_id, peer_id, offer):
self.signaller.emit('session-requested', session_id, peer_id, offer)
def signaller_emit_session_description(self, session_id, sdp):
res, sdp = GstSdp.SDPMessage.new_from_text(sdp)
answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdp)
self.signaller.emit('session-description', session_id, answer)
def signaller_emit_handle_ice(self, session_id, sdp_m_line_index, sdp_mid, candidate):
self.signaller.emit('handle-ice', session_id, sdp_m_line_index, sdp_mid, candidate)
def signaller_emit_session_ended(self, session_id):
return self.signaller.emit('session-ended', session_id)
def signaller_emit_shutdown(self):
self.signaller.emit('shutdown')
if __name__ == '__main__':
Gst.init(None)
parser = argparse.ArgumentParser()
parser.add_argument('--server', default='ws://127.0.0.1:8443',
help='Signalling server to connect to, eg "ws://127.0.0.1:8443"')
args = parser.parse_args()
loop = asyncio.new_event_loop()
c = WebRTCClient(loop, args.server)
res = loop.run_forever()
sys.exit(res)