diff --git a/webrtc/janus/janusvideoroom.py b/webrtc/janus/janusvideoroom.py new file mode 100644 index 0000000000..80ee47fef5 --- /dev/null +++ b/webrtc/janus/janusvideoroom.py @@ -0,0 +1,443 @@ +# Janus Videoroom example +# Copyright @tobiasfriden and @saket424 on github +# See https://github.com/centricular/gstwebrtc-demos/issues/66 +# Copyright Jan Schmidt 2020 +import random +import ssl +import websockets +import asyncio +import os +import sys +import json +import argparse +import string +from websockets.exceptions import ConnectionClosed + +import attr + +@attr.s +class JanusEvent: + sender = attr.ib(validator=attr.validators.instance_of(int)) + +@attr.s +class PluginData(JanusEvent): + plugin = attr.ib(validator=attr.validators.instance_of(str)) + data = attr.ib() + jsep = attr.ib() + +@attr.s +class WebrtcUp(JanusEvent): + pass + +@attr.s +class Media(JanusEvent): + receiving = attr.ib(validator=attr.validators.instance_of(bool)) + kind = attr.ib(validator=attr.validators.in_(["audio", "video"])) + + @kind.validator + def validate_kind(self, attribute, kind): + if kind not in ["video", "audio"]: + raise ValueError("kind must equal video or audio") + +@attr.s +class SlowLink(JanusEvent): + uplink = attr.ib(validator=attr.validators.instance_of(bool)) + lost = attr.ib(validator=attr.validators.instance_of(int)) + +@attr.s +class HangUp(JanusEvent): + reason = attr.ib(validator=attr.validators.instance_of(str)) + +@attr.s(cmp=False) +class Ack: + transaction = attr.ib(validator=attr.validators.instance_of(str)) + +@attr.s +class Jsep: + sdp = attr.ib() + type = attr.ib(validator=attr.validators.in_(["offer", "pranswer", "answer", "rollback"])) + + +import gi +gi.require_version('Gst', '1.0') +from gi.repository import Gst +gi.require_version('GstWebRTC', '1.0') +from gi.repository import GstWebRTC +gi.require_version('GstSdp', '1.0') +from gi.repository import GstSdp + +DO_VP8 = True + +if DO_VP8: + ( encoder, payloader, rtp_encoding) = ( "vp8enc target-bitrate=500000", "rtpvp8pay", "VP8" ) +else: + ( encoder, payloader, rtp_encoding) = ( "x264enc", "rtph264pay", "H264" ) + +PIPELINE_DESC = ''' + webrtcbin name=sendrecv stun-server=stun://stun.l.google.com:19302 + videotestsrc pattern=ball ! video/x-raw,width=320,height=240 ! videoconvert ! queue ! + {} ! {} ! queue ! application/x-rtp,media=video,encoding-name={},payload=96 ! sendrecv. +'''.format(encoder, payloader, rtp_encoding) + +def transaction_id(): + return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(8)) + +@attr.s +class JanusGateway: + server = attr.ib(validator=attr.validators.instance_of(str)) + #secure = attr.ib(default=True) + _messages = attr.ib(factory=set) + conn = None + + async def connect(self): + sslCon=None + if self.server.startswith("wss"): + sslCon=ssl.SSLContext() + self.conn = await websockets.connect(self.server, subprotocols=['janus-protocol'], ssl=sslCon) + transaction = transaction_id() + await self.conn.send(json.dumps({ + "janus": "create", + "transaction": transaction + })) + resp = await self.conn.recv() + print (resp) + parsed = json.loads(resp) + assert parsed["janus"] == "success", "Failed creating session" + assert parsed["transaction"] == transaction, "Incorrect transaction" + self.session = parsed["data"]["id"] + + async def close(self): + if self.conn: + await self.conn.close() + + async def attach(self, plugin): + assert hasattr(self, "session"), "Must connect before attaching to plugin" + transaction = transaction_id() + await self.conn.send(json.dumps({ + "janus": "attach", + "session_id": self.session, + "plugin": plugin, + "transaction": transaction + })) + resp = await self.conn.recv() + parsed = json.loads(resp) + assert parsed["janus"] == "success", "Failed attaching to {}".format(plugin) + assert parsed["transaction"] == transaction, "Incorrect transaction" + self.handle = parsed["data"]["id"] + + async def sendtrickle(self, candidate): + assert hasattr(self, "session"), "Must connect before sending messages" + assert hasattr(self, "handle"), "Must attach before sending messages" + + transaction = transaction_id() + janus_message = { + "janus": "trickle", + "session_id": self.session, + "handle_id": self.handle, + "transaction": transaction, + "candidate": candidate + } + + await self.conn.send(json.dumps(janus_message)) + + #while True: + # resp = await self._recv_and_parse() + # if isinstance(resp, PluginData): + # return resp + # else: + # self._messages.add(resp) +# + async def sendmessage(self, body, jsep=None): + assert hasattr(self, "session"), "Must connect before sending messages" + assert hasattr(self, "handle"), "Must attach before sending messages" + + transaction = transaction_id() + janus_message = { + "janus": "message", + "session_id": self.session, + "handle_id": self.handle, + "transaction": transaction, + "body": body + } + if jsep is not None: + janus_message["jsep"] = jsep + + await self.conn.send(json.dumps(janus_message)) + + #while True: + # resp = await self._recv_and_parse() + # if isinstance(resp, PluginData): + # if jsep is not None: + # await client.handle_sdp(resp.jsep) + # return resp + # else: + # self._messages.add(resp) + + async def keepalive(self): + assert hasattr(self, "session"), "Must connect before sending messages" + assert hasattr(self, "handle"), "Must attach before sending messages" + + while True: + try: + await asyncio.sleep(10) + transaction = transaction_id() + await self.conn.send(json.dumps({ + "janus": "keepalive", + "session_id": self.session, + "handle_id": self.handle, + "transaction": transaction + })) + except KeyboardInterrupt: + return + + async def recv(self): + if len(self._messages) > 0: + return self._messages.pop() + else: + return await self._recv_and_parse() + + async def _recv_and_parse(self): + raw = json.loads(await self.conn.recv()) + janus = raw["janus"] + + if janus == "event": + return PluginData( + sender=raw["sender"], + plugin=raw["plugindata"]["plugin"], + data=raw["plugindata"]["data"], + jsep=raw["jsep"] if "jsep" in raw else None + ) + elif janus == "webrtcup": + return WebrtcUp( + sender=raw["sender"] + ) + elif janus == "media": + return Media( + sender=raw["sender"], + receiving=raw["receiving"], + kind=raw["type"] + ) + elif janus == "slowlink": + return SlowLink( + sender=raw["sender"], + uplink=raw["uplink"], + lost=raw["lost"] + ) + elif janus == "hangup": + return HangUp( + sender=raw["sender"], + reason=raw["reason"] + ) + elif janus == "ack": + return Ack( + transaction=raw["transaction"] + ) + else: + return raw + +class WebRTCClient: + def __init__(self, id_, peer_id, server, signaling): + self.id_ = id_ + self.conn = None + self.pipe = None + self.webrtc = None + self.peer_id = peer_id + self.server = server or 'wss://127.0.0.1:8989' + self.signaling = signaling + self.request = None + self.offermsg = None + + def send_sdp_offer(self, offer): + text = offer.sdp.as_text() + print ('Sending offer:\n%s' % text) + # configure media + media = {'audio': True, 'video': True} + request = {'request': 'publish'} + request.update(media) + self.request = request + self.offermsg = { 'sdp': text, 'trickle': True, 'type': 'offer' } + print (self.offermsg) + loop = asyncio.new_event_loop() + loop.run_until_complete(self.signaling.sendmessage(self.request, self.offermsg)) + + def on_offer_created(self, promise, _, __): + promise.wait() + reply = promise.get_reply() + offer = reply.get_value('offer') + promise = Gst.Promise.new() + self.webrtc.emit('set-local-description', offer, promise) + promise.interrupt() + self.send_sdp_offer(offer) + + def on_negotiation_needed(self, element): + promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None) + element.emit('create-offer', None, promise) + + def send_ice_candidate_message(self, _, mlineindex, candidate): + icemsg = {'candidate': candidate, 'sdpMLineIndex': mlineindex} + print ("Sending ICE", icemsg) + loop = asyncio.new_event_loop() + loop.run_until_complete(self.signaling.sendtrickle(icemsg)) + + def on_incoming_decodebin_stream(self, _, pad): + if not pad.has_current_caps(): + print (pad, 'has no caps, ignoring') + return + + caps = pad.get_current_caps() + name = caps.to_string() + if name.startswith('video'): + q = Gst.ElementFactory.make('queue') + conv = Gst.ElementFactory.make('videoconvert') + sink = Gst.ElementFactory.make('autovideosink') + self.pipe.add(q) + self.pipe.add(conv) + self.pipe.add(sink) + self.pipe.sync_children_states() + pad.link(q.get_static_pad('sink')) + q.link(conv) + conv.link(sink) + elif name.startswith('audio'): + q = Gst.ElementFactory.make('queue') + conv = Gst.ElementFactory.make('audioconvert') + resample = Gst.ElementFactory.make('audioresample') + sink = Gst.ElementFactory.make('autoaudiosink') + self.pipe.add(q) + self.pipe.add(conv) + self.pipe.add(resample) + self.pipe.add(sink) + self.pipe.sync_children_states() + pad.link(q.get_static_pad('sink')) + q.link(conv) + conv.link(resample) + resample.link(sink) + + def on_incoming_stream(self, _, pad): + if pad.direction != Gst.PadDirection.SRC: + return + + decodebin = Gst.ElementFactory.make('decodebin') + decodebin.connect('pad-added', self.on_incoming_decodebin_stream) + self.pipe.add(decodebin) + decodebin.sync_state_with_parent() + self.webrtc.link(decodebin) + + def start_pipeline(self): + self.pipe = Gst.parse_launch(PIPELINE_DESC) + self.webrtc = self.pipe.get_by_name('sendrecv') + self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed) + self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message) + self.webrtc.connect('pad-added', self.on_incoming_stream) + self.pipe.set_state(Gst.State.PLAYING) + + def extract_ice_from_sdp(self, sdp): + mlineindex = -1 + for line in sdp.splitlines(): + if line.startswith("a=candidate"): + candidate = line[2:] + if mlineindex < 0: + print("Received ice candidate in SDP before any m= line") + continue + print ('Received remote ice-candidate mlineindex {}: {}'.format(mlineindex, candidate)) + self.webrtc.emit('add-ice-candidate', mlineindex, candidate) + elif line.startswith("m="): + mlineindex += 1 + + async def handle_sdp(self, msg): + print (msg) + if 'sdp' in msg: + sdp = msg['sdp'] + assert(msg['type'] == 'answer') + print ('Received answer:\n%s' % sdp) + res, sdpmsg = GstSdp.SDPMessage.new() + GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg) + + answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg) + promise = Gst.Promise.new() + self.webrtc.emit('set-remote-description', answer, promise) + promise.interrupt() + + # Extract ICE candidates from the SDP to work around a GStreamer + # limitation in (at least) 1.16.2 and below + self.extract_ice_from_sdp (sdp) + + elif 'ice' in msg: + ice = msg['ice'] + candidate = ice['candidate'] + sdpmlineindex = ice['sdpMLineIndex'] + self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate) + + async def loop(self, signaling): + await signaling.connect() + await signaling.attach("janus.plugin.videoroom") + + loop = asyncio.get_event_loop() + loop.create_task(signaling.keepalive()) + #asyncio.create_task(self.keepalive()) + + joinmessage = { "request": "join", "ptype": "publisher", "room": 1234, "display": self.peer_id } + await signaling.sendmessage(joinmessage) + + assert signaling.conn + self.start_pipeline() + + while True: + try: + msg = await signaling.recv() + if isinstance(msg, PluginData): + if msg.jsep is not None: + await self.handle_sdp(msg.jsep) + elif isinstance(msg, Media): + print (msg) + elif isinstance(msg, WebrtcUp): + print (msg) + elif isinstance(msg, SlowLink): + print (msg) + elif isinstance(msg, HangUp): + print (msg) + elif not isinstance(msg, Ack): + if 'candidate' in msg: + ice = msg['candidate'] + print (ice) + if 'candidate' in ice: + candidate = ice['candidate'] + sdpmlineindex = ice['sdpMLineIndex'] + self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate) + print(msg) + except (KeyboardInterrupt, ConnectionClosed): + return + + return 0 + + +def check_plugins(): + needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp", + "rtpmanager", "videotestsrc", "audiotestsrc"] + missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed)) + if len(missing): + print('Missing gstreamer plugins:', missing) + return False + return True + + +if __name__=='__main__': + Gst.init(None) + if not check_plugins(): + sys.exit(1) + parser = argparse.ArgumentParser() + parser.add_argument('label', help='videoroom label') + parser.add_argument('--server', help='Signalling server to connect to, eg "wss://127.0.0.1:8989"') + args = parser.parse_args() + our_id = random.randrange(10, 10000) + signaling = JanusGateway(args.server) + c = WebRTCClient(our_id, args.label, args.server, signaling) + loop = asyncio.get_event_loop() + try: + loop.run_until_complete( + c.loop(signaling) + ) + except KeyboardInterrupt: + pass + finally: + print("Interrupted, cleaning up") + loop.run_until_complete(signaling.close())