Compare commits

...

65 commits

Author SHA1 Message Date
Sebastian Dröge 5f6ed181c2 Update version to 0.12.3 2024-03-21 18:53:12 +02:00
Sebastian Dröge 10d7324d90 Update CHANGELOG.md for 0.12.3 2024-03-21 18:47:31 +02:00
Sebastian Dröge 03166bcc35 Update Cargo.lock
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-21 13:30:20 +02:00
François Laignel fd154d272c webrtcsrc: add do-retransmission property
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-21 13:30:20 +02:00
Sebastian Dröge 77e91e96ff livesync: Ignore another racy test
Same problem as https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/328

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-21 13:30:20 +02:00
François Laignel ede354d7a5 webrtcsink: prevent video-info error log for audio streams
The following error is logged when `webrtcsink` is feeded with an audio stream:

> ERROR video-info video-info.c:540:gst_video_info_from_caps:
>       wrong name 'audio/x-raw', expected video/ or image/

This commit bypasses `VideoInfo::from_caps` for audio streams.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-21 13:30:20 +02:00
François Laignel 158fe80779 rtp: gccbwe: don't break downstream assumptions pushing buffer lists
Some elements in the RTP stack assume all buffers in a `gst::BufferList`
correspond to the same timestamp. See in [`rtpsession`] for instance.
This also had the effect that `rtpsession` did not create correct RTCP as it
only saw some of the SSRCs in the stream.

`rtpgccbwe` formed a packet group by gathering buffers in a `gst::BufferList`,
regardless of whether they corresponded to the same timestamp, which broke
synchronization under certain circonstances.

This commit makes `rtpgccbwe` push the buffers as they were received: one by one.

[`rtpsession`]: bc858976db/subprojects/gst-plugins-good/gst/rtpmanager/gstrtpsession.c (L2462)

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-21 13:30:20 +02:00
Guillaume Desmottes a502dba6d5 webrtc: janus: handle 'hangup' messages from Janus
Fix error about this message not being handled:

{
   "janus": "hangup",
   "session_id": 4758817463851315,
   "sender": 4126342934227009,
   "reason": "Close PC"
}

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-21 13:30:20 +02:00
Guillaume Desmottes be055f6dfa webrtc: janus: handle 'destroyed' messages from Janus
Fix this error when the room is destroyed:

ERROR   webrtc-janusvr-signaller imp.rs:413:gstrswebrtc::janusvr_signaller:👿:Signaller::handle_msg:<GstJanusVRWebRTCSignallerU64@0x55b166a3fe40> Unknown message from server: {
   "janus": "event",
   "session_id": 6667171862739941,
   "sender": 1964690595468240,
   "plugindata": {
      "plugin": "janus.plugin.videoroom",
      "data": {
         "videoroom": "destroyed",
         "room": 8320333573294267
      }
   }
}

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-21 13:30:20 +02:00
Sebastian Dröge c982db73a7 rtp: Switch from chrono to time
Which allows to simplify quite a bit of code and avoids us having to
handle some API deprecations.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-21 13:30:20 +02:00
Sebastian Dröge 4def418b45 fmp4mux: Move away from deprecated chrono function
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Sebastian Dröge dab1ff7dd9 version-helper: Use non-deprecated type alias from toml_edit
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Sebastian Dröge c0970c6cf4 deny: Add override for heck 0.4
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Guillaume Desmottes 21f59c65da webrtc: allow resolution and framerate input changes
Some changes do not require a WebRTC renegotiation so we can allow
those.

Fix #515

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Mathieu Duponchelle 2374147b0b gstregex: add support for switches exposed by RegexBuilder
The builder allows for instance for switching off case-sensitiveness for
the entire pattern, instead of having to do so inline with `(?i)`.

All the options exposed by the builder at
<https://docs.rs/regex/latest/regex/struct.RegexBuilder.html> can now be
passed as fields of invidual commands, snake-cased.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Guillaume Desmottes 3f0c6e654b gtk4: scale texture position
Fix regression in 0.12 introduced by 3423d05f77

Code from Ivan Molodetskikh suggested on Matrix.

Fix #519

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Nirbheek Chauhan ce2aa0aecf meson: Disable docs completely when the option is disabled
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Guillaume Desmottes 6b7e41e551 threadshare: disable racy tests
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/250

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Guillaume Desmottes 40402f8e58 livesync: disable racy tests
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/328
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/357

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Guillaume Desmottes 14ed7b0db1 togglerecord: disable racy test_two_stream_close_open_nonlivein_liveout test
See https://gitlab.freedesktop.org/gdesmott/gst-plugins-rs/-/jobs/56183085

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Guillaume Desmottes 2d350252c0 spotify: document how to use with non Facebook accounts
See discussion on #203.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Seungha Yang 2cb76f2ff2 sccparse: Ignore invalid timecode during seek as well
sccparse holds last timecode in order to ignore invalid timecode
and fallback to the previous timecode. That should happen
when sccparse is handling seek event too. Otherwise single invalid
timecode before the target seek position will cause flow error.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Jordan Yelloz 7a90e96332 livekit_signaller: Added missing getter for excluded-producer-peer-ids
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Jordan Yelloz 283d1568b4 webrtcsrc: Removed incorrect URIHandler from LiveKit source
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Sebastian Dröge 12533a4c0d Remove empty line from the CHANGELOG.md that confuses the GitLab renderer
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Jordan Yelloz 51287705ce livekit_signaller: Improved shutdown behavior
Without sending a Leave request to the server before disconnecting, the
disconnected client will appear present and stuck in the room for a little
while until the server removes it due to inactivity.

After this change, the disconnecting client will immediately leave the room.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Jordan Yelloz e9edee131b webrtcsrc: Removed flag setup from WhipServerSrc
It's already done in the base class

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Jordan Yelloz feb01510f9 webrtcsrc: Updated readme for LiveKit source
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Jordan Yelloz 32e13f0a10 webrtcsrc: Added LiveKit source element
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Jordan Yelloz c8dcd50846 webrtcsink: Updated livekitwebrtcsink for new signaller constructor
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Jordan Yelloz 59ee2721bf livekit_signaller: Added dual-role support
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:24:55 +01:00
Guillaume Desmottes 133b527391 webrtc: janus: rename RoomId to JanusId
Those weird ids are used in multiple places, not only for the room id,
so best to have a more generic name.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:54:45 +02:00
Guillaume Desmottes 7f460c2db8 webrtc: janus: room id not optional in 'joined' message
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:54:17 +02:00
Guillaume Desmottes 8361471fcc webrtc: janus: remove 'audio' and 'video' from publish messages
Those are deprecated and no longer used.

See https://janus.conf.meetecho.com/docs/videoroom and
https://github.com/meetecho/janus-gateway/blob/master/src/plugins/janus_videoroom.c#L9894

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:54:02 +02:00
Guillaume Desmottes b9ea05a14a webrtc: janus: numerical room ids are u64
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:53:51 +02:00
Jordan Yelloz 048d51d9d9 webrtcsrc: Made producer-peer-id optional
It may be necessary for some signalling clients but the source element
doesn't need to depend on it.

Also, the value will fall back to the pad's MSID for the first argument
to the request-encoded-filter gobject signal when it isn't available
from the signalling client.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1508>
2024-03-20 16:53:22 +02:00
Sebastian Dröge 55b74c9a9a Update Cargo.lock 2024-02-26 14:56:22 +02:00
Sebastian Dröge c6841e1e74 Update versions to 0.12.2 2024-02-26 14:55:51 +02:00
Sebastian Dröge 3a50489dac Update CHANGELOG.md for 0.12.2 2024-02-26 14:55:29 +02:00
Sebastian Dröge 078c76c260 deny: Add zerocopy 0.6 duplicate override for librespot
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1476>
2024-02-26 14:26:36 +02:00
Sebastian Dröge 3ac010bfb6 Update Cargo.lock
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1476>
2024-02-26 14:25:33 +02:00
Xavier Claessens 7edf94f98b janusvr: Add string-ids property
It forces usage of strings even if it can be parsed into an integer.
This allows joining room `"133"` in a server configured with string
room ids.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1476>
2024-02-26 14:24:29 +02:00
Xavier Claessens ea59544c71 janusvr: Room IDs can be strings
Sponsored-by: Netflix Inc.
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1476>
2024-02-26 14:24:25 +02:00
Sebastian Dröge 0b96457395 fmp4mux: Update to dash-mpd 0.15
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1476>
2024-02-26 14:24:17 +02:00
Xavier Claessens 5888f5aa5f meson: Fix error when default_library=both
Skip duplicated plugin_name when we have both the static and shared
plugin in the plugins list.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1476>
2024-02-26 14:24:08 +02:00
Maksym Khomenko 4e86b0f3c8 webrtcsink: extensions: separate API and signal checks
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1476>
2024-02-26 14:24:02 +02:00
Maksym Khomenko 98411e97f1 webrtcsink: apply rustfmt
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1476>
2024-02-26 14:23:56 +02:00
Mathieu Duponchelle 8211c253a8 textwrap: don't split on all whitespaces ..
but only on ASCII whitespaces, as we want to honor non-breaking
whitespaces (\u{a0})

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1476>
2024-02-26 14:23:50 +02:00
Xavier Claessens 3a0f30be96 janusvr: Add secret-key property
Every API calls have an optional "apisecret" argument.

Sponsored-by: Netflix Inc.
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1476>
2024-02-26 14:23:45 +02:00
Sebastian Dröge de7e4806e5 deny: Add winnow 0.5 override
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1476>
2024-02-26 14:23:38 +02:00
Sebastian Dröge 27d806ae85 Remove Cargo.lock from .gitignore
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1476>
2024-02-26 14:21:00 +02:00
Sebastian Dröge 3679db5740 rtpgccbwe: Don't reset PTS/DTS to None
The element is usually placed before `rtpsession`, and `rtpsession`
needs the PTS/DTS for correctly determining the running time. The
running time is then used to produce correct RTCP SR, and to potentially
update an NTP-64 RTP header extension if existing on the packets.

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/496

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1476>
2024-02-26 14:20:53 +02:00
Sebastian Dröge 2f2aac55a3 Update version to 0.12.1 2024-02-13 13:02:27 +02:00
Sebastian Dröge 31dfcd0a78 Update CHANGELOG.md for 0.12.1 2024-02-13 13:01:46 +02:00
Sebastian Dröge b3e233f0c5 Update Cargo.lock
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1460>
2024-02-13 12:37:23 +02:00
Sebastian Dröge 58a065caf3 textwrap: Remove unnecessary to_string() in debug output of a string
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1460>
2024-02-13 12:35:40 +02:00
Jordan Yelloz 606352d7cf webrtcsink: Added sinkpad with "msid" property
This forwards to the webrtcbin sinkpad's msid when specified.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1460>
2024-02-12 18:11:42 +02:00
Sebastian Dröge aa2d056ea1 Update to async-tungstenite 0.25
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1460>
2024-02-12 18:11:31 +02:00
Sebastian Dröge 3f9d5cf2f0 gtk4: Create a window if running from gst-launch-1.0 or GST_GTK4_WINDOW=1 is set
Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/482

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1460>
2024-02-12 18:11:25 +02:00
Sebastian Dröge 149eff08b7 utils: Update for renamed clippy lint in 1.76
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1460>
2024-02-12 18:11:19 +02:00
Sebastian Dröge c4e3fff2a2 Update Cargo.lock
Downgrade clap_derive to 4.4.7 to not require Rust 1.74 or newer.
2024-02-08 20:52:14 +02:00
Sebastian Dröge 16e001e3f2 Update dependency versions for gtk-rs-core / gtk4-rs / gstreamer-rs and local crates 2024-02-08 19:40:08 +02:00
Sebastian Dröge af694e8bc1 ci: Use 0.22 branch of gstreamer-rs images templates 2024-02-08 19:35:05 +02:00
Sebastian Dröge 66f2969eb9 Update Cargo.lock 2024-02-08 19:33:32 +02:00
Sebastian Dröge 50efdf6a64 Update version to 0.12.0 2024-02-08 19:33:09 +02:00
44 changed files with 1978 additions and 1018 deletions

1
.gitignore vendored
View file

@ -1,4 +1,3 @@
Cargo.lock
target
*~
*.bk

View file

@ -6,7 +6,7 @@ include:
file: '/templates/debian.yml'
- project: 'gstreamer/gstreamer-rs'
ref: main
ref: '0.22'
file: '/ci/images_template.yml'
- project: 'gstreamer/gstreamer'

View file

@ -5,6 +5,47 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html),
specifically the [variant used by Rust](http://doc.crates.io/manifest.html#the-version-field).
## [0.12.3] - 2024-03-21
### Fixed
- gtk4paintablesink: Fix scaling of texture position.
- janusvrwebrtcsink: Handle 64 bit numerical room ids.
- janusvrwebrtcsink: Don't include deprecated audio/video fields in publish
messages.
- janusvrwebrtcsink: Handle various other messages to avoid printing errors.
- livekitwebrtc: Fix shutdown behaviour.
- rtpgccbwe: Don't forward buffer lists with buffers from different SSRCs to
avoid breaking assumptions in rtpsession.
- sccparse: Ignore invalid timecodes during seeking.
- webrtcsink: Don't try parsing audio caps as video caps.
### Changed
- webrtc: Allow resolution and framerate changes.
- webrtcsrc: Make produce-peer-id optional.
### Added
- livekitwebrtcsrc: Add new LiveKit source element.
- regex: Add support for configuring regex behaviour.
- spotifyaudiosrc: Document how to use with non-Facebook accounts.
- webrtcsrc: Add `do-retransmission` property.
## [0.12.2] - 2024-02-26
### Fixed
- rtpgccbwe: Don't reset PTS/DTS to `None` as otherwise `rtpsession` won't be
able to generate valid RTCP.
- webrtcsink: Fix usage with 1.22.
### Added
- janusvrwebrtcsink: Add `secret-key` property.
- janusvrwebrtcsink: Allow for string room ids and add `string-ids` property.
- textwrap: Don't split on all whitespaces, especially not on non-breaking
whitespace.
## [0.12.1] - 2024-02-13
### Added
- gtk4: Create a window for testing purposes when running in `gst-launch-1.0`
or if `GST_GTK4_WINDOW=1` is set.
- webrtcsink: Add `msid` property.
## [0.12.0] - 2024-02-08
### Changed
- ndi: `ndisrc` passes received data downstream without an additional copy, if
@ -36,7 +77,6 @@ specifically the [variant used by Rust](http://doc.crates.io/manifest.html#the-v
- New `janusvrwebrtcsink` element for the Janus VideoRoom API.
- New `rtspsrc2` element.
- New `whipserversrc` element.
- gtk4: New `background-color` property for setting the color of the
background of the frame and the borders, if any.
- gtk4: New `scale-filter` property for defining how to scale the frames.
@ -344,7 +384,10 @@ specifically the [variant used by Rust](http://doc.crates.io/manifest.html#the-v
- webrtcsink: Make the `turn-server` property a `turn-servers` list
- webrtcsink: Move from async-std to tokio
[Unreleased]: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/compare/0.12.0...HEAD
[Unreleased]: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/compare/0.12.3...HEAD
[0.12.3]: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/compare/0.12.2...0.12.3
[0.12.2]: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/compare/0.12.1...0.12.2
[0.12.1]: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/compare/0.12.0...0.12.1
[0.12.0]: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/compare/0.11.3...0.12.0
[0.11.3]: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/compare/0.11.2...0.11.3
[0.11.2]: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/compare/0.11.1...0.11.2

1100
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -113,36 +113,36 @@ panic = 'unwind'
opt-level = 1
[workspace.package]
version = "0.12.0-alpha.1"
version = "0.12.3"
repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
edition = "2021"
rust-version = "1.70"
[workspace.dependencies]
once_cell = "1"
glib = { git = "https://github.com/gtk-rs/gtk-rs-core", branch = "master" }
gio = { git = "https://github.com/gtk-rs/gtk-rs-core", branch = "master" }
cairo-rs = { git = "https://github.com/gtk-rs/gtk-rs-core", branch = "master", features=["use_glib"] }
pango = { git = "https://github.com/gtk-rs/gtk-rs-core", branch = "master" }
pangocairo = { git = "https://github.com/gtk-rs/gtk-rs-core", branch = "master" }
gtk = { package = "gtk4", git = "https://github.com/gtk-rs/gtk4-rs", branch = "master"}
gdk-wayland = { package = "gdk4-wayland", git = "https://github.com/gtk-rs/gtk4-rs", branch = "master"}
gdk-x11 = { package = "gdk4-x11", git = "https://github.com/gtk-rs/gtk4-rs", branch = "master"}
gdk-win32 = { package = "gdk4-win32", git = "https://github.com/gtk-rs/gtk4-rs", branch = "master"}
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-gl = { package = "gstreamer-gl", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-gl-egl = { package = "gstreamer-gl-egl", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-gl-wayland = { package = "gstreamer-gl-wayland", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-gl-x11 = { package = "gstreamer-gl-x11", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-pbutils = { package = "gstreamer-pbutils", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-plugin-version-helper = { path="./version-helper" }
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-sdp = { package = "gstreamer-sdp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-utils = { package = "gstreamer-utils", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
gst-webrtc = { package = "gstreamer-webrtc", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "main" }
glib = { git = "https://github.com/gtk-rs/gtk-rs-core", branch = "0.19", version = "0.19" }
gio = { git = "https://github.com/gtk-rs/gtk-rs-core", branch = "0.19", version = "0.19" }
cairo-rs = { git = "https://github.com/gtk-rs/gtk-rs-core", branch = "0.19", version = "0.19", features=["use_glib"] }
pango = { git = "https://github.com/gtk-rs/gtk-rs-core", branch = "0.19", version = "0.19" }
pangocairo = { git = "https://github.com/gtk-rs/gtk-rs-core", branch = "0.19", version = "0.19" }
gtk = { package = "gtk4", git = "https://github.com/gtk-rs/gtk4-rs", branch = "0.8", version = "0.8"}
gdk-wayland = { package = "gdk4-wayland", git = "https://github.com/gtk-rs/gtk4-rs", branch = "0.8", version = "0.8"}
gdk-x11 = { package = "gdk4-x11", git = "https://github.com/gtk-rs/gtk4-rs", branch = "0.8", version = "0.8"}
gdk-win32 = { package = "gdk4-win32", git = "https://github.com/gtk-rs/gtk4-rs", branch = "0.8", version = "0.8"}
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-gl = { package = "gstreamer-gl", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-gl-egl = { package = "gstreamer-gl-egl", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-gl-wayland = { package = "gstreamer-gl-wayland", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-gl-x11 = { package = "gstreamer-gl-x11", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-pbutils = { package = "gstreamer-pbutils", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-plugin-version-helper = { path="./version-helper", version = "0.8" }
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-sdp = { package = "gstreamer-sdp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-utils = { package = "gstreamer-utils", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-video = { package = "gstreamer-video", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }
gst-webrtc = { package = "gstreamer-webrtc", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", branch = "0.22", version = "0.22" }

View file

@ -8,10 +8,11 @@ to respect their legal/licensing restrictions.
## Spotify Credentials
This plugin requires a [Spotify Premium](https://www.spotify.com/premium/) account configured
with a [device password](https://www.spotify.com/us/account/set-device-password/).
This plugin requires a [Spotify Premium](https://www.spotify.com/premium/) account.
If your account is linked with Facebook, you'll need to setup
a [device username and password](https://www.spotify.com/us/account/set-device-password/).
You can then set the device username and password using the `username` and `password` properties.
Those username and password are then set using the `username` and `password` properties.
You may also want to cache credentials and downloaded files, see the `cache-` properties on the element.

View file

@ -30,13 +30,13 @@ impl Settings {
pub fn properties() -> Vec<glib::ParamSpec> {
vec![glib::ParamSpecString::builder("username")
.nick("Username")
.blurb("Spotify device username from https://www.spotify.com/us/account/set-device-password/")
.blurb("Spotify username, Facebook accounts need a device username from https://www.spotify.com/us/account/set-device-password/")
.default_value(Some(""))
.mutable_ready()
.build(),
glib::ParamSpecString::builder("password")
.nick("Password")
.blurb("Spotify device password from https://www.spotify.com/us/account/set-device-password/")
.blurb("Spotify password, Facebook accounts need a device password from https://www.spotify.com/us/account/set-device-password/")
.default_value(Some(""))
.mutable_ready()
.build(),

View file

@ -70,6 +70,9 @@ version = "0.9"
[[bans.skip]]
name = "hmac"
version = "0.11"
[[bans.skip]]
name = "zerocopy"
version = "0.6"
# field-offset and nix depend on an older memoffset
# https://github.com/Diggsey/rust-field-offset/pull/23
@ -184,6 +187,14 @@ version = "0.2"
[[bans.skip]]
name = "toml_edit"
version = "0.21"
[[bans.skip]]
name = "winnow"
version = "0.5"
# Various crates depend on an older version of heck
[[bans.skip]]
name = "heck"
version = "0.4"
[sources]
unknown-registry = "deny"

View file

@ -1,5 +1,9 @@
build_hotdoc = false
if get_option('doc').disabled()
subdir_done()
endif
if meson.is_cross_build()
if get_option('doc').enabled()
error('Documentation enabled but building the doc while cross building is not supported yet.')

View file

@ -6704,12 +6704,14 @@
"audio_%%u": {
"caps": "audio/x-raw:\naudio/x-opus:\n",
"direction": "sink",
"presence": "request"
"presence": "request",
"type": "GstWebRTCSinkPad"
},
"video_%%u": {
"caps": "video/x-raw:\n\nvideo/x-raw(memory:CUDAMemory):\n\nvideo/x-raw(memory:GLMemory):\n\nvideo/x-raw(memory:NVMM):\n\nvideo/x-raw(memory:D3D11Memory):\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\n",
"direction": "sink",
"presence": "request"
"presence": "request",
"type": "GstWebRTCSinkPad"
}
},
"rank": "none"
@ -6735,12 +6737,14 @@
"audio_%%u": {
"caps": "audio/x-raw:\naudio/x-opus:\n",
"direction": "sink",
"presence": "request"
"presence": "request",
"type": "GstWebRTCSinkPad"
},
"video_%%u": {
"caps": "video/x-raw:\n\nvideo/x-raw(memory:CUDAMemory):\n\nvideo/x-raw(memory:GLMemory):\n\nvideo/x-raw(memory:NVMM):\n\nvideo/x-raw(memory:D3D11Memory):\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\n",
"direction": "sink",
"presence": "request"
"presence": "request",
"type": "GstWebRTCSinkPad"
}
},
"rank": "none"
@ -6766,12 +6770,46 @@
"audio_%%u": {
"caps": "audio/x-raw:\naudio/x-opus:\n",
"direction": "sink",
"presence": "request"
"presence": "request",
"type": "GstWebRTCSinkPad"
},
"video_%%u": {
"caps": "video/x-raw:\n\nvideo/x-raw(memory:CUDAMemory):\n\nvideo/x-raw(memory:GLMemory):\n\nvideo/x-raw(memory:NVMM):\n\nvideo/x-raw(memory:D3D11Memory):\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\n",
"direction": "sink",
"presence": "request"
"presence": "request",
"type": "GstWebRTCSinkPad"
}
},
"rank": "none"
},
"livekitwebrtcsrc": {
"author": "Jordan Yelloz <jordan.yelloz@collabora.com>",
"description": "WebRTC source with LiveKit signaller",
"hierarchy": [
"GstLiveKitWebRTCSrc",
"GstBaseWebRTCSrc",
"GstBin",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"interfaces": [
"GstChildProxy"
],
"klass": "Source/Network/WebRTC",
"pad-templates": {
"audio_%%u": {
"caps": "audio/x-raw(ANY):\napplication/x-rtp:\naudio/x-opus:\n",
"direction": "src",
"presence": "sometimes",
"type": "GstWebRTCSrcPad"
},
"video_%%u": {
"caps": "video/x-raw(ANY):\napplication/x-rtp:\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\n",
"direction": "src",
"presence": "sometimes",
"type": "GstWebRTCSrcPad"
}
},
"rank": "none"
@ -6797,12 +6835,14 @@
"audio_%%u": {
"caps": "audio/x-raw:\naudio/x-opus:\n",
"direction": "sink",
"presence": "request"
"presence": "request",
"type": "GstWebRTCSinkPad"
},
"video_%%u": {
"caps": "video/x-raw:\n\nvideo/x-raw(memory:CUDAMemory):\n\nvideo/x-raw(memory:GLMemory):\n\nvideo/x-raw(memory:NVMM):\n\nvideo/x-raw(memory:D3D11Memory):\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\n",
"direction": "sink",
"presence": "request"
"presence": "request",
"type": "GstWebRTCSinkPad"
}
},
"rank": "none"
@ -6862,12 +6902,14 @@
"audio_%%u": {
"caps": "audio/x-raw:\naudio/x-opus:\n",
"direction": "sink",
"presence": "request"
"presence": "request",
"type": "GstWebRTCSinkPad"
},
"video_%%u": {
"caps": "video/x-raw:\n\nvideo/x-raw(memory:CUDAMemory):\n\nvideo/x-raw(memory:GLMemory):\n\nvideo/x-raw(memory:NVMM):\n\nvideo/x-raw(memory:D3D11Memory):\nvideo/x-vp8:\nvideo/x-h264:\nvideo/x-vp9:\nvideo/x-h265:\n",
"direction": "sink",
"presence": "request"
"presence": "request",
"type": "GstWebRTCSinkPad"
}
},
"rank": "none"
@ -7237,6 +7279,18 @@
"type": "GstValueArray",
"writable": true
},
"do-retransmission": {
"blurb": "Send retransmission events upstream when a packet is late",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "true",
"mutable": "ready",
"readable": true,
"type": "gboolean",
"writable": true
},
"enable-data-channel-navigation": {
"blurb": "Enable navigation events through a dedicated WebRTCDataChannel",
"conditionally-available": false,
@ -7590,6 +7644,32 @@
}
]
},
"GstWebRTCSinkPad": {
"hierarchy": [
"GstWebRTCSinkPad",
"GstGhostPad",
"GstProxyPad",
"GstPad",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"kind": "object",
"properties": {
"msid": {
"blurb": "Remote MediaStream ID in use for this pad",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "NULL",
"mutable": "ready",
"readable": true,
"type": "gchararray",
"writable": true
}
}
},
"GstWebRTCSrcPad": {
"hierarchy": [
"GstWebRTCSrcPad",
@ -7813,7 +7893,7 @@
"writable": true
},
"password": {
"blurb": "Spotify device password from https://www.spotify.com/us/account/set-device-password/",
"blurb": "Spotify password, Facebook accounts need a device password from https://www.spotify.com/us/account/set-device-password/",
"conditionally-available": false,
"construct": false,
"construct-only": false,
@ -7837,7 +7917,7 @@
"writable": true
},
"username": {
"blurb": "Spotify device username from https://www.spotify.com/us/account/set-device-password/",
"blurb": "Spotify username, Facebook accounts need a device username from https://www.spotify.com/us/account/set-device-password/",
"conditionally-available": false,
"construct": false,
"construct-only": false,

View file

@ -609,6 +609,8 @@ fn premature_shutdown() {
}
#[test]
// FIXME: racy: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/250
#[ignore]
fn socket_play_null_play() {
use gio::{
prelude::SocketExt, InetAddress, InetSocketAddress, SocketFamily, SocketProtocol,

View file

@ -76,6 +76,8 @@ fn test_client_management() {
}
#[test]
// FIXME: racy: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/250
#[ignore]
fn test_chain() {
init();

View file

@ -1,7 +1,7 @@
project('gst-plugins-rs',
'rust',
'c',
version: '0.12.0-alpha.1',
version: '0.12.2',
meson_version : '>= 1.1')
# dependencies.py needs a toml parsing module
@ -489,6 +489,16 @@ foreach plugin : plugins
plugin_name = plugin_name.substring(3)
endif
plugin_display_name = plugin_name
if plugin_name.startswith('gst')
plugin_display_name = plugin_name.substring(3)
endif
if plugin_display_name in plugin_names
# When default_library=both plugins are duplicated.
continue
endif
plugin_names += plugin_display_name
option_name = plugin_name.substring(3)
if option_name.startswith('rs')
option_name = option_name.substring(2)
@ -533,13 +543,7 @@ foreach plugin : plugins
warning('Static plugin @0@ is known to fail. It will not be included in libgstreamer-full.'.format(plugin_name))
else
gst_plugins += dep
pc_files += [plugin_name + '.pc']
if plugin_name.startswith('gst')
plugin_names += [plugin_name.substring(3)]
else
plugin_names += [plugin_name]
endif
endif
endforeach

View file

@ -26,8 +26,8 @@ path = "src/lib.rs"
gst-app = { workspace = true, features = ["v1_18"] }
gst-check = { workspace = true, features = ["v1_18"] }
m3u8-rs = "5.0"
chrono = "0.4"
dash-mpd = { version = "0.14", default-features = false }
chrono = "0.4.35"
dash-mpd = { version = "0.15", default-features = false }
quick-xml = { version = "0.31", features = ["serialize"] }
serde = "1"

View file

@ -182,8 +182,8 @@ fn main() -> Result<(), Error> {
let mut write_segment =
|start: gst::ClockTime, duration: gst::ClockTime, repeat: usize| {
let mut s = dash_mpd::S {
t: Some(start.mseconds() as i64),
d: duration.mseconds() as i64,
t: Some(start.mseconds()),
d: duration.mseconds(),
..Default::default()
};
if repeat > 0 {

View file

@ -153,7 +153,7 @@ fn trim_segments(state: &mut StreamState) {
// safe side
removal_time: segment
.date_time
.checked_add_signed(Duration::seconds(20))
.checked_add_signed(Duration::try_seconds(20).unwrap())
.unwrap(),
path: segment.path.clone(),
});

View file

@ -12,8 +12,9 @@ rust-version.workspace = true
bitstream-io = "2.0"
gst = { workspace = true, features = ["v1_20"] }
gst-rtp = { workspace = true, features = ["v1_20"]}
chrono = { version = "0.4", default-features = false }
time = { version = "0.3", default-features = false, features = ["std"] }
once_cell.workspace = true
smallvec = { version = "1.11", features = ["union", "write", "const_generics", "const_new"] }
[dev-dependencies]
gst-check = { workspace = true, features = ["v1_20"] }

View file

@ -17,19 +17,20 @@
* responsible for determining what bitrate to give to each encode)
*
*/
use chrono::Duration;
use gst::{glib, prelude::*, subclass::prelude::*};
use once_cell::sync::Lazy;
use smallvec::SmallVec;
use std::{
collections::{BTreeMap, VecDeque},
fmt,
fmt::Debug,
mem,
sync::Mutex,
time,
};
use time::Duration;
type Bitrate = u32;
type BufferList = SmallVec<[gst::Buffer; 10]>;
const DEFAULT_MIN_BITRATE: Bitrate = 1000;
const DEFAULT_ESTIMATED_BITRATE: Bitrate = 2_048_000;
@ -44,7 +45,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
// Table1. Time limit in milliseconds between packet bursts which identifies a group
static BURST_TIME: Lazy<Duration> = Lazy::new(|| Duration::milliseconds(5));
const BURST_TIME: Duration = Duration::milliseconds(5);
// Table1. Coefficient used for the measured noise variance
// [0.1,0.001]
@ -55,13 +56,13 @@ const ONE_MINUS_CHI: f64 = 1. - CHI;
const Q: f64 = 0.001;
// Table1. Initial value for the adaptive threshold
static INITIAL_DEL_VAR_TH: Lazy<Duration> = Lazy::new(|| Duration::microseconds(12500));
const INITIAL_DEL_VAR_TH: Duration = Duration::microseconds(12500);
// Table1. Initial value of the system error covariance
const INITIAL_ERROR_COVARIANCE: f64 = 0.1;
// Table1. Time required to trigger an overuse signal
static OVERUSE_TIME_TH: Lazy<Duration> = Lazy::new(|| Duration::milliseconds(10));
const OVERUSE_TIME_TH: Duration = Duration::milliseconds(10);
// from 5.5 "beta is typically chosen to be in the interval [0.8, 0.95], 0.85 is the RECOMMENDED value."
const BETA: f64 = 0.85;
@ -76,7 +77,7 @@ const MOVING_AVERAGE_SMOOTHING_FACTOR: f64 = 0.5;
// `N(i)` is the number of packets received the past T seconds and `L(j)` is
// the payload size of packet j. A window between 0.5 and 1 second is
// RECOMMENDED.
static PACKETS_RECEIVED_WINDOW: Lazy<Duration> = Lazy::new(|| Duration::milliseconds(1000)); // ms
const PACKETS_RECEIVED_WINDOW: Duration = Duration::milliseconds(1000); // ms
// from "5.4 Over-use detector" ->
// Moreover, del_var_th(i) SHOULD NOT be updated if this condition
@ -85,32 +86,32 @@ static PACKETS_RECEIVED_WINDOW: Lazy<Duration> = Lazy::new(|| Duration::millisec
// ```
// |m(i)| - del_var_th(i) > 15
// ```
static MAX_M_MINUS_DEL_VAR_TH: Lazy<Duration> = Lazy::new(|| Duration::milliseconds(15));
const MAX_M_MINUS_DEL_VAR_TH: Duration = Duration::milliseconds(15);
// from 5.4 "It is also RECOMMENDED to clamp del_var_th(i) to the range [6, 600]"
static MIN_THRESHOLD: Lazy<Duration> = Lazy::new(|| Duration::milliseconds(6));
static MAX_THRESHOLD: Lazy<Duration> = Lazy::new(|| Duration::milliseconds(600));
const MIN_THRESHOLD: Duration = Duration::milliseconds(6);
const MAX_THRESHOLD: Duration = Duration::milliseconds(600);
// From 5.5 ""Close" is defined as three standard deviations around this average"
const STANDARD_DEVIATION_CLOSE_NUM: f64 = 3.;
// Minimal duration between 2 updates on the lost based rate controller
static LOSS_UPDATE_INTERVAL: Lazy<time::Duration> = Lazy::new(|| time::Duration::from_millis(200));
static LOSS_DECREASE_THRESHOLD: f64 = 0.1;
static LOSS_INCREASE_THRESHOLD: f64 = 0.02;
static LOSS_INCREASE_FACTOR: f64 = 1.05;
const LOSS_UPDATE_INTERVAL: Duration = Duration::milliseconds(200);
const LOSS_DECREASE_THRESHOLD: f64 = 0.1;
const LOSS_INCREASE_THRESHOLD: f64 = 0.02;
const LOSS_INCREASE_FACTOR: f64 = 1.05;
// Minimal duration between 2 updates on the lost based rate controller
static DELAY_UPDATE_INTERVAL: Lazy<time::Duration> = Lazy::new(|| time::Duration::from_millis(100));
const DELAY_UPDATE_INTERVAL: Duration = Duration::milliseconds(100);
static ROUND_TRIP_TIME_WINDOW_SIZE: usize = 100;
const ROUND_TRIP_TIME_WINDOW_SIZE: usize = 100;
fn ts2dur(t: gst::ClockTime) -> Duration {
const fn ts2dur(t: gst::ClockTime) -> Duration {
Duration::nanoseconds(t.nseconds() as i64)
}
fn dur2ts(t: Duration) -> gst::ClockTime {
gst::ClockTime::from_nseconds(t.num_nanoseconds().unwrap() as u64)
const fn dur2ts(t: Duration) -> gst::ClockTime {
gst::ClockTime::from_nseconds(t.whole_nanoseconds() as u64)
}
#[derive(Debug)]
@ -118,7 +119,9 @@ enum BandwidthEstimationOp {
/// Don't update target bitrate
Hold,
/// Decrease target bitrate
#[allow(unused)]
Decrease(String /* reason */),
#[allow(unused)]
Increase(String /* reason */),
}
@ -160,7 +163,7 @@ impl Packet {
let seqnum = structure.get::<u32>("seqnum").unwrap() as u64;
if lost {
return Some(Packet {
arrival: Duration::zero(),
arrival: Duration::ZERO,
departure: ts2dur(departure),
size: structure.get::<u32>("size").unwrap() as usize,
seqnum,
@ -189,18 +192,12 @@ impl Default for PacketGroup {
fn default() -> Self {
Self {
packets: Default::default(),
departure: Duration::zero(),
departure: Duration::ZERO,
arrival: None,
}
}
}
fn pdur(d: &Duration) -> String {
let stdd = time::Duration::from_nanos(d.num_nanoseconds().unwrap().unsigned_abs());
format!("{}{stdd:?}", if d.lt(&Duration::zero()) { "-" } else { "" })
}
impl PacketGroup {
fn add(&mut self, packet: Packet) {
if self.departure.is_zero() {
@ -310,10 +307,10 @@ impl Debug for Detector {
"Network Usage: {:?}. Effective bitrate: {}ps - Measure: {} Estimate: {} threshold {} - overuse_estimate {}",
self.usage,
human_kbits(self.effective_bitrate()),
pdur(&self.measure),
pdur(&self.estimate),
pdur(&self.threshold),
pdur(&self.last_overuse_estimate),
self.measure,
self.estimate,
self.threshold,
self.last_overuse_estimate,
)
}
}
@ -323,7 +320,7 @@ impl Detector {
Detector {
group: Default::default(),
prev_group: Default::default(),
measure: Duration::zero(),
measure: Duration::ZERO,
/* Smallish value to hold PACKETS_RECEIVED_WINDOW packets */
last_received_packets: BTreeMap::new(),
@ -334,16 +331,16 @@ impl Detector {
gain: 0.,
measurement_uncertainty: 0.,
estimate_error: INITIAL_ERROR_COVARIANCE,
estimate: Duration::zero(),
estimate: Duration::ZERO,
threshold: *INITIAL_DEL_VAR_TH,
threshold: INITIAL_DEL_VAR_TH,
last_threshold_update: None,
num_deltas: 0,
last_use_detector_update: time::Instant::now(),
increasing_counter: 0,
last_overuse_estimate: Duration::zero(),
increasing_duration: Duration::zero(),
last_overuse_estimate: Duration::ZERO,
increasing_duration: Duration::ZERO,
rtts: Default::default(),
clock: gst::SystemClock::obtain(),
@ -371,7 +368,7 @@ impl Detector {
.unwrap()
.arrival;
while last_arrival - self.oldest_packet_in_window_ts() > *PACKETS_RECEIVED_WINDOW {
while last_arrival - self.oldest_packet_in_window_ts() > PACKETS_RECEIVED_WINDOW {
let oldest_seqnum = *self.last_received_packets.iter().next().unwrap().0;
self.last_received_packets.remove(&oldest_seqnum);
}
@ -398,9 +395,8 @@ impl Detector {
.sum::<f64>()
* 8.;
(bits
/ (duration.num_nanoseconds().unwrap() as f64
/ gst::ClockTime::SECOND.nseconds() as f64)) as Bitrate
(bits / (duration.whole_nanoseconds() as f64 / gst::ClockTime::SECOND.nseconds() as f64))
as Bitrate
}
fn oldest_packet_in_window_ts(&self) -> Duration {
@ -425,7 +421,7 @@ impl Detector {
(self
.rtts
.iter()
.map(|d| d.num_nanoseconds().unwrap() as f64)
.map(|d| d.whole_nanoseconds() as f64)
.sum::<f64>()
/ self.rtts.len() as f64) as i64,
)
@ -481,7 +477,7 @@ impl Detector {
}
if pkt.departure >= self.group.departure {
if self.group.inter_departure_time_pkt(pkt) < *BURST_TIME {
if self.group.inter_departure_time_pkt(pkt) < BURST_TIME {
self.group.add(*pkt);
continue;
}
@ -491,8 +487,8 @@ impl Detector {
// A Packet which has an inter-arrival time less than burst_time and
// an inter-group delay variation d(i) less than 0 is considered
// being part of the current group of packets.
if self.group.inter_arrival_time_pkt(pkt) < *BURST_TIME
&& self.group.inter_delay_variation_pkt(pkt) < Duration::zero()
if self.group.inter_arrival_time_pkt(pkt) < BURST_TIME
&& self.group.inter_delay_variation_pkt(pkt) < Duration::ZERO
{
self.group.add(*pkt);
continue;
@ -502,7 +498,7 @@ impl Detector {
gst::trace!(
CAT,
"Packet group done: {:?}",
gst::ClockTime::from_nseconds(group.departure.num_nanoseconds().unwrap() as u64)
gst::ClockTime::from_nseconds(group.departure.whole_nanoseconds() as u64)
);
if let Some(prev_group) = mem::replace(&mut self.prev_group, Some(group.clone())) {
// 5.3 Arrival-time filter
@ -514,7 +510,7 @@ impl Detector {
gst::debug!(
CAT,
"Ignoring packet departed at {:?} as we got feedback too late",
gst::ClockTime::from_nseconds(pkt.departure.num_nanoseconds().unwrap() as u64)
gst::ClockTime::from_nseconds(pkt.departure.whole_nanoseconds() as u64)
);
}
}
@ -527,10 +523,7 @@ impl Detector {
if let Some(ref last_update) = self.last_loss_update {
self.loss_average = loss_fraction
+ (-Duration::from_std(now - *last_update)
.unwrap()
.num_milliseconds() as f64)
.exp()
+ (-(now - *last_update).whole_milliseconds() as f64).exp()
* (self.loss_average - loss_fraction);
}
@ -541,7 +534,7 @@ impl Detector {
self.measure = group.inter_delay_variation(prev_group);
let z = self.measure - self.estimate;
let zms = z.num_microseconds().unwrap() as f64 / 1000.0;
let zms = z.whole_microseconds() as f64 / 1000.0;
// This doesn't exactly follows the spec as we should compute and
// use f_max here, no implementation we have found actually uses it.
@ -575,11 +568,11 @@ impl Detector {
}
let t = Duration::nanoseconds(
self.estimate.num_nanoseconds().unwrap() * i64::min(self.num_deltas, MAX_DELTAS),
self.estimate.whole_nanoseconds() as i64 * i64::min(self.num_deltas, MAX_DELTAS),
);
let usage = if t > self.threshold {
NetworkUsage::Over
} else if t.num_nanoseconds().unwrap() < -self.threshold.num_nanoseconds().unwrap() {
} else if t.whole_nanoseconds() < -self.threshold.whole_nanoseconds() {
NetworkUsage::Under
} else {
NetworkUsage::Normal
@ -593,15 +586,15 @@ impl Detector {
fn update_threshold(&mut self, estimate: &Duration) {
const K_U: f64 = 0.01; // Table1. Coefficient for the adaptive threshold
const K_D: f64 = 0.00018; // Table1. Coefficient for the adaptive threshold
const MAX_TIME_DELTA: time::Duration = time::Duration::from_millis(100);
const MAX_TIME_DELTA: Duration = Duration::milliseconds(100);
let now = time::Instant::now();
if self.last_threshold_update.is_none() {
self.last_threshold_update = Some(now);
}
let abs_estimate = Duration::nanoseconds(estimate.num_nanoseconds().unwrap().abs());
if abs_estimate > self.threshold + *MAX_M_MINUS_DEL_VAR_TH {
let abs_estimate = estimate.abs();
if abs_estimate > self.threshold + MAX_M_MINUS_DEL_VAR_TH {
self.last_threshold_update = Some(now);
return;
}
@ -611,14 +604,12 @@ impl Detector {
} else {
K_U
};
let time_delta =
Duration::from_std((now - self.last_threshold_update.unwrap()).min(MAX_TIME_DELTA))
.unwrap();
let time_delta = (now - self.last_threshold_update.unwrap()).min(MAX_TIME_DELTA);
let d = abs_estimate - self.threshold;
let add = k * d.num_milliseconds() as f64 * time_delta.num_milliseconds() as f64;
let add = k * d.whole_milliseconds() as f64 * time_delta.whole_milliseconds() as f64;
self.threshold += Duration::nanoseconds((add * 100. * 1_000.) as i64);
self.threshold = self.threshold.clamp(*MIN_THRESHOLD, *MAX_THRESHOLD);
self.threshold = self.threshold.clamp(MIN_THRESHOLD, MAX_THRESHOLD);
self.last_threshold_update = Some(now);
}
@ -626,22 +617,22 @@ impl Detector {
let (th_usage, estimate) = self.compare_threshold();
let now = time::Instant::now();
let delta = Duration::from_std(now - self.last_use_detector_update).unwrap();
let delta = now - self.last_use_detector_update;
self.last_use_detector_update = now;
gst::log!(
CAT,
"{:?} - self.estimate {} - estimate: {} - th: {}",
th_usage,
pdur(&self.estimate),
pdur(&estimate),
pdur(&self.threshold)
self.estimate,
estimate,
self.threshold
);
match th_usage {
NetworkUsage::Over => {
self.increasing_duration += delta;
self.increasing_counter += 1;
if self.increasing_duration > *OVERUSE_TIME_TH
if self.increasing_duration > OVERUSE_TIME_TH
&& self.increasing_counter > 1
&& estimate > self.last_overuse_estimate
{
@ -649,7 +640,7 @@ impl Detector {
}
}
NetworkUsage::Under | NetworkUsage::Normal => {
self.increasing_duration = Duration::zero();
self.increasing_duration = Duration::ZERO;
self.increasing_counter = 0;
self.usage = th_usage;
@ -759,10 +750,10 @@ impl Default for State {
impl State {
// 4. sending engine implementing a "leaky bucket"
fn create_buffer_list(&mut self, bwe: &super::BandwidthEstimator) -> gst::BufferList {
fn create_buffer_list(&mut self, bwe: &super::BandwidthEstimator) -> BufferList {
let now = time::Instant::now();
let elapsed = Duration::from_std(now - self.last_push).unwrap();
let mut budget = (elapsed.num_nanoseconds().unwrap())
let elapsed = now - self.last_push;
let mut budget = (elapsed.whole_nanoseconds() as i64)
.mul_div_round(
self.estimated_bitrate as i64,
gst::ClockTime::SECOND.nseconds() as i64,
@ -773,8 +764,8 @@ impl State {
let mut remaining = self.buffers.iter().map(|b| b.size() as f64).sum::<f64>() * 8.;
let total_size = remaining;
let mut list = gst::BufferList::new();
let mutlist = list.get_mut().unwrap();
let mut list_size = 0;
let mut list = BufferList::new();
// Leak the bucket so it can hold at most 30ms of data
let maximum_remaining_bits = 30. * self.estimated_bitrate as f64 / 1000.;
@ -784,7 +775,8 @@ impl State {
let n_bits = buf.size() * 8;
leaked = budget <= 0 && remaining > maximum_remaining_bits;
mutlist.add(buf);
list_size += buf.size();
list.push(buf);
budget -= n_bits as i64;
remaining -= n_bits as f64;
}
@ -793,11 +785,11 @@ impl State {
CAT,
obj: bwe,
"{} bitrate: {}ps budget: {}/{} sending: {} Remaining: {}/{}",
pdur(&elapsed),
elapsed,
human_kbits(self.estimated_bitrate),
human_kbits(budget as f64),
human_kbits(total_budget as f64),
human_kbits(list.calculate_size() as f64 * 8.),
human_kbits(list_size as f64 * 8.),
human_kbits(remaining),
human_kbits(total_size)
);
@ -815,11 +807,11 @@ impl State {
let time_since_last_update_ms = match self.last_increase_on_delay {
None => 0.,
Some(prev) => {
if now - prev < *DELAY_UPDATE_INTERVAL {
if now - prev < DELAY_UPDATE_INTERVAL {
return None;
}
(now - prev).as_millis() as f64
(now - prev).whole_milliseconds() as f64
}
};
@ -839,7 +831,7 @@ impl State {
let packets_per_frame = f64::ceil(bits_per_frame / (1200. * 8.));
let avg_packet_size_bits = bits_per_frame / packets_per_frame;
let rtt_ms = self.detector.rtt().num_milliseconds() as f64;
let rtt_ms = self.detector.rtt().whole_milliseconds() as f64;
let response_time_ms = 100. + rtt_ms;
let alpha = 0.5 * f64::min(time_since_last_update_ms / response_time_ms, 1.0);
let threshold_on_effective_bitrate = 1.5 * effective_bitrate as f64;
@ -959,7 +951,7 @@ impl State {
let now = time::Instant::now();
if loss_ratio > LOSS_DECREASE_THRESHOLD
&& (now - self.last_decrease_on_loss) > *LOSS_UPDATE_INTERVAL
&& (now - self.last_decrease_on_loss) > LOSS_UPDATE_INTERVAL
{
let factor = 1. - (0.5 * loss_ratio);
@ -973,7 +965,7 @@ impl State {
ControllerType::Loss,
)
} else if loss_ratio < LOSS_INCREASE_THRESHOLD
&& (now - self.last_increase_on_loss) > *LOSS_UPDATE_INTERVAL
&& (now - self.last_increase_on_loss) > LOSS_UPDATE_INTERVAL
{
self.last_control_op = BandwidthEstimationOp::Increase("Low loss".into());
self.last_increase_on_loss = now;
@ -1000,7 +992,7 @@ impl State {
},
NetworkUsage::Over => {
let now = time::Instant::now();
if now - self.last_decrease_on_delay > *DELAY_UPDATE_INTERVAL {
if now - self.last_decrease_on_delay > DELAY_UPDATE_INTERVAL {
let effective_bitrate = self.detector.effective_bitrate();
let target =
(self.estimated_bitrate as f64 * 0.95).min(BETA * effective_bitrate as f64);
@ -1037,8 +1029,14 @@ pub struct BandwidthEstimator {
}
impl BandwidthEstimator {
fn push_list(&self, list: gst::BufferList) -> Result<gst::FlowSuccess, gst::FlowError> {
let res = self.srcpad.push_list(list);
fn push_list(&self, list: BufferList) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut res = Ok(gst::FlowSuccess::Ok);
for buf in list {
res = self.srcpad.push(buf);
if res.is_err() {
break;
}
}
self.state.lock().unwrap().flow_return = res;
@ -1051,7 +1049,7 @@ impl BandwidthEstimator {
let clock = gst::SystemClock::obtain();
bwe.imp().state.lock().unwrap().clock_entry =
Some(clock.new_single_shot_id(clock.time().unwrap() + dur2ts(*BURST_TIME)));
Some(clock.new_single_shot_id(clock.time().unwrap() + dur2ts(BURST_TIME)));
self.srcpad.start_task(move || {
let pause = || {
@ -1088,10 +1086,7 @@ impl BandwidthEstimator {
let list = {
let mut state = lock_state();
clock
.single_shot_id_reinit(
&clock_entry,
clock.time().unwrap() + dur2ts(*BURST_TIME),
)
.single_shot_id_reinit(&clock_entry, clock.time().unwrap() + dur2ts(BURST_TIME))
.unwrap();
state.clock_entry = Some(clock_entry);
state.create_buffer_list(&bwe)
@ -1146,15 +1141,12 @@ impl ObjectSubclass for BandwidthEstimator {
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.pad_template("sink").unwrap();
let sinkpad = gst::Pad::builder_from_template(&templ)
.chain_function(|_pad, parent, mut buffer| {
.chain_function(|_pad, parent, buffer| {
BandwidthEstimator::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this| {
let mut state = this.state.lock().unwrap();
let mutbuf = buffer.make_mut();
mutbuf.set_pts(None);
mutbuf.set_dts(None);
state.buffers.push_front(buffer);
state.flow_return

View file

@ -25,11 +25,11 @@ futures = "0.3"
tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread", "time"] }
tokio-native-tls = "0.3.0"
tokio-stream = "0.1.11"
async-tungstenite = { version = "0.24", features = ["tokio-runtime", "tokio-native-tls"] }
async-tungstenite = { version = "0.25", features = ["tokio-runtime", "tokio-native-tls"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
fastrand = "2.0"
gst_plugin_webrtc_protocol = { path="protocol", package = "gst-plugin-webrtc-signalling-protocol" }
gst_plugin_webrtc_protocol = { path="protocol", package = "gst-plugin-webrtc-signalling-protocol", version = "0.12" }
human_bytes = "0.4"
url = "2"

View file

@ -350,6 +350,104 @@ gst-launch-1.0 -e uridecodebin uri=file:///home/meh/path/to/video/file ! \
You should see a second video displayed in the videoroomtest web page.
## Streaming from LiveKit using the livekitwebrtcsrc element
First, publish a stream to the room using the following command:
```shell
gst-launch-1.0 livekitwebrtcsink name=sink \
signaller::ws-url=ws://127.0.0.1:7880 \
signaller::api-key=devkey \
signaller::secret-key=secret \
signaller::room-name=testroom \
signaller::identity=gst-producer \
signaller::participant-name=gst-producer \
video-caps='video/x-h264' \
videotestsrc is-live=1 \
! video/x-raw,width=640,height=360,framerate=15/1 \
! timeoverlay ! videoconvert ! queue ! sink.
```
Then play back the published stream:
```shell
gst-launch-1.0 livekitwebrtcsrc \
name=src \
signaller::ws-url=ws://127.0.0.1:7880 \
signaller::api-key=devkey \
signaller::secret-key=secret \
signaller::room-name=testroom \
signaller::identity=gst-consumer \
signaller::participant-name=gst-consumer \
signaller::producer-peer-id=gst-producer \
video-codecs='<H264>' \
src. ! queue ! videoconvert ! autovideosink
```
### Auto-subscribe with livekitwebrtcsrc element
With the LiveKit source element, you can also subscribe to all the peers in
your room, simply by not specifying any value for
`signaller::producer-peer-id`. Unwanted peers can also be ignored by supplying
an array of peer IDs to `signaller::excluded-producer-peer-ids`. Importantly,
it is also necessary to add sinks for all the streams in the room that the
source element has subscribed to.
First, publish a few streams using different connections:
```shell
gst-launch-1.0 \
livekitwebrtcsink name=sinka \
signaller::ws-url=ws://127.0.0.1:7880 \
signaller::api-key=devkey \
signaller::secret-key=secret \
signaller::room-name=testroom \
signaller::identity=gst-producer-a \
signaller::participant-name=gst-producer-a \
video-caps='video/x-vp8' \
livekitwebrtcsink name=sinkb \
signaller::ws-url=ws://127.0.0.1:7880 \
signaller::api-key=devkey \
signaller::secret-key=secret \
signaller::room-name=testroom \
signaller::identity=gst-producer-b \
signaller::participant-name=gst-producer-b \
video-caps='video/x-vp8' \
livekitwebrtcsink name=sinkc \
signaller::ws-url=ws://127.0.0.1:7880 \
signaller::api-key=devkey \
signaller::secret-key=secret \
signaller::room-name=testroom \
signaller::identity=gst-producer-c \
signaller::participant-name=gst-producer-c \
video-caps='video/x-vp8' \
videotestsrc is-live=1 \
! video/x-raw,width=640,height=360,framerate=15/1 \
! timeoverlay ! videoconvert ! queue ! sinka. \
videotestsrc pattern=ball is-live=1 \
! video/x-raw,width=320,height=180,framerate=15/1 \
! timeoverlay ! videoconvert ! queue ! sinkb.
videotestsrc is-live=1 \
! video/x-raw,width=320,height=180,framerate=15/1 \
! timeoverlay ! videoconvert ! queue ! sinkc.
```
Then watch only streams A and B by excluding peer C:
```shell
gst-launch-1.0 livekitwebrtcsrc \
name=src \
signaller::ws-url=ws://127.0.0.1:7880 \
signaller::api-key=devkey \
signaller::secret-key=secret \
signaller::room-name=testroom \
signaller::identity=gst-consumer \
signaller::participant-name=gst-consumer \
signaller::excluded-producer-peer-ids='<gst-producer-c>' \
src. ! queue ! videoconvert ! autovideosink
src. ! queue ! videoconvert ! autovideosink
```
[LiveKit]: https://livekit.io/
[janus]: https://github.com/meetecho/janus-gateway
[simple whip server]: https://github.com/meetecho/simple-whip-server/

View file

@ -12,7 +12,7 @@ rust-version.workspace = true
anyhow = "1"
tokio = { version = "1", features = ["fs", "io-util", "macros", "rt-multi-thread", "time"] }
tokio-native-tls = "0.3.0"
async-tungstenite = { version = "0.24", features = ["tokio-runtime", "tokio-native-tls"] }
async-tungstenite = { version = "0.25", features = ["tokio-runtime", "tokio-native-tls"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
clap = { version = "4", features = ["derive"] }
@ -24,7 +24,7 @@ uuid = { version = "1", features = ["v4"] }
thiserror = "1"
test-log = { version = "0.2", features = ["trace"], default-features = false }
pin-project-lite = "0.2"
gst_plugin_webrtc_protocol = { path="../protocol", package = "gst-plugin-webrtc-signalling-protocol" }
gst_plugin_webrtc_protocol = { path="../protocol", package = "gst-plugin-webrtc-signalling-protocol", version = "0.12" }
[[bin]]
name = "gst-webrtc-signalling-server"

View file

@ -42,17 +42,37 @@ fn feed_id() -> u32 {
thread_rng().gen()
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
#[serde(untagged)]
/// Ids are either u64 (default) or string in Janus, depending of the
/// `string_ids` configuration in the videoroom plugin config file.
enum JanusId {
Str(String),
Num(u64),
}
impl std::fmt::Display for JanusId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
JanusId::Str(s) => write!(f, "{s}"),
JanusId::Num(n) => write!(f, "{n}"),
}
}
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct KeepAliveMsg {
janus: String,
transaction: String,
session_id: u64,
apisecret: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct CreateSessionMsg {
janus: String,
transaction: String,
apisecret: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
@ -61,14 +81,15 @@ struct AttachPluginMsg {
transaction: String,
plugin: String,
session_id: u64,
apisecret: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct RoomRequestBody {
request: String,
ptype: String,
room: u64,
id: u32,
room: JanusId,
id: JanusId,
#[serde(skip_serializing_if = "Option::is_none")]
display: Option<String>,
}
@ -79,14 +100,13 @@ struct RoomRequestMsg {
transaction: String,
session_id: u64,
handle_id: u64,
apisecret: Option<String>,
body: RoomRequestBody,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
struct PublishBody {
request: String,
audio: bool,
video: bool,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
@ -102,6 +122,7 @@ struct PublishMsg {
transaction: String,
session_id: u64,
handle_id: u64,
apisecret: Option<String>,
body: PublishBody,
jsep: Jsep,
}
@ -119,6 +140,7 @@ struct TrickleMsg {
transaction: String,
session_id: u64,
handle_id: u64,
apisecret: Option<String>,
candidate: Candidate,
}
@ -139,25 +161,39 @@ struct InnerError {
reason: String,
}
#[derive(Serialize, Deserialize, Debug)]
struct InnerHangup {
session_id: JanusId,
sender: JanusId,
reason: String,
}
#[derive(Serialize, Deserialize, Debug)]
struct RoomJoined {
room: Option<u64>,
room: JanusId,
}
#[derive(Serialize, Deserialize, Debug)]
struct RoomEvent {
room: Option<u64>,
room: Option<JanusId>,
error_code: Option<i32>,
error: Option<String>,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "videoroom")]
struct RoomDestroyed {
room: JanusId,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "videoroom", rename_all = "kebab-case")]
enum VideoRoomData {
#[serde(rename = "joined")]
Joined(RoomJoined),
#[serde(rename = "event")]
Event(RoomEvent),
Destroyed(RoomDestroyed),
}
#[derive(Serialize, Deserialize, Debug)]
@ -189,20 +225,15 @@ struct EventMsg {
// IncomingMessage
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "janus")]
#[serde(tag = "janus", rename_all = "lowercase")]
enum JsonReply {
#[serde(rename = "ack")]
Ack,
#[serde(rename = "success")]
Success(SuccessMsg),
#[serde(rename = "event")]
Event(EventMsg),
#[serde(rename = "webrtcup")]
WebRTCUp,
#[serde(rename = "media")]
Media,
#[serde(rename = "error")]
Error(InnerError),
HangUp(InnerHangup),
}
#[derive(Default)]
@ -213,14 +244,18 @@ struct State {
session_id: Option<u64>,
handle_id: Option<u64>,
transaction_id: Option<String>,
room_id: Option<JanusId>,
feed_id: Option<JanusId>,
}
#[derive(Clone)]
struct Settings {
janus_endpoint: String,
room_id: Option<String>,
feed_id: u32,
feed_id: String,
display_name: Option<String>,
secret_key: Option<String>,
string_ids: bool,
}
impl Default for Settings {
@ -228,8 +263,10 @@ impl Default for Settings {
Self {
janus_endpoint: "ws://127.0.0.1:8188".to_string(),
room_id: None,
feed_id: feed_id(),
feed_id: feed_id().to_string(),
display_name: None,
secret_key: None,
string_ids: false,
}
}
}
@ -240,8 +277,10 @@ pub struct Signaller {
state: Mutex<State>,
#[property(name="janus-endpoint", get, set, type = String, member = janus_endpoint, blurb = "The Janus server endpoint to POST SDP offer to")]
#[property(name="room-id", get, set, type = String, member = room_id, blurb = "The Janus Room ID that will be joined to")]
#[property(name="feed-id", get, set, type = u32, member = feed_id, blurb = "The Janus Feed ID to identify where the track is coming from")]
#[property(name="feed-id", get, set, type = String, member = feed_id, blurb = "The Janus Feed ID to identify where the track is coming from")]
#[property(name="display-name", get, set, type = String, member = display_name, blurb = "The name of the publisher in the Janus Video Room")]
#[property(name="secret-key", get, set, type = String, member = secret_key, blurb = "The secret API key to communicate with Janus server")]
#[property(name="string-ids", get, set, type = bool, member = string_ids, blurb = "Force passing room-id and feed-id as string even if they can be parsed into an integer")]
settings: Mutex<Settings>,
}
@ -292,15 +331,20 @@ impl Signaller {
},
_ = tokio::time::sleep(Duration::from_secs(10)) => {
if let Some(ref this) = this {
let (transaction, session_id) = {
let (transaction, session_id, apisecret) = {
let state = this.state.lock().unwrap();
(state.transaction_id.clone().unwrap(),
state.session_id.unwrap())
let settings = this.settings.lock().unwrap();
(
state.transaction_id.clone().unwrap(),
state.session_id.unwrap(),
settings.secret_key.clone(),
)
};
let msg = OutgoingMessage::KeepAlive(KeepAliveMsg {
janus: "keepalive".to_string(),
transaction,
session_id,
apisecret,
});
res = ws_sink
.send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
@ -401,10 +445,8 @@ impl Signaller {
if let Some(PluginData::VideoRoom { data: plugindata }) = event.plugindata {
match plugindata {
VideoRoomData::Joined(joined) => {
if let Some(room) = joined.room {
gst::trace!(CAT, imp: self, "Joined room {} successfully", room);
self.session_requested();
}
gst::trace!(CAT, imp: self, "Joined room {:?} successfully", joined.room);
self.session_requested();
}
VideoRoomData::Event(room_event) => {
if room_event.error_code.is_some() && room_event.error.is_some() {
@ -423,12 +465,21 @@ impl Signaller {
}
}
}
VideoRoomData::Destroyed(room_destroyed) => {
gst::trace!(CAT, imp: self, "Room {} has been destroyed", room_destroyed.room);
self.raise_error(format!(
"room {} has been destroyed",
room_destroyed.room
));
}
}
}
}
JsonReply::Error(error) => {
self.raise_error(format!("code: {}, reason: {}", error.code, error.reason))
}
JsonReply::HangUp(hangup) => self.raise_error(format!("hangup: {}", hangup.reason)),
// ignore for now
JsonReply::Ack | JsonReply::Media => {}
}
@ -466,9 +517,12 @@ impl Signaller {
fn create_session(&self) {
let transaction = transaction_id();
self.set_transaction_id(transaction.clone());
let settings = self.settings.lock().unwrap();
let apisecret = settings.secret_key.clone();
self.send(OutgoingMessage::CreateSession(CreateSessionMsg {
janus: "create".to_string(),
transaction,
apisecret,
}));
}
@ -481,12 +535,14 @@ impl Signaller {
}
fn attach_plugin(&self) {
let (transaction, session_id) = {
let (transaction, session_id, apisecret) = {
let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
(
state.transaction_id.clone().unwrap(),
state.session_id.unwrap(),
settings.secret_key.clone(),
)
};
self.send(OutgoingMessage::AttachPlugin(AttachPluginMsg {
@ -494,12 +550,13 @@ impl Signaller {
transaction,
plugin: "janus.plugin.videoroom".to_string(),
session_id,
apisecret,
}));
}
fn join_room(&self) {
let (transaction, session_id, handle_id, room, feed_id, display) = {
let state = self.state.lock().unwrap();
let (transaction, session_id, handle_id, room, feed_id, display, apisecret) = {
let mut state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
if settings.room_id.is_none() {
@ -507,13 +564,38 @@ impl Signaller {
return;
}
/* room_id and feed_id can be either a string or integer depending
* on server configuration. The property is always a string, if we
* can parse it to integer then assume that's what the server expects,
* unless string-ids=true is set to force usage of strings.
* Save parsed value in state to not have to parse it again for future
* API calls.
*/
if settings.string_ids {
state.room_id = Some(JanusId::Str(settings.room_id.clone().unwrap()));
state.feed_id = Some(JanusId::Str(settings.feed_id.clone()));
} else {
let room_id_str = settings.room_id.as_ref().unwrap();
match room_id_str.parse() {
Ok(n) => {
state.room_id = Some(JanusId::Num(n));
state.feed_id = Some(JanusId::Num(settings.feed_id.parse().unwrap()));
}
Err(_) => {
state.room_id = Some(JanusId::Str(room_id_str.clone()));
state.feed_id = Some(JanusId::Str(settings.feed_id.clone()));
}
};
}
(
state.transaction_id.clone().unwrap(),
state.session_id.unwrap(),
state.handle_id.unwrap(),
settings.room_id.as_ref().unwrap().parse().unwrap(),
settings.feed_id,
state.room_id.clone().unwrap(),
state.feed_id.clone().unwrap(),
settings.display_name.clone(),
settings.secret_key.clone(),
)
};
self.send(OutgoingMessage::RoomRequest(RoomRequestMsg {
@ -521,6 +603,7 @@ impl Signaller {
transaction,
session_id,
handle_id,
apisecret,
body: RoomRequestBody {
request: "join".to_string(),
ptype: "publisher".to_string(),
@ -532,7 +615,7 @@ impl Signaller {
}
fn leave_room(&self) {
let (transaction, session_id, handle_id, room, feed_id, display) = {
let (transaction, session_id, handle_id, room, feed_id, display, apisecret) = {
let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
@ -545,9 +628,10 @@ impl Signaller {
state.transaction_id.clone().unwrap(),
state.session_id.unwrap(),
state.handle_id.unwrap(),
settings.room_id.as_ref().unwrap().parse().unwrap(),
settings.feed_id,
state.room_id.clone().unwrap(),
state.feed_id.clone().unwrap(),
settings.display_name.clone(),
settings.secret_key.clone(),
)
};
self.send_blocking(OutgoingMessage::RoomRequest(RoomRequestMsg {
@ -555,6 +639,7 @@ impl Signaller {
transaction,
session_id,
handle_id,
apisecret,
body: RoomRequestBody {
request: "leave".to_string(),
ptype: "publisher".to_string(),
@ -566,7 +651,7 @@ impl Signaller {
}
fn publish(&self, offer: &gst_webrtc::WebRTCSessionDescription) {
let (transaction, session_id, handle_id) = {
let (transaction, session_id, handle_id, apisecret) = {
let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
@ -579,6 +664,7 @@ impl Signaller {
state.transaction_id.clone().unwrap(),
state.session_id.unwrap(),
state.handle_id.unwrap(),
settings.secret_key.clone(),
)
};
let sdp_data = offer.sdp().as_text().unwrap();
@ -587,10 +673,9 @@ impl Signaller {
transaction,
session_id,
handle_id,
apisecret,
body: PublishBody {
request: "publish".to_string(),
audio: true,
video: true,
},
jsep: Jsep {
sdp: sdp_data,
@ -601,7 +686,7 @@ impl Signaller {
}
fn trickle(&self, candidate: &str, sdp_m_line_index: u32) {
let (transaction, session_id, handle_id) = {
let (transaction, session_id, handle_id, apisecret) = {
let state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
@ -614,6 +699,7 @@ impl Signaller {
state.transaction_id.clone().unwrap(),
state.session_id.unwrap(),
state.handle_id.unwrap(),
settings.secret_key.clone(),
)
};
self.send(OutgoingMessage::Trickle(TrickleMsg {
@ -621,6 +707,7 @@ impl Signaller {
transaction,
session_id,
handle_id,
apisecret,
candidate: Candidate {
candidate: candidate.to_string(),
sdp_m_line_index,

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: MPL-2.0
use crate::signaller::{Signallable, SignallableImpl};
use crate::signaller::{Signallable, SignallableImpl, WebRTCSignallerRole};
use crate::utils::{wait_async, WaitError};
use crate::RUNTIME;
@ -13,6 +13,7 @@ use gst::subclass::prelude::*;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
@ -40,6 +41,9 @@ struct Settings {
identity: Option<String>,
room_name: Option<String>,
auth_token: Option<String>,
role: WebRTCSignallerRole,
producer_peer_id: Option<String>,
excluded_produder_peer_ids: Vec<String>,
timeout: u32,
}
@ -53,6 +57,9 @@ impl Default for Settings {
identity: Some("gstreamer".to_string()),
room_name: None,
auth_token: None,
role: WebRTCSignallerRole::default(),
producer_peer_id: None,
excluded_produder_peer_ids: vec![],
timeout: DEFAULT_TRACK_PUBLISH_TIMEOUT,
}
}
@ -93,6 +100,99 @@ impl Signaller {
.emit_by_name::<()>("error", &[&format!("Error: {msg}")]);
}
fn role(&self) -> Option<WebRTCSignallerRole> {
self.settings.lock().map(|s| s.role).ok()
}
fn is_subscriber(&self) -> bool {
matches!(self.role(), Some(WebRTCSignallerRole::Consumer))
}
fn producer_peer_id(&self) -> Option<String> {
assert!(self.is_subscriber());
let settings = self.settings.lock().ok()?;
settings.producer_peer_id.clone()
}
fn auto_subscribe(&self) -> bool {
self.is_subscriber()
&& self.producer_peer_id().is_none()
&& self.excluded_producer_peer_ids_is_empty()
}
fn signal_target(&self) -> Option<proto::SignalTarget> {
match self.role()? {
WebRTCSignallerRole::Consumer => Some(proto::SignalTarget::Subscriber),
WebRTCSignallerRole::Producer => Some(proto::SignalTarget::Publisher),
_ => None,
}
}
fn excluded_producer_peer_ids_is_empty(&self) -> bool {
assert!(self.is_subscriber());
self.settings
.lock()
.unwrap()
.excluded_produder_peer_ids
.is_empty()
}
fn is_peer_excluded(&self, peer_id: &str) -> bool {
self.settings
.lock()
.unwrap()
.excluded_produder_peer_ids
.iter()
.any(|id| id == peer_id)
}
fn signal_client(&self) -> Option<Arc<signal_client::SignalClient>> {
let connection = self.connection.lock().unwrap();
Some(connection.as_ref()?.signal_client.clone())
}
fn require_signal_client(&self) -> Arc<signal_client::SignalClient> {
self.signal_client().unwrap()
}
async fn send_trickle_request(&self, candidate_init: &str) {
let Some(signal_client) = self.signal_client() else {
return;
};
let Some(target) = self.signal_target() else {
return;
};
signal_client
.send(proto::signal_request::Message::Trickle(
proto::TrickleRequest {
candidate_init: candidate_init.to_string(),
target: target as i32,
},
))
.await;
}
async fn send_delayed_ice_candidates(&self) {
let Some(mut early_candidates) = self
.connection
.lock()
.unwrap()
.as_mut()
.and_then(|c| c.early_candidates.take())
else {
return;
};
while let Some(candidate_str) = early_candidates.pop() {
gst::debug!(
CAT,
imp: self,
"Sending delayed ice candidate {candidate_str:?}"
);
self.send_trickle_request(&candidate_str).await;
}
}
async fn signal_task(&self, mut signal_events: signal_client::SignalEvents) {
loop {
match wait_async(&self.signal_task_canceller, signal_events.recv(), 0).await {
@ -136,10 +236,36 @@ impl Signaller {
self.obj()
.emit_by_name::<()>("session-description", &[&"unique", &answer]);
}
proto::signal_response::Message::Offer(offer) => {
if !self.is_subscriber() {
gst::warning!(CAT, imp: self, "Ignoring subscriber offer in non-subscriber mode: {:?}", offer);
return;
}
gst::debug!(CAT, imp: self, "Received subscriber offer: {:?}", offer);
let sdp = match gst_sdp::SDPMessage::parse_buffer(offer.sdp.as_bytes()) {
Ok(sdp) => sdp,
Err(_) => {
self.raise_error("Couldn't parse Offer SDP".to_string());
return;
}
};
let offer = gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Offer,
sdp,
);
self.obj()
.emit_by_name::<()>("session-description", &[&"unique", &offer]);
}
proto::signal_response::Message::Trickle(trickle) => {
gst::debug!(CAT, imp: self, "Received ice_candidate {:?}", trickle);
if trickle.target() == proto::SignalTarget::Publisher {
let Some(target) = self.signal_target() else {
return;
};
if target == trickle.target() {
if let Ok(json) =
serde_json::from_str::<IceCandidateJson>(&trickle.candidate_init)
{
@ -165,6 +291,17 @@ impl Signaller {
}
}
proto::signal_response::Message::Update(update) => {
if !self.is_subscriber() {
gst::trace!(CAT, imp: self, "Ignoring update in non-subscriber mode: {:?}", update);
return;
}
gst::debug!(CAT, imp: self, "Update: {:?}", update);
for participant in update.participants {
self.on_participant(&participant, true)
}
}
proto::signal_response::Message::Leave(leave) => {
gst::debug!(CAT, imp: self, "Leave: {:?}", leave);
}
@ -172,182 +309,36 @@ impl Signaller {
_ => {}
}
}
}
impl SignallableImpl for Signaller {
fn start(&self) {
gst::debug!(CAT, imp: self, "Connecting");
let wsurl = if let Some(wsurl) = &self.settings.lock().unwrap().wsurl {
wsurl.clone()
} else {
self.raise_error("WebSocket URL must be set".to_string());
return;
};
let auth_token = {
let settings = self.settings.lock().unwrap();
if let Some(auth_token) = &settings.auth_token {
auth_token.clone()
} else if let (
Some(api_key),
Some(secret_key),
Some(identity),
Some(participant_name),
Some(room_name),
) = (
&settings.api_key,
&settings.secret_key,
&settings.identity,
&settings.participant_name,
&settings.room_name,
) {
let grants = VideoGrants {
room_join: true,
can_subscribe: false,
room: room_name.clone(),
..Default::default()
};
let access_token = AccessToken::with_api_key(api_key, secret_key)
.with_name(participant_name)
.with_identity(identity)
.with_grants(grants);
match access_token.to_jwt() {
Ok(token) => token,
Err(err) => {
self.raise_error(format!(
"{:?}",
anyhow!("Could not create auth token {err}")
));
return;
}
}
} else {
self.raise_error("Either auth-token or (api-key and secret-key and identity and room-name) must be set".to_string());
return;
}
};
gst::debug!(CAT, imp: self, "We have an authentication token");
fn send_sdp_answer(&self, _session_id: &str, sessdesc: &gst_webrtc::WebRTCSessionDescription) {
let weak_imp = self.downgrade();
let sessdesc = sessdesc.clone();
RUNTIME.spawn(async move {
let imp = if let Some(imp) = weak_imp.upgrade() {
imp
} else {
return;
};
let options = signal_client::SignalOptions::default();
gst::debug!(CAT, imp: imp, "Connecting to {}", wsurl);
let res = signal_client::SignalClient::connect(&wsurl, &auth_token, options).await;
let (signal_client, join_response, signal_events) = match res {
Err(err) => {
imp.obj()
.emit_by_name::<()>("error", &[&format!("{:?}", anyhow!("Error: {err}"))]);
return;
}
Ok(ok) => ok,
};
let signal_client = Arc::new(signal_client);
gst::debug!(
CAT,
imp: imp,
"Connected with JoinResponse: {:?}",
join_response
);
let weak_imp = imp.downgrade();
let signal_task = RUNTIME.spawn(async move {
if let Some(imp) = weak_imp.upgrade() {
imp.signal_task(signal_events).await;
}
});
let weak_imp = imp.downgrade();
imp.obj().connect_closure(
"webrtcbin-ready",
false,
glib::closure!(|_signaler: &super::LiveKitSignaller,
_consumer_identifier: &str,
webrtcbin: &gst::Element| {
gst::info!(CAT, "Adding data channels");
let reliable_channel = webrtcbin.emit_by_name::<gst_webrtc::WebRTCDataChannel>(
"create-data-channel",
&[
&"_reliable",
&gst::Structure::builder("config")
.field("ordered", true)
.build(),
],
);
let lossy_channel = webrtcbin.emit_by_name::<gst_webrtc::WebRTCDataChannel>(
"create-data-channel",
&[
&"_lossy",
&gst::Structure::builder("config")
.field("ordered", true)
.field("max-retransmits", 0)
.build(),
],
);
if let Some(imp) = weak_imp.upgrade() {
let mut connection = imp.connection.lock().unwrap();
if let Some(connection) = connection.as_mut() {
connection.channels = Some(Channels {
reliable_channel,
lossy_channel,
});
}
}
}),
);
let connection = Connection {
signal_client,
signal_task,
pending_tracks: Default::default(),
early_candidates: Some(Vec::new()),
channels: None,
};
if let Ok(mut sc) = imp.connection.lock() {
*sc = Some(connection);
if let Some(imp) = weak_imp.upgrade() {
let sdp = sessdesc.sdp();
gst::debug!(CAT, imp: imp, "Sending SDP {:?} now", &sdp);
let signal_client = imp.require_signal_client();
signal_client
.send(proto::signal_request::Message::Answer(
proto::SessionDescription {
r#type: "answer".to_string(),
sdp: sdp.to_string(),
},
))
.await;
imp.send_delayed_ice_candidates().await;
}
imp.obj().emit_by_name::<()>(
"session-requested",
&[
&"unique",
&"unique",
&None::<gst_webrtc::WebRTCSessionDescription>,
],
);
});
}
fn send_sdp(&self, _session_id: &str, sessdesc: &gst_webrtc::WebRTCSessionDescription) {
gst::debug!(CAT, imp: self, "Created offer SDP {:#?}", sessdesc.sdp());
assert!(sessdesc.type_() == gst_webrtc::WebRTCSDPType::Offer);
fn send_sdp_offer(&self, _session_id: &str, sessdesc: &gst_webrtc::WebRTCSessionDescription) {
let weak_imp = self.downgrade();
let sessdesc = sessdesc.clone();
RUNTIME.spawn(async move {
if let Some(imp) = weak_imp.upgrade() {
let sdp = sessdesc.sdp();
let signal_client = imp
.connection
.lock()
.unwrap()
.as_ref()
.unwrap()
.signal_client
.clone();
let signal_client = imp.require_signal_client();
let timeout = imp.settings.lock().unwrap().timeout;
for media in sdp.medias() {
@ -457,35 +448,271 @@ impl SignallableImpl for Signaller {
.await;
if let Some(imp) = weak_imp.upgrade() {
let early_candidates =
if let Some(connection) = &mut *imp.connection.lock().unwrap() {
connection.early_candidates.take()
} else {
None
};
if let Some(mut early_candidates) = early_candidates {
while let Some(candidate_str) = early_candidates.pop() {
gst::debug!(
CAT,
imp: imp,
"Sending delayed ice candidate {candidate_str:?}"
);
signal_client
.send(proto::signal_request::Message::Trickle(
proto::TrickleRequest {
candidate_init: candidate_str,
target: proto::SignalTarget::Publisher as i32,
},
))
.await;
}
}
imp.send_delayed_ice_candidates().await;
}
}
});
}
fn on_participant(&self, participant: &proto::ParticipantInfo, new_connection: bool) {
gst::debug!(CAT, imp: self, "{:?}", participant);
if !participant.is_publisher {
return;
}
let peer_sid = &participant.sid;
let peer_identity = &participant.identity;
match self.producer_peer_id() {
Some(id) if id == *peer_sid => {
gst::debug!(CAT, imp: self, "matching peer sid {id:?}");
}
Some(id) if id == *peer_identity => {
gst::debug!(CAT, imp: self, "matching peer identity {id:?}");
}
None => {
if self.is_peer_excluded(peer_sid) || self.is_peer_excluded(peer_identity) {
gst::debug!(CAT, imp: self, "ignoring excluded peer {participant:?}");
return;
}
gst::debug!(CAT, imp: self, "catch-all mode, matching {participant:?}");
}
_ => return,
}
let meta = Some(&participant.metadata)
.filter(|meta| !meta.is_empty())
.and_then(|meta| gst::Structure::from_str(meta).ok());
match participant.state {
x if x == proto::participant_info::State::Active as i32 => {
let track_sids = participant
.tracks
.iter()
.filter(|t| !t.muted)
.map(|t| t.sid.clone())
.collect::<Vec<_>>();
let update = proto::UpdateSubscription {
track_sids: track_sids.clone(),
subscribe: true,
participant_tracks: vec![proto::ParticipantTracks {
participant_sid: participant.sid.clone(),
track_sids: track_sids.clone(),
}],
};
let update = proto::signal_request::Message::Subscription(update);
let weak_imp = self.downgrade();
let peer_sid = peer_sid.clone();
RUNTIME.spawn(async move {
let imp = match weak_imp.upgrade() {
Some(imp) => imp,
None => return,
};
let signal_client = imp.require_signal_client();
signal_client.send(update).await;
imp.obj()
.emit_by_name::<()>("producer-added", &[&peer_sid, &meta, &new_connection]);
});
}
_ => {
self.obj()
.emit_by_name::<()>("producer-removed", &[&peer_sid, &meta]);
}
}
}
async fn close_signal_client(signal_client: &signal_client::SignalClient) {
signal_client
.send(proto::signal_request::Message::Leave(proto::LeaveRequest {
can_reconnect: false,
reason: proto::DisconnectReason::ClientInitiated as i32,
..Default::default()
}))
.await;
signal_client.close().await;
}
}
impl SignallableImpl for Signaller {
fn start(&self) {
gst::debug!(CAT, imp: self, "Connecting");
let wsurl = if let Some(wsurl) = &self.settings.lock().unwrap().wsurl {
wsurl.clone()
} else {
self.raise_error("WebSocket URL must be set".to_string());
return;
};
let auth_token = {
let settings = self.settings.lock().unwrap();
let role = settings.role;
if let Some(auth_token) = &settings.auth_token {
auth_token.clone()
} else if let (
Some(api_key),
Some(secret_key),
Some(identity),
Some(participant_name),
Some(room_name),
) = (
&settings.api_key,
&settings.secret_key,
&settings.identity,
&settings.participant_name,
&settings.room_name,
) {
let grants = VideoGrants {
room_join: true,
can_subscribe: role == WebRTCSignallerRole::Consumer,
room: room_name.clone(),
..Default::default()
};
let access_token = AccessToken::with_api_key(api_key, secret_key)
.with_name(participant_name)
.with_identity(identity)
.with_grants(grants);
match access_token.to_jwt() {
Ok(token) => token,
Err(err) => {
self.raise_error(format!(
"{:?}",
anyhow!("Could not create auth token {err}")
));
return;
}
}
} else {
self.raise_error("Either auth-token or (api-key and secret-key and identity and room-name) must be set".to_string());
return;
}
};
gst::debug!(CAT, imp: self, "We have an authentication token");
let weak_imp = self.downgrade();
RUNTIME.spawn(async move {
let imp = if let Some(imp) = weak_imp.upgrade() {
imp
} else {
return;
};
let options = signal_client::SignalOptions {
auto_subscribe: imp.auto_subscribe(),
..Default::default()
};
gst::debug!(CAT, imp: imp, "Connecting to {}", wsurl);
let res = signal_client::SignalClient::connect(&wsurl, &auth_token, options).await;
let (signal_client, join_response, signal_events) = match res {
Err(err) => {
imp.obj()
.emit_by_name::<()>("error", &[&format!("{:?}", anyhow!("Error: {err}"))]);
return;
}
Ok(ok) => ok,
};
let signal_client = Arc::new(signal_client);
gst::debug!(
CAT,
imp: imp,
"Connected with JoinResponse: {:?}",
join_response
);
let weak_imp = imp.downgrade();
let signal_task = RUNTIME.spawn(async move {
if let Some(imp) = weak_imp.upgrade() {
imp.signal_task(signal_events).await;
}
});
if imp.is_subscriber() {
imp.obj()
.emit_by_name::<()>("session-started", &[&"unique", &"unique"]);
for participant in &join_response.other_participants {
imp.on_participant(participant, false)
}
}
let weak_imp = imp.downgrade();
imp.obj().connect_closure(
"webrtcbin-ready",
false,
glib::closure!(|_signaller: &super::LiveKitSignaller,
_consumer_identifier: &str,
webrtcbin: &gst::Element| {
gst::info!(CAT, "Adding data channels");
let reliable_channel = webrtcbin.emit_by_name::<gst_webrtc::WebRTCDataChannel>(
"create-data-channel",
&[
&"_reliable",
&gst::Structure::builder("config")
.field("ordered", true)
.build(),
],
);
let lossy_channel = webrtcbin.emit_by_name::<gst_webrtc::WebRTCDataChannel>(
"create-data-channel",
&[
&"_lossy",
&gst::Structure::builder("config")
.field("ordered", true)
.field("max-retransmits", 0)
.build(),
],
);
if let Some(imp) = weak_imp.upgrade() {
let mut connection = imp.connection.lock().unwrap();
if let Some(connection) = connection.as_mut() {
connection.channels = Some(Channels {
reliable_channel,
lossy_channel,
});
}
}
}),
);
let connection = Connection {
signal_client,
signal_task,
pending_tracks: Default::default(),
early_candidates: Some(Vec::new()),
channels: None,
};
if let Ok(mut sc) = imp.connection.lock() {
*sc = Some(connection);
}
imp.obj().emit_by_name::<()>(
"session-requested",
&[
&"unique",
&"unique",
&None::<gst_webrtc::WebRTCSessionDescription>,
],
);
});
}
fn send_sdp(&self, session_id: &str, sessdesc: &gst_webrtc::WebRTCSessionDescription) {
gst::debug!(CAT, imp: self, "Created SDP {:?}", sessdesc.sdp());
match sessdesc.type_() {
gst_webrtc::WebRTCSDPType::Offer => {
self.send_sdp_offer(session_id, sessdesc);
}
gst_webrtc::WebRTCSDPType::Answer => {
self.send_sdp_answer(session_id, sessdesc);
}
_ => {
gst::debug!(CAT, imp: self, "Ignoring SDP {:?}", sessdesc.sdp());
}
}
}
fn add_ice(
&self,
_session_id: &str,
@ -514,20 +741,7 @@ impl SignallableImpl for Signaller {
let imp = self.downgrade();
RUNTIME.spawn(async move {
if let Some(imp) = imp.upgrade() {
let signal_client = if let Some(connection) = &mut *imp.connection.lock().unwrap() {
connection.signal_client.clone()
} else {
return;
};
signal_client
.send(proto::signal_request::Message::Trickle(
proto::TrickleRequest {
candidate_init: candidate_str,
target: proto::SignalTarget::Publisher as i32,
},
))
.await;
imp.send_trickle_request(&candidate_str).await;
};
});
}
@ -542,7 +756,7 @@ impl SignallableImpl for Signaller {
if let Some(connection) = self.connection.lock().unwrap().take() {
block_on(connection.signal_task).unwrap();
block_on(connection.signal_client.close());
block_on(Self::close_signal_client(&connection.signal_client));
}
}
@ -615,6 +829,22 @@ impl ObjectImpl for Signaller {
.blurb("Lossy Data Channel object.")
.flags(glib::ParamFlags::READABLE)
.build(),
glib::ParamSpecEnum::builder_with_default("role", WebRTCSignallerRole::default())
.nick("Sigaller Role")
.blurb("Whether this signaller acts as either a Consumer or Producer. Listener is not currently supported.")
.flags(glib::ParamFlags::READWRITE)
.build(),
glib::ParamSpecString::builder("producer-peer-id")
.nick("Producer Peer ID")
.blurb("When in Consumer Role, the signaller will subscribe to this peer's tracks.")
.flags(glib::ParamFlags::READWRITE)
.build(),
gst::ParamSpecArray::builder("excluded-producer-peer-ids")
.nick("Excluded Producer Peer IDs")
.blurb("When in Consumer Role, the signaller will not subscribe to these peers' tracks.")
.flags(glib::ParamFlags::READWRITE)
.element_spec(&glib::ParamSpecString::builder("producer-peer-id").build())
.build(),
]
});
@ -648,6 +878,18 @@ impl ObjectImpl for Signaller {
"timeout" => {
settings.timeout = value.get().unwrap();
}
"role" => settings.role = value.get().unwrap(),
"producer-peer-id" => settings.producer_peer_id = value.get().unwrap(),
"excluded-producer-peer-ids" => {
settings.excluded_produder_peer_ids = value
.get::<gst::ArrayRef>()
.expect("type checked upstream")
.as_slice()
.iter()
.filter_map(|id| id.get::<&str>().ok())
.map(|id| id.to_string())
.collect::<Vec<String>>()
}
_ => unimplemented!(),
}
}
@ -679,6 +921,11 @@ impl ObjectImpl for Signaller {
};
channel.to_value()
}
"role" => settings.role.to_value(),
"producer-peer-id" => settings.producer_peer_id.to_value(),
"excluded-producer-peer-ids" => {
gst::Array::new(&settings.excluded_produder_peer_ids).to_value()
}
_ => unimplemented!(),
}
}

View file

@ -1,6 +1,6 @@
// SPDX-License-Identifier: MPL-2.0
use crate::signaller::Signallable;
use crate::signaller::{Signallable, WebRTCSignallerRole};
use gst::glib;
mod imp;
@ -9,6 +9,20 @@ glib::wrapper! {
pub struct LiveKitSignaller(ObjectSubclass<imp::Signaller>) @implements Signallable;
}
impl LiveKitSignaller {
fn new(role: WebRTCSignallerRole) -> Self {
glib::Object::builder().property("role", role).build()
}
pub fn new_consumer() -> Self {
Self::new(WebRTCSignallerRole::Consumer)
}
pub fn new_producer() -> Self {
Self::new(WebRTCSignallerRole::Producer)
}
}
impl Default for LiveKitSignaller {
fn default() -> Self {
glib::Object::new()

View file

@ -20,7 +20,9 @@ use std::ops::Mul;
use std::sync::{mpsc, Arc, Condvar, Mutex};
use super::homegrown_cc::CongestionController;
use super::{WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode};
use super::{
WebRTCSinkCongestionControl, WebRTCSinkError, WebRTCSinkMitigationMode, WebRTCSinkPad,
};
use crate::aws_kvs_signaller::AwsKvsSignaller;
use crate::janusvr_signaller::JanusVRSignaller;
use crate::livekit_signaller::LiveKitSignaller;
@ -186,7 +188,7 @@ impl futures::stream::FusedStream for CustomBusStream {
/// Wrapper around our sink pads
#[derive(Debug, Clone)]
struct InputStream {
sink_pad: gst::GhostPad,
sink_pad: WebRTCSinkPad,
producer: Option<StreamProducer>,
/// The (fixed) caps coming in
in_caps: Option<gst::Caps>,
@ -233,6 +235,8 @@ pub struct VideoEncoder {
session_id: String,
mitigation_mode: WebRTCSinkMitigationMode,
pub transceiver: gst_webrtc::WebRTCRTPTransceiver,
/// name of the sink pad feeding this encoder
stream_name: String,
}
struct Session {
@ -554,8 +558,8 @@ fn make_converter_for_video_caps(caps: &gst::Caps, codec: &Codec) -> Result<gst:
// NVIDIA V4L2 encoders require NVMM memory as input and that requires using the
// corresponding converter
|| codec
.encoder_factory()
.map_or(false, |factory| factory.name().starts_with("nvv4l2"))
.encoder_factory()
.map_or(false, |factory| factory.name().starts_with("nvv4l2"))
{
let queue = make_element("queue", None)?;
let nvconvert = if let Ok(nvconvert) = make_element("nvvideoconvert", None) {
@ -918,6 +922,7 @@ impl VideoEncoder {
session_id: &str,
codec_name: &str,
transceiver: gst_webrtc::WebRTCRTPTransceiver,
stream_name: String,
) -> Option<Self> {
let halved_framerate = video_info.fps().mul(gst::Fraction::new(1, 2));
Some(Self {
@ -936,6 +941,7 @@ impl VideoEncoder {
session_id: session_id.to_string(),
mitigation_mode: WebRTCSinkMitigationMode::NONE,
transceiver,
stream_name,
})
}
@ -1320,6 +1326,7 @@ impl Session {
&self.id,
codec.caps.structure(0).unwrap().name(),
transceiver,
stream_name.clone(),
) {
match self.cc_info.heuristic {
WebRTCSinkCongestionControl::Disabled => {
@ -1434,6 +1441,10 @@ impl InputStream {
),
)
}
fn msid(&self) -> Option<String> {
self.sink_pad.property("msid")
}
}
impl NavigationEventHandler {
@ -1537,12 +1548,13 @@ impl BaseWebRTCSink {
match extension_configuration_type {
ExtensionConfigurationType::Auto => {
// GstRTPBasePayload::extensions property is only available since GStreamer 1.24
if !payloader.has_property("extensions", Some(gst::Array::static_type()))
&& self.has_connected_payloader_setup_slots()
{
gst::warning!(CAT, "'extensions' property is not available: TWCC extension ID will default to 1. \
if !payloader.has_property("extensions", Some(gst::Array::static_type())) {
if self.has_connected_payloader_setup_slots() {
gst::warning!(CAT, "'extensions' property is not available: TWCC extension ID will default to 1. \
Application code must ensure to pick non-conflicting IDs for any additionally configured extensions. \
Please consider updating GStreamer to 1.24.");
}
return Some(1);
}
@ -1738,6 +1750,11 @@ impl BaseWebRTCSink {
return;
};
if let Some(msid) = stream.msid() {
gst::trace!(CAT, obj: element, "forwarding msid={msid:?} to webrtcbin sinkpad");
pad.set_property("msid", &msid);
}
let transceiver = pad.property::<gst_webrtc::WebRTCRTPTransceiver>("transceiver");
transceiver.set_property(
@ -1867,7 +1884,7 @@ impl BaseWebRTCSink {
gst::StreamError::Failed,
["Signalling error: {}", error]
);
})
}),
),
request_meta: signaler.connect_closure(
@ -1877,7 +1894,7 @@ impl BaseWebRTCSink {
let meta = instance.imp().settings.lock().unwrap().meta.clone();
meta
})
}),
),
session_requested: signaler.connect_closure(
@ -1887,7 +1904,7 @@ impl BaseWebRTCSink {
if let Err(err) = instance.imp().start_session(session_id, peer_id, offer) {
gst::warning!(CAT, "{}", err);
}
})
}),
),
session_description: signaler.connect_closure(
@ -1908,9 +1925,9 @@ impl BaseWebRTCSink {
),
handle_ice: signaler.connect_closure(
"handle-ice",
false,
glib::closure!(@watch instance => move |
"handle-ice",
false,
glib::closure!(@watch instance => move |
_signaler: glib::Object,
session_id: &str,
sdp_m_line_index: u32,
@ -1920,7 +1937,7 @@ impl BaseWebRTCSink {
.imp()
.handle_ice(session_id, Some(sdp_m_line_index), None, candidate);
}),
),
),
session_ended: signaler.connect_closure(
"session-ended",
@ -1930,7 +1947,7 @@ impl BaseWebRTCSink {
gst::warning!(CAT, "{}", err);
}
false
})
}),
),
shutdown: signaler.connect_closure(
@ -1938,7 +1955,7 @@ impl BaseWebRTCSink {
false,
glib::closure!(@watch instance => move |_signaler: glib::Object|{
instance.imp().shutdown(instance);
})
}),
),
});
}
@ -2585,7 +2602,7 @@ impl BaseWebRTCSink {
if session.congestion_controller.is_some() {
let session_id_str = session_id.to_string();
rtpbin.connect_closure("on-new-ssrc", true,
glib::closure!(@weak-allow-none element,
glib::closure!(@weak-allow-none element,
=> move |rtpbin: gst::Object, session_id: u32, _src: u32| {
let rtp_session = rtpbin.emit_by_name::<gst::Element>("get-session", &[&session_id]);
@ -2606,8 +2623,8 @@ impl BaseWebRTCSink {
));
}
}
})
);
}),
);
}
let clock = element.clock();
@ -3447,6 +3464,37 @@ impl BaseWebRTCSink {
)
}
/// Check if the caps of a sink pad can be changed from `current` to `new` without requiring a WebRTC renegotiation
fn input_caps_change_allowed(&self, current: &gst::CapsRef, new: &gst::CapsRef) -> bool {
let Some(current) = current.structure(0) else {
return false;
};
let Some(new) = new.structure(0) else {
return false;
};
if current.name() != new.name() {
return false;
}
let mut current = current.to_owned();
let mut new = new.to_owned();
// Allow changes of fields which should not be part of the SDP, and so can be updated without requiring
// a renegotiation.
let caps_type = current.name();
if caps_type.starts_with("video/") {
const VIDEO_ALLOWED_CHANGES: &[&str] = &["width", "height", "framerate"];
current.remove_fields(VIDEO_ALLOWED_CHANGES.iter().copied());
new.remove_fields(VIDEO_ALLOWED_CHANGES.iter().copied());
} else if caps_type.starts_with("audio/") {
// TODO
}
current == new
}
fn sink_event(
&self,
pad: &gst::Pad,
@ -3457,10 +3505,7 @@ impl BaseWebRTCSink {
if let EventView::Caps(e) = event.view() {
if let Some(caps) = pad.current_caps() {
if caps.is_strictly_equal(e.caps()) {
// Nothing changed
return true;
} else {
if !self.input_caps_change_allowed(&caps, e.caps()) {
gst::error!(
CAT,
obj: pad,
@ -3470,30 +3515,44 @@ impl BaseWebRTCSink {
);
return false;
}
} else {
gst::info!(CAT, obj: pad, "Received caps event {:?}", e);
}
gst::info!(CAT, obj: pad, "Received caps event {:?}", e);
self.state
.lock()
.unwrap()
.streams
.iter_mut()
.for_each(|(_, stream)| {
if stream.sink_pad.upcast_ref::<gst::Pad>() == pad {
// We do not want VideoInfo to consider max-framerate
// when computing fps, so we strip it away here
let mut caps = e.caps().to_owned();
{
let mut_caps = caps.get_mut().unwrap();
if let Some(s) = mut_caps.structure_mut(0) {
if s.has_name("video/x-raw") {
s.remove_field("max-framerate");
}
}
let mut state = self.state.lock().unwrap();
state.streams.iter_mut().for_each(|(_, stream)| {
if stream.sink_pad.upcast_ref::<gst::Pad>() == pad {
// We do not want VideoInfo to consider max-framerate
// when computing fps, so we strip it away here
let mut caps = e.caps().to_owned();
{
let mut_caps = caps.get_mut().unwrap();
if let Some(s) = mut_caps.structure_mut(0) {
if s.has_name("video/x-raw") {
s.remove_field("max-framerate");
}
stream.in_caps = Some(caps.to_owned());
}
});
}
stream.in_caps = Some(caps.to_owned());
}
});
if e.caps().structure(0).unwrap().name().starts_with("video/") {
if let Ok(video_info) = gst_video::VideoInfo::from_caps(e.caps()) {
// update video encoder info used when downscaling/downsampling the input
let stream_name = pad.name().to_string();
state
.sessions
.values_mut()
.flat_map(|session| session.unwrap_mut().encoders.iter_mut())
.filter(|encoder| encoder.stream_name == stream_name)
.for_each(|encoder| {
encoder.halved_framerate =
video_info.fps().mul(gst::Fraction::new(1, 2));
encoder.video_info = video_info.clone();
});
}
}
}
@ -3718,7 +3777,7 @@ impl ObjectImpl for BaseWebRTCSink {
.mutable_ready()
.build(),
glib::ParamSpecObject::builder::<Signallable>("signaller")
.flags(glib::ParamFlags::READABLE | gst::PARAM_FLAG_MUTABLE_READY)
.flags(glib::ParamFlags::READABLE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb("The Signallable object to use to handle WebRTC Signalling")
.build(),
]
@ -4071,11 +4130,12 @@ impl ElementImpl for BaseWebRTCSink {
caps_builder = caps_builder.structure(codec.caps.structure(0).unwrap().to_owned());
}
let video_pad_template = gst::PadTemplate::new(
let video_pad_template = gst::PadTemplate::with_gtype(
"video_%u",
gst::PadDirection::Sink,
gst::PadPresence::Request,
&caps_builder.build(),
WebRTCSinkPad::static_type(),
)
.unwrap();
@ -4084,11 +4144,12 @@ impl ElementImpl for BaseWebRTCSink {
for codec in Codecs::audio_codecs() {
caps_builder = caps_builder.structure(codec.caps.structure(0).unwrap().to_owned());
}
let audio_pad_template = gst::PadTemplate::new(
let audio_pad_template = gst::PadTemplate::with_gtype(
"audio_%u",
gst::PadDirection::Sink,
gst::PadPresence::Request,
&caps_builder.build(),
WebRTCSinkPad::static_type(),
)
.unwrap();
@ -4127,13 +4188,13 @@ impl ElementImpl for BaseWebRTCSink {
(name, false)
};
let sink_pad = gst::GhostPad::builder_from_template(templ)
let sink_pad = gst::PadBuilder::<WebRTCSinkPad>::from_template(templ)
.name(name.as_str())
.chain_function(|pad, parent, buffer| {
BaseWebRTCSink::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this| this.chain(pad, buffer),
|this| this.chain(pad.upcast_ref(), buffer),
)
})
.event_function(|pad, parent, event| {
@ -4250,7 +4311,7 @@ impl ChildProxyImpl for BaseWebRTCSink {
fn child_by_name(&self, name: &str) -> Option<glib::Object> {
match name {
"signaller" => Some(self.settings.lock().unwrap().signaller.clone().upcast()),
_ => None,
_ => self.obj().static_pad(name).map(|pad| pad.upcast()),
}
}
}
@ -4393,7 +4454,7 @@ impl ObjectImpl for LiveKitWebRTCSink {
let element = self.obj();
let ws = element.upcast_ref::<super::BaseWebRTCSink>().imp();
let _ = ws.set_signaller(LiveKitSignaller::default().upcast());
let _ = ws.set_signaller(LiveKitSignaller::new_producer().upcast());
}
}

View file

@ -39,11 +39,16 @@ use gst::subclass::prelude::*;
mod homegrown_cc;
mod imp;
mod pad;
glib::wrapper! {
pub struct BaseWebRTCSink(ObjectSubclass<imp::BaseWebRTCSink>) @extends gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
}
glib::wrapper! {
pub struct WebRTCSinkPad(ObjectSubclass<pad::WebRTCSinkPad>) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object;
}
glib::wrapper! {
pub struct WebRTCSink(ObjectSubclass<imp::WebRTCSink>) @extends BaseWebRTCSink, gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy, gst_video::Navigation;
}
@ -124,6 +129,7 @@ enum WebRTCSinkMitigationMode {
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
WebRTCSinkPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
BaseWebRTCSink::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
WebRTCSinkCongestionControl::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
gst::Element::register(

View file

@ -0,0 +1,57 @@
// SPDX-License-Identifier: MPL-2.0
use gst::{glib, prelude::*, subclass::prelude::*};
use once_cell::sync::Lazy;
use std::sync::Mutex;
#[derive(Default)]
pub struct WebRTCSinkPad {
settings: Mutex<Settings>,
}
#[derive(Debug, Default)]
struct Settings {
msid: Option<String>,
}
#[glib::object_subclass]
impl ObjectSubclass for WebRTCSinkPad {
const NAME: &'static str = "GstWebRTCSinkPad";
type Type = super::WebRTCSinkPad;
type ParentType = gst::GhostPad;
}
impl ObjectImpl for WebRTCSinkPad {
fn properties() -> &'static [glib::ParamSpec] {
static PROPS: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![glib::ParamSpecString::builder("msid")
.flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb("Remote MediaStream ID in use for this pad")
.build()]
});
PROPS.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"msid" => {
settings.msid = value
.get::<Option<String>>()
.expect("type checked upstream")
}
name => panic!("no writable property {name:?}"),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"msid" => settings.msid.to_value(),
name => panic!("no readable property {name:?}"),
}
}
}
impl GstObjectImpl for WebRTCSinkPad {}
impl PadImpl for WebRTCSinkPad {}
impl ProxyPadImpl for WebRTCSinkPad {}
impl GhostPadImpl for WebRTCSinkPad {}

View file

@ -2,6 +2,7 @@
use gst::prelude::*;
use crate::livekit_signaller::LiveKitSignaller;
use crate::signaller::{prelude::*, Signallable, Signaller};
use crate::utils::{Codec, Codecs, NavigationEvent, AUDIO_CAPS, RTP_CAPS, VIDEO_CAPS};
use crate::webrtcsrc::WebRTCSrcPad;
@ -21,6 +22,7 @@ use url::Url;
const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19302");
const DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION: bool = false;
const DEFAULT_DO_RETRANSMISSION: bool = true;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
@ -38,6 +40,7 @@ struct Settings {
video_codecs: Vec<Codec>,
audio_codecs: Vec<Codec>,
enable_data_channel_navigation: bool,
do_retransmission: bool,
}
#[derive(Default)]
@ -112,7 +115,13 @@ impl ObjectImpl for BaseWebRTCSrc {
.default_value(DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION)
.mutable_ready()
.build(),
]
glib::ParamSpecBoolean::builder("do-retransmission")
.nick("Enable retransmission")
.blurb("Send retransmission events upstream when a packet is late")
.default_value(DEFAULT_DO_RETRANSMISSION)
.mutable_ready()
.build(),
]
});
PROPS.as_ref()
@ -170,6 +179,10 @@ impl ObjectImpl for BaseWebRTCSrc {
let mut settings = self.settings.lock().unwrap();
settings.enable_data_channel_navigation = value.get::<bool>().unwrap();
}
"do-retransmission" => {
let mut settings = self.settings.lock().unwrap();
settings.do_retransmission = value.get::<bool>().unwrap();
}
_ => unimplemented!(),
}
}
@ -202,6 +215,7 @@ impl ObjectImpl for BaseWebRTCSrc {
let settings = self.settings.lock().unwrap();
settings.enable_data_channel_navigation.to_value()
}
"do-retransmission" => self.settings.lock().unwrap().do_retransmission.to_value(),
name => panic!("{} getter not implemented", name),
}
}
@ -211,7 +225,7 @@ impl ObjectImpl for BaseWebRTCSrc {
vec![
/**
* BaseWebRTCSrc::request-encoded-filter:
* @producer_id: Identifier of the producer
* @producer_id: (nullable): Identifier of the producer
* @pad_name: The name of the output pad
* @allowed_caps: the allowed caps for the output pad
*
@ -269,6 +283,7 @@ impl Default for Settings {
.filter(|codec| codec.has_decoder())
.collect(),
enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION,
do_retransmission: DEFAULT_DO_RETRANSMISSION,
}
}
}
@ -404,7 +419,11 @@ impl BaseWebRTCSrc {
.expect("Adding ghostpad to the bin should always work");
if let Some(srcpad) = srcpad {
let producer_id = self.signaller().property::<String>("producer-peer-id");
let producer_id = self
.signaller()
.property::<Option<String>>("producer-peer-id")
.or_else(|| pad.property("msid"));
let encoded_filter = self.obj().emit_by_name::<Option<gst::Element>>(
"request-encoded-filter",
&[&producer_id, &srcpad.name(), &srcpad.allowed_caps()],
@ -760,14 +779,17 @@ impl BaseWebRTCSrc {
let direction = gst_webrtc::WebRTCRTPTransceiverDirection::Recvonly;
let webrtcbin = self.webrtcbin();
for (i, media) in sdp.medias().enumerate() {
let codec_names = {
let (codec_names, do_retransmission) = {
let settings = self.settings.lock().unwrap();
settings
.video_codecs
.iter()
.chain(settings.audio_codecs.iter())
.map(|codec| codec.name.clone())
.collect::<HashSet<String>>()
(
settings
.video_codecs
.iter()
.chain(settings.audio_codecs.iter())
.map(|codec| codec.name.clone())
.collect::<HashSet<String>>(),
settings.do_retransmission,
)
};
let caps = media
.formats()
@ -828,7 +850,7 @@ impl BaseWebRTCSrc {
&[&direction, &caps],
);
transceiver.set_property("do_nack", true);
transceiver.set_property("do-nack", do_retransmission);
transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed);
}
} else {
@ -1244,11 +1266,6 @@ impl ObjectImpl for WhipServerSrc {
let _ = ws.set_signaller(WhipServerSignaller::default().upcast());
let obj = &*self.obj();
obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
obj.set_element_flags(gst::ElementFlags::SOURCE);
let settings = ws.settings.lock().unwrap();
element
.bind_property("stun-server", &settings.signaller, "stun-server")
@ -1286,3 +1303,44 @@ impl ObjectSubclass for WhipServerSrc {
type Type = super::WhipServerSrc;
type ParentType = super::BaseWebRTCSrc;
}
#[derive(Default)]
pub struct LiveKitWebRTCSrc;
impl ObjectImpl for LiveKitWebRTCSrc {
fn constructed(&self) {
self.parent_constructed();
let element = self.obj();
let ws = element.upcast_ref::<super::BaseWebRTCSrc>().imp();
let _ = ws.set_signaller(LiveKitSignaller::new_consumer().upcast());
}
}
impl GstObjectImpl for LiveKitWebRTCSrc {}
impl BinImpl for LiveKitWebRTCSrc {}
impl ElementImpl for LiveKitWebRTCSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"LiveKitWebRTCSrc",
"Source/Network/WebRTC",
"WebRTC source with LiveKit signaller",
"Jordan Yelloz <jordan.yelloz@collabora.com>",
)
});
Some(&*ELEMENT_METADATA)
}
}
impl BaseWebRTCSrcImpl for LiveKitWebRTCSrc {}
#[glib::object_subclass]
impl ObjectSubclass for LiveKitWebRTCSrc {
const NAME: &'static str = "GstLiveKitWebRTCSrc";
type Type = super::LiveKitWebRTCSrc;
type ParentType = super::BaseWebRTCSrc;
}

View file

@ -53,6 +53,10 @@ glib::wrapper! {
pub struct WhipServerSrc(ObjectSubclass<imp::WhipServerSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, @implements gst::URIHandler, gst::ChildProxy;
}
glib::wrapper! {
pub struct LiveKitWebRTCSrc(ObjectSubclass<imp::LiveKitWebRTCSrc>) @extends BaseWebRTCSrc, gst::Bin, gst::Element, gst::Object, gst::ChildProxy;
}
glib::wrapper! {
pub struct WebRTCSrcPad(ObjectSubclass<pad::WebRTCSrcPad>) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object;
}
@ -76,5 +80,60 @@ pub fn register(plugin: Option<&gst::Plugin>) -> Result<(), glib::BoolError> {
WhipServerSrc::static_type(),
)?;
/**
* element-livekitwebrtcsrc:
*
* The `livekitwebrtcsrc` plays streams from a LiveKit room.
*
* The element can either subscribe to the streams published by a single
* peer in the room using the same `signaller::producer-peer-id` child
* property that other webrtcsrc elements use or auto-subscribe to all peers
* in a room by not specifying anything for that property. When in
* auto-subscribe mode, you can use
* `signaller::excluded-producer-peer-ids=<a,b,c>` to ignore peers `a`, `b`,
* and `c` while subscribing to all other members of the room.
*
* ## Sample Pipeline
*
* First, start the livekit server with the `--dev` flag to enable the test credentials.
*
* Next, publish a stream:
*
* ```shell
* gst-launch-1.0 \
* videotestsrc is-live=1 \
* ! video/x-raw,width=640,height=360,framerate=15/1 \
* ! timeoverlay ! videoconvert ! queue \
* ! livekitwebrtcsink name=sink \
* signaller::ws-url=ws://127.0.0.1:7880 \
* signaller::api-key=devkey \
* signaller::secret-key=secret \
* signaller::room-name=testroom \
* signaller::identity=gst-producer \
* signaller::participant-name=gst-producer \
* video-caps='video/x-vp8'
* ```
*
* Finally, watch the stream:
*
* ```shell
* gst-launch-1.0 \
* livekitwebrtcsrc \
* signaller::ws-url=ws://127.0.0.1:7880 \
* signaller::api-key=devkey \
* signaller::secret-key=secret \
* signaller::room-name=testroom \
* signaller::identity=gst-consumer \
* signaller::participant-name=gst-consumer \
* ! queue ! videoconvert ! autovideosink
* ```
*/
gst::Element::register(
plugin,
"livekitwebrtcsrc",
gst::Rank::NONE,
LiveKitWebRTCSrc::static_type(),
)?;
Ok(())
}

View file

@ -10,7 +10,7 @@ use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use regex::Regex;
use regex::{Regex, RegexBuilder};
use std::default::Default;
use std::sync::Mutex;
@ -193,7 +193,33 @@ impl ObjectImpl for RegEx {
}
};
let regex = match Regex::new(&pattern) {
let mut builder = RegexBuilder::new(&pattern);
builder
.unicode(s.get::<bool>("unicode").unwrap_or(true))
.case_insensitive(s.get::<bool>("case-insensitive").unwrap_or(false))
.multi_line(s.get::<bool>("multi-line").unwrap_or(false))
.dot_matches_new_line(
s.get::<bool>("dot-matches-new-line").unwrap_or(false),
)
.crlf(s.get::<bool>("crlf").unwrap_or(false))
.line_terminator(s.get::<u8>("line-terminator").unwrap_or(b'\n'))
.swap_greed(s.get::<bool>("swap-greed").unwrap_or(false))
.ignore_whitespace(s.get::<bool>("ignore-whitespace").unwrap_or(false))
.octal(s.get::<bool>("octal").unwrap_or(false));
if let Ok(limit) = s.get::<u64>("size-limit") {
builder.size_limit(limit as usize);
}
if let Ok(limit) = s.get::<u64>("dfa-size-limit") {
builder.dfa_size_limit(limit as usize);
}
if let Ok(limit) = s.get::<u32>("nest-limit") {
builder.nest_limit(limit);
}
let regex = match builder.build() {
Ok(regex) => regex,
Err(err) => {
gst::error!(CAT, imp: self, "Failed to compile regex: {:?}", err);

View file

@ -174,7 +174,7 @@ impl TextWrap {
CAT,
imp: self,
"Outputting contents {}, ts: {}, duration: {}",
drained.to_string(),
drained,
state.start_ts.display(),
duration.display(),
);
@ -199,7 +199,7 @@ impl TextWrap {
state.end_ts = buffer.pts();
let words = data.split_whitespace();
let words = data.split_ascii_whitespace();
let mut current_text = state.current_text.to_string();
for word in words {

View file

@ -13,7 +13,7 @@ gst.workspace = true
gst-base.workspace = true
gst-audio.workspace = true
gst-video.workspace = true
gst-plugin-gtk4 = { path = "../../video/gtk4", optional = true }
gst-plugin-gtk4 = { path = "../../video/gtk4", optional = true, version = "0.12" }
gtk = { workspace = true, optional = true }
gio = { workspace = true, optional = true }
parking_lot = "0.12"

View file

@ -534,7 +534,7 @@ impl ObjectImpl for FallbackSrc {
// Called whenever a value of a property is read. It can be called
// at any time from any thread.
#[allow(clippy::blocks_in_if_conditions)]
#[allow(clippy::block_in_conditions)]
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"enable-audio" => {
@ -3237,7 +3237,7 @@ impl FallbackSrc {
});
}
#[allow(clippy::blocks_in_if_conditions)]
#[allow(clippy::block_in_conditions)]
fn schedule_source_restart_timeout(
&self,
state: &mut State,
@ -3400,7 +3400,7 @@ impl FallbackSrc {
source.restart_timeout = Some(timeout);
}
#[allow(clippy::blocks_in_if_conditions)]
#[allow(clippy::block_in_conditions)]
fn have_fallback_activated(&self, state: &State) -> bool {
let mut have_audio = false;
let mut have_video = false;

View file

@ -771,7 +771,7 @@ impl FallbackSwitch {
is_active
);
#[allow(clippy::blocks_in_if_conditions)]
#[allow(clippy::block_in_conditions)]
let output_clockid = if is_active {
pad_state.schedule_clock(
self,

View file

@ -12,7 +12,7 @@ rust-version.workspace = true
gio = { workspace = true, optional = true }
gst.workspace = true
gst-audio.workspace = true
gst-plugin-gtk4 = { path = "../../video/gtk4", optional = true }
gst-plugin-gtk4 = { path = "../../video/gtk4", optional = true, version = "0.12" }
gtk = { workspace = true, optional = true }
num-rational = { version = "0.4", default-features = false, features = [] }
once_cell.workspace = true

View file

@ -75,16 +75,22 @@ fn test_video_singlesegment() {
}
#[test]
// FIXME: racy: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/328
#[ignore]
fn test_audio_singlesegment() {
test_audio(true);
}
#[test]
// FIXME: racy: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/357
#[ignore]
fn test_video_nonsinglesegment() {
test_video(false);
}
#[test]
// FIXME: racy: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/328
#[ignore]
fn test_audio_nonsinglesegment() {
test_audio(false);
}

View file

@ -12,7 +12,7 @@ rust-version.workspace = true
gst.workspace = true
gst-audio.workspace = true
gst-video.workspace = true
gst-plugin-gtk4 = { path = "../../video/gtk4", optional = true }
gst-plugin-gtk4 = { path = "../../video/gtk4", optional = true, version = "0.12" }
gtk = { workspace = true, optional = true }
gio = { workspace = true, optional = true }
parking_lot = "0.12"

View file

@ -693,7 +693,7 @@ impl ToggleRecord {
}
}
#[allow(clippy::blocks_in_if_conditions)]
#[allow(clippy::block_in_conditions)]
fn handle_secondary_stream<T: HandleData>(
&self,
pad: &gst::Pad,

View file

@ -979,6 +979,8 @@ fn test_two_stream_close_open_nonlivein_nonliveout() {
}
#[test]
// FIXME: racy: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/513
#[ignore]
fn test_two_stream_close_open_nonlivein_liveout() {
init();

View file

@ -1,6 +1,6 @@
[package]
name = "gst-plugin-version-helper"
version = "0.8.0"
version = "0.8.2"
authors = ["Sajeer Ahamed <ahamedsajeer.15.15@cse.mrt.ac.lk>",
"Sebastian Dröge <sebastian@centricular.com>"]
categories = ["development-tools"]
@ -14,4 +14,4 @@ rust-version = "1.69"
[dependencies]
chrono = { version = "0.4.23", default-features = false, features = ["std", "clock"] }
toml_edit = { version = "0.22", default-features = false, features = ["parse"] }
toml_edit = { version = "0.22.8", default-features = false, features = ["parse"] }

View file

@ -95,7 +95,7 @@ fn cargo_metadata_release_date(crate_dir: &path::Path) -> Option<chrono::DateTim
let mut contents = String::new();
file.read_to_string(&mut contents).ok()?;
let doc = contents.parse::<toml_edit::Document>().ok()?;
let doc = contents.parse::<toml_edit::DocumentMut>().ok()?;
let release_date = doc
.get("package")
.and_then(|package| package.as_table_like())

View file

@ -378,6 +378,10 @@ impl SccParse {
// Still need to scan lines to find the first buffer
if state.seeking {
// Remember this timecode in order to fallback to this one
// if invalid timecode is detected during scanning
state.last_timecode = Some(timecode);
drop(state);
return Ok(self.state.lock().unwrap());
}

View file

@ -61,6 +61,7 @@ pub(crate) static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
#[derive(Default)]
pub struct PaintableSink {
paintable: Mutex<Option<ThreadGuard<Paintable>>>,
window: Mutex<Option<ThreadGuard<gtk::Window>>>,
info: Mutex<Option<gst_video::VideoInfo>>,
sender: Mutex<Option<async_channel::Sender<SinkEvent>>>,
pending_frame: Mutex<Option<Frame>>,
@ -231,6 +232,18 @@ impl ElementImpl for PaintableSink {
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
match transition {
gst::StateChange::NullToReady => {
let create_window = glib::program_name().as_deref() == Some("gst-launch-1.0")
|| std::env::var("GST_GTK4_WINDOW").as_deref() == Ok("1");
if create_window {
let res = utils::invoke_on_main_thread(gtk::init);
if let Err(err) = res {
gst::error!(CAT, imp: self, "Failed to create initialize GTK: {err}");
return Err(gst::StateChangeError);
}
}
let mut paintable = self.paintable.lock().unwrap();
if paintable.is_none() {
@ -273,6 +286,10 @@ impl ElementImpl for PaintableSink {
);
}
}
if create_window {
self.create_window();
}
}
_ => (),
}
@ -294,6 +311,17 @@ impl ElementImpl for PaintableSink {
}
});
}
gst::StateChange::ReadyToNull => {
let mut window_guard = self.window.lock().unwrap();
if let Some(window) = window_guard.take() {
drop(window_guard);
glib::MainContext::default().invoke(move || {
let window = window.get_ref();
window.close();
});
}
}
_ => (),
}
@ -520,6 +548,46 @@ impl PaintableSink {
.replace(tmp_caps);
}
fn create_window(&self) {
let self_ = self.to_owned();
glib::MainContext::default().invoke(move || {
let mut window_guard = self_.window.lock().unwrap();
if window_guard.is_some() {
return;
}
let paintable = match &*self_.paintable.lock().unwrap() {
Some(paintable) => paintable.get_ref().clone(),
None => return,
};
let window = gtk::Window::new();
let picture = gtk::Picture::new();
picture.set_paintable(Some(&paintable));
window.set_child(Some(&picture));
window.set_default_size(640, 480);
window.connect_close_request({
let self_ = self_.clone();
move |_window| {
if self_.window.lock().unwrap().is_some() {
gst::element_imp_error!(
self_,
gst::ResourceError::NotFound,
("Output window was closed")
);
}
glib::Propagation::Proceed
}
});
window.show();
*window_guard = Some(ThreadGuard::new(window));
});
}
fn create_paintable(&self, paintable_storage: &mut MutexGuard<Option<ThreadGuard<Paintable>>>) {
#[cfg(any(target_os = "macos", target_os = "windows", feature = "gst-gl"))]
{

View file

@ -220,7 +220,9 @@ impl PaintableImpl for Paintable {
let texture_width = *paintable_width * scale_x as f32;
let texture_height = *paintable_height * scale_y as f32;
let bounds = graphene::Rect::new(*x, *y, texture_width, texture_height);
let x = *x * scale_x as f32;
let y = *y * scale_y as f32;
let bounds = graphene::Rect::new(x, y, texture_width, texture_height);
// Only premultiply GL textures that expect to be in premultiplied RGBA format.
//