From 2d9692797a4f3024587e24785318ad418beb9787 Mon Sep 17 00:00:00 2001 From: Rafael Caricio Date: Mon, 28 Mar 2022 00:07:49 +0200 Subject: [PATCH] Not working --- src/transcriber/imp.rs | 89 +++++++++++++++++++++++------------------- 1 file changed, 49 insertions(+), 40 deletions(-) diff --git a/src/transcriber/imp.rs b/src/transcriber/imp.rs index bc770f2..94a8973 100644 --- a/src/transcriber/imp.rs +++ b/src/transcriber/imp.rs @@ -79,9 +79,6 @@ struct State { /// Flag to indicate that we need to send the required stream initialization events to the src pad send_events: bool, - /// Flag to indicate that we need to send the EOS event to the src pad - send_eos: bool, - /// Flag to indicate that we need to send a discontinuity buffer in the src pad send_discontinuity: bool, @@ -111,7 +108,6 @@ impl Default for State { buffers: VecDeque::new(), start_time: None, send_events: true, - send_eos: false, send_discontinuity: true, in_segment: gst::FormattedSegment::new(), seqnum: gst::Seqnum::next(), @@ -165,8 +161,6 @@ impl Transcriber { let start_time = state.start_time.unwrap(); let mut last_position = state.out_segment.position().unwrap(); - let send_eos = state.send_eos && state.buffers.is_empty(); - while let Some(buf) = state.buffers.front() { let pts = buf.pts().unwrap(); gst_trace!( @@ -201,15 +195,6 @@ impl Transcriber { drop(state); - // We're EOS, we can pause and exit early - if send_eos { - let _ = self.srcpad.pause_task(); - - return self - .srcpad - .push_event(gst::event::Eos::builder().seqnum(seqnum).build()); - } - for mut buf in items.drain(..) { let mut pts = buf.pts().unwrap(); let mut duration = buf.duration().unwrap(); @@ -314,7 +299,7 @@ impl Transcriber { gst_debug!( CAT, obj: element, - "Item is ready for queuing: {}, PTS {}", + "Item is ready for queuing: \"{}\", PTS {}", item.word, start_time ); @@ -405,7 +390,7 @@ impl Transcriber { } }; - gst_trace!( + gst_debug!( CAT, obj: element, "result: {}", @@ -533,6 +518,7 @@ impl Transcriber { if ret { let (_, min, _) = peer_query.result(); let our_latency = self.settings.lock().unwrap().latency; + gst_info!(CAT, obj: element, "Replying to latency query: {}", our_latency + min); // We never drop buffers, so our max latency is set to infinity q.set(true, our_latency + min, gst::ClockTime::NONE); } @@ -658,22 +644,24 @@ impl Transcriber { if let Some(ws_sink) = self.ws_sink.borrow_mut().as_mut() { if let Some(buffer) = buffer { let data = buffer.map_readable().unwrap(); - for chunk in data.chunks(8192) { + for chunk in data.chunks(8000) { ws_sink .send(Message::Binary(chunk.to_vec())) .await .map_err(|err| { - gst_error!(CAT, obj: element, "Failed sending packet: {}", err); + gst_error!(CAT, obj: element, "Failed sending audio packet to server: {}", err); gst::FlowError::Error })?; } + gst_trace!(CAT, obj: element, "Sent complete buffer to server!"); } else { // Send end of stream + gst_info!(CAT, obj: element, "Closing transcription session to Vosk server"); ws_sink .send(Message::Text("{\"eof\": 1}".to_string())) .await .map_err(|err| { - gst_error!(CAT, obj: element, "Failed sending packet: {}", err); + gst_error!(CAT, obj: element, "Failed sending EOF packet to server: {}", err); gst::FlowError::Error })?; } @@ -724,10 +712,12 @@ impl Transcriber { } fn ensure_connection(&self, element: &super::Transcriber) -> Result<(), gst::ErrorMessage> { - let state = self.state.lock().unwrap(); + { + let state = self.state.lock().unwrap(); - if state.connected { - return Ok(()); + if state.connected { + return Ok(()); + } } let settings = self.settings.lock().unwrap(); @@ -748,7 +738,6 @@ impl Transcriber { let url = settings.server_address.clone(); drop(settings); - drop(state); let (ws, _) = { let _enter = RUNTIME.enter(); @@ -774,6 +763,8 @@ impl Transcriber { let s = in_caps.structure(0).unwrap(); let sample_rate = s.get::("rate").unwrap(); + gst_debug!(CAT, obj: &element, "Configuring transcription session, sample_rate={}", sample_rate); + let config = Configuration::new(sample_rate); let packet = serde_json::to_vec(&config).unwrap(); let msg = Message::Binary(packet); @@ -788,6 +779,8 @@ impl Transcriber { return; } + } else { + return; } } @@ -795,11 +788,7 @@ impl Transcriber { let transcribe = element.imp(); let msg = match ws_stream.next().await { Some(msg) => msg, - None => { - let mut state = transcribe.state.lock().unwrap(); - state.send_eos = true; - break; - } + None => continue }; let msg = match msg { @@ -818,9 +807,12 @@ impl Transcriber { let mut sender = transcribe.state.lock().unwrap().sender.clone(); if let Some(sender) = sender.as_mut() { - if sender.send(msg).await.is_err() { + if let Err(err) = sender.send(msg).await { + gst_error!(CAT, obj: &element, "Stopped RECV handler: {}", err); break; } + } else { + break; } } }; @@ -921,15 +913,24 @@ impl ObjectSubclass for Transcriber { impl ObjectImpl for Transcriber { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: Lazy> = Lazy::new(|| { - vec![glib::ParamSpecUInt::new( - "latency", - "Latency", - "Amount of milliseconds to allow Vosk to transcribe", - 0, - std::u32::MAX, - DEFAULT_LATENCY.mseconds() as u32, - glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, - )] + vec![ + glib::ParamSpecUInt::new( + "latency", + "Latency", + "Amount of milliseconds to allow Vosk to transcribe", + 0, + std::u32::MAX, + DEFAULT_LATENCY.mseconds() as u32, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpecString::new( + "server-address", + "Server Address", + "Address of the Vosk websocket server", + Some(DEFAULT_SERVER_ADDRESS), + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ) + ] }); PROPERTIES.as_ref() @@ -957,6 +958,10 @@ impl ObjectImpl for Transcriber { value.get::().expect("type checked upstream").into(), ); } + "server-address" => { + let mut settings = self.settings.lock().unwrap(); + settings.server_address = value.get().expect("type checked upstream") + } _ => unimplemented!(), } } @@ -967,6 +972,10 @@ impl ObjectImpl for Transcriber { let settings = self.settings.lock().unwrap(); (settings.latency.mseconds() as u32).to_value() } + "server-address" => { + let settings = self.settings.lock().unwrap(); + settings.server_address.to_value() + } _ => unimplemented!(), } } @@ -1002,7 +1011,7 @@ impl ElementImpl for Transcriber { .unwrap(); let sink_caps = gst::Caps::builder("audio/x-raw") - .field("format", "S16LE") + .field("format", "S16BE") .field("rate", gst::IntRange::new(8000_i32, 48000)) .field("channels", 1) .build();