feat: 在状态更新时忽略掉一些常见的错误 (#259)

This commit is contained in:
ᴀᴍᴛᴏᴀᴇʀ
2025-02-18 22:22:29 +08:00
committed by GitHub
parent e12a9cda95
commit 315ad13703
3 changed files with 129 additions and 51 deletions

View File

@@ -1,3 +1,6 @@
use std::io;
use anyhow::Result;
use thiserror::Error;
#[derive(Error, Debug)]
@@ -7,3 +10,34 @@ pub struct DownloadAbortError();
#[derive(Error, Debug)]
#[error("Process page error")]
pub struct ProcessPageError();
pub enum ExecutionStatus {
Skipped,
Succeeded,
Ignored(anyhow::Error),
Failed(anyhow::Error),
}
// 目前 stable rust 似乎不支持自定义类型使用 ? 运算符,只能先在返回值使用 Result再这样套层娃
impl From<Result<ExecutionStatus>> for ExecutionStatus {
fn from(res: Result<ExecutionStatus>) -> Self {
match res {
Ok(status) => status,
Err(err) => {
// error decoding response body
if let Some(error) = err.downcast_ref::<reqwest::Error>() {
if error.is_decode() {
return ExecutionStatus::Ignored(err);
}
}
// 文件系统的权限错误
if let Some(error) = err.downcast_ref::<io::Error>() {
if error.kind() == io::ErrorKind::PermissionDenied {
return ExecutionStatus::Ignored(err);
}
}
ExecutionStatus::Failed(err)
}
}
}
}

View File

@@ -1,4 +1,4 @@
use anyhow::Result;
use crate::error::ExecutionStatus;
pub(super) static STATUS_MAX_RETRY: u32 = 0b100;
pub(super) static STATUS_OK: u32 = 0b111;
@@ -57,7 +57,7 @@ impl<const N: usize> Status<N> {
/// 根据任务结果更新状态,任务结果是一个 Result 数组,需要与子任务一一对应
/// 如果所有子任务都已经完成,那么打上最高位的完成标记
pub fn update_status(&mut self, result: &[Result<()>]) {
pub fn update_status(&mut self, result: &[ExecutionStatus]) {
assert!(result.len() == N, "result length should be equal to N");
for (i, res) in result.iter().enumerate() {
self.set_result(res, i);
@@ -105,11 +105,12 @@ impl<const N: usize> Status<N> {
/// 根据子任务执行结果更新子任务的状态
/// 如果 Result 是 Ok那么认为任务执行成功将状态设置为 STATUS_OK
/// 如果 Result 是 Err那么认为任务执行失败将状态加一
fn set_result(&mut self, result: &Result<()>, offset: usize) {
fn set_result(&mut self, result: &ExecutionStatus, offset: usize) {
if self.get_status(offset) < STATUS_MAX_RETRY {
match result {
Ok(_) => self.set_ok(offset),
Err(_) => self.plus_one(offset),
ExecutionStatus::Succeeded | ExecutionStatus::Skipped => self.set_ok(offset),
ExecutionStatus::Failed(_) => self.plus_one(offset),
ExecutionStatus::Ignored(_) => {}
}
}
}
@@ -168,10 +169,18 @@ mod test {
let mut status = Status::<3>::default();
assert_eq!(status.should_run(), [true, true, true]);
for _ in 0..3 {
status.update_status(&[Err(anyhow!("")), Ok(()), Ok(())]);
status.update_status(&[
ExecutionStatus::Failed(anyhow!("")),
ExecutionStatus::Succeeded,
ExecutionStatus::Succeeded,
]);
assert_eq!(status.should_run(), [true, false, false]);
}
status.update_status(&[Err(anyhow!("")), Ok(()), Ok(())]);
status.update_status(&[
ExecutionStatus::Failed(anyhow!("")),
ExecutionStatus::Succeeded,
ExecutionStatus::Succeeded,
]);
assert_eq!(status.should_run(), [false, false, false]);
}
@@ -189,7 +198,11 @@ mod test {
let testcases = [([0, 0, 1], [1, 7, 7]), ([3, 4, 3], [4, 4, 7]), ([3, 1, 7], [4, 7, 7])];
for (before, after) in testcases.iter() {
let mut status = Status::<3>::from(before.clone());
status.update_status(&[Err(anyhow!("")), Ok(()), Ok(())]);
status.update_status(&[
ExecutionStatus::Failed(anyhow!("")),
ExecutionStatus::Succeeded,
ExecutionStatus::Succeeded,
]);
assert_eq!(<[u32; 3]>::from(status), *after);
}
}

View File

@@ -16,7 +16,7 @@ use crate::adapter::{video_list_from, Args, VideoListModel, VideoListModelEnum};
use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Video, VideoInfo};
use crate::config::{PathSafeTemplate, ARGS, CONFIG, TEMPLATE};
use crate::downloader::Downloader;
use crate::error::{DownloadAbortError, ProcessPageError};
use crate::error::{DownloadAbortError, ExecutionStatus, ProcessPageError};
use crate::utils::format_arg::{page_format_args, video_format_args};
use crate::utils::model::{
create_pages, create_videos, filter_unfilled_videos, filter_unhandled_video_pages, update_pages_model,
@@ -219,7 +219,7 @@ pub async fn download_video_pages(
let is_single_page = video_model.single_page.context("single_page is null")?;
// 对于单页视频page 的下载已经足够
// 对于多页视频page 下载仅包含了分集内容,需要额外补上视频的 poster 的 tvshow.nfo
let tasks: Vec<Pin<Box<dyn Future<Output = Result<()>> + Send>>> = vec![
let tasks: Vec<Pin<Box<dyn Future<Output = Result<ExecutionStatus>> + Send>>> = vec![
// 下载视频封面
Box::pin(fetch_video_poster(
separate_status[0] && !is_single_page,
@@ -259,17 +259,24 @@ pub async fn download_video_pages(
)),
];
let tasks: FuturesOrdered<_> = tasks.into_iter().collect();
let results: Vec<Result<()>> = tasks.collect().await;
let results: Vec<ExecutionStatus> = tasks.collect::<Vec<_>>().await.into_iter().map(Into::into).collect();
status.update_status(&results);
results
.iter()
.take(4)
.zip(["封面", "详情", "作者头像", "作者详情"])
.for_each(|(res, task_name)| match res {
Ok(_) => info!("处理视频「{}」{}成功", &video_model.name, task_name),
Err(e) => error!("处理视频「{}」{}失败: {}", &video_model.name, task_name, e),
ExecutionStatus::Skipped => info!("处理视频「{}」{}成功过,跳过", &video_model.name, task_name),
ExecutionStatus::Succeeded => info!("处理视频「{}」{}成功", &video_model.name, task_name),
ExecutionStatus::Ignored(e) => {
error!(
"处理视频「{}」{}出现常见错误,已忽略: {}",
&video_model.name, task_name, e
)
}
ExecutionStatus::Failed(e) => error!("处理视频「{}」{}失败: {}", &video_model.name, task_name, e),
});
if let Err(e) = results.into_iter().nth(4).context("page download result not found")? {
if let ExecutionStatus::Failed(e) = results.into_iter().nth(4).context("page download result not found")? {
if e.downcast_ref::<DownloadAbortError>().is_some() {
return Err(e);
}
@@ -289,9 +296,9 @@ pub async fn dispatch_download_page(
connection: &DatabaseConnection,
downloader: &Downloader,
base_path: &Path,
) -> Result<()> {
) -> Result<ExecutionStatus> {
if !should_run {
return Ok(());
return Ok(ExecutionStatus::Skipped);
}
let child_semaphore = Semaphore::new(CONFIG.concurrent_limit.page);
let tasks = pages
@@ -346,7 +353,7 @@ pub async fn dispatch_download_page(
);
bail!(ProcessPageError());
}
Ok(())
Ok(ExecutionStatus::Succeeded)
}
/// 下载某个分页,未发生风控且正常运行时返回 Ok(Page::ActiveModel),其中 status 字段存储了新的下载状态,发生风控时返回 DownloadAbortError
@@ -407,7 +414,7 @@ pub async fn download_page(
dimension,
..Default::default()
};
let tasks: Vec<Pin<Box<dyn Future<Output = Result<()>> + Send>>> = vec![
let tasks: Vec<Pin<Box<dyn Future<Output = Result<ExecutionStatus>> + Send>>> = vec![
Box::pin(fetch_page_poster(
separate_status[0],
video_model,
@@ -446,23 +453,33 @@ pub async fn download_page(
)),
];
let tasks: FuturesOrdered<_> = tasks.into_iter().collect();
let results: Vec<Result<()>> = tasks.collect().await;
let results: Vec<ExecutionStatus> = tasks.collect::<Vec<_>>().await.into_iter().map(Into::into).collect();
status.update_status(&results);
results
.iter()
.zip(["封面", "视频", "详情", "弹幕", "字幕"])
.for_each(|(res, task_name)| match res {
Ok(_) => info!(
ExecutionStatus::Skipped => info!(
"处理视频「{}」第 {} 页{}已成功过,跳过",
&video_model.name, page_model.pid, task_name
),
ExecutionStatus::Succeeded => info!(
"处理视频「{}」第 {} 页{}成功",
&video_model.name, page_model.pid, task_name
),
Err(e) => error!(
ExecutionStatus::Ignored(e) => {
error!(
"处理视频「{}」第 {} 页{}出现常见错误,已忽略: {}",
&video_model.name, page_model.pid, task_name, e
)
}
ExecutionStatus::Failed(e) => error!(
"处理视频「{}」第 {} 页{}失败: {}",
&video_model.name, page_model.pid, task_name, e
),
});
// 如果下载视频时触发风控,直接返回 DownloadAbortError
if let Err(e) = results.into_iter().nth(1).context("video download result not found")? {
if let ExecutionStatus::Failed(e) = results.into_iter().nth(1).context("video download result not found")? {
if let Ok(BiliError::RiskControlOccurred) = e.downcast::<BiliError>() {
bail!(DownloadAbortError());
}
@@ -480,9 +497,9 @@ pub async fn fetch_page_poster(
downloader: &Downloader,
poster_path: PathBuf,
fanart_path: Option<PathBuf>,
) -> Result<()> {
) -> Result<ExecutionStatus> {
if !should_run {
return Ok(());
return Ok(ExecutionStatus::Skipped);
}
let single_page = video_model.single_page.context("single_page is null")?;
let url = if single_page {
@@ -499,7 +516,7 @@ pub async fn fetch_page_poster(
if let Some(fanart_path) = fanart_path {
fs::copy(&poster_path, &fanart_path).await?;
}
Ok(())
Ok(ExecutionStatus::Succeeded)
}
pub async fn fetch_page_video(
@@ -509,9 +526,9 @@ pub async fn fetch_page_video(
downloader: &Downloader,
page_info: &PageInfo,
page_path: &Path,
) -> Result<()> {
) -> Result<ExecutionStatus> {
if !should_run {
return Ok(());
return Ok(ExecutionStatus::Skipped);
}
let bili_video = Video::new(bili_client, video_model.bvid.clone());
let streams = bili_video
@@ -519,11 +536,11 @@ pub async fn fetch_page_video(
.await?
.best_stream(&CONFIG.filter_option)?;
match streams {
BestStream::Mixed(mix_stream) => downloader.fetch(mix_stream.url(), page_path).await,
BestStream::Mixed(mix_stream) => downloader.fetch(mix_stream.url(), page_path).await?,
BestStream::VideoAudio {
video: video_stream,
audio: None,
} => downloader.fetch(video_stream.url(), page_path).await,
} => downloader.fetch(video_stream.url(), page_path).await?,
BestStream::VideoAudio {
video: video_stream,
audio: Some(audio_stream),
@@ -540,9 +557,10 @@ pub async fn fetch_page_video(
.await;
let _ = fs::remove_file(tmp_video_path).await;
let _ = fs::remove_file(tmp_audio_path).await;
res
res?
}
}
Ok(ExecutionStatus::Succeeded)
}
pub async fn fetch_page_danmaku(
@@ -551,16 +569,17 @@ pub async fn fetch_page_danmaku(
video_model: &video::Model,
page_info: &PageInfo,
danmaku_path: PathBuf,
) -> Result<()> {
) -> Result<ExecutionStatus> {
if !should_run {
return Ok(());
return Ok(ExecutionStatus::Skipped);
}
let bili_video = Video::new(bili_client, video_model.bvid.clone());
bili_video
.get_danmaku_writer(page_info)
.await?
.write(danmaku_path)
.await
.await?;
Ok(ExecutionStatus::Succeeded)
}
pub async fn fetch_page_subtitle(
@@ -569,9 +588,9 @@ pub async fn fetch_page_subtitle(
video_model: &video::Model,
page_info: &PageInfo,
subtitle_path: &Path,
) -> Result<()> {
) -> Result<ExecutionStatus> {
if !should_run {
return Ok(());
return Ok(ExecutionStatus::Skipped);
}
let bili_video = Video::new(bili_client, video_model.bvid.clone());
let subtitles = bili_video.get_subtitles(page_info).await?;
@@ -583,7 +602,7 @@ pub async fn fetch_page_subtitle(
})
.collect::<FuturesUnordered<_>>();
tasks.try_collect::<Vec<()>>().await?;
Ok(())
Ok(ExecutionStatus::Succeeded)
}
pub async fn generate_page_nfo(
@@ -591,9 +610,9 @@ pub async fn generate_page_nfo(
video_model: &video::Model,
page_model: &page::Model,
nfo_path: PathBuf,
) -> Result<()> {
) -> Result<ExecutionStatus> {
if !should_run {
return Ok(());
return Ok(ExecutionStatus::Skipped);
}
let single_page = video_model.single_page.context("single_page is null")?;
let nfo_serializer = if single_page {
@@ -601,7 +620,8 @@ pub async fn generate_page_nfo(
} else {
NFOSerializer(ModelWrapper::Page(page_model), NFOMode::EPOSODE)
};
generate_nfo(nfo_serializer, nfo_path).await
generate_nfo(nfo_serializer, nfo_path).await?;
Ok(ExecutionStatus::Succeeded)
}
pub async fn fetch_video_poster(
@@ -610,13 +630,13 @@ pub async fn fetch_video_poster(
downloader: &Downloader,
poster_path: PathBuf,
fanart_path: PathBuf,
) -> Result<()> {
) -> Result<ExecutionStatus> {
if !should_run {
return Ok(());
return Ok(ExecutionStatus::Skipped);
}
downloader.fetch(&video_model.cover, &poster_path).await?;
fs::copy(&poster_path, &fanart_path).await?;
Ok(())
Ok(ExecutionStatus::Succeeded)
}
pub async fn fetch_upper_face(
@@ -624,27 +644,38 @@ pub async fn fetch_upper_face(
video_model: &video::Model,
downloader: &Downloader,
upper_face_path: PathBuf,
) -> Result<()> {
) -> Result<ExecutionStatus> {
if !should_run {
return Ok(());
return Ok(ExecutionStatus::Skipped);
}
downloader.fetch(&video_model.upper_face, &upper_face_path).await
downloader.fetch(&video_model.upper_face, &upper_face_path).await?;
Ok(ExecutionStatus::Succeeded)
}
pub async fn generate_upper_nfo(should_run: bool, video_model: &video::Model, nfo_path: PathBuf) -> Result<()> {
pub async fn generate_upper_nfo(
should_run: bool,
video_model: &video::Model,
nfo_path: PathBuf,
) -> Result<ExecutionStatus> {
if !should_run {
return Ok(());
return Ok(ExecutionStatus::Skipped);
}
let nfo_serializer = NFOSerializer(ModelWrapper::Video(video_model), NFOMode::UPPER);
generate_nfo(nfo_serializer, nfo_path).await
generate_nfo(nfo_serializer, nfo_path).await?;
Ok(ExecutionStatus::Succeeded)
}
pub async fn generate_video_nfo(should_run: bool, video_model: &video::Model, nfo_path: PathBuf) -> Result<()> {
pub async fn generate_video_nfo(
should_run: bool,
video_model: &video::Model,
nfo_path: PathBuf,
) -> Result<ExecutionStatus> {
if !should_run {
return Ok(());
return Ok(ExecutionStatus::Skipped);
}
let nfo_serializer = NFOSerializer(ModelWrapper::Video(video_model), NFOMode::TVSHOW);
generate_nfo(nfo_serializer, nfo_path).await
generate_nfo(nfo_serializer, nfo_path).await?;
Ok(ExecutionStatus::Succeeded)
}
/// 创建 nfo_path 的父目录,然后写入 nfo 文件