mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-22 11:30:59 +00:00
Remove &
for obj
in log macros
This is no longer necessary. See https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1137
This commit is contained in:
parent
fe8e0a8bf8
commit
86776be58c
26 changed files with 294 additions and 298 deletions
|
@ -130,7 +130,7 @@ impl ObjectImpl for EbuR128Level {
|
|||
let this = args[0].get::<super::EbuR128Level>().unwrap();
|
||||
let imp = this.imp();
|
||||
|
||||
gst::info!(CAT, obj: &this, "Resetting measurements",);
|
||||
gst::info!(CAT, obj: this, "Resetting measurements",);
|
||||
imp.reset.store(true, atomic::Ordering::SeqCst);
|
||||
|
||||
None
|
||||
|
|
|
@ -464,13 +464,13 @@ impl Decrypter {
|
|||
self.sinkpad.pull_range(pull_offset, total_size).map_err(|err| {
|
||||
match err {
|
||||
gst::FlowError::Flushing => {
|
||||
gst::debug!(CAT, obj: &self.sinkpad, "Pausing after pulling buffer, reason: flushing");
|
||||
gst::debug!(CAT, obj: self.sinkpad, "Pausing after pulling buffer, reason: flushing");
|
||||
}
|
||||
gst::FlowError::Eos => {
|
||||
gst::debug!(CAT, obj: &self.sinkpad, "Eos");
|
||||
gst::debug!(CAT, obj: self.sinkpad, "Eos");
|
||||
}
|
||||
flow => {
|
||||
gst::error!(CAT, obj: &self.sinkpad, "Failed to pull, reason: {:?}", flow);
|
||||
gst::error!(CAT, obj: self.sinkpad, "Failed to pull, reason: {:?}", flow);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -84,7 +84,7 @@ impl PadSinkHandler for TestSinkPadHandler {
|
|||
let sender = elem.imp().clone_item_sender();
|
||||
async move {
|
||||
if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
|
||||
gst::debug!(CAT, obj: &elem, "Flushing");
|
||||
gst::debug!(CAT, obj: elem, "Flushing");
|
||||
return Err(gst::FlowError::Flushing);
|
||||
}
|
||||
|
||||
|
@ -103,7 +103,7 @@ impl PadSinkHandler for TestSinkPadHandler {
|
|||
async move {
|
||||
for buffer in list.iter_owned() {
|
||||
if sender.send_async(StreamItem::Buffer(buffer)).await.is_err() {
|
||||
gst::debug!(CAT, obj: &elem, "Flushing");
|
||||
gst::debug!(CAT, obj: elem, "Flushing");
|
||||
return Err(gst::FlowError::Flushing);
|
||||
}
|
||||
}
|
||||
|
@ -125,7 +125,7 @@ impl PadSinkHandler for TestSinkPadHandler {
|
|||
let imp = elem.imp();
|
||||
return imp.task.flush_stop().await_maybe_on_context().is_ok();
|
||||
} else if sender.send_async(StreamItem::Event(event)).await.is_err() {
|
||||
gst::debug!(CAT, obj: &elem, "Flushing");
|
||||
gst::debug!(CAT, obj: elem, "Flushing");
|
||||
}
|
||||
|
||||
true
|
||||
|
@ -441,9 +441,9 @@ impl TaskImpl for TestSinkTask {
|
|||
self.raise_log_level = settings.raise_log_level;
|
||||
|
||||
if self.raise_log_level {
|
||||
gst::log!(CAT, obj: &self.element, "Preparing Task");
|
||||
gst::log!(CAT, obj: self.element, "Preparing Task");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Preparing Task");
|
||||
gst::trace!(CAT, obj: self.element, "Preparing Task");
|
||||
}
|
||||
|
||||
self.stats.must_log = settings.logs_stats;
|
||||
|
@ -458,9 +458,9 @@ impl TaskImpl for TestSinkTask {
|
|||
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async {
|
||||
if self.raise_log_level {
|
||||
gst::log!(CAT, obj: &self.element, "Starting Task");
|
||||
gst::log!(CAT, obj: self.element, "Starting Task");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Starting Task");
|
||||
gst::trace!(CAT, obj: self.element, "Starting Task");
|
||||
}
|
||||
|
||||
self.last_dts = None;
|
||||
|
@ -473,9 +473,9 @@ impl TaskImpl for TestSinkTask {
|
|||
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async {
|
||||
if self.raise_log_level {
|
||||
gst::log!(CAT, obj: &self.element, "Stopping Task");
|
||||
gst::log!(CAT, obj: self.element, "Stopping Task");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Stopping Task");
|
||||
gst::trace!(CAT, obj: self.element, "Stopping Task");
|
||||
}
|
||||
|
||||
self.flush().await;
|
||||
|
@ -489,9 +489,9 @@ impl TaskImpl for TestSinkTask {
|
|||
let item = self.item_receiver.next().await.unwrap();
|
||||
|
||||
if self.raise_log_level {
|
||||
gst::log!(CAT, obj: &self.element, "Popped item");
|
||||
gst::log!(CAT, obj: self.element, "Popped item");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Popped item");
|
||||
gst::trace!(CAT, obj: self.element, "Popped item");
|
||||
}
|
||||
|
||||
Ok(item)
|
||||
|
@ -502,9 +502,9 @@ impl TaskImpl for TestSinkTask {
|
|||
fn handle_item(&mut self, item: StreamItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
|
||||
async move {
|
||||
if self.raise_log_level {
|
||||
gst::debug!(CAT, obj: &self.element, "Received {:?}", item);
|
||||
gst::debug!(CAT, obj: self.element, "Received {:?}", item);
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Received {:?}", item);
|
||||
gst::trace!(CAT, obj: self.element, "Received {:?}", item);
|
||||
}
|
||||
|
||||
match item {
|
||||
|
@ -527,28 +527,28 @@ impl TaskImpl for TestSinkTask {
|
|||
self.stats.add_buffer(latency, interval);
|
||||
|
||||
if self.raise_log_level {
|
||||
gst::debug!(CAT, obj: &self.element, "o latency {:.2?}", latency);
|
||||
gst::debug!(CAT, obj: &self.element, "o interval {:.2?}", interval);
|
||||
gst::debug!(CAT, obj: self.element, "o latency {:.2?}", latency);
|
||||
gst::debug!(CAT, obj: self.element, "o interval {:.2?}", interval);
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "o latency {:.2?}", latency);
|
||||
gst::trace!(CAT, obj: &self.element, "o interval {:.2?}", interval);
|
||||
gst::trace!(CAT, obj: self.element, "o latency {:.2?}", latency);
|
||||
gst::trace!(CAT, obj: self.element, "o interval {:.2?}", interval);
|
||||
}
|
||||
}
|
||||
|
||||
self.last_dts = Some(dts);
|
||||
|
||||
if self.raise_log_level {
|
||||
gst::log!(CAT, obj: &self.element, "Buffer processed");
|
||||
gst::log!(CAT, obj: self.element, "Buffer processed");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Buffer processed");
|
||||
gst::trace!(CAT, obj: self.element, "Buffer processed");
|
||||
}
|
||||
}
|
||||
StreamItem::Event(event) => match event.view() {
|
||||
EventView::Eos(_) => {
|
||||
if self.raise_log_level {
|
||||
gst::debug!(CAT, obj: &self.element, "EOS");
|
||||
gst::debug!(CAT, obj: self.element, "EOS");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "EOS");
|
||||
gst::trace!(CAT, obj: self.element, "EOS");
|
||||
}
|
||||
|
||||
let elem = self.element.clone();
|
||||
|
|
|
@ -107,9 +107,9 @@ impl TaskImpl for SrcTask {
|
|||
self.raise_log_level = settings.raise_log_level;
|
||||
|
||||
if self.raise_log_level {
|
||||
gst::log!(CAT, obj: &self.element, "Preparing Task");
|
||||
gst::log!(CAT, obj: self.element, "Preparing Task");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Preparing Task");
|
||||
gst::trace!(CAT, obj: self.element, "Preparing Task");
|
||||
}
|
||||
|
||||
self.push_period = settings.push_period;
|
||||
|
@ -123,9 +123,9 @@ impl TaskImpl for SrcTask {
|
|||
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async {
|
||||
if self.raise_log_level {
|
||||
gst::log!(CAT, obj: &self.element, "Starting Task");
|
||||
gst::log!(CAT, obj: self.element, "Starting Task");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Starting Task");
|
||||
gst::trace!(CAT, obj: self.element, "Starting Task");
|
||||
}
|
||||
|
||||
self.timer = Some(
|
||||
|
@ -146,9 +146,9 @@ impl TaskImpl for SrcTask {
|
|||
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
if self.raise_log_level {
|
||||
gst::log!(CAT, obj: &self.element, "Stopping Task");
|
||||
gst::log!(CAT, obj: self.element, "Stopping Task");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Stopping Task");
|
||||
gst::trace!(CAT, obj: self.element, "Stopping Task");
|
||||
}
|
||||
|
||||
self.buffer_pool.set_active(false).unwrap();
|
||||
|
@ -164,17 +164,17 @@ impl TaskImpl for SrcTask {
|
|||
fn try_next(&mut self) -> BoxFuture<'_, Result<gst::Buffer, gst::FlowError>> {
|
||||
async move {
|
||||
if self.raise_log_level {
|
||||
gst::log!(CAT, obj: &self.element, "Awaiting timer");
|
||||
gst::log!(CAT, obj: self.element, "Awaiting timer");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Awaiting timer");
|
||||
gst::trace!(CAT, obj: self.element, "Awaiting timer");
|
||||
}
|
||||
|
||||
self.timer.as_mut().unwrap().next().await;
|
||||
|
||||
if self.raise_log_level {
|
||||
gst::log!(CAT, obj: &self.element, "Timer ticked");
|
||||
gst::log!(CAT, obj: self.element, "Timer ticked");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Timer ticked");
|
||||
gst::trace!(CAT, obj: self.element, "Timer ticked");
|
||||
}
|
||||
|
||||
self.buffer_pool
|
||||
|
@ -188,7 +188,7 @@ impl TaskImpl for SrcTask {
|
|||
buffer
|
||||
})
|
||||
.map_err(|err| {
|
||||
gst::error!(CAT, obj: &self.element, "Failed to acquire buffer {}", err);
|
||||
gst::error!(CAT, obj: self.element, "Failed to acquire buffer {}", err);
|
||||
err
|
||||
})
|
||||
}
|
||||
|
@ -201,16 +201,16 @@ impl TaskImpl for SrcTask {
|
|||
match res {
|
||||
Ok(_) => {
|
||||
if self.raise_log_level {
|
||||
gst::log!(CAT, obj: &self.element, "Successfully pushed buffer");
|
||||
gst::log!(CAT, obj: self.element, "Successfully pushed buffer");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Successfully pushed buffer");
|
||||
gst::trace!(CAT, obj: self.element, "Successfully pushed buffer");
|
||||
}
|
||||
}
|
||||
Err(gst::FlowError::Eos) => {
|
||||
if self.raise_log_level {
|
||||
gst::debug!(CAT, obj: &self.element, "EOS");
|
||||
gst::debug!(CAT, obj: self.element, "EOS");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "EOS");
|
||||
gst::trace!(CAT, obj: self.element, "EOS");
|
||||
}
|
||||
let test_src = self.element.imp();
|
||||
test_src.src_pad.push_event(gst::event::Eos::new()).await;
|
||||
|
@ -219,13 +219,13 @@ impl TaskImpl for SrcTask {
|
|||
}
|
||||
Err(gst::FlowError::Flushing) => {
|
||||
if self.raise_log_level {
|
||||
gst::debug!(CAT, obj: &self.element, "Flushing");
|
||||
gst::debug!(CAT, obj: self.element, "Flushing");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Flushing");
|
||||
gst::trace!(CAT, obj: self.element, "Flushing");
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
gst::error!(CAT, obj: &self.element, "Got error {}", err);
|
||||
gst::error!(CAT, obj: self.element, "Got error {}", err);
|
||||
gst::element_error!(
|
||||
&self.element,
|
||||
gst::StreamError::Failed,
|
||||
|
@ -244,18 +244,18 @@ impl TaskImpl for SrcTask {
|
|||
impl SrcTask {
|
||||
async fn push(&mut self, buffer: gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
|
||||
if self.raise_log_level {
|
||||
gst::debug!(CAT, obj: &self.element, "Pushing {:?}", buffer);
|
||||
gst::debug!(CAT, obj: self.element, "Pushing {:?}", buffer);
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Pushing {:?}", buffer);
|
||||
gst::trace!(CAT, obj: self.element, "Pushing {:?}", buffer);
|
||||
}
|
||||
|
||||
let test_src = self.element.imp();
|
||||
|
||||
if self.need_initial_events {
|
||||
if self.raise_log_level {
|
||||
gst::debug!(CAT, obj: &self.element, "Pushing initial events");
|
||||
gst::debug!(CAT, obj: self.element, "Pushing initial events");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Pushing initial events");
|
||||
gst::trace!(CAT, obj: self.element, "Pushing initial events");
|
||||
}
|
||||
|
||||
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
|
||||
|
@ -283,9 +283,9 @@ impl SrcTask {
|
|||
}
|
||||
|
||||
if self.raise_log_level {
|
||||
gst::debug!(CAT, obj: &self.element, "Forwarding buffer");
|
||||
gst::debug!(CAT, obj: self.element, "Forwarding buffer");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Forwarding buffer");
|
||||
gst::trace!(CAT, obj: self.element, "Forwarding buffer");
|
||||
}
|
||||
|
||||
let ok = test_src.src_pad.push(buffer).await?;
|
||||
|
@ -294,14 +294,14 @@ impl SrcTask {
|
|||
|
||||
if self.num_buffers.opt_eq(self.buffer_count).unwrap_or(false) {
|
||||
if self.raise_log_level {
|
||||
gst::debug!(CAT, obj: &self.element, "Pushing EOS");
|
||||
gst::debug!(CAT, obj: self.element, "Pushing EOS");
|
||||
} else {
|
||||
gst::trace!(CAT, obj: &self.element, "Pushing EOS");
|
||||
gst::trace!(CAT, obj: self.element, "Pushing EOS");
|
||||
}
|
||||
|
||||
let test_src = self.element.imp();
|
||||
if !test_src.src_pad.push_event(gst::event::Eos::new()).await {
|
||||
gst::error!(CAT, obj: &self.element, "Error pushing EOS");
|
||||
gst::error!(CAT, obj: self.element, "Error pushing EOS");
|
||||
}
|
||||
return Err(gst::FlowError::Eos);
|
||||
}
|
||||
|
|
|
@ -170,11 +170,11 @@ impl AppSrcTask {
|
|||
}
|
||||
|
||||
async fn push_item(&mut self, item: StreamItem) -> Result<gst::FlowSuccess, gst::FlowError> {
|
||||
gst::log!(CAT, obj: &self.element, "Handling {:?}", item);
|
||||
gst::log!(CAT, obj: self.element, "Handling {:?}", item);
|
||||
let appsrc = self.element.imp();
|
||||
|
||||
if self.need_initial_events {
|
||||
gst::debug!(CAT, obj: &self.element, "Pushing initial events");
|
||||
gst::debug!(CAT, obj: self.element, "Pushing initial events");
|
||||
|
||||
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
|
||||
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
|
||||
|
@ -204,7 +204,7 @@ impl AppSrcTask {
|
|||
|
||||
match item {
|
||||
StreamItem::Buffer(buffer) => {
|
||||
gst::log!(CAT, obj: &self.element, "Forwarding {:?}", buffer);
|
||||
gst::log!(CAT, obj: self.element, "Forwarding {:?}", buffer);
|
||||
appsrc.src_pad.push(buffer).await
|
||||
}
|
||||
StreamItem::Event(event) => {
|
||||
|
@ -214,7 +214,7 @@ impl AppSrcTask {
|
|||
Err(gst::FlowError::Eos)
|
||||
}
|
||||
_ => {
|
||||
gst::log!(CAT, obj: &self.element, "Forwarding {:?}", event);
|
||||
gst::log!(CAT, obj: self.element, "Forwarding {:?}", event);
|
||||
appsrc.src_pad.push_event(event).await;
|
||||
Ok(gst::FlowSuccess::Ok)
|
||||
}
|
||||
|
@ -242,18 +242,18 @@ impl TaskImpl for AppSrcTask {
|
|||
let res = self.push_item(item).await;
|
||||
match res {
|
||||
Ok(_) => {
|
||||
gst::log!(CAT, obj: &self.element, "Successfully pushed item");
|
||||
gst::log!(CAT, obj: self.element, "Successfully pushed item");
|
||||
}
|
||||
Err(gst::FlowError::Eos) => {
|
||||
gst::debug!(CAT, obj: &self.element, "EOS");
|
||||
gst::debug!(CAT, obj: self.element, "EOS");
|
||||
let appsrc = self.element.imp();
|
||||
appsrc.src_pad.push_event(gst::event::Eos::new()).await;
|
||||
}
|
||||
Err(gst::FlowError::Flushing) => {
|
||||
gst::debug!(CAT, obj: &self.element, "Flushing");
|
||||
gst::debug!(CAT, obj: self.element, "Flushing");
|
||||
}
|
||||
Err(err) => {
|
||||
gst::error!(CAT, obj: &self.element, "Got error {}", err);
|
||||
gst::error!(CAT, obj: self.element, "Got error {}", err);
|
||||
gst::element_error!(
|
||||
&self.element,
|
||||
gst::StreamError::Failed,
|
||||
|
@ -270,13 +270,13 @@ impl TaskImpl for AppSrcTask {
|
|||
|
||||
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(CAT, obj: &self.element, "Stopping task");
|
||||
gst::log!(CAT, obj: self.element, "Stopping task");
|
||||
|
||||
self.flush();
|
||||
self.need_initial_events = true;
|
||||
self.need_segment = true;
|
||||
|
||||
gst::log!(CAT, obj: &self.element, "Task stopped");
|
||||
gst::log!(CAT, obj: self.element, "Task stopped");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
@ -284,12 +284,12 @@ impl TaskImpl for AppSrcTask {
|
|||
|
||||
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(CAT, obj: &self.element, "Starting task flush");
|
||||
gst::log!(CAT, obj: self.element, "Starting task flush");
|
||||
|
||||
self.flush();
|
||||
self.need_segment = true;
|
||||
|
||||
gst::log!(CAT, obj: &self.element, "Task flush started");
|
||||
gst::log!(CAT, obj: self.element, "Task flush started");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
|
|
@ -127,10 +127,10 @@ impl DataQueue {
|
|||
pub fn start(&self) {
|
||||
let mut inner = self.0.lock().unwrap();
|
||||
if inner.state == DataQueueState::Started {
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue already Started");
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: inner.element, "Data queue already Started");
|
||||
return;
|
||||
}
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: &inner.element, "Starting data queue");
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: inner.element, "Starting data queue");
|
||||
inner.state = DataQueueState::Started;
|
||||
inner.wake();
|
||||
}
|
||||
|
@ -138,10 +138,10 @@ impl DataQueue {
|
|||
pub fn stop(&self) {
|
||||
let mut inner = self.0.lock().unwrap();
|
||||
if inner.state == DataQueueState::Stopped {
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue already Stopped");
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: inner.element, "Data queue already Stopped");
|
||||
return;
|
||||
}
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: &inner.element, "Stopping data queue");
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: inner.element, "Stopping data queue");
|
||||
inner.state = DataQueueState::Stopped;
|
||||
inner.wake();
|
||||
}
|
||||
|
@ -149,7 +149,7 @@ impl DataQueue {
|
|||
pub fn clear(&self) {
|
||||
let mut inner = self.0.lock().unwrap();
|
||||
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: &inner.element, "Clearing data queue");
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: inner.element, "Clearing data queue");
|
||||
|
||||
let src_pad = inner.src_pad.clone();
|
||||
for item in inner.queue.drain(..) {
|
||||
|
@ -163,7 +163,7 @@ impl DataQueue {
|
|||
}
|
||||
}
|
||||
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue cleared");
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: inner.element, "Data queue cleared");
|
||||
}
|
||||
|
||||
pub fn push(&self, item: DataQueueItem) -> Result<(), DataQueueItem> {
|
||||
|
@ -172,7 +172,7 @@ impl DataQueue {
|
|||
if inner.state == DataQueueState::Stopped {
|
||||
gst::debug!(
|
||||
DATA_QUEUE_CAT,
|
||||
obj: &inner.element,
|
||||
obj: inner.element,
|
||||
"Rejecting item {:?} in state {:?}",
|
||||
item,
|
||||
inner.state
|
||||
|
@ -180,7 +180,7 @@ impl DataQueue {
|
|||
return Err(item);
|
||||
}
|
||||
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: &inner.element, "Pushing item {:?}", item);
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: inner.element, "Pushing item {:?}", item);
|
||||
|
||||
let (count, bytes) = item.size();
|
||||
let queue_ts = inner.queue.iter().filter_map(|i| i.timestamp()).next();
|
||||
|
@ -188,14 +188,14 @@ impl DataQueue {
|
|||
|
||||
if let Some(max) = inner.max_size_buffers {
|
||||
if max <= inner.cur_size_buffers {
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: &inner.element, "Queue is full (buffers): {} <= {}", max, inner.cur_size_buffers);
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: inner.element, "Queue is full (buffers): {} <= {}", max, inner.cur_size_buffers);
|
||||
return Err(item);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(max) = inner.max_size_bytes {
|
||||
if max <= inner.cur_size_bytes {
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: &inner.element, "Queue is full (bytes): {} <= {}", max, inner.cur_size_bytes);
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: inner.element, "Queue is full (bytes): {} <= {}", max, inner.cur_size_bytes);
|
||||
return Err(item);
|
||||
}
|
||||
}
|
||||
|
@ -209,7 +209,7 @@ impl DataQueue {
|
|||
};
|
||||
|
||||
if max <= level {
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: &inner.element, "Queue is full (time): {} <= {}", max, level);
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: inner.element, "Queue is full (time): {} <= {}", max, level);
|
||||
return Err(item);
|
||||
}
|
||||
}
|
||||
|
@ -232,10 +232,10 @@ impl DataQueue {
|
|||
match inner.state {
|
||||
DataQueueState::Started => match inner.queue.pop_front() {
|
||||
None => {
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue is empty");
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: inner.element, "Data queue is empty");
|
||||
}
|
||||
Some(item) => {
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: &inner.element, "Popped item {:?}", item);
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: inner.element, "Popped item {:?}", item);
|
||||
|
||||
let (count, bytes) = item.size();
|
||||
inner.cur_size_buffers -= count;
|
||||
|
@ -245,7 +245,7 @@ impl DataQueue {
|
|||
}
|
||||
},
|
||||
DataQueueState::Stopped => {
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: &inner.element, "Data queue Stopped");
|
||||
gst::debug!(DATA_QUEUE_CAT, obj: inner.element, "Data queue Stopped");
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1033,7 +1033,7 @@ impl TaskImpl for JitterBufferTask {
|
|||
|
||||
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(CAT, obj: &self.element, "Starting task");
|
||||
gst::log!(CAT, obj: self.element, "Starting task");
|
||||
|
||||
self.src_pad_handler.clear();
|
||||
self.sink_pad_handler.clear();
|
||||
|
@ -1046,7 +1046,7 @@ impl TaskImpl for JitterBufferTask {
|
|||
state.jbuf.set_delay(latency);
|
||||
*jb.state.lock().unwrap() = state;
|
||||
|
||||
gst::log!(CAT, obj: &self.element, "Task started");
|
||||
gst::log!(CAT, obj: self.element, "Task started");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
@ -1106,9 +1106,9 @@ impl TaskImpl for JitterBufferTask {
|
|||
|
||||
// Got aborted, reschedule if needed
|
||||
if let Some(delay_fut) = delay_fut {
|
||||
gst::debug!(CAT, obj: &self.element, "Waiting");
|
||||
gst::debug!(CAT, obj: self.element, "Waiting");
|
||||
if let Err(Aborted) = delay_fut.await {
|
||||
gst::debug!(CAT, obj: &self.element, "Waiting aborted");
|
||||
gst::debug!(CAT, obj: self.element, "Waiting aborted");
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
@ -1126,7 +1126,7 @@ impl TaskImpl for JitterBufferTask {
|
|||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &self.element,
|
||||
obj: self.element,
|
||||
"Woke up at {}, earliest_pts {}",
|
||||
now.display(),
|
||||
state.earliest_pts.display()
|
||||
|
@ -1182,13 +1182,13 @@ impl TaskImpl for JitterBufferTask {
|
|||
if let Err(err) = res {
|
||||
match err {
|
||||
gst::FlowError::Eos => {
|
||||
gst::debug!(CAT, obj: &self.element, "Pushing EOS event");
|
||||
gst::debug!(CAT, obj: self.element, "Pushing EOS event");
|
||||
let _ = jb.src_pad.push_event(gst::event::Eos::new()).await;
|
||||
}
|
||||
gst::FlowError::Flushing => {
|
||||
gst::debug!(CAT, obj: &self.element, "Flushing")
|
||||
gst::debug!(CAT, obj: self.element, "Flushing")
|
||||
}
|
||||
err => gst::error!(CAT, obj: &self.element, "Error {}", err),
|
||||
err => gst::error!(CAT, obj: self.element, "Error {}", err),
|
||||
}
|
||||
|
||||
return Err(err);
|
||||
|
@ -1204,7 +1204,7 @@ impl TaskImpl for JitterBufferTask {
|
|||
|
||||
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(CAT, obj: &self.element, "Stopping task");
|
||||
gst::log!(CAT, obj: self.element, "Stopping task");
|
||||
|
||||
let jb = self.element.imp();
|
||||
let mut jb_state = jb.state.lock().unwrap();
|
||||
|
@ -1218,7 +1218,7 @@ impl TaskImpl for JitterBufferTask {
|
|||
|
||||
*jb_state = State::default();
|
||||
|
||||
gst::log!(CAT, obj: &self.element, "Task stopped");
|
||||
gst::log!(CAT, obj: self.element, "Task stopped");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
|
|
@ -784,15 +784,15 @@ impl ProxySrcTask {
|
|||
|
||||
match item {
|
||||
DataQueueItem::Buffer(buffer) => {
|
||||
gst::log!(SRC_CAT, obj: &self.element, "Forwarding {:?}", buffer);
|
||||
gst::log!(SRC_CAT, obj: self.element, "Forwarding {:?}", buffer);
|
||||
proxysrc.src_pad.push(buffer).await.map(drop)
|
||||
}
|
||||
DataQueueItem::BufferList(list) => {
|
||||
gst::log!(SRC_CAT, obj: &self.element, "Forwarding {:?}", list);
|
||||
gst::log!(SRC_CAT, obj: self.element, "Forwarding {:?}", list);
|
||||
proxysrc.src_pad.push_list(list).await.map(drop)
|
||||
}
|
||||
DataQueueItem::Event(event) => {
|
||||
gst::log!(SRC_CAT, obj: &self.element, "Forwarding {:?}", event);
|
||||
gst::log!(SRC_CAT, obj: self.element, "Forwarding {:?}", event);
|
||||
proxysrc.src_pad.push_event(event).await;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -805,7 +805,7 @@ impl TaskImpl for ProxySrcTask {
|
|||
|
||||
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(SRC_CAT, obj: &self.element, "Starting task");
|
||||
gst::log!(SRC_CAT, obj: self.element, "Starting task");
|
||||
|
||||
let proxysrc = self.element.imp();
|
||||
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
|
||||
|
@ -819,7 +819,7 @@ impl TaskImpl for ProxySrcTask {
|
|||
|
||||
self.dataqueue.start();
|
||||
|
||||
gst::log!(SRC_CAT, obj: &self.element, "Task started");
|
||||
gst::log!(SRC_CAT, obj: self.element, "Task started");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
@ -841,25 +841,25 @@ impl TaskImpl for ProxySrcTask {
|
|||
let proxysrc = self.element.imp();
|
||||
match res {
|
||||
Ok(()) => {
|
||||
gst::log!(SRC_CAT, obj: &self.element, "Successfully pushed item");
|
||||
gst::log!(SRC_CAT, obj: self.element, "Successfully pushed item");
|
||||
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
|
||||
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
|
||||
shared_ctx.last_res = Ok(gst::FlowSuccess::Ok);
|
||||
}
|
||||
Err(gst::FlowError::Flushing) => {
|
||||
gst::debug!(SRC_CAT, obj: &self.element, "Flushing");
|
||||
gst::debug!(SRC_CAT, obj: self.element, "Flushing");
|
||||
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
|
||||
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
|
||||
shared_ctx.last_res = Err(gst::FlowError::Flushing);
|
||||
}
|
||||
Err(gst::FlowError::Eos) => {
|
||||
gst::debug!(SRC_CAT, obj: &self.element, "EOS");
|
||||
gst::debug!(SRC_CAT, obj: self.element, "EOS");
|
||||
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
|
||||
let mut shared_ctx = proxy_ctx.as_ref().unwrap().lock_shared();
|
||||
shared_ctx.last_res = Err(gst::FlowError::Eos);
|
||||
}
|
||||
Err(err) => {
|
||||
gst::error!(SRC_CAT, obj: &self.element, "Got error {}", err);
|
||||
gst::error!(SRC_CAT, obj: self.element, "Got error {}", err);
|
||||
gst::element_error!(
|
||||
&self.element,
|
||||
gst::StreamError::Failed,
|
||||
|
@ -879,7 +879,7 @@ impl TaskImpl for ProxySrcTask {
|
|||
|
||||
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(SRC_CAT, obj: &self.element, "Stopping task");
|
||||
gst::log!(SRC_CAT, obj: self.element, "Stopping task");
|
||||
|
||||
let proxysrc = self.element.imp();
|
||||
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
|
||||
|
@ -894,7 +894,7 @@ impl TaskImpl for ProxySrcTask {
|
|||
pending_queue.notify_more_queue_space();
|
||||
}
|
||||
|
||||
gst::log!(SRC_CAT, obj: &self.element, "Task stopped");
|
||||
gst::log!(SRC_CAT, obj: self.element, "Task stopped");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
@ -902,7 +902,7 @@ impl TaskImpl for ProxySrcTask {
|
|||
|
||||
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(SRC_CAT, obj: &self.element, "Starting task flush");
|
||||
gst::log!(SRC_CAT, obj: self.element, "Starting task flush");
|
||||
|
||||
let proxysrc = self.element.imp();
|
||||
let proxy_ctx = proxysrc.proxy_ctx.lock().unwrap();
|
||||
|
@ -912,7 +912,7 @@ impl TaskImpl for ProxySrcTask {
|
|||
|
||||
shared_ctx.last_res = Err(gst::FlowError::Flushing);
|
||||
|
||||
gst::log!(SRC_CAT, obj: &self.element, "Task flush started");
|
||||
gst::log!(SRC_CAT, obj: self.element, "Task flush started");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
|
|
@ -264,15 +264,15 @@ impl QueueTask {
|
|||
|
||||
match item {
|
||||
DataQueueItem::Buffer(buffer) => {
|
||||
gst::log!(CAT, obj: &self.element, "Forwarding {:?}", buffer);
|
||||
gst::log!(CAT, obj: self.element, "Forwarding {:?}", buffer);
|
||||
queue.src_pad.push(buffer).await.map(drop)
|
||||
}
|
||||
DataQueueItem::BufferList(list) => {
|
||||
gst::log!(CAT, obj: &self.element, "Forwarding {:?}", list);
|
||||
gst::log!(CAT, obj: self.element, "Forwarding {:?}", list);
|
||||
queue.src_pad.push_list(list).await.map(drop)
|
||||
}
|
||||
DataQueueItem::Event(event) => {
|
||||
gst::log!(CAT, obj: &self.element, "Forwarding {:?}", event);
|
||||
gst::log!(CAT, obj: self.element, "Forwarding {:?}", event);
|
||||
queue.src_pad.push_event(event).await;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -285,7 +285,7 @@ impl TaskImpl for QueueTask {
|
|||
|
||||
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(CAT, obj: &self.element, "Starting task");
|
||||
gst::log!(CAT, obj: self.element, "Starting task");
|
||||
|
||||
let queue = self.element.imp();
|
||||
let mut last_res = queue.last_res.lock().unwrap();
|
||||
|
@ -294,7 +294,7 @@ impl TaskImpl for QueueTask {
|
|||
|
||||
*last_res = Ok(gst::FlowSuccess::Ok);
|
||||
|
||||
gst::log!(CAT, obj: &self.element, "Task started");
|
||||
gst::log!(CAT, obj: self.element, "Task started");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
@ -316,20 +316,20 @@ impl TaskImpl for QueueTask {
|
|||
let queue = self.element.imp();
|
||||
match res {
|
||||
Ok(()) => {
|
||||
gst::log!(CAT, obj: &self.element, "Successfully pushed item");
|
||||
gst::log!(CAT, obj: self.element, "Successfully pushed item");
|
||||
*queue.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok);
|
||||
}
|
||||
Err(gst::FlowError::Flushing) => {
|
||||
gst::debug!(CAT, obj: &self.element, "Flushing");
|
||||
gst::debug!(CAT, obj: self.element, "Flushing");
|
||||
*queue.last_res.lock().unwrap() = Err(gst::FlowError::Flushing);
|
||||
}
|
||||
Err(gst::FlowError::Eos) => {
|
||||
gst::debug!(CAT, obj: &self.element, "EOS");
|
||||
gst::debug!(CAT, obj: self.element, "EOS");
|
||||
*queue.last_res.lock().unwrap() = Err(gst::FlowError::Eos);
|
||||
queue.src_pad.push_event(gst::event::Eos::new()).await;
|
||||
}
|
||||
Err(err) => {
|
||||
gst::error!(CAT, obj: &self.element, "Got error {}", err);
|
||||
gst::error!(CAT, obj: self.element, "Got error {}", err);
|
||||
gst::element_error!(
|
||||
&self.element,
|
||||
gst::StreamError::Failed,
|
||||
|
@ -347,7 +347,7 @@ impl TaskImpl for QueueTask {
|
|||
|
||||
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(CAT, obj: &self.element, "Stopping task");
|
||||
gst::log!(CAT, obj: self.element, "Stopping task");
|
||||
|
||||
let queue = self.element.imp();
|
||||
let mut last_res = queue.last_res.lock().unwrap();
|
||||
|
@ -361,7 +361,7 @@ impl TaskImpl for QueueTask {
|
|||
|
||||
*last_res = Err(gst::FlowError::Flushing);
|
||||
|
||||
gst::log!(CAT, obj: &self.element, "Task stopped");
|
||||
gst::log!(CAT, obj: self.element, "Task stopped");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
@ -369,7 +369,7 @@ impl TaskImpl for QueueTask {
|
|||
|
||||
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(CAT, obj: &self.element, "Starting task flush");
|
||||
gst::log!(CAT, obj: self.element, "Starting task flush");
|
||||
|
||||
let queue = self.element.imp();
|
||||
let mut last_res = queue.last_res.lock().unwrap();
|
||||
|
@ -382,7 +382,7 @@ impl TaskImpl for QueueTask {
|
|||
|
||||
*last_res = Err(gst::FlowError::Flushing);
|
||||
|
||||
gst::log!(CAT, obj: &self.element, "Task flush started");
|
||||
gst::log!(CAT, obj: self.element, "Task flush started");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
|
|
@ -241,7 +241,7 @@ impl PadSrcInner {
|
|||
err
|
||||
})?;
|
||||
|
||||
gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
|
||||
gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Processing any pending sub tasks");
|
||||
Context::drain_sub_tasks().await?;
|
||||
|
||||
Ok(success)
|
||||
|
@ -260,18 +260,18 @@ impl PadSrcInner {
|
|||
err
|
||||
})?;
|
||||
|
||||
gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
|
||||
gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Processing any pending sub tasks");
|
||||
Context::drain_sub_tasks().await?;
|
||||
|
||||
Ok(success)
|
||||
}
|
||||
|
||||
pub async fn push_event(&self, event: gst::Event) -> bool {
|
||||
gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Pushing {:?}", event);
|
||||
gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Pushing {:?}", event);
|
||||
|
||||
let was_handled = self.gst_pad().push_event(event);
|
||||
|
||||
gst::log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
|
||||
gst::log!(RUNTIME_CAT, obj: self.gst_pad, "Processing any pending sub tasks");
|
||||
if Context::drain_sub_tasks().await.is_err() {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -76,7 +76,7 @@ impl<T: SocketRead> Socket<T> {
|
|||
buffer_pool.set_active(true).map_err(|err| {
|
||||
gst::error!(
|
||||
SOCKET_CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"Failed to prepare socket: {}",
|
||||
err
|
||||
);
|
||||
|
@ -124,7 +124,7 @@ impl<T: SocketRead> Socket<T> {
|
|||
pub async fn try_next(
|
||||
&mut self,
|
||||
) -> Result<(gst::Buffer, Option<std::net::SocketAddr>), SocketError> {
|
||||
gst::log!(SOCKET_CAT, obj: &self.element, "Trying to read data");
|
||||
gst::log!(SOCKET_CAT, obj: self.element, "Trying to read data");
|
||||
|
||||
if self.mapped_buffer.is_none() {
|
||||
match self.buffer_pool.acquire_buffer(None) {
|
||||
|
@ -132,7 +132,7 @@ impl<T: SocketRead> Socket<T> {
|
|||
self.mapped_buffer = Some(buffer.into_mapped_buffer_writable().unwrap());
|
||||
}
|
||||
Err(err) => {
|
||||
gst::debug!(SOCKET_CAT, obj: &self.element, "Failed to acquire buffer {:?}", err);
|
||||
gst::debug!(SOCKET_CAT, obj: self.element, "Failed to acquire buffer {:?}", err);
|
||||
return Err(SocketError::Gst(err));
|
||||
}
|
||||
}
|
||||
|
@ -151,7 +151,7 @@ impl<T: SocketRead> Socket<T> {
|
|||
// so as to display another message
|
||||
gst::debug!(
|
||||
SOCKET_CAT,
|
||||
obj: &self.element,
|
||||
obj: self.element,
|
||||
"Read {} bytes at {} (clock {})",
|
||||
len,
|
||||
running_time.display(),
|
||||
|
@ -159,7 +159,7 @@ impl<T: SocketRead> Socket<T> {
|
|||
);
|
||||
running_time
|
||||
} else {
|
||||
gst::debug!(SOCKET_CAT, obj: &self.element, "Read {} bytes", len);
|
||||
gst::debug!(SOCKET_CAT, obj: self.element, "Read {} bytes", len);
|
||||
gst::ClockTime::NONE
|
||||
};
|
||||
|
||||
|
@ -175,7 +175,7 @@ impl<T: SocketRead> Socket<T> {
|
|||
Ok((buffer, saddr))
|
||||
}
|
||||
Err(err) => {
|
||||
gst::debug!(SOCKET_CAT, obj: &self.element, "Read error {:?}", err);
|
||||
gst::debug!(SOCKET_CAT, obj: self.element, "Read error {:?}", err);
|
||||
|
||||
Err(SocketError::Io(err))
|
||||
}
|
||||
|
@ -186,7 +186,7 @@ impl<T: SocketRead> Socket<T> {
|
|||
impl<T: SocketRead> Drop for Socket<T> {
|
||||
fn drop(&mut self) {
|
||||
if let Err(err) = self.buffer_pool.set_active(false) {
|
||||
gst::error!(SOCKET_CAT, obj: &self.element, "Failed to unprepare socket: {}", err);
|
||||
gst::error!(SOCKET_CAT, obj: self.element, "Failed to unprepare socket: {}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -184,12 +184,12 @@ impl TcpClientSrcTask {
|
|||
&mut self,
|
||||
buffer: gst::Buffer,
|
||||
) -> Result<gst::FlowSuccess, gst::FlowError> {
|
||||
gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer);
|
||||
gst::log!(CAT, obj: self.element, "Handling {:?}", buffer);
|
||||
|
||||
let tcpclientsrc = self.element.imp();
|
||||
|
||||
if self.need_initial_events {
|
||||
gst::debug!(CAT, obj: &self.element, "Pushing initial events");
|
||||
gst::debug!(CAT, obj: self.element, "Pushing initial events");
|
||||
|
||||
let stream_id = format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
|
||||
let stream_start_evt = gst::event::StreamStart::builder(&stream_id)
|
||||
|
@ -228,20 +228,20 @@ impl TcpClientSrcTask {
|
|||
let res = tcpclientsrc.src_pad.push(buffer).await;
|
||||
match res {
|
||||
Ok(_) => {
|
||||
gst::log!(CAT, obj: &self.element, "Successfully pushed buffer");
|
||||
gst::log!(CAT, obj: self.element, "Successfully pushed buffer");
|
||||
}
|
||||
Err(gst::FlowError::Flushing) => {
|
||||
gst::debug!(CAT, obj: &self.element, "Flushing");
|
||||
gst::debug!(CAT, obj: self.element, "Flushing");
|
||||
}
|
||||
Err(gst::FlowError::Eos) => {
|
||||
gst::debug!(CAT, obj: &self.element, "EOS");
|
||||
gst::debug!(CAT, obj: self.element, "EOS");
|
||||
tcpclientsrc
|
||||
.src_pad
|
||||
.push_event(gst::event::Eos::new())
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
gst::error!(CAT, obj: &self.element, "Got error {}", err);
|
||||
gst::error!(CAT, obj: self.element, "Got error {}", err);
|
||||
gst::element_error!(
|
||||
self.element,
|
||||
gst::StreamError::Failed,
|
||||
|
@ -260,7 +260,7 @@ impl TaskImpl for TcpClientSrcTask {
|
|||
|
||||
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(CAT, obj: &self.element, "Preparing task connecting to {:?}", self.saddr);
|
||||
gst::log!(CAT, obj: self.element, "Preparing task connecting to {:?}", self.saddr);
|
||||
|
||||
let socket = Async::<TcpStream>::connect(self.saddr)
|
||||
.await
|
||||
|
@ -285,7 +285,7 @@ impl TaskImpl for TcpClientSrcTask {
|
|||
})?,
|
||||
);
|
||||
|
||||
gst::log!(CAT, obj: &self.element, "Task prepared");
|
||||
gst::log!(CAT, obj: self.element, "Task prepared");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
@ -320,7 +320,7 @@ impl TaskImpl for TcpClientSrcTask {
|
|||
.await
|
||||
.map(|(buffer, _saddr)| buffer)
|
||||
.map_err(|err| {
|
||||
gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
|
||||
gst::error!(CAT, obj: self.element, "Got error {:?}", err);
|
||||
match err {
|
||||
SocketError::Gst(err) => {
|
||||
gst::element_error!(
|
||||
|
@ -351,9 +351,9 @@ impl TaskImpl for TcpClientSrcTask {
|
|||
|
||||
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(CAT, obj: &self.element, "Stopping task");
|
||||
gst::log!(CAT, obj: self.element, "Stopping task");
|
||||
self.need_initial_events = true;
|
||||
gst::log!(CAT, obj: &self.element, "Task stopped");
|
||||
gst::log!(CAT, obj: self.element, "Task stopped");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
@ -361,9 +361,9 @@ impl TaskImpl for TcpClientSrcTask {
|
|||
|
||||
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(CAT, obj: &self.element, "Stopping task flush");
|
||||
gst::log!(CAT, obj: self.element, "Stopping task flush");
|
||||
self.need_initial_events = true;
|
||||
gst::log!(CAT, obj: &self.element, "Task flush stopped");
|
||||
gst::log!(CAT, obj: self.element, "Task flush stopped");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
|
|
@ -141,7 +141,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
|
|||
let sender = elem.imp().clone_item_sender();
|
||||
async move {
|
||||
if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
|
||||
gst::debug!(CAT, obj: &elem, "Flushing");
|
||||
gst::debug!(CAT, obj: elem, "Flushing");
|
||||
return Err(gst::FlowError::Flushing);
|
||||
}
|
||||
|
||||
|
@ -160,7 +160,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
|
|||
async move {
|
||||
for buffer in list.iter_owned() {
|
||||
if sender.send_async(TaskItem::Buffer(buffer)).await.is_err() {
|
||||
gst::debug!(CAT, obj: &elem, "Flushing");
|
||||
gst::debug!(CAT, obj: elem, "Flushing");
|
||||
return Err(gst::FlowError::Flushing);
|
||||
}
|
||||
}
|
||||
|
@ -182,7 +182,7 @@ impl PadSinkHandler for UdpSinkPadHandler {
|
|||
let imp = elem.imp();
|
||||
return imp.task.flush_stop().await_maybe_on_context().is_ok();
|
||||
} else if sender.send_async(TaskItem::Event(event)).await.is_err() {
|
||||
gst::debug!(CAT, obj: &elem, "Flushing");
|
||||
gst::debug!(CAT, obj: elem, "Flushing");
|
||||
}
|
||||
|
||||
true
|
||||
|
@ -306,7 +306,7 @@ impl UdpSinkTask {
|
|||
};
|
||||
|
||||
let saddr = SocketAddr::new(bind_addr, bind_port as u16);
|
||||
gst::debug!(CAT, obj: &self.element, "Binding to {:?}", saddr);
|
||||
gst::debug!(CAT, obj: self.element, "Binding to {:?}", saddr);
|
||||
|
||||
let socket = match family {
|
||||
SocketFamily::Ipv4 => socket2::Socket::new(
|
||||
|
@ -326,7 +326,7 @@ impl UdpSinkTask {
|
|||
Err(err) => {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
obj: &self.element,
|
||||
obj: self.element,
|
||||
"Failed to create {} socket: {}",
|
||||
match family {
|
||||
SocketFamily::Ipv4 => "IPv4",
|
||||
|
@ -378,7 +378,7 @@ impl UdpSinkTask {
|
|||
|
||||
fn add_client(&mut self, addr: SocketAddr) {
|
||||
if self.clients.contains(&addr) {
|
||||
gst::warning!(CAT, obj: &self.element, "Not adding client {:?} again", &addr);
|
||||
gst::warning!(CAT, obj: self.element, "Not adding client {:?} again", &addr);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -386,11 +386,11 @@ impl UdpSinkTask {
|
|||
let mut settings = udpsink.settings.lock().unwrap();
|
||||
match self.configure_client(&settings, &addr) {
|
||||
Ok(()) => {
|
||||
gst::info!(CAT, obj: &self.element, "Added client {:?}", addr);
|
||||
gst::info!(CAT, obj: self.element, "Added client {:?}", addr);
|
||||
self.clients.insert(addr);
|
||||
}
|
||||
Err(err) => {
|
||||
gst::error!(CAT, obj: &self.element, "Failed to add client {:?}: {}", addr, err);
|
||||
gst::error!(CAT, obj: self.element, "Failed to add client {:?}: {}", addr, err);
|
||||
settings.clients = self.clients.clone();
|
||||
self.element.post_error_message(err);
|
||||
}
|
||||
|
@ -399,7 +399,7 @@ impl UdpSinkTask {
|
|||
|
||||
fn remove_client(&mut self, addr: &SocketAddr) {
|
||||
if self.clients.take(addr).is_none() {
|
||||
gst::warning!(CAT, obj: &self.element, "Not removing unknown client {:?}", &addr);
|
||||
gst::warning!(CAT, obj: self.element, "Not removing unknown client {:?}", &addr);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -407,10 +407,10 @@ impl UdpSinkTask {
|
|||
let mut settings = udpsink.settings.lock().unwrap();
|
||||
match self.unconfigure_client(&settings, addr) {
|
||||
Ok(()) => {
|
||||
gst::info!(CAT, obj: &self.element, "Removed client {:?}", addr);
|
||||
gst::info!(CAT, obj: self.element, "Removed client {:?}", addr);
|
||||
}
|
||||
Err(err) => {
|
||||
gst::error!(CAT, obj: &self.element, "Failed to remove client {:?}: {}", addr, err);
|
||||
gst::error!(CAT, obj: self.element, "Failed to remove client {:?}: {}", addr, err);
|
||||
settings.clients = self.clients.clone();
|
||||
self.element.post_error_message(err);
|
||||
}
|
||||
|
@ -419,9 +419,9 @@ impl UdpSinkTask {
|
|||
|
||||
fn replace_with_clients(&mut self, mut clients_to_add: BTreeSet<SocketAddr>) {
|
||||
if clients_to_add.is_empty() {
|
||||
gst::info!(CAT, obj: &self.element, "Clearing clients");
|
||||
gst::info!(CAT, obj: self.element, "Clearing clients");
|
||||
} else {
|
||||
gst::info!(CAT, obj: &self.element, "Replacing clients");
|
||||
gst::info!(CAT, obj: self.element, "Replacing clients");
|
||||
}
|
||||
|
||||
let old_clients = std::mem::take(&mut self.clients);
|
||||
|
@ -435,19 +435,19 @@ impl UdpSinkTask {
|
|||
// client is already configured
|
||||
self.clients.insert(*addr);
|
||||
} else if let Err(err) = self.unconfigure_client(&settings, addr) {
|
||||
gst::error!(CAT, obj: &self.element, "Failed to remove client {:?}: {}", addr, err);
|
||||
gst::error!(CAT, obj: self.element, "Failed to remove client {:?}: {}", addr, err);
|
||||
res = Err(err);
|
||||
} else {
|
||||
gst::info!(CAT, obj: &self.element, "Removed client {:?}", addr);
|
||||
gst::info!(CAT, obj: self.element, "Removed client {:?}", addr);
|
||||
}
|
||||
}
|
||||
|
||||
for addr in clients_to_add.into_iter() {
|
||||
if let Err(err) = self.configure_client(&settings, &addr) {
|
||||
gst::error!(CAT, obj: &self.element, "Failed to add client {:?}: {}", addr, err);
|
||||
gst::error!(CAT, obj: self.element, "Failed to add client {:?}: {}", addr, err);
|
||||
res = Err(err);
|
||||
} else {
|
||||
gst::info!(CAT, obj: &self.element, "Added client {:?}", addr);
|
||||
gst::info!(CAT, obj: self.element, "Added client {:?}", addr);
|
||||
self.clients.insert(addr);
|
||||
}
|
||||
}
|
||||
|
@ -627,7 +627,7 @@ impl UdpSinkTask {
|
|||
};
|
||||
|
||||
if let Some(socket) = socket.as_mut() {
|
||||
gst::log!(CAT, obj: &self.element, "Sending to {:?}", &client);
|
||||
gst::log!(CAT, obj: self.element, "Sending to {:?}", &client);
|
||||
socket.send_to(&data, *client).await.map_err(|err| {
|
||||
element_error!(
|
||||
self.element,
|
||||
|
@ -650,7 +650,7 @@ impl UdpSinkTask {
|
|||
|
||||
gst::log!(
|
||||
CAT,
|
||||
obj: &self.element,
|
||||
obj: self.element,
|
||||
"Sent buffer {:?} to all clients",
|
||||
&buffer
|
||||
);
|
||||
|
@ -663,7 +663,7 @@ impl UdpSinkTask {
|
|||
let now = self.element.current_running_time();
|
||||
|
||||
if let Ok(Some(delay)) = running_time.opt_checked_sub(now) {
|
||||
gst::trace!(CAT, obj: &self.element, "sync: waiting {}", delay);
|
||||
gst::trace!(CAT, obj: self.element, "sync: waiting {}", delay);
|
||||
runtime::timer::delay_for(delay.into()).await;
|
||||
}
|
||||
}
|
||||
|
@ -674,7 +674,7 @@ impl TaskImpl for UdpSinkTask {
|
|||
|
||||
fn prepare(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::info!(CAT, obj: &self.element, "Preparing Task");
|
||||
gst::info!(CAT, obj: self.element, "Preparing Task");
|
||||
assert!(self.clients.is_empty());
|
||||
let clients = {
|
||||
let udpsink = self.element.imp();
|
||||
|
@ -695,7 +695,7 @@ impl TaskImpl for UdpSinkTask {
|
|||
|
||||
fn unprepare(&mut self) -> BoxFuture<'_, ()> {
|
||||
async move {
|
||||
gst::info!(CAT, obj: &self.element, "Unpreparing Task");
|
||||
gst::info!(CAT, obj: self.element, "Unpreparing Task");
|
||||
|
||||
let udpsink = self.element.imp();
|
||||
let settings = udpsink.settings.lock().unwrap();
|
||||
|
@ -709,7 +709,7 @@ impl TaskImpl for UdpSinkTask {
|
|||
fn try_next(&mut self) -> BoxFuture<'_, Result<TaskItem, gst::FlowError>> {
|
||||
async move {
|
||||
loop {
|
||||
gst::info!(CAT, obj: &self.element, "Awaiting next item or command");
|
||||
gst::info!(CAT, obj: self.element, "Awaiting next item or command");
|
||||
futures::select_biased! {
|
||||
cmd = self.cmd_receiver.recv_async() => {
|
||||
self.process_command(cmd.unwrap());
|
||||
|
@ -751,7 +751,7 @@ impl TaskImpl for UdpSinkTask {
|
|||
|
||||
fn handle_item(&mut self, item: TaskItem) -> BoxFuture<'_, Result<(), gst::FlowError>> {
|
||||
async move {
|
||||
gst::info!(CAT, obj: &self.element, "Handling {:?}", item);
|
||||
gst::info!(CAT, obj: self.element, "Handling {:?}", item);
|
||||
|
||||
match item {
|
||||
TaskItem::Buffer(buffer) => self.render(buffer).await.map_err(|err| {
|
||||
|
@ -785,7 +785,7 @@ impl TaskImpl for UdpSinkTask {
|
|||
|
||||
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async {
|
||||
gst::info!(CAT, obj: &self.element, "Stopping Task");
|
||||
gst::info!(CAT, obj: self.element, "Stopping Task");
|
||||
self.flush().await;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -794,7 +794,7 @@ impl TaskImpl for UdpSinkTask {
|
|||
|
||||
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async {
|
||||
gst::info!(CAT, obj: &self.element, "Starting Task Flush");
|
||||
gst::info!(CAT, obj: self.element, "Starting Task Flush");
|
||||
self.flush().await;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -204,7 +204,7 @@ impl TaskImpl for UdpSrcTask {
|
|||
let udpsrc = self.element.imp();
|
||||
let mut settings = udpsrc.settings.lock().unwrap();
|
||||
|
||||
gst::debug!(CAT, obj: &self.element, "Preparing Task");
|
||||
gst::debug!(CAT, obj: self.element, "Preparing Task");
|
||||
|
||||
self.retrieve_sender_address = settings.retrieve_sender_address;
|
||||
|
||||
|
@ -261,7 +261,7 @@ impl TaskImpl for UdpSrcTask {
|
|||
let saddr = SocketAddr::new(bind_addr, port as u16);
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &self.element,
|
||||
obj: self.element,
|
||||
"Binding to {:?} for multicast group {:?}",
|
||||
saddr,
|
||||
addr
|
||||
|
@ -270,7 +270,7 @@ impl TaskImpl for UdpSrcTask {
|
|||
saddr
|
||||
} else {
|
||||
let saddr = SocketAddr::new(addr, port as u16);
|
||||
gst::debug!(CAT, obj: &self.element, "Binding to {:?}", saddr);
|
||||
gst::debug!(CAT, obj: self.element, "Binding to {:?}", saddr);
|
||||
|
||||
saddr
|
||||
};
|
||||
|
@ -398,7 +398,7 @@ impl TaskImpl for UdpSrcTask {
|
|||
|
||||
fn unprepare(&mut self) -> BoxFuture<'_, ()> {
|
||||
async move {
|
||||
gst::debug!(CAT, obj: &self.element, "Unpreparing Task");
|
||||
gst::debug!(CAT, obj: self.element, "Unpreparing Task");
|
||||
let udpsrc = self.element.imp();
|
||||
udpsrc.settings.lock().unwrap().used_socket = None;
|
||||
self.element.notify("used-socket");
|
||||
|
@ -408,12 +408,12 @@ impl TaskImpl for UdpSrcTask {
|
|||
|
||||
fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(CAT, obj: &self.element, "Starting task");
|
||||
gst::log!(CAT, obj: self.element, "Starting task");
|
||||
self.socket
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.set_clock(self.element.clock(), self.element.base_time());
|
||||
gst::log!(CAT, obj: &self.element, "Task started");
|
||||
gst::log!(CAT, obj: self.element, "Task started");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
@ -438,7 +438,7 @@ impl TaskImpl for UdpSrcTask {
|
|||
buffer
|
||||
})
|
||||
.map_err(|err| {
|
||||
gst::error!(CAT, obj: &self.element, "Got error {:?}", err);
|
||||
gst::error!(CAT, obj: self.element, "Got error {:?}", err);
|
||||
match err {
|
||||
SocketError::Gst(err) => {
|
||||
gst::element_error!(
|
||||
|
@ -465,11 +465,11 @@ impl TaskImpl for UdpSrcTask {
|
|||
|
||||
fn handle_item(&mut self, buffer: gst::Buffer) -> BoxFuture<'_, Result<(), gst::FlowError>> {
|
||||
async {
|
||||
gst::log!(CAT, obj: &self.element, "Handling {:?}", buffer);
|
||||
gst::log!(CAT, obj: self.element, "Handling {:?}", buffer);
|
||||
let udpsrc = self.element.imp();
|
||||
|
||||
if self.need_initial_events {
|
||||
gst::debug!(CAT, obj: &self.element, "Pushing initial events");
|
||||
gst::debug!(CAT, obj: self.element, "Pushing initial events");
|
||||
|
||||
let stream_id =
|
||||
format!("{:08x}{:08x}", rand::random::<u32>(), rand::random::<u32>());
|
||||
|
@ -500,14 +500,14 @@ impl TaskImpl for UdpSrcTask {
|
|||
|
||||
let res = udpsrc.src_pad.push(buffer).await.map(drop);
|
||||
match res {
|
||||
Ok(_) => gst::log!(CAT, obj: &self.element, "Successfully pushed buffer"),
|
||||
Err(gst::FlowError::Flushing) => gst::debug!(CAT, obj: &self.element, "Flushing"),
|
||||
Ok(_) => gst::log!(CAT, obj: self.element, "Successfully pushed buffer"),
|
||||
Err(gst::FlowError::Flushing) => gst::debug!(CAT, obj: self.element, "Flushing"),
|
||||
Err(gst::FlowError::Eos) => {
|
||||
gst::debug!(CAT, obj: &self.element, "EOS");
|
||||
gst::debug!(CAT, obj: self.element, "EOS");
|
||||
udpsrc.src_pad.push_event(gst::event::Eos::new()).await;
|
||||
}
|
||||
Err(err) => {
|
||||
gst::error!(CAT, obj: &self.element, "Got error {}", err);
|
||||
gst::error!(CAT, obj: self.element, "Got error {}", err);
|
||||
gst::element_error!(
|
||||
self.element,
|
||||
gst::StreamError::Failed,
|
||||
|
@ -524,10 +524,10 @@ impl TaskImpl for UdpSrcTask {
|
|||
|
||||
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(CAT, obj: &self.element, "Stopping task");
|
||||
gst::log!(CAT, obj: self.element, "Stopping task");
|
||||
self.need_initial_events = true;
|
||||
self.need_segment = true;
|
||||
gst::log!(CAT, obj: &self.element, "Task stopped");
|
||||
gst::log!(CAT, obj: self.element, "Task stopped");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
@ -535,9 +535,9 @@ impl TaskImpl for UdpSrcTask {
|
|||
|
||||
fn flush_stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(CAT, obj: &self.element, "Stopping task flush");
|
||||
gst::log!(CAT, obj: self.element, "Stopping task flush");
|
||||
self.need_segment = true;
|
||||
gst::log!(CAT, obj: &self.element, "Stopped task flush");
|
||||
gst::log!(CAT, obj: self.element, "Stopped task flush");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
|
|
@ -129,7 +129,7 @@ mod imp_src {
|
|||
while let Ok(Some(_item)) = self.receiver.try_next() {}
|
||||
}
|
||||
async fn push_item(&self, item: Item) -> Result<gst::FlowSuccess, gst::FlowError> {
|
||||
gst::debug!(SRC_CAT, obj: &self.element, "Handling {:?}", item);
|
||||
gst::debug!(SRC_CAT, obj: self.element, "Handling {:?}", item);
|
||||
|
||||
let elementsrctest = self.element.imp();
|
||||
match item {
|
||||
|
@ -150,7 +150,7 @@ mod imp_src {
|
|||
fn try_next(&mut self) -> BoxFuture<'_, Result<Item, gst::FlowError>> {
|
||||
async move {
|
||||
self.receiver.next().await.ok_or_else(|| {
|
||||
gst::log!(SRC_CAT, obj: &self.element, "SrcPad channel aborted");
|
||||
gst::log!(SRC_CAT, obj: self.element, "SrcPad channel aborted");
|
||||
gst::FlowError::Eos
|
||||
})
|
||||
}
|
||||
|
@ -161,9 +161,9 @@ mod imp_src {
|
|||
async move {
|
||||
let res = self.push_item(item).await.map(drop);
|
||||
match res {
|
||||
Ok(_) => gst::log!(SRC_CAT, obj: &self.element, "Successfully pushed item"),
|
||||
Ok(_) => gst::log!(SRC_CAT, obj: self.element, "Successfully pushed item"),
|
||||
Err(gst::FlowError::Flushing) => {
|
||||
gst::debug!(SRC_CAT, obj: &self.element, "Flushing")
|
||||
gst::debug!(SRC_CAT, obj: self.element, "Flushing")
|
||||
}
|
||||
Err(err) => panic!("Got error {}", err),
|
||||
}
|
||||
|
@ -175,9 +175,9 @@ mod imp_src {
|
|||
|
||||
fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(SRC_CAT, obj: &self.element, "Stopping task");
|
||||
gst::log!(SRC_CAT, obj: self.element, "Stopping task");
|
||||
self.flush();
|
||||
gst::log!(SRC_CAT, obj: &self.element, "Task stopped");
|
||||
gst::log!(SRC_CAT, obj: self.element, "Task stopped");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
@ -185,9 +185,9 @@ mod imp_src {
|
|||
|
||||
fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> {
|
||||
async move {
|
||||
gst::log!(SRC_CAT, obj: &self.element, "Starting task flush");
|
||||
gst::log!(SRC_CAT, obj: self.element, "Starting task flush");
|
||||
self.flush();
|
||||
gst::log!(SRC_CAT, obj: &self.element, "Task flush started");
|
||||
gst::log!(SRC_CAT, obj: self.element, "Task flush started");
|
||||
Ok(())
|
||||
}
|
||||
.boxed()
|
||||
|
|
|
@ -193,17 +193,17 @@ impl FMP4Mux {
|
|||
Some(buffer) => buffer,
|
||||
None => {
|
||||
if stream.sinkpad.is_eos() {
|
||||
gst::trace!(CAT, obj: &stream.sinkpad, "Stream is EOS");
|
||||
gst::trace!(CAT, obj: stream.sinkpad, "Stream is EOS");
|
||||
} else {
|
||||
all_have_data_or_eos = false;
|
||||
gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no buffer");
|
||||
gst::trace!(CAT, obj: stream.sinkpad, "Stream has no buffer");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if stream.fragment_filled {
|
||||
gst::trace!(CAT, obj: &stream.sinkpad, "Stream has current fragment filled");
|
||||
gst::trace!(CAT, obj: stream.sinkpad, "Stream has current fragment filled");
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -216,7 +216,7 @@ impl FMP4Mux {
|
|||
{
|
||||
Some(segment) => segment,
|
||||
None => {
|
||||
gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment");
|
||||
gst::error!(CAT, obj: stream.sinkpad, "Got buffer before segment");
|
||||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
};
|
||||
|
@ -224,7 +224,7 @@ impl FMP4Mux {
|
|||
// If the stream has no valid running time, assume it's before everything else.
|
||||
let running_time = match segment.to_running_time(buffer.dts_or_pts()) {
|
||||
None => {
|
||||
gst::trace!(CAT, obj: &stream.sinkpad, "Stream has no valid running time");
|
||||
gst::trace!(CAT, obj: stream.sinkpad, "Stream has no valid running time");
|
||||
if earliest_stream
|
||||
.as_ref()
|
||||
.map_or(true, |(_, _, earliest_running_time)| {
|
||||
|
@ -238,7 +238,7 @@ impl FMP4Mux {
|
|||
Some(running_time) => running_time,
|
||||
};
|
||||
|
||||
gst::trace!(CAT, obj: &stream.sinkpad, "Stream has running time {} queued", running_time);
|
||||
gst::trace!(CAT, obj: stream.sinkpad, "Stream has running time {} queued", running_time);
|
||||
|
||||
if earliest_stream
|
||||
.as_ref()
|
||||
|
@ -284,22 +284,22 @@ impl FMP4Mux {
|
|||
|
||||
assert!(!stream.fragment_filled);
|
||||
|
||||
gst::trace!(CAT, obj: &stream.sinkpad, "Handling buffer {:?}", buffer);
|
||||
gst::trace!(CAT, obj: stream.sinkpad, "Handling buffer {:?}", buffer);
|
||||
|
||||
let intra_only = stream.intra_only;
|
||||
|
||||
if !intra_only && buffer.dts().is_none() {
|
||||
gst::error!(CAT, obj: &stream.sinkpad, "Require DTS for video streams");
|
||||
gst::error!(CAT, obj: stream.sinkpad, "Require DTS for video streams");
|
||||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
|
||||
if intra_only && buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
|
||||
gst::error!(CAT, obj: &stream.sinkpad, "Intra-only stream with delta units");
|
||||
gst::error!(CAT, obj: stream.sinkpad, "Intra-only stream with delta units");
|
||||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
|
||||
let pts_position = buffer.pts().ok_or_else(|| {
|
||||
gst::error!(CAT, obj: &stream.sinkpad, "Require timestamped buffers");
|
||||
gst::error!(CAT, obj: stream.sinkpad, "Require timestamped buffers");
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
let duration = buffer.duration();
|
||||
|
@ -308,22 +308,22 @@ impl FMP4Mux {
|
|||
let mut pts = segment
|
||||
.to_running_time_full(pts_position)
|
||||
.ok_or_else(|| {
|
||||
gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert PTS to running time");
|
||||
gst::error!(CAT, obj: stream.sinkpad, "Couldn't convert PTS to running time");
|
||||
gst::FlowError::Error
|
||||
})?
|
||||
.positive_or_else(|_| {
|
||||
gst::error!(CAT, obj: &stream.sinkpad, "Negative PTSs are not supported");
|
||||
gst::error!(CAT, obj: stream.sinkpad, "Negative PTSs are not supported");
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
|
||||
let mut end_pts = segment
|
||||
.to_running_time_full(end_pts_position)
|
||||
.ok_or_else(|| {
|
||||
gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert end PTS to running time");
|
||||
gst::error!(CAT, obj: stream.sinkpad, "Couldn't convert end PTS to running time");
|
||||
gst::FlowError::Error
|
||||
})?
|
||||
.positive_or_else(|_| {
|
||||
gst::error!(CAT, obj: &stream.sinkpad, "Negative PTSs are not supported");
|
||||
gst::error!(CAT, obj: stream.sinkpad, "Negative PTSs are not supported");
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
|
||||
|
@ -332,7 +332,7 @@ impl FMP4Mux {
|
|||
if pts < stream.current_position {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Decreasing PTS {} < {} for intra-only stream",
|
||||
pts,
|
||||
stream.current_position,
|
||||
|
@ -353,7 +353,7 @@ impl FMP4Mux {
|
|||
let end_dts_position = duration.opt_add(dts_position).unwrap_or(dts_position);
|
||||
|
||||
let signed_dts = segment.to_running_time_full(dts_position).ok_or_else(|| {
|
||||
gst::error!(CAT, obj: &stream.sinkpad, "Couldn't convert DTS to running time");
|
||||
gst::error!(CAT, obj: stream.sinkpad, "Couldn't convert DTS to running time");
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
let mut dts = match signed_dts {
|
||||
|
@ -371,7 +371,7 @@ impl FMP4Mux {
|
|||
|
||||
let dts_offset = stream.dts_offset.unwrap();
|
||||
if dts > dts_offset {
|
||||
gst::warning!(CAT, obj: &stream.sinkpad, "DTS before first DTS");
|
||||
gst::warning!(CAT, obj: stream.sinkpad, "DTS before first DTS");
|
||||
gst::ClockTime::ZERO
|
||||
} else {
|
||||
dts_offset - dts
|
||||
|
@ -385,7 +385,7 @@ impl FMP4Mux {
|
|||
.ok_or_else(|| {
|
||||
gst::error!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Couldn't convert end DTS to running time"
|
||||
);
|
||||
gst::FlowError::Error
|
||||
|
@ -405,7 +405,7 @@ impl FMP4Mux {
|
|||
|
||||
let dts_offset = stream.dts_offset.unwrap();
|
||||
if dts > dts_offset {
|
||||
gst::warning!(CAT, obj: &stream.sinkpad, "End DTS before first DTS");
|
||||
gst::warning!(CAT, obj: stream.sinkpad, "End DTS before first DTS");
|
||||
gst::ClockTime::ZERO
|
||||
} else {
|
||||
dts_offset - dts
|
||||
|
@ -419,7 +419,7 @@ impl FMP4Mux {
|
|||
if dts < stream.current_position {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Decreasing DTS {} < {}",
|
||||
dts,
|
||||
stream.current_position,
|
||||
|
@ -456,7 +456,7 @@ impl FMP4Mux {
|
|||
if !buffer.flags().contains(gst::BufferFlags::DELTA_UNIT) {
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Starting new GOP at PTS {} DTS {} (DTS offset {})",
|
||||
pts,
|
||||
dts.display(),
|
||||
|
@ -480,7 +480,7 @@ impl FMP4Mux {
|
|||
if let Some(prev_gop) = stream.queued_gops.get_mut(1) {
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Updating previous GOP starting at PTS {} to end PTS {} DTS {}",
|
||||
prev_gop.earliest_pts,
|
||||
pts,
|
||||
|
@ -500,7 +500,7 @@ impl FMP4Mux {
|
|||
if !intra_only {
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Previous GOP has final earliest PTS at {}",
|
||||
prev_gop.earliest_pts
|
||||
);
|
||||
|
@ -530,7 +530,7 @@ impl FMP4Mux {
|
|||
if gop.earliest_pts > pts && !gop.final_earliest_pts {
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Updating current GOP earliest PTS from {} to {}",
|
||||
gop.earliest_pts,
|
||||
pts
|
||||
|
@ -542,7 +542,7 @@ impl FMP4Mux {
|
|||
if prev_gop.end_pts < pts {
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Updating previous GOP starting PTS {} end time from {} to {}",
|
||||
pts,
|
||||
prev_gop.end_pts,
|
||||
|
@ -562,7 +562,7 @@ impl FMP4Mux {
|
|||
if gop.start_pts <= dts && !gop.final_earliest_pts {
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"GOP has final earliest PTS at {}",
|
||||
gop.earliest_pts
|
||||
);
|
||||
|
@ -575,7 +575,7 @@ impl FMP4Mux {
|
|||
} else {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Waiting for keyframe at the beginning of the stream"
|
||||
);
|
||||
}
|
||||
|
@ -586,7 +586,7 @@ impl FMP4Mux {
|
|||
) {
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Queued full GOPs duration updated to {}",
|
||||
prev_gop.end_pts.saturating_sub(first_gop.earliest_pts),
|
||||
);
|
||||
|
@ -594,7 +594,7 @@ impl FMP4Mux {
|
|||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Queued duration updated to {}",
|
||||
Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
|
||||
.map(|(end, start)| end.end_pts.saturating_sub(start.start_pts))
|
||||
|
@ -669,7 +669,7 @@ impl FMP4Mux {
|
|||
fragment_end_pts.unwrap_or(fragment_start_pts + settings.fragment_duration);
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Draining up to end PTS {} / duration {}",
|
||||
dequeue_end_pts,
|
||||
dequeue_end_pts - fragment_start_pts
|
||||
|
@ -704,7 +704,7 @@ impl FMP4Mux {
|
|||
fragment_end_pts = Some(last_gop.end_pts);
|
||||
gst::info!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Draining up to PTS {} for this fragment",
|
||||
last_gop.end_pts,
|
||||
);
|
||||
|
@ -719,7 +719,7 @@ impl FMP4Mux {
|
|||
|
||||
gst::warning!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Don't have a complete GOP for the first stream on timeout in a live pipeline",
|
||||
);
|
||||
|
||||
|
@ -733,7 +733,7 @@ impl FMP4Mux {
|
|||
if gops.is_empty() {
|
||||
gst::info!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Draining no buffers",
|
||||
);
|
||||
|
||||
|
@ -772,7 +772,7 @@ impl FMP4Mux {
|
|||
|
||||
gst::info!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Draining {} worth of buffers starting at PTS {} DTS {}, DTS offset {}",
|
||||
end_pts.saturating_sub(earliest_pts),
|
||||
earliest_pts,
|
||||
|
@ -786,7 +786,7 @@ impl FMP4Mux {
|
|||
) {
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Queued full GOPs duration updated to {}",
|
||||
prev_gop.end_pts.saturating_sub(first_gop.earliest_pts),
|
||||
);
|
||||
|
@ -794,7 +794,7 @@ impl FMP4Mux {
|
|||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &stream.sinkpad,
|
||||
obj: stream.sinkpad,
|
||||
"Queued duration updated to {}",
|
||||
Option::zip(stream.queued_gops.front(), stream.queued_gops.back())
|
||||
.map(|(end, start)| end.end_pts.saturating_sub(start.start_pts))
|
||||
|
@ -847,19 +847,15 @@ impl FMP4Mux {
|
|||
let dts = buffer.dts.unwrap();
|
||||
|
||||
if pts > dts {
|
||||
Some(
|
||||
i64::try_from((pts - dts).nseconds())
|
||||
.map_err(|_| {
|
||||
gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
|
||||
gst::FlowError::Error
|
||||
})?,
|
||||
)
|
||||
Some(i64::try_from((pts - dts).nseconds()).map_err(|_| {
|
||||
gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference");
|
||||
gst::FlowError::Error
|
||||
})?)
|
||||
} else {
|
||||
let diff = i64::try_from((dts - pts).nseconds())
|
||||
.map_err(|_| {
|
||||
gst::error!(CAT, obj: &stream.sinkpad, "Too big PTS/DTS difference");
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
let diff = i64::try_from((dts - pts).nseconds()).map_err(|_| {
|
||||
gst::error!(CAT, obj: stream.sinkpad, "Too big PTS/DTS difference");
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
Some(-diff)
|
||||
}
|
||||
};
|
||||
|
@ -933,7 +929,7 @@ impl FMP4Mux {
|
|||
None => {
|
||||
gst::error!(
|
||||
CAT,
|
||||
obj: &state.streams[idx].sinkpad,
|
||||
obj: state.streams[idx].sinkpad,
|
||||
"No reference timestamp set on any buffers in the first fragment",
|
||||
);
|
||||
return Err(gst::FlowError::Error);
|
||||
|
@ -984,7 +980,7 @@ impl FMP4Mux {
|
|||
None => {
|
||||
gst::error!(
|
||||
CAT,
|
||||
obj: &state.streams[idx].sinkpad,
|
||||
obj: state.streams[idx].sinkpad,
|
||||
"No reference timestamp set on all buffers"
|
||||
);
|
||||
return Err(gst::FlowError::Error);
|
||||
|
@ -1017,7 +1013,7 @@ impl FMP4Mux {
|
|||
None => {
|
||||
gst::error!(
|
||||
CAT,
|
||||
obj: &state.streams[idx].sinkpad,
|
||||
obj: state.streams[idx].sinkpad,
|
||||
"No reference timestamp set on all buffers"
|
||||
);
|
||||
return Err(gst::FlowError::Error);
|
||||
|
@ -1045,7 +1041,7 @@ impl FMP4Mux {
|
|||
if utc_time_dts < state.streams[idx].current_utc_time {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
obj: &state.streams[idx].sinkpad,
|
||||
obj: state.streams[idx].sinkpad,
|
||||
"Decreasing UTC DTS timestamp for buffer {} < {}",
|
||||
utc_time_dts,
|
||||
state.streams[idx].current_utc_time,
|
||||
|
@ -1059,7 +1055,7 @@ impl FMP4Mux {
|
|||
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: &state.streams[idx].sinkpad,
|
||||
obj: state.streams[idx].sinkpad,
|
||||
"Updating buffer timestamp from {} to relative UTC DTS time {} / absolute DTS time {}, UTC PTS time {}",
|
||||
buffer.timestamp,
|
||||
timestamp,
|
||||
|
@ -1090,7 +1086,7 @@ impl FMP4Mux {
|
|||
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: &state.streams[idx].sinkpad,
|
||||
obj: state.streams[idx].sinkpad,
|
||||
"Updating buffer with timestamp {} duration from {} to relative UTC duration {}",
|
||||
buffer.timestamp,
|
||||
buffer.duration,
|
||||
|
@ -1101,7 +1097,7 @@ impl FMP4Mux {
|
|||
} else if let Ok(Some(common_duration)) = common_duration {
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: &state.streams[idx].sinkpad,
|
||||
obj: state.streams[idx].sinkpad,
|
||||
"Updating last buffer with timestamp {} duration from {} to common relative UTC duration {}",
|
||||
buffer.timestamp,
|
||||
buffer.duration,
|
||||
|
@ -1112,7 +1108,7 @@ impl FMP4Mux {
|
|||
} else {
|
||||
gst::trace!(
|
||||
CAT,
|
||||
obj: &state.streams[idx].sinkpad,
|
||||
obj: state.streams[idx].sinkpad,
|
||||
"Keeping last buffer with timestamp {} duration at {}",
|
||||
buffer.timestamp,
|
||||
buffer.duration,
|
||||
|
@ -1126,7 +1122,7 @@ impl FMP4Mux {
|
|||
}
|
||||
|
||||
if let Some(start_time) = start_time {
|
||||
gst::debug!(CAT, obj: &state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time);
|
||||
gst::debug!(CAT, obj: state.streams[idx].sinkpad, "Fragment starting at UTC time {}", start_time);
|
||||
timing_info.as_mut().unwrap().start_time = start_time;
|
||||
} else {
|
||||
assert!(timing_info.is_none());
|
||||
|
@ -1441,12 +1437,12 @@ impl FMP4Mux {
|
|||
let caps = match pad.current_caps() {
|
||||
Some(caps) => caps,
|
||||
None => {
|
||||
gst::warning!(CAT, obj: &pad, "Skipping pad without caps");
|
||||
gst::warning!(CAT, obj: pad, "Skipping pad without caps");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
gst::info!(CAT, obj: &pad, "Configuring caps {:?}", caps);
|
||||
gst::info!(CAT, obj: pad, "Configuring caps {:?}", caps);
|
||||
|
||||
let s = caps.structure(0).unwrap();
|
||||
|
||||
|
@ -1454,7 +1450,7 @@ impl FMP4Mux {
|
|||
match s.name() {
|
||||
"video/x-h264" | "video/x-h265" => {
|
||||
if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
|
||||
gst::error!(CAT, obj: &pad, "Received caps without codec_data");
|
||||
gst::error!(CAT, obj: pad, "Received caps without codec_data");
|
||||
return Err(gst::FlowError::NotNegotiated);
|
||||
}
|
||||
}
|
||||
|
@ -1463,7 +1459,7 @@ impl FMP4Mux {
|
|||
}
|
||||
"audio/mpeg" => {
|
||||
if !s.has_field_with_type("codec_data", gst::Buffer::static_type()) {
|
||||
gst::error!(CAT, obj: &pad, "Received caps without codec_data");
|
||||
gst::error!(CAT, obj: pad, "Received caps without codec_data");
|
||||
return Err(gst::FlowError::NotNegotiated);
|
||||
}
|
||||
intra_only = true;
|
||||
|
@ -2004,7 +2000,7 @@ impl AggregatorImpl for FMP4Mux {
|
|||
{
|
||||
Some(segment) => segment,
|
||||
None => {
|
||||
gst::error!(CAT, obj: &stream.sinkpad, "Got buffer before segment");
|
||||
gst::error!(CAT, obj: stream.sinkpad, "Got buffer before segment");
|
||||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
};
|
||||
|
@ -2024,7 +2020,7 @@ impl AggregatorImpl for FMP4Mux {
|
|||
if queued_end_pts.saturating_sub(fragment_start_pts)
|
||||
>= settings.fragment_duration
|
||||
{
|
||||
gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment");
|
||||
gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for this fragment");
|
||||
stream.fragment_filled = true;
|
||||
}
|
||||
}
|
||||
|
@ -2085,7 +2081,7 @@ impl AggregatorImpl for FMP4Mux {
|
|||
if queued_end_pts.saturating_sub(earliest_pts)
|
||||
>= settings.fragment_duration
|
||||
{
|
||||
gst::debug!(CAT, obj: &stream.sinkpad, "Stream queued enough data for this fragment");
|
||||
gst::debug!(CAT, obj: stream.sinkpad, "Stream queued enough data for this fragment");
|
||||
stream.fragment_filled = true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -760,13 +760,13 @@ impl Receiver {
|
|||
let flushing = {
|
||||
let queue = (receiver.0.queue.0).0.lock().unwrap();
|
||||
if queue.shutdown {
|
||||
gst::debug!(CAT, obj: &element, "Shutting down");
|
||||
gst::debug!(CAT, obj: element, "Shutting down");
|
||||
break;
|
||||
}
|
||||
|
||||
// If an error happened in the meantime, just go out of here
|
||||
if queue.error.is_some() {
|
||||
gst::error!(CAT, obj: &element, "Error while waiting for connection");
|
||||
gst::error!(CAT, obj: element, "Error while waiting for connection");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -781,7 +781,7 @@ impl Receiver {
|
|||
|
||||
let res = match recv.capture(50) {
|
||||
_ if flushing => {
|
||||
gst::debug!(CAT, obj: &element, "Flushing");
|
||||
gst::debug!(CAT, obj: element, "Flushing");
|
||||
Err(gst::FlowError::Flushing)
|
||||
}
|
||||
Err(_) => {
|
||||
|
@ -793,11 +793,11 @@ impl Receiver {
|
|||
Err(gst::FlowError::Error)
|
||||
}
|
||||
Ok(None) if timeout > 0 && timer.elapsed().as_millis() >= timeout as u128 => {
|
||||
gst::debug!(CAT, obj: &element, "Timed out -- assuming EOS",);
|
||||
gst::debug!(CAT, obj: element, "Timed out -- assuming EOS",);
|
||||
Err(gst::FlowError::Eos)
|
||||
}
|
||||
Ok(None) => {
|
||||
gst::debug!(CAT, obj: &element, "No frame received yet, retry");
|
||||
gst::debug!(CAT, obj: element, "No frame received yet, retry");
|
||||
continue;
|
||||
}
|
||||
Ok(Some(Frame::Video(frame))) => {
|
||||
|
@ -832,7 +832,7 @@ impl Receiver {
|
|||
if let Some(metadata) = frame.metadata() {
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"Received metadata at timecode {}: {}",
|
||||
(frame.timecode() as u64 * 100).nseconds(),
|
||||
metadata,
|
||||
|
@ -849,7 +849,7 @@ impl Receiver {
|
|||
while queue.buffer_queue.len() > receiver.0.max_queue_length {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"Dropping old buffer -- queue has {} items",
|
||||
queue.buffer_queue.len()
|
||||
);
|
||||
|
@ -860,7 +860,7 @@ impl Receiver {
|
|||
timer = time::Instant::now();
|
||||
}
|
||||
Err(gst::FlowError::Eos) => {
|
||||
gst::debug!(CAT, obj: &element, "Signalling EOS");
|
||||
gst::debug!(CAT, obj: element, "Signalling EOS");
|
||||
let mut queue = (receiver.0.queue.0).0.lock().unwrap();
|
||||
queue.timeout = true;
|
||||
(receiver.0.queue.0).1.notify_one();
|
||||
|
@ -874,7 +874,7 @@ impl Receiver {
|
|||
timer = time::Instant::now();
|
||||
}
|
||||
Err(err) => {
|
||||
gst::error!(CAT, obj: &element, "Signalling error");
|
||||
gst::error!(CAT, obj: element, "Signalling error");
|
||||
let mut queue = (receiver.0.queue.0).0.lock().unwrap();
|
||||
if queue.error.is_none() {
|
||||
queue.error = Some(err);
|
||||
|
|
|
@ -1106,7 +1106,7 @@ impl BandwidthEstimator {
|
|||
|
||||
if !list.is_empty() {
|
||||
if let Err(err) = bwe.imp().push_list(list) {
|
||||
gst::error!(CAT, obj: &bwe, "pause task, reason: {err:?}");
|
||||
gst::error!(CAT, obj: bwe, "pause task, reason: {err:?}");
|
||||
pause()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -85,7 +85,7 @@ impl Signaller {
|
|||
let send_task_handle = task::spawn(async move {
|
||||
while let Some(msg) = websocket_receiver.next().await {
|
||||
if let Some(element) = element_clone.upgrade() {
|
||||
gst::trace!(CAT, obj: &element, "Sending websocket message {:?}", msg);
|
||||
gst::trace!(CAT, obj: element, "Sending websocket message {:?}", msg);
|
||||
}
|
||||
ws_sink
|
||||
.send(WsMessage::Text(serde_json::to_string(&msg).unwrap()))
|
||||
|
@ -93,7 +93,7 @@ impl Signaller {
|
|||
}
|
||||
|
||||
if let Some(element) = element_clone.upgrade() {
|
||||
gst::info!(CAT, obj: &element, "Done sending");
|
||||
gst::info!(CAT, obj: element, "Done sending");
|
||||
}
|
||||
|
||||
ws_sink.send(WsMessage::Close(None)).await?;
|
||||
|
@ -121,14 +121,14 @@ impl Signaller {
|
|||
if let Some(element) = element_clone.upgrade() {
|
||||
match msg {
|
||||
Ok(WsMessage::Text(msg)) => {
|
||||
gst::trace!(CAT, obj: &element, "Received message {}", msg);
|
||||
gst::trace!(CAT, obj: element, "Received message {}", msg);
|
||||
|
||||
if let Ok(msg) = serde_json::from_str::<p::OutgoingMessage>(&msg) {
|
||||
match msg {
|
||||
p::OutgoingMessage::Welcome { peer_id } => {
|
||||
gst::info!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"We are registered with the server, our peer id is {}",
|
||||
peer_id
|
||||
);
|
||||
|
@ -140,14 +140,14 @@ impl Signaller {
|
|||
if let Err(err) =
|
||||
element.start_session(&session_id, &peer_id)
|
||||
{
|
||||
gst::warning!(CAT, obj: &element, "{}", err);
|
||||
gst::warning!(CAT, obj: element, "{}", err);
|
||||
}
|
||||
}
|
||||
p::OutgoingMessage::EndSession(session_info) => {
|
||||
if let Err(err) =
|
||||
element.end_session(&session_info.session_id)
|
||||
{
|
||||
gst::warning!(CAT, obj: &element, "{}", err);
|
||||
gst::warning!(CAT, obj: element, "{}", err);
|
||||
}
|
||||
}
|
||||
p::OutgoingMessage::Peer(p::PeerMessage {
|
||||
|
@ -165,7 +165,7 @@ impl Signaller {
|
|||
.unwrap(),
|
||||
),
|
||||
) {
|
||||
gst::warning!(CAT, obj: &element, "{}", err);
|
||||
gst::warning!(CAT, obj: element, "{}", err);
|
||||
}
|
||||
}
|
||||
p::PeerMessageInner::Sdp(p::SdpMessage::Offer {
|
||||
|
@ -173,7 +173,7 @@ impl Signaller {
|
|||
}) => {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"Ignoring offer from peer"
|
||||
);
|
||||
}
|
||||
|
@ -187,14 +187,14 @@ impl Signaller {
|
|||
None,
|
||||
&candidate,
|
||||
) {
|
||||
gst::warning!(CAT, obj: &element, "{}", err);
|
||||
gst::warning!(CAT, obj: element, "{}", err);
|
||||
}
|
||||
}
|
||||
},
|
||||
_ => {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"Ignoring unsupported message {:?}",
|
||||
msg
|
||||
);
|
||||
|
@ -203,7 +203,7 @@ impl Signaller {
|
|||
} else {
|
||||
gst::error!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"Unknown message from server: {}",
|
||||
msg
|
||||
);
|
||||
|
@ -215,7 +215,7 @@ impl Signaller {
|
|||
Ok(WsMessage::Close(reason)) => {
|
||||
gst::info!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"websocket connection closed: {:?}",
|
||||
reason
|
||||
);
|
||||
|
@ -235,7 +235,7 @@ impl Signaller {
|
|||
}
|
||||
|
||||
if let Some(element) = element_clone.upgrade() {
|
||||
gst::info!(CAT, obj: &element, "Stopped websocket receiving");
|
||||
gst::info!(CAT, obj: element, "Stopped websocket receiving");
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
@ -1302,7 +1302,7 @@ impl WebRTCSink {
|
|||
Ok(None) => {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"Promise returned without a reply for {}",
|
||||
session_id
|
||||
);
|
||||
|
@ -1312,7 +1312,7 @@ impl WebRTCSink {
|
|||
Err(err) => {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"Promise returned with an error for {}: {:?}",
|
||||
session_id,
|
||||
err
|
||||
|
@ -1522,7 +1522,7 @@ impl WebRTCSink {
|
|||
let this = Self::from_instance(&element);
|
||||
gst::warning!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"Connection state for in session {} (peer {}) failed",
|
||||
session_id_clone,
|
||||
peer_id_clone
|
||||
|
@ -1532,7 +1532,7 @@ impl WebRTCSink {
|
|||
_ => {
|
||||
gst::log!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"Connection state in session {} (peer {}) changed: {:?}",
|
||||
session_id_clone,
|
||||
peer_id_clone,
|
||||
|
@ -1556,7 +1556,7 @@ impl WebRTCSink {
|
|||
gst_webrtc::WebRTCICEConnectionState::Failed => {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"Ice connection state in session {} (peer {}) failed",
|
||||
session_id_clone,
|
||||
peer_id_clone,
|
||||
|
@ -1566,7 +1566,7 @@ impl WebRTCSink {
|
|||
_ => {
|
||||
gst::log!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"Ice connection state in session {} (peer {}) changed: {:?}",
|
||||
session_id_clone,
|
||||
peer_id_clone,
|
||||
|
@ -1603,7 +1603,7 @@ impl WebRTCSink {
|
|||
if let Some(element) = element_clone.upgrade() {
|
||||
gst::log!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"Ice gathering state in session {} (peer {}) changed: {:?}",
|
||||
session_id_clone,
|
||||
peer_id_clone,
|
||||
|
@ -1708,7 +1708,7 @@ impl WebRTCSink {
|
|||
}
|
||||
gst::MessageView::Latency(..) => {
|
||||
if let Some(pipeline) = pipeline_clone.upgrade() {
|
||||
gst::info!(CAT, obj: &pipeline, "Recalculating latency");
|
||||
gst::info!(CAT, obj: pipeline, "Recalculating latency");
|
||||
let _ = pipeline.recalculate_latency();
|
||||
}
|
||||
}
|
||||
|
@ -2295,7 +2295,7 @@ impl WebRTCSink {
|
|||
|
||||
match fut.await {
|
||||
Ok(Err(err)) => {
|
||||
gst::error!(CAT, obj: &element, "error: {}", err);
|
||||
gst::error!(CAT, obj: element, "error: {}", err);
|
||||
gst::element_error!(
|
||||
element,
|
||||
gst::StreamError::CodecNotFound,
|
||||
|
@ -2638,7 +2638,7 @@ impl ObjectImpl for WebRTCSink {
|
|||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"applying default configuration on encoder {:?}",
|
||||
enc
|
||||
);
|
||||
|
|
|
@ -542,13 +542,13 @@ impl JsonGstParse {
|
|||
Ok(buffer) => Some(buffer),
|
||||
Err(gst::FlowError::Eos) => None,
|
||||
Err(gst::FlowError::Flushing) => {
|
||||
gst::debug!(CAT, obj: &self.sinkpad, "Pausing after pulling buffer, reason: flushing");
|
||||
gst::debug!(CAT, obj: self.sinkpad, "Pausing after pulling buffer, reason: flushing");
|
||||
|
||||
self.sinkpad.pause_task().unwrap();
|
||||
return;
|
||||
}
|
||||
Err(flow) => {
|
||||
gst::error!(CAT, obj: &self.sinkpad, "Failed to pull, reason: {:?}", flow);
|
||||
gst::error!(CAT, obj: self.sinkpad, "Failed to pull, reason: {:?}", flow);
|
||||
|
||||
gst::element_imp_error!(
|
||||
self,
|
||||
|
|
|
@ -1914,7 +1914,7 @@ impl FallbackSrc {
|
|||
Some(gst::PadProbeData::Event(ref ev)) if ev.type_() == gst::EventType::Eos => {
|
||||
gst::debug!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"Received EOS from {}source on pad {}",
|
||||
if fallback_source { "fallback " } else { "" },
|
||||
pad.name()
|
||||
|
@ -3083,7 +3083,7 @@ impl FallbackSrc {
|
|||
Some(element) => element,
|
||||
};
|
||||
|
||||
gst::debug!(CAT, obj: &element, "Woke up, retrying");
|
||||
gst::debug!(CAT, obj: element, "Woke up, retrying");
|
||||
element.call_async(move |element| {
|
||||
let imp = element.imp();
|
||||
|
||||
|
|
|
@ -207,7 +207,7 @@ impl PipelineSnapshot {
|
|||
|
||||
for pipeline in pipelines.into_iter() {
|
||||
let pipeline = pipeline.downcast::<gst::Pipeline>().unwrap();
|
||||
gst::debug!(CAT, obj: &tracer, "dump {}", pipeline.name());
|
||||
gst::debug!(CAT, obj: tracer, "dump {}", pipeline.name());
|
||||
|
||||
let dump_name = format!("{}{}", settings.dot_prefix, pipeline.name());
|
||||
|
||||
|
|
|
@ -1421,7 +1421,7 @@ impl UriPlaylistBin {
|
|||
// block pad until next item is ready
|
||||
gst::log!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"blocking pad {}:{} until next item is ready",
|
||||
parent.name(),
|
||||
pad.name()
|
||||
|
@ -1438,7 +1438,7 @@ impl UriPlaylistBin {
|
|||
|
||||
gst::log!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"pad {}:{} has been unblocked",
|
||||
parent.name(),
|
||||
pad.name()
|
||||
|
@ -1454,7 +1454,7 @@ impl UriPlaylistBin {
|
|||
// all the streams are eos, item is now done
|
||||
gst::log!(
|
||||
CAT,
|
||||
obj: &element,
|
||||
obj: element,
|
||||
"all streams of item #{} are eos",
|
||||
item.index()
|
||||
);
|
||||
|
|
|
@ -756,13 +756,13 @@ impl MccParse {
|
|||
Ok(buffer) => Some(buffer),
|
||||
Err(gst::FlowError::Eos) => None,
|
||||
Err(gst::FlowError::Flushing) => {
|
||||
gst::debug!(CAT, obj: &self.sinkpad, "Pausing after pulling buffer, reason: flushing");
|
||||
gst::debug!(CAT, obj: self.sinkpad, "Pausing after pulling buffer, reason: flushing");
|
||||
|
||||
let _ = self.sinkpad.pause_task();
|
||||
return;
|
||||
}
|
||||
Err(flow) => {
|
||||
gst::error!(CAT, obj: &self.sinkpad, "Failed to pull, reason: {:?}", flow);
|
||||
gst::error!(CAT, obj: self.sinkpad, "Failed to pull, reason: {:?}", flow);
|
||||
|
||||
gst::element_imp_error!(
|
||||
self,
|
||||
|
|
|
@ -639,13 +639,13 @@ impl SccParse {
|
|||
Ok(buffer) => Some(buffer),
|
||||
Err(gst::FlowError::Eos) => None,
|
||||
Err(gst::FlowError::Flushing) => {
|
||||
gst::debug!(CAT, obj: &self.sinkpad, "Pausing after pulling buffer, reason: flushing");
|
||||
gst::debug!(CAT, obj: self.sinkpad, "Pausing after pulling buffer, reason: flushing");
|
||||
|
||||
let _ = self.sinkpad.pause_task();
|
||||
return;
|
||||
}
|
||||
Err(flow) => {
|
||||
gst::error!(CAT, obj: &self.sinkpad, "Failed to pull, reason: {:?}", flow);
|
||||
gst::error!(CAT, obj: self.sinkpad, "Failed to pull, reason: {:?}", flow);
|
||||
|
||||
gst::element_imp_error!(
|
||||
self,
|
||||
|
|
Loading…
Reference in a new issue