mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-11 01:45:33 +00:00
Add python Janus videoroom streaming example.
Added with permission and copyright @tobiasfriden and @saket424 on github. See https://github.com/centricular/gstwebrtc-demos/issues/66
This commit is contained in:
parent
666f982882
commit
3cabee61c7
1 changed files with 443 additions and 0 deletions
443
webrtc/janus/janusvideoroom.py
Normal file
443
webrtc/janus/janusvideoroom.py
Normal file
|
@ -0,0 +1,443 @@
|
|||
# Janus Videoroom example
|
||||
# Copyright @tobiasfriden and @saket424 on github
|
||||
# See https://github.com/centricular/gstwebrtc-demos/issues/66
|
||||
# Copyright Jan Schmidt <jan@centricular.com> 2020
|
||||
import random
|
||||
import ssl
|
||||
import websockets
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import argparse
|
||||
import string
|
||||
from websockets.exceptions import ConnectionClosed
|
||||
|
||||
import attr
|
||||
|
||||
@attr.s
|
||||
class JanusEvent:
|
||||
sender = attr.ib(validator=attr.validators.instance_of(int))
|
||||
|
||||
@attr.s
|
||||
class PluginData(JanusEvent):
|
||||
plugin = attr.ib(validator=attr.validators.instance_of(str))
|
||||
data = attr.ib()
|
||||
jsep = attr.ib()
|
||||
|
||||
@attr.s
|
||||
class WebrtcUp(JanusEvent):
|
||||
pass
|
||||
|
||||
@attr.s
|
||||
class Media(JanusEvent):
|
||||
receiving = attr.ib(validator=attr.validators.instance_of(bool))
|
||||
kind = attr.ib(validator=attr.validators.in_(["audio", "video"]))
|
||||
|
||||
@kind.validator
|
||||
def validate_kind(self, attribute, kind):
|
||||
if kind not in ["video", "audio"]:
|
||||
raise ValueError("kind must equal video or audio")
|
||||
|
||||
@attr.s
|
||||
class SlowLink(JanusEvent):
|
||||
uplink = attr.ib(validator=attr.validators.instance_of(bool))
|
||||
lost = attr.ib(validator=attr.validators.instance_of(int))
|
||||
|
||||
@attr.s
|
||||
class HangUp(JanusEvent):
|
||||
reason = attr.ib(validator=attr.validators.instance_of(str))
|
||||
|
||||
@attr.s(cmp=False)
|
||||
class Ack:
|
||||
transaction = attr.ib(validator=attr.validators.instance_of(str))
|
||||
|
||||
@attr.s
|
||||
class Jsep:
|
||||
sdp = attr.ib()
|
||||
type = attr.ib(validator=attr.validators.in_(["offer", "pranswer", "answer", "rollback"]))
|
||||
|
||||
|
||||
import gi
|
||||
gi.require_version('Gst', '1.0')
|
||||
from gi.repository import Gst
|
||||
gi.require_version('GstWebRTC', '1.0')
|
||||
from gi.repository import GstWebRTC
|
||||
gi.require_version('GstSdp', '1.0')
|
||||
from gi.repository import GstSdp
|
||||
|
||||
DO_VP8 = True
|
||||
|
||||
if DO_VP8:
|
||||
( encoder, payloader, rtp_encoding) = ( "vp8enc target-bitrate=500000", "rtpvp8pay", "VP8" )
|
||||
else:
|
||||
( encoder, payloader, rtp_encoding) = ( "x264enc", "rtph264pay", "H264" )
|
||||
|
||||
PIPELINE_DESC = '''
|
||||
webrtcbin name=sendrecv stun-server=stun://stun.l.google.com:19302
|
||||
videotestsrc pattern=ball ! video/x-raw,width=320,height=240 ! videoconvert ! queue !
|
||||
{} ! {} ! queue ! application/x-rtp,media=video,encoding-name={},payload=96 ! sendrecv.
|
||||
'''.format(encoder, payloader, rtp_encoding)
|
||||
|
||||
def transaction_id():
|
||||
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(8))
|
||||
|
||||
@attr.s
|
||||
class JanusGateway:
|
||||
server = attr.ib(validator=attr.validators.instance_of(str))
|
||||
#secure = attr.ib(default=True)
|
||||
_messages = attr.ib(factory=set)
|
||||
conn = None
|
||||
|
||||
async def connect(self):
|
||||
sslCon=None
|
||||
if self.server.startswith("wss"):
|
||||
sslCon=ssl.SSLContext()
|
||||
self.conn = await websockets.connect(self.server, subprotocols=['janus-protocol'], ssl=sslCon)
|
||||
transaction = transaction_id()
|
||||
await self.conn.send(json.dumps({
|
||||
"janus": "create",
|
||||
"transaction": transaction
|
||||
}))
|
||||
resp = await self.conn.recv()
|
||||
print (resp)
|
||||
parsed = json.loads(resp)
|
||||
assert parsed["janus"] == "success", "Failed creating session"
|
||||
assert parsed["transaction"] == transaction, "Incorrect transaction"
|
||||
self.session = parsed["data"]["id"]
|
||||
|
||||
async def close(self):
|
||||
if self.conn:
|
||||
await self.conn.close()
|
||||
|
||||
async def attach(self, plugin):
|
||||
assert hasattr(self, "session"), "Must connect before attaching to plugin"
|
||||
transaction = transaction_id()
|
||||
await self.conn.send(json.dumps({
|
||||
"janus": "attach",
|
||||
"session_id": self.session,
|
||||
"plugin": plugin,
|
||||
"transaction": transaction
|
||||
}))
|
||||
resp = await self.conn.recv()
|
||||
parsed = json.loads(resp)
|
||||
assert parsed["janus"] == "success", "Failed attaching to {}".format(plugin)
|
||||
assert parsed["transaction"] == transaction, "Incorrect transaction"
|
||||
self.handle = parsed["data"]["id"]
|
||||
|
||||
async def sendtrickle(self, candidate):
|
||||
assert hasattr(self, "session"), "Must connect before sending messages"
|
||||
assert hasattr(self, "handle"), "Must attach before sending messages"
|
||||
|
||||
transaction = transaction_id()
|
||||
janus_message = {
|
||||
"janus": "trickle",
|
||||
"session_id": self.session,
|
||||
"handle_id": self.handle,
|
||||
"transaction": transaction,
|
||||
"candidate": candidate
|
||||
}
|
||||
|
||||
await self.conn.send(json.dumps(janus_message))
|
||||
|
||||
#while True:
|
||||
# resp = await self._recv_and_parse()
|
||||
# if isinstance(resp, PluginData):
|
||||
# return resp
|
||||
# else:
|
||||
# self._messages.add(resp)
|
||||
#
|
||||
async def sendmessage(self, body, jsep=None):
|
||||
assert hasattr(self, "session"), "Must connect before sending messages"
|
||||
assert hasattr(self, "handle"), "Must attach before sending messages"
|
||||
|
||||
transaction = transaction_id()
|
||||
janus_message = {
|
||||
"janus": "message",
|
||||
"session_id": self.session,
|
||||
"handle_id": self.handle,
|
||||
"transaction": transaction,
|
||||
"body": body
|
||||
}
|
||||
if jsep is not None:
|
||||
janus_message["jsep"] = jsep
|
||||
|
||||
await self.conn.send(json.dumps(janus_message))
|
||||
|
||||
#while True:
|
||||
# resp = await self._recv_and_parse()
|
||||
# if isinstance(resp, PluginData):
|
||||
# if jsep is not None:
|
||||
# await client.handle_sdp(resp.jsep)
|
||||
# return resp
|
||||
# else:
|
||||
# self._messages.add(resp)
|
||||
|
||||
async def keepalive(self):
|
||||
assert hasattr(self, "session"), "Must connect before sending messages"
|
||||
assert hasattr(self, "handle"), "Must attach before sending messages"
|
||||
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(10)
|
||||
transaction = transaction_id()
|
||||
await self.conn.send(json.dumps({
|
||||
"janus": "keepalive",
|
||||
"session_id": self.session,
|
||||
"handle_id": self.handle,
|
||||
"transaction": transaction
|
||||
}))
|
||||
except KeyboardInterrupt:
|
||||
return
|
||||
|
||||
async def recv(self):
|
||||
if len(self._messages) > 0:
|
||||
return self._messages.pop()
|
||||
else:
|
||||
return await self._recv_and_parse()
|
||||
|
||||
async def _recv_and_parse(self):
|
||||
raw = json.loads(await self.conn.recv())
|
||||
janus = raw["janus"]
|
||||
|
||||
if janus == "event":
|
||||
return PluginData(
|
||||
sender=raw["sender"],
|
||||
plugin=raw["plugindata"]["plugin"],
|
||||
data=raw["plugindata"]["data"],
|
||||
jsep=raw["jsep"] if "jsep" in raw else None
|
||||
)
|
||||
elif janus == "webrtcup":
|
||||
return WebrtcUp(
|
||||
sender=raw["sender"]
|
||||
)
|
||||
elif janus == "media":
|
||||
return Media(
|
||||
sender=raw["sender"],
|
||||
receiving=raw["receiving"],
|
||||
kind=raw["type"]
|
||||
)
|
||||
elif janus == "slowlink":
|
||||
return SlowLink(
|
||||
sender=raw["sender"],
|
||||
uplink=raw["uplink"],
|
||||
lost=raw["lost"]
|
||||
)
|
||||
elif janus == "hangup":
|
||||
return HangUp(
|
||||
sender=raw["sender"],
|
||||
reason=raw["reason"]
|
||||
)
|
||||
elif janus == "ack":
|
||||
return Ack(
|
||||
transaction=raw["transaction"]
|
||||
)
|
||||
else:
|
||||
return raw
|
||||
|
||||
class WebRTCClient:
|
||||
def __init__(self, id_, peer_id, server, signaling):
|
||||
self.id_ = id_
|
||||
self.conn = None
|
||||
self.pipe = None
|
||||
self.webrtc = None
|
||||
self.peer_id = peer_id
|
||||
self.server = server or 'wss://127.0.0.1:8989'
|
||||
self.signaling = signaling
|
||||
self.request = None
|
||||
self.offermsg = None
|
||||
|
||||
def send_sdp_offer(self, offer):
|
||||
text = offer.sdp.as_text()
|
||||
print ('Sending offer:\n%s' % text)
|
||||
# configure media
|
||||
media = {'audio': True, 'video': True}
|
||||
request = {'request': 'publish'}
|
||||
request.update(media)
|
||||
self.request = request
|
||||
self.offermsg = { 'sdp': text, 'trickle': True, 'type': 'offer' }
|
||||
print (self.offermsg)
|
||||
loop = asyncio.new_event_loop()
|
||||
loop.run_until_complete(self.signaling.sendmessage(self.request, self.offermsg))
|
||||
|
||||
def on_offer_created(self, promise, _, __):
|
||||
promise.wait()
|
||||
reply = promise.get_reply()
|
||||
offer = reply.get_value('offer')
|
||||
promise = Gst.Promise.new()
|
||||
self.webrtc.emit('set-local-description', offer, promise)
|
||||
promise.interrupt()
|
||||
self.send_sdp_offer(offer)
|
||||
|
||||
def on_negotiation_needed(self, element):
|
||||
promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
|
||||
element.emit('create-offer', None, promise)
|
||||
|
||||
def send_ice_candidate_message(self, _, mlineindex, candidate):
|
||||
icemsg = {'candidate': candidate, 'sdpMLineIndex': mlineindex}
|
||||
print ("Sending ICE", icemsg)
|
||||
loop = asyncio.new_event_loop()
|
||||
loop.run_until_complete(self.signaling.sendtrickle(icemsg))
|
||||
|
||||
def on_incoming_decodebin_stream(self, _, pad):
|
||||
if not pad.has_current_caps():
|
||||
print (pad, 'has no caps, ignoring')
|
||||
return
|
||||
|
||||
caps = pad.get_current_caps()
|
||||
name = caps.to_string()
|
||||
if name.startswith('video'):
|
||||
q = Gst.ElementFactory.make('queue')
|
||||
conv = Gst.ElementFactory.make('videoconvert')
|
||||
sink = Gst.ElementFactory.make('autovideosink')
|
||||
self.pipe.add(q)
|
||||
self.pipe.add(conv)
|
||||
self.pipe.add(sink)
|
||||
self.pipe.sync_children_states()
|
||||
pad.link(q.get_static_pad('sink'))
|
||||
q.link(conv)
|
||||
conv.link(sink)
|
||||
elif name.startswith('audio'):
|
||||
q = Gst.ElementFactory.make('queue')
|
||||
conv = Gst.ElementFactory.make('audioconvert')
|
||||
resample = Gst.ElementFactory.make('audioresample')
|
||||
sink = Gst.ElementFactory.make('autoaudiosink')
|
||||
self.pipe.add(q)
|
||||
self.pipe.add(conv)
|
||||
self.pipe.add(resample)
|
||||
self.pipe.add(sink)
|
||||
self.pipe.sync_children_states()
|
||||
pad.link(q.get_static_pad('sink'))
|
||||
q.link(conv)
|
||||
conv.link(resample)
|
||||
resample.link(sink)
|
||||
|
||||
def on_incoming_stream(self, _, pad):
|
||||
if pad.direction != Gst.PadDirection.SRC:
|
||||
return
|
||||
|
||||
decodebin = Gst.ElementFactory.make('decodebin')
|
||||
decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
|
||||
self.pipe.add(decodebin)
|
||||
decodebin.sync_state_with_parent()
|
||||
self.webrtc.link(decodebin)
|
||||
|
||||
def start_pipeline(self):
|
||||
self.pipe = Gst.parse_launch(PIPELINE_DESC)
|
||||
self.webrtc = self.pipe.get_by_name('sendrecv')
|
||||
self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
|
||||
self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
|
||||
self.webrtc.connect('pad-added', self.on_incoming_stream)
|
||||
self.pipe.set_state(Gst.State.PLAYING)
|
||||
|
||||
def extract_ice_from_sdp(self, sdp):
|
||||
mlineindex = -1
|
||||
for line in sdp.splitlines():
|
||||
if line.startswith("a=candidate"):
|
||||
candidate = line[2:]
|
||||
if mlineindex < 0:
|
||||
print("Received ice candidate in SDP before any m= line")
|
||||
continue
|
||||
print ('Received remote ice-candidate mlineindex {}: {}'.format(mlineindex, candidate))
|
||||
self.webrtc.emit('add-ice-candidate', mlineindex, candidate)
|
||||
elif line.startswith("m="):
|
||||
mlineindex += 1
|
||||
|
||||
async def handle_sdp(self, msg):
|
||||
print (msg)
|
||||
if 'sdp' in msg:
|
||||
sdp = msg['sdp']
|
||||
assert(msg['type'] == 'answer')
|
||||
print ('Received answer:\n%s' % sdp)
|
||||
res, sdpmsg = GstSdp.SDPMessage.new()
|
||||
GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
|
||||
|
||||
answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
|
||||
promise = Gst.Promise.new()
|
||||
self.webrtc.emit('set-remote-description', answer, promise)
|
||||
promise.interrupt()
|
||||
|
||||
# Extract ICE candidates from the SDP to work around a GStreamer
|
||||
# limitation in (at least) 1.16.2 and below
|
||||
self.extract_ice_from_sdp (sdp)
|
||||
|
||||
elif 'ice' in msg:
|
||||
ice = msg['ice']
|
||||
candidate = ice['candidate']
|
||||
sdpmlineindex = ice['sdpMLineIndex']
|
||||
self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
|
||||
|
||||
async def loop(self, signaling):
|
||||
await signaling.connect()
|
||||
await signaling.attach("janus.plugin.videoroom")
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.create_task(signaling.keepalive())
|
||||
#asyncio.create_task(self.keepalive())
|
||||
|
||||
joinmessage = { "request": "join", "ptype": "publisher", "room": 1234, "display": self.peer_id }
|
||||
await signaling.sendmessage(joinmessage)
|
||||
|
||||
assert signaling.conn
|
||||
self.start_pipeline()
|
||||
|
||||
while True:
|
||||
try:
|
||||
msg = await signaling.recv()
|
||||
if isinstance(msg, PluginData):
|
||||
if msg.jsep is not None:
|
||||
await self.handle_sdp(msg.jsep)
|
||||
elif isinstance(msg, Media):
|
||||
print (msg)
|
||||
elif isinstance(msg, WebrtcUp):
|
||||
print (msg)
|
||||
elif isinstance(msg, SlowLink):
|
||||
print (msg)
|
||||
elif isinstance(msg, HangUp):
|
||||
print (msg)
|
||||
elif not isinstance(msg, Ack):
|
||||
if 'candidate' in msg:
|
||||
ice = msg['candidate']
|
||||
print (ice)
|
||||
if 'candidate' in ice:
|
||||
candidate = ice['candidate']
|
||||
sdpmlineindex = ice['sdpMLineIndex']
|
||||
self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
|
||||
print(msg)
|
||||
except (KeyboardInterrupt, ConnectionClosed):
|
||||
return
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def check_plugins():
|
||||
needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
|
||||
"rtpmanager", "videotestsrc", "audiotestsrc"]
|
||||
missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
|
||||
if len(missing):
|
||||
print('Missing gstreamer plugins:', missing)
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
if __name__=='__main__':
|
||||
Gst.init(None)
|
||||
if not check_plugins():
|
||||
sys.exit(1)
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('label', help='videoroom label')
|
||||
parser.add_argument('--server', help='Signalling server to connect to, eg "wss://127.0.0.1:8989"')
|
||||
args = parser.parse_args()
|
||||
our_id = random.randrange(10, 10000)
|
||||
signaling = JanusGateway(args.server)
|
||||
c = WebRTCClient(our_id, args.label, args.server, signaling)
|
||||
loop = asyncio.get_event_loop()
|
||||
try:
|
||||
loop.run_until_complete(
|
||||
c.loop(signaling)
|
||||
)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
print("Interrupted, cleaning up")
|
||||
loop.run_until_complete(signaling.close())
|
Loading…
Reference in a new issue