refactor: 尝试将任务处理部分重构为 stream 写法,增补注释
This commit is contained in:
@@ -102,16 +102,16 @@ pub async fn download_unprocessed_videos(
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<()> {
|
||||
video_list_model.log_download_video_start();
|
||||
let unhandled_videos_pages = video_list_model.unhandled_video_pages(connection).await?;
|
||||
let semaphore = Semaphore::new(CONFIG.concurrent_limit.video);
|
||||
let downloader = Downloader::new(bili_client.client.clone());
|
||||
let mut uppers_mutex: HashMap<i64, (Mutex<()>, Mutex<()>)> = HashMap::new();
|
||||
let unhandled_videos_pages = video_list_model.unhandled_video_pages(connection).await?;
|
||||
for (video_model, _) in &unhandled_videos_pages {
|
||||
uppers_mutex
|
||||
.entry(video_model.upper_id)
|
||||
.or_insert_with(|| (Mutex::new(()), Mutex::new(())));
|
||||
}
|
||||
let mut tasks = unhandled_videos_pages
|
||||
let tasks = unhandled_videos_pages
|
||||
.into_iter()
|
||||
.map(|(video_model, pages_model)| {
|
||||
let upper_mutex = uppers_mutex.get(&video_model.upper_id).expect("upper mutex not found");
|
||||
@@ -127,32 +127,32 @@ pub async fn download_unprocessed_videos(
|
||||
)
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
let mut models = Vec::with_capacity(10);
|
||||
while let Some(res) = tasks.next().await {
|
||||
match res {
|
||||
Ok(model) => {
|
||||
models.push(model);
|
||||
let mut download_aborted = false;
|
||||
let mut stream = tasks
|
||||
// 触发风控时设置 download_aborted 标记并终止流
|
||||
.take_while(|res| {
|
||||
if res
|
||||
.as_ref()
|
||||
.is_err_and(|e| e.downcast_ref::<DownloadAbortError>().is_some())
|
||||
{
|
||||
download_aborted = true;
|
||||
}
|
||||
Err(e) => {
|
||||
if e.downcast_ref::<DownloadAbortError>().is_some() {
|
||||
error!("下载视频时触发风控,将终止收藏夹下所有下载任务,等待下一轮执行");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// 满十个就写入数据库
|
||||
if models.len() == 10 {
|
||||
update_videos_model(std::mem::replace(&mut models, Vec::with_capacity(10)), connection).await?;
|
||||
}
|
||||
}
|
||||
if !models.is_empty() {
|
||||
futures::future::ready(!download_aborted)
|
||||
})
|
||||
// 过滤掉没有触发风控的普通 Err,只保留正确返回的 Model
|
||||
.filter_map(|res| futures::future::ready(res.ok()))
|
||||
// 将成功返回的 Model 按十个一组合并
|
||||
.chunks(10);
|
||||
while let Some(models) = stream.next().await {
|
||||
update_videos_model(models, connection).await?;
|
||||
}
|
||||
if download_aborted {
|
||||
error!("下载视频时触发风控,终止收藏夹下所有下载任务,等待下一轮执行");
|
||||
}
|
||||
video_list_model.log_download_video_end();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 暂时这样做,后面提取成上下文
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn download_video_pages(
|
||||
bili_client: &BiliClient,
|
||||
@@ -232,7 +232,7 @@ pub async fn download_video_pages(
|
||||
&video_model.bvid, &video_model.name, task_name, e
|
||||
),
|
||||
});
|
||||
if let Err(e) = results.into_iter().nth(4).expect("not enough results") {
|
||||
if let Err(e) = results.into_iter().nth(4).context("page download result not found")? {
|
||||
if e.downcast_ref::<DownloadAbortError>().is_some() {
|
||||
return Err(e);
|
||||
}
|
||||
@@ -242,6 +242,7 @@ pub async fn download_video_pages(
|
||||
Ok(video_active_model)
|
||||
}
|
||||
|
||||
/// 分发并执行分页下载任务,当且仅当所有分页成功下载或达到最大重试次数时返回 Ok,否则根据失败原因返回对应的错误
|
||||
pub async fn dispatch_download_page(
|
||||
should_run: bool,
|
||||
bili_client: &BiliClient,
|
||||
@@ -254,57 +255,56 @@ pub async fn dispatch_download_page(
|
||||
return Ok(());
|
||||
}
|
||||
let child_semaphore = Semaphore::new(CONFIG.concurrent_limit.page);
|
||||
let mut tasks = pages
|
||||
let tasks = pages
|
||||
.into_iter()
|
||||
.map(|page_model| download_page(bili_client, video_model, page_model, &child_semaphore, downloader))
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
let mut models = Vec::with_capacity(10);
|
||||
let (mut should_error, mut is_break) = (false, false);
|
||||
while let Some(res) = tasks.next().await {
|
||||
match res {
|
||||
Ok(model) => {
|
||||
if let Set(status) = model.download_status {
|
||||
let status = PageStatus::new(status);
|
||||
if status.should_run().iter().any(|v| *v) {
|
||||
// 有一个分页没变成终止状态(即下载成功或者重试次数达到限制),就应该向上层传递 Error
|
||||
should_error = true;
|
||||
let (mut download_aborted, mut error_occurred) = (false, false);
|
||||
let mut stream = tasks
|
||||
.take_while(|res| {
|
||||
match res {
|
||||
Ok(model) => {
|
||||
// 当前函数返回的是所有分页的下载状态,只要有任何一个分页返回新的下载状态标识位是 false,当前函数就应该认为是失败的
|
||||
if model
|
||||
.download_status
|
||||
.try_as_ref()
|
||||
.is_none_or(|status| !PageStatus::new(*status).get_completed())
|
||||
{
|
||||
error_occurred = true;
|
||||
}
|
||||
}
|
||||
models.push(model);
|
||||
}
|
||||
Err(e) => {
|
||||
if e.downcast_ref::<DownloadAbortError>().is_some() {
|
||||
should_error = true;
|
||||
is_break = true;
|
||||
break;
|
||||
Err(e) => {
|
||||
if e.downcast_ref::<DownloadAbortError>().is_some() {
|
||||
download_aborted = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if models.len() == 10 {
|
||||
update_pages_model(std::mem::replace(&mut models, Vec::with_capacity(10)), connection).await?;
|
||||
}
|
||||
}
|
||||
if !models.is_empty() {
|
||||
// 仅在发生风控时终止流,其它情况继续执行
|
||||
futures::future::ready(!download_aborted)
|
||||
})
|
||||
.filter_map(|res| futures::future::ready(res.ok()))
|
||||
.chunks(10);
|
||||
while let Some(models) = stream.next().await {
|
||||
update_pages_model(models, connection).await?;
|
||||
}
|
||||
if should_error {
|
||||
if is_break {
|
||||
error!(
|
||||
"下载视频 {} - {} 的分页时触发风控,将异常向上传递...",
|
||||
&video_model.bvid, &video_model.name
|
||||
);
|
||||
bail!(DownloadAbortError());
|
||||
} else {
|
||||
error!(
|
||||
"下载视频 {} - {} 的分页时出现了错误,将在下一轮尝试重新处理",
|
||||
&video_model.bvid, &video_model.name
|
||||
);
|
||||
bail!(ProcessPageError());
|
||||
}
|
||||
if download_aborted {
|
||||
error!(
|
||||
"下载视频 {} - {} 的分页时触发风控,将异常向上传递...",
|
||||
&video_model.bvid, &video_model.name
|
||||
);
|
||||
bail!(DownloadAbortError());
|
||||
}
|
||||
if error_occurred {
|
||||
error!(
|
||||
"下载视频 {} - {} 的分页时出现了错误,将在下一轮尝试重新处理",
|
||||
&video_model.bvid, &video_model.name
|
||||
);
|
||||
bail!(ProcessPageError());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 下载某个分页,未发生风控且正常运行时返回 Ok(Page::ActiveModel),其中 status 字段存储了新的下载状态,发生风控时返回 DownloadAbortError
|
||||
pub async fn download_page(
|
||||
bili_client: &BiliClient,
|
||||
video_model: &video::Model,
|
||||
@@ -312,10 +312,7 @@ pub async fn download_page(
|
||||
semaphore: &Semaphore,
|
||||
downloader: &Downloader,
|
||||
) -> Result<page::ActiveModel> {
|
||||
let permit = semaphore.acquire().await;
|
||||
if let Err(e) = permit {
|
||||
return Err(e.into());
|
||||
}
|
||||
let _permit = semaphore.acquire().await.context("acquire semaphore failed")?;
|
||||
let mut status = PageStatus::new(page_model.download_status);
|
||||
let seprate_status = status.should_run();
|
||||
let is_single_page = video_model.single_page.context("single_page is null")?;
|
||||
@@ -415,8 +412,8 @@ pub async fn download_page(
|
||||
&video_model.bvid, &video_model.name, page_model.pid, task_name, e
|
||||
),
|
||||
});
|
||||
// 查看下载视频的状态,该状态会影响上层是否 break
|
||||
if let Err(e) = results.into_iter().nth(1).expect("not enough results") {
|
||||
// 如果下载视频时触发风控,直接返回 DownloadAbortError
|
||||
if let Err(e) = results.into_iter().nth(1).context("video download result not found")? {
|
||||
if let Ok(BiliError::RiskControlOccurred) = e.downcast::<BiliError>() {
|
||||
bail!(DownloadAbortError());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user