From 102523f435775af76dd4fbb009a2265d26b94b8f Mon Sep 17 00:00:00 2001 From: Rafael Caricio Date: Wed, 30 Dec 2020 23:15:29 +0100 Subject: [PATCH] Handle stopped state --- src/connection.rs | 45 +++-- src/imp.rs | 58 +++--- src/lib.rs | 2 +- src/server.rs | 460 ++++++++++++++++++++++++++++++---------------- 4 files changed, 363 insertions(+), 202 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index f412c98..4089c24 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,11 +1,11 @@ +use rml_rtmp::handshake::{Handshake, HandshakeProcessResult, PeerType}; use std::collections::VecDeque; use std::io; use std::io::{Read, Write}; -use std::time::Duration; use std::net::TcpStream; use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; use std::thread; -use rml_rtmp::handshake::{Handshake, HandshakeProcessResult, PeerType}; +use std::time::Duration; const BUFFER_SIZE: usize = 4096; @@ -63,17 +63,16 @@ impl Connection { match self.reader.try_recv() { Err(TryRecvError::Empty) => Ok(ReadResult::NoBytesReceived), Err(TryRecvError::Disconnected) => Err(ConnectionError::SocketClosed), - Ok(result) => { - match self.handshake_completed { - true => Ok(result), - false => match result { - ReadResult::HandshakingInProgress => unreachable!(), - ReadResult::NoBytesReceived => Ok(result), - ReadResult::BytesReceived {buffer, byte_count} - => self.handle_handshake_bytes(&buffer[..byte_count]), + Ok(result) => match self.handshake_completed { + true => Ok(result), + false => match result { + ReadResult::HandshakingInProgress => unreachable!(), + ReadResult::NoBytesReceived => Ok(result), + ReadResult::BytesReceived { buffer, byte_count } => { + self.handle_handshake_bytes(&buffer[..byte_count]) } - } - } + }, + }, } } @@ -87,15 +86,18 @@ impl Connection { }; match result { - HandshakeProcessResult::InProgress {response_bytes} => { + HandshakeProcessResult::InProgress { response_bytes } => { if response_bytes.len() > 0 { self.write(response_bytes); } Ok(ReadResult::HandshakingInProgress) - }, + } - HandshakeProcessResult::Completed {response_bytes, remaining_bytes} => { + HandshakeProcessResult::Completed { + response_bytes, + remaining_bytes, + } => { println!("Handshake successful!"); if response_bytes.len() > 0 { self.write(response_bytes); @@ -108,7 +110,10 @@ impl Connection { } self.handshake_completed = true; - Ok(ReadResult::BytesReceived {buffer, byte_count: buffer_size}) + Ok(ReadResult::BytesReceived { + buffer, + byte_count: buffer_size, + }) } } } @@ -116,7 +121,7 @@ impl Connection { fn start_byte_writer(byte_receiver: Receiver>, socket: &TcpStream) { let mut socket = socket.try_clone().expect("failed to clone socket"); - thread::spawn(move|| { + thread::spawn(move || { let mut send_queue = VecDeque::new(); loop { @@ -136,7 +141,7 @@ fn start_byte_writer(byte_receiver: Receiver>, socket: &TcpStream) { println!("Error writing to socket: {:?}", error); return; } - } + }, } } }); @@ -144,7 +149,7 @@ fn start_byte_writer(byte_receiver: Receiver>, socket: &TcpStream) { fn start_result_reader(sender: Sender, socket: &TcpStream) { let mut socket = socket.try_clone().unwrap(); - thread::spawn(move|| { + thread::spawn(move || { let mut buffer = [0; BUFFER_SIZE]; loop { match socket.read(&mut buffer) { @@ -161,7 +166,7 @@ fn start_result_reader(sender: Sender, socket: &TcpStream) { }; sender.send(result).unwrap(); - }, + } Err(error) => { println!("Error occurred reading from socket: {:?}", error); diff --git a/src/imp.rs b/src/imp.rs index 6688d9b..18d49d4 100644 --- a/src/imp.rs +++ b/src/imp.rs @@ -1,3 +1,5 @@ +use crate::server::Server; +use bytes::Bytes; use glib::subclass; use glib::subclass::prelude::*; use gst::prelude::*; @@ -5,12 +7,9 @@ use gst::subclass::prelude::*; use gst::{gst_debug, gst_error, gst_info, gst_log}; use gst_base::prelude::*; use gst_base::subclass::prelude::*; - use once_cell::sync::Lazy; -use std::u32; -use crate::server::Server; use std::sync::Mutex; -use bytes::Bytes; +use std::u32; static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( @@ -26,14 +25,14 @@ const DEFAULT_PORT: u32 = 5000; #[derive(Debug, Clone)] struct Settings { address: String, - port: u32 + port: u32, } impl Default for Settings { fn default() -> Self { Settings { address: DEFAULT_ADDRESS.into(), - port: DEFAULT_PORT + port: DEFAULT_PORT, } } } @@ -45,7 +44,7 @@ static PROPERTIES: [subclass::Property; 2] = [ "Address", "The address the server should listen for incoming connections", DEFAULT_ADDRESS.into(), - glib::ParamFlags::READWRITE + glib::ParamFlags::READWRITE, ) }), subclass::Property("port", |name| { @@ -56,7 +55,7 @@ static PROPERTIES: [subclass::Property; 2] = [ 1000, u32::MAX, DEFAULT_PORT, - glib::ParamFlags::READWRITE + glib::ParamFlags::READWRITE, ) }), ]; @@ -64,9 +63,7 @@ static PROPERTIES: [subclass::Property; 2] = [ #[derive(Debug)] enum State { Stopped, - Started { - stream_key: String, - }, + Started { stream_key: String }, } impl Default for State { @@ -103,7 +100,8 @@ impl ObjectSubclass for RtmpSvrSrc { gst::PadDirection::Src, gst::PadPresence::Always, &caps, - ).unwrap(); + ) + .unwrap(); klass.add_pad_template(src_pad_template); klass.install_properties(&PROPERTIES); @@ -118,13 +116,16 @@ impl ObjectSubclass for RtmpSvrSrc { } impl ObjectImpl for RtmpSvrSrc { - fn set_property(&self, obj: &Self::Type, id: usize, value: &glib::Value) { let prop = &PROPERTIES[id]; match *prop { subclass::Property("address", ..) => { let mut settings = self.settings.lock().unwrap(); - let address = value.get().expect("type checked upstream").unwrap_or_else(|| DEFAULT_ADDRESS).into(); + let address = value + .get() + .expect("type checked upstream") + .unwrap_or_else(|| DEFAULT_ADDRESS) + .into(); settings.address = address; gst_debug!(CAT, obj: obj, "Set address to: {}", settings.address); } @@ -137,6 +138,7 @@ impl ObjectImpl for RtmpSvrSrc { _ => unimplemented!(), }; } + fn get_property(&self, obj: &Self::Type, id: usize) -> glib::Value { let prop = &PROPERTIES[id]; match *prop { @@ -204,18 +206,26 @@ impl BaseSrcImpl for RtmpSvrSrc { impl PushSrcImpl for RtmpSvrSrc { fn create(&self, src: &Self::Type) -> Result { let mut state = self.state.lock().unwrap(); - if let State::Started { .. } = *state { - let chunk = Bytes::from("Mock"); - // Here we return the buffer - let size = chunk.len(); - assert_ne!(chunk.len(), 0); - let buffer = gst::Buffer::from_slice(chunk); + // gst_debug!(CAT, obj: src, "End of stream"); + // Err(gst::FlowError::Eos) - Ok(buffer) - } else { - gst_debug!(CAT, obj: src, "End of stream"); - Err(gst::FlowError::Eos) + match *state { + State::Started { .. } => { + let chunk = Bytes::from("Mock"); + // Here we return the buffer + let size = chunk.len(); + assert_ne!(chunk.len(), 0); + + let buffer = gst::Buffer::from_slice(chunk); + + Ok(buffer) + } + State::Stopped => { + gst::element_error!(src, gst::LibraryError::Failed, ["Not started yet"]); + + return Err(gst::FlowError::Error); + } } } } diff --git a/src/lib.rs b/src/lib.rs index 93adfbf..d23077b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ use glib::prelude::*; -mod imp; mod connection; +mod imp; mod server; glib::wrapper! { diff --git a/src/server.rs b/src/server.rs index 525145c..602e131 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,22 +1,24 @@ +use bytes::Bytes; +use rml_rtmp::chunk_io::Packet; +use rml_rtmp::sessions::StreamMetadata; +use rml_rtmp::sessions::{ + ServerSession, ServerSessionConfig, ServerSessionEvent, ServerSessionResult, +}; +use rml_rtmp::time::RtmpTimestamp; +use slab::Slab; use std::collections::{HashMap, HashSet}; use std::rc::Rc; -use bytes::Bytes; -use slab::Slab; -use rml_rtmp::sessions::{ServerSession, ServerSessionConfig, ServerSessionResult, ServerSessionEvent}; -use rml_rtmp::sessions::StreamMetadata; -use rml_rtmp::chunk_io::Packet; -use rml_rtmp::time::RtmpTimestamp; enum ClientAction { Waiting, Publishing(String), // Publishing to a stream key - Watching { - stream_key: String, - stream_id: u32, - }, + Watching { stream_key: String, stream_id: u32 }, } -enum ReceivedDataType {Audio, Video} +enum ReceivedDataType { + Audio, + Video, +} struct Client { session: ServerSession, @@ -30,7 +32,10 @@ impl Client { match self.current_action { ClientAction::Waiting => None, ClientAction::Publishing(_) => None, - ClientAction::Watching {stream_key: _, stream_id} => Some(stream_id), + ClientAction::Watching { + stream_key: _, + stream_id, + } => Some(stream_id), } } } @@ -45,11 +50,13 @@ struct MediaChannel { #[derive(Debug)] pub enum ServerResult { - DisconnectConnection {connection_id: usize}, + DisconnectConnection { + connection_id: usize, + }, OutboundPacket { target_connection_id: usize, packet: Packet, - } + }, } pub struct Server { @@ -67,7 +74,11 @@ impl Server { } } - pub fn bytes_received(&mut self, connection_id: usize, bytes: &[u8]) -> Result, String> { + pub fn bytes_received( + &mut self, + connection_id: usize, + bytes: &[u8], + ) -> Result, String> { let mut server_results = Vec::new(); if !self.connection_to_client_map.contains_key(&connection_id) { @@ -78,7 +89,11 @@ impl Server { Err(error) => return Err(error.to_string()), }; - self.handle_session_results(connection_id, initial_session_results, &mut server_results); + self.handle_session_results( + connection_id, + initial_session_results, + &mut server_results, + ); let client = Client { session, connection_id, @@ -87,7 +102,8 @@ impl Server { }; let client_id = Some(self.clients.insert(client)); - self.connection_to_client_map.insert(connection_id, client_id.unwrap()); + self.connection_to_client_map + .insert(connection_id, client_id.unwrap()); } let client_results: Vec; @@ -111,17 +127,22 @@ impl Server { let client = self.clients.remove(client_id); match client.current_action { ClientAction::Publishing(stream_key) => self.publishing_ended(stream_key), - ClientAction::Watching{stream_key, stream_id: _} => self.play_ended(client_id, stream_key), + ClientAction::Watching { + stream_key, + stream_id: _, + } => self.play_ended(client_id, stream_key), ClientAction::Waiting => (), } - }, + } } } - fn handle_session_results(&mut self, - executed_connection_id: usize, - session_results: Vec, - server_results: &mut Vec) { + fn handle_session_results( + &mut self, + executed_connection_id: usize, + session_results: Vec, + server_results: &mut Vec, + ) { for result in session_results { match result { ServerSessionResult::OutboundResponse(packet) => { @@ -129,59 +150,133 @@ impl Server { target_connection_id: executed_connection_id, packet, }) - }, + } - ServerSessionResult::RaisedEvent(event) => - self.handle_raised_event(executed_connection_id, event, server_results), + ServerSessionResult::RaisedEvent(event) => { + self.handle_raised_event(executed_connection_id, event, server_results) + } x => println!("Server result received: {:?}", x), } } } - fn handle_raised_event(&mut self, - executed_connection_id: usize, - event: ServerSessionEvent, - server_results: &mut Vec) { + fn handle_raised_event( + &mut self, + executed_connection_id: usize, + event: ServerSessionEvent, + server_results: &mut Vec, + ) { match event { - ServerSessionEvent::ConnectionRequested {request_id, app_name} => { - self.handle_connection_requested(executed_connection_id, request_id, app_name, server_results); - }, + ServerSessionEvent::ConnectionRequested { + request_id, + app_name, + } => { + self.handle_connection_requested( + executed_connection_id, + request_id, + app_name, + server_results, + ); + } - ServerSessionEvent::PublishStreamRequested {request_id, app_name, stream_key, mode: _} => { - self.handle_publish_requested(executed_connection_id, request_id, app_name, stream_key, server_results); - }, + ServerSessionEvent::PublishStreamRequested { + request_id, + app_name, + stream_key, + mode: _, + } => { + self.handle_publish_requested( + executed_connection_id, + request_id, + app_name, + stream_key, + server_results, + ); + } - ServerSessionEvent::PlayStreamRequested {request_id, app_name, stream_key, start_at: _, duration: _, reset: _, stream_id} => { - self.handle_play_requested(executed_connection_id, request_id, app_name, stream_key, stream_id, server_results); - }, + ServerSessionEvent::PlayStreamRequested { + request_id, + app_name, + stream_key, + start_at: _, + duration: _, + reset: _, + stream_id, + } => { + self.handle_play_requested( + executed_connection_id, + request_id, + app_name, + stream_key, + stream_id, + server_results, + ); + } - ServerSessionEvent::StreamMetadataChanged {app_name, stream_key, metadata} => { + ServerSessionEvent::StreamMetadataChanged { + app_name, + stream_key, + metadata, + } => { self.handle_metadata_received(app_name, stream_key, metadata, server_results); - }, + } - ServerSessionEvent::VideoDataReceived {app_name: _, stream_key, data, timestamp} => { - self.handle_audio_video_data_received(stream_key, timestamp, data, ReceivedDataType::Video, server_results); - }, + ServerSessionEvent::VideoDataReceived { + app_name: _, + stream_key, + data, + timestamp, + } => { + self.handle_audio_video_data_received( + stream_key, + timestamp, + data, + ReceivedDataType::Video, + server_results, + ); + } - ServerSessionEvent::AudioDataReceived {app_name: _, stream_key, data, timestamp} => { - self.handle_audio_video_data_received(stream_key, timestamp, data, ReceivedDataType::Audio, server_results); - }, + ServerSessionEvent::AudioDataReceived { + app_name: _, + stream_key, + data, + timestamp, + } => { + self.handle_audio_video_data_received( + stream_key, + timestamp, + data, + ReceivedDataType::Audio, + server_results, + ); + } - _ => println!("Event raised by connection {}: {:?}", executed_connection_id, event), + _ => println!( + "Event raised by connection {}: {:?}", + executed_connection_id, event + ), } } - fn handle_connection_requested(&mut self, - requested_connection_id: usize, - request_id: u32, - app_name: String, - server_results: &mut Vec) { - println!("Connection {} requested connection to app '{}'", requested_connection_id, app_name); + fn handle_connection_requested( + &mut self, + requested_connection_id: usize, + request_id: u32, + app_name: String, + server_results: &mut Vec, + ) { + println!( + "Connection {} requested connection to app '{}'", + requested_connection_id, app_name + ); let accept_result; { - let client_id = self.connection_to_client_map.get(&requested_connection_id).unwrap(); + let client_id = self + .connection_to_client_map + .get(&requested_connection_id) + .unwrap(); let client = self.clients.get_mut(*client_id).unwrap(); accept_result = client.session.accept_request(request_id); } @@ -190,9 +285,9 @@ impl Server { Err(error) => { println!("Error occurred accepting connection request: {:?}", error); server_results.push(ServerResult::DisconnectConnection { - connection_id: requested_connection_id} - ) - }, + connection_id: requested_connection_id, + }) + } Ok(results) => { self.handle_session_results(requested_connection_id, results, server_results); @@ -200,13 +295,18 @@ impl Server { } } - fn handle_publish_requested(&mut self, - requested_connection_id: usize, - request_id: u32, - app_name: String, - stream_key: String, - server_results: &mut Vec) { - println!("Publish requested on app '{}' and stream key '{}'", app_name, stream_key); + fn handle_publish_requested( + &mut self, + requested_connection_id: usize, + request_id: u32, + app_name: String, + stream_key: String, + server_results: &mut Vec, + ) { + println!( + "Publish requested on app '{}' and stream key '{}'", + app_name, stream_key + ); match self.channels.get(&stream_key) { None => (), @@ -214,27 +314,30 @@ impl Server { None => (), Some(_) => { println!("Stream key already being published to"); - server_results.push(ServerResult::DisconnectConnection {connection_id: requested_connection_id}); + server_results.push(ServerResult::DisconnectConnection { + connection_id: requested_connection_id, + }); return; } - } + }, } let accept_result; { - let client_id = self.connection_to_client_map.get(&requested_connection_id).unwrap(); + let client_id = self + .connection_to_client_map + .get(&requested_connection_id) + .unwrap(); let client = self.clients.get_mut(*client_id).unwrap(); client.current_action = ClientAction::Publishing(stream_key.clone()); - let channel = self.channels - .entry(stream_key) - .or_insert(MediaChannel { - publishing_client_id: None, - watching_client_ids: HashSet::new(), - metadata: None, - video_sequence_header: None, - audio_sequence_header: None, - }); + let channel = self.channels.entry(stream_key).or_insert(MediaChannel { + publishing_client_id: None, + watching_client_ids: HashSet::new(), + metadata: None, + video_sequence_header: None, + audio_sequence_header: None, + }); channel.publishing_client_id = Some(*client_id); accept_result = client.session.accept_request(request_id); @@ -244,9 +347,9 @@ impl Server { Err(error) => { println!("Error occurred accepting publish request: {:?}", error); server_results.push(ServerResult::DisconnectConnection { - connection_id: requested_connection_id} - ) - }, + connection_id: requested_connection_id, + }) + } Ok(results) => { self.handle_session_results(requested_connection_id, results, server_results); @@ -254,25 +357,34 @@ impl Server { } } - fn handle_play_requested(&mut self, - requested_connection_id: usize, - request_id: u32, - app_name: String, - stream_key: String, - stream_id: u32, - server_results: &mut Vec) { - println!("Play requested on app '{}' and stream key '{}'", app_name, stream_key); + fn handle_play_requested( + &mut self, + requested_connection_id: usize, + request_id: u32, + app_name: String, + stream_key: String, + stream_id: u32, + server_results: &mut Vec, + ) { + println!( + "Play requested on app '{}' and stream key '{}'", + app_name, stream_key + ); let accept_result; { - let client_id = self.connection_to_client_map.get(&requested_connection_id).unwrap(); + let client_id = self + .connection_to_client_map + .get(&requested_connection_id) + .unwrap(); let client = self.clients.get_mut(*client_id).unwrap(); client.current_action = ClientAction::Watching { stream_key: stream_key.clone(), stream_id, }; - let channel = self.channels + let channel = self + .channels .entry(stream_key.clone()) .or_insert(MediaChannel { publishing_client_id: None, @@ -286,22 +398,24 @@ impl Server { accept_result = match client.session.accept_request(request_id) { Err(error) => Err(error), Ok(mut results) => { - // If the channel already has existing metadata, send that to the new client // so they have up to date info match channel.metadata { None => (), Some(ref metadata) => { - let packet = match client.session.send_metadata(stream_id, metadata.clone()) { + let packet = match client + .session + .send_metadata(stream_id, metadata.clone()) + { Ok(packet) => packet, Err(error) => { println!("Error occurred sending existing metadata to new client: {:?}", error); server_results.push(ServerResult::DisconnectConnection { - connection_id: requested_connection_id} - ); + connection_id: requested_connection_id, + }); return; - }, + } }; results.push(ServerSessionResult::OutboundResponse(packet)); @@ -312,16 +426,24 @@ impl Server { match channel.video_sequence_header { None => (), Some(ref data) => { - let packet = match client.session.send_video_data(stream_id, data.clone(), RtmpTimestamp::new(0), false) { + let packet = match client.session.send_video_data( + stream_id, + data.clone(), + RtmpTimestamp::new(0), + false, + ) { Ok(packet) => packet, Err(error) => { - println!("Error occurred sending video header to new client: {:?}", error); - server_results.push(ServerResult::DisconnectConnection { - connection_id: requested_connection_id} + println!( + "Error occurred sending video header to new client: {:?}", + error ); + server_results.push(ServerResult::DisconnectConnection { + connection_id: requested_connection_id, + }); return; - }, + } }; results.push(ServerSessionResult::OutboundResponse(packet)); @@ -331,16 +453,24 @@ impl Server { match channel.audio_sequence_header { None => (), Some(ref data) => { - let packet = match client.session.send_audio_data(stream_id, data.clone(), RtmpTimestamp::new(0), false) { + let packet = match client.session.send_audio_data( + stream_id, + data.clone(), + RtmpTimestamp::new(0), + false, + ) { Ok(packet) => packet, Err(error) => { - println!("Error occurred sending audio header to new client: {:?}", error); - server_results.push(ServerResult::DisconnectConnection { - connection_id: requested_connection_id} + println!( + "Error occurred sending audio header to new client: {:?}", + error ); + server_results.push(ServerResult::DisconnectConnection { + connection_id: requested_connection_id, + }); return; - }, + } }; results.push(ServerSessionResult::OutboundResponse(packet)); @@ -356,11 +486,11 @@ impl Server { Err(error) => { println!("Error occurred accepting playback request: {:?}", error); server_results.push(ServerResult::DisconnectConnection { - connection_id: requested_connection_id} - ); + connection_id: requested_connection_id, + }); return; - }, + } Ok(results) => { self.handle_session_results(requested_connection_id, results, server_results); @@ -368,12 +498,17 @@ impl Server { } } - fn handle_metadata_received(&mut self, - app_name: String, - stream_key: String, - metadata: StreamMetadata, - server_results: &mut Vec) { - println!("New metadata received for app '{}' and stream key '{}'", app_name, stream_key); + fn handle_metadata_received( + &mut self, + app_name: String, + stream_key: String, + metadata: StreamMetadata, + server_results: &mut Vec, + ) { + println!( + "New metadata received for app '{}' and stream key '{}'", + app_name, stream_key + ); let channel = match self.channels.get_mut(&stream_key) { Some(channel) => channel, None => return, @@ -394,30 +529,36 @@ impl Server { None => continue, }; - match client.session.send_metadata(active_stream_id, metadata.clone()) { - Ok(packet) => { - server_results.push(ServerResult::OutboundPacket { - target_connection_id: client.connection_id, - packet, - }) - }, + match client + .session + .send_metadata(active_stream_id, metadata.clone()) + { + Ok(packet) => server_results.push(ServerResult::OutboundPacket { + target_connection_id: client.connection_id, + packet, + }), Err(error) => { - println!("Error sending metadata to client on connection id {}: {:?}", client.connection_id, error); + println!( + "Error sending metadata to client on connection id {}: {:?}", + client.connection_id, error + ); server_results.push(ServerResult::DisconnectConnection { - connection_id: client.connection_id + connection_id: client.connection_id, }); - }, + } } } } - fn handle_audio_video_data_received(&mut self, - stream_key: String, - timestamp: RtmpTimestamp, - data: Bytes, - data_type: ReceivedDataType, - server_results: &mut Vec) { + fn handle_audio_video_data_received( + &mut self, + stream_key: String, + timestamp: RtmpTimestamp, + data: Bytes, + data_type: ReceivedDataType, + server_results: &mut Vec, + ) { let channel = match self.channels.get_mut(&stream_key) { Some(channel) => channel, None => return, @@ -430,7 +571,7 @@ impl Server { if is_video_sequence_header(data.clone()) { channel.video_sequence_header = Some(data.clone()); } - }, + } ReceivedDataType::Audio => { if is_audio_sequence_header(data.clone()) { @@ -452,14 +593,14 @@ impl Server { let should_send_to_client = match data_type { ReceivedDataType::Video => { - client.has_received_video_keyframe || - (is_video_sequence_header(data.clone()) || - is_video_keyframe(data.clone())) - }, + client.has_received_video_keyframe + || (is_video_sequence_header(data.clone()) + || is_video_keyframe(data.clone())) + } ReceivedDataType::Audio => { client.has_received_video_keyframe || is_audio_sequence_header(data.clone()) - }, + } }; if !should_send_to_client { @@ -467,30 +608,41 @@ impl Server { } let send_result = match data_type { - ReceivedDataType::Audio => client.session.send_audio_data(active_stream_id, data.clone(), timestamp.clone(), true), + ReceivedDataType::Audio => client.session.send_audio_data( + active_stream_id, + data.clone(), + timestamp.clone(), + true, + ), ReceivedDataType::Video => { if is_video_keyframe(data.clone()) { client.has_received_video_keyframe = true; } - client.session.send_video_data(active_stream_id, data.clone(), timestamp.clone(), true) - }, + client.session.send_video_data( + active_stream_id, + data.clone(), + timestamp.clone(), + true, + ) + } }; match send_result { - Ok(packet) => { - server_results.push(ServerResult::OutboundPacket { - target_connection_id: client.connection_id, - packet, - }) - }, + Ok(packet) => server_results.push(ServerResult::OutboundPacket { + target_connection_id: client.connection_id, + packet, + }), Err(error) => { - println!("Error sending metadata to client on connection id {}: {:?}", client.connection_id, error); + println!( + "Error sending metadata to client on connection id {}: {:?}", + client.connection_id, error + ); server_results.push(ServerResult::DisconnectConnection { - connection_id: client.connection_id + connection_id: client.connection_id, }); - }, + } } } } @@ -517,21 +669,15 @@ impl Server { fn is_video_sequence_header(data: Bytes) -> bool { // This is assuming h264. - return data.len() >= 2 && - data[0] == 0x17 && - data[1] == 0x00; + return data.len() >= 2 && data[0] == 0x17 && data[1] == 0x00; } fn is_audio_sequence_header(data: Bytes) -> bool { // This is assuming aac - return data.len() >= 2 && - data[0] == 0xaf && - data[1] == 0x00; + return data.len() >= 2 && data[0] == 0xaf && data[1] == 0x00; } fn is_video_keyframe(data: Bytes) -> bool { // assumings h264 - return data.len() >= 2 && - data[0] == 0x17 && - data[1] != 0x00; // 0x00 is the sequence header, don't count that for now + return data.len() >= 2 && data[0] == 0x17 && data[1] != 0x00; // 0x00 is the sequence header, don't count that for now }