feat: 重构下载模块,将文件下载到临时目录再最终移动至目标路径 (#495)

This commit is contained in:
ᴀᴍᴛᴏᴀᴇʀ
2025-10-13 01:59:50 +08:00
committed by GitHub
parent eb2606f120
commit de702435af
6 changed files with 145 additions and 95 deletions

40
Cargo.lock generated
View File

@@ -186,6 +186,16 @@ dependencies = [
"syn 2.0.96",
]
[[package]]
name = "async-tempfile"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8a57b75c36e16f4d015e60e6a177552976a99b6947724403c551bcfa7cb1207"
dependencies = [
"tokio",
"uuid",
]
[[package]]
name = "async-trait"
version = "0.1.85"
@@ -346,6 +356,7 @@ dependencies = [
"arc-swap",
"assert_matches",
"async-stream",
"async-tempfile",
"axum",
"base64",
"bili_sync_entity",
@@ -1046,12 +1057,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
[[package]]
name = "errno"
version = "0.3.8"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245"
checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
dependencies = [
"libc",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -1078,9 +1089,9 @@ dependencies = [
[[package]]
name = "fastrand"
version = "2.0.2"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "finl_unicode"
@@ -1869,9 +1880,9 @@ dependencies = [
[[package]]
name = "linux-raw-sys"
version = "0.4.13"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c"
checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039"
[[package]]
name = "litemap"
@@ -2866,15 +2877,15 @@ checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustix"
version = "0.38.32"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89"
checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e"
dependencies = [
"bitflags 2.5.0",
"errno",
"libc",
"linux-raw-sys",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -3693,14 +3704,15 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "tempfile"
version = "3.10.1"
version = "3.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85b77fafb263dd9d05cbeac119526425676db3784113aa9295c88498cbf8bff1"
checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16"
dependencies = [
"cfg-if",
"fastrand",
"getrandom 0.3.3",
"once_cell",
"rustix",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]

View File

@@ -19,6 +19,7 @@ anyhow = { version = "1.0.100", features = ["backtrace"] }
arc-swap = { version = "1.7.1", features = ["serde"] }
assert_matches = "1.5.0"
async-stream = "0.3.6"
async-tempfile = {version = "0.7.0", features = ["uuid"]}
async-trait = "0.1.89"
axum = { version = "0.8.6", features = ["macros", "ws"] }
base64 = "0.22.1"

View File

@@ -13,6 +13,7 @@ build = "build.rs"
anyhow = { workspace = true }
arc-swap = { workspace = true }
async-stream = { workspace = true }
async-tempfile = { workspace = true }
axum = { workspace = true }
base64 = { workspace = true }
bili_sync_entity = { workspace = true }

View File

@@ -7,7 +7,7 @@ use std::borrow::Cow;
use std::path::Path;
use std::pin::Pin;
use anyhow::Result;
use anyhow::{Context, Result};
use chrono::Utc;
use enum_dispatch::enum_dispatch;
use futures::Stream;
@@ -109,6 +109,17 @@ pub trait VideoSource {
VideoSourceEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + Send + 'a>>,
)>;
/// Ensure the video source's target directory (and all missing ancestors)
/// exists, attaching a descriptive error when creation fails. Invoked up
/// front so an unwritable destination is detected before any download work.
async fn create_dir_all(&self) -> Result<()> {
    let dir = self.path();
    tokio::fs::create_dir_all(dir)
        .await
        .with_context(|| format!("failed to create video source directory {}", dir.display()))
}
}
pub enum _ActiveModel {

View File

@@ -4,10 +4,12 @@ use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, Result, bail, ensure};
use async_tempfile::TempFile;
use futures::TryStreamExt;
use reqwest::{Method, header};
use tokio::fs::{self, File, OpenOptions};
use tokio::fs::{self};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::process::Command;
use tokio::task::JoinSet;
use tokio_util::io::StreamReader;
@@ -26,14 +28,98 @@ impl Downloader {
}
/// Download `url` into a fresh temporary file, then copy the finished
/// result to `path`, creating the destination's parent directories on
/// demand. Staging in a temp file means the destination never holds a
/// half-downloaded file.
pub async fn fetch(&self, url: &str, path: &Path) -> Result<()> {
    let mut staging = TempFile::new().await?;
    self.fetch_internal(url, &mut staging).await?;
    if let Some(dir) = path.parent() {
        fs::create_dir_all(dir).await?;
    }
    fs::copy(staging.file_path(), path).await?;
    // TempFile's Drop removes the file via std::fs::remove_file, which would
    // briefly block the async executor if left to run implicitly. Calling
    // drop_async explicitly lets the removal run on a dedicated blocking
    // thread (spawn_blocking) on the normal success path.
    staging.drop_async().await;
    Ok(())
}
/// Try each url in `urls` in order until one download succeeds, then copy
/// the downloaded data to `path` (creating parent directories as needed).
pub async fn multi_fetch(&self, urls: &[&str], path: &Path) -> Result<()> {
    let staging = self.multi_fetch_internal(urls).await?;
    if let Some(dir) = path.parent() {
        fs::create_dir_all(dir).await?;
    }
    fs::copy(staging.file_path(), path).await?;
    // Explicit async drop keeps the blocking file removal off the executor.
    staging.drop_async().await;
    Ok(())
}
/// Download the best video and audio streams (each with url fallback) into
/// temp files, mux them into an MP4 with ffmpeg, then copy the merged
/// result to `path`.
pub async fn multi_fetch_and_merge(&self, video_urls: &[&str], audio_urls: &[&str], path: &Path) -> Result<()> {
    // The two downloads are independent, so run them concurrently;
    // try_join! fails fast if either one errors.
    let (video_temp_file, audio_temp_file) = tokio::try_join!(
        self.multi_fetch_internal(video_urls),
        self.multi_fetch_internal(audio_urls)
    )?;
    let final_temp_file = TempFile::new().await?;
    let output = Command::new("ffmpeg")
        .args([
            "-i",
            video_temp_file.file_path().to_string_lossy().as_ref(),
            "-i",
            audio_temp_file.file_path().to_string_lossy().as_ref(),
            // Stream copy: remux the inputs without re-encoding.
            "-c",
            "copy",
            "-strict",
            "unofficial",
            // Force the MP4 container explicitly — the temp output path
            // presumably carries no extension ffmpeg could infer it from.
            "-f",
            "mp4",
            "-y",
            final_temp_file.file_path().to_string_lossy().as_ref(),
        ])
        .output()
        .await
        .context("failed to run ffmpeg")?;
    if !output.status.success() {
        // Surface ffmpeg's stderr so the failure is diagnosable.
        bail!("ffmpeg error: {}", str::from_utf8(&output.stderr).unwrap_or("unknown"));
    }
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent).await?;
    }
    fs::copy(final_temp_file.file_path(), path).await?;
    // Remove all three temp files concurrently; drop_async moves the
    // blocking remove_file calls off the async executor. NOTE(review):
    // early-error returns above fall back to TempFile's blocking Drop.
    tokio::join!(
        video_temp_file.drop_async(),
        audio_temp_file.drop_async(),
        final_temp_file.drop_async()
    );
    Ok(())
}
/// Download from the first url in `urls` that succeeds, writing into a
/// single reusable temp file that is handed back to the caller on success.
/// If every url fails, the temp file is removed and the last error is
/// returned with context.
async fn multi_fetch_internal(&self, urls: &[&str]) -> Result<TempFile> {
    if urls.is_empty() {
        bail!("no urls provided");
    }
    let mut temp_file = TempFile::new().await?;
    for (idx, url) in urls.iter().enumerate() {
        match self.fetch_internal(url, &mut temp_file).await {
            Ok(_) => return Ok(temp_file),
            Err(e) => {
                if idx == urls.len() - 1 {
                    // Last attempt failed: clean up asynchronously and bail out.
                    temp_file.drop_async().await;
                    return Err(e).with_context(|| format!("failed to download file from all {} urls", urls.len()));
                }
                // Reset the temp file (truncate and rewind the cursor) so the
                // next url starts writing from a clean slate.
                temp_file.set_len(0).await?;
                temp_file.rewind().await?;
            }
        }
    }
    // A non-empty `urls` guarantees the loop returns on its final iteration.
    unreachable!()
}
async fn fetch_internal(&self, url: &str, file: &mut TempFile) -> Result<()> {
if VersionedConfig::get().load().concurrent_limit.download.enable {
self.fetch_parallel(url, path).await
self.fetch_parallel(url, file).await
} else {
self.fetch_serial(url, path).await
self.fetch_serial(url, file).await
}
}
async fn fetch_serial(&self, url: &str, path: &Path) -> Result<()> {
async fn fetch_serial(&self, url: &str, file: &mut TempFile) -> Result<()> {
let resp = self
.client
.request(Method::GET, url, None)
@@ -41,12 +127,8 @@ impl Downloader {
.await?
.error_for_status()?;
let expected = resp.header_content_length();
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
let mut file = File::create(path).await?;
let mut stream_reader = StreamReader::new(resp.bytes_stream().map_err(std::io::Error::other));
let received = tokio::io::copy(&mut stream_reader, &mut file).await?;
let received = tokio::io::copy(&mut stream_reader, file).await?;
file.flush().await?;
if let Some(expected) = expected {
ensure!(
@@ -59,7 +141,7 @@ impl Downloader {
Ok(())
}
async fn fetch_parallel(&self, url: &str, path: &Path) -> Result<()> {
async fn fetch_parallel(&self, url: &str, file: &mut TempFile) -> Result<()> {
let (concurrency, threshold) = {
let config = VersionedConfig::get().load();
(
@@ -81,17 +163,11 @@ impl Downloader {
.is_none_or(|v| v.to_str().unwrap_or_default() == "none") // https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Accept-Ranges#none
|| chunk_size < threshold
{
return self.fetch_serial(url, path).await;
return self.fetch_serial(url, file).await;
}
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
let file = File::create(path).await?;
file.set_len(file_size).await?;
drop(file);
let mut tasks = JoinSet::new();
let url = Arc::new(url.to_string());
let path = Arc::new(path.to_path_buf());
for i in 0..concurrency {
let start = i as u64 * chunk_size;
let end = if i == concurrency - 1 {
@@ -99,10 +175,10 @@ impl Downloader {
} else {
start + chunk_size
} - 1;
let (url_clone, path_clone, client_clone) = (url.clone(), path.clone(), self.client.clone());
let (url_clone, client_clone) = (url.clone(), self.client.clone());
let mut file_clone = file.open_rw().await?;
tasks.spawn(async move {
let mut file = OpenOptions::new().write(true).open(path_clone.as_ref()).await?;
file.seek(SeekFrom::Start(start)).await?;
file_clone.seek(SeekFrom::Start(start)).await?;
let range_header = format!("bytes={}-{}", start, end);
let resp = client_clone
.request(Method::GET, &url_clone, None)
@@ -119,8 +195,8 @@ impl Downloader {
);
}
let mut stream_reader = StreamReader::new(resp.bytes_stream().map_err(std::io::Error::other));
let received = tokio::io::copy(&mut stream_reader, &mut file).await?;
file.flush().await?;
let received = tokio::io::copy(&mut stream_reader, &mut file_clone).await?;
file_clone.flush().await?;
ensure!(
received == end - start + 1,
"downloaded bytes mismatch: expected {}, got {}",
@@ -135,45 +211,6 @@ impl Downloader {
}
Ok(())
}
/// Attempt to download from each url in turn, returning as soon as one
/// succeeds. If every attempt fails, the last error is returned with context.
pub async fn fetch_with_fallback(&self, urls: &[&str], path: &Path) -> Result<()> {
    if urls.is_empty() {
        bail!("no urls provided");
    }
    for (idx, url) in urls.iter().enumerate() {
        let err = match self.fetch(url, path).await {
            Ok(()) => return Ok(()),
            Err(err) => err,
        };
        if idx + 1 == urls.len() {
            return Err(err).context("failed to download file");
        }
    }
    // Non-empty `urls` means the loop always returns on its last iteration.
    unreachable!()
}
/// Mux the already-downloaded video and audio files into `output_path`
/// with ffmpeg, copying the streams without re-encoding.
pub async fn merge(&self, video_path: &Path, audio_path: &Path, output_path: &Path) -> Result<()> {
    let output = tokio::process::Command::new("ffmpeg")
        .args([
            "-i",
            video_path.to_string_lossy().as_ref(),
            "-i",
            audio_path.to_string_lossy().as_ref(),
            // Stream copy: no re-encode, just container-level muxing.
            "-c",
            "copy",
            "-strict",
            "unofficial",
            // Overwrite the output file if it already exists.
            "-y",
            output_path.to_string_lossy().as_ref(),
        ])
        .output()
        .await
        .context("failed to run ffmpeg")?;
    if !output.status.success() {
        // Include ffmpeg's stderr so the failure is diagnosable.
        bail!("ffmpeg error: {}", str::from_utf8(&output.stderr).unwrap_or("unknown"));
    }
    Ok(())
}
}
/// reqwest.content_length() 居然指的是 body_size 而非 content-length header,没办法,只能自己实现一下

View File

@@ -32,6 +32,8 @@ pub async fn process_video_source(
bili_client: &BiliClient,
connection: &DatabaseConnection,
) -> Result<()> {
// 预创建视频源目录,提前检测目录是否可写
video_source.create_dir_all().await?;
// 从参数中获取视频列表的 Model 与视频流
let (video_source, video_streams) = video_source.refresh(bili_client, connection).await?;
// 从视频流中获取新视频的简要信息,写入数据库
@@ -572,32 +574,18 @@ pub async fn fetch_page_video(
.await?
.best_stream(&VersionedConfig::get().load().filter_option)?;
match streams {
BestStream::Mixed(mix_stream) => downloader.fetch_with_fallback(&mix_stream.urls(), page_path).await?,
BestStream::Mixed(mix_stream) => downloader.multi_fetch(&mix_stream.urls(), page_path).await?,
BestStream::VideoAudio {
video: video_stream,
audio: None,
} => downloader.fetch_with_fallback(&video_stream.urls(), page_path).await?,
} => downloader.multi_fetch(&video_stream.urls(), page_path).await?,
BestStream::VideoAudio {
video: video_stream,
audio: Some(audio_stream),
} => {
let (tmp_video_path, tmp_audio_path) = (
page_path.with_extension("tmp_video"),
page_path.with_extension("tmp_audio"),
);
let res = async {
downloader
.fetch_with_fallback(&video_stream.urls(), &tmp_video_path)
.await?;
downloader
.fetch_with_fallback(&audio_stream.urls(), &tmp_audio_path)
.await?;
downloader.merge(&tmp_video_path, &tmp_audio_path, page_path).await
}
.await;
let _ = fs::remove_file(tmp_video_path).await;
let _ = fs::remove_file(tmp_audio_path).await;
res?
downloader
.multi_fetch_and_merge(&video_stream.urls(), &audio_stream.urls(), page_path)
.await?
}
}
Ok(ExecutionStatus::Succeeded)