Can insert SCTE-35 in mpegts streams
This commit is contained in:
parent
621a61a37b
commit
1f15d49595
3 changed files with 168 additions and 1614 deletions
1595
Cargo.lock
generated
1595
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
11
Cargo.toml
11
Cargo.toml
|
@ -5,14 +5,9 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
eyre = "0.6.6"
|
||||
glib = "0.15.4"
|
||||
gst = { package = "gstreamer", version = "0.18.3" }
|
||||
gstreamer-app = "0.18.0"
|
||||
gstreamer-base = "0.18.0"
|
||||
gstreamer-video = { version = "0.18.5", features = ["v1_16"] }
|
||||
gst-plugin-rusoto = { git = "https://gitlab.freedesktop.org/rafaelcaricio/gst-plugins-rs.git", branch = "transbin-accept-any-video-caps", version = "0.9.0" }
|
||||
gst-plugin-closedcaption = { git = "https://gitlab.freedesktop.org/rafaelcaricio/gst-plugins-rs.git", branch = "transbin-accept-any-video-caps", version = "0.9.0" }
|
||||
gst-plugin-textwrap = { git = "https://gitlab.freedesktop.org/rafaelcaricio/gst-plugins-rs.git", branch = "transbin-accept-any-video-caps", version = "0.9.0" }
|
||||
glib = { git = "https://github.com/gtk-rs/gtk-rs-core" }
|
||||
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
|
||||
gst-mpegts = { package = "gstreamer-mpegts-sys", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
|
||||
ctrlc = "3.2.1"
|
||||
signal-hook = "0.3.13"
|
||||
#tokio = { version = "1.17", features = ["full"] }
|
||||
|
|
176
src/main.rs
176
src/main.rs
|
@ -1,82 +1,132 @@
|
|||
use gst::prelude::*;
|
||||
use gstreamer_app as gst_app;
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::process;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use gst::prelude::*;
|
||||
use log::info;
|
||||
use std::time::Duration;
|
||||
|
||||
fn send_splice<C>(element: &gst::Element, gst_sit: C)
|
||||
where
|
||||
C: FnOnce() -> *mut gst_mpegts::GstMpegtsSCTESIT,
|
||||
{
|
||||
let sit = gst_sit();
|
||||
unsafe {
|
||||
let section = gst_mpegts::gst_mpegts_section_from_scte_sit(sit, 500);
|
||||
gst_mpegts::gst_mpegts_section_send_event(section, element.as_ptr());
|
||||
gst::ffi::gst_mini_object_unref(section as _);
|
||||
};
|
||||
}
|
||||
|
||||
fn send_splice_in(element: &gst::Element, event_id: u32, time: gst::ClockTime) {
|
||||
info!("Sending Splice In event: {} @ {}", event_id, time.display());
|
||||
send_splice(element, || unsafe {
|
||||
gst_mpegts::gst_mpegts_scte_splice_in_new(event_id, time.nseconds())
|
||||
})
|
||||
}
|
||||
|
||||
fn send_splice_out(
|
||||
element: &gst::Element,
|
||||
event_id: u32,
|
||||
time: gst::ClockTime,
|
||||
duration: gst::ClockTime,
|
||||
) {
|
||||
info!("Sending Splice Out event: {} @ {} for {}", event_id, time.display(), duration.display());
|
||||
send_splice(element, || unsafe {
|
||||
gst_mpegts::gst_mpegts_scte_splice_out_new(event_id, time.nseconds(), duration.nseconds())
|
||||
})
|
||||
}
|
||||
|
||||
fn main() -> eyre::Result<()> {
|
||||
pretty_env_logger::init();
|
||||
gst::init()?;
|
||||
gstrusoto::plugin_register_static()?;
|
||||
gstrsclosedcaption::plugin_register_static()?;
|
||||
gstrstextwrap::plugin_register_static()?;
|
||||
unsafe {
|
||||
gst_mpegts::gst_mpegts_initialize();
|
||||
}
|
||||
|
||||
let pipeline = gst::parse_launch(
|
||||
r#"
|
||||
souphttpsrc location="https://playertest.longtailvideo.com/adaptive/elephants_dream_v4/redundant.m3u8" ! hlsdemux name=demuxer
|
||||
|
||||
demuxer.src_0 ! decodebin ! cccombiner name=ccc_fr ! videoconvert ! x264enc ! video/x-h264,profile=main ! muxer.video_0
|
||||
demuxer.src_1 ! decodebin ! audioconvert ! audioresample ! opusenc ! audio/x-opus,rate=48000,channels=2 ! muxer.audio_0
|
||||
demuxer.src_2 ! decodebin ! audioconvert ! audioresample ! opusenc ! audio/x-opus,rate=48000,channels=2 ! muxer.audio_1
|
||||
demuxer.src_3 ! decodebin ! audioconvert ! audioresample ! opusenc ! audio/x-opus,rate=48000,channels=2 ! muxer.audio_2
|
||||
videotestsrc is-live=true ! video/x-raw,framerate=30/1,width=1280,height=720 ! timeoverlay ! x264enc tune=zerolatency name=encoder
|
||||
|
||||
souphttpsrc location="https://playertest.longtailvideo.com/adaptive/elephants_dream_v4/media_b/french/ed.m3u8" ! hlsdemux ! subparse ! tttocea608 ! ccconverter ! closedcaption/x-cea-708,format=cc_data ! ccc_fr.caption
|
||||
encoder. ! video/x-h264,profile=main ! queue ! mpegtsmux name=mux scte-35-pid=500 scte-35-null-interval=450000 alignment=7 ! rtpmp2tpay ! udpsink sync=true host=184.73.103.62 port=5000
|
||||
|
||||
audiotestsrc is-live=true ! audioconvert ! avenc_aac bitrate=128000 ! queue ! mux.
|
||||
|
||||
qtmux name=muxer ! filesink location=output_cae708_only_fr.mp4
|
||||
"#,
|
||||
)?
|
||||
.downcast::<gst::Pipeline>()
|
||||
.unwrap();
|
||||
pipeline.set_async_handling(true);
|
||||
|
||||
// souphttpsrc location="https://playertest.longtailvideo.com/adaptive/elephants_dream_v4/media_b/chinese/ed.m3u8" ! hlsdemux ! subparse ! tttocea608 ! ccconverter ! closedcaption/x-cea-708,format=cc_data ! ccc_ch.caption
|
||||
// souphttpsrc location="https://playertest.longtailvideo.com/adaptive/elephants_dream_v4/media_b/french/ed.m3u8" ! hlsdemux ! subparse ! tttocea608 ! appsink name=sink
|
||||
|
||||
info!("Starting pipeline...");
|
||||
|
||||
let demuxer = pipeline.by_name("demuxer").unwrap();
|
||||
demuxer.connect_pad_added(|_, pad| {
|
||||
let name = pad.name();
|
||||
let caps = pad.caps().unwrap();
|
||||
let caps_type = caps.structure(0).unwrap().name();
|
||||
// dbg!(name);
|
||||
debug!("Pad {} added with caps {}", name, caps_type);
|
||||
let ad_event_counter = Arc::new(Mutex::new(1u32));
|
||||
|
||||
// Every 90 seconds we will loop on an ad scheduling process..
|
||||
glib::timeout_add(Duration::from_secs(90), {
|
||||
let pipeline_weak = pipeline.downgrade();
|
||||
let ad_event_counter = ad_event_counter.clone();
|
||||
move || {
|
||||
if let Some(pipeline) = pipeline_weak.upgrade() {
|
||||
let muxer = pipeline.by_name("mux").unwrap();
|
||||
|
||||
// We need to notify a specific time in the stream where the SCTE-35 marker
|
||||
// is, so we use the pipeline running time to base our timing calculations
|
||||
let now = pipeline.current_running_time().unwrap();
|
||||
|
||||
// How much ahead should the ad be inserted, we say 5 seconds in the future
|
||||
let ahead = gst::ClockTime::from_seconds(5);
|
||||
|
||||
// Schedule an advertisement in 5 seconds from now with a 10s duration
|
||||
let ad_duration = gst::ClockTime::from_seconds(10);
|
||||
// next event id
|
||||
let event_id = {
|
||||
let mut ad_event_counter = ad_event_counter.lock().unwrap();
|
||||
*ad_event_counter += 1;
|
||||
*ad_event_counter
|
||||
};
|
||||
send_splice_out(
|
||||
&muxer,
|
||||
event_id,
|
||||
now + ahead,
|
||||
ad_duration.clone(),
|
||||
);
|
||||
|
||||
// Now we add a timed call for 30 seconds from now to indicate via splice in that
|
||||
// the stream can go back to normal programming. This is not strictly necessary
|
||||
// since we are saying how long our splice out should be, but it is good
|
||||
// to have this indication anyway.
|
||||
glib::timeout_add(ad_duration.into(), {
|
||||
let muxer_weak = muxer.downgrade();
|
||||
let ad_event_counter = ad_event_counter.clone();
|
||||
move || {
|
||||
if let Some(muxer) = muxer_weak.upgrade() {
|
||||
// next event id
|
||||
let event_id = {
|
||||
let mut ad_event_counter = ad_event_counter.lock().unwrap();
|
||||
*ad_event_counter += 1;
|
||||
*ad_event_counter
|
||||
};
|
||||
let now = muxer.current_running_time().unwrap();
|
||||
send_splice_in(&muxer, event_id, now + ahead);
|
||||
}
|
||||
// This don't need to run again
|
||||
glib::Continue(false)
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
// Run this again next time...
|
||||
glib::Continue(true)
|
||||
}
|
||||
});
|
||||
// let app_sink = pipeline
|
||||
// .by_name("sink")
|
||||
// .unwrap()
|
||||
// .downcast::<gst_app::AppSink>()
|
||||
// .unwrap();
|
||||
// app_sink.set_sync(false);
|
||||
// app_sink.set_callbacks(
|
||||
// gst_app::AppSinkCallbacks::builder()
|
||||
// .new_sample(move |app| {
|
||||
// let sample = app.pull_sample().unwrap();
|
||||
// let buffer = sample.buffer().unwrap();
|
||||
//
|
||||
// // We don't care about buffers that are not video
|
||||
// if buffer
|
||||
// .flags()
|
||||
// .contains(gst::BufferFlags::DECODE_ONLY | gst::BufferFlags::GAP)
|
||||
// {
|
||||
// return Ok(gst::FlowSuccess::Ok);
|
||||
// }
|
||||
//
|
||||
// // let data = buffer.map_readable().unwrap();
|
||||
// // let text = std::str::from_utf8(&data).unwrap();
|
||||
// // println!("Subtext = {}", text);
|
||||
// dbg!(buffer);
|
||||
//
|
||||
// Ok(gst::FlowSuccess::Ok)
|
||||
// })
|
||||
// .build(),
|
||||
// );
|
||||
|
||||
|
||||
let context = glib::MainContext::default();
|
||||
let main_loop = glib::MainLoop::new(Some(&context), false);
|
||||
|
||||
pipeline.set_state(gst::State::Playing)?;
|
||||
|
||||
|
||||
let bus = pipeline.bus().unwrap();
|
||||
bus.add_watch({
|
||||
let main_loop = main_loop.clone();
|
||||
|
@ -101,13 +151,25 @@ fn main() -> eyre::Result<()> {
|
|||
glib::Continue(true)
|
||||
}
|
||||
})
|
||||
.expect("Failed to add bus watch");
|
||||
.expect("Failed to add bus watch");
|
||||
|
||||
ctrlc::set_handler({
|
||||
let pipeline_weak = pipeline.downgrade();
|
||||
move || {
|
||||
let pipeline = pipeline_weak.upgrade().unwrap();
|
||||
pipeline.set_state(gst::State::Null).unwrap();
|
||||
if let Some(pipeline) = pipeline_weak.upgrade() {
|
||||
pipeline.call_async(|itself| {
|
||||
|
||||
let dot_graph = itself
|
||||
.debug_to_dot_data(gst::DebugGraphDetails::all())
|
||||
.to_string();
|
||||
let mut graph = File::create("pipeline.dot").unwrap();
|
||||
graph.write_all(dot_graph.as_bytes()).unwrap();
|
||||
|
||||
itself.set_state(gst::State::Null).unwrap();
|
||||
|
||||
process::exit(0);
|
||||
});
|
||||
}
|
||||
}
|
||||
})?;
|
||||
|
||||
|
|
Loading…
Reference in a new issue