feat: 优化风控相关的细节处理 (#527)
This commit is contained in:
@@ -217,7 +217,7 @@ impl PageAnalyzer {
|
||||
.info
|
||||
.pointer_mut("/dash/video")
|
||||
.and_then(|v| v.as_array_mut())
|
||||
.ok_or(BiliError::RiskControlOccurred)?
|
||||
.ok_or(BiliError::VideoStreamsEmpty)?
|
||||
.iter_mut()
|
||||
{
|
||||
let (Some(url), Some(quality), Some(codecs_id)) = (
|
||||
|
||||
@@ -94,9 +94,8 @@ JNrRuoEUXpabUzGB8QIDAQAB
|
||||
.expect("fail to decode public key");
|
||||
let ts = chrono::Local::now().timestamp_millis();
|
||||
let data = format!("refresh_{}", ts).into_bytes();
|
||||
let mut rng = rand::rng();
|
||||
let encrypted = key
|
||||
.encrypt(&mut rng, Oaep::new::<Sha256>(), &data)
|
||||
.encrypt(&mut rand::rng(), Oaep::new::<Sha256>(), &data)
|
||||
.expect("fail to encrypt");
|
||||
hex::encode(encrypted)
|
||||
}
|
||||
|
||||
@@ -1,9 +1,19 @@
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
#[derive(Error, Debug, Clone)]
|
||||
pub enum BiliError {
|
||||
#[error("risk control occurred")]
|
||||
RiskControlOccurred,
|
||||
#[error("request failed, status code: {0}, message: {1}")]
|
||||
RequestFailed(i64, String),
|
||||
#[error("response missing 'code' or 'message' field, full response: {0}")]
|
||||
InvalidResponse(String),
|
||||
#[error("API returned error code {0}, message: {1}, full response: {2}")]
|
||||
ErrorResponse(i64, String, String),
|
||||
#[error("risk control triggered by server, full response: {0}")]
|
||||
RiskControlOccurred(String),
|
||||
#[error("no video streams available (may indicate risk control)")]
|
||||
VideoStreamsEmpty,
|
||||
}
|
||||
|
||||
impl BiliError {
|
||||
pub fn is_risk_control_related(&self) -> bool {
|
||||
matches!(self, BiliError::RiskControlOccurred(_) | BiliError::VideoStreamsEmpty)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,10 +53,15 @@ impl Validate for serde_json::Value {
|
||||
fn validate(self) -> Result<Self::Output> {
|
||||
let (code, msg) = match (self["code"].as_i64(), self["message"].as_str()) {
|
||||
(Some(code), Some(msg)) => (code, msg),
|
||||
_ => bail!("no code or message found"),
|
||||
_ => bail!(BiliError::InvalidResponse(self.to_string())),
|
||||
};
|
||||
ensure!(code == 0, BiliError::RequestFailed(code, msg.to_owned()));
|
||||
ensure!(self["data"]["v_voucher"].is_null(), BiliError::RiskControlOccurred);
|
||||
if code == -352 || !self["data"]["v_voucher"].is_null() {
|
||||
bail!(BiliError::RiskControlOccurred(self.to_string()));
|
||||
}
|
||||
ensure!(
|
||||
code == 0,
|
||||
BiliError::ErrorResponse(code, msg.to_owned(), self.to_string())
|
||||
);
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,6 @@
|
||||
use std::io;
|
||||
|
||||
use anyhow::Result;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
#[error("Request too frequently")]
|
||||
pub struct DownloadAbortError();
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
#[error("Process page error")]
|
||||
pub struct ProcessPageError();
|
||||
|
||||
pub enum ExecutionStatus {
|
||||
Skipped,
|
||||
@@ -17,7 +8,7 @@ pub enum ExecutionStatus {
|
||||
Ignored(anyhow::Error),
|
||||
Failed(anyhow::Error),
|
||||
// 任务可以返回该状态固定自己的 status
|
||||
FixedFailed(u32, anyhow::Error),
|
||||
Fixed(u32),
|
||||
}
|
||||
|
||||
// 目前 stable rust 似乎不支持自定义类型使用 ? 运算符,只能先在返回值使用 Result,再这样套层娃
|
||||
|
||||
@@ -6,7 +6,7 @@ use sea_orm::DatabaseConnection;
|
||||
use tokio::time;
|
||||
|
||||
use crate::adapter::VideoSource;
|
||||
use crate::bilibili::{self, BiliClient};
|
||||
use crate::bilibili::{self, BiliClient, BiliError};
|
||||
use crate::config::{Config, TEMPLATE, VersionedConfig};
|
||||
use crate::notifier::NotifierAllExt;
|
||||
use crate::utils::model::get_enabled_video_sources;
|
||||
@@ -79,6 +79,12 @@ async fn download_all_video_sources(
|
||||
.notifiers
|
||||
.notify_all(bili_client.inner_client(), &error_msg)
|
||||
.await;
|
||||
if let Ok(e) = e.downcast::<BiliError>()
|
||||
&& e.is_risk_control_related()
|
||||
{
|
||||
warn!("检测到风控,终止此轮视频下载任务..");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use anyhow::{Context, Result, anyhow};
|
||||
use bili_sync_entity::*;
|
||||
use rand::seq::SliceRandom;
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use sea_orm::DatabaseTransaction;
|
||||
use sea_orm::entity::prelude::*;
|
||||
@@ -134,6 +135,8 @@ pub async fn get_enabled_video_sources(connection: &DatabaseConnection) -> Resul
|
||||
sources.extend(watch_later.into_iter().map(VideoSourceEnum::from));
|
||||
sources.extend(submission.into_iter().map(VideoSourceEnum::from));
|
||||
sources.extend(collection.into_iter().map(VideoSourceEnum::from));
|
||||
// 此处将视频源随机打乱顺序,从概率上确保每个视频源都有机会优先执行,避免后面视频源的长期饥饿问题
|
||||
sources.shuffle(&mut rand::rng());
|
||||
Ok(sources)
|
||||
}
|
||||
|
||||
|
||||
@@ -119,8 +119,8 @@ impl<const N: usize> Status<N> {
|
||||
|
||||
/// 根据子任务执行结果更新子任务的状态
|
||||
fn set_result(&mut self, result: &ExecutionStatus, offset: usize) {
|
||||
// 如果任务返回 FixedFailed 状态,那么无论之前的状态如何,都将状态设置为 FixedFailed 的状态
|
||||
if let ExecutionStatus::FixedFailed(status, _) = result {
|
||||
// 如果任务返回 Fixed 状态,那么无论之前的状态如何,都将状态设置为 Fixed 的状态
|
||||
if let ExecutionStatus::Fixed(status) = result {
|
||||
assert!(*status < 0b1000, "status should be less than 0b1000");
|
||||
self.set_status(offset, *status);
|
||||
} else if self.get_status(offset) < STATUS_MAX_RETRY {
|
||||
@@ -201,9 +201,9 @@ mod tests {
|
||||
assert_eq!(status.should_run(), [false, false, false]);
|
||||
assert!(status.get_completed());
|
||||
status.update_status(&[
|
||||
ExecutionStatus::FixedFailed(1, anyhow!("")),
|
||||
ExecutionStatus::FixedFailed(4, anyhow!("")),
|
||||
ExecutionStatus::FixedFailed(7, anyhow!("")),
|
||||
ExecutionStatus::Fixed(1),
|
||||
ExecutionStatus::Fixed(4),
|
||||
ExecutionStatus::Fixed(7),
|
||||
]);
|
||||
assert_eq!(status.should_run(), [true, false, false]);
|
||||
assert!(!status.get_completed());
|
||||
|
||||
@@ -16,7 +16,7 @@ use crate::adapter::{VideoSource, VideoSourceEnum};
|
||||
use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Video, VideoInfo};
|
||||
use crate::config::{ARGS, Config, PathSafeTemplate};
|
||||
use crate::downloader::Downloader;
|
||||
use crate::error::{DownloadAbortError, ExecutionStatus, ProcessPageError};
|
||||
use crate::error::ExecutionStatus;
|
||||
use crate::utils::download_context::DownloadContext;
|
||||
use crate::utils::format_arg::{page_format_args, video_format_args};
|
||||
use crate::utils::model::{
|
||||
@@ -126,7 +126,7 @@ pub async fn fetch_video_details(
|
||||
"获取视频 {} - {} 的详细信息失败,错误为:{:#}",
|
||||
&video_model.bvid, &video_model.name, e
|
||||
);
|
||||
if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::<BiliError>() {
|
||||
if let Some(BiliError::ErrorResponse(-404, _, _)) = e.downcast_ref::<BiliError>() {
|
||||
let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into();
|
||||
video_active_model.valid = Set(false);
|
||||
video_active_model.save(connection).await?;
|
||||
@@ -184,17 +184,17 @@ pub async fn download_unprocessed_videos(
|
||||
download_video_pages(video_model, pages_model, &semaphore, should_download_upper, cx)
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
let mut download_aborted = false;
|
||||
let mut risk_control_related_error = None;
|
||||
let mut stream = tasks
|
||||
// 触发风控时设置 download_aborted 标记并终止流
|
||||
.take_while(|res| {
|
||||
if res
|
||||
.as_ref()
|
||||
.is_err_and(|e| e.downcast_ref::<DownloadAbortError>().is_some())
|
||||
if let Err(e) = res
|
||||
&& let Some(e) = e.downcast_ref::<BiliError>()
|
||||
&& e.is_risk_control_related()
|
||||
{
|
||||
download_aborted = true;
|
||||
risk_control_related_error = Some(e.clone());
|
||||
}
|
||||
futures::future::ready(!download_aborted)
|
||||
futures::future::ready(risk_control_related_error.is_none())
|
||||
})
|
||||
// 过滤掉没有触发风控的普通 Err,只保留正确返回的 Model
|
||||
.filter_map(|res| futures::future::ready(res.ok()))
|
||||
@@ -203,8 +203,8 @@ pub async fn download_unprocessed_videos(
|
||||
while let Some(models) = stream.next().await {
|
||||
update_videos_model(models, connection).await?;
|
||||
}
|
||||
if download_aborted {
|
||||
error!("下载触发风控,已终止所有任务,等待下一轮执行");
|
||||
if let Some(e) = risk_control_related_error {
|
||||
bail!(e);
|
||||
}
|
||||
video_source.log_download_video_end();
|
||||
Ok(())
|
||||
@@ -286,14 +286,18 @@ pub async fn download_video_pages(
|
||||
&video_model.name, task_name, e
|
||||
)
|
||||
}
|
||||
ExecutionStatus::Failed(e) | ExecutionStatus::FixedFailed(_, e) => {
|
||||
ExecutionStatus::Failed(e) => {
|
||||
error!("处理视频「{}」{}失败:{:#}", &video_model.name, task_name, e)
|
||||
}
|
||||
ExecutionStatus::Fixed(_) => unreachable!(),
|
||||
});
|
||||
if let ExecutionStatus::Failed(e) = results.into_iter().nth(4).context("page download result not found")?
|
||||
&& e.downcast_ref::<DownloadAbortError>().is_some()
|
||||
{
|
||||
return Err(e);
|
||||
for result in results {
|
||||
if let ExecutionStatus::Failed(e) = result
|
||||
&& let Ok(e) = e.downcast::<BiliError>()
|
||||
&& e.is_risk_control_related()
|
||||
{
|
||||
bail!(e);
|
||||
}
|
||||
}
|
||||
let mut video_active_model: video::ActiveModel = video_model.into();
|
||||
video_active_model.download_status = Set(status.into());
|
||||
@@ -317,7 +321,7 @@ pub async fn dispatch_download_page(
|
||||
.into_iter()
|
||||
.map(|page_model| download_page(video_model, page_model, &child_semaphore, base_path, cx))
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
let (mut download_aborted, mut target_status) = (false, STATUS_OK);
|
||||
let (mut risk_control_related_error, mut target_status) = (None, STATUS_OK);
|
||||
let mut stream = tasks
|
||||
.take_while(|res| {
|
||||
match res {
|
||||
@@ -333,27 +337,26 @@ pub async fn dispatch_download_page(
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
if e.downcast_ref::<DownloadAbortError>().is_some() {
|
||||
download_aborted = true;
|
||||
if let Some(e) = e.downcast_ref::<BiliError>()
|
||||
&& e.is_risk_control_related()
|
||||
{
|
||||
risk_control_related_error = Some(e.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
// 仅在发生风控时终止流,其它情况继续执行
|
||||
futures::future::ready(!download_aborted)
|
||||
futures::future::ready(risk_control_related_error.is_none())
|
||||
})
|
||||
.filter_map(|res| futures::future::ready(res.ok()))
|
||||
.chunks(10);
|
||||
while let Some(models) = stream.next().await {
|
||||
update_pages_model(models, cx.connection).await?;
|
||||
}
|
||||
if download_aborted {
|
||||
error!("下载视频「{}」的分页时触发风控,将异常向上传递..", &video_model.name);
|
||||
bail!(DownloadAbortError());
|
||||
if let Some(e) = risk_control_related_error {
|
||||
bail!(e);
|
||||
}
|
||||
if target_status != STATUS_OK {
|
||||
return Ok(ExecutionStatus::FixedFailed(target_status, ProcessPageError().into()));
|
||||
}
|
||||
Ok(ExecutionStatus::Succeeded)
|
||||
// 视频中“分页下载”任务的状态始终与所有分页的最小状态一致
|
||||
Ok(ExecutionStatus::Fixed(target_status))
|
||||
}
|
||||
|
||||
/// 下载某个分页,未发生风控且正常运行时返回 Ok(Page::ActiveModel),其中 status 字段存储了新的下载状态,发生风控时返回 DownloadAbortError
|
||||
@@ -507,16 +510,19 @@ pub async fn download_page(
|
||||
&video_model.name, page_model.pid, task_name, e
|
||||
)
|
||||
}
|
||||
ExecutionStatus::Failed(e) | ExecutionStatus::FixedFailed(_, e) => error!(
|
||||
ExecutionStatus::Failed(e) => error!(
|
||||
"处理视频「{}」第 {} 页{}失败:{:#}",
|
||||
&video_model.name, page_model.pid, task_name, e
|
||||
),
|
||||
ExecutionStatus::Fixed(_) => unreachable!(),
|
||||
});
|
||||
// 如果下载视频时触发风控,直接返回 DownloadAbortError
|
||||
if let ExecutionStatus::Failed(e) = results.into_iter().nth(1).context("video download result not found")?
|
||||
&& let Ok(BiliError::RiskControlOccurred) = e.downcast::<BiliError>()
|
||||
{
|
||||
bail!(DownloadAbortError());
|
||||
for result in results {
|
||||
if let ExecutionStatus::Failed(e) = result
|
||||
&& let Ok(e) = e.downcast::<BiliError>()
|
||||
&& e.is_risk_control_related()
|
||||
{
|
||||
bail!(e);
|
||||
}
|
||||
}
|
||||
let mut page_active_model: page::ActiveModel = page_model.into();
|
||||
page_active_model.download_status = Set(status.into());
|
||||
|
||||
Reference in New Issue
Block a user