gstreamer/subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py

208 lines
7.2 KiB
Python
Raw Normal View History

2018-06-11 16:49:53 +00:00
import random
import ssl
import websockets
import asyncio
import os
import sys
import json
import argparse
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
gi.require_version('GstWebRTC', '1.0')
from gi.repository import GstWebRTC
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp
# Ensure that gst-python is installed
try:
from gi.overrides import Gst as _
except ImportError:
print('gstreamer-python binding overrides aren\'t available, please install them')
raise
# These properties all mirror the ones in webrtc-sendrecv.c, see there for explanations
2018-06-11 16:49:53 +00:00
PIPELINE_DESC = '''
webrtcbin name=sendrecv bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! \
vp8enc deadline=1 keyframe-max-dist=2000 ! rtpvp8pay !
queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
2018-06-11 16:49:53 +00:00
'''
from websockets.version import version as wsv
2018-06-11 16:49:53 +00:00
class WebRTCClient:
def __init__(self, loop, id_, peer_id, server):
self.event_loop = loop
2018-06-11 16:49:53 +00:00
self.id_ = id_
self.conn = None
self.pipe = None
self.webrtc = None
self.peer_id = peer_id
self.server = server
2018-06-11 16:49:53 +00:00
async def send(self, msg):
assert self.conn
print(f'>>> Sending {msg}')
await self.conn.send(msg)
2018-06-11 16:49:53 +00:00
async def connect(self):
webrtc_sendrecv.py: Fix SSLError when connecting to websocket server ``` File "/../gstreamer/subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py", line 189, in <module> loop.run_until_complete(c.connect()) File "/usr/lib64/python3.10/asyncio/base_events.py", line 641, in run_until_complete return future.result() File "/../gstreamer/subprojects/gst-examples/webrtc/sendrecv/gst/webrtc_sendrecv.py", line 40, in connect self.conn = await websockets.connect(self.server, ssl=sslctx) File "/home/nirbheek/.local/lib/python3.10/site-packages/websockets/legacy/client.py", line 650, in __await_impl_timeout__ return await asyncio.wait_for(self.__await_impl__(), self.open_timeout) File "/usr/lib64/python3.10/asyncio/tasks.py", line 445, in wait_for return fut.result() File "/home/nirbheek/.local/lib/python3.10/site-packages/websockets/legacy/client.py", line 654, in __await_impl__ transport, protocol = await self._create_connection() File "/usr/lib64/python3.10/asyncio/base_events.py", line 1080, in create_connection transport, protocol = await self._create_connection_transport( File "/usr/lib64/python3.10/asyncio/base_events.py", line 1110, in _create_connection_transport await waiter File "/usr/lib64/python3.10/asyncio/sslproto.py", line 631, in _on_handshake_complete raise handshake_exc File "/usr/lib64/python3.10/asyncio/sslproto.py", line 676, in _process_write_backlog ssldata = self._sslpipe.do_handshake( File "/usr/lib64/python3.10/asyncio/sslproto.py", line 116, in do_handshake self._sslobj = self._context.wrap_bio( File "/usr/lib64/python3.10/ssl.py", line 526, in wrap_bio return self.sslobject_class._create( File "/usr/lib64/python3.10/ssl.py", line 865, in _create sslobj = context._wrap_bio( ssl.SSLError: Cannot create a client socket with a PROTOCOL_TLS_SERVER context (_ssl.c:801) ``` Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/1821>
2022-03-01 14:25:11 +00:00
self.conn = await websockets.connect(self.server)
await self.send('HELLO %d' % self.id_)
2018-06-11 16:49:53 +00:00
async def setup_call(self):
await self.send('SESSION {}'.format(self.peer_id))
def send_soon(self, msg):
asyncio.run_coroutine_threadsafe(self.send(msg), self.event_loop)
2018-06-11 16:49:53 +00:00
def send_sdp_offer(self, offer):
text = offer.sdp.as_text()
print('Sending offer:\n%s' % text)
2018-06-11 16:49:53 +00:00
msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
self.send_soon(msg)
2018-06-11 16:49:53 +00:00
def on_offer_created(self, promise, _, __):
promise.wait()
reply = promise.get_reply()
offer = reply['offer']
promise = Gst.Promise.new()
print('Offer created, setting local description')
2018-06-11 16:49:53 +00:00
self.webrtc.emit('set-local-description', offer, promise)
promise.interrupt()
self.send_sdp_offer(offer)
def on_negotiation_needed(self, element):
promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
element.emit('create-offer', None, promise)
def send_ice_candidate_message(self, _, mlineindex, candidate):
icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
self.send_soon(icemsg)
2018-06-11 16:49:53 +00:00
def on_incoming_decodebin_stream(self, _, pad):
if not pad.has_current_caps():
print(pad, 'has no caps, ignoring')
2018-06-11 16:49:53 +00:00
return
caps = pad.get_current_caps()
assert (len(caps))
s = caps[0]
name = s.get_name()
if name.startswith('video'):
q = Gst.ElementFactory.make('queue')
conv = Gst.ElementFactory.make('videoconvert')
sink = Gst.ElementFactory.make('autovideosink')
self.pipe.add(q, conv, sink)
self.pipe.sync_children_states()
pad.link(q.get_static_pad('sink'))
q.link(conv)
conv.link(sink)
elif name.startswith('audio'):
q = Gst.ElementFactory.make('queue')
conv = Gst.ElementFactory.make('audioconvert')
resample = Gst.ElementFactory.make('audioresample')
sink = Gst.ElementFactory.make('autoaudiosink')
self.pipe.add(q, conv, resample, sink)
self.pipe.sync_children_states()
pad.link(q.get_static_pad('sink'))
q.link(conv)
conv.link(resample)
resample.link(sink)
def on_incoming_stream(self, _, pad):
if pad.direction != Gst.PadDirection.SRC:
return
decodebin = Gst.ElementFactory.make('decodebin')
decodebin.connect('pad-added', self.on_incoming_decodebin_stream)
self.pipe.add(decodebin)
decodebin.sync_state_with_parent()
self.webrtc.link(decodebin)
def start_pipeline(self):
self.pipe = Gst.parse_launch(PIPELINE_DESC)
self.webrtc = self.pipe.get_by_name('sendrecv')
self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
self.webrtc.connect('pad-added', self.on_incoming_stream)
self.pipe.set_state(Gst.State.PLAYING)
def handle_sdp(self, message):
2018-06-11 16:49:53 +00:00
assert (self.webrtc)
msg = json.loads(message)
if 'sdp' in msg:
sdp = msg['sdp']
assert(sdp['type'] == 'answer')
sdp = sdp['sdp']
print('Received answer:\n%s' % sdp)
res, sdpmsg = GstSdp.SDPMessage.new()
GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
2018-06-11 16:49:53 +00:00
answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
promise = Gst.Promise.new()
self.webrtc.emit('set-remote-description', answer, promise)
promise.interrupt()
elif 'ice' in msg:
ice = msg['ice']
candidate = ice['candidate']
sdpmlineindex = ice['sdpMLineIndex']
self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
def close_pipeline(self):
if self.pipe:
self.pipe.set_state(Gst.State.NULL)
self.pipe = None
self.webrtc = None
2018-06-11 16:49:53 +00:00
async def loop(self):
assert self.conn
async for message in self.conn:
if message == 'HELLO':
await self.setup_call()
elif message == 'SESSION_OK':
self.start_pipeline()
elif message.startswith('ERROR'):
print(message)
self.close_pipeline()
2018-06-11 16:49:53 +00:00
return 1
else:
self.handle_sdp(message)
self.close_pipeline()
2018-06-11 16:49:53 +00:00
return 0
async def stop(self):
if self.conn:
await self.conn.close()
self.conn = None
2018-06-11 16:49:53 +00:00
def check_plugins():
needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
"rtpmanager", "videotestsrc", "audiotestsrc"]
missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
if len(missing):
print('Missing gstreamer plugins:', missing)
return False
return True
if __name__ == '__main__':
2018-06-11 16:49:53 +00:00
Gst.init(None)
if not check_plugins():
sys.exit(1)
2018-06-11 16:49:53 +00:00
parser = argparse.ArgumentParser()
parser.add_argument('peerid', help='String ID of the peer to connect to')
parser.add_argument('--server', default='wss://webrtc.nirbheek.in:8443',
help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
2018-06-11 16:49:53 +00:00
args = parser.parse_args()
our_id = random.randrange(10, 10000)
loop = asyncio.new_event_loop()
c = WebRTCClient(loop, our_id, args.peerid, args.server)
loop.run_until_complete(c.connect())
res = loop.run_until_complete(c.loop())
2018-06-11 16:49:53 +00:00
sys.exit(res)