#!/usr/bin/env python3
#
# Copyright (C) 2018 Matthew Waters
#               2022 Nirbheek Chauhan
#
# Demo gstreamer app for negotiating and streaming a sendrecv webrtc stream
# with a browser JS app, implemented in Python.

import random
import ssl
import websockets
import asyncio
import os
import sys
import json
import argparse

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
gi.require_version('GstWebRTC', '1.0')
from gi.repository import GstWebRTC
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp

# Ensure that gst-python is installed
try:
    from gi.overrides import Gst as _
except ImportError:
    print('gstreamer-python binding overrides aren\'t available, please install them')
    raise

# These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
PIPELINE_DESC = '''
webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
 videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
  vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay ! queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
 audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay ! queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
'''

from websockets.version import version as wsv


def print_status(msg):
    """Print an informational message to stdout, prefixed with '--- '."""
    print(f'--- {msg}')


def print_error(msg):
    """Print an error message to stderr, prefixed with '!!! '."""
    print(f'!!! {msg}', file=sys.stderr)


class WebRTCClient:
    """Signalling client that drives a webrtcbin pipeline over a websocket.

    The client registers with the signalling server (HELLO), optionally sets
    up a session with a peer (SESSION), and then exchanges SDP and ICE
    messages as JSON text over the same websocket connection.
    """

    def __init__(self, loop, our_id, peer_id, server, remote_is_offerer):
        """Store the configuration; no connection or pipeline is created here.

        Args:
            loop: asyncio event loop on which websocket sends are scheduled.
            our_id: optional ID to register with, or None to pick a random one.
            peer_id: optional ID of the peer to call, or None to wait for a call.
            server: URL of the signalling server.
            remote_is_offerer: if true, the remote peer is asked to create the
                offer instead of us.
        """
        self.conn = None
        self.pipe = None
        self.webrtc = None
        self.event_loop = loop
        self.server = server
        # An optional user-specified ID we can use to register
        self.our_id = our_id
        # The actual ID we used to register
        self.id_ = None
        # An optional peer ID we should connect to
        self.peer_id = peer_id
        # Whether we will send the offer or the remote peer will
        self.remote_is_offerer = remote_is_offerer

    async def send(self, msg):
        """Log and send a text message over the signalling connection."""
        assert self.conn
        print(f'>>> {msg}')
        await self.conn.send(msg)

    async def connect(self):
        """Open the websocket and register with the server via HELLO."""
        self.conn = await websockets.connect(self.server)
        if self.our_id is None:
            self.id_ = str(random.randrange(10, 10000))
        else:
            self.id_ = self.our_id
        await self.send(f'HELLO {self.id_}')

    async def setup_call(self):
        """Ask the server to set up a session with the configured peer."""
        assert self.peer_id
        await self.send(f'SESSION {self.peer_id}')

    def send_soon(self, msg):
        """Schedule send() on the event loop from a non-asyncio thread.

        Used by the signal handlers below, which do not run as coroutines.
        The returned future is not awaited, so the send is fire-and-forget.
        """
        asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)

    def send_sdp(self, offer):
        """Serialise a session description (offer or answer) and send it.

        Raises:
            AssertionError: if the description is neither an offer nor an answer.
        """
        text = offer.sdp.as_text()
        if offer.type == GstWebRTC.WebRTCSDPType.OFFER:
            print_status('Sending offer:\n%s' % text)
            msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
        elif offer.type == GstWebRTC.WebRTCSDPType.ANSWER:
            print_status('Sending answer:\n%s' % text)
            msg = json.dumps({'sdp': {'type': 'answer', 'sdp': text}})
        else:
            raise AssertionError(offer.type)
        self.send_soon(msg)

    def on_offer_created(self, promise, _, __):
        """Promise callback: set the created offer locally and send it."""
        # Keep the wait() call outside the assert so that it still runs when
        # assertions are stripped (python -O).
        result = promise.wait()
        assert result == Gst.PromiseResult.REPLIED
        reply = promise.get_reply()
        offer = reply['offer']
        promise = Gst.Promise.new()
        print_status('Offer created, setting local description')
        self.webrtc.emit('set-local-description', offer, promise)
        promise.interrupt()  # we don't care about the result, discard it
        self.send_sdp(offer)

    def on_negotiation_needed(self, _, create_offer):
        """Handle webrtcbin's on-negotiation-needed signal.

        Args:
            create_offer: user data bound at connect time; true when we are
                the side that creates the offer.
        """
        if create_offer:
            print_status('Call was connected: creating offer')
            promise = Gst.Promise.new_with_change_func(self.on_offer_created, None, None)
            self.webrtc.emit('create-offer', None, promise)
        elif self.remote_is_offerer:
            # We are initiating the call, but we want the remote peer to create the offer
            print_status('Call was connected: requesting remote peer for offer')
            self.send_soon('OFFER_REQUEST')

    def send_ice_candidate_message(self, _, mlineindex, candidate):
        """Forward a locally gathered ICE candidate to the peer as JSON."""
        icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
        self.send_soon(icemsg)

    def on_incoming_decodebin_stream(self, _, pad):
        """Attach a sink chain to a newly decoded audio or video pad.

        Pads without current caps, and pads whose caps name starts with
        neither 'video' nor 'audio', are left unlinked.
        """
        if not pad.has_current_caps():
            # print_error() takes a single message, so format it here.
            print_error(f'{pad} has no caps, ignoring')
            return

        caps = pad.get_current_caps()
        assert len(caps)
        s = caps[0]
        name = s.get_name()
        if name.startswith('video'):
            q = Gst.ElementFactory.make('queue')
            conv = Gst.ElementFactory.make('videoconvert')
            sink = Gst.ElementFactory.make('autovideosink')
            self.pipe.add(q, conv, sink)
            self.pipe.sync_children_states()
            pad.link(q.get_static_pad('sink'))
            q.link(conv)
            conv.link(sink)
        elif name.startswith('audio'):
            q = Gst.ElementFactory.make('queue')
            conv = Gst.ElementFactory.make('audioconvert')
            resample = Gst.ElementFactory.make('audioresample')
            sink = Gst.ElementFactory.make('autoaudiosink')
            self.pipe.add(q, conv, resample, sink)
            self.pipe.sync_children_states()
            pad.link(q.get_static_pad('sink'))
            q.link(conv)
            conv.link(resample)
            resample.link(sink)

    def on_ice_gathering_state_notify(self, pspec, _):
        """Log the new value of webrtcbin's ice-gathering-state property.

        Neither signal argument is used; the state is read from self.webrtc.
        """
        state = self.webrtc.get_property('ice-gathering-state')
        print_status(f'ICE gathering state changed to {state}')

    def on_incoming_stream(self, _, pad):
        """Plug a decodebin onto webrtcbin when it exposes a new source pad."""
        if pad.direction != Gst.PadDirection.SRC:
            return

        decodebin = Gst.ElementFactory.make('decodebin')
        decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
        self.pipe.add(decodebin)
        decodebin.sync_state_with_parent()
        self.webrtc.link(decodebin)

    def start_pipeline(self, create_offer=True):
        """Build the pipeline from PIPELINE_DESC, hook up signals and play.

        Args:
            create_offer: passed to on_negotiation_needed as user data; when
                true we create the SDP offer ourselves.
        """
        print_status(f'Creating pipeline, create_offer: {create_offer}')
        self.pipe = Gst.parse_launch(PIPELINE_DESC)
        self.webrtc = self.pipe.get_by_name('sendrecv')
        self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed, create_offer)
        self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
        self.webrtc.connect('notify::ice-gathering-state', self.on_ice_gathering_state_notify)
        self.webrtc.connect('pad-added', self.on_incoming_stream)
        self.pipe.set_state(Gst.State.PLAYING)

    def on_answer_created(self, promise, _, __):
        """Promise callback: set the created answer locally and send it."""
        # See on_offer_created() for why wait() is not inside the assert.
        result = promise.wait()
        assert result == Gst.PromiseResult.REPLIED
        reply = promise.get_reply()
        answer = reply['answer']
        promise = Gst.Promise.new()
        self.webrtc.emit('set-local-description', answer, promise)
        promise.interrupt()  # we don't care about the result, discard it
        self.send_sdp(answer)

    def on_offer_set(self, promise, _, __):
        """Promise callback: the remote offer is set, so create an answer."""
        result = promise.wait()
        assert result == Gst.PromiseResult.REPLIED
        promise = Gst.Promise.new_with_change_func(self.on_answer_created, None, None)
        self.webrtc.emit('create-answer', None, promise)

    @staticmethod
    def _parse_sdp(sdp, sdp_type):
        """Build a WebRTCSessionDescription of sdp_type from SDP text."""
        _, sdpmsg = GstSdp.SDPMessage.new()
        GstSdp.sdp_message_parse_buffer(sdp.encode(), sdpmsg)
        return GstWebRTC.WebRTCSessionDescription.new(sdp_type, sdpmsg)

    def handle_json(self, message):
        """Apply a JSON signalling message (SDP or ICE) to webrtcbin.

        An 'sdp' message whose type is 'answer' is set as the remote
        description; any other 'sdp' message is treated as an offer, which
        triggers answer creation. An 'ice' message adds a remote candidate.

        Raises:
            json.JSONDecodeError: if message is not valid JSON (after logging).
        """
        assert self.webrtc
        try:
            msg = json.loads(message)
        except json.JSONDecodeError:
            print_error('Failed to parse JSON message, this might be a bug')
            raise
        if 'sdp' in msg:
            sdp = msg['sdp']['sdp']
            if msg['sdp']['type'] == 'answer':
                print_status('Received answer:\n%s' % sdp)
                answer = self._parse_sdp(sdp, GstWebRTC.WebRTCSDPType.ANSWER)
                promise = Gst.Promise.new()
                self.webrtc.emit('set-remote-description', answer, promise)
                promise.interrupt()  # we don't care about the result, discard it
            else:
                print_status('Received offer:\n%s' % sdp)
                offer = self._parse_sdp(sdp, GstWebRTC.WebRTCSDPType.OFFER)
                promise = Gst.Promise.new_with_change_func(self.on_offer_set, None, None)
                self.webrtc.emit('set-remote-description', offer, promise)
        elif 'ice' in msg:
            ice = msg['ice']
            candidate = ice['candidate']
            sdpmlineindex = ice['sdpMLineIndex']
            self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)

    def close_pipeline(self):
        """Stop the pipeline, if any, and drop the references to it."""
        if self.pipe:
            self.pipe.set_state(Gst.State.NULL)
            self.pipe = None
        self.webrtc = None

    def is_incoming_offer(self, msg):
        """Return True when no pipeline exists yet.

        The message content is not inspected: loop() calls this only for
        messages that matched no control keyword, and such a message is
        treated as an incoming offer whenever webrtcbin has not been created.
        """
        return not self.webrtc

    async def loop(self):
        """Process signalling messages until the connection ends.

        Returns:
            1 if the server sent an ERROR message, 0 once the message
            stream is exhausted.
        """
        assert self.conn
        async for message in self.conn:
            print(f'<<< {message}')
            if message == 'HELLO':
                assert self.id_
                # If a peer ID is specified, we want to connect to it. If not,
                # we wait for an incoming call.
                if not self.peer_id:
                    print_status(f'Waiting for incoming call: ID is {self.id_}')
                else:
                    if self.remote_is_offerer:
                        print_status('Have peer ID: initiating call (will request remote peer to create offer)')
                    else:
                        print_status('Have peer ID: initiating call (will create offer)')
                    await self.setup_call()
            elif message == 'SESSION_OK':
                if self.remote_is_offerer:
                    self.start_pipeline(create_offer=False)
                else:
                    self.start_pipeline()
            elif message == 'OFFER_REQUEST':
                print_status('Incoming call: we have been asked to create the offer')
                self.start_pipeline()
            elif message.startswith('ERROR'):
                print_error(message)
                self.close_pipeline()
                return 1
            else:
                if self.is_incoming_offer(message):
                    print_status('Incoming call: received an offer, creating pipeline')
                    self.start_pipeline(create_offer=False)
                self.handle_json(message)
        self.close_pipeline()
        return 0

    async def stop(self):
        """Close the signalling connection if one is open."""
        if self.conn:
            await self.conn.close()
        self.conn = None


def check_plugins():
    """Return True if every GStreamer plugin the demo needs is installed.

    The names of any missing plugins are reported on stderr.
    """
    needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
              "rtpmanager", "videotestsrc", "audiotestsrc"]
    registry = Gst.Registry.get()
    missing = [p for p in needed if registry.find_plugin(p) is None]
    if missing:
        # print_error() takes a single message, so format it here.
        print_error(f'Missing gstreamer plugins: {missing}')
        return False
    return True


if __name__ == '__main__':
    Gst.init(None)
    if not check_plugins():
        sys.exit(1)
    parser = argparse.ArgumentParser()
    parser.add_argument('--peer-id', help='String ID of the peer to connect to')
    parser.add_argument('--our-id', help='String ID that the peer can use to connect to us')
    parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
                        help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
    parser.add_argument('--remote-offerer', default=False, action='store_true',
                        dest='remote_is_offerer',
                        help='Request that the peer generate the offer and we\'ll answer')
    args = parser.parse_args()
    if not args.peer_id and not args.our_id:
        print('You must pass either --peer-id or --our-id')
        sys.exit(1)
    loop = asyncio.new_event_loop()
    c = WebRTCClient(loop, args.our_id, args.peer_id, args.server, args.remote_is_offerer)
    try:
        loop.run_until_complete(c.connect())
        res = loop.run_until_complete(c.loop())
    finally:
        # Stop the pipeline before the loop goes away so that no signal
        # handler schedules a send on a closed loop, then close the websocket.
        c.close_pipeline()
        loop.run_until_complete(c.stop())
        loop.close()
    sys.exit(res)