diff --git a/net/webrtc/examples/webrtcsink-custom-signaller.py b/net/webrtc/examples/webrtcsink-custom-signaller.py new file mode 100644 index 00000000..0dc58f76 --- /dev/null +++ b/net/webrtc/examples/webrtcsink-custom-signaller.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2018 Matthew Waters +# 2022 Nirbheek Chauhan +# 2024 Sebastian Dröge +# +# Demo application that shows how to implement a custom signaller around +# webrtcsink from Python + +from websockets.version import version as wsv +import random +import ssl +import websockets +import asyncio +import os +import sys +import json +import argparse + +import gi +gi.require_version('Gst', '1.0') +from gi.repository import Gst # NOQA +gi.require_version('GstSdp', '1.0') +from gi.repository import GstSdp # NOQA +gi.require_version('GstWebRTC', '1.0') +from gi.repository import GstWebRTC # NOQA + +# Ensure that gst-python is installed +try: + from gi.overrides import Gst as _ +except ImportError: + print('gstreamer-python binding overrides aren\'t available, please install them') + raise + +class WebRTCClient: + def __init__(self, loop, server): + self.conn = None + self.pipe = None + self.webrtc = None + self.event_loop = loop + self.server = server + + self.pipe = Gst.parse_launch('webrtcsink name=sink do-fec=false videotestsrc is-live=true ! video/x-raw,width=800,height=600 ! sink. audiotestsrc is-live=true ! sink.') + bus = self.pipe.get_bus() + self.event_loop.add_reader(bus.get_pollfd().fd, self.on_bus_poll_cb, bus) + self.webrtcsink = self.pipe.get_by_name('sink') + self.signaller = self.webrtcsink.get_property('signaller') + self.signaller.connect('start', self.signaller_on_start) + self.signaller.connect('stop', self.signaller_on_stop) + self.signaller.connect('send-session-description', self.signaller_on_send_session_description) + self.signaller.connect('send-ice', self.signaller_on_send_ice) + self.signaller.connect('end-session', self.signaller_on_end_session) + self.signaller.connect('consumer-added', self.signaller_on_consumer_added) + self.signaller.connect('consumer-removed', self.signaller_on_consumer_removed) + self.signaller.connect('webrtcbin-ready', self.signaller_on_webrtcbin_ready) + self.pipe.set_state(Gst.State.PLAYING) + + async def send(self, msg): + assert self.conn + print(f'>>> {msg}') + await self.conn.send(json.dumps(msg)) + + async def connect(self): + print(f'connecting to {self.server}') + self.conn = await websockets.connect(self.server) + assert self.conn + async for message in self.conn: + await self.handle_json(message) + self.close_pipeline() + + def send_soon(self, msg): + asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop) + + def on_bus_poll_cb(self, bus): + def remove_bus_poll(): + self.event_loop.remove_reader(bus.get_pollfd().fd) + self.event_loop.stop() + while bus.peek(): + msg = bus.pop() + if msg.type == Gst.MessageType.ERROR: + err = msg.parse_error() + print("ERROR:", err.gerror, err.debug) + remove_bus_poll() + break + elif msg.type == Gst.MessageType.EOS: + remove_bus_poll() + break + elif msg.type == Gst.MessageType.LATENCY: + self.pipe.recalculate_latency() + + async def handle_json(self, message): + try: + msg = json.loads(message) + except json.decoder.JSONDecoderError: + print('Failed to parse JSON message, this might be a bug') + raise + print(f'<<< {msg}') + + if msg['type'] == 'welcome': + self.peer_id = msg['peerId'] + print(f'Got peer ID {self.peer_id} assigned') + meta = self.signaller_emit_request_meta() + await self.send({'type': 'setPeerStatus', 'roles': ['producer'], 'meta': meta}) + elif msg['type'] == 'sessionStarted': + pass + elif msg['type'] == 'startSession': + self.signaller_emit_session_requested(msg['sessionId'], msg['peerId'], None) + elif msg['type'] == 'endSession': + self.signaller_emit_session_ended(msg['sessionId']) + elif msg['type'] == 'peer': + if 'sdp' in msg: + sdp = msg['sdp'] + assert sdp['type'] == 'answer' + self.signaller_emit_session_description(msg['sessionId'], sdp['sdp']) + elif 'ice' in msg: + ice = msg['ice'] + self.signaller_emit_handle_ice(msg['sessionId'], ice['sdpMLineIndex'], None, ice['candidate']) + else: + print('unknown peer message') + pass + elif msg['type'] == 'error': + self.signaller_emit_error(f'Error message from server {msg['details']}') + else: + print('unknown message type') + + def close_pipeline(self): + if self.pipe: + self.pipe.set_state(Gst.State.NULL) + self.pipe = None + self.webrtcsink = None + self.signaller = None + + async def stop(self): + if self.conn: + await self.conn.close() + self.conn = None + + # Signal handlers that are called from webrtcsink + def signaller_on_start(self, _): + print('starting') + asyncio.run_coroutine_threadsafe(self.connect(), self.event_loop) + return True + + def signaller_on_stop(self, _): + print('stopping') + self.event_loop.stop() + return True + + def signaller_on_send_session_description(self, _, session_id, offer): + sdp = offer.sdp.as_text() + assert offer.type == GstWebRTC.WebRTCSDPType.OFFER + self.send_soon({'type': 'peer', 'sessionId': session_id, 'sdp': { 'type': 'offer', 'sdp': sdp }}) + return True + + def signaller_on_send_ice(self, _, session_id, candidate, sdp_m_line_index, sdp_mid): + self.send_soon({'type': 'peer', 'sessionId': session_id, 'ice': {'candidate': candidate, 'sdpMLineIndex': sdp_m_line_index}}) + return True + + def signaller_on_end_session(self, _, session_id): + self.send_soon({'type': 'endSession', 'sessionId': session_id}) + return True + + def signaller_on_consumer_added(self, _, peer_id, webrtcbin): + pass + + def signaller_on_consumer_removed(self, _, peer_id, webrtcbin): + pass + + def signaller_on_webrtcbin_ready(self, _, peer_id, webrtcbin): + pass + + # Signals we have to emit to notify webrtcsink + def signaller_emit_error(self, error): + self.signaller.emit('error', error) + + def signaller_emit_request_meta(self): + meta = self.signaller.emit('request-meta') + return meta + + def signaller_emit_session_requested(self, session_id, peer_id, offer): + self.signaller.emit('session-requested', session_id, peer_id, offer) + + def signaller_emit_session_description(self, session_id, sdp): + res, sdp = GstSdp.SDPMessage.new_from_text(sdp) + answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdp) + self.signaller.emit('session-description', session_id, answer) + + def signaller_emit_handle_ice(self, session_id, sdp_m_line_index, sdp_mid, candidate): + self.signaller.emit('handle-ice', session_id, sdp_m_line_index, sdp_mid, candidate) + + def signaller_emit_session_ended(self, session_id): + return self.signaller.emit('session-ended', session_id) + + def signaller_emit_shutdown(self): + self.signaller.emit('shutdown') + +if __name__ == '__main__': + Gst.init(None) + parser = argparse.ArgumentParser() + parser.add_argument('--server', default='ws://127.0.0.1:8443', + help='Signalling server to connect to, eg "ws://127.0.0.1:8443"') + args = parser.parse_args() + loop = asyncio.new_event_loop() + c = WebRTCClient(loop, args.server) + res = loop.run_forever() + sys.exit(res)