utils/fallbacksrc: Add "source" property that allows to directly provide a source element

This works as an alternative to the "uri" property, which would use an
`uridecodebin3` element.

The provided source element can have static audio/video pads or provide
them as sometimes pad while signalling no-more-pads or a stream
collection message once all pads are available.
This commit is contained in:
Sebastian Dröge 2020-05-19 16:51:59 +03:00 committed by Sebastian Dröge
parent d130b29146
commit ed3ef5f741

View file

@ -40,6 +40,7 @@ struct Settings {
enable_audio: bool,
enable_video: bool,
uri: Option<String>,
source: Option<gst::Element>,
fallback_uri: Option<String>,
timeout: u64,
retry_timeout: u64,
@ -51,6 +52,7 @@ impl Default for Settings {
enable_audio: true,
enable_video: true,
uri: None,
source: None,
fallback_uri: None,
timeout: 5 * gst::SECOND_VAL,
retry_timeout: 60 * gst::SECOND_VAL,
@ -58,7 +60,13 @@ impl Default for Settings {
}
}
// Blocking buffer pad probe on the decodebin pads. Once blocked we have a running time for the
#[derive(Debug)]
enum Source {
Uri(String),
Element(gst::Element),
}
// Blocking buffer pad probe on the source pads. Once blocked we have a running time for the
// current buffer that can later be used for offsetting
//
// This is used for the initial offsetting after starting of the stream and for "pausing" when
@ -69,18 +77,18 @@ struct Block {
running_time: gst::ClockTime,
}
// Connects one decodebin source pad with fallbackswitch and the corresponding fallback input
// Connects one source pad with fallbackswitch and the corresponding fallback input
struct Stream {
// Fallback input stream
// for video: filesrc, decoder, converters, imagefreeze
// for audio: live audiotestsrc, converters
fallback_input: gst::Element,
// source pad from decodebin
decodebin_srcpad: Option<gst::Pad>,
decodebin_srcpad_block: Option<Block>,
// source pad from source
source_srcpad: Option<gst::Pad>,
source_srcpad_block: Option<Block>,
// clocksync for decodebin source pad
// clocksync for source source pad
clocksync: gst::Element,
// fallbackswitch
@ -91,7 +99,7 @@ struct Stream {
}
struct State {
// uridecodebin3
// uridecodebin3 or custom source element
source: gst::Element,
source_is_live: bool,
source_pending_restart: bool,
@ -108,11 +116,12 @@ struct State {
buffering_percent: u8,
last_buffering_update: Option<Instant>,
// Stream collection posted by decodebin3
// Stream collection posted by source
streams: Option<gst::StreamCollection>,
// Configure settings
settings: Settings,
configured_source: Source,
}
struct FallbackSrc {
@ -130,7 +139,7 @@ enum Status {
Running,
}
static PROPERTIES: [subclass::Property; 7] = [
static PROPERTIES: [subclass::Property; 8] = [
subclass::Property("enable-audio", |name| {
glib::ParamSpec::boolean(
name,
@ -152,6 +161,15 @@ static PROPERTIES: [subclass::Property; 7] = [
subclass::Property("uri", |name| {
glib::ParamSpec::string(name, "URI", "URI to use", None, glib::ParamFlags::READWRITE)
}),
subclass::Property("source", |name| {
glib::ParamSpec::object(
name,
"Source",
"Source to use instead of the URI",
gst::Element::static_type(),
glib::ParamFlags::READWRITE,
)
}),
subclass::Property("fallback-uri", |name| {
glib::ParamSpec::string(
name,
@ -214,7 +232,7 @@ impl ObjectSubclass for FallbackSrc {
klass.set_metadata(
"Fallback Source",
"Generic/Source",
"Live source with uridecodebin3 and fallback image stream",
"Live source with uridecodebin3 or custom source, and fallback image stream",
"Sebastian Dröge <sebastian@centricular.com>",
);
@ -284,6 +302,18 @@ impl ObjectImpl for FallbackSrc {
);
settings.uri = new_value;
}
subclass::Property("source", ..) => {
let mut settings = self.settings.lock().unwrap();
let new_value = value.get().expect("type checked upstream");
gst_info!(
CAT,
obj: element,
"Changing source from {:?} to {:?}",
settings.source,
new_value,
);
settings.source = new_value;
}
subclass::Property("fallback-uri", ..) => {
let mut settings = self.settings.lock().unwrap();
let new_value = value.get().expect("type checked upstream");
@ -343,6 +373,10 @@ impl ObjectImpl for FallbackSrc {
let settings = self.settings.lock().unwrap();
Ok(settings.uri.to_value())
}
subclass::Property("source", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.source.to_value())
}
subclass::Property("fallback-uri", ..) => {
let settings = self.settings.lock().unwrap();
Ok(settings.fallback_uri.to_value())
@ -373,7 +407,7 @@ impl ObjectImpl for FallbackSrc {
}
// Otherwise if buffering < 100, we have no streams yet or of the expected
// streams there is no decodebin source pad yet, we're buffering
// streams there is no source pad yet, we're buffering
let mut have_audio = false;
let mut have_video = false;
if let Some(ref streams) = state.streams {
@ -391,17 +425,13 @@ impl ObjectImpl for FallbackSrc {
&& state
.audio_stream
.as_ref()
.map(|s| {
s.decodebin_srcpad.is_none() || s.decodebin_srcpad_block.is_some()
})
.map(|s| s.source_srcpad.is_none() || s.source_srcpad_block.is_some())
.unwrap_or(false))
|| (have_video
&& state
.video_stream
.as_ref()
.map(|s| {
s.decodebin_srcpad.is_none() || s.decodebin_srcpad_block.is_some()
})
.map(|s| s.source_srcpad.is_none() || s.source_srcpad_block.is_some())
.unwrap_or(false))
{
return Ok(Status::Buffering.to_value());
@ -472,6 +502,7 @@ impl BinImpl for FallbackSrc {
// Don't forward upwards, we are exposing streams based on properties
// TODO: Do stream configuration via our own stream collection and handling
// of stream select events
// TODO: Also needs updating of StreamCollection handling in CustomSource
self.handle_stream_collection(bin, m);
}
MessageView::Error(ref m) => {
@ -488,12 +519,19 @@ impl FallbackSrc {
fn create_main_input(
&self,
element: &gst::Bin,
uri: &str,
source: &Source,
) -> Result<gst::Element, gst::StateChangeError> {
let source = gst::ElementFactory::make("uridecodebin3", Some("uridecodebin"))
.expect("No uridecodebin3 found");
let source = match source {
Source::Uri(ref uri) => {
let source = gst::ElementFactory::make("uridecodebin3", Some("uridecodebin"))
.expect("No uridecodebin3 found");
source.set_property("uri", &uri).unwrap();
source.set_property("uri", &uri).unwrap();
source
}
Source::Element(ref source) => custom_source::CustomSource::new(source),
};
// Handle any async state changes internally, they don't affect the pipeline because we
// convert everything to a live stream
@ -510,7 +548,7 @@ impl FallbackSrc {
};
let src = FallbackSrc::from_instance(&element);
if let Err(msg) = src.handle_decodebin_pad_added(&element, pad) {
if let Err(msg) = src.handle_source_pad_added(&element, pad) {
element.post_error_message(&msg);
}
});
@ -522,7 +560,7 @@ impl FallbackSrc {
};
let src = FallbackSrc::from_instance(&element);
if let Err(msg) = src.handle_decodebin_pad_removed(&element, pad) {
if let Err(msg) = src.handle_source_pad_removed(&element, pad) {
element.post_error_message(&msg);
}
});
@ -747,8 +785,8 @@ impl FallbackSrc {
Ok(Stream {
fallback_input,
decodebin_srcpad: None,
decodebin_srcpad_block: None,
source_srcpad: None,
source_srcpad_block: None,
clocksync,
switch,
srcpad: ghostpad.upcast(),
@ -765,11 +803,21 @@ impl FallbackSrc {
}
let settings = self.settings.lock().unwrap().clone();
let uri = match settings.uri {
Some(ref uri) => uri,
let configured_source = match settings
.uri
.as_ref()
.cloned()
.map(Source::Uri)
.or_else(|| settings.source.as_ref().cloned().map(Source::Element))
{
Some(source) => source,
None => {
gst_error!(CAT, obj: element, "No URI configured");
gst_element_error!(element, gst::LibraryError::Settings, ["No URI configured"]);
gst_error!(CAT, obj: element, "No URI or source element configured");
gst_element_error!(
element,
gst::LibraryError::Settings,
["No URI or source element configured"]
);
return Err(gst::StateChangeError);
}
};
@ -777,7 +825,7 @@ impl FallbackSrc {
let fallback_uri = &settings.fallback_uri;
// Create main input
let source = self.create_main_input(element, uri)?;
let source = self.create_main_input(element, &configured_source)?;
let mut flow_combiner = gst_base::UniqueFlowCombiner::new();
@ -813,6 +861,7 @@ impl FallbackSrc {
last_buffering_update: None,
streams: None,
settings,
configured_source,
});
drop(state_guard);
@ -838,7 +887,7 @@ impl FallbackSrc {
element.notify("status");
// In theory all streams should've been removed from uridecodebin's pad-removed signal
// In theory all streams should've been removed from the source's pad-removed signal
// handler when going from Paused to Ready but better safe than sorry here
for stream in [&state.video_stream, &state.audio_stream]
.iter()
@ -951,17 +1000,12 @@ impl FallbackSrc {
state.flow_combiner.update_pad_flow(pad, res)
}
fn handle_decodebin_pad_added(
fn handle_source_pad_added(
&self,
element: &gst::Bin,
pad: &gst::Pad,
) -> Result<(), gst::ErrorMessage> {
gst_debug!(
CAT,
obj: element,
"Pad {} added to decodebin",
pad.get_name(),
);
gst_debug!(CAT, obj: element, "Pad {} added to source", pad.get_name(),);
let mut state_guard = self.state.lock().unwrap();
let state = match &mut *state_guard {
@ -975,8 +1019,21 @@ impl FallbackSrc {
x if x.starts_with("audio_") => ("audio", &mut state.audio_stream),
x if x.starts_with("video_") => ("video", &mut state.video_stream),
_ => {
// TODO: handle subtitles etc
return Ok(());
let caps = match pad.get_current_caps().or_else(|| pad.query_caps(None)) {
Some(caps) if !caps.is_any() && !caps.is_empty() => caps,
_ => return Ok(()),
};
let s = caps.get_structure(0).unwrap();
if s.get_name().starts_with("audio/") {
("audio", &mut state.audio_stream)
} else if s.get_name().starts_with("video/") {
("video", &mut state.video_stream)
} else {
// TODO: handle subtitles etc
return Ok(());
}
}
};
@ -986,7 +1043,7 @@ impl FallbackSrc {
return Ok(());
}
Some(Stream {
decodebin_srcpad: Some(_),
source_srcpad: Some(_),
..
}) => {
gst_debug!(CAT, obj: element, "Already configured a {} stream", type_);
@ -1000,17 +1057,17 @@ impl FallbackSrc {
gst_error!(
CAT,
obj: element,
"Failed to link decodebin pad to clocksync: {}",
"Failed to link source pad to clocksync: {}",
err
);
gst_error_msg!(
gst::CoreError::Negotiation,
["Failed to link decodebin pad to clocksync: {}", err]
["Failed to link source pad to clocksync: {}", err]
)
})?;
stream.decodebin_srcpad = Some(pad.clone());
stream.decodebin_srcpad_block = Some(self.add_decodebin_pad_probe(element, pad));
stream.source_srcpad = Some(pad.clone());
stream.source_srcpad_block = Some(self.add_pad_probe(element, pad));
drop(state_guard);
element.notify("status");
@ -1018,7 +1075,7 @@ impl FallbackSrc {
Ok(())
}
fn add_decodebin_pad_probe(&self, element: &gst::Bin, pad: &gst::Pad) -> Block {
fn add_pad_probe(&self, element: &gst::Bin, pad: &gst::Pad) -> Block {
gst_debug!(CAT, obj: element, "Adding probe to pad {}", pad.get_name());
let element_weak = element.downgrade();
@ -1037,7 +1094,7 @@ impl FallbackSrc {
let src = FallbackSrc::from_instance(&element);
if let Err(msg) = src.handle_decodebin_pad_blocked(&element, pad, buffer) {
if let Err(msg) = src.handle_pad_blocked(&element, pad, buffer) {
element.post_error_message(&msg);
}
@ -1053,7 +1110,7 @@ impl FallbackSrc {
}
}
fn handle_decodebin_pad_blocked(
fn handle_pad_blocked(
&self,
element: &gst::Bin,
pad: &gst::Pad,
@ -1074,7 +1131,7 @@ impl FallbackSrc {
for block in [state.video_stream.as_mut(), state.audio_stream.as_mut()]
.iter_mut()
.filter_map(|s| s.as_mut())
.filter_map(|s| s.decodebin_srcpad_block.take())
.filter_map(|s| s.source_srcpad_block.take())
{
block.pad.remove_probe(block.probe_id);
}
@ -1091,20 +1148,20 @@ impl FallbackSrc {
let stream = if let Some(stream) = state
.audio_stream
.as_mut()
.filter(|s| s.decodebin_srcpad.as_ref() == Some(pad))
.filter(|s| s.source_srcpad.as_ref() == Some(pad))
{
stream
} else if let Some(stream) = state
.video_stream
.as_mut()
.filter(|s| s.decodebin_srcpad.as_ref() == Some(pad))
.filter(|s| s.source_srcpad.as_ref() == Some(pad))
{
stream
} else {
unreachable!();
};
let block = match stream.decodebin_srcpad_block {
let block = match stream.source_srcpad_block {
Some(ref mut block) => block,
None => return Ok(()),
};
@ -1145,7 +1202,7 @@ impl FallbackSrc {
block.running_time = running_time;
self.unblock_decodebin_pads(element, state);
self.unblock_pads(element, state);
drop(state_guard);
element.notify("status");
@ -1153,7 +1210,7 @@ impl FallbackSrc {
Ok(())
}
fn unblock_decodebin_pads(&self, element: &gst::Bin, state: &mut State) {
fn unblock_pads(&self, element: &gst::Bin, state: &mut State) {
// Check if all streams are blocked and have a running time and we have
// 100% buffering
if state.buffering_percent < 100 {
@ -1186,22 +1243,22 @@ impl FallbackSrc {
let audio_running_time = state
.audio_stream
.as_ref()
.and_then(|s| s.decodebin_srcpad_block.as_ref().map(|b| b.running_time))
.and_then(|s| s.source_srcpad_block.as_ref().map(|b| b.running_time))
.unwrap_or(gst::CLOCK_TIME_NONE);
let video_running_time = state
.video_stream
.as_ref()
.and_then(|s| s.decodebin_srcpad_block.as_ref().map(|b| b.running_time))
.and_then(|s| s.source_srcpad_block.as_ref().map(|b| b.running_time))
.unwrap_or(gst::CLOCK_TIME_NONE);
let audio_srcpad = state
.audio_stream
.as_ref()
.and_then(|s| s.decodebin_srcpad.as_ref().cloned());
.and_then(|s| s.source_srcpad.as_ref().cloned());
let video_srcpad = state
.video_stream
.as_ref()
.and_then(|s| s.decodebin_srcpad.as_ref().cloned());
.and_then(|s| s.source_srcpad.as_ref().cloned());
let audio_is_eos = audio_srcpad
.as_ref()
@ -1256,7 +1313,7 @@ impl FallbackSrc {
if let Some(block) = state
.audio_stream
.as_mut()
.and_then(|s| s.decodebin_srcpad_block.take())
.and_then(|s| s.source_srcpad_block.take())
{
if !audio_is_eos {
block.pad.set_offset(offset);
@ -1267,7 +1324,7 @@ impl FallbackSrc {
if let Some(block) = state
.video_stream
.as_mut()
.and_then(|s| s.decodebin_srcpad_block.take())
.and_then(|s| s.source_srcpad_block.take())
{
if !video_is_eos {
block.pad.set_offset(offset);
@ -1299,7 +1356,7 @@ impl FallbackSrc {
if let Some(block) = state
.audio_stream
.as_mut()
.and_then(|s| s.decodebin_srcpad_block.take())
.and_then(|s| s.source_srcpad_block.take())
{
if !audio_is_eos {
block.pad.set_offset(offset);
@ -1331,7 +1388,7 @@ impl FallbackSrc {
if let Some(block) = state
.video_stream
.as_mut()
.and_then(|s| s.decodebin_srcpad_block.take())
.and_then(|s| s.source_srcpad_block.take())
{
if !video_is_eos {
block.pad.set_offset(offset);
@ -1341,7 +1398,7 @@ impl FallbackSrc {
}
}
fn handle_decodebin_pad_removed(
fn handle_source_pad_removed(
&self,
element: &gst::Bin,
pad: &gst::Pad,
@ -1349,7 +1406,7 @@ impl FallbackSrc {
gst_debug!(
CAT,
obj: element,
"Pad {} removed from decodebin",
"Pad {} removed from source",
pad.get_name()
);
@ -1362,25 +1419,25 @@ impl FallbackSrc {
};
// Don't have to do anything here other than forgetting about the pad. Unlinking will
// automatically happen while the pad is being removed from decodebin and thus leaves the
// automatically happen while the pad is being removed from source and thus leaves the
// bin hierarchy
let stream = if let Some(stream) = state
.audio_stream
.as_mut()
.filter(|s| s.decodebin_srcpad.as_ref() == Some(pad))
.filter(|s| s.source_srcpad.as_ref() == Some(pad))
{
stream
} else if let Some(stream) = state
.video_stream
.as_mut()
.filter(|s| s.decodebin_srcpad.as_ref() == Some(pad))
.filter(|s| s.source_srcpad.as_ref() == Some(pad))
{
stream
} else {
return Ok(());
};
stream.decodebin_srcpad = None;
stream.source_srcpad = None;
drop(state_guard);
element.notify("status");
@ -1402,20 +1459,18 @@ impl FallbackSrc {
state.buffering_percent = m.get_percent() as u8;
if state.buffering_percent < 100 {
state.last_buffering_update = Some(Instant::now());
// Block decodebin pads if needed to pause
// Block source pads if needed to pause
if let Some(ref mut stream) = state.audio_stream {
if stream.decodebin_srcpad_block.is_none() {
if let Some(ref pad) = stream.decodebin_srcpad {
stream.decodebin_srcpad_block =
Some(self.add_decodebin_pad_probe(element, pad));
if stream.source_srcpad_block.is_none() {
if let Some(ref pad) = stream.source_srcpad {
stream.source_srcpad_block = Some(self.add_pad_probe(element, pad));
}
}
}
if let Some(ref mut stream) = state.video_stream {
if stream.decodebin_srcpad_block.is_none() {
if let Some(ref pad) = stream.decodebin_srcpad {
stream.decodebin_srcpad_block =
Some(self.add_decodebin_pad_probe(element, pad));
if stream.source_srcpad_block.is_none() {
if let Some(ref pad) = stream.source_srcpad {
stream.source_srcpad_block = Some(self.add_pad_probe(element, pad));
}
}
}
@ -1425,7 +1480,7 @@ impl FallbackSrc {
} else {
state.last_buffering_update = None;
// Check if we can unblock now
self.unblock_decodebin_pads(element, state);
self.unblock_pads(element, state);
drop(state_guard);
element.notify("status");
@ -1475,7 +1530,7 @@ impl FallbackSrc {
state.streams = Some(streams);
self.unblock_decodebin_pads(element, state);
self.unblock_pads(element, state);
drop(state_guard);
element.notify("status");
@ -1606,18 +1661,23 @@ impl FallbackSrc {
Some(state) => state,
};
// FIXME: Create a new uridecodebin3 because it currently is not reusable
// See https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/746
element.remove(&state.source).unwrap();
let (source, old_source) = if let Source::Uri(..) = state.configured_source
{
// FIXME: Create a new uridecodebin3 because it currently is not reusable
// See https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/-/issues/746
element.remove(&state.source).unwrap();
let source = src
.create_main_input(
element,
state.settings.uri.as_deref().expect("no uri"),
let source = src
.create_main_input(element, &state.configured_source)
.expect("failed to create new source");
(
source.clone(),
Some(mem::replace(&mut state.source, source)),
)
.expect("failed to create new source");
let old_source = mem::replace(&mut state.source, source.clone());
} else {
(state.source.clone(), None)
};
state.source_pending_restart = false;
state.source_pending_restart_timeout = None;
@ -1625,9 +1685,11 @@ impl FallbackSrc {
state.last_buffering_update = None;
drop(state_guard);
// Drop old source after releasing the lock, it might call the pad-removed callback
// still
drop(old_source);
if let Some(old_source) = old_source {
// Drop old source after releasing the lock, it might call the pad-removed callback
// still
drop(old_source);
}
if source.sync_state_with_parent().is_err() {
gst_error!(CAT, obj: element, "Source failed to change state");
@ -1782,3 +1844,394 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
FallbackSrc::get_type(),
)
}
mod custom_source {
use super::CAT;
use glib::prelude::*;
use glib::subclass;
use glib::subclass::prelude::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
use std::{mem, sync::Mutex};
use once_cell::sync::OnceCell;
static PROPERTIES: [subclass::Property; 1] = [subclass::Property("source", |name| {
glib::ParamSpec::object(
name,
"Source",
"Source",
gst::Element::static_type(),
glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY,
)
})];
struct Stream {
source_pad: gst::Pad,
ghost_pad: gst::Pad,
// Dummy stream we created
stream: gst::Stream,
}
struct State {
pads: Vec<Stream>,
num_audio: usize,
num_video: usize,
}
pub struct CustomSource {
source: OnceCell<gst::Element>,
state: Mutex<State>,
}
impl ObjectSubclass for CustomSource {
const NAME: &'static str = "FallbackSrcCustomSource";
type ParentType = gst::Bin;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
fn new() -> Self {
Self {
source: OnceCell::default(),
state: Mutex::new(State {
pads: vec![],
num_audio: 0,
num_video: 0,
}),
}
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
let src_pad_template = gst::PadTemplate::new(
"audio_%u",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&gst::Caps::new_any(),
)
.unwrap();
klass.add_pad_template(src_pad_template);
let src_pad_template = gst::PadTemplate::new(
"video_%u",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&gst::Caps::new_any(),
)
.unwrap();
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
}
}
impl ObjectImpl for CustomSource {
glib_object_impl!();
fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
let element = obj.downcast_ref::<gst::Bin>().unwrap();
match *prop {
subclass::Property("source", ..) => {
let source = value.get::<gst::Element>().unwrap().unwrap();
self.source.set(source.clone()).unwrap();
element.add(&source).unwrap();
}
_ => unreachable!(),
}
}
fn constructed(&self, obj: &glib::Object) {
self.parent_constructed(obj);
let bin = obj.downcast_ref::<gst::Bin>().unwrap();
bin.set_suppressed_flags(gst::ElementFlags::SOURCE | gst::ElementFlags::SINK);
bin.set_element_flags(gst::ElementFlags::SOURCE);
bin.set_bin_flags(gst::BinFlags::STREAMS_AWARE);
}
}
impl ElementImpl for CustomSource {
#[allow(clippy::single_match)]
fn change_state(
&self,
element: &gst::Element,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
let element = element.downcast_ref::<gst::Bin>().unwrap();
match transition {
gst::StateChange::NullToReady => {
self.start(element)?;
}
_ => (),
}
self.parent_change_state(element.upcast_ref(), transition)?;
match transition {
gst::StateChange::ReadyToNull => {
self.stop(element)?;
Ok(gst::StateChangeSuccess::Success)
}
_ => Ok(gst::StateChangeSuccess::Success),
}
}
}
impl BinImpl for CustomSource {
#[allow(clippy::single_match)]
fn handle_message(&self, bin: &gst::Bin, msg: gst::Message) {
use gst::MessageView;
match msg.view() {
MessageView::StreamCollection(_) => {
// TODO: Drop stream collection message for now, we only create a simple custom
// one here so that fallbacksrc can know about our streams. It is never
// forwarded.
if let Err(msg) = self.handle_source_no_more_pads(&bin) {
bin.post_error_message(&msg);
}
}
_ => self.parent_handle_message(bin, msg),
}
}
}
impl CustomSource {
fn start(
&self,
element: &gst::Bin,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_debug!(CAT, obj: element, "Starting");
let source = self.source.get().unwrap();
let templates = source.get_pad_template_list();
if templates
.iter()
.any(|templ| templ.get_property_presence() == gst::PadPresence::Request)
{
gst_error!(CAT, obj: element, "Request pads not supported");
gst_element_error!(
element,
gst::LibraryError::Settings,
["Request pads not supported"]
);
return Err(gst::StateChangeError);
}
let has_sometimes_pads = templates
.iter()
.any(|templ| templ.get_property_presence() == gst::PadPresence::Sometimes);
// Handle all source pads that already exist
for pad in source.get_src_pads() {
if let Err(msg) = self.handle_source_pad_added(&element, &pad) {
element.post_error_message(&msg);
return Err(gst::StateChangeError);
}
}
if !has_sometimes_pads {
if let Err(msg) = self.handle_source_no_more_pads(&element) {
element.post_error_message(&msg);
return Err(gst::StateChangeError);
}
} else {
gst_debug!(CAT, obj: element, "Found sometimes pads");
let element_weak = element.downgrade();
source.connect_pad_added(move |_, pad| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
let src = CustomSource::from_instance(&element);
if let Err(msg) = src.handle_source_pad_added(&element, pad) {
element.post_error_message(&msg);
}
});
let element_weak = element.downgrade();
source.connect_pad_removed(move |_, pad| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
let src = CustomSource::from_instance(&element);
if let Err(msg) = src.handle_source_pad_removed(&element, pad) {
element.post_error_message(&msg);
}
});
let element_weak = element.downgrade();
source.connect_no_more_pads(move |_| {
let element = match element_weak.upgrade() {
None => return,
Some(element) => element,
};
let src = CustomSource::from_instance(&element);
if let Err(msg) = src.handle_source_no_more_pads(&element) {
element.post_error_message(&msg);
}
});
}
Ok(gst::StateChangeSuccess::Success)
}
fn handle_source_pad_added(
&self,
element: &gst::Bin,
pad: &gst::Pad,
) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Source added pad {}", pad.get_name());
let mut state = self.state.lock().unwrap();
let mut stream_type = None;
// Take stream type from stream-start event if we can
if let Some(event) = pad.get_sticky_event(gst::EventType::StreamStart, 0) {
if let gst::EventView::StreamStart(ev) = event.view() {
stream_type = ev.get_stream().map(|s| s.get_stream_type());
}
}
// Otherwise from the caps
if stream_type.is_none() {
let caps = match pad.get_current_caps().or_else(|| pad.query_caps(None)) {
Some(caps) if !caps.is_any() && !caps.is_empty() => caps,
_ => {
gst_error!(CAT, obj: element, "Pad {} had no caps", pad.get_name());
return Err(gst_error_msg!(
gst::CoreError::Negotiation,
["Pad had no caps"]
));
}
};
let s = caps.get_structure(0).unwrap();
if s.get_name().starts_with("audio/") {
stream_type = Some(gst::StreamType::AUDIO);
} else if s.get_name().starts_with("video/") {
stream_type = Some(gst::StreamType::VIDEO);
} else {
return Ok(());
}
}
let stream_type = stream_type.unwrap();
let (templ, name) = if stream_type.contains(gst::StreamType::AUDIO) {
let name = format!("audio_{}", state.num_audio);
state.num_audio += 1;
(element.get_pad_template("audio_%u").unwrap(), name)
} else {
let name = format!("video_{}", state.num_video);
state.num_video += 1;
(element.get_pad_template("video_%u").unwrap(), name)
};
let ghost_pad = gst::GhostPad::new_from_template(Some(&name), pad, &templ).unwrap();
let stream = Stream {
source_pad: pad.clone(),
ghost_pad: ghost_pad.clone().upcast(),
// TODO: We only add the stream type right now
stream: gst::Stream::new(None, None, stream_type, gst::StreamFlags::NONE),
};
state.pads.push(stream);
drop(state);
ghost_pad.set_active(true).unwrap();
element.add_pad(&ghost_pad).unwrap();
Ok(())
}
fn handle_source_pad_removed(
&self,
element: &gst::Bin,
pad: &gst::Pad,
) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Source removed pad {}", pad.get_name());
let mut state = self.state.lock().unwrap();
let (i, stream) = match state
.pads
.iter()
.enumerate()
.find(|(_i, p)| &p.source_pad == pad)
{
None => return Ok(()),
Some(v) => v,
};
let ghost_pad = stream.ghost_pad.clone();
state.pads.remove(i);
drop(state);
ghost_pad.set_active(false).unwrap();
let _ = element.remove_pad(&ghost_pad);
Ok(())
}
fn handle_source_no_more_pads(&self, element: &gst::Bin) -> Result<(), gst::ErrorMessage> {
gst_debug!(CAT, obj: element, "Source signalled no-more-pads");
let state = self.state.lock().unwrap();
let streams = state
.pads
.iter()
.map(|p| p.stream.clone())
.collect::<Vec<_>>();
let collection = gst::StreamCollection::new(None).streams(&streams).build();
drop(state);
element.no_more_pads();
let _ = element.post_message(
&gst::Message::new_stream_collection(&collection)
.src(Some(element))
.build(),
);
Ok(())
}
fn stop(
&self,
element: &gst::Bin,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_debug!(CAT, obj: element, "Stopping");
let mut state = self.state.lock().unwrap();
let pads = mem::replace(&mut state.pads, vec![]);
state.num_audio = 0;
state.num_video = 0;
drop(state);
for pad in pads {
let _ = element.remove_pad(&pad.ghost_pad);
}
Ok(gst::StateChangeSuccess::Success)
}
#[allow(clippy::new_ret_no_self)]
pub fn new(source: &gst::Element) -> gst::Element {
glib::Object::new(CustomSource::get_type(), &[("source", source)])
.unwrap()
.downcast::<gst::Element>()
.unwrap()
}
}
}