diff --git a/crates/bili_sync/src/bilibili/analyzer.rs b/crates/bili_sync/src/bilibili/analyzer.rs index a54cd45..67b0459 100644 --- a/crates/bili_sync/src/bilibili/analyzer.rs +++ b/crates/bili_sync/src/bilibili/analyzer.rs @@ -217,7 +217,7 @@ impl PageAnalyzer { .info .pointer_mut("/dash/video") .and_then(|v| v.as_array_mut()) - .ok_or(BiliError::RiskControlOccurred)? + .ok_or(BiliError::VideoStreamsEmpty)? .iter_mut() { let (Some(url), Some(quality), Some(codecs_id)) = ( diff --git a/crates/bili_sync/src/bilibili/credential.rs b/crates/bili_sync/src/bilibili/credential.rs index 594394b..3471ab9 100644 --- a/crates/bili_sync/src/bilibili/credential.rs +++ b/crates/bili_sync/src/bilibili/credential.rs @@ -94,9 +94,8 @@ JNrRuoEUXpabUzGB8QIDAQAB .expect("fail to decode public key"); let ts = chrono::Local::now().timestamp_millis(); let data = format!("refresh_{}", ts).into_bytes(); - let mut rng = rand::rng(); let encrypted = key - .encrypt(&mut rng, Oaep::new::(), &data) + .encrypt(&mut rand::rng(), Oaep::new::(), &data) .expect("fail to encrypt"); hex::encode(encrypted) } diff --git a/crates/bili_sync/src/bilibili/error.rs b/crates/bili_sync/src/bilibili/error.rs index 24fed5c..498b159 100644 --- a/crates/bili_sync/src/bilibili/error.rs +++ b/crates/bili_sync/src/bilibili/error.rs @@ -1,9 +1,19 @@ use thiserror::Error; -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone)] pub enum BiliError { - #[error("risk control occurred")] - RiskControlOccurred, - #[error("request failed, status code: {0}, message: {1}")] - RequestFailed(i64, String), + #[error("response missing 'code' or 'message' field, full response: {0}")] + InvalidResponse(String), + #[error("API returned error code {0}, message: {1}, full response: {2}")] + ErrorResponse(i64, String, String), + #[error("risk control triggered by server, full response: {0}")] + RiskControlOccurred(String), + #[error("no video streams available (may indicate risk control)")] + VideoStreamsEmpty, +} + +impl BiliError { + pub fn is_risk_control_related(&self) -> bool { + matches!(self, BiliError::RiskControlOccurred(_) | BiliError::VideoStreamsEmpty) + } } diff --git a/crates/bili_sync/src/bilibili/mod.rs b/crates/bili_sync/src/bilibili/mod.rs index a228dd3..50e0015 100644 --- a/crates/bili_sync/src/bilibili/mod.rs +++ b/crates/bili_sync/src/bilibili/mod.rs @@ -53,10 +53,15 @@ impl Validate for serde_json::Value { fn validate(self) -> Result { let (code, msg) = match (self["code"].as_i64(), self["message"].as_str()) { (Some(code), Some(msg)) => (code, msg), - _ => bail!("no code or message found"), + _ => bail!(BiliError::InvalidResponse(self.to_string())), }; - ensure!(code == 0, BiliError::RequestFailed(code, msg.to_owned())); - ensure!(self["data"]["v_voucher"].is_null(), BiliError::RiskControlOccurred); + if code == -352 || !self["data"]["v_voucher"].is_null() { + bail!(BiliError::RiskControlOccurred(self.to_string())); + } + ensure!( + code == 0, + BiliError::ErrorResponse(code, msg.to_owned(), self.to_string()) + ); Ok(self) } } diff --git a/crates/bili_sync/src/error.rs b/crates/bili_sync/src/error.rs index e7807fc..6eb9fbc 100644 --- a/crates/bili_sync/src/error.rs +++ b/crates/bili_sync/src/error.rs @@ -1,15 +1,6 @@ use std::io; use anyhow::Result; -use thiserror::Error; - -#[derive(Error, Debug)] -#[error("Request too frequently")] -pub struct DownloadAbortError(); - -#[derive(Error, Debug)] -#[error("Process page error")] -pub struct ProcessPageError(); pub enum ExecutionStatus { Skipped, @@ -17,7 +8,7 @@ pub enum ExecutionStatus { Ignored(anyhow::Error), Failed(anyhow::Error), // 任务可以返回该状态固定自己的 status - FixedFailed(u32, anyhow::Error), + Fixed(u32), } // 目前 stable rust 似乎不支持自定义类型使用 ? 运算符,只能先在返回值使用 Result,再这样套层娃 diff --git a/crates/bili_sync/src/task/video_downloader.rs b/crates/bili_sync/src/task/video_downloader.rs index 9fa74d0..29ea076 100644 --- a/crates/bili_sync/src/task/video_downloader.rs +++ b/crates/bili_sync/src/task/video_downloader.rs @@ -6,7 +6,7 @@ use sea_orm::DatabaseConnection; use tokio::time; use crate::adapter::VideoSource; -use crate::bilibili::{self, BiliClient}; +use crate::bilibili::{self, BiliClient, BiliError}; use crate::config::{Config, TEMPLATE, VersionedConfig}; use crate::notifier::NotifierAllExt; use crate::utils::model::get_enabled_video_sources; @@ -79,6 +79,12 @@ async fn download_all_video_sources( .notifiers .notify_all(bili_client.inner_client(), &error_msg) .await; + if let Ok(e) = e.downcast::() + && e.is_risk_control_related() + { + warn!("检测到风控,终止此轮视频下载任务.."); + break; + } } } Ok(()) diff --git a/crates/bili_sync/src/utils/model.rs b/crates/bili_sync/src/utils/model.rs index 744054e..43804cc 100644 --- a/crates/bili_sync/src/utils/model.rs +++ b/crates/bili_sync/src/utils/model.rs @@ -1,5 +1,6 @@ use anyhow::{Context, Result, anyhow}; use bili_sync_entity::*; +use rand::seq::SliceRandom; use sea_orm::ActiveValue::Set; use sea_orm::DatabaseTransaction; use sea_orm::entity::prelude::*; @@ -134,6 +135,8 @@ pub async fn get_enabled_video_sources(connection: &DatabaseConnection) -> Resul sources.extend(watch_later.into_iter().map(VideoSourceEnum::from)); sources.extend(submission.into_iter().map(VideoSourceEnum::from)); sources.extend(collection.into_iter().map(VideoSourceEnum::from)); + // 此处将视频源随机打乱顺序,从概率上确保每个视频源都有机会优先执行,避免后面视频源的长期饥饿问题 + sources.shuffle(&mut rand::rng()); Ok(sources) } diff --git a/crates/bili_sync/src/utils/status.rs b/crates/bili_sync/src/utils/status.rs index 779465e..467dcd6 100644 --- a/crates/bili_sync/src/utils/status.rs +++ b/crates/bili_sync/src/utils/status.rs @@ -119,8 +119,8 @@ impl Status { /// 根据子任务执行结果更新子任务的状态 fn set_result(&mut self, result: &ExecutionStatus, offset: usize) { - // 如果任务返回 FixedFailed 状态,那么无论之前的状态如何,都将状态设置为 FixedFailed 的状态 - if let ExecutionStatus::FixedFailed(status, _) = result { + // 如果任务返回 Fixed 状态,那么无论之前的状态如何,都将状态设置为 Fixed 的状态 + if let ExecutionStatus::Fixed(status) = result { assert!(*status < 0b1000, "status should be less than 0b1000"); self.set_status(offset, *status); } else if self.get_status(offset) < STATUS_MAX_RETRY { @@ -201,9 +201,9 @@ mod tests { assert_eq!(status.should_run(), [false, false, false]); assert!(status.get_completed()); status.update_status(&[ - ExecutionStatus::FixedFailed(1, anyhow!("")), - ExecutionStatus::FixedFailed(4, anyhow!("")), - ExecutionStatus::FixedFailed(7, anyhow!("")), + ExecutionStatus::Fixed(1), + ExecutionStatus::Fixed(4), + ExecutionStatus::Fixed(7), ]); assert_eq!(status.should_run(), [true, false, false]); assert!(!status.get_completed()); diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index 542176b..f28e121 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -16,7 +16,7 @@ use crate::adapter::{VideoSource, VideoSourceEnum}; use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Video, VideoInfo}; use crate::config::{ARGS, Config, PathSafeTemplate}; use crate::downloader::Downloader; -use crate::error::{DownloadAbortError, ExecutionStatus, ProcessPageError}; +use crate::error::ExecutionStatus; use crate::utils::download_context::DownloadContext; use crate::utils::format_arg::{page_format_args, video_format_args}; use crate::utils::model::{ @@ -126,7 +126,7 @@ pub async fn fetch_video_details( "获取视频 {} - {} 的详细信息失败,错误为:{:#}", &video_model.bvid, &video_model.name, e ); - if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::() { + if let Some(BiliError::ErrorResponse(-404, _, _)) = e.downcast_ref::() { let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into(); video_active_model.valid = Set(false); video_active_model.save(connection).await?; @@ -184,17 +184,17 @@ pub async fn download_unprocessed_videos( download_video_pages(video_model, pages_model, &semaphore, should_download_upper, cx) }) .collect::>(); - let mut download_aborted = false; + let mut risk_control_related_error = None; let mut stream = tasks // 触发风控时设置 download_aborted 标记并终止流 .take_while(|res| { - if res - .as_ref() - .is_err_and(|e| e.downcast_ref::().is_some()) + if let Err(e) = res + && let Some(e) = e.downcast_ref::() + && e.is_risk_control_related() { - download_aborted = true; + risk_control_related_error = Some(e.clone()); } - futures::future::ready(!download_aborted) + futures::future::ready(risk_control_related_error.is_none()) }) // 过滤掉没有触发风控的普通 Err,只保留正确返回的 Model .filter_map(|res| futures::future::ready(res.ok())) @@ -203,8 +203,8 @@ pub async fn download_unprocessed_videos( while let Some(models) = stream.next().await { update_videos_model(models, connection).await?; } - if download_aborted { - error!("下载触发风控,已终止所有任务,等待下一轮执行"); + if let Some(e) = risk_control_related_error { + bail!(e); } video_source.log_download_video_end(); Ok(()) @@ -286,14 +286,18 @@ pub async fn download_video_pages( &video_model.name, task_name, e ) } - ExecutionStatus::Failed(e) | ExecutionStatus::FixedFailed(_, e) => { + ExecutionStatus::Failed(e) => { error!("处理视频「{}」{}失败:{:#}", &video_model.name, task_name, e) } + ExecutionStatus::Fixed(_) => unreachable!(), }); - if let ExecutionStatus::Failed(e) = results.into_iter().nth(4).context("page download result not found")? - && e.downcast_ref::().is_some() - { - return Err(e); + for result in results { + if let ExecutionStatus::Failed(e) = result + && let Ok(e) = e.downcast::() + && e.is_risk_control_related() + { + bail!(e); + } } let mut video_active_model: video::ActiveModel = video_model.into(); video_active_model.download_status = Set(status.into()); @@ -317,7 +321,7 @@ pub async fn dispatch_download_page( .into_iter() .map(|page_model| download_page(video_model, page_model, &child_semaphore, base_path, cx)) .collect::>(); - let (mut download_aborted, mut target_status) = (false, STATUS_OK); + let (mut risk_control_related_error, mut target_status) = (None, STATUS_OK); let mut stream = tasks .take_while(|res| { match res { @@ -333,27 +337,26 @@ pub async fn dispatch_download_page( } } Err(e) => { - if e.downcast_ref::().is_some() { - download_aborted = true; + if let Some(e) = e.downcast_ref::() + && e.is_risk_control_related() + { + risk_control_related_error = Some(e.clone()); } } } // 仅在发生风控时终止流,其它情况继续执行 - futures::future::ready(!download_aborted) + futures::future::ready(risk_control_related_error.is_none()) }) .filter_map(|res| futures::future::ready(res.ok())) .chunks(10); while let Some(models) = stream.next().await { update_pages_model(models, cx.connection).await?; } - if download_aborted { - error!("下载视频「{}」的分页时触发风控,将异常向上传递..", &video_model.name); - bail!(DownloadAbortError()); + if let Some(e) = risk_control_related_error { + bail!(e); } - if target_status != STATUS_OK { - return Ok(ExecutionStatus::FixedFailed(target_status, ProcessPageError().into())); - } - Ok(ExecutionStatus::Succeeded) + // 视频中“分页下载”任务的状态始终与所有分页的最小状态一致 + Ok(ExecutionStatus::Fixed(target_status)) } /// 下载某个分页,未发生风控且正常运行时返回 Ok(Page::ActiveModel),其中 status 字段存储了新的下载状态,发生风控时返回 DownloadAbortError @@ -507,16 +510,19 @@ pub async fn download_page( &video_model.name, page_model.pid, task_name, e ) } - ExecutionStatus::Failed(e) | ExecutionStatus::FixedFailed(_, e) => error!( + ExecutionStatus::Failed(e) => error!( "处理视频「{}」第 {} 页{}失败:{:#}", &video_model.name, page_model.pid, task_name, e ), + ExecutionStatus::Fixed(_) => unreachable!(), }); - // 如果下载视频时触发风控,直接返回 DownloadAbortError - if let ExecutionStatus::Failed(e) = results.into_iter().nth(1).context("video download result not found")? - && let Ok(BiliError::RiskControlOccurred) = e.downcast::() - { - bail!(DownloadAbortError()); + for result in results { + if let ExecutionStatus::Failed(e) = result + && let Ok(e) = e.downcast::() + && e.is_risk_control_related() + { + bail!(e); + } } let mut page_active_model: page::ActiveModel = page_model.into(); page_active_model.download_status = Set(status.into());