Implement the demo in C# with GStreamerSharp

Based on https://github.com/ttustonic/GStreamerSharpSamples from
Tomislav Tustonić <ttustonic@outlook.com>
This commit is contained in:
Thibault Saunier 2018-07-03 09:49:46 -04:00 committed by Matthew Waters
parent c5e5a7cfd3
commit 122c4106a4
6 changed files with 569 additions and 0 deletions

View file

@ -0,0 +1,309 @@
using System;
using static System.Diagnostics.Debug;
using Gst;
using WebSocketSharp;
using Gst.WebRTC;
using Newtonsoft.Json;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using Gst.Sdp;
using System.Text;
using GLib;
namespace GstWebRTCDemo
{
class WebRtcClient : IDisposable
{
const string SERVER = "wss://127.0.0.1:8443";
const string PIPELINE_DESC = @"webrtcbin name=sendrecv
videotestsrc pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay !
queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
audiotestsrc wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.";
readonly int _id;
readonly int _peerId;
readonly string _server;
readonly WebSocket _conn;
Pipeline pipe;
Element webrtc;
bool terminate;
public WebRtcClient(int id, int peerId, string server = SERVER)
{
_id = id;
_peerId = peerId;
_server = server;
_conn = new WebSocket(_server);
_conn.SslConfiguration.ServerCertificateValidationCallback = validatCert;
_conn.OnOpen += OnOpen;
_conn.OnError += OnError;
_conn.OnMessage += OnMessage;
_conn.OnClose += OnClose;
pipe = (Pipeline)Parse.Launch(PIPELINE_DESC);
}
bool validatCert(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
return true;
}
public void Connect()
{
_conn.ConnectAsync();
}
void SetupCall()
{
_conn.Send($"SESSION {_peerId}");
}
void OnClose(object sender, CloseEventArgs e)
{
Console.WriteLine("Closed: " + e.Reason);
terminate = true;
}
void OnError(object sender, ErrorEventArgs e)
{
Console.WriteLine("Error " + e.Message);
terminate = true;
}
void OnOpen(object sender, System.EventArgs e)
{
var ws = sender as WebSocket;
ws.SendAsync($"HELLO {_id}", (b) => Console.WriteLine($"Opened {b}"));
}
void OnMessage(object sender, MessageEventArgs args)
{
var msg = args.Data;
switch (msg)
{
case "HELLO":
SetupCall();
break;
case "SESSION_OK":
StartPipeline();
break;
default:
if (msg.StartsWith("ERROR")) {
Console.WriteLine(msg);
terminate = true;
} else {
HandleSdp(msg);
}
break;
}
}
void StartPipeline()
{
webrtc = pipe.GetByName("sendrecv");
Assert(webrtc != null);
webrtc.Connect("on-negotiation-needed", OnNegotiationNeeded);
webrtc.Connect("on-ice-candidate", OnIceCandidate);
webrtc.Connect("pad-added", OnIncomingStream);
pipe.SetState(State.Playing);
Console.WriteLine("Playing");
}
#region Webrtc signal handlers
#region Incoming stream
void OnIncomingStream(object o, GLib.SignalArgs args)
{
var pad = args.Args[0] as Pad;
if (pad.Direction != PadDirection.Src)
return;
var decodebin = ElementFactory.Make("decodebin");
decodebin.Connect("pad-added", OnIncomingDecodebinStream);
pipe.Add(decodebin);
decodebin.SyncStateWithParent();
webrtc.Link(decodebin);
}
void OnIncomingDecodebinStream(object o, SignalArgs args)
{
var pad = (Pad)args.Args[0];
if (!pad.HasCurrentCaps)
{
Console.WriteLine($"{pad.Name} has no caps, ignoring");
return;
}
var caps = pad.CurrentCaps;
Assert(!caps.IsEmpty);
Structure s = caps[0];
var name = s.Name;
if (name.StartsWith("video"))
{
var q = ElementFactory.Make("queue");
var conv = ElementFactory.Make("videoconvert");
var sink = ElementFactory.Make("autovideosink");
pipe.Add(q, conv, sink);
pipe.SyncChildrenStates();
pad.Link(q.GetStaticPad("sink"));
Element.Link(q, conv, sink);
}
else if (name.StartsWith("audio"))
{
var q = ElementFactory.Make("queue");
var conv = ElementFactory.Make("audioconvert");
var resample = ElementFactory.Make("audioresample");
var sink = ElementFactory.Make("autoaudiosink");
pipe.Add(q, conv, resample, sink);
pipe.SyncChildrenStates();
pad.Link(q.GetStaticPad("sink"));
Element.Link(q, conv, resample, sink);
}
}
#endregion
void OnIceCandidate(object o, GLib.SignalArgs args)
{
var index = (uint)args.Args[0];
var cand = (string)args.Args[1];
var obj = new { ice = new { sdpMLineIndex = index, candidate = cand } };
var iceMsg = JsonConvert.SerializeObject(obj);
_conn.SendAsync(iceMsg, (b) => { } );
}
void OnNegotiationNeeded(object o, GLib.SignalArgs args)
{
var webRtc = o as Element;
Assert(webRtc != null, "not a webrtc object");
Promise promise = new Promise(OnOfferCreated, webrtc.Handle, null); // webRtc.Handle, null);
Structure structure = new Structure("struct");
webrtc.Emit("create-offer", structure, promise);
}
void OnOfferCreated(Promise promise)
{
promise.Wait();
var reply = promise.RetrieveReply();
var gval = reply.GetValue("offer");
WebRTCSessionDescription offer = (WebRTCSessionDescription)gval.Val;
promise = new Promise();
webrtc.Emit("set-local-description", offer, promise);
promise.Interrupt();
SendSdpOffer(offer) ;
}
#endregion
void SendSdpOffer(WebRTCSessionDescription offer)
{
var text = offer.Sdp.AsText();
var obj = new { sdp = new { type = "offer", sdp = text } };
var json = JsonConvert.SerializeObject(obj);
Console.Write(json);
_conn.SendAsync(json, (b) => Console.WriteLine($"Send offer completed {b}"));
}
void HandleSdp(string message)
{
var msg = JsonConvert.DeserializeObject<dynamic>(message);
if (msg.sdp != null)
{
var sdp = msg.sdp;
if (sdp.type != null && sdp.type != "answer")
{
throw new Exception("Not an answer");
}
string sdpAns = sdp.sdp;
Console.WriteLine($"received answer:\n{sdpAns}");
SDPMessage.New(out SDPMessage sdpMsg);
SDPMessage.ParseBuffer(ASCIIEncoding.Default.GetBytes(sdpAns), (uint)sdpAns.Length, sdpMsg);
var answer = WebRTCSessionDescription.New(WebRTCSDPType.Answer, sdpMsg);
var promise = new Promise();
webrtc.Emit("set-remote-description", answer, promise);
}
else if (msg.ice != null)
{
var ice = msg.ice;
string candidate = ice.candidate;
uint sdpMLineIndex = ice.sdpMLineIndex;
webrtc.Emit("add-ice-candidate", sdpMLineIndex, candidate);
}
}
public void Run()
{
// Wait until error, EOS or State Change
var bus = pipe.Bus;
do {
var msg = bus.TimedPopFiltered (Gst.Constants.SECOND, MessageType.Error | MessageType.Eos | MessageType.StateChanged);
// Parse message
if (msg != null) {
switch (msg.Type) {
case MessageType.Error:
string debug;
GLib.GException exc;
msg.ParseError (out exc, out debug);
Console.WriteLine ("Error received from element {0}: {1}", msg.Src.Name, exc.Message);
Console.WriteLine ("Debugging information: {0}", debug != null ? debug : "none");
terminate = true;
break;
case MessageType.Eos:
Console.WriteLine ("End-Of-Stream reached.\n");
terminate = true;
break;
case MessageType.StateChanged:
// We are only interested in state-changed messages from the pipeline
if (msg.Src == pipe) {
State oldState, newState, pendingState;
msg.ParseStateChanged (out oldState, out newState, out pendingState);
Console.WriteLine ("Pipeline state changed from {0} to {1}:",
Element.StateGetName (oldState), Element.StateGetName (newState));
}
break;
default:
// We should not reach here because we only asked for ERRORs, EOS and STATE_CHANGED
Console.WriteLine ("Unexpected message received.");
break;
}
}
} while (!terminate);
}
public void Dispose()
{
((IDisposable)_conn).Dispose();
pipe.SetState(State.Null);
pipe.Dispose();
}
}
static class WebRtcSendRcv
{
const string SERVER = "wss://webrtc.nirbheek.in:8443";
static Random random = new Random();
public static void Main(string[] args)
{
// Initialize GStreamer
Gst.Application.Init (ref args);
if (args.Length == 0)
throw new Exception("need peerId");
int peerId = Int32.Parse(args[0]);
var server = (args.Length > 1) ? args[1] : SERVER;
var ourId = random.Next(100, 10000);
Console.WriteLine($"PeerId:{peerId} OurId:{ourId} ");
var c = new WebRtcClient(ourId, peerId, server);
c.Connect();
c.Run();
c.Dispose();
}
}
}

View file

@ -0,0 +1,36 @@
project('gstreamer-sharp', ['cs'], meson_version: '>=0.47.0', license: 'LGPL')
gstreamer_version = '1.14.0'
mono_path = ''
nuget = find_program('nuget.py')
dependencies = []
foreach dependency, version: { 'Newtonsoft.Json': '11.0.2', 'WebSocketSharp': '1.0.3-rc11'}
message('Getting @0@:@1@'.format(dependency, version))
get_dep= run_command(nuget, 'get',
'--builddir', dependency,
'--nuget-name', dependency,
'--nuget-version', version,
'--csharp-version=net45',
'--current-builddir', meson.current_build_dir(),
'--builddir', meson.build_root(),
)
if get_dep.returncode() != 0
error('Failed to get @0@-@1@: @2@'.format(dependency, version, get_dep.stderr()))
endif
link_args = get_dep.stdout().split()
dependencies += [declare_dependency(link_args: link_args, version: version)]
foreach path: get_dep.stdout().split()
mono_path += ':@0@'.format(join_paths(meson.build_root(), path.strip('-r:'), '..'))
endforeach
endforeach
# Use nugget once 1.16 is released.
dependencies += [dependency('gstreamer-sharp-1.0', fallback: ['gstreamer-sharp', 'gst_sharp_dep'])]
message('Execute with MONO_PATH=@0@:$MONO_PATH @1@/WebRTCSendRecv.exe'.format(mono_path, meson.current_build_dir()))
executable('WebRTCSendRecv', 'WebRTCSendRecv.cs',
cs_args: ['-unsafe'], dependencies: dependencies)

View file

@ -0,0 +1,211 @@
#!/usr/bin/python3
import argparse
import getpass
import os
import sys
import shutil
import subprocess
from datetime import datetime
from urllib.request import urlretrieve
from zipfile import ZipFile
NUSPEC_TEMPLATE = """<?xml version="1.0" encoding="utf-8"?>
<package xmlns="http://schemas.microsoft.com/packaging/2011/08/nuspec.xsd">
<metadata>
<id>{package_name}</id>
<authors>{author}</authors>
<owners>{owner}</owners>
<licenseUrl>{license_url}</licenseUrl>
<projectUrl>{project_url}</projectUrl>
<iconUrl>{icon_url}</iconUrl>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>{description}.</description>
<copyright>{copyright}</copyright>
<tags>{tags}</tags>
<version>{version}</version>
<dependencies>
{dependencies} </dependencies>
</metadata>
<files>
{files} </files>
</package>
"""
TARGETS_TEMPLATE = r"""<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Target Name="{package_name}CopyMapConfigs" AfterTargets="AfterBuild">
<CreateItem Include="$(MSBuildThisFileDirectory)\{frameworkdir}\*.config">
<Output TaskParameter="Include" ItemName="MapConfigs" />
</CreateItem>
<Copy SourceFiles="@(MapConfigs)" DestinationFiles="@(MapConfigs->'$(OutDir)\%(RecursiveDir)%(Filename)%(Extension)')" />
</Target>
</Project>"""
class Nugetifier:
def cleanup_args(self):
self.nugetdir = os.path.join(self.builddir,
self.package_name + 'nupkg')
self.frameworkdir = 'net45'
self.nuget_build_dir = os.path.join(
self.nugetdir, 'build', self.frameworkdir)
self.nuget_lib_dir = os.path.join(
self.nugetdir, 'lib', self.frameworkdir)
self.nuspecfile = os.path.join(
self.nugetdir, '%s.nuspec' % self.package_name)
self.nugettargets = os.path.join(
self.nuget_build_dir, "%s.targets" % self.package_name)
self.nuget = shutil.which('nuget')
if not self.nuget:
print("Could not find the `nuget` tool, install it and retry!")
return -1
for d in [self.nugetdir, self.nuget_lib_dir, self.nuget_build_dir]:
os.makedirs(d, exist_ok=True)
if not self.description:
self.description = "%s c# bindings" % self.package_name
if not self.copyright:
self.copyright = "Copyright %s" % datetime.now().year
if not self.tags:
self.tags = self.package_name
return 0
def run(self):
res = self.cleanup_args()
if res:
return res
self.files = ''
def add_file(path, target="lib"):
f = ' <file src="%s" target="%s"/>\n' % (
path, os.path.join(target, os.path.basename(path)))
self.files += f
self.dependencies = ''
for dependency in self.dependency:
_id, version = dependency.split(":")
self.dependencies += ' <dependency id="%s" version="%s" />\n' % (
_id, version)
for assembly in self.assembly:
add_file(assembly, os.path.join('lib', self.frameworkdir))
for f in [assembly + '.config', assembly[:-3] + 'pdb']:
if os.path.exists(f):
add_file(f, os.path.join('build', self.frameworkdir))
with open(self.nugettargets, 'w') as _:
print(TARGETS_TEMPLATE.format(**self.__dict__), file=_)
add_file(self.nugettargets, 'build')
with open(self.nuspecfile, 'w') as _:
print(NUSPEC_TEMPLATE.format(**self.__dict__), file=_)
subprocess.check_call([self.nuget, 'pack', self.nuspecfile],
cwd=self.builddir)
class NugetDownloader:
def reporthook(self, blocknum, blocksize, totalsize):
readsofar = blocknum * blocksize
if totalsize > 0:
percent = readsofar * 1e2 / totalsize
s = "\r%5.1f%% %*d / %d" % (
percent, len(str(totalsize)), readsofar, totalsize)
sys.stderr.write(s)
if readsofar >= totalsize: # near the end
sys.stderr.write("\n")
else: # total size is unknown
sys.stderr.write("read %d\n" % (readsofar,))
def run(self):
url = "https://www.nuget.org/api/v2/package/{nuget_name}/{nuget_version}".format(
**self.__dict__)
workdir = os.path.join(self.current_builddir,
self.nuget_name, self.nuget_version)
os.makedirs(workdir, exist_ok=True)
try:
with open(os.path.join(workdir, 'linkline'), 'r') as f:
print(f.read())
return
except FileNotFoundError:
pass
nugetpath = os.path.join(workdir, self.nuget_name) + '.zip'
print("Downloading %s into %s" % (url, nugetpath), file=sys.stderr)
urlretrieve(url, nugetpath, self.reporthook)
lib_paths = [os.path.join('lib', self.csharp_version), 'lib']
build_path = os.path.join('build', self.csharp_version)
dll_path = os.path.join(self.nuget_name, self.nuget_version)
extract_dir = os.path.join(self.current_builddir, dll_path)
os.makedirs(extract_dir, exist_ok=True)
linkline = ''
print("%s - %s" % (self.builddir, extract_dir), file=sys.stderr)
configs = []
dlldir = None
with ZipFile(nugetpath) as zip:
for lib_path in lib_paths:
for f in zip.infolist():
if f.filename.startswith(lib_path) or f.filename.startswith(build_path):
zip.extract(f, path=extract_dir)
if f.filename.endswith('.dll'):
fpath = os.path.relpath(os.path.join(extract_dir, f.filename), self.builddir)
linkline += ' -r:' + fpath
dlldir = os.path.dirname(os.path.join(extract_dir, f.filename))
elif f.filename.endswith('.dll.config'):
configs.append(os.path.join(extract_dir, f.filename))
if dlldir:
break
print(dlldir, file=sys.stderr)
for config in configs:
print(config, file=sys.stderr)
print(os.path.join(dlldir, os.path.basename(config)), file=sys.stderr)
os.rename(config, os.path.join(dlldir, os.path.basename(config)))
with open(os.path.join(workdir, 'linkline'), 'w') as f:
print(linkline.strip(), file=f)
print(linkline.strip())
if __name__ == "__main__":
if "get" not in sys.argv:
parser = argparse.ArgumentParser()
parser.add_argument('--builddir')
parser.add_argument('--package-name')
parser.add_argument('--author', default=getpass.getuser())
parser.add_argument('--owner', default=getpass.getuser())
parser.add_argument('--native', action='append', default=[])
parser.add_argument('--assembly', action='append', default=[])
parser.add_argument('--out')
parser.add_argument('--description')
parser.add_argument('--copyright')
parser.add_argument('--version')
parser.add_argument('--icon-url', default='')
parser.add_argument('--project-url', default='')
parser.add_argument('--license-url', default='')
parser.add_argument('--tags', default='')
parser.add_argument('--dependency', default=[], action='append')
runner = Nugetifier()
else:
sys.argv.remove('get')
parser = argparse.ArgumentParser()
parser.add_argument('--builddir')
parser.add_argument('--current-builddir')
parser.add_argument('--nuget-name')
parser.add_argument('--nuget-version')
parser.add_argument('--csharp-version')
runner = NugetDownloader()
options = parser.parse_args(namespace=runner)
exit(runner.run())

View file

@ -0,0 +1,4 @@
[wrap-git]
directory=bindinator
url=https://github.com/GLibSharp/bindinator.git
revision=master

View file

@ -0,0 +1,5 @@
[wrap-git]
directory=gstreamer-sharp
url=https://anongit.freedesktop.org/git/gstreamer/gstreamer-sharp.git
push-url=ssh://git.freedesktop.org/git/gstreamer/gstreamer-sharp
revision=master

View file

@ -0,0 +1,4 @@
[wrap-git]
directory=gtk-sharp
url=https://github.com/gtk-sharp/gtk-sharp.git
revision=master