feat: 支持请求过快出现风控时终止全部下载

This commit is contained in:
amtoaer
2024-04-02 23:14:36 +08:00
parent 377f3bb22a
commit 3859ad9bd0
2 changed files with 202 additions and 108 deletions

View File

@@ -2,38 +2,43 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use anyhow::{anyhow, Result};
use anyhow::{bail, Result};
use dirs::config_dir;
use entity::{favorite, page, video};
use filenamify::filenamify;
use futures::stream::FuturesUnordered;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{pin_mut, Future, StreamExt};
use log::{error, info};
use log::{error, info, warn};
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use sea_orm::TryIntoModel;
use sea_orm::TransactionTrait;
use serde_json::json;
use tokio::fs;
use tokio::sync::{Mutex, Semaphore};
use super::status::{PageStatus, VideoStatus};
use super::utils::{unhandled_videos_pages, ModelWrapper, NFOMode, NFOSerializer, TEMPLATE};
use crate::bilibili::{BestStream, BiliClient, FavoriteList, FilterOption, PageInfo, Video};
use crate::core::utils::{create_video_pages, create_videos, exist_labels, filter_videos, handle_favorite_info};
use crate::bilibili::{BestStream, BiliClient, BiliError, FavoriteList, FilterOption, PageInfo, Video};
use crate::core::utils::{
create_video_pages, create_videos, exist_labels, filter_unfilled_videos, handle_favorite_info, total_video_count,
};
use crate::downloader::Downloader;
use crate::error::DownloadAbortError;
/// 处理某个收藏夹,首先刷新收藏夹信息,然后下载收藏夹中未下载成功的视频
pub async fn process_favorite(
pub async fn process_favorite_list(
bili_client: &BiliClient,
fid: &str,
path: &str,
connection: &DatabaseConnection,
) -> Result<()> {
let favorite_model = refresh_favorite(bili_client, fid, path, connection).await?;
download_favorite(bili_client, favorite_model, connection).await
let favorite_model = refresh_favorite_list(bili_client, fid, path, connection).await?;
let favorite_model = fetch_video_details(bili_client, favorite_model, connection).await?;
download_unprocessed_videos(bili_client, favorite_model, connection).await
}
pub async fn refresh_favorite(
/// 获取收藏夹 Model从收藏夹列表中获取所有新添加的视频将其写入数据库
pub async fn refresh_favorite_list(
bili_client: &BiliClient,
fid: &str,
path: &str,
@@ -42,12 +47,14 @@ pub async fn refresh_favorite(
let bili_favorite_list = FavoriteList::new(bili_client, fid.to_owned());
let favorite_list_info = bili_favorite_list.get_info().await?;
let favorite_model = handle_favorite_info(&favorite_list_info, path, connection).await?;
info!("Scan the favorite: {fid}");
info!("Scan the favorite: {fid}.");
// 每十个视频一组,避免太多的数据库操作
let video_stream = bili_favorite_list.into_video_stream().chunks(10);
pin_mut!(video_stream);
let mut got_count = 0;
let total_count = total_video_count(&favorite_model, connection).await?;
while let Some(videos_info) = video_stream.next().await {
info!("got {} new videos...", videos_info.len());
got_count += videos_info.len();
let exist_labels = exist_labels(&videos_info, &favorite_model, connection).await?;
// 如果发现有视频的收藏时间和 bvid 和数据库中重合,说明到达了上次处理到的地方,可以直接退出
let should_break = videos_info
@@ -55,31 +62,70 @@ pub async fn refresh_favorite(
.any(|v| exist_labels.contains(&(v.bvid.clone(), v.fav_time.naive_utc())));
// 将视频信息写入数据库
create_videos(&videos_info, &favorite_model, connection).await?;
// 找到这些视频中没有下载过,也没有 page 的视频
let unrefreshed_video_models = filter_videos(&videos_info, &favorite_model, true, true, connection).await?;
if !unrefreshed_video_models.is_empty() {
for video_model in unrefreshed_video_models {
let bili_video = Video::new(bili_client, video_model.bvid.clone());
let tags = bili_video.get_tags().await?;
let pages_info = bili_video.get_pages().await?;
// 记录视频的分集信息
create_video_pages(&pages_info, &video_model, connection).await?;
let mut video_active_model: video::ActiveModel = video_model.into();
video_active_model.single_page = Set(Some(pages_info.len() == 1));
video_active_model.tags = Set(Some(serde_json::to_value(tags).unwrap()));
// 记录视频是否是单页,以及标签信息
video_active_model.save(connection).await?;
}
}
if should_break {
info!("Reach the last processed processed position, break..");
break;
}
}
info!("refresh videos done");
let total_count = total_video_count(&favorite_model, connection).await? - total_count;
info!("Scan the favorite: {fid} done, got {got_count} videos, {total_count} new videos.");
Ok(favorite_model)
}
pub async fn download_favorite(
/// 筛选出所有没有获取到分页信息和 tag 的视频,请求分页信息和 tag 并写入数据库
pub async fn fetch_video_details(
bili_client: &BiliClient,
favorite_model: favorite::Model,
connection: &DatabaseConnection,
) -> Result<favorite::Model> {
info!("start to fetch video details in favorite: {}", favorite_model.f_id);
let videos_model = filter_unfilled_videos(&favorite_model, connection).await?;
for video_model in videos_model {
let bili_video = Video::new(bili_client, video_model.bvid.clone());
let tags = match bili_video.get_tags().await {
Ok(tags) => tags,
Err(e) => {
error!("failed to get tags for video: {}, {}", &video_model.bvid, e);
if let Some(BiliError::RequestFailed(code, _)) = e.downcast_ref::<BiliError>() {
if *code == -404 {
let mut video_active_model: video::ActiveModel = video_model.into();
video_active_model.valid = Set(false);
video_active_model.save(connection).await?;
}
}
continue;
}
};
let pages_info = match bili_video.get_pages().await {
Ok(pages) => pages,
Err(e) => {
error!("failed to get pages for video: {}, {}", &video_model.bvid, e);
if let Some(BiliError::RequestFailed(code, _)) = e.downcast_ref::<BiliError>() {
if *code == -404 {
let mut video_active_model: video::ActiveModel = video_model.into();
video_active_model.valid = Set(false);
video_active_model.save(connection).await?;
}
}
continue;
}
};
let txn = connection.begin().await?;
// 将分页信息写入数据库
create_video_pages(&pages_info, &video_model, &txn).await?;
// 将页标记和 tag 写入数据库
let mut video_active_model: video::ActiveModel = video_model.into();
video_active_model.single_page = Set(Some(pages_info.len() == 1));
video_active_model.tags = Set(Some(serde_json::to_value(tags).unwrap()));
video_active_model.save(&txn).await?;
txn.commit().await?;
}
info!("fetch video details in favorite: {} done.", favorite_model.f_id);
Ok(favorite_model)
}
/// 下载所有未处理成功的视频
pub async fn download_unprocessed_videos(
bili_client: &BiliClient,
favorite_model: favorite::Model,
connection: &DatabaseConnection,
@@ -90,38 +136,48 @@ pub async fn download_favorite(
let semaphore = Semaphore::new(5);
let downloader = Downloader::default();
let mut uppers_mutex: HashMap<i32, (Mutex<()>, Mutex<()>)> = HashMap::new();
for (video, _) in &unhandled_videos_pages {
uppers_mutex.insert(video.id, (Mutex::new(()), Mutex::new(())));
for (video_model, _) in &unhandled_videos_pages {
uppers_mutex.insert(video_model.upper_id, (Mutex::new(()), Mutex::new(())));
}
let mut tasks = FuturesUnordered::new();
for (video_model, pages) in unhandled_videos_pages {
let upper_mutex = uppers_mutex.get(&video_model.id).unwrap();
tasks.push(download_video_pages(
bili_client,
video_model,
pages,
connection,
&semaphore,
&downloader,
upper_mutex,
));
}
// 创建好下载任务,等待下载任务运行即可,任务结束会返回 Result<VideoModel>
let mut tasks = unhandled_videos_pages
.into_iter()
.map(|(video_model, pages_model)| {
let upper_mutex = uppers_mutex.get(&video_model.upper_id).unwrap();
download_video_pages(
bili_client,
video_model,
pages_model,
connection,
&semaphore,
&downloader,
upper_mutex,
)
})
.collect::<FuturesUnordered<_>>();
let mut models = Vec::with_capacity(10);
while let Some(res) = tasks.next().await {
match res {
Ok(video_model) => {
info!(
"Video {} processed:\n\t {}",
&video_model.bvid,
VideoStatus::new(video_model.download_status)
);
Ok(model) => {
models.push(model);
}
Err(e) => {
error!("Video processed failed: {e}");
if e.downcast_ref::<DownloadAbortError>().is_some() {
warn!("{e}");
break;
}
}
}
// 满十个就写入数据库
if models.len() == 10 {
video::Entity::insert_many(std::mem::replace(&mut models, Vec::with_capacity(10)))
.exec(connection)
.await?;
}
}
info!("Done.");
if !models.is_empty() {
video::Entity::insert_many(models).exec(connection).await?;
}
info!("download videos in favorite: {} done.", favorite_model.f_id);
Ok(())
}
@@ -133,13 +189,14 @@ pub async fn download_video_pages(
semaphore: &Semaphore,
downloader: &Downloader,
upper_mutex: &(Mutex<()>, Mutex<()>),
) -> Result<video::Model> {
) -> Result<video::ActiveModel> {
let permit = semaphore.acquire().await;
if let Err(e) = permit {
return Err(e.into());
bail!(e);
}
let mut status = VideoStatus::new(video_model.download_status);
let seprate_status = status.should_run();
let base_path = Path::new(&video_model.path);
let upper_id = video_model.upper_id.to_string();
let base_upper_path = config_dir()
@@ -148,6 +205,7 @@ pub async fn download_video_pages(
.join("upper")
.join(upper_id.chars().next().unwrap().to_string())
.join(upper_id);
let is_single_page = video_model.single_page.unwrap();
// 对于单页视频page 的下载已经足够
// 对于多页视频page 下载仅包含了分集内容,需要额外补上视频的 poster 的 tvshow.nfo
@@ -190,12 +248,31 @@ pub async fn download_video_pages(
downloader,
)),
];
let results = futures::future::join_all(tasks).await;
let tasks: FuturesOrdered<_> = tasks.into_iter().collect();
let results: Vec<Result<()>> = tasks.collect().await;
status.update_status(&results);
results
.iter()
.take(4)
.zip(["poster", "video nfo", "upper face", "upper nfo"])
.for_each(|(res, task_name)| {
if res.is_err() {
error!(
"Video {} {} failed: {}",
&video_model.bvid,
task_name,
res.as_ref().unwrap_err()
);
}
});
if let Err(e) = results.into_iter().nth(4).unwrap() {
if let Ok(e) = e.downcast::<DownloadAbortError>() {
bail!(e);
}
}
let mut video_active_model: video::ActiveModel = video_model.into();
video_active_model.download_status = Set(status.into());
video_active_model = video_active_model.save(connection).await?;
Ok(video_active_model.try_into_model()?)
Ok(video_active_model)
}
pub async fn dispatch_download_page(
@@ -211,51 +288,44 @@ pub async fn dispatch_download_page(
}
// 对于视频的分页,允许同时下载三个同时下载(绝大部分是单页视频)
let child_semaphore = Semaphore::new(5);
let mut tasks = FuturesUnordered::new();
for page_model in pages {
tasks.push(download_page(
bili_client,
video_model,
page_model,
connection,
&child_semaphore,
downloader,
));
}
let mut final_result = Ok(());
let mut tasks = pages
.into_iter()
.map(|page_model| download_page(bili_client, video_model, page_model, &child_semaphore, downloader))
.collect::<FuturesUnordered<_>>();
// 任务结束会返回 Result<PageModel>
let mut models = Vec::with_capacity(10);
while let Some(res) = tasks.next().await {
match res {
Ok(page_model) => {
let page_status = PageStatus::new(page_model.download_status);
info!(
"Video {} page {} processed:\n\t {}",
&video_model.bvid, page_model.pid, &page_status
);
if final_result.is_ok() && page_status.should_run().iter().any(|x| *x) {
final_result = Err(anyhow!("Some page item download failed"));
}
Ok(model) => {
models.push(model);
}
Err(e) => {
error!("Video {} processed failed: {e}", &video_model.bvid);
if final_result.is_ok() {
// 只需要设置一次,作为一个状态标记
final_result = Err(e);
if e.downcast_ref::<DownloadAbortError>().is_some() {
warn!("{e}");
break;
}
}
}
if models.len() == 10 {
page::Entity::insert_many(std::mem::replace(&mut models, Vec::with_capacity(10)))
.exec(connection)
.await?;
}
}
final_result
if !models.is_empty() {
page::Entity::insert_many(models).exec(connection).await?;
}
Ok(())
}
pub async fn download_page(
bili_client: &BiliClient,
video_model: &video::Model,
page_model: page::Model,
connection: &DatabaseConnection,
semaphore: &Semaphore,
downloader: &Downloader,
) -> Result<page::Model> {
) -> Result<page::ActiveModel> {
let permit = semaphore.acquire().await;
if let Err(e) = permit {
return Err(e.into());
@@ -313,13 +383,33 @@ pub async fn download_page(
)),
Box::pin(generate_page_nfo(seprate_status[2], video_model, &page_model, nfo_path)),
];
let results = futures::future::join_all(tasks).await;
let tasks: FuturesOrdered<_> = tasks.into_iter().collect();
let results: Vec<Result<()>> = tasks.collect().await;
status.update_status(&results);
results
.iter()
.zip(["poster", "video", "nfo"])
.for_each(|(res, task_name)| {
if res.is_err() {
error!(
"Video {} page {} {} failed: {}",
&video_model.bvid,
page_model.pid,
task_name,
res.as_ref().unwrap_err()
);
}
});
// 查看下载视频的状态,该状态会影响上层是否 break
if let Err(e) = results.into_iter().nth(1).unwrap() {
if let Ok(e) = e.downcast::<DownloadAbortError>() {
bail!(e);
}
}
let mut page_active_model: page::ActiveModel = page_model.into();
page_active_model.download_status = Set(status.into());
page_active_model.path = Set(Some(video_path.to_str().unwrap().to_string()));
page_active_model = page_active_model.save(connection).await?;
Ok(page_active_model.try_into_model().unwrap())
Ok(page_active_model)
}
pub async fn fetch_page_poster(

View File

@@ -152,32 +152,36 @@ pub async fn create_videos(
Ok(())
}
/// 筛选所有符合条件的视频
pub async fn filter_videos(
videos_info: &[VideoInfo],
pub async fn total_video_count(favorite_model: &favorite::Model, connection: &DatabaseConnection) -> Result<u64> {
Ok(video::Entity::find()
.filter(video::Column::FavoriteId.eq(favorite_model.id))
.count(connection)
.await?)
}
/// 筛选所有未
pub async fn filter_unfilled_videos(
favorite_model: &favorite::Model,
only_unhandled: bool,
only_no_page: bool,
connection: &DatabaseConnection,
) -> Result<Vec<video::Model>> {
let bvids = videos_info.iter().map(|v| v.bvid.clone()).collect::<Vec<String>>();
let mut condition = video::Column::FavoriteId
.eq(favorite_model.id)
.and(video::Column::Bvid.is_in(bvids))
.and(video::Column::Valid.eq(true));
if only_unhandled {
condition = condition.and(video::Column::DownloadStatus.lt(Status::handled()));
}
if only_no_page {
condition = condition.and(video::Column::SinglePage.is_null());
}
Ok(video::Entity::find().filter(condition).all(connection).await?)
Ok(video::Entity::find()
.filter(
video::Column::FavoriteId
.eq(favorite_model.id)
.and(video::Column::Valid.eq(true))
.and(video::Column::DownloadStatus.eq(0))
.and(video::Column::Category.eq(2))
.and(video::Column::SinglePage.is_null()),
)
.all(connection)
.await?)
}
/// 创建视频的所有分 P
pub async fn create_video_pages(
pages_info: &[PageInfo],
video_model: &video::Model,
connection: &DatabaseConnection,
connection: &impl ConnectionTrait,
) -> Result<()> {
let page_models = pages_info
.iter()