mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-24 09:58:13 +00:00
spotify: fix "start a runtime from within a runtime" with static link
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/986>
This commit is contained in:
parent
b015688447
commit
8bd9de8d48
1 changed files with 200 additions and 112 deletions
|
@ -6,9 +6,10 @@
|
|||
//
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use std::sync::{mpsc, Arc, Mutex};
|
||||
use std::sync::{mpsc, Arc, Mutex, MutexGuard};
|
||||
|
||||
use anyhow::bail;
|
||||
use futures::future::{AbortHandle, Abortable, Aborted};
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio::{runtime, task::JoinHandle};
|
||||
|
||||
|
@ -74,10 +75,16 @@ struct Settings {
|
|||
|
||||
#[derive(Default)]
|
||||
pub struct SpotifyAudioSrc {
|
||||
setup_thread: Mutex<Option<SetupThread>>,
|
||||
state: Arc<Mutex<Option<State>>>,
|
||||
settings: Mutex<Settings>,
|
||||
}
|
||||
|
||||
struct SetupThread {
|
||||
thread_handle: std::thread::JoinHandle<Result<anyhow::Result<()>, Aborted>>,
|
||||
abort_handle: AbortHandle,
|
||||
}
|
||||
|
||||
#[glib::object_subclass]
|
||||
impl ObjectSubclass for SpotifyAudioSrc {
|
||||
const NAME: &'static str = "GstSpotifyAudioSrc";
|
||||
|
@ -237,17 +244,22 @@ impl BaseSrcImpl for SpotifyAudioSrc {
|
|||
}
|
||||
}
|
||||
|
||||
if let Err(err) = RUNTIME.block_on(async move { self.setup().await }) {
|
||||
let details = format!("{:?}", err);
|
||||
gst::error!(CAT, imp: self, "failed to start: {}", details);
|
||||
gst::element_imp_error!(self, gst::ResourceError::Settings, [&details]);
|
||||
return Err(gst::error_msg!(gst::ResourceError::Settings, [&details]));
|
||||
{
|
||||
let setup_thread = self.setup_thread.lock().unwrap();
|
||||
if setup_thread.is_some() {
|
||||
// already starting
|
||||
return Ok(());
|
||||
}
|
||||
self.start_setup(setup_thread);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn stop(&self) -> Result<(), gst::ErrorMessage> {
|
||||
// stop the setup if it's not completed yet
|
||||
self.cancel_setup();
|
||||
|
||||
if let Some(state) = self.state.lock().unwrap().take() {
|
||||
gst::debug!(CAT, imp: self, "stopping");
|
||||
state.player.stop();
|
||||
|
@ -258,6 +270,12 @@ impl BaseSrcImpl for SpotifyAudioSrc {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
|
||||
self.cancel_setup();
|
||||
|
||||
self.parent_unlock()
|
||||
}
|
||||
}
|
||||
|
||||
impl PushSrcImpl for SpotifyAudioSrc {
|
||||
|
@ -265,6 +283,41 @@ impl PushSrcImpl for SpotifyAudioSrc {
|
|||
&self,
|
||||
_buffer: Option<&mut gst::BufferRef>,
|
||||
) -> Result<CreateSuccess, gst::FlowError> {
|
||||
let state_set = {
|
||||
let state = self.state.lock().unwrap();
|
||||
state.is_some()
|
||||
};
|
||||
|
||||
if !state_set {
|
||||
let setup_thread = self.setup_thread.lock().unwrap();
|
||||
if setup_thread.is_none() {
|
||||
// unlock() could potentially cancel the setup, and create() can be called after unlock() without going through start() again.
|
||||
self.start_setup(setup_thread);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
// wait for the setup to be completed
|
||||
let mut setup_thread = self.setup_thread.lock().unwrap();
|
||||
if let Some(setup) = setup_thread.take() {
|
||||
let res = setup.thread_handle.join().unwrap();
|
||||
|
||||
match res {
|
||||
Err(_aborted) => {
|
||||
gst::debug!(CAT, imp: self, "setup has been cancelled");
|
||||
return Err(gst::FlowError::Flushing);
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
let details = format!("{:?}", err);
|
||||
gst::error!(CAT, imp: self, "failed to start: {}", details);
|
||||
gst::element_imp_error!(self, gst::ResourceError::Settings, [&details]);
|
||||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
Ok(Ok(_)) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let state = self.state.lock().unwrap();
|
||||
let state = state.as_ref().unwrap();
|
||||
|
||||
|
@ -290,112 +343,6 @@ impl PushSrcImpl for SpotifyAudioSrc {
|
|||
}
|
||||
}
|
||||
|
||||
impl SpotifyAudioSrc {
|
||||
async fn setup(&self) -> anyhow::Result<()> {
|
||||
let (credentials, cache, track) = {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
let credentials_cache = if settings.cache_credentials.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(&settings.cache_credentials)
|
||||
};
|
||||
|
||||
let files_cache = if settings.cache_files.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(&settings.cache_files)
|
||||
};
|
||||
|
||||
let max_size = if settings.cache_max_size != 0 {
|
||||
Some(settings.cache_max_size)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let cache = Cache::new(credentials_cache, None, files_cache, max_size)?;
|
||||
|
||||
let credentials = match cache.credentials() {
|
||||
Some(cached_cred) => {
|
||||
gst::debug!(CAT, imp: self, "reuse credentials from cache",);
|
||||
cached_cred
|
||||
}
|
||||
None => {
|
||||
gst::debug!(CAT, imp: self, "credentials not in cache",);
|
||||
|
||||
if settings.username.is_empty() {
|
||||
bail!("username is not set and credentials are not in cache");
|
||||
}
|
||||
if settings.password.is_empty() {
|
||||
bail!("password is not set and credentials are not in cache");
|
||||
}
|
||||
|
||||
let cred = Credentials::with_password(&settings.username, &settings.password);
|
||||
cache.save_credentials(&cred);
|
||||
cred
|
||||
}
|
||||
};
|
||||
|
||||
if settings.track.is_empty() {
|
||||
bail!("track is not set")
|
||||
}
|
||||
|
||||
(credentials, cache, settings.track.clone())
|
||||
};
|
||||
|
||||
let state = self.state.clone();
|
||||
|
||||
let (session, _credentials) =
|
||||
Session::connect(SessionConfig::default(), credentials, Some(cache), false).await?;
|
||||
|
||||
let player_config = PlayerConfig {
|
||||
passthrough: true,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
// use a sync channel to prevent buffering the whole track inside the channel
|
||||
let (sender, receiver) = mpsc::sync_channel(2);
|
||||
let sender_clone = sender.clone();
|
||||
|
||||
let (mut player, mut player_event_channel) =
|
||||
Player::new(player_config, session, Box::new(NoOpVolume), || {
|
||||
Box::new(BufferSink { sender })
|
||||
});
|
||||
|
||||
let track = match SpotifyId::from_uri(&track) {
|
||||
Ok(track) => track,
|
||||
Err(_) => bail!("Failed to create Spotify URI from track"),
|
||||
};
|
||||
|
||||
player.load(track, true, 0);
|
||||
|
||||
let player_channel_handle = RUNTIME.spawn(async move {
|
||||
let sender = sender_clone;
|
||||
|
||||
while let Some(event) = player_event_channel.recv().await {
|
||||
match event {
|
||||
PlayerEvent::EndOfTrack { .. } => {
|
||||
let _ = sender.send(Message::Eos);
|
||||
}
|
||||
PlayerEvent::Unavailable { .. } => {
|
||||
let _ = sender.send(Message::Unavailable);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut state = state.lock().unwrap();
|
||||
state.replace(State {
|
||||
player,
|
||||
receiver,
|
||||
player_channel_handle,
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
struct BufferSink {
|
||||
sender: mpsc::SyncSender<Message>,
|
||||
}
|
||||
|
@ -456,3 +403,144 @@ impl URIHandlerImpl for SpotifyAudioSrc {
|
|||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl SpotifyAudioSrc {
|
||||
fn start_setup(&self, mut setup_thread: MutexGuard<Option<SetupThread>>) {
|
||||
let self_ = self.to_owned();
|
||||
|
||||
// run the runtime from another thread to prevent the "start a runtime from within a runtime" panic
|
||||
// when the plugin is statically linked.
|
||||
let (abort_handle, abort_registration) = AbortHandle::new_pair();
|
||||
let thread_handle = std::thread::spawn(move || {
|
||||
RUNTIME.block_on(async move {
|
||||
let future = Abortable::new(self_.setup(), abort_registration);
|
||||
future.await
|
||||
})
|
||||
});
|
||||
|
||||
setup_thread.replace(SetupThread {
|
||||
thread_handle,
|
||||
abort_handle,
|
||||
});
|
||||
}
|
||||
|
||||
async fn setup(&self) -> anyhow::Result<()> {
|
||||
{
|
||||
let state = self.state.lock().unwrap();
|
||||
|
||||
if state.is_some() {
|
||||
// already setup
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
let (credentials, cache, track) = {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
let credentials_cache = if settings.cache_credentials.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(&settings.cache_credentials)
|
||||
};
|
||||
|
||||
let files_cache = if settings.cache_files.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(&settings.cache_files)
|
||||
};
|
||||
|
||||
let max_size = if settings.cache_max_size != 0 {
|
||||
Some(settings.cache_max_size)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let cache = Cache::new(credentials_cache, None, files_cache, max_size)?;
|
||||
|
||||
let credentials = match cache.credentials() {
|
||||
Some(cached_cred) => {
|
||||
gst::debug!(CAT, imp: self, "reuse credentials from cache",);
|
||||
cached_cred
|
||||
}
|
||||
None => {
|
||||
gst::debug!(CAT, imp: self, "credentials not in cache",);
|
||||
|
||||
if settings.username.is_empty() {
|
||||
bail!("username is not set and credentials are not in cache");
|
||||
}
|
||||
if settings.password.is_empty() {
|
||||
bail!("password is not set and credentials are not in cache");
|
||||
}
|
||||
|
||||
let cred = Credentials::with_password(&settings.username, &settings.password);
|
||||
cache.save_credentials(&cred);
|
||||
cred
|
||||
}
|
||||
};
|
||||
|
||||
if settings.track.is_empty() {
|
||||
bail!("track is not set")
|
||||
}
|
||||
|
||||
(credentials, cache, settings.track.clone())
|
||||
};
|
||||
|
||||
let (session, _credentials) =
|
||||
Session::connect(SessionConfig::default(), credentials, Some(cache), false).await?;
|
||||
|
||||
let player_config = PlayerConfig {
|
||||
passthrough: true,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
// use a sync channel to prevent buffering the whole track inside the channel
|
||||
let (sender, receiver) = mpsc::sync_channel(2);
|
||||
let sender_clone = sender.clone();
|
||||
|
||||
let (mut player, mut player_event_channel) =
|
||||
Player::new(player_config, session, Box::new(NoOpVolume), || {
|
||||
Box::new(BufferSink { sender })
|
||||
});
|
||||
|
||||
let track = match SpotifyId::from_uri(&track) {
|
||||
Ok(track) => track,
|
||||
Err(_) => bail!("Failed to create Spotify URI from track"),
|
||||
};
|
||||
|
||||
player.load(track, true, 0);
|
||||
|
||||
let player_channel_handle = RUNTIME.spawn(async move {
|
||||
let sender = sender_clone;
|
||||
|
||||
while let Some(event) = player_event_channel.recv().await {
|
||||
match event {
|
||||
PlayerEvent::EndOfTrack { .. } => {
|
||||
let _ = sender.send(Message::Eos);
|
||||
}
|
||||
PlayerEvent::Unavailable { .. } => {
|
||||
let _ = sender.send(Message::Unavailable);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
state.replace(State {
|
||||
player,
|
||||
receiver,
|
||||
player_channel_handle,
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cancel_setup(&self) {
|
||||
let mut setup_thread = self.setup_thread.lock().unwrap();
|
||||
|
||||
if let Some(setup) = setup_thread.take() {
|
||||
setup.abort_handle.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue