diff --git a/Cargo.lock b/Cargo.lock index 4563b58..e9c4071 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -186,6 +186,16 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "async-tempfile" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8a57b75c36e16f4d015e60e6a177552976a99b6947724403c551bcfa7cb1207" +dependencies = [ + "tokio", + "uuid", +] + [[package]] name = "async-trait" version = "0.1.85" @@ -346,6 +356,7 @@ dependencies = [ "arc-swap", "assert_matches", "async-stream", + "async-tempfile", "axum", "base64", "bili_sync_entity", @@ -1046,12 +1057,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.8" +version = "0.3.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1078,9 +1089,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.0.2" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "finl_unicode" @@ -1869,9 +1880,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.13" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" +checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" [[package]] name = "litemap" @@ -2866,15 +2877,15 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" [[package]] name = "rustix" -version = "0.38.32" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89" +checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" dependencies = [ "bitflags 2.5.0", "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3693,14 +3704,15 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tempfile" -version = "3.10.1" +version = "3.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1" +checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" dependencies = [ - "cfg-if", "fastrand", + "getrandom 0.3.3", + "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d214e82..8651424 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ anyhow = { version = "1.0.100", features = ["backtrace"] } arc-swap = { version = "1.7.1", features = ["serde"] } assert_matches = "1.5.0" async-stream = "0.3.6" +async-tempfile = {version = "0.7.0", features = ["uuid"]} async-trait = "0.1.89" axum = { version = "0.8.6", features = ["macros", "ws"] } base64 = "0.22.1" diff --git a/crates/bili_sync/Cargo.toml b/crates/bili_sync/Cargo.toml index a406d61..de5d35c 100644 --- a/crates/bili_sync/Cargo.toml +++ b/crates/bili_sync/Cargo.toml @@ -13,6 +13,7 @@ build = "build.rs" anyhow = { workspace = true } arc-swap = { workspace = true } async-stream = { workspace = true } +async-tempfile = { workspace = true } axum = { workspace = true } base64 = { workspace = true } bili_sync_entity = { workspace = true } diff --git a/crates/bili_sync/src/adapter/mod.rs b/crates/bili_sync/src/adapter/mod.rs index ab37dbd..b591ec2 100644 --- a/crates/bili_sync/src/adapter/mod.rs +++ b/crates/bili_sync/src/adapter/mod.rs @@ -7,7 +7,7 @@ use std::borrow::Cow; use std::path::Path; use std::pin::Pin; -use anyhow::Result; +use anyhow::{Context, Result}; use chrono::Utc; use enum_dispatch::enum_dispatch; use futures::Stream; @@ -109,6 +109,17 @@ pub trait VideoSource { VideoSourceEnum, Pin> + Send + 'a>>, )>; + + async fn create_dir_all(&self) -> Result<()> { + let video_source_path = self.path(); + tokio::fs::create_dir_all(video_source_path).await.with_context(|| { + format!( + "failed to create video source directory {}", + video_source_path.display() + ) + })?; + Ok(()) + } } pub enum _ActiveModel { diff --git a/crates/bili_sync/src/downloader.rs b/crates/bili_sync/src/downloader.rs index 7976cbe..9c92e1f 100644 --- a/crates/bili_sync/src/downloader.rs +++ b/crates/bili_sync/src/downloader.rs @@ -4,10 +4,12 @@ use std::path::Path; use std::sync::Arc; use anyhow::{Context, Result, bail, ensure}; +use async_tempfile::TempFile; use futures::TryStreamExt; use reqwest::{Method, header}; -use tokio::fs::{self, File, OpenOptions}; +use tokio::fs::{self}; use tokio::io::{AsyncSeekExt, AsyncWriteExt}; +use tokio::process::Command; use tokio::task::JoinSet; use tokio_util::io::StreamReader; @@ -26,14 +28,98 @@ impl Downloader { } pub async fn fetch(&self, url: &str, path: &Path) -> Result<()> { + let mut temp_file = TempFile::new().await?; + self.fetch_internal(url, &mut temp_file).await?; + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await?; + } + fs::copy(temp_file.file_path(), path).await?; + // temp_file 的 drop 需要 std::fs::remove_file + // 如果交由 rust 自动执行虽然逻辑正确但会略微阻塞异步上下文 + // 尽量主动调用,保证正常执行的情况下文件清除操作由 spawn_blocking 在专门线程中完成 + temp_file.drop_async().await; + Ok(()) + } + + pub async fn multi_fetch(&self, urls: &[&str], path: &Path) -> Result<()> { + let temp_file = self.multi_fetch_internal(urls).await?; + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await?; + } + fs::copy(temp_file.file_path(), path).await?; + temp_file.drop_async().await; + Ok(()) + } + + pub async fn multi_fetch_and_merge(&self, video_urls: &[&str], audio_urls: &[&str], path: &Path) -> Result<()> { + let (video_temp_file, audio_temp_file) = tokio::try_join!( + self.multi_fetch_internal(video_urls), + self.multi_fetch_internal(audio_urls) + )?; + let final_temp_file = TempFile::new().await?; + let output = Command::new("ffmpeg") + .args([ + "-i", + video_temp_file.file_path().to_string_lossy().as_ref(), + "-i", + audio_temp_file.file_path().to_string_lossy().as_ref(), + "-c", + "copy", + "-strict", + "unofficial", + "-f", + "mp4", + "-y", + final_temp_file.file_path().to_string_lossy().as_ref(), + ]) + .output() + .await + .context("failed to run ffmpeg")?; + if !output.status.success() { + bail!("ffmpeg error: {}", str::from_utf8(&output.stderr).unwrap_or("unknown")); + } + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).await?; + } + fs::copy(final_temp_file.file_path(), path).await?; + tokio::join!( + video_temp_file.drop_async(), + audio_temp_file.drop_async(), + final_temp_file.drop_async() + ); + Ok(()) + } + + async fn multi_fetch_internal(&self, urls: &[&str]) -> Result { + if urls.is_empty() { + bail!("no urls provided"); + } + let mut temp_file = TempFile::new().await?; + for (idx, url) in urls.iter().enumerate() { + match self.fetch_internal(url, &mut temp_file).await { + Ok(_) => return Ok(temp_file), + Err(e) => { + if idx == urls.len() - 1 { + temp_file.drop_async().await; + return Err(e).with_context(|| format!("failed to download file from all {} urls", urls.len())); + } + temp_file.set_len(0).await?; + temp_file.rewind().await?; + } + } + } + unreachable!() + } + + async fn fetch_internal(&self, url: &str, file: &mut TempFile) -> Result<()> { if VersionedConfig::get().load().concurrent_limit.download.enable { - self.fetch_parallel(url, path).await + self.fetch_parallel(url, file).await } else { - self.fetch_serial(url, path).await + self.fetch_serial(url, file).await } } - async fn fetch_serial(&self, url: &str, path: &Path) -> Result<()> { + async fn fetch_serial(&self, url: &str, file: &mut TempFile) -> Result<()> { let resp = self .client .request(Method::GET, url, None) @@ -41,12 +127,8 @@ impl Downloader { .await? .error_for_status()?; let expected = resp.header_content_length(); - if let Some(parent) = path.parent() { - fs::create_dir_all(parent).await?; - } - let mut file = File::create(path).await?; let mut stream_reader = StreamReader::new(resp.bytes_stream().map_err(std::io::Error::other)); - let received = tokio::io::copy(&mut stream_reader, &mut file).await?; + let received = tokio::io::copy(&mut stream_reader, file).await?; file.flush().await?; if let Some(expected) = expected { ensure!( @@ -59,7 +141,7 @@ impl Downloader { Ok(()) } - async fn fetch_parallel(&self, url: &str, path: &Path) -> Result<()> { + async fn fetch_parallel(&self, url: &str, file: &mut TempFile) -> Result<()> { let (concurrency, threshold) = { let config = VersionedConfig::get().load(); ( @@ -81,17 +163,11 @@ impl Downloader { .is_none_or(|v| v.to_str().unwrap_or_default() == "none") // https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Accept-Ranges#none || chunk_size < threshold { - return self.fetch_serial(url, path).await; + return self.fetch_serial(url, file).await; } - if let Some(parent) = path.parent() { - fs::create_dir_all(parent).await?; - } - let file = File::create(path).await?; file.set_len(file_size).await?; - drop(file); let mut tasks = JoinSet::new(); let url = Arc::new(url.to_string()); - let path = Arc::new(path.to_path_buf()); for i in 0..concurrency { let start = i as u64 * chunk_size; let end = if i == concurrency - 1 { @@ -99,10 +175,10 @@ impl Downloader { } else { start + chunk_size } - 1; - let (url_clone, path_clone, client_clone) = (url.clone(), path.clone(), self.client.clone()); + let (url_clone, client_clone) = (url.clone(), self.client.clone()); + let mut file_clone = file.open_rw().await?; tasks.spawn(async move { - let mut file = OpenOptions::new().write(true).open(path_clone.as_ref()).await?; - file.seek(SeekFrom::Start(start)).await?; + file_clone.seek(SeekFrom::Start(start)).await?; let range_header = format!("bytes={}-{}", start, end); let resp = client_clone .request(Method::GET, &url_clone, None) @@ -119,8 +195,8 @@ impl Downloader { ); } let mut stream_reader = StreamReader::new(resp.bytes_stream().map_err(std::io::Error::other)); - let received = tokio::io::copy(&mut stream_reader, &mut file).await?; - file.flush().await?; + let received = tokio::io::copy(&mut stream_reader, &mut file_clone).await?; + file_clone.flush().await?; ensure!( received == end - start + 1, "downloaded bytes mismatch: expected {}, got {}", @@ -135,45 +211,6 @@ impl Downloader { } Ok(()) } - - pub async fn fetch_with_fallback(&self, urls: &[&str], path: &Path) -> Result<()> { - if urls.is_empty() { - bail!("no urls provided"); - } - let mut res = Ok(()); - for url in urls { - match self.fetch(url, path).await { - Ok(_) => return Ok(()), - Err(err) => { - res = Err(err); - } - } - } - res.context("failed to download file") - } - - pub async fn merge(&self, video_path: &Path, audio_path: &Path, output_path: &Path) -> Result<()> { - let output = tokio::process::Command::new("ffmpeg") - .args([ - "-i", - video_path.to_string_lossy().as_ref(), - "-i", - audio_path.to_string_lossy().as_ref(), - "-c", - "copy", - "-strict", - "unofficial", - "-y", - output_path.to_string_lossy().as_ref(), - ]) - .output() - .await - .context("failed to run ffmpeg")?; - if !output.status.success() { - bail!("ffmpeg error: {}", str::from_utf8(&output.stderr).unwrap_or("unknown")); - } - Ok(()) - } } /// reqwest.content_length() 居然指的是 body_size 而非 content-length header,没办法自己实现一下 diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index 3aac61a..f78e1fd 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -32,6 +32,8 @@ pub async fn process_video_source( bili_client: &BiliClient, connection: &DatabaseConnection, ) -> Result<()> { + // 预创建视频源目录,提前检测目录是否可写 + video_source.create_dir_all().await?; // 从参数中获取视频列表的 Model 与视频流 let (video_source, video_streams) = video_source.refresh(bili_client, connection).await?; // 从视频流中获取新视频的简要信息,写入数据库 @@ -572,32 +574,18 @@ pub async fn fetch_page_video( .await? .best_stream(&VersionedConfig::get().load().filter_option)?; match streams { - BestStream::Mixed(mix_stream) => downloader.fetch_with_fallback(&mix_stream.urls(), page_path).await?, + BestStream::Mixed(mix_stream) => downloader.multi_fetch(&mix_stream.urls(), page_path).await?, BestStream::VideoAudio { video: video_stream, audio: None, - } => downloader.fetch_with_fallback(&video_stream.urls(), page_path).await?, + } => downloader.multi_fetch(&video_stream.urls(), page_path).await?, BestStream::VideoAudio { video: video_stream, audio: Some(audio_stream), } => { - let (tmp_video_path, tmp_audio_path) = ( - page_path.with_extension("tmp_video"), - page_path.with_extension("tmp_audio"), - ); - let res = async { - downloader - .fetch_with_fallback(&video_stream.urls(), &tmp_video_path) - .await?; - downloader - .fetch_with_fallback(&audio_stream.urls(), &tmp_audio_path) - .await?; - downloader.merge(&tmp_video_path, &tmp_audio_path, page_path).await - } - .await; - let _ = fs::remove_file(tmp_video_path).await; - let _ = fs::remove_file(tmp_audio_path).await; - res? + downloader + .multi_fetch_and_merge(&video_stream.urls(), &audio_stream.urls(), page_path) + .await? } } Ok(ExecutionStatus::Succeeded)