diff --git a/crates/bili_sync/src/adapter/collection.rs b/crates/bili_sync/src/adapter/collection.rs index 3207864..58ab665 100644 --- a/crates/bili_sync/src/adapter/collection.rs +++ b/crates/bili_sync/src/adapter/collection.rs @@ -12,7 +12,7 @@ use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged}; use crate::adapter::{helper, VideoListModel}; use crate::bilibili::{self, BiliClient, Collection, CollectionItem, CollectionType, VideoInfo}; -use crate::utils::status::Status; +use crate::utils::status::STATUS_COMPLETED; #[async_trait] impl VideoListModel for collection::Model { @@ -38,7 +38,7 @@ impl VideoListModel for collection::Model { video::Column::CollectionId .eq(self.id) .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.lt(Status::handled())) + .and(video::Column::DownloadStatus.lt(STATUS_COMPLETED)) .and(video::Column::Category.eq(2)) .and(video::Column::SinglePage.is_not_null()) .into_condition(), diff --git a/crates/bili_sync/src/adapter/favorite.rs b/crates/bili_sync/src/adapter/favorite.rs index 692fd80..71b19e5 100644 --- a/crates/bili_sync/src/adapter/favorite.rs +++ b/crates/bili_sync/src/adapter/favorite.rs @@ -12,7 +12,7 @@ use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged}; use crate::adapter::{helper, VideoListModel}; use crate::bilibili::{self, BiliClient, FavoriteList, VideoInfo}; -use crate::utils::status::Status; +use crate::utils::status::STATUS_COMPLETED; #[async_trait] impl VideoListModel for favorite::Model { @@ -38,7 +38,7 @@ impl VideoListModel for favorite::Model { video::Column::FavoriteId .eq(self.id) .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.lt(Status::handled())) + .and(video::Column::DownloadStatus.lt(STATUS_COMPLETED)) .and(video::Column::Category.eq(2)) .and(video::Column::SinglePage.is_not_null()) .into_condition(), diff --git a/crates/bili_sync/src/adapter/submission.rs b/crates/bili_sync/src/adapter/submission.rs index 731a8f7..db0e845 100644 --- a/crates/bili_sync/src/adapter/submission.rs +++ b/crates/bili_sync/src/adapter/submission.rs @@ -13,7 +13,7 @@ use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged}; use crate::adapter::helper::video_with_path; use crate::adapter::{helper, VideoListModel}; use crate::bilibili::{self, BiliClient, Submission, VideoInfo}; -use crate::utils::status::Status; +use crate::utils::status::STATUS_COMPLETED; #[async_trait] impl VideoListModel for submission::Model { @@ -39,7 +39,7 @@ impl VideoListModel for submission::Model { video::Column::SubmissionId .eq(self.id) .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.lt(Status::handled())) + .and(video::Column::DownloadStatus.lt(STATUS_COMPLETED)) .and(video::Column::Category.eq(2)) .and(video::Column::SinglePage.is_not_null()) .into_condition(), diff --git a/crates/bili_sync/src/adapter/watch_later.rs b/crates/bili_sync/src/adapter/watch_later.rs index eedf256..87cbbed 100644 --- a/crates/bili_sync/src/adapter/watch_later.rs +++ b/crates/bili_sync/src/adapter/watch_later.rs @@ -13,7 +13,7 @@ use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged}; use crate::adapter::helper::video_with_path; use crate::adapter::{helper, VideoListModel}; use crate::bilibili::{self, BiliClient, VideoInfo, WatchLater}; -use crate::utils::status::Status; +use crate::utils::status::STATUS_COMPLETED; #[async_trait] impl VideoListModel for watch_later::Model { @@ -39,7 +39,7 @@ impl VideoListModel for watch_later::Model { video::Column::WatchLaterId .eq(self.id) .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.lt(Status::handled())) + .and(video::Column::DownloadStatus.lt(STATUS_COMPLETED)) .and(video::Column::Category.eq(2)) .and(video::Column::SinglePage.is_not_null()) .into_condition(), diff --git a/crates/bili_sync/src/utils/status.rs b/crates/bili_sync/src/utils/status.rs index 2708550..01cc81a 100644 --- a/crates/bili_sync/src/utils/status.rs +++ b/crates/bili_sync/src/utils/status.rs @@ -2,78 +2,85 @@ use anyhow::Result; static STATUS_MAX_RETRY: u32 = 0b100; static STATUS_OK: u32 = 0b111; +pub static STATUS_COMPLETED: u32 = 1 << 31; /// 用来表示下载的状态,不想写太多列了,所以仅使用一个 u32 表示。 -/// 从低位开始,固定每三位表示一种数据的状态,从 0b000 开始,每失败一次加一,最多 0b100(即重试 4 次), -/// 如果成功,将对应的三位设置为 0b111。 -/// 当所有任务都成功或者由于尝试次数过多失败,为 status 最高位打上标记 1,将来不再继续尝试。 +/// 从低位开始,固定每三位表示一种子任务的状态。 +/// 子任务状态从 0b000 开始,每执行失败一次将状态加一,最多 0b100(即允许重试 4 次),该值定义为 STATUS_MAX_RETRY。 +/// 如果子任务执行成功,将状态设置为 0b111,该值定义为 STATUS_OK。 +/// 子任务达到最大失败次数或者执行成功时,认为该子任务已经完成。 +/// 当所有子任务都已经完成时,为最高位打上标记 1,表示整个下载任务已经完成。 #[derive(Clone)] pub struct Status(u32); impl Status { - /// 如果 status 整体大于等于 1 << 31,则表示任务已经被处理过,不再需要重试。 - /// 数据库可以使用 status < Status::handled() 来筛选需要处理的内容。 - pub const fn handled() -> u32 { - 1 << 31 - } - fn new(status: u32) -> Self { Self(status) } - /// 一般仅需要被内部调用,用来设置最高位的标记 - fn set_flag(&mut self, handled: bool) { - if handled { + /// 设置最高位的完成标记 + fn set_completed(&mut self, completed: bool) { + if completed { self.0 |= 1 << 31; } else { self.0 &= !(1 << 31); } } - /// 从低到高检查状态,如果该位置的任务应该继续尝试执行,则返回 true,否则返回 false - fn should_run(&self, size: usize) -> Vec { - (0..size).map(|x| self.check_continue(x)).collect() + // 获取最高位的完成标记 + fn get_completed(&self) -> bool { + self.0 >> 31 == 1 } - /// 如果任务的执行次数小于 STATUS_MAX_RETRY,说明可以继续运行 + /// 获取某个子任务的状态 + fn get_status(&self, offset: usize) -> u32 { + let helper = !0u32; + (self.0 & (helper << (offset * 3)) & (helper >> (32 - 3 * offset - 3))) >> (offset * 3) + } + + // 将某个子任务的状态加一(在任务失败时使用) + fn plus_one(&mut self, offset: usize) { + self.0 += 1 << (3 * offset); + } + + // 设置某个子任务的状态为 STATUS_OK(在任务成功时使用) + fn set_ok(&mut self, offset: usize) { + self.0 |= STATUS_OK << (3 * offset); + } + + /// 检查某个子任务是否还应该继续执行,实际是检查该子任务的状态是否小于 STATUS_MAX_RETRY fn check_continue(&self, offset: usize) -> bool { self.get_status(offset) < STATUS_MAX_RETRY } - /// 根据任务结果更新状态,如果任务成功,设置为 STATUS_OK,否则加一 + /// 依次检查所有子任务是否还应该继续执行,返回一个 bool 数组 + fn should_run(&self, size: usize) -> Vec { + (0..size).map(|x| self.check_continue(x)).collect() + } + + /// 根据子任务执行结果更新子任务的状态 + /// 如果 Result 是 Ok,那么认为任务执行成功,将状态设置为 STATUS_OK + /// 如果 Result 是 Err,那么认为任务执行失败,将状态加一 + fn set_result(&mut self, result: &Result<()>, offset: usize) { + if self.get_status(offset) < STATUS_MAX_RETRY { + match result { + Ok(_) => self.set_ok(offset), + Err(_) => self.plus_one(offset), + } + } + } + + /// 根据任务结果更新状态,任务结果是一个 Result 数组,需要与子任务一一对应 + /// 如果所有子任务都已经完成,那么打上最高位的完成标记 fn update_status(&mut self, result: &[Result<()>]) { for (i, res) in result.iter().enumerate() { self.set_result(res, i); } if self.should_run(result.len()).iter().all(|x| !x) { // 所有任务都成功或者由于尝试次数过多失败,为 status 最高位打上标记,将来不再重试 - self.set_flag(true) + self.set_completed(true) } } - - fn set_result(&mut self, result: &Result<()>, offset: usize) { - if result.is_ok() { - // 如果任务已经执行到最大次数,那么此时 Result 也是 Ok,此时不应该更新状态 - if self.get_status(offset) < STATUS_MAX_RETRY { - self.set_ok(offset); - } - } else { - self.plus_one(offset); - } - } - - fn plus_one(&mut self, offset: usize) { - self.0 += 1 << (3 * offset); - } - - fn set_ok(&mut self, offset: usize) { - self.0 |= STATUS_OK << (3 * offset); - } - - fn get_status(&self, offset: usize) -> u32 { - let helper = !0u32; - (self.0 & (helper << (offset * 3)) & (helper >> (32 - 3 * offset - 3))) >> (offset * 3) - } } impl From for u32 { @@ -82,7 +89,7 @@ impl From for u32 { } } -/// 从前到后分别表示:视频封面、视频信息、Up 主头像、Up 主信息、分 P 下载 +/// 包含五个子任务,从前到后依次是:视频封面、视频信息、Up 主头像、Up 主信息、分 P 下载 #[derive(Clone)] pub struct VideoStatus(Status); @@ -107,7 +114,7 @@ impl From for u32 { } } -/// 从前到后分别表示:视频封面、视频内容、视频信息 +/// 包含四个子任务,从前到后分别是:视频封面、视频内容、视频信息、视频弹幕 #[derive(Clone)] pub struct PageStatus(Status); @@ -124,6 +131,10 @@ impl PageStatus { assert!(result.len() == 4, "PageStatus should have 4 status"); self.0.update_status(result) } + + pub fn get_completed(&self) -> bool { + self.0.get_completed() + } } impl From for u32 { @@ -149,6 +160,6 @@ mod test { } status.update_status(&[Err(anyhow!("")), Ok(()), Ok(())]); assert_eq!(status.should_run(3), vec![false, false, false]); - assert_eq!(u32::from(status), 0b111_111_100 | Status::handled()); + assert_eq!(u32::from(status), 0b111_111_100 | STATUS_COMPLETED); } } diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index a8a24f4..9ca0cab 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -102,16 +102,16 @@ pub async fn download_unprocessed_videos( connection: &DatabaseConnection, ) -> Result<()> { video_list_model.log_download_video_start(); - let unhandled_videos_pages = video_list_model.unhandled_video_pages(connection).await?; let semaphore = Semaphore::new(CONFIG.concurrent_limit.video); let downloader = Downloader::new(bili_client.client.clone()); let mut uppers_mutex: HashMap, Mutex<()>)> = HashMap::new(); + let unhandled_videos_pages = video_list_model.unhandled_video_pages(connection).await?; for (video_model, _) in &unhandled_videos_pages { uppers_mutex .entry(video_model.upper_id) .or_insert_with(|| (Mutex::new(()), Mutex::new(()))); } - let mut tasks = unhandled_videos_pages + let tasks = unhandled_videos_pages .into_iter() .map(|(video_model, pages_model)| { let upper_mutex = uppers_mutex.get(&video_model.upper_id).expect("upper mutex not found"); @@ -127,32 +127,32 @@ pub async fn download_unprocessed_videos( ) }) .collect::>(); - let mut models = Vec::with_capacity(10); - while let Some(res) = tasks.next().await { - match res { - Ok(model) => { - models.push(model); + let mut download_aborted = false; + let mut stream = tasks + // 触发风控时设置 download_aborted 标记并终止流 + .take_while(|res| { + if res + .as_ref() + .is_err_and(|e| e.downcast_ref::().is_some()) + { + download_aborted = true; } - Err(e) => { - if e.downcast_ref::().is_some() { - error!("下载视频时触发风控,将终止收藏夹下所有下载任务,等待下一轮执行"); - break; - } - } - } - // 满十个就写入数据库 - if models.len() == 10 { - update_videos_model(std::mem::replace(&mut models, Vec::with_capacity(10)), connection).await?; - } - } - if !models.is_empty() { + futures::future::ready(!download_aborted) + }) + // 过滤掉没有触发风控的普通 Err,只保留正确返回的 Model + .filter_map(|res| futures::future::ready(res.ok())) + // 将成功返回的 Model 按十个一组合并 + .chunks(10); + while let Some(models) = stream.next().await { update_videos_model(models, connection).await?; } + if download_aborted { + error!("下载视频时触发风控,终止收藏夹下所有下载任务,等待下一轮执行"); + } video_list_model.log_download_video_end(); Ok(()) } -/// 暂时这样做,后面提取成上下文 #[allow(clippy::too_many_arguments)] pub async fn download_video_pages( bili_client: &BiliClient, @@ -232,7 +232,7 @@ pub async fn download_video_pages( &video_model.bvid, &video_model.name, task_name, e ), }); - if let Err(e) = results.into_iter().nth(4).expect("not enough results") { + if let Err(e) = results.into_iter().nth(4).context("page download result not found")? { if e.downcast_ref::().is_some() { return Err(e); } @@ -242,6 +242,7 @@ pub async fn download_video_pages( Ok(video_active_model) } +/// 分发并执行分页下载任务,当且仅当所有分页成功下载或达到最大重试次数时返回 Ok,否则根据失败原因返回对应的错误 pub async fn dispatch_download_page( should_run: bool, bili_client: &BiliClient, @@ -254,57 +255,56 @@ pub async fn dispatch_download_page( return Ok(()); } let child_semaphore = Semaphore::new(CONFIG.concurrent_limit.page); - let mut tasks = pages + let tasks = pages .into_iter() .map(|page_model| download_page(bili_client, video_model, page_model, &child_semaphore, downloader)) .collect::>(); - let mut models = Vec::with_capacity(10); - let (mut should_error, mut is_break) = (false, false); - while let Some(res) = tasks.next().await { - match res { - Ok(model) => { - if let Set(status) = model.download_status { - let status = PageStatus::new(status); - if status.should_run().iter().any(|v| *v) { - // 有一个分页没变成终止状态(即下载成功或者重试次数达到限制),就应该向上层传递 Error - should_error = true; + let (mut download_aborted, mut error_occurred) = (false, false); + let mut stream = tasks + .take_while(|res| { + match res { + Ok(model) => { + // 当前函数返回的是所有分页的下载状态,只要有任何一个分页返回新的下载状态标识位是 false,当前函数就应该认为是失败的 + if model + .download_status + .try_as_ref() + .is_none_or(|status| !PageStatus::new(*status).get_completed()) + { + error_occurred = true; } } - models.push(model); - } - Err(e) => { - if e.downcast_ref::().is_some() { - should_error = true; - is_break = true; - break; + Err(e) => { + if e.downcast_ref::().is_some() { + download_aborted = true; + } } } - } - if models.len() == 10 { - update_pages_model(std::mem::replace(&mut models, Vec::with_capacity(10)), connection).await?; - } - } - if !models.is_empty() { + // 仅在发生风控时终止流,其它情况继续执行 + futures::future::ready(!download_aborted) + }) + .filter_map(|res| futures::future::ready(res.ok())) + .chunks(10); + while let Some(models) = stream.next().await { update_pages_model(models, connection).await?; } - if should_error { - if is_break { - error!( - "下载视频 {} - {} 的分页时触发风控,将异常向上传递...", - &video_model.bvid, &video_model.name - ); - bail!(DownloadAbortError()); - } else { - error!( - "下载视频 {} - {} 的分页时出现了错误,将在下一轮尝试重新处理", - &video_model.bvid, &video_model.name - ); - bail!(ProcessPageError()); - } + if download_aborted { + error!( + "下载视频 {} - {} 的分页时触发风控,将异常向上传递...", + &video_model.bvid, &video_model.name + ); + bail!(DownloadAbortError()); + } + if error_occurred { + error!( + "下载视频 {} - {} 的分页时出现了错误,将在下一轮尝试重新处理", + &video_model.bvid, &video_model.name + ); + bail!(ProcessPageError()); } Ok(()) } +/// 下载某个分页,未发生风控且正常运行时返回 Ok(Page::ActiveModel),其中 status 字段存储了新的下载状态,发生风控时返回 DownloadAbortError pub async fn download_page( bili_client: &BiliClient, video_model: &video::Model, @@ -312,10 +312,7 @@ pub async fn download_page( semaphore: &Semaphore, downloader: &Downloader, ) -> Result { - let permit = semaphore.acquire().await; - if let Err(e) = permit { - return Err(e.into()); - } + let _permit = semaphore.acquire().await.context("acquire semaphore failed")?; let mut status = PageStatus::new(page_model.download_status); let seprate_status = status.should_run(); let is_single_page = video_model.single_page.context("single_page is null")?; @@ -415,8 +412,8 @@ pub async fn download_page( &video_model.bvid, &video_model.name, page_model.pid, task_name, e ), }); - // 查看下载视频的状态,该状态会影响上层是否 break - if let Err(e) = results.into_iter().nth(1).expect("not enough results") { + // 如果下载视频时触发风控,直接返回 DownloadAbortError + if let Err(e) = results.into_iter().nth(1).context("video download result not found")? { if let Ok(BiliError::RiskControlOccurred) = e.downcast::() { bail!(DownloadAbortError()); }