diff --git a/crates/bili_sync/src/error.rs b/crates/bili_sync/src/error.rs index 5d415c1..f16a098 100644 --- a/crates/bili_sync/src/error.rs +++ b/crates/bili_sync/src/error.rs @@ -1,3 +1,6 @@ +use std::io; + +use anyhow::Result; use thiserror::Error; #[derive(Error, Debug)] @@ -7,3 +10,34 @@ pub struct DownloadAbortError(); #[derive(Error, Debug)] #[error("Process page error")] pub struct ProcessPageError(); + +pub enum ExecutionStatus { + Skipped, + Succeeded, + Ignored(anyhow::Error), + Failed(anyhow::Error), +} + +// 目前 stable rust 似乎不支持自定义类型使用 ? 运算符,只能先在返回值使用 Result,再这样套层娃 +impl From> for ExecutionStatus { + fn from(res: Result) -> Self { + match res { + Ok(status) => status, + Err(err) => { + // error decoding response body + if let Some(error) = err.downcast_ref::() { + if error.is_decode() { + return ExecutionStatus::Ignored(err); + } + } + // 文件系统的权限错误 + if let Some(error) = err.downcast_ref::() { + if error.kind() == io::ErrorKind::PermissionDenied { + return ExecutionStatus::Ignored(err); + } + } + ExecutionStatus::Failed(err) + } + } + } +} diff --git a/crates/bili_sync/src/utils/status.rs b/crates/bili_sync/src/utils/status.rs index 2efcf93..7e7fdd9 100644 --- a/crates/bili_sync/src/utils/status.rs +++ b/crates/bili_sync/src/utils/status.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use crate::error::ExecutionStatus; pub(super) static STATUS_MAX_RETRY: u32 = 0b100; pub(super) static STATUS_OK: u32 = 0b111; @@ -57,7 +57,7 @@ impl Status { /// 根据任务结果更新状态,任务结果是一个 Result 数组,需要与子任务一一对应 /// 如果所有子任务都已经完成,那么打上最高位的完成标记 - pub fn update_status(&mut self, result: &[Result<()>]) { + pub fn update_status(&mut self, result: &[ExecutionStatus]) { assert!(result.len() == N, "result length should be equal to N"); for (i, res) in result.iter().enumerate() { self.set_result(res, i); @@ -105,11 +105,12 @@ impl Status { /// 根据子任务执行结果更新子任务的状态 /// 如果 Result 是 Ok,那么认为任务执行成功,将状态设置为 STATUS_OK /// 如果 Result 是 Err,那么认为任务执行失败,将状态加一 - fn set_result(&mut self, result: &Result<()>, offset: usize) { + fn set_result(&mut self, result: &ExecutionStatus, offset: usize) { if self.get_status(offset) < STATUS_MAX_RETRY { match result { - Ok(_) => self.set_ok(offset), - Err(_) => self.plus_one(offset), + ExecutionStatus::Succeeded | ExecutionStatus::Skipped => self.set_ok(offset), + ExecutionStatus::Failed(_) => self.plus_one(offset), + ExecutionStatus::Ignored(_) => {} } } } @@ -168,10 +169,18 @@ mod test { let mut status = Status::<3>::default(); assert_eq!(status.should_run(), [true, true, true]); for _ in 0..3 { - status.update_status(&[Err(anyhow!("")), Ok(()), Ok(())]); + status.update_status(&[ + ExecutionStatus::Failed(anyhow!("")), + ExecutionStatus::Succeeded, + ExecutionStatus::Succeeded, + ]); assert_eq!(status.should_run(), [true, false, false]); } - status.update_status(&[Err(anyhow!("")), Ok(()), Ok(())]); + status.update_status(&[ + ExecutionStatus::Failed(anyhow!("")), + ExecutionStatus::Succeeded, + ExecutionStatus::Succeeded, + ]); assert_eq!(status.should_run(), [false, false, false]); } @@ -189,7 +198,11 @@ mod test { let testcases = [([0, 0, 1], [1, 7, 7]), ([3, 4, 3], [4, 4, 7]), ([3, 1, 7], [4, 7, 7])]; for (before, after) in testcases.iter() { let mut status = Status::<3>::from(before.clone()); - status.update_status(&[Err(anyhow!("")), Ok(()), Ok(())]); + status.update_status(&[ + ExecutionStatus::Failed(anyhow!("")), + ExecutionStatus::Succeeded, + ExecutionStatus::Succeeded, + ]); assert_eq!(<[u32; 3]>::from(status), *after); } } diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index a13908e..2a9f29f 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -16,7 +16,7 @@ use crate::adapter::{video_list_from, Args, VideoListModel, VideoListModelEnum}; use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Video, VideoInfo}; use crate::config::{PathSafeTemplate, ARGS, CONFIG, TEMPLATE}; use crate::downloader::Downloader; -use crate::error::{DownloadAbortError, ProcessPageError}; +use crate::error::{DownloadAbortError, ExecutionStatus, ProcessPageError}; use crate::utils::format_arg::{page_format_args, video_format_args}; use crate::utils::model::{ create_pages, create_videos, filter_unfilled_videos, filter_unhandled_video_pages, update_pages_model, @@ -219,7 +219,7 @@ pub async fn download_video_pages( let is_single_page = video_model.single_page.context("single_page is null")?; // 对于单页视频,page 的下载已经足够 // 对于多页视频,page 下载仅包含了分集内容,需要额外补上视频的 poster 的 tvshow.nfo - let tasks: Vec> + Send>>> = vec![ + let tasks: Vec> + Send>>> = vec![ // 下载视频封面 Box::pin(fetch_video_poster( separate_status[0] && !is_single_page, @@ -259,17 +259,24 @@ pub async fn download_video_pages( )), ]; let tasks: FuturesOrdered<_> = tasks.into_iter().collect(); - let results: Vec> = tasks.collect().await; + let results: Vec = tasks.collect::>().await.into_iter().map(Into::into).collect(); status.update_status(&results); results .iter() .take(4) .zip(["封面", "详情", "作者头像", "作者详情"]) .for_each(|(res, task_name)| match res { - Ok(_) => info!("处理视频「{}」{}成功", &video_model.name, task_name), - Err(e) => error!("处理视频「{}」{}失败: {}", &video_model.name, task_name, e), + ExecutionStatus::Skipped => info!("处理视频「{}」{}已成功过,跳过", &video_model.name, task_name), + ExecutionStatus::Succeeded => info!("处理视频「{}」{}成功", &video_model.name, task_name), + ExecutionStatus::Ignored(e) => { + error!( + "处理视频「{}」{}出现常见错误,已忽略: {}", + &video_model.name, task_name, e + ) + } + ExecutionStatus::Failed(e) => error!("处理视频「{}」{}失败: {}", &video_model.name, task_name, e), }); - if let Err(e) = results.into_iter().nth(4).context("page download result not found")? { + if let ExecutionStatus::Failed(e) = results.into_iter().nth(4).context("page download result not found")? { if e.downcast_ref::().is_some() { return Err(e); } @@ -289,9 +296,9 @@ pub async fn dispatch_download_page( connection: &DatabaseConnection, downloader: &Downloader, base_path: &Path, -) -> Result<()> { +) -> Result { if !should_run { - return Ok(()); + return Ok(ExecutionStatus::Skipped); } let child_semaphore = Semaphore::new(CONFIG.concurrent_limit.page); let tasks = pages @@ -346,7 +353,7 @@ pub async fn dispatch_download_page( ); bail!(ProcessPageError()); } - Ok(()) + Ok(ExecutionStatus::Succeeded) } /// 下载某个分页,未发生风控且正常运行时返回 Ok(Page::ActiveModel),其中 status 字段存储了新的下载状态,发生风控时返回 DownloadAbortError @@ -407,7 +414,7 @@ pub async fn download_page( dimension, ..Default::default() }; - let tasks: Vec> + Send>>> = vec![ + let tasks: Vec> + Send>>> = vec![ Box::pin(fetch_page_poster( separate_status[0], video_model, @@ -446,23 +453,33 @@ pub async fn download_page( )), ]; let tasks: FuturesOrdered<_> = tasks.into_iter().collect(); - let results: Vec> = tasks.collect().await; + let results: Vec = tasks.collect::>().await.into_iter().map(Into::into).collect(); status.update_status(&results); results .iter() .zip(["封面", "视频", "详情", "弹幕", "字幕"]) .for_each(|(res, task_name)| match res { - Ok(_) => info!( + ExecutionStatus::Skipped => info!( + "处理视频「{}」第 {} 页{}已成功过,跳过", + &video_model.name, page_model.pid, task_name + ), + ExecutionStatus::Succeeded => info!( "处理视频「{}」第 {} 页{}成功", &video_model.name, page_model.pid, task_name ), - Err(e) => error!( + ExecutionStatus::Ignored(e) => { + error!( + "处理视频「{}」第 {} 页{}出现常见错误,已忽略: {}", + &video_model.name, page_model.pid, task_name, e + ) + } + ExecutionStatus::Failed(e) => error!( "处理视频「{}」第 {} 页{}失败: {}", &video_model.name, page_model.pid, task_name, e ), }); // 如果下载视频时触发风控,直接返回 DownloadAbortError - if let Err(e) = results.into_iter().nth(1).context("video download result not found")? { + if let ExecutionStatus::Failed(e) = results.into_iter().nth(1).context("video download result not found")? { if let Ok(BiliError::RiskControlOccurred) = e.downcast::() { bail!(DownloadAbortError()); } @@ -480,9 +497,9 @@ pub async fn fetch_page_poster( downloader: &Downloader, poster_path: PathBuf, fanart_path: Option, -) -> Result<()> { +) -> Result { if !should_run { - return Ok(()); + return Ok(ExecutionStatus::Skipped); } let single_page = video_model.single_page.context("single_page is null")?; let url = if single_page { @@ -499,7 +516,7 @@ pub async fn fetch_page_poster( if let Some(fanart_path) = fanart_path { fs::copy(&poster_path, &fanart_path).await?; } - Ok(()) + Ok(ExecutionStatus::Succeeded) } pub async fn fetch_page_video( @@ -509,9 +526,9 @@ pub async fn fetch_page_video( downloader: &Downloader, page_info: &PageInfo, page_path: &Path, -) -> Result<()> { +) -> Result { if !should_run { - return Ok(()); + return Ok(ExecutionStatus::Skipped); } let bili_video = Video::new(bili_client, video_model.bvid.clone()); let streams = bili_video @@ -519,11 +536,11 @@ pub async fn fetch_page_video( .await? .best_stream(&CONFIG.filter_option)?; match streams { - BestStream::Mixed(mix_stream) => downloader.fetch(mix_stream.url(), page_path).await, + BestStream::Mixed(mix_stream) => downloader.fetch(mix_stream.url(), page_path).await?, BestStream::VideoAudio { video: video_stream, audio: None, - } => downloader.fetch(video_stream.url(), page_path).await, + } => downloader.fetch(video_stream.url(), page_path).await?, BestStream::VideoAudio { video: video_stream, audio: Some(audio_stream), @@ -540,9 +557,10 @@ pub async fn fetch_page_video( .await; let _ = fs::remove_file(tmp_video_path).await; let _ = fs::remove_file(tmp_audio_path).await; - res + res? } } + Ok(ExecutionStatus::Succeeded) } pub async fn fetch_page_danmaku( @@ -551,16 +569,17 @@ pub async fn fetch_page_danmaku( video_model: &video::Model, page_info: &PageInfo, danmaku_path: PathBuf, -) -> Result<()> { +) -> Result { if !should_run { - return Ok(()); + return Ok(ExecutionStatus::Skipped); } let bili_video = Video::new(bili_client, video_model.bvid.clone()); bili_video .get_danmaku_writer(page_info) .await? .write(danmaku_path) - .await + .await?; + Ok(ExecutionStatus::Succeeded) } pub async fn fetch_page_subtitle( @@ -569,9 +588,9 @@ pub async fn fetch_page_subtitle( video_model: &video::Model, page_info: &PageInfo, subtitle_path: &Path, -) -> Result<()> { +) -> Result { if !should_run { - return Ok(()); + return Ok(ExecutionStatus::Skipped); } let bili_video = Video::new(bili_client, video_model.bvid.clone()); let subtitles = bili_video.get_subtitles(page_info).await?; @@ -583,7 +602,7 @@ pub async fn fetch_page_subtitle( }) .collect::>(); tasks.try_collect::>().await?; - Ok(()) + Ok(ExecutionStatus::Succeeded) } pub async fn generate_page_nfo( @@ -591,9 +610,9 @@ pub async fn generate_page_nfo( video_model: &video::Model, page_model: &page::Model, nfo_path: PathBuf, -) -> Result<()> { +) -> Result { if !should_run { - return Ok(()); + return Ok(ExecutionStatus::Skipped); } let single_page = video_model.single_page.context("single_page is null")?; let nfo_serializer = if single_page { @@ -601,7 +620,8 @@ pub async fn generate_page_nfo( } else { NFOSerializer(ModelWrapper::Page(page_model), NFOMode::EPOSODE) }; - generate_nfo(nfo_serializer, nfo_path).await + generate_nfo(nfo_serializer, nfo_path).await?; + Ok(ExecutionStatus::Succeeded) } pub async fn fetch_video_poster( @@ -610,13 +630,13 @@ pub async fn fetch_video_poster( downloader: &Downloader, poster_path: PathBuf, fanart_path: PathBuf, -) -> Result<()> { +) -> Result { if !should_run { - return Ok(()); + return Ok(ExecutionStatus::Skipped); } downloader.fetch(&video_model.cover, &poster_path).await?; fs::copy(&poster_path, &fanart_path).await?; - Ok(()) + Ok(ExecutionStatus::Succeeded) } pub async fn fetch_upper_face( @@ -624,27 +644,38 @@ pub async fn fetch_upper_face( video_model: &video::Model, downloader: &Downloader, upper_face_path: PathBuf, -) -> Result<()> { +) -> Result { if !should_run { - return Ok(()); + return Ok(ExecutionStatus::Skipped); } - downloader.fetch(&video_model.upper_face, &upper_face_path).await + downloader.fetch(&video_model.upper_face, &upper_face_path).await?; + Ok(ExecutionStatus::Succeeded) } -pub async fn generate_upper_nfo(should_run: bool, video_model: &video::Model, nfo_path: PathBuf) -> Result<()> { +pub async fn generate_upper_nfo( + should_run: bool, + video_model: &video::Model, + nfo_path: PathBuf, +) -> Result { if !should_run { - return Ok(()); + return Ok(ExecutionStatus::Skipped); } let nfo_serializer = NFOSerializer(ModelWrapper::Video(video_model), NFOMode::UPPER); - generate_nfo(nfo_serializer, nfo_path).await + generate_nfo(nfo_serializer, nfo_path).await?; + Ok(ExecutionStatus::Succeeded) } -pub async fn generate_video_nfo(should_run: bool, video_model: &video::Model, nfo_path: PathBuf) -> Result<()> { +pub async fn generate_video_nfo( + should_run: bool, + video_model: &video::Model, + nfo_path: PathBuf, +) -> Result { if !should_run { - return Ok(()); + return Ok(ExecutionStatus::Skipped); } let nfo_serializer = NFOSerializer(ModelWrapper::Video(video_model), NFOMode::TVSHOW); - generate_nfo(nfo_serializer, nfo_path).await + generate_nfo(nfo_serializer, nfo_path).await?; + Ok(ExecutionStatus::Succeeded) } /// 创建 nfo_path 的父目录,然后写入 nfo 文件