use std::{fs::File, path::Path, time::Duration};
use anyhow::{Context, Result};
use symphonia::core::{
audio::{AudioBufferRef, SampleBuffer},
codecs::Decoder,
formats::{FormatReader, SeekMode, SeekTo},
io::MediaSourceStream,
units::Time,
};
use crate::SourceError;
pub struct Source {
format: Box<dyn FormatReader>,
decoder: Box<dyn Decoder>,
track_id: u32,
pub duration: Duration,
pub sample_rate: f64,
    sample_buf: SampleBuffer<f32>,
i: u32,
buffering: bool,
}
impl Source {
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
        let file = Box::new(File::open(path).context("Couldn't open file")?);
let mss = MediaSourceStream::new(file, Default::default());
Self::from_mss(mss)
}
pub fn from_mss(mss: MediaSourceStream) -> Result<Self> {
        // The stream is always MP3, so skip probing and construct the reader directly.
        let mut format = Box::new(symphonia::default::formats::Mp3Reader::try_new(
            mss,
            &Default::default(),
        )?);
        let track = format.default_track().context("No default track")?.clone();
let mut decoder =
symphonia::default::get_codecs().make(&track.codec_params, &Default::default())?;
        let codec_params = decoder.codec_params();
        let duration = {
            let time_base = codec_params.time_base.context("Missing time base")?;
            let n_frames = codec_params.n_frames.context("Unknown number of frames")?;
            let time = time_base.calc_time(n_frames);
            Duration::from_secs(time.seconds) + Duration::from_secs_f64(time.frac)
        };
let audio_buf = get_next_audio_buffer(&mut *format, track.id, &mut *decoder)?;
let spec = *audio_buf.spec();
let sample_buf = {
let duration = audio_buf.capacity() as u64;
let mut sample_buf = SampleBuffer::new(duration, spec);
sample_buf.copy_interleaved_ref(audio_buf);
sample_buf
};
Ok(Self {
format,
decoder,
track_id: track.id,
duration,
sample_rate: spec.rate as f64,
sample_buf,
buffering: false,
i: 0,
})
}
fn decode_next(&mut self) {
let next = get_next_audio_buffer(&mut *self.format, self.track_id, &mut *self.decoder);
match next {
Ok(audio_buf) => {
self.sample_buf.copy_interleaved_ref(audio_buf);
self.buffering = false;
}
Err(e) => {
self.sample_buf.clear();
self.buffering = true;
println!("error while decoding next packed {}", e);
}
}
self.i = 0;
}
}
fn get_next_audio_buffer<'a>(
format: &mut dyn FormatReader,
track_id: u32,
decoder: &'a mut dyn Decoder,
) -> Result<AudioBufferRef<'a>> {
let packet = loop {
let packet = format.next_packet().context("Couldn't fetch next packet")?;
if packet.track_id() == track_id {
break packet;
}
};
let audio_buf = decoder
.decode(&packet)
.context("Couldn't decode next packet")?;
Ok(audio_buf)
}
impl crate::Source for Source {
fn seek(&mut self, pos: Duration) -> Result<(), SourceError> {
if let Err(_) = self.format.seek(
SeekMode::Coarse,
SeekTo::Time {
time: Time {
seconds: pos.as_secs(),
frac: pos.as_secs_f64().fract(),
},
track_id: None,
},
) {
self.buffering = true;
return Err(SourceError::Buffering);
}
self.decode_next();
self.i = 0;
Ok(())
}
fn next(&mut self, buf: &mut [[f32; 2]]) -> Result<(), SourceError> {
for b in buf {
if self.i >= self.sample_buf.len() as u32 {
self.decode_next();
if self.buffering {
return Err(SourceError::Buffering);
}
}
            b[0] = self.sample_buf.samples()[self.i as usize];
            b[1] = self.sample_buf.samples()[self.i as usize + 1];
            self.i += 2;
}
Ok(())
}
}
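// Usage sketch (hypothetical file name, illustrative only): decode a local MP3 and drain
// it in 1024-frame chunks through the crate-level `Source` trait implemented above.
fn drain_example() -> Result<()> {
    use crate::Source as _; // bring the trait's `next`/`seek` into scope

    let mut source = Source::from_file("track.mp3")?;
    println!(
        "duration: {:?}, sample rate: {} Hz",
        source.duration, source.sample_rate
    );

    let mut frames = [[0.0f32; 2]; 1024];
    loop {
        match source.next(&mut frames) {
            Ok(()) => { /* hand the interleaved stereo frames to the resampler / output */ }
            Err(_) => break, // Buffering (i.e. no more data for a local file): stop the sketch
        }
    }
    Ok(())
}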
pub mod hls;
pub mod symphonia;
use std::{
io::{Read, Seek, SeekFrom},
sync::Arc,
time::Duration,
};
use parking_lot::Mutex;
use tracing::{debug, instrument, trace};
use super::{SegmentCache, SegmentInfos};
use crate::util::hls::BYTERATE;
pub struct MediaSource {
duration: Duration,
segment_infos: SegmentInfos,
segment_cache: Arc<Mutex<SegmentCache>>,
curr_segment: Option<Vec<u8>>,
curr_segment_idx: usize,
curr_offset: usize,
}
impl MediaSource {
pub fn new(
duration: Duration,
segment_infos: SegmentInfos,
segment_cache: Arc<Mutex<SegmentCache>>,
pos: Duration,
) -> Self {
        let (curr_segment_idx, curr_offset) = segment_infos
            .segment_at(pos)
            .expect("seek position past end of playlist");
        {
            let mut cache = segment_cache.lock();
            if cache.segments[curr_segment_idx].is_some() {
                cache.buffering = false;
            }
            cache.source_position = (curr_segment_idx, curr_offset);
        }
debug!(
"Creating hls::MediaSource with {:?}/{:?}",
curr_segment_idx, curr_offset
);
Self {
duration,
segment_infos,
segment_cache,
curr_segment: None,
curr_segment_idx,
curr_offset,
}
}
#[instrument(skip_all)]
fn load_segment(&mut self) -> std::io::Result<()> {
trace!(
"grabbing segment {} from the cache...",
self.curr_segment_idx
);
let mut cache = self.segment_cache.lock();
let segment = match &cache.segments[self.curr_segment_idx] {
Some(s) => s.clone(),
None => {
debug!("failed to grab segment");
cache.buffering = true;
return Err(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"Buffering...",
));
}
};
self.curr_segment = Some(segment);
Ok(())
}
fn advance(&mut self) -> std::io::Result<()> {
self.curr_segment_idx += 1;
self.curr_segment = None;
if self.curr_segment_idx < self.segment_infos.0.len() {
self.load_segment()?;
self.curr_offset = 0;
Ok(())
} else {
Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"End of stream",
))
}
}
fn ended(&self) -> bool {
self.curr_segment_idx >= self.segment_infos.0.len()
}
}
impl Read for MediaSource {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        if self.ended() {
            return Ok(0); // past the last segment: signal EOF
        }
        if self.curr_segment.is_none() {
            self.load_segment()?;
        }
let mut nb_filled = 0;
while nb_filled < buf.len() && !self.ended() {
let segment = self.curr_segment.as_ref().unwrap();
let nb_remaining_to_take = buf.len() - nb_filled;
let nb_available_in_segment = segment.len().saturating_sub(self.curr_offset);
let nb_to_take = nb_remaining_to_take.min(nb_available_in_segment);
let segment_slice = &segment[self.curr_offset..][..nb_to_take];
buf[nb_filled..nb_filled + nb_to_take].copy_from_slice(segment_slice);
self.curr_offset += nb_to_take;
nb_filled += nb_to_take;
            if nb_filled < buf.len() {
                // Once bytes have been copied, an error would make the caller discard them
                // (per the `Read` contract), so return the partial read instead.
                if let Err(e) = self.advance() {
                    if nb_filled == 0 {
                        return Err(e);
                    }
                    break;
                }
            }
}
self.segment_cache.lock().source_position = (self.curr_segment_idx, self.curr_offset);
Ok(nb_filled)
}
}
impl Seek for MediaSource {
fn seek(&mut self, seek_from: std::io::SeekFrom) -> std::io::Result<u64> {
let curr_pos = (self
.segment_infos
.time_at_position(self.curr_segment_idx, self.curr_offset)
.as_secs_f64()
* BYTERATE) as i64;
let idx = match seek_from {
SeekFrom::Start(idx) => idx as i64,
SeekFrom::Current(off) => curr_pos + off,
            SeekFrom::End(off) => (self.duration.as_secs_f64() * BYTERATE) as i64 + off,
}
.max(0) as u64;
let time = Duration::from_secs_f64(idx as f64 / BYTERATE);
let segment_idx = self
.segment_infos
.segment_at(time)
.map(|p| p.0)
.unwrap_or(self.segment_infos.0.len() - 1);
let segment_start = self.segment_infos.time_at_position(segment_idx, 0);
let offset = ((time - segment_start).as_secs_f64() * BYTERATE) as u64;
self.curr_segment_idx = segment_idx;
self.curr_offset = offset as usize;
self.curr_segment = None;
self.segment_cache.lock().source_position = (self.curr_segment_idx, self.curr_offset);
        debug!(
            "hls::MediaSource seeked to {}/{}",
            self.curr_segment_idx, self.curr_offset
        );
Ok(idx)
}
}
impl symphonia::core::io::MediaSource for MediaSource {
fn is_seekable(&self) -> bool {
true
}
fn byte_len(&self) -> Option<u64> {
Some((BYTERATE * self.duration.as_secs_f64()).ceil() as u64)
}
}
use std::time::Duration;
use hls_m3u8::MediaPlaylist;
mod fetcher;
mod source;
pub use fetcher::Fetcher;
pub use source::MediaSource;
pub const BYTERATE: f64 = 128_000.0 / 8.0; // 128 kbps MP3 = 16_000 bytes of audio per second
// We need a thread that fetches the HLS segments continuously as we stream the audio
// and inserts them into a cache protected by a mutex, while the hls::MediaSource
// pulls the segments back out of that cache. The fetcher tries to keep the cache
// filled for the next 10 seconds of playback.
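// Illustrative sketch only (not part of the crate): a fetcher thread fills slots in a
// shared cache while the reader treats a missing slot as "buffering" and polls again.
// Slot contents and timings are made up; the real types are defined below.
fn cache_handshake_sketch() {
    use std::{sync::Arc, thread, time::Duration};

    use parking_lot::Mutex;

    // One slot per HLS segment; None = not downloaded yet.
    let cache: Arc<Mutex<Vec<Option<Vec<u8>>>>> = Arc::new(Mutex::new(vec![None; 3]));

    let fetcher = {
        let cache = cache.clone();
        thread::spawn(move || {
            for idx in 0..3 {
                thread::sleep(Duration::from_millis(50)); // stand-in for the HTTP download
                cache.lock()[idx] = Some(vec![0u8; 16_000]); // ~1 s of audio at BYTERATE
            }
        })
    };

    // Reader side: wait ("buffer") until the slot it needs is present.
    for idx in 0..3 {
        while cache.lock()[idx].is_none() {
            thread::sleep(Duration::from_millis(10)); // surfaces as SourceError::Buffering upstream
        }
        println!("segment {idx} ready");
    }
    fetcher.join().unwrap();
}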
#[derive(Debug, Clone)]
pub struct SegmentInfo {
url: String,
duration: Duration,
}
#[derive(Debug)]
pub struct SegmentCache {
pub source_position: (usize, usize), // (segment idx, byte idx)
pub segments: Vec<Option<Vec<u8>>>,
pub buffering: bool,
}
impl SegmentCache {
pub fn new(nb_segments: usize) -> Self {
Self {
source_position: (0, 0),
segments: vec![None; nb_segments],
buffering: false,
}
}
}
#[derive(Debug, Clone)]
pub struct SegmentInfos(pub Vec<SegmentInfo>);
impl SegmentInfos {
pub fn from_hls(hls: &MediaPlaylist) -> Self {
Self(
hls.segments
.values()
.map(|segment| SegmentInfo {
url: segment.uri().to_string(),
duration: segment.duration.duration(),
})
.collect(),
)
}
pub fn segment_at(&self, time: Duration) -> Option<(usize, usize)> {
let mut t = Duration::ZERO;
for (idx, segment) in self.0.iter().enumerate() {
t += segment.duration;
if t > time {
let byte_offset =
((time - (t - segment.duration)).as_secs_f64() * BYTERATE) as usize;
return Some((idx, byte_offset));
}
}
        None
}
pub fn time_at_position(&self, segment_idx: usize, byte_idx: usize) -> Duration {
let mut t = Duration::ZERO;
for (idx, segment) in self.0.iter().enumerate() {
if segment_idx == idx {
t += Duration::from_secs_f64(byte_idx as f64 / BYTERATE);
break;
}
t += segment.duration;
}
        t
}
}
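// A quick illustrative test of the byte/time mapping above (segment durations are made up):
// at BYTERATE = 16_000 bytes/s, 25 s into three 10-second segments lands 5 s into
// segment 2, i.e. at byte offset 5 * 16_000 = 80_000, and maps back to 25 s exactly.
#[cfg(test)]
mod segment_math_sketch {
    use std::time::Duration;

    use super::*;

    #[test]
    fn segment_at_round_trips_with_time_at_position() {
        let infos = SegmentInfos(vec![
            SegmentInfo { url: "seg0.mp3".into(), duration: Duration::from_secs(10) },
            SegmentInfo { url: "seg1.mp3".into(), duration: Duration::from_secs(10) },
            SegmentInfo { url: "seg2.mp3".into(), duration: Duration::from_secs(10) },
        ]);
        let (idx, byte_offset) = infos.segment_at(Duration::from_secs(25)).unwrap();
        assert_eq!((idx, byte_offset), (2, (5.0 * BYTERATE) as usize));
        assert_eq!(infos.time_at_position(idx, byte_offset), Duration::from_secs(25));
    }
}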
use std::{io::Read, sync::Arc, time::Duration};
use anyhow::{Context, Result};
use parking_lot::Mutex;
use tracing::{debug, error, instrument, trace};
use super::{SegmentCache, SegmentInfos};
#[derive(Debug)]
pub struct Fetcher {
duration: Duration,
segment_infos: SegmentInfos,
segment_cache: Arc<Mutex<SegmentCache>>,
}
impl Fetcher {
pub fn spawn(
duration: Duration,
segment_infos: SegmentInfos,
segment_cache: Arc<Mutex<SegmentCache>>,
) -> Result<()> {
// grab the first segment to help symphonia probe the format
let first_empty = segment_cache.lock().segments[0].is_none();
if first_empty {
segment_cache.lock().segments[0] =
Some(fetch_segment_retry(&segment_infos.0[0].url, 5)?);
}
let fetcher = Fetcher {
duration,
segment_infos,
segment_cache,
};
std::thread::spawn(move || {
if let Err(e) = fetcher.run() {
println!("{e}");
}
});
Ok(())
}
#[instrument(skip_all)]
fn run(self) -> Result<()> {
loop {
            // Check where the source currently is and make sure the next 10 seconds are in the cache.
let source_position = self.segment_cache.lock().source_position;
trace!("fetching iteration at position {:?}", source_position);
let curr_time = self
.segment_infos
.time_at_position(source_position.0, source_position.1);
            // Past the end of the playlist this yields `len`, which the loop below skips.
            let curr_segment = self
                .segment_infos
                .segment_at(curr_time)
                .map(|p| p.0)
                .unwrap_or(self.segment_infos.0.len());
let target_time = (curr_time + Duration::SECOND * 10).min(self.duration);
let target_segment = self
.segment_infos
.segment_at(target_time)
.map(|p| p.0)
.unwrap_or(self.segment_infos.0.len());
for idx in curr_segment..=target_segment {
if self.segment_cache.lock().segments.get(idx) == Some(&None) {
trace!("fetching next segment: {}", idx);
let info = &self.segment_infos.0[idx];
self.segment_cache.lock().segments[idx] =
Some(fetch_segment_retry(&info.url, 5)?);
trace!("next segment received!");
if idx == self.segment_cache.lock().source_position.0 {
debug!("STOPPED BUFFERING");
self.segment_cache.lock().buffering = false;
}
}
}
std::thread::sleep(Duration::SECOND);
}
}
}
fn fetch_segment_retry(url: &str, nb_tries: usize) -> Result<Vec<u8>> {
for _ in 0..nb_tries {
if let Ok(s) = fetch_segment(url) {
return Ok(s);
}
std::thread::sleep(Duration::SECOND);
}
fetch_segment(url).context("Couldn't fetch segment")
}
fn fetch_segment(url: &str) -> Result<Vec<u8>> {
Ok(ureq::get(url)
.call()?
.into_reader()
.bytes()
.collect::<Result<Vec<u8>, std::io::Error>>()?)
}
use std::{sync::Arc, time::Duration};
use anyhow::Result;
use hls_m3u8::MediaPlaylist;
use parking_lot::Mutex;
use symphonia::core::io::MediaSourceStream;
use crate::{
util::{
self,
hls::{self, SegmentCache, SegmentInfos},
},
Source,
};
pub struct SoundcloudSource {
segment_infos: SegmentInfos,
cache: Arc<Mutex<SegmentCache>>,
pub source: util::symphonia::Source,
seeking: Option<Duration>,
}
impl SoundcloudSource {
pub fn new(media_playlist: &MediaPlaylist) -> Result<Self> {
let duration = media_playlist.duration();
        let segment_infos = SegmentInfos::from_hls(media_playlist);
let cache = Arc::new(Mutex::new(SegmentCache::new(segment_infos.0.len())));
hls::Fetcher::spawn(duration, segment_infos.clone(), cache.clone())?;
let hls_source = hls::MediaSource::new(
duration,
segment_infos.clone(),
cache.clone(),
Duration::ZERO,
);
let mss = MediaSourceStream::new(Box::new(hls_source), Default::default());
let symphonia_source = util::symphonia::Source::from_mss(mss)?;
Ok(Self {
segment_infos,
cache,
source: symphonia_source,
seeking: None,
})
}
}
impl Source for SoundcloudSource {
    fn seek(&mut self, pos: Duration) -> Result<(), crate::SourceError> {
        if let Err(crate::SourceError::Buffering) = self.source.seek(pos) {
            let (curr_segment_idx, curr_offset) = self.segment_infos.segment_at(pos).unwrap();
            // Back up a little so symphonia has enough data in front of the seek target.
            let curr_offset = curr_offset.saturating_sub(5000);
            self.cache.lock().source_position = (curr_segment_idx, curr_offset);
            self.seeking = Some(pos);
            return Err(crate::SourceError::Buffering);
        }
        Ok(())
    }
fn next(&mut self, buf: &mut [[f32; 2]]) -> Result<(), crate::SourceError> {
        if let Some(pos) = self.seeking {
            if !self.cache.lock().buffering {
                let hls_source = hls::MediaSource::new(
                    self.source.duration,
                    self.segment_infos.clone(),
                    self.cache.clone(),
                    pos,
                );
                let mss = MediaSourceStream::new(Box::new(hls_source), Default::default());
                match util::symphonia::Source::from_mss(mss) {
                    Ok(source) => {
                        self.source = source;
                        self.seeking = None;
                    }
                    // Not enough data decoded from the cache yet; stay in the seeking state.
                    Err(_) => return Err(crate::SourceError::Buffering),
                }
            } else {
                return Err(crate::SourceError::Buffering);
            }
}
self.source.next(buf)
}
}
let source = source::SoundcloudSource::new(&hls)?;
}
}
}
#[derive(Clone)]
pub struct SegmentInfo {
url: String,
duration: Duration,
}
pub struct SegmentCache {
source_position: (usize, usize), // (segment idx, byte idx)
segments: Vec<Option<Vec<u8>>>,
}
#[derive(Clone)]
pub struct SegmentInfos(pub Vec<SegmentInfo>);
impl SegmentInfos {
pub fn from_hls(hls: &MediaPlaylist) -> Self {
Self(
hls.segments
.values()
.map(|segment| SegmentInfo {
url: segment.uri().to_string(),
duration: segment.duration.duration(),
})
.collect(),
)
}
fn segment_at(&self, time: Duration) -> Option<usize> {
let mut t = Duration::ZERO;
for (idx, segment) in self.0.iter().enumerate() {
t += segment.duration;
if t > time {
return Some(idx);
}
}
return None;
}
    fn time_at_position(&self, segment_idx: usize, byte_idx: usize) -> Duration {
        let mut t = Duration::ZERO;
        for (idx, segment) in self.0.iter().enumerate() {
            if segment_idx == idx {
                // 128 kbps = 16_000 bytes of audio per second.
                t += Duration::from_secs_f64(byte_idx as f64 / (128_000.0 / 8.0));
                break;
            }
            t += segment.duration;
        }
        t
    }
}
use std::{fs::File, path::Path, time::Duration};
use anyhow::{Context, Result};
use symphonia::core::{
audio::{AudioBufferRef, SampleBuffer},
codecs::Decoder,
formats::{FormatReader, SeekMode, SeekTo},
io::MediaSourceStream,
probe::Hint,
units::Time,
};
use anyhow::Result;
let source = LocalSource::from_file(path).map(|source| SongSource {
info: SongInfo {
duration: source.duration,
},
sample_rate: source.sample_rate,
signal: Box::new(source),
});
let source =
crate::util::symphonia::Source::from_file(path).map(|source| SongSource {
info: SongInfo {
duration: source.duration,
},
sample_rate: source.sample_rate,
signal: Box::new(source),
});
pub struct LocalSource {
format: Box<dyn FormatReader>,
decoder: Box<dyn Decoder>,
track_id: u32,
pub duration: Duration,
pub sample_rate: f64,
sample_buf: symphonia::core::audio::SampleBuffer<f32>,
i: u32,
buffering: bool,
}
impl LocalSource {
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
        let file = Box::new(File::open(path).context("Couldn't open file")?);
let mss = MediaSourceStream::new(file, Default::default());
Self::from_mss(mss)
}
    pub fn from_mss(mss: MediaSourceStream) -> Result<Self> {
        let mut hint = Hint::new();
        hint.with_extension("mp3");
        let probed = symphonia::default::get_probe()
            .format(&hint, mss, &Default::default(), &Default::default())
            .unwrap();
        let mut format = probed.format;
        let track = format.default_track().unwrap().clone();
        let mut decoder =
            symphonia::default::get_codecs().make(&track.codec_params, &Default::default())?;
let codec_params = decoder.codec_params();
let duration = {
let time = codec_params
.time_base
.unwrap()
.calc_time(codec_params.n_frames.unwrap());
Duration::from_secs(time.seconds) + Duration::from_secs_f64(time.frac)
};
let audio_buf = get_next_audio_buffer(&mut *format, track.id, &mut *decoder)?;
let spec = *audio_buf.spec();
let sample_buf = {
let duration = audio_buf.capacity() as u64;
let mut sample_buf = SampleBuffer::new(duration, spec);
sample_buf.copy_interleaved_ref(audio_buf);
sample_buf
};
Ok(Self {
format,
decoder,
track_id: track.id,
duration,
sample_rate: spec.rate as f64,
sample_buf,
buffering: false,
i: 0,
})
}
fn decode_next(&mut self) {
let next = get_next_audio_buffer(&mut *self.format, self.track_id, &mut *self.decoder);
match next {
Ok(audio_buf) => {
self.sample_buf.copy_interleaved_ref(audio_buf);
self.buffering = false;
}
Err(e) => {
self.sample_buf.clear();
self.buffering = true;
println!("error while decoding next packed {}", e);
}
}
self.i = 0;
}
}
fn get_next_audio_buffer<'a>(
format: &mut dyn FormatReader,
track_id: u32,
decoder: &'a mut dyn Decoder,
) -> Result<AudioBufferRef<'a>> {
let packet = loop {
let packet = format.next_packet().context("Couldn't fetch next packet")?;
if packet.track_id() == track_id {
break packet;
}
};
let audio_buf = decoder
.decode(&packet)
.context("Couldn't decode next packet")?;
Ok(audio_buf)
}
impl Source for LocalSource {
fn seek(&mut self, pos: Duration) {
if let Err(_) = self.format.seek(
SeekMode::Coarse,
SeekTo::Time {
time: Time {
seconds: pos.as_secs(),
frac: pos.as_secs_f64().fract(),
},
track_id: None,
},
) {
self.buffering = true;
}
self.decode_next();
self.i = 0;
}
fn next(&mut self, buf: &mut [[f32; 2]]) -> Result<(), SourceError> {
for b in buf {
if self.i >= self.sample_buf.len() as u32 {
self.decode_next();
if self.buffering {
return Err(SourceError::Buffering);
}
}
b[0] = self.sample_buf.samples()[(self.i + 0) as usize];
b[1] = self.sample_buf.samples()[(self.i + 1) as usize];
self.i += 2;
}
Ok(())
}
}
use anyhow::Result;
use rubato::{InterpolationParameters, Resampler as _, SincFixedOut};
use crate::{SongSource, SourceError};
pub struct Resampler {
pub resampler: SincFixedOut<f32>,
pub source_buf: Vec<[f32; 2]>,
pub in_buf: Vec<Vec<f32>>,
pub out_buf: Vec<Vec<f32>>,
pub i: usize,
}
impl Resampler {
pub fn new(ratio: f64) -> Result<Self> {
let resampler = SincFixedOut::new(
ratio,
2.0,
InterpolationParameters {
sinc_len: 256,
f_cutoff: 0.95,
oversampling_factor: 128,
interpolation: rubato::InterpolationType::Linear,
window: rubato::WindowFunction::Blackman2,
},
512,
2,
)
.unwrap();
Ok(Self {
source_buf: vec![[0.0; 2]; resampler.input_frames_max()],
in_buf: resampler.input_buffer_allocate(),
out_buf: resampler.output_buffer_allocate(),
resampler,
i: 0,
})
}
    pub fn process(&mut self, source: &mut SongSource) -> Result<(), SourceError> {
        // Pull exactly as many interleaved frames as the resampler wants for its next chunk.
        let in_len = self.resampler.input_frames_next();
        source.signal.next(&mut self.source_buf[..in_len])?;
        // De-interleave the stereo frames into the planar buffers rubato expects.
        self.in_buf[0].clear();
        self.in_buf[1].clear();
        for i in 0..in_len {
            self.in_buf[0].push(self.source_buf[i][0]);
            self.in_buf[1].push(self.source_buf[i][1]);
        }
        self.out_buf[0].clear();
        self.out_buf[1].clear();
        self.resampler
            .process_into_buffer(&self.in_buf, &mut self.out_buf, Some(&[true, true]))
            .unwrap();
        // Restart the read cursor into `out_buf`.
        self.i = 0;
        Ok(())
    }
}
use std::{
ops::DerefMut,
sync::Arc,
time::{Duration, Instant},
};
use cpal::StreamConfig;
use parking_lot::RwLock;
use crate::{PlayerState, SongSource, SourceError};
mod resampler;
use resampler::Resampler;
pub enum Event {
Play,
Pause,
Stop,
Seek(Duration),
SetSource(SongSource),
}
pub struct Player {
receiver: crossbeam_channel::Receiver<Event>,
config: StreamConfig,
pub state: Arc<RwLock<PlayerState>>,
source: Option<SongSource>,
resampler: Option<Resampler>,
buffering_since: Instant,
}
impl Player {
pub fn new(config: StreamConfig, receiver: crossbeam_channel::Receiver<Event>) -> Self {
Self {
receiver,
config,
state: Arc::new(RwLock::new(PlayerState::Idle)),
source: None,
resampler: None,
buffering_since: Instant::now(),
}
}
pub fn process_events(&mut self) {
while let Ok(event) = self.receiver.try_recv() {
match event {
Event::Play => {
self.state.write().play().unwrap();
}
Event::Pause => {
self.state.write().pause().unwrap();
}
Event::Stop => {
*self.state.write() = PlayerState::Idle;
}
Event::Seek(position) => {
self.state.write().seek(position).unwrap();
match self.source.as_mut().unwrap().signal.seek(position) {
Err(SourceError::Buffering) => {
self.buffering_since = Instant::now();
}
_ => {}
}
}
Event::SetSource(mut source) => {
let source_sample_rate = source.sample_rate;
let mut resampler =
Resampler::new((self.config.sample_rate.0 as f64) / source_sample_rate)
.unwrap();
resampler.process(&mut source).ok();
self.resampler = Some(resampler);
self.state.write().set_song(source.info);
self.source = Some(source);
}
}
}
}
pub fn process(&mut self, data: &mut [f32]) {
self.process_events();
let playing = match *self.state.read() {
PlayerState::Playing { paused, .. } => !paused,
_ => false,
};
if !playing {
for d in data {
*d = 0.0;
}
return;
}
        let resampler = self.resampler.as_mut().unwrap();
        let source = self.source.as_mut().unwrap();
        // Debounce: keep emitting silence for one second after the last buffering event.
        if self.buffering_since.elapsed() < Duration::SECOND {
            for d in data {
                *d = 0.0;
            }
            return;
        }
        for d in data.array_chunks_mut::<2>() {
            // Refill the resampler's output buffer whenever it has been consumed.
            if resampler.i >= resampler.out_buf[0].len() {
                match resampler.process(source) {
                    Err(SourceError::Buffering) => {
                        self.buffering_since = Instant::now();
                        // No fresh samples yet: emit silence instead of leaving stale
                        // data from the previous callback in the buffer.
                        *d = [0.0; 2];
                        continue;
                    }
                    Err(SourceError::EndOfStream) => {
                        *self.state.write() = PlayerState::Idle;
                        // The song is over: pad the rest of the callback with silence.
                        *d = [0.0; 2];
                        continue;
                    }
                    Ok(()) => match self.state.write().deref_mut() {
                        PlayerState::Playing { offset, .. } => {
                            // Advance the playback position by the chunk we just produced.
                            *offset += Duration::from_secs_f64(
                                resampler.out_buf[0].len() as f64
                                    / self.config.sample_rate.0 as f64,
                            );
                        }
                        _ => {}
                    },
                }
            }
            d[0] = resampler.out_buf[0][resampler.i];
            d[1] = resampler.out_buf[1][resampler.i];
            resampler.i += 1;
        }
}
}
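// Sketch of wiring Player into a cpal output stream (assumes a cpal 0.13-style
// build_output_stream; newer cpal versions add a trailing `timeout` argument, and the
// callback requires Player and its SongSource to be Send). `events_rx` is the crossbeam
// receiver whose sender the UI side keeps.
fn run_audio(events_rx: crossbeam_channel::Receiver<Event>) -> anyhow::Result<()> {
    use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};

    let device = cpal::default_host()
        .default_output_device()
        .expect("no audio output device");
    let config: cpal::StreamConfig = device.default_output_config()?.into();

    let mut player = Player::new(config.clone(), events_rx);
    let stream = device.build_output_stream(
        &config,
        move |data: &mut [f32], _: &cpal::OutputCallbackInfo| {
            // Pull pending UI events, then fill the interleaved stereo buffer
            // (silence while paused or buffering).
            player.process(data);
        },
        |err| eprintln!("audio stream error: {err}"),
    )?;
    stream.play()?;
    std::thread::park(); // keep the stream alive for the sketch
    Ok(())
}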
Playing {
song,
offset,
playing_since,
} => {
*self = Paused {
song: *song,
offset: *offset + Instant::now().duration_since(*playing_since),
};
Playing { paused, .. } => {
*paused = true;
match self.state {
Paused { .. } => self.play(),
Playing { .. } => self.pause(),
_ => Err(anyhow!("wrong state")),
let paused = match *self.state.read() {
Playing { paused, .. } => paused,
_ => return Err(anyhow!("wrong state")),
};
if paused {
self.play()
} else {
self.pause()
tracing = "0.1.35"
use std::time::Duration;
use druid::{widget::Controller, Env, Event, EventCtx, TimerToken, Widget};
use crate::command;
#[derive(Default)]
pub struct PlayerTick {
timer: Option<TimerToken>,
}
impl<T, W: Widget<T>> Controller<T, W> for PlayerTick {
fn event(&mut self, child: &mut W, ctx: &mut EventCtx, event: &Event, data: &mut T, env: &Env) {
        match event {
            // Re-arm the timer and ask the app to refresh the playback position.
            Event::Timer(t) if Some(*t) == self.timer => {
                ctx.submit_command(command::PLAYER_TICK);
                self.timer = Some(ctx.request_timer(Duration::SECOND));
            }
            // Start ticking once a song actually starts playing.
            Event::Command(c) if c.is(command::SONG_PLAY) => {
                self.timer = Some(ctx.request_timer(Duration::SECOND));
            }
            _ => {}
        }
if let Event::WindowConnected = event {
ctx.request_focus();
}
child.event(ctx, event, data, env);
}
}
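// Usage sketch (hypothetical helper): wrap the app's root widget so PlayerTick receives
// the Timer/Command events above. `controller` comes from druid's WidgetExt.
fn with_player_tick<T: druid::Data>(root: impl Widget<T> + 'static) -> impl Widget<T> {
    use druid::WidgetExt as _;
    root.controller(PlayerTick::default())
}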
use tracing_subscriber::prelude::*;
let filter_layer = tracing_subscriber::filter::LevelFilter::DEBUG;
let fmt_layer = tracing_subscriber::fmt::layer()
    // Display the target (e.g. "my_crate::some_mod::submod") with each log line
.with_target(true)
.without_time()
.with_filter(tracing_subscriber::filter::filter_fn(|metadata| {
metadata.target().starts_with("tf")
}));
tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.init();