mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2025-01-04 14:38:48 +00:00
453 lines
15 KiB
Python
453 lines
15 KiB
Python
# Janus Videoroom example
|
|
# Copyright @tobiasfriden and @saket424 on github
|
|
# See https://github.com/centricular/gstwebrtc-demos/issues/66
|
|
# Copyright Jan Schmidt <jan@centricular.com> 2020
|
|
import random
|
|
import ssl
|
|
import websockets
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import string
|
|
from websockets.exceptions import ConnectionClosed
|
|
|
|
import attr
|
|
|
|
# Set to False to send H.264
|
|
DO_VP8 = True
|
|
# Set to False to disable RTX (lost packet retransmission)
|
|
DO_RTX = True
|
|
# Choose the video source:
|
|
VIDEO_SRC="videotestsrc pattern=ball"
|
|
# VIDEO_SRC="v4l2src"
|
|
|
|
|
|
@attr.s
|
|
class JanusEvent:
|
|
sender = attr.ib(validator=attr.validators.instance_of(int))
|
|
|
|
@attr.s
|
|
class PluginData(JanusEvent):
|
|
plugin = attr.ib(validator=attr.validators.instance_of(str))
|
|
data = attr.ib()
|
|
jsep = attr.ib()
|
|
|
|
@attr.s
|
|
class WebrtcUp(JanusEvent):
|
|
pass
|
|
|
|
@attr.s
|
|
class Media(JanusEvent):
|
|
receiving = attr.ib(validator=attr.validators.instance_of(bool))
|
|
kind = attr.ib(validator=attr.validators.in_(["audio", "video"]))
|
|
|
|
@kind.validator
|
|
def validate_kind(self, attribute, kind):
|
|
if kind not in ["video", "audio"]:
|
|
raise ValueError("kind must equal video or audio")
|
|
|
|
@attr.s
|
|
class SlowLink(JanusEvent):
|
|
uplink = attr.ib(validator=attr.validators.instance_of(bool))
|
|
lost = attr.ib(validator=attr.validators.instance_of(int))
|
|
|
|
@attr.s
|
|
class HangUp(JanusEvent):
|
|
reason = attr.ib(validator=attr.validators.instance_of(str))
|
|
|
|
@attr.s(cmp=False)
|
|
class Ack:
|
|
transaction = attr.ib(validator=attr.validators.instance_of(str))
|
|
|
|
@attr.s
|
|
class Jsep:
|
|
sdp = attr.ib()
|
|
type = attr.ib(validator=attr.validators.in_(["offer", "pranswer", "answer", "rollback"]))
|
|
|
|
|
|
import gi
|
|
gi.require_version('Gst', '1.0')
|
|
from gi.repository import Gst
|
|
gi.require_version('GstWebRTC', '1.0')
|
|
from gi.repository import GstWebRTC
|
|
gi.require_version('GstSdp', '1.0')
|
|
from gi.repository import GstSdp
|
|
|
|
if DO_VP8:
|
|
( encoder, payloader, rtp_encoding) = ( "vp8enc target-bitrate=100000 overshoot=25 undershoot=100 deadline=33000 keyframe-max-dist=1", "rtpvp8pay picture-id-mode=2", "VP8" )
|
|
else:
|
|
( encoder, payloader, rtp_encoding) = ( "x264enc", "rtph264pay aggregate-mode=zero-latency", "H264" )
|
|
|
|
PIPELINE_DESC = '''
|
|
webrtcbin name=sendrecv stun-server=stun://stun.l.google.com:19302
|
|
{} ! video/x-raw,width=640,height=480 ! videoconvert ! queue !
|
|
{} ! {} ! queue ! application/x-rtp,media=video,encoding-name={},payload=96 ! sendrecv.
|
|
'''.format(VIDEO_SRC, encoder, payloader, rtp_encoding)
|
|
|
|
def transaction_id():
|
|
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(8))
|
|
|
|
@attr.s
|
|
class JanusGateway:
|
|
server = attr.ib(validator=attr.validators.instance_of(str))
|
|
#secure = attr.ib(default=True)
|
|
_messages = attr.ib(factory=set)
|
|
conn = None
|
|
|
|
async def connect(self):
|
|
sslCon=None
|
|
if self.server.startswith("wss"):
|
|
sslCon=ssl.SSLContext()
|
|
self.conn = await websockets.connect(self.server, subprotocols=['janus-protocol'], ssl=sslCon)
|
|
transaction = transaction_id()
|
|
await self.conn.send(json.dumps({
|
|
"janus": "create",
|
|
"transaction": transaction
|
|
}))
|
|
resp = await self.conn.recv()
|
|
print (resp)
|
|
parsed = json.loads(resp)
|
|
assert parsed["janus"] == "success", "Failed creating session"
|
|
assert parsed["transaction"] == transaction, "Incorrect transaction"
|
|
self.session = parsed["data"]["id"]
|
|
|
|
async def close(self):
|
|
if self.conn:
|
|
await self.conn.close()
|
|
|
|
async def attach(self, plugin):
|
|
assert hasattr(self, "session"), "Must connect before attaching to plugin"
|
|
transaction = transaction_id()
|
|
await self.conn.send(json.dumps({
|
|
"janus": "attach",
|
|
"session_id": self.session,
|
|
"plugin": plugin,
|
|
"transaction": transaction
|
|
}))
|
|
resp = await self.conn.recv()
|
|
parsed = json.loads(resp)
|
|
assert parsed["janus"] == "success", "Failed attaching to {}".format(plugin)
|
|
assert parsed["transaction"] == transaction, "Incorrect transaction"
|
|
self.handle = parsed["data"]["id"]
|
|
|
|
async def sendtrickle(self, candidate):
|
|
assert hasattr(self, "session"), "Must connect before sending messages"
|
|
assert hasattr(self, "handle"), "Must attach before sending messages"
|
|
|
|
transaction = transaction_id()
|
|
janus_message = {
|
|
"janus": "trickle",
|
|
"session_id": self.session,
|
|
"handle_id": self.handle,
|
|
"transaction": transaction,
|
|
"candidate": candidate
|
|
}
|
|
|
|
await self.conn.send(json.dumps(janus_message))
|
|
|
|
#while True:
|
|
# resp = await self._recv_and_parse()
|
|
# if isinstance(resp, PluginData):
|
|
# return resp
|
|
# else:
|
|
# self._messages.add(resp)
|
|
#
|
|
async def sendmessage(self, body, jsep=None):
|
|
assert hasattr(self, "session"), "Must connect before sending messages"
|
|
assert hasattr(self, "handle"), "Must attach before sending messages"
|
|
|
|
transaction = transaction_id()
|
|
janus_message = {
|
|
"janus": "message",
|
|
"session_id": self.session,
|
|
"handle_id": self.handle,
|
|
"transaction": transaction,
|
|
"body": body
|
|
}
|
|
if jsep is not None:
|
|
janus_message["jsep"] = jsep
|
|
|
|
await self.conn.send(json.dumps(janus_message))
|
|
|
|
#while True:
|
|
# resp = await self._recv_and_parse()
|
|
# if isinstance(resp, PluginData):
|
|
# if jsep is not None:
|
|
# await client.handle_sdp(resp.jsep)
|
|
# return resp
|
|
# else:
|
|
# self._messages.add(resp)
|
|
|
|
async def keepalive(self):
|
|
assert hasattr(self, "session"), "Must connect before sending messages"
|
|
assert hasattr(self, "handle"), "Must attach before sending messages"
|
|
|
|
while True:
|
|
try:
|
|
await asyncio.sleep(10)
|
|
transaction = transaction_id()
|
|
await self.conn.send(json.dumps({
|
|
"janus": "keepalive",
|
|
"session_id": self.session,
|
|
"handle_id": self.handle,
|
|
"transaction": transaction
|
|
}))
|
|
except KeyboardInterrupt:
|
|
return
|
|
|
|
async def recv(self):
|
|
if len(self._messages) > 0:
|
|
return self._messages.pop()
|
|
else:
|
|
return await self._recv_and_parse()
|
|
|
|
async def _recv_and_parse(self):
|
|
raw = json.loads(await self.conn.recv())
|
|
janus = raw["janus"]
|
|
|
|
if janus == "event":
|
|
return PluginData(
|
|
sender=raw["sender"],
|
|
plugin=raw["plugindata"]["plugin"],
|
|
data=raw["plugindata"]["data"],
|
|
jsep=raw["jsep"] if "jsep" in raw else None
|
|
)
|
|
elif janus == "webrtcup":
|
|
return WebrtcUp(
|
|
sender=raw["sender"]
|
|
)
|
|
elif janus == "media":
|
|
return Media(
|
|
sender=raw["sender"],
|
|
receiving=raw["receiving"],
|
|
kind=raw["type"]
|
|
)
|
|
elif janus == "slowlink":
|
|
return SlowLink(
|
|
sender=raw["sender"],
|
|
uplink=raw["uplink"],
|
|
lost=raw["lost"]
|
|
)
|
|
elif janus == "hangup":
|
|
return HangUp(
|
|
sender=raw["sender"],
|
|
reason=raw["reason"]
|
|
)
|
|
elif janus == "ack":
|
|
return Ack(
|
|
transaction=raw["transaction"]
|
|
)
|
|
else:
|
|
return raw
|
|
|
|
class WebRTCClient:
|
|
def __init__(self, peer_id, server):
|
|
self.conn = None
|
|
self.pipe = None
|
|
self.webrtc = None
|
|
self.peer_id = peer_id
|
|
self.signaling = JanusGateway(server)
|
|
self.request = None
|
|
self.offermsg = None
|
|
|
|
def send_sdp_offer(self, offer):
|
|
text = offer.sdp.as_text()
|
|
print ('Sending offer:\n%s' % text)
|
|
# configure media
|
|
media = {'audio': True, 'video': True}
|
|
request = {'request': 'publish'}
|
|
request.update(media)
|
|
self.request = request
|
|
self.offermsg = { 'sdp': text, 'trickle': True, 'type': 'offer' }
|
|
print (self.offermsg)
|
|
loop = asyncio.new_event_loop()
|
|
loop.run_until_complete(self.signaling.sendmessage(self.request, self.offermsg))
|
|
|
|
def on_offer_created(self, promise, _, __):
|
|
promise.wait()
|
|
reply = promise.get_reply()
|
|
offer = reply.get_value('offer')
|
|
promise = Gst.Promise.new()
|
|
self.webrtc.emit('set-local-description', offer, promise)
|
|
promise.interrupt()
|
|
self.send_sdp_offer(offer)
|
|
|
|
def on_negotiation_needed(self, element):
|
|
promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
|
|
element.emit('create-offer', None, promise)
|
|
|
|
def send_ice_candidate_message(self, _, mlineindex, candidate):
|
|
icemsg = {'candidate': candidate, 'sdpMLineIndex': mlineindex}
|
|
print ("Sending ICE", icemsg)
|
|
loop = asyncio.new_event_loop()
|
|
loop.run_until_complete(self.signaling.sendtrickle(icemsg))
|
|
|
|
def on_incoming_decodebin_stream(self, _, pad):
|
|
if not pad.has_current_caps():
|
|
print (pad, 'has no caps, ignoring')
|
|
return
|
|
|
|
caps = pad.get_current_caps()
|
|
name = caps.to_string()
|
|
if name.startswith('video'):
|
|
q = Gst.ElementFactory.make('queue')
|
|
conv = Gst.ElementFactory.make('videoconvert')
|
|
sink = Gst.ElementFactory.make('autovideosink')
|
|
self.pipe.add(q)
|
|
self.pipe.add(conv)
|
|
self.pipe.add(sink)
|
|
self.pipe.sync_children_states()
|
|
pad.link(q.get_static_pad('sink'))
|
|
q.link(conv)
|
|
conv.link(sink)
|
|
elif name.startswith('audio'):
|
|
q = Gst.ElementFactory.make('queue')
|
|
conv = Gst.ElementFactory.make('audioconvert')
|
|
resample = Gst.ElementFactory.make('audioresample')
|
|
sink = Gst.ElementFactory.make('autoaudiosink')
|
|
self.pipe.add(q)
|
|
self.pipe.add(conv)
|
|
self.pipe.add(resample)
|
|
self.pipe.add(sink)
|
|
self.pipe.sync_children_states()
|
|
pad.link(q.get_static_pad('sink'))
|
|
q.link(conv)
|
|
conv.link(resample)
|
|
resample.link(sink)
|
|
|
|
def on_incoming_stream(self, _, pad):
|
|
if pad.direction != Gst.PadDirection.SRC:
|
|
return
|
|
|
|
decodebin = Gst.ElementFactory.make('decodebin')
|
|
decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
|
|
self.pipe.add(decodebin)
|
|
decodebin.sync_state_with_parent()
|
|
self.webrtc.link(decodebin)
|
|
|
|
def start_pipeline(self):
|
|
self.pipe = Gst.parse_launch(PIPELINE_DESC)
|
|
self.webrtc = self.pipe.get_by_name('sendrecv')
|
|
self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
|
|
self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
|
|
self.webrtc.connect('pad-added', self.on_incoming_stream)
|
|
|
|
trans = self.webrtc.emit('get-transceiver', 0)
|
|
if DO_RTX:
|
|
trans.set_property ('do-nack', True)
|
|
self.pipe.set_state(Gst.State.PLAYING)
|
|
|
|
def extract_ice_from_sdp(self, sdp):
|
|
mlineindex = -1
|
|
for line in sdp.splitlines():
|
|
if line.startswith("a=candidate"):
|
|
candidate = line[2:]
|
|
if mlineindex < 0:
|
|
print("Received ice candidate in SDP before any m= line")
|
|
continue
|
|
print ('Received remote ice-candidate mlineindex {}: {}'.format(mlineindex, candidate))
|
|
self.webrtc.emit('add-ice-candidate', mlineindex, candidate)
|
|
elif line.startswith("m="):
|
|
mlineindex += 1
|
|
|
|
async def handle_sdp(self, msg):
|
|
print (msg)
|
|
if 'sdp' in msg:
|
|
sdp = msg['sdp']
|
|
assert(msg['type'] == 'answer')
|
|
print ('Received answer:\n%s' % sdp)
|
|
res, sdpmsg = GstSdp.SDPMessage.new()
|
|
GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
|
|
|
|
answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
|
|
promise = Gst.Promise.new()
|
|
self.webrtc.emit('set-remote-description', answer, promise)
|
|
promise.interrupt()
|
|
|
|
# Extract ICE candidates from the SDP to work around a GStreamer
|
|
# limitation in (at least) 1.16.2 and below
|
|
self.extract_ice_from_sdp (sdp)
|
|
|
|
elif 'ice' in msg:
|
|
ice = msg['ice']
|
|
candidate = ice['candidate']
|
|
sdpmlineindex = ice['sdpMLineIndex']
|
|
self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
|
|
|
|
async def loop(self):
|
|
signaling = self.signaling
|
|
await signaling.connect()
|
|
await signaling.attach("janus.plugin.videoroom")
|
|
|
|
loop = asyncio.get_event_loop()
|
|
loop.create_task(signaling.keepalive())
|
|
#asyncio.create_task(self.keepalive())
|
|
|
|
joinmessage = { "request": "join", "ptype": "publisher", "room": 1234, "display": self.peer_id }
|
|
await signaling.sendmessage(joinmessage)
|
|
|
|
assert signaling.conn
|
|
self.start_pipeline()
|
|
|
|
while True:
|
|
try:
|
|
msg = await signaling.recv()
|
|
if isinstance(msg, PluginData):
|
|
if msg.jsep is not None:
|
|
await self.handle_sdp(msg.jsep)
|
|
elif isinstance(msg, Media):
|
|
print (msg)
|
|
elif isinstance(msg, WebrtcUp):
|
|
print (msg)
|
|
elif isinstance(msg, SlowLink):
|
|
print (msg)
|
|
elif isinstance(msg, HangUp):
|
|
print (msg)
|
|
elif not isinstance(msg, Ack):
|
|
if 'candidate' in msg:
|
|
ice = msg['candidate']
|
|
print (ice)
|
|
if 'candidate' in ice:
|
|
candidate = ice['candidate']
|
|
sdpmlineindex = ice['sdpMLineIndex']
|
|
self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
|
|
print(msg)
|
|
except (KeyboardInterrupt, ConnectionClosed):
|
|
return
|
|
|
|
return 0
|
|
|
|
async def close(self):
|
|
return await self.signaling.close()
|
|
|
|
def check_plugins():
|
|
needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
|
|
"rtpmanager", "videotestsrc", "audiotestsrc"]
|
|
missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
|
|
if len(missing):
|
|
print('Missing gstreamer plugins:', missing)
|
|
return False
|
|
return True
|
|
|
|
|
|
if __name__=='__main__':
|
|
Gst.init(None)
|
|
if not check_plugins():
|
|
sys.exit(1)
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('label', help='videoroom label')
|
|
parser.add_argument('--server', help='Signalling server to connect to, eg "wss://127.0.0.1:8989"')
|
|
args = parser.parse_args()
|
|
c = WebRTCClient(args.label, args.server)
|
|
loop = asyncio.get_event_loop()
|
|
try:
|
|
loop.run_until_complete(
|
|
c.loop()
|
|
)
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
print("Interrupted, cleaning up")
|
|
loop.run_until_complete(c.close())
|