Compare commits
8 commits
main
...
scte35-ins
Author | SHA1 | Date | |
---|---|---|---|
2b2fa32796 | |||
455637af2a | |||
cddaba3478 | |||
fc9707fe66 | |||
d8e9295b91 | |||
4e37ce96ff | |||
8851dbccda | |||
1f15d49595 |
4 changed files with 528 additions and 1287 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -1,2 +1,4 @@
|
||||||
*.srt
|
*.srt
|
||||||
*.mp4
|
*.mp4
|
||||||
|
*.dot
|
||||||
|
*.png
|
||||||
|
|
1564
Cargo.lock
generated
1564
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
14
Cargo.toml
14
Cargo.toml
|
@ -5,19 +5,17 @@ edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
eyre = "0.6.6"
|
eyre = "0.6.6"
|
||||||
glib = "0.15.4"
|
glib = { git = "https://github.com/gtk-rs/gtk-rs-core" }
|
||||||
gst = { package = "gstreamer", version = "0.18.3" }
|
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
|
||||||
gstreamer-app = "0.18.0"
|
gst-mpegts = { package = "gstreamer-mpegts-sys", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
|
||||||
gstreamer-base = "0.18.0"
|
gst-plugin-videofx = { git = "https://gitlab.freedesktop.org/rafaelcaricio/gst-plugins-rs.git", branch = "add-imgcmp" }
|
||||||
gstreamer-video = { version = "0.18.5", features = ["v1_16"] }
|
|
||||||
gst-plugin-rusoto = { git = "https://gitlab.freedesktop.org/rafaelcaricio/gst-plugins-rs.git", branch = "transbin-accept-any-video-caps", version = "0.9.0" }
|
|
||||||
gst-plugin-closedcaption = { git = "https://gitlab.freedesktop.org/rafaelcaricio/gst-plugins-rs.git", branch = "transbin-accept-any-video-caps", version = "0.9.0" }
|
|
||||||
gst-plugin-textwrap = { git = "https://gitlab.freedesktop.org/rafaelcaricio/gst-plugins-rs.git", branch = "transbin-accept-any-video-caps", version = "0.9.0" }
|
|
||||||
ctrlc = "3.2.1"
|
ctrlc = "3.2.1"
|
||||||
signal-hook = "0.3.13"
|
signal-hook = "0.3.13"
|
||||||
#tokio = { version = "1.17", features = ["full"] }
|
#tokio = { version = "1.17", features = ["full"] }
|
||||||
#axum = "0.4.5"
|
#axum = "0.4.5"
|
||||||
#tower = "0.4.12"
|
#tower = "0.4.12"
|
||||||
#tower-http = { version = "0.2.2", features = ["add-extension"] }
|
#tower-http = { version = "0.2.2", features = ["add-extension"] }
|
||||||
|
clap = { version = "3" , features = ["derive"] }
|
||||||
|
url = "2"
|
||||||
log = "0.4.14"
|
log = "0.4.14"
|
||||||
pretty_env_logger = "0.4.0"
|
pretty_env_logger = "0.4.0"
|
||||||
|
|
233
src/main.rs
233
src/main.rs
|
@ -1,85 +1,143 @@
|
||||||
|
use glib::translate::ToGlibPtr;
|
||||||
use gst::prelude::*;
|
use gst::prelude::*;
|
||||||
use gstreamer_app as gst_app;
|
use log::{debug, info, trace};
|
||||||
use log::{debug, error, info, trace, warn};
|
use std::fs::File;
|
||||||
use std::process;
|
use std::io::Write;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::Duration;
|
||||||
|
use clap::Parser;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
fn send_splice<C>(element: &gst::Element, gst_sit: C)
|
||||||
|
where
|
||||||
|
C: FnOnce() -> *mut gst_mpegts::GstMpegtsSCTESIT,
|
||||||
|
{
|
||||||
|
let sit = gst_sit();
|
||||||
|
assert!(!sit.is_null());
|
||||||
|
unsafe {
|
||||||
|
let section = gst_mpegts::gst_mpegts_section_from_scte_sit(sit, 500);
|
||||||
|
gst_mpegts::gst_mpegts_section_send_event(section, element.to_glib_none().0);
|
||||||
|
gst::ffi::gst_mini_object_unref(section as _);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_splice_in(element: &gst::Element, event_id: u32, time: gst::ClockTime) {
|
||||||
|
info!("Sending Splice In event: {} @ {}", event_id, time.display());
|
||||||
|
send_splice(element, || unsafe {
|
||||||
|
gst_mpegts::gst_mpegts_scte_splice_in_new(event_id, time.nseconds())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_splice_out(element: &gst::Element, event_id: u32, time: gst::ClockTime) {
|
||||||
|
info!(
|
||||||
|
"Sending Splice Out event: {} @ {}",
|
||||||
|
event_id,
|
||||||
|
time.display()
|
||||||
|
);
|
||||||
|
send_splice(element, || unsafe {
|
||||||
|
gst_mpegts::gst_mpegts_scte_splice_out_new(event_id, time.nseconds(), 0)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Default)]
|
||||||
|
pub struct EventId(Arc<Mutex<u32>>);
|
||||||
|
|
||||||
|
impl EventId {
|
||||||
|
pub fn next(&self) -> u32 {
|
||||||
|
let mut counter = self.0.lock().unwrap();
|
||||||
|
*counter += 1;
|
||||||
|
*counter
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Parser, Debug)]
|
||||||
|
#[clap(author, version, about, long_about = None)]
|
||||||
|
struct Configuration {
|
||||||
|
/// HLS input source of the stream
|
||||||
|
#[clap(short='s', long)]
|
||||||
|
hls_source_url: String,
|
||||||
|
|
||||||
|
/// RTP destination for the stream
|
||||||
|
#[clap(short='d', long)]
|
||||||
|
rtp_destination_url: String,
|
||||||
|
|
||||||
|
/// Image of the frame used as reference to trigger the SCTE35 event
|
||||||
|
#[clap(short='i', long)]
|
||||||
|
slate_image_path: String,
|
||||||
|
|
||||||
|
/// The SCTE35 stream PID
|
||||||
|
#[clap(short='p', long, default_value="500")]
|
||||||
|
scte_pid: u32,
|
||||||
|
|
||||||
|
/// The duration of the SCTE35 Splice event in seconds
|
||||||
|
#[clap(short='d', long)]
|
||||||
|
scte_duration_secs: u64,
|
||||||
|
}
|
||||||
|
|
||||||
fn main() -> eyre::Result<()> {
|
fn main() -> eyre::Result<()> {
|
||||||
pretty_env_logger::init();
|
pretty_env_logger::init_timed();
|
||||||
gst::init()?;
|
gst::init()?;
|
||||||
gstrusoto::plugin_register_static()?;
|
gstvideofx::plugin_register_static()?;
|
||||||
gstrsclosedcaption::plugin_register_static()?;
|
unsafe {
|
||||||
gstrstextwrap::plugin_register_static()?;
|
gst_mpegts::gst_mpegts_initialize();
|
||||||
|
}
|
||||||
|
|
||||||
|
let conf: Configuration = Configuration::parse();
|
||||||
|
|
||||||
let pipeline = gst::parse_launch(
|
let pipeline = gst::parse_launch(
|
||||||
r#"
|
r#"
|
||||||
souphttpsrc location="https://playertest.longtailvideo.com/adaptive/elephants_dream_v4/redundant.m3u8" ! hlsdemux name=demuxer
|
|
||||||
|
|
||||||
demuxer.src_0 ! decodebin ! cccombiner name=ccc_fr ! videoconvert ! x264enc ! video/x-h264,profile=main ! muxer.video_0
|
urisourcebin name=source ! tsdemux name=demux ! queue ! h264parse ! tee name=v
|
||||||
demuxer.src_1 ! decodebin ! audioconvert ! audioresample ! opusenc ! audio/x-opus,rate=48000,channels=2 ! muxer.audio_0
|
|
||||||
demuxer.src_2 ! decodebin ! audioconvert ! audioresample ! opusenc ! audio/x-opus,rate=48000,channels=2 ! muxer.audio_1
|
|
||||||
demuxer.src_3 ! decodebin ! audioconvert ! audioresample ! opusenc ! audio/x-opus,rate=48000,channels=2 ! muxer.audio_2
|
|
||||||
|
|
||||||
souphttpsrc location="https://playertest.longtailvideo.com/adaptive/elephants_dream_v4/media_b/french/ed.m3u8" ! hlsdemux ! subparse ! tttocea608 ! ccconverter ! closedcaption/x-cea-708,format=cc_data ! ccc_fr.caption
|
v. ! queue ! mpegtsmux name=mux scte-35-null-interval=450000 ! rtpmp2tpay ! udpsink name=rtpsink sync=true
|
||||||
|
v. ! queue ! decodebin ! videoconvert ! imgcmp name=imgcmp ! autovideosink
|
||||||
|
|
||||||
qtmux name=muxer ! filesink location=output_cae708_only_fr.mp4
|
demux. ! queue ! aacparse ! mux.
|
||||||
"#,
|
"#,
|
||||||
)?
|
)?
|
||||||
.downcast::<gst::Pipeline>()
|
.downcast::<gst::Pipeline>()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
pipeline.set_async_handling(true);
|
|
||||||
|
|
||||||
// souphttpsrc location="https://playertest.longtailvideo.com/adaptive/elephants_dream_v4/media_b/chinese/ed.m3u8" ! hlsdemux ! subparse ! tttocea608 ! ccconverter ! closedcaption/x-cea-708,format=cc_data ! ccc_ch.caption
|
|
||||||
// souphttpsrc location="https://playertest.longtailvideo.com/adaptive/elephants_dream_v4/media_b/french/ed.m3u8" ! hlsdemux ! subparse ! tttocea608 ! appsink name=sink
|
|
||||||
|
|
||||||
info!("Starting pipeline...");
|
info!("Starting pipeline...");
|
||||||
|
|
||||||
let demuxer = pipeline.by_name("demuxer").unwrap();
|
// Set the HLS source URL
|
||||||
demuxer.connect_pad_added(|_, pad| {
|
let source = pipeline.by_name("source").unwrap();
|
||||||
let name = pad.name();
|
source.set_property("uri", conf.hls_source_url);
|
||||||
let caps = pad.caps().unwrap();
|
|
||||||
let caps_type = caps.structure(0).unwrap().name();
|
// Set SCTE35 stream PID
|
||||||
// dbg!(name);
|
let mux = pipeline.by_name("mux").unwrap();
|
||||||
debug!("Pad {} added with caps {}", name, caps_type);
|
mux.set_property("scte-35-pid", conf.scte_pid);
|
||||||
});
|
|
||||||
// let app_sink = pipeline
|
// Set the frame we are searching for
|
||||||
// .by_name("sink")
|
let imgcmp = pipeline.by_name("imgcmp").unwrap();
|
||||||
// .unwrap()
|
imgcmp.set_property("location", conf.slate_image_path);
|
||||||
// .downcast::<gst_app::AppSink>()
|
|
||||||
// .unwrap();
|
// Set the RTP destination
|
||||||
// app_sink.set_sync(false);
|
let rtp_url = Url::parse(&conf.rtp_destination_url).expect("Valid URL in format rtp://<host>:<port>");
|
||||||
// app_sink.set_callbacks(
|
let rtp_sink = pipeline.by_name("rtpsink").unwrap();
|
||||||
// gst_app::AppSinkCallbacks::builder()
|
rtp_sink.set_properties(&[
|
||||||
// .new_sample(move |app| {
|
("host", &rtp_url.host_str().unwrap().to_string()),
|
||||||
// let sample = app.pull_sample().unwrap();
|
("port", &(rtp_url.port().unwrap() as i32)),
|
||||||
// let buffer = sample.buffer().unwrap();
|
// In order to make sure the slate image is not event visible, we delay 1 second
|
||||||
//
|
("ts-offset", &(gst::ClockTime::from_seconds(1).nseconds() as i64)),
|
||||||
// // We don't care about buffers that are not video
|
]);
|
||||||
// if buffer
|
|
||||||
// .flags()
|
|
||||||
// .contains(gst::BufferFlags::DECODE_ONLY | gst::BufferFlags::GAP)
|
|
||||||
// {
|
|
||||||
// return Ok(gst::FlowSuccess::Ok);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // let data = buffer.map_readable().unwrap();
|
|
||||||
// // let text = std::str::from_utf8(&data).unwrap();
|
|
||||||
// // println!("Subtext = {}", text);
|
|
||||||
// dbg!(buffer);
|
|
||||||
//
|
|
||||||
// Ok(gst::FlowSuccess::Ok)
|
|
||||||
// })
|
|
||||||
// .build(),
|
|
||||||
// );
|
|
||||||
|
|
||||||
let context = glib::MainContext::default();
|
let context = glib::MainContext::default();
|
||||||
let main_loop = glib::MainLoop::new(Some(&context), false);
|
let main_loop = glib::MainLoop::new(Some(&context), false);
|
||||||
|
|
||||||
pipeline.set_state(gst::State::Playing)?;
|
pipeline.set_state(gst::State::Playing)?;
|
||||||
|
|
||||||
|
|
||||||
let bus = pipeline.bus().unwrap();
|
let bus = pipeline.bus().unwrap();
|
||||||
bus.add_watch({
|
bus.add_watch({
|
||||||
|
let event_counter = EventId::default();
|
||||||
|
let ad_running = Arc::new(AtomicBool::new(false));
|
||||||
|
let ad_duration = Duration::from_secs(conf.scte_duration_secs);
|
||||||
|
|
||||||
let main_loop = main_loop.clone();
|
let main_loop = main_loop.clone();
|
||||||
|
let pipeline_weak = pipeline.downgrade();
|
||||||
|
let imgcmp_weak = imgcmp.downgrade();
|
||||||
|
let muxer_weak = mux.downgrade();
|
||||||
move |_, msg| {
|
move |_, msg| {
|
||||||
use gst::MessageView;
|
use gst::MessageView;
|
||||||
|
|
||||||
|
@ -95,6 +153,59 @@ fn main() -> eyre::Result<()> {
|
||||||
);
|
);
|
||||||
main_loop.quit();
|
main_loop.quit();
|
||||||
}
|
}
|
||||||
|
MessageView::StateChanged(s) => {
|
||||||
|
if let Some(pipeline) = pipeline_weak.upgrade() {
|
||||||
|
if s.src().map(|e| e == pipeline).unwrap_or(false) {
|
||||||
|
debug!("Writing dot file for status: {:?}", s.current());
|
||||||
|
|
||||||
|
let mut file =
|
||||||
|
File::create(format!("Pipeline-{:?}.dot", s.current())).unwrap();
|
||||||
|
let dot_data =
|
||||||
|
pipeline.debug_to_dot_data(gst::DebugGraphDetails::all());
|
||||||
|
file.write_all(dot_data.as_bytes()).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MessageView::Element(elem_msg) => {
|
||||||
|
if let (Some(pipeline), Some(imgcmp), Some(muxer)) = (
|
||||||
|
pipeline_weak.upgrade(),
|
||||||
|
imgcmp_weak.upgrade(),
|
||||||
|
muxer_weak.upgrade(),
|
||||||
|
) {
|
||||||
|
trace!("Element Message: {:?}", elem_msg);
|
||||||
|
if elem_msg.src().map(|e| e == imgcmp).unwrap_or(false)
|
||||||
|
&& elem_msg.message().has_name("image-detected")
|
||||||
|
&& !ad_running.load(Ordering::Relaxed)
|
||||||
|
{
|
||||||
|
// We need to notify a specific time in the stream where the SCTE-35 marker
|
||||||
|
// is, so we use the pipeline running time to base our timing calculations
|
||||||
|
let now = pipeline.current_running_time().unwrap();
|
||||||
|
|
||||||
|
// Trigger the Splice Out event in the SCTE-35 stream
|
||||||
|
send_splice_out(&muxer, event_counter.next(), now);
|
||||||
|
ad_running.store(true, Ordering::Relaxed);
|
||||||
|
info!("Ad started..");
|
||||||
|
|
||||||
|
// Now we add a timed call for the duration of the ad from now to indicate via
|
||||||
|
// splice in that the stream can go back to normal programming.
|
||||||
|
glib::timeout_add(ad_duration, {
|
||||||
|
let muxer_weak = muxer.downgrade();
|
||||||
|
let event_counter = event_counter.clone();
|
||||||
|
let ad_running = Arc::clone(&ad_running);
|
||||||
|
move || {
|
||||||
|
if let Some(muxer) = muxer_weak.upgrade() {
|
||||||
|
let now = muxer.current_running_time().unwrap();
|
||||||
|
send_splice_in(&muxer, event_counter.next(), now);
|
||||||
|
ad_running.store(false, Ordering::Relaxed);
|
||||||
|
info!("Ad ended!")
|
||||||
|
}
|
||||||
|
// This shall not run again
|
||||||
|
glib::Continue(false)
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -104,16 +215,20 @@ fn main() -> eyre::Result<()> {
|
||||||
.expect("Failed to add bus watch");
|
.expect("Failed to add bus watch");
|
||||||
|
|
||||||
ctrlc::set_handler({
|
ctrlc::set_handler({
|
||||||
let pipeline_weak = pipeline.downgrade();
|
let main_loop = main_loop.clone();
|
||||||
move || {
|
move || {
|
||||||
let pipeline = pipeline_weak.upgrade().unwrap();
|
main_loop.quit();
|
||||||
pipeline.set_state(gst::State::Null).unwrap();
|
|
||||||
}
|
}
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
main_loop.run();
|
main_loop.run();
|
||||||
bus.remove_watch().unwrap();
|
bus.remove_watch().unwrap();
|
||||||
|
|
||||||
|
debug!("Writing Final dot file");
|
||||||
|
let mut file = File::create("Pipeline-Final.dot").unwrap();
|
||||||
|
let dot_data = pipeline.debug_to_dot_data(gst::DebugGraphDetails::all());
|
||||||
|
file.write_all(dot_data.as_bytes()).unwrap();
|
||||||
|
|
||||||
pipeline.set_state(gst::State::Null)?;
|
pipeline.set_state(gst::State::Null)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
Loading…
Reference in a new issue