mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-12-23 02:26:35 +00:00
livesync: Fix queueing
The logic of the element requires the next buffer to be available immediately after we are done pushing the previous, otherwise we insert a repeat. Making the src loop handle events and queries broke this, as upstream is almost guaranteed not to deliver a buffer in time if we allow non-buffer items to block upstream's push. To fix this, replace our single-item `Option` with a `VecDeque` that we allow to hold an unlimited number of events or queries, but only one buffer at a time. In addition, the code was confused about the current caps and segment. This wasn't an issue before making the src loop handle events and queries, as only the sinkpad cared about the current segment, using it to buffers received, and only the srcpad cared about the current caps, sending it just before sending the next received buffer. Now the sinkpad cares about caps (through `update_fallback_duration`) and the srcpad cares about the segment (when not in single-segment mode). Fix this by - making `in_caps` always hold the current caps of the sinkpad, - adding `pending_caps`, which is used by the srcpad to store caps to be sent with the next received buffer, - adding `in_segment`, holding the current segment of the sinkpad, - adding `pending_segment`, which is used by the srcpad to store the segment to be sent with the next received buffer, - adding `out_segment`, holding the current segment of the srcpad. Maybe a fix for https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/298. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1082>
This commit is contained in:
parent
3c31c98d95
commit
165b5f8c50
1 changed files with 132 additions and 93 deletions
|
@ -14,7 +14,7 @@ use gst::{
|
||||||
};
|
};
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use parking_lot::{Condvar, Mutex, MutexGuard};
|
use parking_lot::{Condvar, Mutex, MutexGuard};
|
||||||
use std::sync::mpsc;
|
use std::{collections::VecDeque, sync::mpsc};
|
||||||
|
|
||||||
/// Offset for the segment in single-segment mode, to handle negative DTS
|
/// Offset for the segment in single-segment mode, to handle negative DTS
|
||||||
const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000);
|
const SEGMENT_OFFSET: gst::ClockTime = gst::ClockTime::from_seconds(60 * 60 * 1000);
|
||||||
|
@ -67,19 +67,23 @@ struct State {
|
||||||
upstream_latency: Option<gst::ClockTime>,
|
upstream_latency: Option<gst::ClockTime>,
|
||||||
fallback_duration: gst::ClockTime,
|
fallback_duration: gst::ClockTime,
|
||||||
|
|
||||||
|
playing: bool,
|
||||||
eos: bool,
|
eos: bool,
|
||||||
segment: Option<gst::FormattedSegment<gst::ClockTime>>,
|
|
||||||
|
|
||||||
srcresult: Result<gst::FlowSuccess, gst::FlowError>,
|
srcresult: Result<gst::FlowSuccess, gst::FlowError>,
|
||||||
playing: bool,
|
|
||||||
sent_segment: bool,
|
|
||||||
clock_id: Option<gst::SingleShotClockId>,
|
clock_id: Option<gst::SingleShotClockId>,
|
||||||
|
|
||||||
|
in_segment: Option<gst::FormattedSegment<gst::ClockTime>>,
|
||||||
|
pending_segment: Option<gst::FormattedSegment<gst::ClockTime>>,
|
||||||
|
out_segment: Option<gst::FormattedSegment<gst::ClockTime>>,
|
||||||
|
|
||||||
in_caps: Option<gst::Caps>,
|
in_caps: Option<gst::Caps>,
|
||||||
|
pending_caps: Option<gst::Caps>,
|
||||||
in_audio_info: Option<gst_audio::AudioInfo>,
|
in_audio_info: Option<gst_audio::AudioInfo>,
|
||||||
out_audio_info: Option<gst_audio::AudioInfo>,
|
out_audio_info: Option<gst_audio::AudioInfo>,
|
||||||
|
|
||||||
in_item: Option<Item>,
|
queue: VecDeque<Item>,
|
||||||
|
buffer_queued: bool,
|
||||||
out_buffer: Option<gst::Buffer>,
|
out_buffer: Option<gst::Buffer>,
|
||||||
|
|
||||||
in_timestamp: Option<Timestamps>,
|
in_timestamp: Option<Timestamps>,
|
||||||
|
@ -113,16 +117,19 @@ impl Default for State {
|
||||||
late_threshold: DEFAULT_LATE_THRESHOLD,
|
late_threshold: DEFAULT_LATE_THRESHOLD,
|
||||||
upstream_latency: None,
|
upstream_latency: None,
|
||||||
fallback_duration: DEFAULT_DURATION,
|
fallback_duration: DEFAULT_DURATION,
|
||||||
eos: false,
|
|
||||||
segment: None,
|
|
||||||
srcresult: Err(gst::FlowError::Flushing),
|
|
||||||
playing: false,
|
playing: false,
|
||||||
sent_segment: false,
|
eos: false,
|
||||||
|
srcresult: Err(gst::FlowError::Flushing),
|
||||||
clock_id: None,
|
clock_id: None,
|
||||||
|
in_segment: None,
|
||||||
|
pending_segment: None,
|
||||||
|
out_segment: None,
|
||||||
in_caps: None,
|
in_caps: None,
|
||||||
|
pending_caps: None,
|
||||||
in_audio_info: None,
|
in_audio_info: None,
|
||||||
out_audio_info: None,
|
out_audio_info: None,
|
||||||
in_item: None,
|
queue: VecDeque::with_capacity(32),
|
||||||
|
buffer_queued: false,
|
||||||
out_buffer: None,
|
out_buffer: None,
|
||||||
in_timestamp: None,
|
in_timestamp: None,
|
||||||
out_timestamp: None,
|
out_timestamp: None,
|
||||||
|
@ -394,14 +401,15 @@ impl ElementImpl for LiveSync {
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
/// Calculate the running time the buffer covers, including latency
|
/// Calculate the running time the buffer covers, including latency
|
||||||
fn ts_range(&self, buf: &gst::BufferRef) -> Option<Timestamps> {
|
fn ts_range(
|
||||||
|
&self,
|
||||||
|
buf: &gst::BufferRef,
|
||||||
|
segment: &gst::FormattedSegment<gst::ClockTime>,
|
||||||
|
) -> Option<Timestamps> {
|
||||||
let mut timestamp_start = buf.dts_or_pts()?;
|
let mut timestamp_start = buf.dts_or_pts()?;
|
||||||
|
|
||||||
if !self.single_segment {
|
if !self.single_segment {
|
||||||
timestamp_start = self
|
timestamp_start = segment
|
||||||
.segment
|
|
||||||
.as_ref()
|
|
||||||
.unwrap()
|
|
||||||
.to_running_time(timestamp_start)
|
.to_running_time(timestamp_start)
|
||||||
.unwrap_or(gst::ClockTime::ZERO);
|
.unwrap_or(gst::ClockTime::ZERO);
|
||||||
timestamp_start += self.latency + self.upstream_latency.unwrap();
|
timestamp_start += self.latency + self.upstream_latency.unwrap();
|
||||||
|
@ -456,16 +464,17 @@ impl LiveSync {
|
||||||
state.in_timestamp = None;
|
state.in_timestamp = None;
|
||||||
state.num_in = 0;
|
state.num_in = 0;
|
||||||
state.num_drop = 0;
|
state.num_drop = 0;
|
||||||
state.segment = None;
|
state.in_segment = None;
|
||||||
} else {
|
} else {
|
||||||
{
|
{
|
||||||
let mut state = self.state.lock();
|
let mut state = self.state.lock();
|
||||||
state.srcresult = Err(gst::FlowError::Flushing);
|
state.srcresult = Err(gst::FlowError::Flushing);
|
||||||
state.out_buffer = None;
|
|
||||||
state.out_audio_info = None;
|
|
||||||
if let Some(clock_id) = state.clock_id.take() {
|
if let Some(clock_id) = state.clock_id.take() {
|
||||||
clock_id.unschedule();
|
clock_id.unschedule();
|
||||||
}
|
}
|
||||||
|
state.pending_caps = None;
|
||||||
|
state.out_audio_info = None;
|
||||||
|
state.out_buffer = None;
|
||||||
self.cond.notify_all();
|
self.cond.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -474,7 +483,8 @@ impl LiveSync {
|
||||||
let mut state = self.state.lock();
|
let mut state = self.state.lock();
|
||||||
state.in_caps = None;
|
state.in_caps = None;
|
||||||
state.in_audio_info = None;
|
state.in_audio_info = None;
|
||||||
state.in_item = None;
|
state.queue.clear();
|
||||||
|
state.buffer_queued = false;
|
||||||
state.update_fallback_duration();
|
state.update_fallback_duration();
|
||||||
}
|
}
|
||||||
drop(lock);
|
drop(lock);
|
||||||
|
@ -500,7 +510,8 @@ impl LiveSync {
|
||||||
let mut state = self.state.lock();
|
let mut state = self.state.lock();
|
||||||
|
|
||||||
state.srcresult = Ok(gst::FlowSuccess::Ok);
|
state.srcresult = Ok(gst::FlowSuccess::Ok);
|
||||||
state.sent_segment = false;
|
state.pending_segment = None;
|
||||||
|
state.out_segment = None;
|
||||||
state.out_timestamp = None;
|
state.out_timestamp = None;
|
||||||
state.num_out = 0;
|
state.num_out = 0;
|
||||||
state.num_duplicate = 0;
|
state.num_duplicate = 0;
|
||||||
|
@ -513,11 +524,12 @@ impl LiveSync {
|
||||||
{
|
{
|
||||||
let mut state = self.state.lock();
|
let mut state = self.state.lock();
|
||||||
state.srcresult = Err(gst::FlowError::Flushing);
|
state.srcresult = Err(gst::FlowError::Flushing);
|
||||||
state.out_buffer = None;
|
|
||||||
state.out_audio_info = None;
|
|
||||||
if let Some(clock_id) = state.clock_id.take() {
|
if let Some(clock_id) = state.clock_id.take() {
|
||||||
clock_id.unschedule();
|
clock_id.unschedule();
|
||||||
}
|
}
|
||||||
|
state.pending_caps = None;
|
||||||
|
state.out_audio_info = None;
|
||||||
|
state.out_buffer = None;
|
||||||
self.cond.notify_all();
|
self.cond.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -558,12 +570,15 @@ impl LiveSync {
|
||||||
let mut state = self.state.lock();
|
let mut state = self.state.lock();
|
||||||
state.srcresult = Ok(gst::FlowSuccess::Ok);
|
state.srcresult = Ok(gst::FlowSuccess::Ok);
|
||||||
state.eos = false;
|
state.eos = false;
|
||||||
state.sent_segment = false;
|
state.in_segment = None;
|
||||||
state.segment = None;
|
state.pending_segment = None;
|
||||||
|
state.out_segment = None;
|
||||||
state.in_caps = None;
|
state.in_caps = None;
|
||||||
|
state.pending_caps = None;
|
||||||
state.in_audio_info = None;
|
state.in_audio_info = None;
|
||||||
state.out_audio_info = None;
|
state.out_audio_info = None;
|
||||||
state.in_item = None;
|
state.queue.clear();
|
||||||
|
state.buffer_queued = false;
|
||||||
state.out_buffer = None;
|
state.out_buffer = None;
|
||||||
state.update_fallback_duration();
|
state.update_fallback_duration();
|
||||||
|
|
||||||
|
@ -587,11 +602,7 @@ impl LiveSync {
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut state = self.state.lock();
|
let mut state = self.state.lock();
|
||||||
state.segment = Some(segment.clone());
|
state.in_segment = Some(segment.clone());
|
||||||
if !state.single_segment {
|
|
||||||
state.sent_segment = false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
gst::EventView::Gap(_) => {
|
gst::EventView::Gap(_) => {
|
||||||
|
@ -638,30 +649,25 @@ impl LiveSync {
|
||||||
state.in_caps = Some(caps);
|
state.in_caps = Some(caps);
|
||||||
state.in_audio_info = audio_info;
|
state.in_audio_info = audio_info;
|
||||||
state.update_fallback_duration();
|
state.update_fallback_duration();
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
if event.is_serialized() {
|
if !event.is_serialized() {
|
||||||
let mut state = self.state.lock();
|
return gst::Pad::event_default(pad, Some(&*self.obj()), event);
|
||||||
while state.srcresult.is_ok() && state.in_item.is_some() {
|
|
||||||
self.cond.wait(&mut state);
|
|
||||||
}
|
|
||||||
|
|
||||||
if state.srcresult.is_err() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
gst::trace!(CAT, imp: self, "Queueing {:?}", event);
|
|
||||||
state.in_item = Some(Item::Event(event));
|
|
||||||
self.cond.notify_all();
|
|
||||||
|
|
||||||
true
|
|
||||||
} else {
|
|
||||||
gst::Pad::event_default(pad, Some(&*self.obj()), event)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut state = self.state.lock();
|
||||||
|
if state.srcresult.is_err() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
gst::trace!(CAT, imp: self, "Queueing {:?}", event);
|
||||||
|
state.queue.push_back(Item::Event(event));
|
||||||
|
self.cond.notify_all();
|
||||||
|
|
||||||
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
fn src_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool {
|
fn src_event(&self, pad: &gst::Pad, mut event: gst::Event) -> bool {
|
||||||
|
@ -695,16 +701,14 @@ impl LiveSync {
|
||||||
let (sender, receiver) = mpsc::sync_channel(1);
|
let (sender, receiver) = mpsc::sync_channel(1);
|
||||||
|
|
||||||
let mut state = self.state.lock();
|
let mut state = self.state.lock();
|
||||||
while state.srcresult.is_ok() && state.in_item.is_some() {
|
|
||||||
self.cond.wait(&mut state);
|
|
||||||
}
|
|
||||||
|
|
||||||
if state.srcresult.is_err() {
|
if state.srcresult.is_err() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
gst::trace!(CAT, imp: self, "Queueing {:?}", query);
|
gst::trace!(CAT, imp: self, "Queueing {:?}", query);
|
||||||
state.in_item = Some(Item::Query(std::ptr::NonNull::from(query), sender));
|
state
|
||||||
|
.queue
|
||||||
|
.push_back(Item::Query(std::ptr::NonNull::from(query), sender));
|
||||||
self.cond.notify_all();
|
self.cond.notify_all();
|
||||||
drop(state);
|
drop(state);
|
||||||
|
|
||||||
|
@ -774,7 +778,7 @@ impl LiveSync {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
while state.srcresult.is_ok() && state.in_item.is_some() {
|
while state.srcresult.is_ok() && state.buffer_queued {
|
||||||
self.cond.wait(&mut state);
|
self.cond.wait(&mut state);
|
||||||
}
|
}
|
||||||
state.srcresult?;
|
state.srcresult?;
|
||||||
|
@ -819,9 +823,10 @@ impl LiveSync {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// At this stage we should really really have a segment
|
||||||
|
let segment = state.in_segment.as_ref().ok_or(gst::FlowError::Error)?;
|
||||||
|
|
||||||
if state.single_segment {
|
if state.single_segment {
|
||||||
// At this stage we should really really have a segment
|
|
||||||
let segment = state.segment.as_ref().ok_or(gst::FlowError::Error)?;
|
|
||||||
let dts = segment
|
let dts = segment
|
||||||
.to_running_time_full(buf_mut.dts())
|
.to_running_time_full(buf_mut.dts())
|
||||||
.map(|r| r + SEGMENT_OFFSET)
|
.map(|r| r + SEGMENT_OFFSET)
|
||||||
|
@ -855,7 +860,7 @@ impl LiveSync {
|
||||||
buf_mut.set_flags(gst::BufferFlags::DISCONT);
|
buf_mut.set_flags(gst::BufferFlags::DISCONT);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut timestamp = state.ts_range(buf_mut);
|
let mut timestamp = state.ts_range(buf_mut, segment);
|
||||||
let lateness = self.buffer_is_backwards(&state, timestamp);
|
let lateness = self.buffer_is_backwards(&state, timestamp);
|
||||||
match lateness {
|
match lateness {
|
||||||
BufferLateness::OnTime => {}
|
BufferLateness::OnTime => {}
|
||||||
|
@ -889,12 +894,13 @@ impl LiveSync {
|
||||||
buf_mut.set_pts(prev.pts().map(|t| t + prev_duration));
|
buf_mut.set_pts(prev.pts().map(|t| t + prev_duration));
|
||||||
buf_mut.set_flags(gst::BufferFlags::GAP);
|
buf_mut.set_flags(gst::BufferFlags::GAP);
|
||||||
|
|
||||||
timestamp = state.ts_range(buf_mut);
|
timestamp = state.ts_range(buf_mut, state.out_segment.as_ref().unwrap());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness);
|
gst::trace!(CAT, imp: self, "Queueing {:?} ({:?})", buffer, lateness);
|
||||||
state.in_item = Some(Item::Buffer(buffer, lateness));
|
state.queue.push_back(Item::Buffer(buffer, lateness));
|
||||||
|
state.buffer_queued = true;
|
||||||
state.in_timestamp = timestamp;
|
state.in_timestamp = timestamp;
|
||||||
state.num_in += 1;
|
state.num_in += 1;
|
||||||
self.cond.notify_all();
|
self.cond.notify_all();
|
||||||
|
@ -958,31 +964,54 @@ impl LiveSync {
|
||||||
fn src_loop_inner(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
|
fn src_loop_inner(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
|
||||||
let mut state = self.state.lock();
|
let mut state = self.state.lock();
|
||||||
while state.srcresult.is_ok()
|
while state.srcresult.is_ok()
|
||||||
&& (!state.playing || (state.in_item.is_none() && state.out_buffer.is_none()))
|
&& (!state.playing || (state.queue.is_empty() && state.out_buffer.is_none()))
|
||||||
{
|
{
|
||||||
self.cond.wait(&mut state);
|
self.cond.wait(&mut state);
|
||||||
}
|
}
|
||||||
state.srcresult?;
|
state.srcresult?;
|
||||||
|
|
||||||
gst::trace!(CAT, imp: self, "Unqueueing {:?}", state.in_item);
|
let in_item = state.queue.pop_front();
|
||||||
let in_buffer = match state.in_item.take() {
|
gst::trace!(CAT, imp: self, "Unqueueing {:?}", in_item);
|
||||||
|
|
||||||
|
let in_buffer = match in_item {
|
||||||
None => None,
|
None => None,
|
||||||
|
|
||||||
Some(Item::Buffer(buffer, lateness)) => {
|
Some(Item::Buffer(buffer, lateness)) => {
|
||||||
if self.buffer_is_early(&state, state.in_timestamp) {
|
if self.buffer_is_early(&state, state.in_timestamp) {
|
||||||
// Try this buffer again on the next iteration
|
// Try this buffer again on the next iteration
|
||||||
state.in_item = Some(Item::Buffer(buffer, lateness));
|
state.queue.push_front(Item::Buffer(buffer, lateness));
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
|
state.buffer_queued = false;
|
||||||
Some((buffer, lateness))
|
Some((buffer, lateness))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Some(Item::Event(event)) => {
|
Some(Item::Event(event)) => {
|
||||||
|
let mut push = true;
|
||||||
|
|
||||||
|
match event.view() {
|
||||||
|
gst::EventView::Segment(e) => {
|
||||||
|
let segment = e.segment().downcast_ref().unwrap();
|
||||||
|
state.pending_segment = Some(segment.clone());
|
||||||
|
push = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
gst::EventView::Caps(e) => {
|
||||||
|
state.pending_caps = Some(e.caps_owned());
|
||||||
|
state.update_fallback_duration();
|
||||||
|
push = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
|
||||||
self.cond.notify_all();
|
self.cond.notify_all();
|
||||||
drop(state);
|
drop(state);
|
||||||
|
|
||||||
self.srcpad.push_event(event);
|
if push {
|
||||||
|
self.srcpad.push_event(event);
|
||||||
|
}
|
||||||
|
|
||||||
return Ok(gst::FlowSuccess::Ok);
|
return Ok(gst::FlowSuccess::Ok);
|
||||||
}
|
}
|
||||||
|
@ -999,19 +1028,18 @@ impl LiveSync {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let (duplicate, caps) = if let Some((buffer, lateness)) = in_buffer {
|
let duplicate;
|
||||||
let caps = state.in_caps.take();
|
let mut caps = None;
|
||||||
|
let mut segment = None;
|
||||||
|
if let Some((buffer, lateness)) = in_buffer {
|
||||||
state.out_buffer = Some(buffer);
|
state.out_buffer = Some(buffer);
|
||||||
state.out_timestamp = state.in_timestamp;
|
state.out_timestamp = state.in_timestamp;
|
||||||
|
|
||||||
if caps.is_some() {
|
caps = state.pending_caps.take();
|
||||||
state.out_audio_info = state.in_audio_info.clone();
|
segment = state.pending_segment.take();
|
||||||
}
|
|
||||||
|
|
||||||
|
duplicate = lateness != BufferLateness::OnTime;
|
||||||
self.cond.notify_all();
|
self.cond.notify_all();
|
||||||
|
|
||||||
(lateness != BufferLateness::OnTime, caps)
|
|
||||||
} else {
|
} else {
|
||||||
// Work around borrow checker
|
// Work around borrow checker
|
||||||
let State {
|
let State {
|
||||||
|
@ -1045,8 +1073,11 @@ impl LiveSync {
|
||||||
buffer.set_flags(gst::BufferFlags::GAP);
|
buffer.set_flags(gst::BufferFlags::GAP);
|
||||||
buffer.unset_flags(gst::BufferFlags::DISCONT);
|
buffer.unset_flags(gst::BufferFlags::DISCONT);
|
||||||
|
|
||||||
state.out_timestamp = state.ts_range(state.out_buffer.as_ref().unwrap());
|
state.out_timestamp = state.ts_range(
|
||||||
(true, None)
|
state.out_buffer.as_ref().unwrap(),
|
||||||
|
state.out_segment.as_ref().unwrap(),
|
||||||
|
);
|
||||||
|
duplicate = true;
|
||||||
};
|
};
|
||||||
|
|
||||||
let buffer = state.out_buffer.clone().unwrap();
|
let buffer = state.out_buffer.clone().unwrap();
|
||||||
|
@ -1060,29 +1091,37 @@ impl LiveSync {
|
||||||
let event = gst::event::Caps::new(&caps);
|
let event = gst::event::Caps::new(&caps);
|
||||||
MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event));
|
MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event));
|
||||||
state.srcresult?;
|
state.srcresult?;
|
||||||
|
|
||||||
|
state.out_audio_info = caps
|
||||||
|
.structure(0)
|
||||||
|
.unwrap()
|
||||||
|
.has_name("audio/x-raw")
|
||||||
|
.then(|| gst_audio::AudioInfo::from_caps(&caps).unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
if !state.sent_segment {
|
if let Some(segment) = segment {
|
||||||
let event = if state.single_segment {
|
if !state.single_segment {
|
||||||
// Create live segment
|
|
||||||
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
|
|
||||||
segment.set_start(sync_ts + SEGMENT_OFFSET);
|
|
||||||
segment.set_base(sync_ts);
|
|
||||||
segment.set_time(sync_ts);
|
|
||||||
segment.set_position(sync_ts + SEGMENT_OFFSET);
|
|
||||||
|
|
||||||
gst::debug!(CAT, imp: self, "Sending new segment: {:?}", segment);
|
|
||||||
gst::event::Segment::new(&segment)
|
|
||||||
} else {
|
|
||||||
let segment = state.segment.as_ref().unwrap();
|
|
||||||
|
|
||||||
gst::debug!(CAT, imp: self, "Forwarding segment: {:?}", segment);
|
gst::debug!(CAT, imp: self, "Forwarding segment: {:?}", segment);
|
||||||
gst::event::Segment::new(segment)
|
|
||||||
};
|
|
||||||
|
|
||||||
MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event));
|
let event = gst::event::Segment::new(&segment);
|
||||||
state.srcresult?;
|
MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event));
|
||||||
state.sent_segment = true;
|
state.srcresult?;
|
||||||
|
} else if state.out_segment.is_none() {
|
||||||
|
// Create live segment
|
||||||
|
let mut live_segment = gst::FormattedSegment::<gst::ClockTime>::new();
|
||||||
|
live_segment.set_start(sync_ts + SEGMENT_OFFSET);
|
||||||
|
live_segment.set_base(sync_ts);
|
||||||
|
live_segment.set_time(sync_ts);
|
||||||
|
live_segment.set_position(sync_ts + SEGMENT_OFFSET);
|
||||||
|
|
||||||
|
gst::debug!(CAT, imp: self, "Sending new segment: {:?}", live_segment);
|
||||||
|
|
||||||
|
let event = gst::event::Segment::new(&live_segment);
|
||||||
|
MutexGuard::unlocked(&mut state, || self.srcpad.push_event(event));
|
||||||
|
state.srcresult?;
|
||||||
|
}
|
||||||
|
|
||||||
|
state.out_segment = Some(segment);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in a new issue