gst-plugins-rs/utils/livesync/src/livesync/imp.rs
Jan Alexander Steffens (heftig) 33696a8aed livesync: Only resend segment if not in single-segment mode
In single-segment mode, the outgoing segment does not change when the
incoming segment changes. We only need to resend the segment if we got
flushed or deactivated.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1069>
2023-01-30 15:37:00 +00:00

1205 lines
39 KiB
Rust

// Copyright (C) 2022 LTN Global Communications, Inc.
// Contact: Jan Alexander Steffens (heftig) <jan.steffens@ltnglobal.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::{
glib::{self, translate::IntoGlib},
prelude::*,
subclass::prelude::*,
};
use once_cell::sync::Lazy;
use parking_lot::{Condvar, Mutex, MutexGuard};
use std::sync::mpsc;
/// Offset for the segment in single-segment mode, to handle negative DTS
const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000);
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"livesync",
gst::DebugColorFlags::empty(),
Some("debug category for the livesync element"),
)
});
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BufferLateness {
OnTime,
LateUnderThreshold,
LateOverThreshold,
}
#[derive(Debug)]
enum Item {
Buffer(gst::Buffer, BufferLateness),
Event(gst::Event),
// SAFETY: Item needs to wait until the query and the receiver has returned
Query(std::ptr::NonNull<gst::QueryRef>, mpsc::SyncSender<bool>),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Timestamps {
start: gst::ClockTime,
end: gst::ClockTime,
}
// SAFETY: Need to be able to pass *mut gst::QueryRef
unsafe impl Send for Item {}
#[derive(Debug)]
pub struct LiveSync {
state: Mutex<State>,
cond: Condvar,
sinkpad: gst::Pad,
srcpad: gst::Pad,
}
#[derive(Debug)]
struct State {
latency: gst::ClockTime,
late_threshold: Option<gst::ClockTime>,
upstream_latency: Option<gst::ClockTime>,
fallback_duration: gst::ClockTime,
eos: bool,
segment: Option<gst::FormattedSegment<gst::ClockTime>>,
srcresult: Result<gst::FlowSuccess, gst::FlowError>,
playing: bool,
sent_segment: bool,
clock_id: Option<gst::SingleShotClockId>,
in_caps: Option<gst::Caps>,
in_audio_info: Option<gst_audio::AudioInfo>,
out_audio_info: Option<gst_audio::AudioInfo>,
in_item: Option<Item>,
out_buffer: Option<gst::Buffer>,
in_timestamp: Option<Timestamps>,
out_timestamp: Option<Timestamps>,
num_in: u64,
num_drop: u64,
num_out: u64,
num_duplicate: u64,
single_segment: bool,
}
const PROP_LATENCY: &str = "latency";
const PROP_LATE_THRESHOLD: &str = "late-threshold";
const PROP_IN: &str = "in";
const PROP_DROP: &str = "drop";
const PROP_OUT: &str = "out";
const PROP_DUPLICATE: &str = "duplicate";
const PROP_SINGLE_SEGMENT: &str = "single-segment";
const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::ZERO;
const DEFAULT_DURATION: gst::ClockTime = gst::ClockTime::from_mseconds(100);
const MINIMUM_LATE_THRESHOLD: gst::ClockTime = gst::ClockTime::ZERO;
const DEFAULT_LATE_THRESHOLD: Option<gst::ClockTime> = Some(gst::ClockTime::from_seconds(2));
impl Default for State {
fn default() -> Self {
Self {
latency: DEFAULT_LATENCY,
late_threshold: DEFAULT_LATE_THRESHOLD,
upstream_latency: None,
fallback_duration: DEFAULT_DURATION,
eos: false,
segment: None,
srcresult: Err(gst::FlowError::Flushing),
playing: false,
sent_segment: false,
clock_id: None,
in_caps: None,
in_audio_info: None,
out_audio_info: None,
in_item: None,
out_buffer: None,
in_timestamp: None,
out_timestamp: None,
num_in: 0,
num_drop: 0,
num_out: 0,
num_duplicate: 0,
single_segment: false,
}
}
}
#[glib::object_subclass]
impl ObjectSubclass for LiveSync {
const NAME: &'static str = "GstLiveSync";
type Type = super::LiveSync;
type ParentType = gst::Element;
fn with_class(class: &Self::Class) -> Self {
let sinkpad =
gst::Pad::builder_with_template(&class.pad_template("sink").unwrap(), Some("sink"))
.activatemode_function(|pad, parent, mode, active| {
Self::catch_panic_pad_function(
parent,
|| Err(gst::loggable_error!(CAT, "sink_activate_mode panicked")),
|livesync| livesync.sink_activate_mode(pad, mode, active),
)
})
.event_function(|pad, parent, event| {
Self::catch_panic_pad_function(
parent,
|| false,
|livesync| livesync.sink_event(pad, event),
)
})
.query_function(|pad, parent, query| {
Self::catch_panic_pad_function(
parent,
|| false,
|livesync| livesync.sink_query(pad, query),
)
})
.chain_function(|pad, parent, buffer| {
Self::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|livesync| livesync.sink_chain(pad, buffer),
)
})
.flags(
gst::PadFlags::PROXY_CAPS
| gst::PadFlags::PROXY_ALLOCATION
| gst::PadFlags::PROXY_SCHEDULING,
)
.build();
let srcpad =
gst::Pad::builder_with_template(&class.pad_template("src").unwrap(), Some("src"))
.activatemode_function(|pad, parent, mode, active| {
Self::catch_panic_pad_function(
parent,
|| Err(gst::loggable_error!(CAT, "src_activate_mode panicked")),
|livesync| livesync.src_activate_mode(pad, mode, active),
)
})
.event_function(|pad, parent, event| {
Self::catch_panic_pad_function(
parent,
|| false,
|livesync| livesync.src_event(pad, event),
)
})
.query_function(|pad, parent, query| {
Self::catch_panic_pad_function(
parent,
|| false,
|livesync| livesync.src_query(pad, query),
)
})
.flags(
gst::PadFlags::PROXY_CAPS
| gst::PadFlags::PROXY_ALLOCATION
| gst::PadFlags::PROXY_SCHEDULING,
)
.build();
Self {
state: Default::default(),
cond: Condvar::new(),
sinkpad,
srcpad,
}
}
}
impl ObjectImpl for LiveSync {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<[glib::ParamSpec; 7]> = Lazy::new(|| {
[
glib::ParamSpecUInt64::builder(PROP_LATENCY)
.nick("Latency")
.blurb(
"Additional latency to allow upstream to take longer to \
produce buffers for the current position (in nanoseconds)",
)
.maximum(i64::MAX as u64)
.default_value(DEFAULT_LATENCY.into_glib())
.mutable_playing()
.build(),
glib::ParamSpecUInt64::builder(PROP_LATE_THRESHOLD)
.nick("Late threshold")
.blurb(
"Maximum time spent (in nanoseconds) before \
accepting one late buffer; -1 = never",
)
.minimum(MINIMUM_LATE_THRESHOLD.into_glib())
.default_value(DEFAULT_LATE_THRESHOLD.into_glib())
.mutable_playing()
.build(),
glib::ParamSpecUInt64::builder(PROP_IN)
.nick("Frames input")
.blurb("Number of incoming frames accepted")
.read_only()
.build(),
glib::ParamSpecUInt64::builder(PROP_DROP)
.nick("Frames dropped")
.blurb("Number of incoming frames dropped")
.read_only()
.build(),
glib::ParamSpecUInt64::builder(PROP_OUT)
.nick("Frames output")
.blurb("Number of outgoing frames produced")
.read_only()
.build(),
glib::ParamSpecUInt64::builder(PROP_DUPLICATE)
.nick("Frames duplicated")
.blurb("Number of outgoing frames duplicated")
.read_only()
.build(),
glib::ParamSpecBoolean::builder(PROP_SINGLE_SEGMENT)
.nick("Single segment")
.blurb("Timestamp buffers and eat segments so as to appear as one segment")
.mutable_ready()
.build(),
]
});
PROPERTIES.as_ref()
}
fn constructed(&self) {
self.parent_constructed();
let obj = self.obj();
obj.add_pad(&self.sinkpad).unwrap();
obj.add_pad(&self.srcpad).unwrap();
obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK);
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut state = self.state.lock();
match pspec.name() {
PROP_LATENCY => {
state.latency = value.get().unwrap();
state.update_fallback_duration();
let _ = self.obj().post_message(gst::message::Latency::new());
}
PROP_LATE_THRESHOLD => {
state.late_threshold = value.get().unwrap();
}
PROP_SINGLE_SEGMENT => {
state.single_segment = value.get().unwrap();
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let state = self.state.lock();
match pspec.name() {
PROP_LATENCY => state.latency.to_value(),
PROP_LATE_THRESHOLD => state.late_threshold.to_value(),
PROP_IN => state.num_in.to_value(),
PROP_DROP => state.num_drop.to_value(),
PROP_OUT => state.num_out.to_value(),
PROP_DUPLICATE => state.num_duplicate.to_value(),
PROP_SINGLE_SEGMENT => state.single_segment.to_value(),
_ => unimplemented!(),
}
}
}
impl GstObjectImpl for LiveSync {}
impl ElementImpl for LiveSync {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"Live Synchronizer",
"Filter",
"Outputs livestream, inserting gap frames when input lags",
"Jan Alexander Steffens (heftig) <jan.steffens@ltnglobal.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<[gst::PadTemplate; 2]> = Lazy::new(|| {
let caps = gst::Caps::new_any();
[
gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
)
.unwrap(),
gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
)
.unwrap(),
]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::trace!(CAT, imp: self, "Changing state {:?}", transition);
if transition == gst::StateChange::PausedToPlaying {
let mut state = self.state.lock();
state.playing = true;
self.cond.notify_all();
}
let success = self.parent_change_state(transition)?;
if transition == gst::StateChange::PlayingToPaused {
let mut state = self.state.lock();
state.playing = false;
}
match (transition, success) {
(
gst::StateChange::ReadyToPaused | gst::StateChange::PlayingToPaused,
gst::StateChangeSuccess::Success,
) => Ok(gst::StateChangeSuccess::NoPreroll),
(_, s) => Ok(s),
}
}
fn provide_clock(&self) -> Option<gst::Clock> {
Some(gst::SystemClock::obtain())
}
}
impl State {
/// Calculate the running time the buffer covers, including latency
fn ts_range(&self, buf: &gst::BufferRef) -> Option<Timestamps> {
let mut timestamp_start = buf.dts_or_pts()?;
if !self.single_segment {
timestamp_start = self
.segment
.as_ref()
.unwrap()
.to_running_time(timestamp_start)
.unwrap_or(gst::ClockTime::ZERO);
timestamp_start += self.latency + self.upstream_latency.unwrap();
} else {
timestamp_start += self.upstream_latency.unwrap();
timestamp_start = timestamp_start.saturating_sub(SEGMENT_OFFSET);
}
Some(Timestamps {
start: timestamp_start,
end: timestamp_start + buf.duration().unwrap(),
})
}
fn update_fallback_duration(&mut self) {
self.fallback_duration = self
// First, try 1/framerate from the caps
.in_caps
.as_ref()
.and_then(|c| c.structure(0))
.filter(|s| s.name().starts_with("video/"))
.and_then(|s| s.get::<gst::Fraction>("framerate").ok())
.and_then(|framerate| {
gst::ClockTime::SECOND
.mul_div_round(framerate.numer() as u64, framerate.denom() as u64)
})
.filter(|&dur| dur > 8.mseconds() && dur < 10.seconds())
// Otherwise, half the configured latency
.or_else(|| Some(self.latency / 2))
// In any case, don't allow a zero duration
.filter(|&dur| dur > gst::ClockTime::ZERO)
// Safe default
.unwrap_or(DEFAULT_DURATION);
}
}
impl LiveSync {
fn sink_activate_mode(
&self,
pad: &gst::Pad,
mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
if mode != gst::PadMode::Push {
return Err(gst::loggable_error!(CAT, "Wrong scheduling mode"));
}
if active {
let mut state = self.state.lock();
state.srcresult = Ok(gst::FlowSuccess::Ok);
state.eos = false;
state.in_timestamp = None;
state.num_in = 0;
state.num_drop = 0;
state.segment = None;
} else {
{
let mut state = self.state.lock();
state.srcresult = Err(gst::FlowError::Flushing);
state.out_buffer = None;
state.out_audio_info = None;
if let Some(clock_id) = state.clock_id.take() {
clock_id.unschedule();
}
self.cond.notify_all();
}
let lock = pad.stream_lock();
{
let mut state = self.state.lock();
state.in_caps = None;
state.in_audio_info = None;
state.in_item = None;
state.update_fallback_duration();
}
drop(lock);
}
Ok(())
}
fn src_activate_mode(
&self,
pad: &gst::Pad,
mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
if mode != gst::PadMode::Push {
return Err(gst::loggable_error!(CAT, "Wrong scheduling mode"));
}
if active {
let ret;
{
let mut state = self.state.lock();
state.srcresult = Ok(gst::FlowSuccess::Ok);
state.sent_segment = false;
state.out_timestamp = None;
state.num_out = 0;
state.num_duplicate = 0;
ret = self.start_src_task().map_err(Into::into);
}
ret
} else {
{
let mut state = self.state.lock();
state.srcresult = Err(gst::FlowError::Flushing);
state.out_buffer = None;
state.out_audio_info = None;
if let Some(clock_id) = state.clock_id.take() {
clock_id.unschedule();
}
self.cond.notify_all();
}
pad.stop_task().map_err(Into::into)
}
}
fn sink_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool {
{
let state = self.state.lock();
if state.single_segment {
let event = event.make_mut();
let latency = state.latency.nseconds() as i64;
event.set_running_time_offset(event.running_time_offset() + latency);
}
}
match event.view() {
gst::EventView::FlushStart(_) => {
let ret = self.srcpad.push_event(event);
{
let mut state = self.state.lock();
state.srcresult = Err(gst::FlowError::Flushing);
if let Some(clock_id) = state.clock_id.take() {
clock_id.unschedule();
}
self.cond.notify_all();
}
let _ = self.srcpad.pause_task();
return ret;
}
gst::EventView::FlushStop(_) => {
let ret = self.srcpad.push_event(event);
let mut state = self.state.lock();
state.srcresult = Ok(gst::FlowSuccess::Ok);
state.eos = false;
state.sent_segment = false;
state.segment = None;
state.in_caps = None;
state.in_audio_info = None;
state.out_audio_info = None;
state.in_item = None;
state.out_buffer = None;
state.update_fallback_duration();
let _ = self.start_src_task();
return ret;
}
gst::EventView::StreamStart(_) => {
let mut state = self.state.lock();
state.srcresult = Ok(gst::FlowSuccess::Ok);
state.eos = false;
}
gst::EventView::Segment(e) => {
let segment = match e.segment().downcast_ref() {
Some(s) => s,
None => {
gst::error!(CAT, imp: self, "Got non-TIME segment");
return false;
}
};
let mut state = self.state.lock();
state.segment = Some(segment.clone());
if !state.single_segment {
state.sent_segment = false;
}
return true;
}
gst::EventView::Gap(_) => {
gst::debug!(CAT, imp: self, "Got gap event");
return true;
}
gst::EventView::Eos(_) => {
let mut state = self.state.lock();
if let Err(err) = state.srcresult {
if matches!(err, gst::FlowError::Flushing | gst::FlowError::Eos) {
let err = state.srcresult.unwrap_err();
gst::element_imp_error!(
self,
gst::StreamError::Failed,
("Internal data flow error."),
["streaming task paused, reason {} ({:?})", err, err]
);
}
}
state.eos = true;
}
gst::EventView::Caps(c) => {
let caps = c.caps_owned();
let audio_info = match caps
.structure(0)
.unwrap()
.has_name("audio/x-raw")
.then(|| gst_audio::AudioInfo::from_caps(&caps))
.transpose()
{
Ok(ai) => ai,
Err(e) => {
gst::error!(CAT, imp: self, "Failed to parse audio caps: {}", e);
return false;
}
};
let mut state = self.state.lock();
state.in_caps = Some(caps);
state.in_audio_info = audio_info;
state.update_fallback_duration();
return true;
}
_ => {}
}
if event.is_serialized() {
let mut state = self.state.lock();
while state.srcresult.is_ok() && state.in_item.is_some() {
self.cond.wait(&mut state);
}
if state.srcresult.is_err() {
return false;
}
gst::trace!(CAT, imp: self, "Queueing {:?}", event);
state.in_item = Some(Item::Event(event));
self.cond.notify_all();
true
} else {
gst::Pad::event_default(pad, Some(&*self.obj()), event)
}
}
fn src_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool {
{
let state = self.state.lock();
if state.single_segment {
let event = event.make_mut();
let latency = state.latency.nseconds() as i64;
event.set_running_time_offset(event.running_time_offset() - latency);
}
}
match event.view() {
gst::EventView::Reconfigure(_) => {
{
let mut state = self.state.lock();
if state.srcresult == Err(gst::FlowError::NotLinked) {
state.srcresult = Ok(gst::FlowSuccess::Ok);
let _ = self.start_src_task();
}
}
self.sinkpad.push_event(event)
}
_ => gst::Pad::event_default(pad, Some(&*self.obj()), event),
}
}
fn sink_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool {
if query.is_serialized() {
let (sender, receiver) = mpsc::sync_channel(1);
let mut state = self.state.lock();
while state.srcresult.is_ok() && state.in_item.is_some() {
self.cond.wait(&mut state);
}
if state.srcresult.is_err() {
return false;
}
gst::trace!(CAT, imp: self, "Queueing {:?}", query);
state.in_item = Some(Item::Query(std::ptr::NonNull::from(query), sender));
self.cond.notify_all();
drop(state);
receiver.recv().unwrap_or(false)
} else {
gst::Pad::query_default(pad, Some(&*self.obj()), query)
}
}
fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool {
match query.view_mut() {
gst::QueryViewMut::Latency(_) => {
if !gst::Pad::query_default(pad, Some(&*self.obj()), query) {
return false;
}
let q = match query.view_mut() {
gst::QueryViewMut::Latency(q) => q,
_ => unreachable!(),
};
let mut state = self.state.lock();
let latency = state.latency;
let (_live, min, max) = q.result();
q.set(true, min + latency, max.map(|max| max + latency));
state.upstream_latency = Some(min);
true
}
_ => gst::Pad::query_default(pad, Some(&*self.obj()), query),
}
}
fn sink_chain(
&self,
_pad: &gst::Pad,
mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::trace!(CAT, imp: self, "incoming {:?}", buffer);
let mut state = self.state.lock();
if state.upstream_latency.is_none() {
gst::debug!(CAT, imp: self, "Have no upstream latency yet, querying");
let mut q = gst::query::Latency::new();
if MutexGuard::unlocked(&mut state, || self.sinkpad.peer_query(&mut q)) {
let (live, min, max) = q.result();
gst::debug!(
CAT,
imp: self,
"Latency query response: live {} min {} max {}",
live,
min,
max.display()
);
state.upstream_latency = Some(min);
} else {
gst::warning!(
CAT,
imp: self,
"Can't query upstream latency -- assuming zero"
);
}
}
while state.srcresult.is_ok() && state.in_item.is_some() {
self.cond.wait(&mut state);
}
state.srcresult?;
let buf_mut = buffer.make_mut();
if buf_mut.pts().is_none() {
gst::warning!(CAT, imp: self, "incoming buffer has no timestamps");
}
if let Some(audio_info) = &state.in_audio_info {
let buf_duration = buf_mut.duration().unwrap_or_default();
if let Some(calc_duration) = audio_info
.convert::<Option<gst::ClockTime>>(Some(gst::format::Bytes::from_usize(
buf_mut.size(),
)))
.flatten()
{
let diff = if buf_duration < calc_duration {
calc_duration - buf_duration
} else {
buf_duration - calc_duration
};
if diff.nseconds() > 1 {
gst::warning!(
CAT,
imp: self,
"Correcting duration on audio buffer from {} to {}",
buf_duration,
calc_duration,
);
buf_mut.set_duration(calc_duration);
}
} else {
gst::debug!(
CAT,
imp: self,
"Failed to calculate duration of {:?}",
buf_mut,
);
}
}
if state.single_segment {
// At this stage we should really really have a segment
let segment = state.segment.as_ref().ok_or(gst::FlowError::Error)?;
let dts = segment
.to_running_time_full(buf_mut.dts())
.map(|r| r + SEGMENT_OFFSET)
.and_then(|r| r.positive());
let pts = segment
.to_running_time_full(buf_mut.pts())
.map(|r| r + SEGMENT_OFFSET)
.and_then(|r| r.positive())
.or_else(|| {
self.obj()
.current_running_time()
.map(|r| r + SEGMENT_OFFSET)
});
buf_mut.set_dts(dts.map(|t| t + state.latency));
buf_mut.set_pts(pts.map(|t| t + state.latency));
}
if buf_mut.duration().is_none() {
gst::debug!(CAT, imp: self, "incoming buffer without duration");
buf_mut.set_duration(Some(state.fallback_duration));
}
if state
.out_buffer
.as_ref()
.map_or(false, |b| b.flags().contains(gst::BufferFlags::GAP))
{
// We are done bridging a gap, so mark it as DISCONT instead
buf_mut.unset_flags(gst::BufferFlags::GAP);
buf_mut.set_flags(gst::BufferFlags::DISCONT);
}
let mut timestamp = state.ts_range(buf_mut);
let lateness = self.buffer_is_backwards(&state, timestamp);
match lateness {
BufferLateness::OnTime => {}
BufferLateness::LateUnderThreshold => {
gst::debug!(CAT, imp: self, "discarding late {:?}", buf_mut);
state.num_drop += 1;
return Ok(gst::FlowSuccess::Ok);
}
BufferLateness::LateOverThreshold => {
gst::debug!(CAT, imp: self, "accepting late {:?}", buf_mut);
let prev = state.out_buffer.as_ref().unwrap();
let prev_duration = prev.duration().unwrap();
if let Some(audio_info) = &state.in_audio_info {
let mut map_info = buf_mut.map_writable().map_err(|e| {
gst::error!(CAT, imp: self, "Failed to map buffer: {}", e);
gst::FlowError::Error
})?;
audio_info
.format_info()
.fill_silence(map_info.as_mut_slice());
} else {
buf_mut.set_duration(Some(state.fallback_duration));
}
buf_mut.set_dts(prev.dts().map(|t| t + prev_duration));
buf_mut.set_pts(prev.pts().map(|t| t + prev_duration));
buf_mut.set_flags(gst::BufferFlags::GAP);
timestamp = state.ts_range(buf_mut);
}
}
gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness);
state.in_item = Some(Item::Buffer(buffer, lateness));
state.in_timestamp = timestamp;
state.num_in += 1;
self.cond.notify_all();
Ok(gst::FlowSuccess::Ok)
}
fn start_src_task(&self) -> Result<(), glib::BoolError> {
self.srcpad.start_task({
let pad = self.srcpad.downgrade();
move || {
let pad = pad.upgrade().unwrap();
let parent = pad.parent_element().unwrap();
let livesync = parent.downcast_ref::<super::LiveSync>().unwrap();
let ret = livesync.imp().src_loop(&pad);
if !ret {
gst::log!(CAT, obj: &parent, "Loop stopping");
let _ = pad.pause_task();
}
}
})
}
fn src_loop(&self, pad: &gst::Pad) -> bool {
let mut err = match self.src_loop_inner() {
Ok(_) => return true,
Err(e) => e,
};
let eos;
{
let mut state = self.state.lock();
match state.srcresult {
// Can be set to Flushing by another thread
Err(e) => err = e,
// Communicate our flow return
Ok(_) => state.srcresult = Err(err),
}
eos = state.eos;
state.clock_id = None;
self.cond.notify_all();
}
if eos && !matches!(err, gst::FlowError::Flushing | gst::FlowError::Eos) {
gst::element_imp_error!(
self,
gst::StreamError::Failed,
("Internal data flow error."),
["streaming task paused, reason {} ({:?})", err, err]
);
pad.push_event(gst::event::Eos::new());
}
false
}
fn src_loop_inner(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock();
while state.srcresult.is_ok()
&& (!state.playing || (state.in_item.is_none() && state.out_buffer.is_none()))
{
self.cond.wait(&mut state);
}
state.srcresult?;
gst::trace!(CAT, imp: self, "Unqueueing {:?}", state.in_item);
let in_buffer = match state.in_item.take() {
None => None,
Some(Item::Buffer(buffer, lateness)) => {
if self.buffer_is_early(&state, state.in_timestamp) {
// Try this buffer again on the next iteration
state.in_item = Some(Item::Buffer(buffer, lateness));
None
} else {
Some((buffer, lateness))
}
}
Some(Item::Event(event)) => {
self.cond.notify_all();
drop(state);
self.srcpad.push_event(event);
return Ok(gst::FlowSuccess::Ok);
}
Some(Item::Query(mut query, sender)) => {
self.cond.notify_all();
drop(state);
// SAFETY: The other thread is waiting for us to handle the query
let res = self.srcpad.peer_query(unsafe { query.as_mut() });
sender.send(res).ok();
return Ok(gst::FlowSuccess::Ok);
}
};
let (duplicate, caps) = if let Some((buffer, lateness)) = in_buffer {
let caps = state.in_caps.take();
state.out_buffer = Some(buffer);
state.out_timestamp = state.in_timestamp;
if caps.is_some() {
state.out_audio_info = state.in_audio_info.clone();
}
self.cond.notify_all();
(lateness != BufferLateness::OnTime, caps)
} else {
// Work around borrow checker
let State {
fallback_duration,
out_buffer: ref mut buffer,
out_audio_info: ref audio_info,
..
} = *state;
gst::debug!(CAT, imp: self, "repeating {:?}", buffer);
let buffer = buffer.as_mut().unwrap().make_mut();
let prev_duration = buffer.duration().unwrap();
if let Some(audio_info) = audio_info {
if !buffer.flags().contains(gst::BufferFlags::GAP) {
let mut map_info = buffer.map_writable().map_err(|e| {
gst::error!(CAT, imp: self, "Failed to map buffer: {}", e);
gst::FlowError::Error
})?;
audio_info
.format_info()
.fill_silence(map_info.as_mut_slice());
}
} else {
buffer.set_duration(Some(fallback_duration));
}
buffer.set_dts(buffer.dts().map(|t| t + prev_duration));
buffer.set_pts(buffer.pts().map(|t| t + prev_duration));
buffer.set_flags(gst::BufferFlags::GAP);
buffer.unset_flags(gst::BufferFlags::DISCONT);
state.out_timestamp = state.ts_range(state.out_buffer.as_ref().unwrap());
(true, None)
};
let buffer = state.out_buffer.clone().unwrap();
let sync_ts = state
.out_timestamp
.map_or(gst::ClockTime::ZERO, |t| t.start);
if let Some(caps) = caps {
gst::debug!(CAT, imp: self, "Sending new caps: {}", caps);
let event = gst::event::Caps::new(&caps);
MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event));
state.srcresult?;
}
if !state.sent_segment {
let event = if state.single_segment {
// Create live segment
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
segment.set_start(sync_ts + SEGMENT_OFFSET);
segment.set_base(sync_ts);
segment.set_time(sync_ts);
segment.set_position(sync_ts + SEGMENT_OFFSET);
gst::debug!(CAT, imp: self, "Sending new segment: {:?}", segment);
gst::event::Segment::new(&segment)
} else {
let segment = state.segment.as_ref().unwrap();
gst::debug!(CAT, imp: self, "Forwarding segment: {:?}", segment);
gst::event::Segment::new(segment)
};
MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event));
state.srcresult?;
state.sent_segment = true;
}
{
let element = self.obj();
let base_time = element.base_time().ok_or_else(|| {
gst::error!(CAT, imp: self, "Missing base time");
gst::FlowError::Flushing
})?;
let clock = element.clock().ok_or_else(|| {
gst::error!(CAT, imp: self, "Missing clock");
gst::FlowError::Flushing
})?;
let clock_id = clock.new_single_shot_id(base_time + sync_ts);
state.clock_id = Some(clock_id.clone());
gst::trace!(
CAT,
imp: self,
"Waiting for clock to reach {}",
clock_id.time(),
);
let (res, _) = MutexGuard::unlocked(&mut state, || clock_id.wait());
gst::trace!(CAT, imp: self, "Clock returned {res:?}",);
if res == Err(gst::ClockError::Unscheduled) {
return Err(gst::FlowError::Flushing);
}
state.srcresult?;
state.clock_id = None;
}
state.num_out += 1;
if duplicate {
state.num_duplicate += 1;
}
drop(state);
gst::trace!(CAT, imp: self, "Pushing {buffer:?}");
self.srcpad.push(buffer)
}
fn buffer_is_backwards(&self, state: &State, timestamp: Option<Timestamps>) -> BufferLateness {
let timestamp = match timestamp {
Some(t) => t,
None => return BufferLateness::OnTime,
};
let out_timestamp = match state.out_timestamp {
Some(t) => t,
None => return BufferLateness::OnTime,
};
if timestamp.end > out_timestamp.end {
return BufferLateness::OnTime;
}
gst::debug!(
CAT,
imp: self,
"Timestamp regresses: buffer ends at {}, expected {}",
timestamp.end,
out_timestamp.end,
);
let late_threshold = match state.late_threshold {
Some(gst::ClockTime::ZERO) => return BufferLateness::LateOverThreshold,
Some(t) => t,
None => return BufferLateness::LateUnderThreshold,
};
let in_timestamp = match state.in_timestamp {
Some(t) => t,
None => return BufferLateness::LateUnderThreshold,
};
if timestamp.start > in_timestamp.end + late_threshold {
BufferLateness::LateOverThreshold
} else {
BufferLateness::LateUnderThreshold
}
}
fn buffer_is_early(&self, state: &State, timestamp: Option<Timestamps>) -> bool {
let timestamp = match timestamp {
Some(t) => t,
None => return false,
};
let out_timestamp = match state.out_timestamp {
Some(t) => t,
None => return false,
};
let slack = state
.out_buffer
.as_deref()
.map_or(gst::ClockTime::ZERO, |b| b.duration().unwrap());
if timestamp.start < out_timestamp.end + slack {
return false;
}
gst::debug!(
CAT,
imp: self,
"Timestamp is too early: buffer starts at {}, expected {}",
timestamp.start,
out_timestamp.end,
);
true
}
}