import random import ssl import websockets import asyncio import os import sys import json import argparse import gi gi.require_version('Gst', '1.0') from gi.repository import Gst gi.require_version('GstWebRTC', '1.0') from gi.repository import GstWebRTC gi.require_version('GstSdp', '1.0') from gi.repository import GstSdp PIPELINE_DESC = ''' webrtcbin name=sendrecv bundle-policy=max-bundle videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay ! queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv. audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay ! queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv. ''' class WebRTCClient: def __init__(self, id_, peer_id, server): self.id_ = id_ self.conn = None self.pipe = None self.webrtc = None self.peer_id = peer_id self.server = server or 'wss://webrtc.nirbheek.in:8443' async def connect(self): sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) self.conn = await websockets.connect(self.server, ssl=sslctx) await self.conn.send('HELLO %d' % our_id) async def setup_call(self): await self.conn.send('SESSION {}'.format(self.peer_id)) def send_sdp_offer(self, offer): text = offer.sdp.as_text() print ('Sending offer:\n%s' % text) msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}}) loop = asyncio.new_event_loop() loop.run_until_complete(self.conn.send(msg)) def on_offer_created(self, promise, _, __): promise.wait() reply = promise.get_reply() offer = reply['offer'] promise = Gst.Promise.new() self.webrtc.emit('set-local-description', offer, promise) promise.interrupt() self.send_sdp_offer(offer) def on_negotiation_needed(self, element): promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None) element.emit('create-offer', None, promise) def send_ice_candidate_message(self, _, mlineindex, candidate): icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}}) loop = asyncio.new_event_loop() loop.run_until_complete(self.conn.send(icemsg)) def on_incoming_decodebin_stream(self, _, pad): if not pad.has_current_caps(): print (pad, 'has no caps, ignoring') return caps = pad.get_current_caps() assert (len(caps)) s = caps[0] name = s.get_name() if name.startswith('video'): q = Gst.ElementFactory.make('queue') conv = Gst.ElementFactory.make('videoconvert') sink = Gst.ElementFactory.make('autovideosink') self.pipe.add(q, conv, sink) self.pipe.sync_children_states() pad.link(q.get_static_pad('sink')) q.link(conv) conv.link(sink) elif name.startswith('audio'): q = Gst.ElementFactory.make('queue') conv = Gst.ElementFactory.make('audioconvert') resample = Gst.ElementFactory.make('audioresample') sink = Gst.ElementFactory.make('autoaudiosink') self.pipe.add(q, conv, resample, sink) self.pipe.sync_children_states() pad.link(q.get_static_pad('sink')) q.link(conv) conv.link(resample) resample.link(sink) def on_incoming_stream(self, _, pad): if pad.direction != Gst.PadDirection.SRC: return decodebin = Gst.ElementFactory.make('decodebin') decodebin.connect('pad-added', self.on_incoming_decodebin_stream) self.pipe.add(decodebin) decodebin.sync_state_with_parent() self.webrtc.link(decodebin) def start_pipeline(self): self.pipe = Gst.parse_launch(PIPELINE_DESC) self.webrtc = self.pipe.get_by_name('sendrecv') self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed) self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message) self.webrtc.connect('pad-added', self.on_incoming_stream) self.pipe.set_state(Gst.State.PLAYING) async def handle_sdp(self, message): assert (self.webrtc) msg = json.loads(message) if 'sdp' in msg: sdp = msg['sdp'] assert(sdp['type'] == 'answer') sdp = sdp['sdp'] print ('Received answer:\n%s' % sdp) res, sdpmsg = GstSdp.SDPMessage.new() GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg) answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg) promise = Gst.Promise.new() self.webrtc.emit('set-remote-description', answer, promise) promise.interrupt() elif 'ice' in msg: ice = msg['ice'] candidate = ice['candidate'] sdpmlineindex = ice['sdpMLineIndex'] self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate) async def loop(self): assert self.conn async for message in self.conn: if message == 'HELLO': await self.setup_call() elif message == 'SESSION_OK': self.start_pipeline() elif message.startswith('ERROR'): print (message) return 1 else: await self.handle_sdp(message) return 0 def check_plugins(): needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp", "rtpmanager", "videotestsrc", "audiotestsrc"] missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed)) if len(missing): print('Missing gstreamer plugins:', missing) return False return True if __name__=='__main__': Gst.init(None) if not check_plugins(): sys.exit(1) parser = argparse.ArgumentParser() parser.add_argument('peerid', help='String ID of the peer to connect to') parser.add_argument('--server', help='Signalling server to connect to, eg "wss://127.0.0.1:8443"') args = parser.parse_args() our_id = random.randrange(10, 10000) c = WebRTCClient(our_id, args.peerid, args.server) asyncio.get_event_loop().run_until_complete(c.connect()) res = asyncio.get_event_loop().run_until_complete(c.loop()) sys.exit(res)