2022-03-11 13:03:03 +00:00
|
|
|
use std::collections::HashMap;
|
|
|
|
use std::mem;
|
|
|
|
use std::sync::{atomic, Arc, Mutex};
|
|
|
|
|
|
|
|
use once_cell::sync::Lazy;
|
2022-05-12 13:41:54 +00:00
|
|
|
use thiserror::Error;
|
2022-03-11 13:03:03 +00:00
|
|
|
|
|
|
|
use gst::{glib, prelude::*};
|
|
|
|
|
|
|
|
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
|
|
|
gst::DebugCategory::new(
|
|
|
|
"utilsrs-stream-producer",
|
|
|
|
gst::DebugColorFlags::empty(),
|
|
|
|
Some("gst_app Stream Producer interface"),
|
|
|
|
)
|
|
|
|
});
|
|
|
|
|
|
|
|
/// The interface for transporting media data from one node
|
|
|
|
/// to another.
|
|
|
|
///
|
|
|
|
/// A producer is essentially a GStreamer `appsink` whose output
|
|
|
|
/// is sent to a set of consumers, who are essentially `appsrc` wrappers
|
|
|
|
#[derive(Debug, Clone)]
|
|
|
|
pub struct StreamProducer {
|
|
|
|
/// The appsink to dispatch data for
|
|
|
|
appsink: gst_app::AppSink,
|
|
|
|
/// The consumers to dispatch data to
|
|
|
|
consumers: Arc<Mutex<StreamConsumers>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl PartialEq for StreamProducer {
|
|
|
|
fn eq(&self, other: &Self) -> bool {
|
|
|
|
self.appsink.eq(&other.appsink)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Eq for StreamProducer {}
|
|
|
|
|
2022-05-12 13:41:54 +00:00
|
|
|
/// Link between a `StreamProducer` and a consumer, disconnecting the link on `Drop`.
|
|
|
|
/// The producer and consumer will stay alive while the link is.
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct ConsumptionLink {
|
|
|
|
consumer: gst_app::AppSrc,
|
|
|
|
producer: Option<StreamProducer>,
|
2022-07-05 10:35:01 +00:00
|
|
|
/// number of buffers dropped because `consumer` internal queue was full
|
|
|
|
dropped: Arc<atomic::AtomicU64>,
|
|
|
|
/// number of buffers pushed through `consumer`
|
|
|
|
pushed: Arc<atomic::AtomicU64>,
|
2022-05-12 13:41:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl ConsumptionLink {
|
|
|
|
/// Replace the producer by a new one, keeping the existing consumer.
|
|
|
|
pub fn change_producer(
|
|
|
|
&mut self,
|
|
|
|
new_producer: &StreamProducer,
|
|
|
|
) -> Result<(), AddConsumerError> {
|
|
|
|
self.disconnect();
|
|
|
|
*self = new_producer.add_consumer(&self.consumer)?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Disconnect the consumer from the producer
|
|
|
|
pub fn disconnect(&mut self) {
|
|
|
|
if let Some(producer) = self.producer.take() {
|
|
|
|
producer.remove_consumer(&self.consumer);
|
|
|
|
}
|
|
|
|
}
|
2022-07-05 10:35:01 +00:00
|
|
|
|
|
|
|
/// number of dropped buffers because the consumer internal queue was full
|
|
|
|
pub fn dropped(&self) -> u64 {
|
|
|
|
self.dropped.load(atomic::Ordering::SeqCst)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// number of buffers pushed through this link
|
|
|
|
pub fn pushed(&self) -> u64 {
|
|
|
|
self.pushed.load(atomic::Ordering::SeqCst)
|
|
|
|
}
|
2022-05-12 13:41:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Drop for ConsumptionLink {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
self.disconnect();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Error)]
|
|
|
|
/// Error type returned when adding consumers to producers.
|
|
|
|
pub enum AddConsumerError {
|
|
|
|
#[error("Consumer already added")]
|
|
|
|
/// Consumer has already been added to this producer.
|
|
|
|
AlreadyAdded,
|
|
|
|
}
|
|
|
|
|
2022-03-11 13:03:03 +00:00
|
|
|
impl StreamProducer {
|
2022-05-12 07:29:28 +00:00
|
|
|
/// Configure a consumer `appsrc` for later use in a `StreamProducer`
|
|
|
|
///
|
|
|
|
/// This is automatically called when calling `add_consumer()`.
|
|
|
|
pub fn configure_consumer(consumer: &gst_app::AppSrc) {
|
|
|
|
// Latency on the appsrc is set by the publisher before the first buffer
|
|
|
|
// and whenever it changes
|
|
|
|
consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
|
|
|
|
consumer.set_format(gst::Format::Time);
|
|
|
|
consumer.set_is_live(true);
|
|
|
|
consumer.set_handle_segment_change(true);
|
|
|
|
consumer.set_max_buffers(0);
|
|
|
|
consumer.set_max_bytes(0);
|
|
|
|
consumer.set_max_time(500 * gst::ClockTime::MSECOND);
|
|
|
|
consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
|
|
|
|
consumer.set_automatic_eos(false);
|
|
|
|
}
|
|
|
|
|
2022-05-12 13:41:54 +00:00
|
|
|
/// Add an appsrc to dispatch data to.
|
|
|
|
///
|
|
|
|
/// Dropping the returned `ConsumptionLink` will automatically disconnect the consumer from the producer.
|
|
|
|
pub fn add_consumer(
|
|
|
|
&self,
|
|
|
|
consumer: &gst_app::AppSrc,
|
|
|
|
) -> Result<ConsumptionLink, AddConsumerError> {
|
2022-03-11 13:03:03 +00:00
|
|
|
let mut consumers = self.consumers.lock().unwrap();
|
2022-05-12 07:34:11 +00:00
|
|
|
if consumers.consumers.contains_key(consumer) {
|
2022-05-12 07:37:14 +00:00
|
|
|
gst::error!(CAT, obj: &self.appsink, "Consumer {} ({:?}) already added", consumer.name(), consumer);
|
2022-05-12 13:41:54 +00:00
|
|
|
return Err(AddConsumerError::AlreadyAdded);
|
2022-03-11 13:03:03 +00:00
|
|
|
}
|
|
|
|
|
2022-05-12 07:37:14 +00:00
|
|
|
gst::debug!(CAT, obj: &self.appsink, "Adding consumer {} ({:?})", consumer.name(), consumer);
|
2022-03-11 13:03:03 +00:00
|
|
|
|
2022-05-12 07:29:28 +00:00
|
|
|
Self::configure_consumer(consumer);
|
2022-03-11 13:03:03 +00:00
|
|
|
|
|
|
|
// Forward force-keyunit events upstream to the appsink
|
|
|
|
let srcpad = consumer.static_pad("src").unwrap();
|
|
|
|
let appsink = &self.appsink;
|
|
|
|
let fku_probe_id = srcpad
|
|
|
|
.add_probe(
|
|
|
|
gst::PadProbeType::EVENT_UPSTREAM,
|
|
|
|
glib::clone!(@weak appsink, @weak consumer => @default-panic, move |_pad, info| {
|
|
|
|
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
|
|
|
|
if gst_video::UpstreamForceKeyUnitEvent::parse(ev).is_ok() {
|
|
|
|
gst::debug!(CAT, obj: &appsink, "Requesting keyframe");
|
|
|
|
let _ = appsink.send_event(ev.clone());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
gst::PadProbeReturn::Ok
|
|
|
|
}),
|
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
|
2022-07-05 10:35:01 +00:00
|
|
|
let stream_consumer = StreamConsumer::new(consumer, fku_probe_id);
|
|
|
|
let dropped = stream_consumer.dropped.clone();
|
|
|
|
let pushed = stream_consumer.pushed.clone();
|
|
|
|
|
|
|
|
consumers
|
|
|
|
.consumers
|
|
|
|
.insert(consumer.clone(), stream_consumer);
|
2022-05-12 13:41:54 +00:00
|
|
|
|
|
|
|
Ok(ConsumptionLink {
|
|
|
|
consumer: consumer.clone(),
|
|
|
|
producer: Some(self.clone()),
|
2022-07-05 10:35:01 +00:00
|
|
|
dropped,
|
|
|
|
pushed,
|
2022-05-12 13:41:54 +00:00
|
|
|
})
|
2022-03-11 13:03:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Remove a consumer appsrc by id
|
|
|
|
pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
|
|
|
|
let name = consumer.name();
|
|
|
|
if self
|
|
|
|
.consumers
|
|
|
|
.lock()
|
|
|
|
.unwrap()
|
|
|
|
.consumers
|
2022-05-12 07:34:11 +00:00
|
|
|
.remove(consumer)
|
2022-03-11 13:03:03 +00:00
|
|
|
.is_some()
|
|
|
|
{
|
2022-05-12 07:37:14 +00:00
|
|
|
gst::debug!(CAT, obj: &self.appsink, "Removed consumer {} ({:?})", name, consumer);
|
2022-05-12 12:56:55 +00:00
|
|
|
consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
|
2022-03-11 13:03:03 +00:00
|
|
|
} else {
|
2022-05-12 07:37:14 +00:00
|
|
|
gst::debug!(CAT, obj: &self.appsink, "Consumer {} ({:?}) not found", name, consumer);
|
2022-03-11 13:03:03 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Stop discarding data samples and start forwarding them to the consumers.
|
|
|
|
///
|
|
|
|
/// This is useful for example for prerolling live sources.
|
|
|
|
pub fn forward(&self) {
|
|
|
|
self.consumers.lock().unwrap().discard = false;
|
|
|
|
}
|
|
|
|
|
2022-05-12 13:17:13 +00:00
|
|
|
/// configure if EOS from appsrc should be forwarded to all the consumers
|
|
|
|
pub fn set_forward_eos(&self, forward_eos: bool) {
|
|
|
|
self.consumers.lock().unwrap().forward_eos = forward_eos;
|
|
|
|
}
|
|
|
|
|
2022-03-11 13:03:03 +00:00
|
|
|
/// Get the GStreamer `appsink` wrapped by this producer
|
|
|
|
pub fn appsink(&self) -> &gst_app::AppSink {
|
|
|
|
&self.appsink
|
|
|
|
}
|
2022-05-12 07:51:56 +00:00
|
|
|
|
|
|
|
/// Signals an error on all consumers
|
|
|
|
pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
|
|
|
|
let consumers = self.consumers.lock().unwrap();
|
|
|
|
|
|
|
|
for consumer in consumers.consumers.keys() {
|
|
|
|
let mut msg_builder =
|
|
|
|
gst::message::Error::builder_from_error(error.clone()).src(consumer);
|
|
|
|
if let Some(debug) = debug {
|
|
|
|
msg_builder = msg_builder.debug(debug);
|
|
|
|
}
|
|
|
|
|
|
|
|
let _ = consumer.post_message(msg_builder.build());
|
|
|
|
}
|
|
|
|
}
|
2022-03-11 13:03:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> From<&'a gst_app::AppSink> for StreamProducer {
|
|
|
|
fn from(appsink: &'a gst_app::AppSink) -> Self {
|
|
|
|
let consumers = Arc::new(Mutex::new(StreamConsumers {
|
|
|
|
current_latency: None,
|
|
|
|
latency_updated: false,
|
|
|
|
consumers: HashMap::new(),
|
|
|
|
discard: true,
|
2022-05-12 13:17:13 +00:00
|
|
|
forward_eos: true,
|
2022-03-11 13:03:03 +00:00
|
|
|
}));
|
|
|
|
|
|
|
|
appsink.set_callbacks(
|
|
|
|
gst_app::AppSinkCallbacks::builder()
|
|
|
|
.new_sample(glib::clone!(@strong consumers => move |appsink| {
|
|
|
|
let mut consumers = consumers.lock().unwrap();
|
|
|
|
|
|
|
|
let sample = match appsink.pull_sample() {
|
|
|
|
Ok(sample) => sample,
|
|
|
|
Err(_err) => {
|
|
|
|
gst::debug!(CAT, obj: appsink, "Failed to pull sample");
|
|
|
|
return Err(gst::FlowError::Flushing);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
if consumers.discard {
|
|
|
|
return Ok(gst::FlowSuccess::Ok);
|
|
|
|
}
|
|
|
|
|
|
|
|
let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
|
|
|
|
let flags = buf.flags();
|
|
|
|
|
|
|
|
(flags.contains(gst::BufferFlags::DISCONT),
|
|
|
|
!flags.contains(gst::BufferFlags::DELTA_UNIT))
|
|
|
|
} else {
|
|
|
|
(false, true)
|
|
|
|
};
|
|
|
|
|
|
|
|
gst::trace!(CAT, obj: appsink, "processing sample");
|
|
|
|
|
|
|
|
let latency = consumers.current_latency;
|
|
|
|
let latency_updated = mem::replace(&mut consumers.latency_updated, false);
|
2022-05-12 07:44:59 +00:00
|
|
|
|
|
|
|
let mut needs_keyframe_request = false;
|
2022-03-11 13:03:03 +00:00
|
|
|
|
|
|
|
let current_consumers = consumers
|
|
|
|
.consumers
|
|
|
|
.values()
|
|
|
|
.filter_map(|consumer| {
|
|
|
|
if let Some(latency) = latency {
|
|
|
|
if consumer.forwarded_latency
|
|
|
|
.compare_exchange(
|
|
|
|
false,
|
|
|
|
true,
|
|
|
|
atomic::Ordering::SeqCst,
|
|
|
|
atomic::Ordering::SeqCst,
|
|
|
|
)
|
|
|
|
.is_ok()
|
|
|
|
|| latency_updated
|
|
|
|
{
|
|
|
|
consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-12 07:44:59 +00:00
|
|
|
if is_discont && !is_keyframe {
|
2022-03-11 13:03:03 +00:00
|
|
|
// Whenever we have a discontinuity, we need a new keyframe
|
|
|
|
consumer.needs_keyframe.store(
|
|
|
|
true,
|
|
|
|
atomic::Ordering::SeqCst,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
|
|
|
if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst)
|
|
|
|
{
|
|
|
|
// If we need a keyframe (and this one isn't) request a keyframe upstream
|
2022-05-12 07:44:59 +00:00
|
|
|
if !needs_keyframe_request {
|
2022-03-11 13:03:03 +00:00
|
|
|
gst::debug!(CAT, obj: appsink, "Requesting keyframe for first buffer");
|
2022-05-12 07:44:59 +00:00
|
|
|
needs_keyframe_request = true;
|
2022-03-11 13:03:03 +00:00
|
|
|
}
|
|
|
|
|
2022-07-05 10:35:01 +00:00
|
|
|
consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
|
|
|
|
|
2022-03-11 13:03:03 +00:00
|
|
|
gst::debug!(CAT, obj: appsink, "Ignoring frame for {} while waiting for a keyframe",
|
|
|
|
consumer.appsrc.name());
|
|
|
|
None
|
|
|
|
} else {
|
|
|
|
consumer.needs_keyframe.store(false, atomic::Ordering::SeqCst);
|
2022-07-05 10:35:01 +00:00
|
|
|
consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
|
2022-03-11 13:03:03 +00:00
|
|
|
|
|
|
|
Some(consumer.appsrc.clone())
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
drop(consumers);
|
|
|
|
|
2022-05-12 07:44:59 +00:00
|
|
|
if needs_keyframe_request {
|
|
|
|
appsink.send_event(
|
|
|
|
gst_video::UpstreamForceKeyUnitEvent::builder()
|
|
|
|
.all_headers(true)
|
|
|
|
.build(),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
|
2022-03-11 13:03:03 +00:00
|
|
|
for consumer in current_consumers {
|
|
|
|
if let Err(err) = consumer.push_sample(&sample) {
|
|
|
|
gst::warning!(CAT, obj: appsink, "Failed to push sample: {}", err);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(gst::FlowSuccess::Ok)
|
|
|
|
}))
|
2022-05-12 13:17:13 +00:00
|
|
|
.eos(glib::clone!(@strong consumers => move |appsink| {
|
|
|
|
let stream_consumers = consumers
|
2022-03-11 13:03:03 +00:00
|
|
|
.lock()
|
2022-05-12 13:17:13 +00:00
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
if stream_consumers.forward_eos {
|
|
|
|
let current_consumers = stream_consumers
|
|
|
|
.consumers
|
|
|
|
.values()
|
|
|
|
.map(|c| c.appsrc.clone())
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
drop(stream_consumers);
|
|
|
|
|
|
|
|
for consumer in current_consumers {
|
|
|
|
gst::debug!(CAT, obj: appsink, "set EOS on consumer {}", consumer.name());
|
|
|
|
let _ = consumer.end_of_stream();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
gst::debug!(CAT, obj: appsink, "don't forward EOS to consumers");
|
2022-03-11 13:03:03 +00:00
|
|
|
}
|
|
|
|
}))
|
|
|
|
.build(),
|
|
|
|
);
|
|
|
|
|
|
|
|
let sinkpad = appsink.static_pad("sink").unwrap();
|
|
|
|
sinkpad.add_probe(
|
|
|
|
gst::PadProbeType::EVENT_UPSTREAM,
|
|
|
|
glib::clone!(@strong consumers => move |_pad, info| {
|
|
|
|
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
|
|
|
|
if let gst::EventView::Latency(ev) = ev.view() {
|
|
|
|
let latency = ev.latency();
|
|
|
|
let mut consumers = consumers.lock().unwrap();
|
|
|
|
consumers.current_latency = Some(latency);
|
|
|
|
consumers.latency_updated = true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
gst::PadProbeReturn::Ok
|
|
|
|
}),
|
|
|
|
);
|
|
|
|
|
|
|
|
StreamProducer {
|
|
|
|
appsink: appsink.clone(),
|
|
|
|
consumers,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Wrapper around a HashMap of consumers, exists for thread safety
|
|
|
|
/// and also protects some of the producer state
|
|
|
|
#[derive(Debug)]
|
|
|
|
struct StreamConsumers {
|
|
|
|
/// The currently-observed latency
|
|
|
|
current_latency: Option<gst::ClockTime>,
|
|
|
|
/// Whether the consumers' appsrc latency needs updating
|
|
|
|
latency_updated: bool,
|
|
|
|
/// The consumers, AppSrc pointer value -> consumer
|
2022-05-12 07:34:11 +00:00
|
|
|
consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
|
2022-03-11 13:03:03 +00:00
|
|
|
/// Whether appsrc samples should be forwarded to consumers yet
|
|
|
|
discard: bool,
|
2022-05-12 13:17:13 +00:00
|
|
|
/// Whether appsrc EOS should be forwarded to consumers
|
|
|
|
forward_eos: bool,
|
2022-03-11 13:03:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Wrapper around a consumer's `appsrc`
|
|
|
|
#[derive(Debug)]
|
|
|
|
struct StreamConsumer {
|
|
|
|
/// The GStreamer `appsrc` of the consumer
|
|
|
|
appsrc: gst_app::AppSrc,
|
|
|
|
/// The id of a pad probe that intercepts force-key-unit events
|
|
|
|
fku_probe_id: Option<gst::PadProbeId>,
|
|
|
|
/// Whether an initial latency was forwarded to the `appsrc`
|
|
|
|
forwarded_latency: atomic::AtomicBool,
|
|
|
|
/// Whether a first buffer has made it through, used to determine
|
|
|
|
/// whether a new key unit should be requested. Only useful for encoded
|
|
|
|
/// streams.
|
2022-05-12 07:44:59 +00:00
|
|
|
needs_keyframe: Arc<atomic::AtomicBool>,
|
2022-07-05 10:35:01 +00:00
|
|
|
/// number of buffers dropped because `appsrc` internal queue was full
|
|
|
|
dropped: Arc<atomic::AtomicU64>,
|
|
|
|
/// number of buffers pushed through `appsrc`
|
|
|
|
pushed: Arc<atomic::AtomicU64>,
|
2022-03-11 13:03:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl StreamConsumer {
|
|
|
|
/// Create a new consumer
|
|
|
|
fn new(appsrc: &gst_app::AppSrc, fku_probe_id: gst::PadProbeId) -> Self {
|
2022-05-12 07:44:59 +00:00
|
|
|
let needs_keyframe = Arc::new(atomic::AtomicBool::new(true));
|
|
|
|
let needs_keyframe_clone = needs_keyframe.clone();
|
2022-07-05 10:35:01 +00:00
|
|
|
let dropped = Arc::new(atomic::AtomicU64::new(0));
|
|
|
|
let dropped_clone = dropped.clone();
|
2022-05-12 07:44:59 +00:00
|
|
|
|
2022-03-11 13:03:03 +00:00
|
|
|
appsrc.set_callbacks(
|
|
|
|
gst_app::AppSrcCallbacks::builder()
|
|
|
|
.enough_data(move |appsrc| {
|
|
|
|
gst::debug!(
|
|
|
|
CAT,
|
|
|
|
obj: appsrc,
|
2022-05-12 07:37:14 +00:00
|
|
|
"consumer {} ({:?}) is not consuming fast enough, old samples are getting dropped",
|
|
|
|
appsrc.name(),
|
|
|
|
appsrc,
|
2022-03-11 13:03:03 +00:00
|
|
|
);
|
2022-05-12 07:44:59 +00:00
|
|
|
|
|
|
|
needs_keyframe_clone.store(true, atomic::Ordering::SeqCst);
|
2022-07-05 10:35:01 +00:00
|
|
|
dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
|
2022-03-11 13:03:03 +00:00
|
|
|
})
|
|
|
|
.build(),
|
|
|
|
);
|
|
|
|
|
|
|
|
StreamConsumer {
|
|
|
|
appsrc: appsrc.clone(),
|
|
|
|
fku_probe_id: Some(fku_probe_id),
|
|
|
|
forwarded_latency: atomic::AtomicBool::new(false),
|
2022-05-12 07:44:59 +00:00
|
|
|
needs_keyframe,
|
2022-07-05 10:35:01 +00:00
|
|
|
dropped,
|
|
|
|
pushed: Arc::new(atomic::AtomicU64::new(0)),
|
2022-03-11 13:03:03 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Drop for StreamConsumer {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
if let Some(fku_probe_id) = self.fku_probe_id.take() {
|
|
|
|
let srcpad = self.appsrc.static_pad("src").unwrap();
|
|
|
|
srcpad.remove_probe(fku_probe_id);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl PartialEq for StreamConsumer {
|
|
|
|
fn eq(&self, other: &Self) -> bool {
|
|
|
|
self.appsrc.eq(&other.appsrc)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Eq for StreamConsumer {}
|
|
|
|
|
|
|
|
impl std::hash::Hash for StreamConsumer {
|
|
|
|
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
|
|
|
|
std::hash::Hash::hash(&self.appsrc, state);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
|
|
|
|
fn borrow(&self) -> &gst_app::AppSrc {
|
|
|
|
&self.appsrc
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
mod tests {
|
|
|
|
use std::{
|
|
|
|
str::FromStr,
|
|
|
|
sync::{Arc, Mutex},
|
|
|
|
};
|
|
|
|
|
|
|
|
use futures::channel::mpsc;
|
|
|
|
use futures::{channel::mpsc::Receiver, SinkExt, StreamExt};
|
|
|
|
use gst::prelude::*;
|
|
|
|
|
2022-05-12 13:41:54 +00:00
|
|
|
use crate::{ConsumptionLink, StreamProducer};
|
2022-03-11 13:03:03 +00:00
|
|
|
|
|
|
|
fn create_producer() -> (
|
|
|
|
gst::Pipeline,
|
|
|
|
gst_app::AppSrc,
|
|
|
|
gst_app::AppSink,
|
|
|
|
StreamProducer,
|
|
|
|
) {
|
|
|
|
let producer_pipe =
|
|
|
|
gst::parse_launch("appsrc name=producer_src ! appsink name=producer_sink")
|
|
|
|
.unwrap()
|
|
|
|
.downcast::<gst::Pipeline>()
|
|
|
|
.unwrap();
|
|
|
|
let producer_sink = producer_pipe
|
|
|
|
.by_name("producer_sink")
|
|
|
|
.unwrap()
|
|
|
|
.downcast::<gst_app::AppSink>()
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
(
|
|
|
|
producer_pipe.clone(),
|
|
|
|
producer_pipe
|
|
|
|
.by_name("producer_src")
|
|
|
|
.unwrap()
|
|
|
|
.downcast::<gst_app::AppSrc>()
|
|
|
|
.unwrap(),
|
|
|
|
producer_sink.clone(),
|
|
|
|
StreamProducer::from(&producer_sink),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
struct Consumer {
|
|
|
|
pipeline: gst::Pipeline,
|
|
|
|
src: gst_app::AppSrc,
|
|
|
|
sink: gst_app::AppSink,
|
|
|
|
receiver: Mutex<Receiver<gst::Sample>>,
|
|
|
|
connected: Mutex<bool>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Consumer {
|
|
|
|
fn new(id: &str) -> Self {
|
|
|
|
let pipeline = gst::parse_launch(&format!("appsrc name={} ! appsink name=sink", id))
|
|
|
|
.unwrap()
|
|
|
|
.downcast::<gst::Pipeline>()
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
|
|
|
|
let sender = Arc::new(Mutex::new(sender));
|
|
|
|
let sink = pipeline
|
|
|
|
.by_name("sink")
|
|
|
|
.unwrap()
|
|
|
|
.downcast::<gst_app::AppSink>()
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
sink.set_callbacks(
|
|
|
|
gst_app::AppSinkCallbacks::builder()
|
|
|
|
// Add a handler to the "new-sample" signal.
|
|
|
|
.new_sample(move |appsink| {
|
|
|
|
// Pull the sample in question out of the appsink's buffer.
|
|
|
|
let sender_clone = sender.clone();
|
|
|
|
futures::executor::block_on(
|
|
|
|
sender_clone
|
|
|
|
.lock()
|
|
|
|
.unwrap()
|
|
|
|
.send(appsink.pull_sample().unwrap()),
|
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
Ok(gst::FlowSuccess::Ok)
|
|
|
|
})
|
|
|
|
.build(),
|
|
|
|
);
|
|
|
|
|
|
|
|
Self {
|
|
|
|
pipeline: pipeline.clone(),
|
|
|
|
src: pipeline
|
|
|
|
.by_name(id)
|
|
|
|
.unwrap()
|
|
|
|
.downcast::<gst_app::AppSrc>()
|
|
|
|
.unwrap(),
|
|
|
|
sink,
|
|
|
|
receiver: Mutex::new(receiver),
|
|
|
|
connected: Mutex::new(false),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-05-12 13:41:54 +00:00
|
|
|
fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
|
2022-03-11 13:03:03 +00:00
|
|
|
{
|
|
|
|
let mut connected = self.connected.lock().unwrap();
|
|
|
|
*connected = true;
|
|
|
|
}
|
|
|
|
|
2022-05-12 13:41:54 +00:00
|
|
|
producer.add_consumer(&self.src).unwrap()
|
2022-03-11 13:03:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn disconnect(&self, producer: &StreamProducer) {
|
|
|
|
{
|
|
|
|
let mut connected = self.connected.lock().unwrap();
|
|
|
|
*connected = false;
|
|
|
|
}
|
|
|
|
|
|
|
|
producer.remove_consumer(&self.src);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn simple() {
|
|
|
|
gst::init().unwrap();
|
|
|
|
|
|
|
|
let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
|
|
|
|
producer.forward();
|
|
|
|
producer_pipe
|
|
|
|
.set_state(gst::State::Playing)
|
|
|
|
.expect("Couldn't set producer pipeline state");
|
|
|
|
|
|
|
|
let mut consumers: Vec<Consumer> = Vec::new();
|
|
|
|
let consumer = Consumer::new("consumer1");
|
2022-07-05 10:35:01 +00:00
|
|
|
let link1 = consumer.connect(&producer);
|
2022-03-11 13:03:03 +00:00
|
|
|
consumer
|
|
|
|
.pipeline
|
|
|
|
.set_state(gst::State::Playing)
|
|
|
|
.expect("Couldn't set producer pipeline state");
|
|
|
|
consumers.push(consumer);
|
|
|
|
|
|
|
|
let consumer = Consumer::new("consumer2");
|
2022-07-05 10:35:01 +00:00
|
|
|
let link2 = consumer.connect(&producer);
|
2022-03-11 13:03:03 +00:00
|
|
|
consumer
|
|
|
|
.pipeline
|
|
|
|
.set_state(gst::State::Playing)
|
|
|
|
.expect("Couldn't set producer pipeline state");
|
|
|
|
consumers.push(consumer);
|
|
|
|
|
|
|
|
for i in 0..10 {
|
|
|
|
let caps = gst::Caps::from_str(&format!("test,n={}", i)).unwrap();
|
|
|
|
producer_src.set_caps(Some(&caps));
|
|
|
|
producer_src.push_buffer(gst::Buffer::new()).unwrap();
|
|
|
|
|
|
|
|
for consumer in &consumers {
|
|
|
|
if *consumer.connected.lock().unwrap() {
|
|
|
|
let sample =
|
|
|
|
futures::executor::block_on(consumer.receiver.lock().unwrap().next())
|
|
|
|
.expect("Received an empty buffer?");
|
|
|
|
sample.buffer().expect("No buffer on the sample?");
|
|
|
|
assert_eq!(sample.caps(), Some(caps.as_ref()));
|
|
|
|
} else {
|
|
|
|
debug_assert!(
|
|
|
|
consumer
|
|
|
|
.sink
|
|
|
|
.try_pull_sample(gst::ClockTime::from_nseconds(0))
|
|
|
|
.is_none(),
|
|
|
|
"Disconnected consumer got a new sample?!"
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if i == 5 {
|
|
|
|
consumers.get(0).unwrap().disconnect(&producer);
|
|
|
|
}
|
|
|
|
}
|
2022-07-05 10:35:01 +00:00
|
|
|
|
|
|
|
assert_eq!(link1.pushed(), 6);
|
|
|
|
assert_eq!(link1.dropped(), 0);
|
|
|
|
assert_eq!(link2.pushed(), 10);
|
|
|
|
assert_eq!(link2.dropped(), 0);
|
2022-03-11 13:03:03 +00:00
|
|
|
}
|
|
|
|
}
|