From b062f63ec3c73e09018896abbc05260139090480 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Fri, 4 Dec 2020 23:21:57 +0100 Subject: [PATCH] Add new text/json crate This new crate consists of two elements, jsongstenc and jsongstparse Both these elements can deal with an ndjson based format, consisting for now of two item types: "Buffer" and "Header" eg: {"Header":{"format":"foobar"}} {"Buffer":{"pts":0,"duration":43,"data":{"foo":"bar"}}} jsongstparse will interpret this by first sending caps application/x-json, format=foobar, then a buffer containing {"foo":"bar"}, timestamped as required. Elements further downstream can then interpret the data further. jsongstenc will simply perform the reverse operation. --- Cargo.toml | 1 + ci/utils.py | 2 +- meson.build | 1 + text/json/Cargo.toml | 36 ++ text/json/build.rs | 3 + text/json/src/jsongstenc/imp.rs | 301 +++++++++ text/json/src/jsongstenc/mod.rs | 36 ++ text/json/src/jsongstparse/imp.rs | 996 ++++++++++++++++++++++++++++++ text/json/src/jsongstparse/mod.rs | 36 ++ text/json/src/lib.rs | 40 ++ text/json/src/line_reader.rs | 1 + text/json/tests/json.rs | 104 ++++ 12 files changed, 1556 insertions(+), 1 deletion(-) create mode 100644 text/json/Cargo.toml create mode 100644 text/json/build.rs create mode 100644 text/json/src/jsongstenc/imp.rs create mode 100644 text/json/src/jsongstenc/mod.rs create mode 100644 text/json/src/jsongstparse/imp.rs create mode 100644 text/json/src/jsongstparse/mod.rs create mode 100644 text/json/src/lib.rs create mode 120000 text/json/src/line_reader.rs create mode 100644 text/json/tests/json.rs diff --git a/Cargo.toml b/Cargo.toml index eb62ea94..a8c14922 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ members = [ "video/rspng", "video/hsv", "text/wrap", + "text/json", ] [profile.release] diff --git a/ci/utils.py b/ci/utils.py index a3ec3d1e..e0c7626d 100644 --- a/ci/utils.py +++ b/ci/utils.py @@ -2,7 +2,7 @@ import os DIRS = ['audio', 'generic', 'net', 'text', 'utils', 'video'] # Plugins whose name is prefixed by 'rs' -RS_PREFIXED = ['audiofx', 'closedcaption', 'dav1d', 'file'] +RS_PREFIXED = ['audiofx', 'closedcaption', 'dav1d', 'file', 'json'] OVERRIDE = {'wrap': 'rstextwrap', 'flavors': 'rsflv'} diff --git a/meson.build b/meson.build index 04000e45..15ffc254 100644 --- a/meson.build +++ b/meson.build @@ -48,6 +48,7 @@ plugins_rep = { 'generic/threadshare': 'libgstthreadshare', 'utils/togglerecord': 'libgsttogglerecord', 'video/hsv': 'libgsthsv', + 'text/json': 'libgstrsjson', } exclude = [] diff --git a/text/json/Cargo.toml b/text/json/Cargo.toml new file mode 100644 index 00000000..fb977a41 --- /dev/null +++ b/text/json/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "gst-plugin-json" +version = "0.6.0" +authors = ["Mathieu Duponchelle "] +license = "LGPL-2.1-or-later" +edition = "2018" +description = "Rust JSON Plugin" +repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" + +[dependencies] +glib = { git = "https://github.com/gtk-rs/gtk-rs" } +once_cell = "1.0" +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0", features = ["raw_value"] } + +[dependencies.gst] +git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" +features = ["v1_12"] +package="gstreamer" + +[lib] +name = "gstrsjson" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[build-dependencies] +gst-plugin-version-helper = { path="../../version-helper" } + +[dev-dependencies.gst-check] +git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" +package="gstreamer-check" + +[features] +# GStreamer 1.14 is required for static linking +static = ["gst/v1_14"] + diff --git a/text/json/build.rs b/text/json/build.rs new file mode 100644 index 00000000..17be1215 --- /dev/null +++ b/text/json/build.rs @@ -0,0 +1,3 @@ +fn main() { + gst_plugin_version_helper::get_info() +} diff --git a/text/json/src/jsongstenc/imp.rs b/text/json/src/jsongstenc/imp.rs new file mode 100644 index 00000000..07e555e1 --- /dev/null +++ b/text/json/src/jsongstenc/imp.rs @@ -0,0 +1,301 @@ +// Copyright (C) 2020 Mathieu Duponchelle +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib::subclass; +use glib::subclass::prelude::*; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst::{gst_log, gst_trace}; + +use once_cell::sync::Lazy; + +use std::sync::Mutex; + +use serde::Serialize; + +#[derive(Serialize, Debug)] +enum Line<'a> { + Header { + format: String, + }, + Buffer { + pts: i64, + duration: i64, + #[serde(borrow)] + data: &'a serde_json::value::RawValue, + }, +} + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "jsongstenc", + gst::DebugColorFlags::empty(), + Some("GStreamer JSON Encoder Element"), + ) +}); + +#[derive(Debug)] +struct State { + start_ts: gst::ClockTime, + end_ts: gst::ClockTime, + current_line: String, + format: Option, +} + +impl Default for State { + fn default() -> Self { + Self { + start_ts: gst::CLOCK_TIME_NONE, + end_ts: gst::CLOCK_TIME_NONE, + current_line: "".to_string(), + format: None, + } + } +} + +pub struct JsonGstEnc { + srcpad: gst::Pad, + sinkpad: gst::Pad, + state: Mutex, +} + +impl JsonGstEnc { + fn sink_chain( + &self, + _pad: &gst::Pad, + element: &super::JsonGstEnc, + buffer: gst::Buffer, + ) -> Result { + let pts = buffer.get_pts(); + let duration = buffer.get_duration(); + + let mut state = self.state.lock().unwrap(); + + if let Some(format) = &state.format { + let line = Line::Header { + format: format.to_string(), + }; + + let mut json = serde_json::to_string(&line).map_err(|err| { + gst::element_error!( + element, + gst::ResourceError::Write, + ["Failed to serialize as json {}", err] + ); + + gst::FlowError::Error + })?; + + json.push('\n'); + + let mut buf = gst::Buffer::from_mut_slice(json.into_bytes()); + { + let buf_mut = buf.get_mut().unwrap(); + buf_mut.set_pts(pts); + } + + state.format = None; + drop(state); + + self.srcpad.push(buf)?; + } else { + drop(state); + } + + let map = buffer.map_readable().map_err(|_| { + gst::element_error!( + element, + gst::ResourceError::Read, + ["Failed to map buffer readable"] + ); + + gst::FlowError::Error + })?; + + let text = std::str::from_utf8(map.as_slice()).map_err(|err| { + gst::element_error!( + element, + gst::ResourceError::Read, + ["Failed to map decode as utf8: {}", err] + ); + + gst::FlowError::Error + })?; + + let data: &serde_json::value::RawValue = serde_json::from_str(text).map_err(|err| { + gst::element_error!( + element, + gst::ResourceError::Read, + ["Failed to parse input as json: {}", err] + ); + + gst::FlowError::Error + })?; + + let line = Line::Buffer { + pts: pts.unwrap_or(std::u64::MAX) as i64, + duration: duration.unwrap_or(std::u64::MAX) as i64, + data, + }; + + let mut json = serde_json::to_string(&line).map_err(|err| { + gst::element_error!( + element, + gst::ResourceError::Write, + ["Failed to serialize as json {}", err] + ); + + gst::FlowError::Error + })?; + + json.push('\n'); + + let mut buf = gst::Buffer::from_mut_slice(json.into_bytes()); + { + let buf_mut = buf.get_mut().unwrap(); + buf_mut.set_pts(pts); + buf_mut.set_duration(duration); + } + + self.srcpad.push(buf) + } + + fn sink_event(&self, pad: &gst::Pad, element: &super::JsonGstEnc, event: gst::Event) -> bool { + use gst::EventView; + + gst_log!(CAT, obj: pad, "Handling event {:?}", event); + + match event.view() { + EventView::Caps(e) => { + { + let mut state = self.state.lock().unwrap(); + let caps = e.get_caps(); + let s = caps.get_structure(0).unwrap(); + state.format = match s.get::("format") { + Err(_) => None, + Ok(format) => format, + }; + } + + // We send our own caps downstream + let caps = gst::Caps::builder("application/x-json").build(); + self.srcpad.push_event(gst::event::Caps::new(&caps)) + } + EventView::Eos(_) => pad.event_default(Some(element), event), + _ => pad.event_default(Some(element), event), + } + } +} + +impl ObjectSubclass for JsonGstEnc { + const NAME: &'static str = "RsJsonGstEnc"; + type Type = super::JsonGstEnc; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib::object_subclass!(); + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.get_pad_template("sink").unwrap(); + let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink")) + .chain_function(|pad, parent, buffer| { + JsonGstEnc::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |enc, element| enc.sink_chain(pad, element, buffer), + ) + }) + .event_function(|pad, parent, event| { + JsonGstEnc::catch_panic_pad_function( + parent, + || false, + |enc, element| enc.sink_event(pad, element, event), + ) + }) + .build(); + + let templ = klass.get_pad_template("src").unwrap(); + let srcpad = gst::Pad::builder_with_template(&templ, Some("src")).build(); + + Self { + srcpad, + sinkpad, + state: Mutex::new(State::default()), + } + } + + fn class_init(klass: &mut Self::Class) { + klass.set_metadata( + "GStreamer buffers to JSON", + "Encoder/JSON", + "Wraps buffers containing any valid top-level JSON structures \ + into higher level JSON objects, and outputs those as ndjson", + "Mathieu Duponchelle ", + ); + + let caps = gst::Caps::builder("application/x-json").build(); + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(sink_pad_template); + + let caps = gst::Caps::builder("application/x-json").build(); + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(src_pad_template); + } +} + +impl ObjectImpl for JsonGstEnc { + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.add_pad(&self.sinkpad).unwrap(); + obj.add_pad(&self.srcpad).unwrap(); + } +} + +impl ElementImpl for JsonGstEnc { + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result { + gst_trace!(CAT, obj: element, "Changing state {:?}", transition); + + match transition { + gst::StateChange::ReadyToPaused | gst::StateChange::PausedToReady => { + // Reset the whole state + let mut state = self.state.lock().unwrap(); + *state = State::default(); + } + _ => (), + } + + self.parent_change_state(element, transition) + } +} diff --git a/text/json/src/jsongstenc/mod.rs b/text/json/src/jsongstenc/mod.rs new file mode 100644 index 00000000..b3fdb56b --- /dev/null +++ b/text/json/src/jsongstenc/mod.rs @@ -0,0 +1,36 @@ +// Copyright (C) 2020 Mathieu Duponchelle +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct JsonGstEnc(ObjectSubclass) @extends gst::Element, gst::Object; +} + +unsafe impl Send for JsonGstEnc {} +unsafe impl Sync for JsonGstEnc {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "jsongstenc", + gst::Rank::None, + JsonGstEnc::static_type(), + ) +} diff --git a/text/json/src/jsongstparse/imp.rs b/text/json/src/jsongstparse/imp.rs new file mode 100644 index 00000000..bb4d7fb9 --- /dev/null +++ b/text/json/src/jsongstparse/imp.rs @@ -0,0 +1,996 @@ +// Copyright (C) 2020 Mathieu Duponchelle +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib::prelude::*; +use glib::subclass; +use glib::subclass::prelude::*; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst::{gst_debug, gst_error, gst_info, gst_log, gst_trace, gst_warning}; + +use once_cell::sync::Lazy; + +use std::cmp; +use std::convert::TryInto; +use std::sync::{Mutex, MutexGuard}; + +use serde::Deserialize; + +use crate::line_reader::LineReader; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "jsongstparse", + gst::DebugColorFlags::empty(), + Some("GStreamer Json Parser Element"), + ) +}); + +#[derive(Debug)] +struct PullState { + need_stream_start: bool, + stream_id: String, + offset: u64, + duration: gst::ClockTime, +} + +impl PullState { + fn new(element: &super::JsonGstParse, pad: &gst::Pad) -> Self { + Self { + need_stream_start: true, + stream_id: pad.create_stream_id(element, Some("src")).to_string(), + offset: 0, + duration: gst::CLOCK_TIME_NONE, + } + } +} + +#[derive(Debug)] +struct State { + reader: LineReader>, + need_segment: bool, + need_caps: bool, + format: Option, + pending_events: Vec, + last_position: gst::ClockTime, + segment: gst::FormattedSegment, + + // Pull mode + pull: Option, + + // seeking + seeking: bool, + discont: bool, + seek_seqnum: Option, + last_raw_line: Vec, + replay_last_line: bool, + need_flush_stop: bool, +} + +impl Default for State { + fn default() -> Self { + Self { + reader: LineReader::new(), + need_segment: true, + need_caps: true, + format: None, + pending_events: Vec::new(), + last_position: gst::CLOCK_TIME_NONE, + segment: gst::FormattedSegment::::new(), + pull: None, + seeking: false, + discont: false, + seek_seqnum: None, + last_raw_line: Vec::new(), + replay_last_line: false, + need_flush_stop: false, + } + } +} + +#[derive(Deserialize, Debug)] +enum Line<'a> { + Header { + format: String, + }, + Buffer { + pts: u64, + duration: u64, + #[serde(borrow)] + data: &'a serde_json::value::RawValue, + }, +} + +impl State { + fn get_line(&mut self, drain: bool) -> Result, (&[u8], serde_json::Error)> { + let line = if self.replay_last_line { + self.replay_last_line = false; + &self.last_raw_line + } else { + match self.reader.get_line_with_drain(drain) { + None => { + return Ok(None); + } + Some(line) => { + self.last_raw_line = line.to_vec(); + line + } + } + }; + + let line: Line = serde_json::from_slice(&line).map_err(|err| (line, err))?; + + Ok(Some(line)) + } + + fn create_events(&mut self, element: &super::JsonGstParse) -> Vec { + let mut events = Vec::new(); + + if self.need_flush_stop { + let mut b = gst::event::FlushStop::builder(true); + + if let Some(seek_seqnum) = self.seek_seqnum { + b = b.seqnum(seek_seqnum); + } + + events.push(b.build()); + self.need_flush_stop = false; + } + + if let Some(pull) = &mut self.pull { + if pull.need_stream_start { + events.push(gst::event::StreamStart::new(&pull.stream_id)); + pull.need_stream_start = false; + } + } + + if self.need_caps { + let mut caps_builder = gst::Caps::builder("application/x-json"); + + if let Some(format) = &self.format { + caps_builder = caps_builder.field("format", format); + } + + let caps = caps_builder.build(); + + events.push(gst::event::Caps::new(&caps)); + gst_info!(CAT, obj: element, "Caps changed to {:?}", &caps); + self.need_caps = false; + } + + if self.need_segment { + let mut b = gst::event::Segment::builder(&self.segment); + + if let Some(seek_seqnum) = self.seek_seqnum { + b = b.seqnum(seek_seqnum); + } + + events.push(b.build()); + self.need_segment = false; + } + + events.extend(self.pending_events.drain(..)); + events + } + + fn add_buffer_metadata( + &mut self, + _element: &super::JsonGstParse, + buffer: &mut gst::buffer::Buffer, + pts: gst::ClockTime, + duration: gst::ClockTime, + ) { + let buffer = buffer.get_mut().unwrap(); + + self.last_position = pts + duration; + + buffer.set_pts(pts); + + if self.discont { + buffer.set_flags(gst::BufferFlags::DISCONT); + self.discont = false; + } + + buffer.set_duration(duration); + } +} + +pub struct JsonGstParse { + srcpad: gst::Pad, + sinkpad: gst::Pad, + state: Mutex, +} + +impl JsonGstParse { + fn handle_buffer( + &self, + element: &super::JsonGstParse, + buffer: Option, + ) -> Result { + let mut state = self.state.lock().unwrap(); + + let drain; + if let Some(buffer) = buffer { + let buffer = buffer.into_mapped_buffer_readable().map_err(|_| { + gst::element_error!( + element, + gst::ResourceError::Read, + ["Failed to map buffer readable"] + ); + + gst::FlowError::Error + })?; + + state.reader.push(buffer); + drain = false; + } else { + drain = true; + } + + loop { + let seeking = state.seeking; + let line = state.get_line(drain); + match line { + Ok(Some(Line::Buffer { + pts, + duration, + data, + })) => { + gst_debug!( + CAT, + obj: element, + "Got buffer with timestamp {} and duration {}", + pts, + duration + ); + + let pts: gst::ClockTime = pts.into(); + let duration: gst::ClockTime = duration.into(); + + if !seeking { + let data = data.to_string().clone(); + let mut events = state.create_events(element); + + let mut buffer = gst::Buffer::from_slice(data); + + if state.last_position < pts { + events.push(gst::event::Gap::new( + state.last_position, + pts - state.last_position, + )); + } + + state.add_buffer_metadata(element, &mut buffer, pts, duration); + + let send_eos = state.segment.get_stop().is_some() + && buffer.get_pts() + buffer.get_duration() >= state.segment.get_stop(); + + // Drop our state mutex while we push out buffers or events + drop(state); + + for event in events { + gst_debug!(CAT, obj: element, "Pushing event {:?}", event); + self.srcpad.push_event(event); + } + + self.srcpad.push(buffer).map_err(|err| { + if err != gst::FlowError::Flushing { + gst_error!(CAT, obj: element, "Pushing buffer returned {:?}", err); + } + err + })?; + + if send_eos { + return Err(gst::FlowError::Eos); + } + + state = self.state.lock().unwrap(); + } else { + state = self.handle_skipped_line(element, pts, state)?; + } + } + Ok(Some(Line::Header { format })) => { + if state.format.is_none() { + state.format = Some(format); + } else { + gst_warning!(CAT, obj: element, "Ignoring format change",); + } + } + Err((line, err)) => { + gst_error!( + CAT, + obj: element, + "Couldn't parse line '{:?}': {:?}", + std::str::from_utf8(line), + err + ); + + gst::element_error!( + element, + gst::StreamError::Decode, + ["Couldn't parse line '{:?}': {:?}", line, err] + ); + + break Err(gst::FlowError::Error); + } + Ok(None) => { + if drain && state.pull.is_some() { + eprintln!("Finished draining"); + break Err(gst::FlowError::Eos); + } + break Ok(gst::FlowSuccess::Ok); + } + } + } + } + + fn handle_skipped_line( + &self, + element: &super::JsonGstParse, + pts: gst::ClockTime, + mut state: MutexGuard, + ) -> Result, gst::FlowError> { + if pts >= state.segment.get_start() { + state.seeking = false; + state.discont = true; + state.replay_last_line = true; + state.need_flush_stop = true; + + gst_debug!(CAT, obj: element, "Done seeking"); + } + + drop(state); + + Ok(self.state.lock().unwrap()) + } + + fn sink_activate( + &self, + pad: &gst::Pad, + element: &super::JsonGstParse, + ) -> Result<(), gst::LoggableError> { + let mode = { + let mut query = gst::query::Scheduling::new(); + let mut state = self.state.lock().unwrap(); + + state.pull = None; + + if !pad.peer_query(&mut query) { + gst_debug!(CAT, obj: pad, "Scheduling query failed on peer"); + gst::PadMode::Push + } else if query + .has_scheduling_mode_with_flags(gst::PadMode::Pull, gst::SchedulingFlags::SEEKABLE) + { + gst_debug!(CAT, obj: pad, "Activating in Pull mode"); + + state.pull = Some(PullState::new(element, &self.srcpad)); + + gst::PadMode::Pull + } else { + gst_debug!(CAT, obj: pad, "Activating in Push mode"); + gst::PadMode::Push + } + }; + + pad.activate_mode(mode, true)?; + Ok(()) + } + + fn start_task(&self, element: &super::JsonGstParse) -> Result<(), gst::LoggableError> { + let element_weak = element.downgrade(); + let pad_weak = self.sinkpad.downgrade(); + let res = self.sinkpad.start_task(move || { + let element = match element_weak.upgrade() { + Some(element) => element, + None => { + if let Some(pad) = pad_weak.upgrade() { + pad.pause_task().unwrap(); + } + return; + } + }; + + let parse = Self::from_instance(&element); + parse.loop_fn(&element); + }); + if res.is_err() { + return Err(gst::loggable_error!(CAT, "Failed to start pad task")); + } + Ok(()) + } + + fn sink_activatemode( + &self, + _pad: &gst::Pad, + element: &super::JsonGstParse, + mode: gst::PadMode, + active: bool, + ) -> Result<(), gst::LoggableError> { + if mode == gst::PadMode::Pull { + if active { + self.start_task(element)?; + } else { + let _ = self.sinkpad.stop_task(); + } + } + + Ok(()) + } + + fn scan_duration( + &self, + element: &super::JsonGstParse, + ) -> Result, gst::LoggableError> { + gst_debug!(CAT, obj: element, "Scanning duration"); + + /* First let's query the bytes duration upstream */ + let mut q = gst::query::Duration::new(gst::Format::Bytes); + + if !self.sinkpad.peer_query(&mut q) { + return Err(gst::loggable_error!( + CAT, + "Failed to query upstream duration" + )); + } + + let size = match q.get_result().try_into().unwrap() { + gst::format::Bytes(Some(size)) => size, + gst::format::Bytes(None) => { + return Err(gst::loggable_error!( + CAT, + "Failed to query upstream duration" + )); + } + }; + + let mut offset = size; + let mut buffers = Vec::new(); + let mut last_pts = None; + + loop { + let scan_size = cmp::min(offset, 4096); + + offset -= scan_size; + + match self.sinkpad.pull_range(offset, scan_size as u32) { + Ok(buffer) => { + buffers.push(buffer); + } + Err(flow) => { + return Err(gst::loggable_error!( + CAT, + "Failed to pull buffer while scanning duration: {:?}", + flow + )); + } + } + + let mut reader = LineReader::new(); + + for buf in buffers.iter().rev() { + let buf = buf + .clone() + .into_mapped_buffer_readable() + .map_err(|_| gst::loggable_error!(CAT, "Failed to map buffer readable"))?; + + reader.push(buf); + } + + while let Some(line) = reader.get_line_with_drain(true) { + if let Ok(Line::Buffer { + pts, + duration, + data: _data, + }) = serde_json::from_slice(&line) + { + last_pts = Some((pts + duration).into()); + } + } + + if last_pts.is_some() || offset == 0 { + gst_debug!( + CAT, + obj: element, + "Duration scan done, last_pts: {:?}", + last_pts + ); + break (Ok(last_pts)); + } + } + } + + fn push_eos(&self, element: &super::JsonGstParse) { + let mut state = self.state.lock().unwrap(); + + if state.seeking { + state.need_flush_stop = true; + } + + let mut events = state.create_events(element); + let mut eos_event = gst::event::Eos::builder(); + + if let Some(seek_seqnum) = state.seek_seqnum { + eos_event = eos_event.seqnum(seek_seqnum); + } + + events.push(eos_event.build()); + + // Drop our state mutex while we push out events + drop(state); + + for event in events { + gst_debug!(CAT, obj: element, "Pushing event {:?}", event); + self.srcpad.push_event(event); + } + } + + fn loop_fn(&self, element: &super::JsonGstParse) { + let mut state = self.state.lock().unwrap(); + let State { ref mut pull, .. } = *state; + let mut pull = pull.as_mut().unwrap(); + let offset = pull.offset; + let scan_duration = pull.duration.is_none(); + + pull.offset += 4096; + + drop(state); + + if scan_duration { + match self.scan_duration(element) { + Ok(Some(pts)) => { + let mut state = self.state.lock().unwrap(); + let mut pull = state.pull.as_mut().unwrap(); + pull.duration = pts; + } + Ok(None) => { + let mut state = self.state.lock().unwrap(); + let mut pull = state.pull.as_mut().unwrap(); + pull.duration = 0.into(); + } + Err(err) => { + err.log(); + + gst::element_error!( + element, + gst::StreamError::Decode, + ["Failed to scan duration"] + ); + + self.sinkpad.pause_task().unwrap(); + } + } + } + + let buffer = match self.sinkpad.pull_range(offset, 4096) { + Ok(buffer) => Some(buffer), + Err(gst::FlowError::Eos) => None, + Err(gst::FlowError::Flushing) => { + gst_debug!(CAT, obj: &self.sinkpad, "Pausing after pulling buffer, reason: flushing"); + + self.sinkpad.pause_task().unwrap(); + return; + } + Err(flow) => { + gst_error!(CAT, obj: &self.sinkpad, "Failed to pull, reason: {:?}", flow); + + gst::element_error!( + element, + gst::StreamError::Failed, + ["Streaming stopped, failed to pull buffer"] + ); + + self.sinkpad.pause_task().unwrap(); + return; + } + }; + + if let Err(flow) = self.handle_buffer(element, buffer) { + match flow { + gst::FlowError::Flushing => { + gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow); + } + gst::FlowError::Eos => { + self.push_eos(element); + + gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow); + } + _ => { + self.push_eos(element); + + gst_error!(CAT, obj: element, "Pausing after flow {:?}", flow); + + gst::element_error!( + element, + gst::StreamError::Failed, + ["Streaming stopped, reason: {:?}", flow] + ); + } + } + + self.sinkpad.pause_task().unwrap(); + } + } + + fn sink_chain( + &self, + pad: &gst::Pad, + element: &super::JsonGstParse, + buffer: gst::Buffer, + ) -> Result { + gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); + + self.handle_buffer(element, Some(buffer)) + } + + fn flush(&self, mut state: &mut State) { + state.reader.clear(); + if let Some(pull) = &mut state.pull { + pull.offset = 0; + } + state.segment = gst::FormattedSegment::::new(); + state.need_segment = true; + state.need_caps = true; + state.pending_events.clear(); + state.last_position = 0.into(); + state.last_raw_line = [].to_vec(); + state.format = None; + } + + fn sink_event(&self, pad: &gst::Pad, element: &super::JsonGstParse, event: gst::Event) -> bool { + use gst::EventView; + + gst_log!(CAT, obj: pad, "Handling event {:?}", event); + + match event.view() { + EventView::Caps(_) => { + // We send a proper caps event from the chain function later + gst_log!(CAT, obj: pad, "Dropping caps event"); + true + } + EventView::Segment(_) => { + // We send a gst::Format::Time segment event later when needed + gst_log!(CAT, obj: pad, "Dropping segment event"); + true + } + EventView::FlushStop(_) => { + let mut state = self.state.lock().unwrap(); + self.flush(&mut state); + drop(state); + + pad.event_default(Some(element), event) + } + EventView::Eos(_) => { + gst_log!(CAT, obj: pad, "Draining"); + if let Err(err) = self.handle_buffer(element, None) { + gst_error!(CAT, obj: pad, "Failed to drain parser: {:?}", err); + } + pad.event_default(Some(element), event) + } + _ => { + if event.is_sticky() + && !self.srcpad.has_current_caps() + && event.get_type() > gst::EventType::Caps + { + gst_log!(CAT, obj: pad, "Deferring sticky event until we have caps"); + let mut state = self.state.lock().unwrap(); + state.pending_events.push(event); + true + } else { + pad.event_default(Some(element), event) + } + } + } + } + + fn perform_seek(&self, event: &gst::event::Seek, element: &super::JsonGstParse) -> bool { + if self.state.lock().unwrap().pull.is_none() { + gst_error!(CAT, obj: element, "seeking is only supported in pull mode"); + return false; + } + + let (rate, flags, start_type, start, stop_type, stop) = event.get(); + + let mut start: gst::ClockTime = match start.try_into() { + Ok(start) => start, + Err(_) => { + gst_error!(CAT, obj: element, "seek has invalid format"); + return false; + } + }; + + let mut stop: gst::ClockTime = match stop.try_into() { + Ok(stop) => stop, + Err(_) => { + gst_error!(CAT, obj: element, "seek has invalid format"); + return false; + } + }; + + if !flags.contains(gst::SeekFlags::FLUSH) { + gst_error!(CAT, obj: element, "only flushing seeks are supported"); + return false; + } + + if start_type == gst::SeekType::End || stop_type == gst::SeekType::End { + gst_error!(CAT, obj: element, "Relative seeks are not supported"); + return false; + } + + let seek_seqnum = event.get_seqnum(); + + let event = gst::event::FlushStart::builder() + .seqnum(seek_seqnum) + .build(); + + gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event); + self.sinkpad.push_event(event); + + let event = gst::event::FlushStart::builder() + .seqnum(seek_seqnum) + .build(); + + gst_debug!(CAT, obj: element, "Pushing event {:?}", event); + self.srcpad.push_event(event); + + self.sinkpad.pause_task().unwrap(); + + let mut state = self.state.lock().unwrap(); + let pull = state.pull.as_ref().unwrap(); + + if start_type == gst::SeekType::Set { + start = start.min(pull.duration).unwrap_or(start); + } + + if stop_type == gst::SeekType::Set { + stop = stop.min(pull.duration).unwrap_or(stop); + } + + state.seeking = true; + state.seek_seqnum = Some(seek_seqnum); + + self.flush(&mut state); + + let event = gst::event::FlushStop::builder(true) + .seqnum(seek_seqnum) + .build(); + + /* Drop our state while we push a serialized event upstream */ + drop(state); + + gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event); + self.sinkpad.push_event(event); + + state = self.state.lock().unwrap(); + + state + .segment + .do_seek(rate, flags, start_type, start, stop_type, stop); + + match self.start_task(element) { + Err(error) => { + error.log(); + false + } + _ => true, + } + } + + fn src_event(&self, pad: &gst::Pad, element: &super::JsonGstParse, event: gst::Event) -> bool { + use gst::EventView; + + gst_log!(CAT, obj: pad, "Handling event {:?}", event); + match event.view() { + EventView::Seek(e) => self.perform_seek(&e, element), + _ => pad.event_default(Some(element), event), + } + } + + fn src_query( + &self, + pad: &gst::Pad, + element: &super::JsonGstParse, + query: &mut gst::QueryRef, + ) -> bool { + use gst::QueryView; + + gst_log!(CAT, obj: pad, "Handling query {:?}", query); + + match query.view_mut() { + QueryView::Seeking(mut q) => { + let state = self.state.lock().unwrap(); + + let fmt = q.get_format(); + + if fmt == gst::Format::Time { + if let Some(pull) = state.pull.as_ref() { + q.set( + true, + gst::GenericFormattedValue::Time(0.into()), + gst::GenericFormattedValue::Time(pull.duration), + ); + true + } else { + false + } + } else { + false + } + } + QueryView::Position(ref mut q) => { + // For Time answer ourselfs, otherwise forward + if q.get_format() == gst::Format::Time { + let state = self.state.lock().unwrap(); + q.set(state.last_position); + true + } else { + self.sinkpad.peer_query(query) + } + } + QueryView::Duration(ref mut q) => { + // For Time answer ourselfs, otherwise forward + let state = self.state.lock().unwrap(); + if q.get_format() == gst::Format::Time { + if let Some(pull) = state.pull.as_ref() { + if pull.duration.is_some() { + q.set(state.pull.as_ref().unwrap().duration); + true + } else { + false + } + } else { + false + } + } else { + self.sinkpad.peer_query(query) + } + } + _ => pad.query_default(Some(element), query), + } + } +} + +impl ObjectSubclass for JsonGstParse { + const NAME: &'static str = "RsJsonGstParse"; + type Type = super::JsonGstParse; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct; + type Class = subclass::simple::ClassStruct; + + glib::object_subclass!(); + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.get_pad_template("sink").unwrap(); + let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink")) + .activate_function(|pad, parent| { + JsonGstParse::catch_panic_pad_function( + parent, + || Err(gst::loggable_error!(CAT, "Panic activating sink pad")), + |parse, element| parse.sink_activate(pad, element), + ) + }) + .activatemode_function(|pad, parent, mode, active| { + JsonGstParse::catch_panic_pad_function( + parent, + || { + Err(gst::loggable_error!( + CAT, + "Panic activating sink pad with mode" + )) + }, + |parse, element| parse.sink_activatemode(pad, element, mode, active), + ) + }) + .chain_function(|pad, parent, buffer| { + JsonGstParse::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |parse, element| parse.sink_chain(pad, element, buffer), + ) + }) + .event_function(|pad, parent, event| { + JsonGstParse::catch_panic_pad_function( + parent, + || false, + |parse, element| parse.sink_event(pad, element, event), + ) + }) + .build(); + + let templ = klass.get_pad_template("src").unwrap(); + let srcpad = gst::Pad::builder_with_template(&templ, Some("src")) + .event_function(|pad, parent, event| { + JsonGstParse::catch_panic_pad_function( + parent, + || false, + |parse, element| parse.src_event(pad, element, event), + ) + }) + .query_function(|pad, parent, query| { + JsonGstParse::catch_panic_pad_function( + parent, + || false, + |parse, element| parse.src_query(pad, element, query), + ) + }) + .build(); + + Self { + srcpad, + sinkpad, + state: Mutex::new(State::default()), + } + } + + fn class_init(klass: &mut Self::Class) { + klass.set_metadata( + "JSON GStreamer parser", + "Parser/JSON", + "Parses ndjson as output by jsongstenc", + "Mathieu Duponchelle ", + ); + + let caps = gst::Caps::builder("application/x-json").build(); + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(src_pad_template); + + let caps = gst::Caps::new_any(); + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(sink_pad_template); + } +} + +impl ObjectImpl for JsonGstParse { + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.add_pad(&self.sinkpad).unwrap(); + obj.add_pad(&self.srcpad).unwrap(); + } +} + +impl ElementImpl for JsonGstParse { + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result { + gst_trace!(CAT, obj: element, "Changing state {:?}", transition); + + match transition { + gst::StateChange::ReadyToPaused | gst::StateChange::PausedToReady => { + // Reset the whole state + let mut state = self.state.lock().unwrap(); + *state = State::default(); + } + _ => (), + } + + self.parent_change_state(element, transition) + } +} diff --git a/text/json/src/jsongstparse/mod.rs b/text/json/src/jsongstparse/mod.rs new file mode 100644 index 00000000..cc7eeda3 --- /dev/null +++ b/text/json/src/jsongstparse/mod.rs @@ -0,0 +1,36 @@ +// Copyright (C) 2020 Mathieu Duponchelle +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct JsonGstParse(ObjectSubclass) @extends gst::Element, gst::Object; +} + +unsafe impl Send for JsonGstParse {} +unsafe impl Sync for JsonGstParse {} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "jsongstparse", + gst::Rank::Primary, + JsonGstParse::static_type(), + ) +} diff --git a/text/json/src/lib.rs b/text/json/src/lib.rs new file mode 100644 index 00000000..4d910698 --- /dev/null +++ b/text/json/src/lib.rs @@ -0,0 +1,40 @@ +// Copyright (C) 2020 Mathieu Duponchelle +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +#![recursion_limit = "128"] + +mod jsongstenc; +mod jsongstparse; +mod line_reader; + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + jsongstparse::register(plugin)?; + jsongstenc::register(plugin)?; + Ok(()) +} + +gst::plugin_define!( + rsjson, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + "LGPL", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); diff --git a/text/json/src/line_reader.rs b/text/json/src/line_reader.rs new file mode 120000 index 00000000..95af5864 --- /dev/null +++ b/text/json/src/line_reader.rs @@ -0,0 +1 @@ +../../../video/closedcaption/src/line_reader.rs \ No newline at end of file diff --git a/text/json/tests/json.rs b/text/json/tests/json.rs new file mode 100644 index 00000000..ad7d7410 --- /dev/null +++ b/text/json/tests/json.rs @@ -0,0 +1,104 @@ +// Copyright (C) 2020 Mathieu Duponchelle +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use gst::EventView; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gstrsjson::plugin_register_static().expect("json test"); + }); +} + +#[test] +fn test_enc() { + init(); + + let input = "{\"foo\":42}"; + + let mut h = gst_check::Harness::new("jsongstenc"); + + h.set_src_caps_str("application/x-json, format=test"); + + let buf = { + let mut buf = gst::Buffer::from_mut_slice(Vec::from(&input[..])); + let buf_ref = buf.get_mut().unwrap(); + buf_ref.set_pts(gst::ClockTime::from_seconds(0)); + buf_ref.set_duration(gst::ClockTime::from_seconds(2)); + buf + }; + + assert_eq!(h.push(buf), Ok(gst::FlowSuccess::Ok)); + + let buf = h.pull().expect("Couldn't pull buffer"); + let map = buf.map_readable().expect("Couldn't map buffer readable"); + assert_eq!( + std::str::from_utf8(map.as_ref()), + Ok("{\"Header\":{\"format\":\"test\"}}\n"), + ); + + let buf = h.pull().expect("Couldn't pull buffer"); + assert_eq!(buf.get_pts(), 0.into()); + assert_eq!(buf.get_duration(), 2 * gst::SECOND); + let map = buf.map_readable().expect("Couldn't map buffer readable"); + assert_eq!( + std::str::from_utf8(map.as_ref()), + Ok("{\"Buffer\":{\"pts\":0,\"duration\":2000000000,\"data\":{\"foo\":42}}}\n") + ); +} + +#[test] +fn test_parse() { + init(); + + let input = [ + "{\"Header\":{\"format\":\"test\"}}\n", + "{\"Buffer\":{\"pts\":0,\"duration\":2000000000,\"data\":{\"foo\":42}}}\n", + ]; + + let mut h = gst_check::Harness::new("jsongstparse"); + + h.set_src_caps_str("text/x-raw"); + + for input in &input { + let buf = gst::Buffer::from_mut_slice(Vec::from(&input[..])); + assert_eq!(h.push(buf), Ok(gst::FlowSuccess::Ok)); + } + + while h.events_in_queue() > 0 { + let ev = h.pull_event().unwrap(); + + match ev.view() { + EventView::Caps(ev) => { + assert!(ev.get_caps().is_strictly_equal(&gst::Caps::new_simple( + &"application/x-json", + &[(&"format", &"test")] + ))); + } + _ => (), + } + } + + let buf = h.pull().expect("Couldn't pull buffer"); + let map = buf.map_readable().expect("Couldn't map buffer readable"); + assert_eq!(buf.get_pts(), 0.into()); + assert_eq!(buf.get_duration(), 2 * gst::SECOND); + assert_eq!(std::str::from_utf8(map.as_ref()), Ok("{\"foo\":42}")); +}