threadshare: Generalize UdpSocket to Socket.

This commit is contained in:
LEE Dongjun 2018-07-06 16:19:59 +09:00 committed by Sebastian Dröge
parent ec3e0875a1
commit 1ac85c91e5
2 changed files with 63 additions and 28 deletions

View file

@ -1,4 +1,5 @@
// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com> // Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
// Copyright (C) 2018 LEE Dongjun <redongjun@gmail.com>
// //
// This library is free software; you can redistribute it and/or // This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public // modify it under the terms of the GNU Library General Public
@ -24,7 +25,6 @@ use gst::prelude::*;
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::task; use futures::task;
use futures::{Async, Future, IntoFuture, Poll, Stream}; use futures::{Async, Future, IntoFuture, Poll, Stream};
use tokio::net;
use either::Either; use either::Either;
@ -38,8 +38,7 @@ lazy_static! {
); );
} }
#[derive(Clone)] pub struct Socket<T: SocketRead + 'static>(Arc<Mutex<SocketInner<T>>>);
pub struct Socket(Arc<Mutex<SocketInner>>);
#[derive(PartialEq, Eq, Debug)] #[derive(PartialEq, Eq, Debug)]
enum SocketState { enum SocketState {
@ -49,10 +48,16 @@ enum SocketState {
Shutdown, Shutdown,
} }
struct SocketInner { pub trait SocketRead: Send {
const DO_TIMESTAMP: bool;
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error>;
}
struct SocketInner<T: SocketRead + 'static> {
element: gst::Element, element: gst::Element,
state: SocketState, state: SocketState,
socket: net::UdpSocket, reader: T,
buffer_pool: gst::BufferPool, buffer_pool: gst::BufferPool,
current_task: Option<task::Task>, current_task: Option<task::Task>,
shutdown_receiver: Option<oneshot::Receiver<()>>, shutdown_receiver: Option<oneshot::Receiver<()>>,
@ -60,16 +65,12 @@ struct SocketInner {
base_time: Option<gst::ClockTime>, base_time: Option<gst::ClockTime>,
} }
impl Socket { impl<T: SocketRead + 'static> Socket<T> {
pub fn new( pub fn new(element: &gst::Element, reader: T, buffer_pool: gst::BufferPool) -> Self {
element: &gst::Element, Socket(Arc::new(Mutex::new(SocketInner::<T> {
socket: net::UdpSocket,
buffer_pool: gst::BufferPool,
) -> Self {
Socket(Arc::new(Mutex::new(SocketInner {
element: element.clone(), element: element.clone(),
state: SocketState::Unscheduled, state: SocketState::Unscheduled,
socket: socket, reader: reader,
buffer_pool: buffer_pool, buffer_pool: buffer_pool,
current_task: None, current_task: None,
shutdown_receiver: None, shutdown_receiver: None,
@ -89,7 +90,7 @@ impl Socket {
// //
// Need to wait for a possible shutdown to finish first // Need to wait for a possible shutdown to finish first
// spawn() on the reactor, change state to Scheduled // spawn() on the reactor, change state to Scheduled
let stream = SocketStream(self.clone(), None); let stream = SocketStream::<T>(self.clone(), None);
let mut inner = self.0.lock().unwrap(); let mut inner = self.0.lock().unwrap();
gst_debug!(SOCKET_CAT, obj: &inner.element, "Scheduling socket"); gst_debug!(SOCKET_CAT, obj: &inner.element, "Scheduling socket");
@ -132,7 +133,7 @@ impl Socket {
Ok(()) Ok(())
} }
pub fn unpause(&self, clock: gst::Clock, base_time: gst::ClockTime) { pub fn unpause(&self, clock: Option<gst::Clock>, base_time: Option<gst::ClockTime>) {
// Paused->Playing // Paused->Playing
// //
// Change state to Running and signal task // Change state to Running and signal task
@ -145,8 +146,8 @@ impl Socket {
assert_eq!(inner.state, SocketState::Scheduled); assert_eq!(inner.state, SocketState::Scheduled);
inner.state = SocketState::Running; inner.state = SocketState::Running;
inner.clock = Some(clock); inner.clock = clock;
inner.base_time = Some(base_time); inner.base_time = base_time;
if let Some(task) = inner.current_task.take() { if let Some(task) = inner.current_task.take() {
task.notify(); task.notify();
@ -208,15 +209,24 @@ impl Socket {
} }
} }
impl Drop for SocketInner { impl<T: SocketRead + 'static> Clone for Socket<T> {
fn clone(&self) -> Self {
Socket::<T>(self.0.clone())
}
}
impl<T: SocketRead + 'static> Drop for SocketInner<T> {
fn drop(&mut self) { fn drop(&mut self) {
assert_eq!(self.state, SocketState::Unscheduled); assert_eq!(self.state, SocketState::Unscheduled);
} }
} }
struct SocketStream(Socket, Option<gst::MappedBuffer<gst::buffer::Writable>>); struct SocketStream<T: SocketRead + 'static>(
Socket<T>,
Option<gst::MappedBuffer<gst::buffer::Writable>>,
);
impl Stream for SocketStream { impl<T: SocketRead + 'static> Stream for SocketStream<T> {
type Item = gst::Buffer; type Item = gst::Buffer;
type Error = Either<gst::FlowError, io::Error>; type Error = Either<gst::FlowError, io::Error>;
@ -249,7 +259,7 @@ impl Stream for SocketStream {
}, },
}; };
match inner.socket.poll_recv(buffer.as_mut_slice()) { match inner.reader.poll_read(buffer.as_mut_slice()) {
Ok(Async::NotReady) => { Ok(Async::NotReady) => {
gst_debug!(SOCKET_CAT, obj: &inner.element, "No data available"); gst_debug!(SOCKET_CAT, obj: &inner.element, "No data available");
inner.current_task = Some(task::current()); inner.current_task = Some(task::current());
@ -260,9 +270,15 @@ impl Stream for SocketStream {
return Err(Either::Right(err)); return Err(Either::Right(err));
} }
Ok(Async::Ready(len)) => { Ok(Async::Ready(len)) => {
let time = inner.clock.as_ref().unwrap().get_time(); let dts = if T::DO_TIMESTAMP {
let dts = time - inner.base_time.unwrap(); let time = inner.clock.as_ref().unwrap().get_time();
gst_debug!(SOCKET_CAT, obj: &inner.element, "Read {} bytes at {} (clock {})", len, dts, time); let running_time = time - inner.base_time.unwrap();
gst_debug!(SOCKET_CAT, obj: &inner.element, "Read {} bytes at {} (clock {})", len, running_time, time);
running_time
} else {
gst_debug!(SOCKET_CAT, obj: &inner.element, "Read {} bytes", len);
gst::CLOCK_TIME_NONE
};
(len, dts) (len, dts)
} }
} }

View file

@ -23,12 +23,13 @@ use gst::prelude::*;
use gobject_subclass::object::*; use gobject_subclass::object::*;
use gst_plugin::element::*; use gst_plugin::element::*;
use std::io;
use std::sync::Mutex; use std::sync::Mutex;
use std::u16; use std::u16;
use futures; use futures;
use futures::Future;
use futures::future; use futures::future;
use futures::{Future, Poll};
use tokio::net; use tokio::net;
use either::Either; use either::Either;
@ -139,10 +140,28 @@ static PROPERTIES: [Property; 8] = [
), ),
]; ];
pub struct UdpReader {
socket: net::UdpSocket,
}
impl UdpReader {
pub fn new(socket: net::UdpSocket) -> Self {
Self { socket: socket }
}
}
impl SocketRead for UdpReader {
const DO_TIMESTAMP: bool = true;
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
self.socket.poll_recv(buf)
}
}
struct State { struct State {
io_context: Option<IOContext>, io_context: Option<IOContext>,
pending_future_id: Option<PendingFutureId>, pending_future_id: Option<PendingFutureId>,
socket: Option<Socket>, socket: Option<Socket<UdpReader>>,
need_initial_events: bool, need_initial_events: bool,
configured_caps: Option<gst::Caps>, configured_caps: Option<gst::Caps>,
pending_future_cancel: Option<futures::sync::oneshot::Sender<()>>, pending_future_cancel: Option<futures::sync::oneshot::Sender<()>>,
@ -556,7 +575,7 @@ impl UdpSrc {
) )
})?; })?;
let socket = Socket::new(&element.clone().upcast(), socket, buffer_pool); let socket = Socket::new(element.upcast_ref(), UdpReader::new(socket), buffer_pool);
let element_clone = element.clone(); let element_clone = element.clone();
let element_clone2 = element.clone(); let element_clone2 = element.clone();
@ -646,7 +665,7 @@ impl UdpSrc {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket { if let Some(ref socket) = state.socket {
socket.unpause(element.get_clock().unwrap(), element.get_base_time()); socket.unpause(element.get_clock(), Some(element.get_base_time()));
} }
gst_debug!(self.cat, obj: element, "Started"); gst_debug!(self.cat, obj: element, "Started");