diff --git a/src/core/command.rs b/src/core/command.rs index efdafb5..5b2ea85 100644 --- a/src/core/command.rs +++ b/src/core/command.rs @@ -2,38 +2,43 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::pin::Pin; -use anyhow::{anyhow, Result}; +use anyhow::{bail, Result}; use dirs::config_dir; use entity::{favorite, page, video}; use filenamify::filenamify; -use futures::stream::FuturesUnordered; +use futures::stream::{FuturesOrdered, FuturesUnordered}; use futures::{pin_mut, Future, StreamExt}; -use log::{error, info}; +use log::{error, info, warn}; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; -use sea_orm::TryIntoModel; +use sea_orm::TransactionTrait; use serde_json::json; use tokio::fs; use tokio::sync::{Mutex, Semaphore}; use super::status::{PageStatus, VideoStatus}; use super::utils::{unhandled_videos_pages, ModelWrapper, NFOMode, NFOSerializer, TEMPLATE}; -use crate::bilibili::{BestStream, BiliClient, FavoriteList, FilterOption, PageInfo, Video}; -use crate::core::utils::{create_video_pages, create_videos, exist_labels, filter_videos, handle_favorite_info}; +use crate::bilibili::{BestStream, BiliClient, BiliError, FavoriteList, FilterOption, PageInfo, Video}; +use crate::core::utils::{ + create_video_pages, create_videos, exist_labels, filter_unfilled_videos, handle_favorite_info, total_video_count, +}; use crate::downloader::Downloader; +use crate::error::DownloadAbortError; /// 处理某个收藏夹,首先刷新收藏夹信息,然后下载收藏夹中未下载成功的视频 -pub async fn process_favorite( +pub async fn process_favorite_list( bili_client: &BiliClient, fid: &str, path: &str, connection: &DatabaseConnection, ) -> Result<()> { - let favorite_model = refresh_favorite(bili_client, fid, path, connection).await?; - download_favorite(bili_client, favorite_model, connection).await + let favorite_model = refresh_favorite_list(bili_client, fid, path, connection).await?; + let favorite_model = fetch_video_details(bili_client, favorite_model, connection).await?; + download_unprocessed_videos(bili_client, favorite_model, connection).await } -pub async fn refresh_favorite( +/// 获取收藏夹 Model,从收藏夹列表中获取所有新添加的视频,将其写入数据库 +pub async fn refresh_favorite_list( bili_client: &BiliClient, fid: &str, path: &str, @@ -42,12 +47,14 @@ pub async fn refresh_favorite( let bili_favorite_list = FavoriteList::new(bili_client, fid.to_owned()); let favorite_list_info = bili_favorite_list.get_info().await?; let favorite_model = handle_favorite_info(&favorite_list_info, path, connection).await?; - info!("Scan the favorite: {fid}"); + info!("Scan the favorite: {fid}."); // 每十个视频一组,避免太多的数据库操作 let video_stream = bili_favorite_list.into_video_stream().chunks(10); pin_mut!(video_stream); + let mut got_count = 0; + let total_count = total_video_count(&favorite_model, connection).await?; while let Some(videos_info) = video_stream.next().await { - info!("got {} new videos...", videos_info.len()); + got_count += videos_info.len(); let exist_labels = exist_labels(&videos_info, &favorite_model, connection).await?; // 如果发现有视频的收藏时间和 bvid 和数据库中重合,说明到达了上次处理到的地方,可以直接退出 let should_break = videos_info @@ -55,31 +62,70 @@ pub async fn refresh_favorite( .any(|v| exist_labels.contains(&(v.bvid.clone(), v.fav_time.naive_utc()))); // 将视频信息写入数据库 create_videos(&videos_info, &favorite_model, connection).await?; - // 找到这些视频中没有下载过,也没有 page 的视频 - let unrefreshed_video_models = filter_videos(&videos_info, &favorite_model, true, true, connection).await?; - if !unrefreshed_video_models.is_empty() { - for video_model in unrefreshed_video_models { - let bili_video = Video::new(bili_client, video_model.bvid.clone()); - let tags = bili_video.get_tags().await?; - let pages_info = bili_video.get_pages().await?; - // 记录视频的分集信息 - create_video_pages(&pages_info, &video_model, connection).await?; - let mut video_active_model: video::ActiveModel = video_model.into(); - video_active_model.single_page = Set(Some(pages_info.len() == 1)); - video_active_model.tags = Set(Some(serde_json::to_value(tags).unwrap())); - // 记录视频是否是单页,以及标签信息 - video_active_model.save(connection).await?; - } - } if should_break { + info!("Reach the last processed processed position, break.."); break; } } - info!("refresh videos done"); + let total_count = total_video_count(&favorite_model, connection).await? - total_count; + info!("Scan the favorite: {fid} done, got {got_count} videos, {total_count} new videos."); Ok(favorite_model) } -pub async fn download_favorite( +/// 筛选出所有没有获取到分页信息和 tag 的视频,请求分页信息和 tag 并写入数据库 +pub async fn fetch_video_details( + bili_client: &BiliClient, + favorite_model: favorite::Model, + connection: &DatabaseConnection, +) -> Result { + info!("start to fetch video details in favorite: {}", favorite_model.f_id); + let videos_model = filter_unfilled_videos(&favorite_model, connection).await?; + for video_model in videos_model { + let bili_video = Video::new(bili_client, video_model.bvid.clone()); + let tags = match bili_video.get_tags().await { + Ok(tags) => tags, + Err(e) => { + error!("failed to get tags for video: {}, {}", &video_model.bvid, e); + if let Some(BiliError::RequestFailed(code, _)) = e.downcast_ref::() { + if *code == -404 { + let mut video_active_model: video::ActiveModel = video_model.into(); + video_active_model.valid = Set(false); + video_active_model.save(connection).await?; + } + } + continue; + } + }; + let pages_info = match bili_video.get_pages().await { + Ok(pages) => pages, + Err(e) => { + error!("failed to get pages for video: {}, {}", &video_model.bvid, e); + if let Some(BiliError::RequestFailed(code, _)) = e.downcast_ref::() { + if *code == -404 { + let mut video_active_model: video::ActiveModel = video_model.into(); + video_active_model.valid = Set(false); + video_active_model.save(connection).await?; + } + } + continue; + } + }; + let txn = connection.begin().await?; + // 将分页信息写入数据库 + create_video_pages(&pages_info, &video_model, &txn).await?; + // 将页标记和 tag 写入数据库 + let mut video_active_model: video::ActiveModel = video_model.into(); + video_active_model.single_page = Set(Some(pages_info.len() == 1)); + video_active_model.tags = Set(Some(serde_json::to_value(tags).unwrap())); + video_active_model.save(&txn).await?; + txn.commit().await?; + } + info!("fetch video details in favorite: {} done.", favorite_model.f_id); + Ok(favorite_model) +} + +/// 下载所有未处理成功的视频 +pub async fn download_unprocessed_videos( bili_client: &BiliClient, favorite_model: favorite::Model, connection: &DatabaseConnection, @@ -90,38 +136,48 @@ pub async fn download_favorite( let semaphore = Semaphore::new(5); let downloader = Downloader::default(); let mut uppers_mutex: HashMap, Mutex<()>)> = HashMap::new(); - for (video, _) in &unhandled_videos_pages { - uppers_mutex.insert(video.id, (Mutex::new(()), Mutex::new(()))); + for (video_model, _) in &unhandled_videos_pages { + uppers_mutex.insert(video_model.upper_id, (Mutex::new(()), Mutex::new(()))); } - let mut tasks = FuturesUnordered::new(); - for (video_model, pages) in unhandled_videos_pages { - let upper_mutex = uppers_mutex.get(&video_model.id).unwrap(); - tasks.push(download_video_pages( - bili_client, - video_model, - pages, - connection, - &semaphore, - &downloader, - upper_mutex, - )); - } - // 创建好下载任务,等待下载任务运行即可,任务结束会返回 Result + let mut tasks = unhandled_videos_pages + .into_iter() + .map(|(video_model, pages_model)| { + let upper_mutex = uppers_mutex.get(&video_model.upper_id).unwrap(); + download_video_pages( + bili_client, + video_model, + pages_model, + connection, + &semaphore, + &downloader, + upper_mutex, + ) + }) + .collect::>(); + let mut models = Vec::with_capacity(10); while let Some(res) = tasks.next().await { match res { - Ok(video_model) => { - info!( - "Video {} processed:\n\t {}", - &video_model.bvid, - VideoStatus::new(video_model.download_status) - ); + Ok(model) => { + models.push(model); } Err(e) => { - error!("Video processed failed: {e}"); + if e.downcast_ref::().is_some() { + warn!("{e}"); + break; + } } } + // 满十个就写入数据库 + if models.len() == 10 { + video::Entity::insert_many(std::mem::replace(&mut models, Vec::with_capacity(10))) + .exec(connection) + .await?; + } } - info!("Done."); + if !models.is_empty() { + video::Entity::insert_many(models).exec(connection).await?; + } + info!("download videos in favorite: {} done.", favorite_model.f_id); Ok(()) } @@ -133,13 +189,14 @@ pub async fn download_video_pages( semaphore: &Semaphore, downloader: &Downloader, upper_mutex: &(Mutex<()>, Mutex<()>), -) -> Result { +) -> Result { let permit = semaphore.acquire().await; if let Err(e) = permit { - return Err(e.into()); + bail!(e); } let mut status = VideoStatus::new(video_model.download_status); let seprate_status = status.should_run(); + let base_path = Path::new(&video_model.path); let upper_id = video_model.upper_id.to_string(); let base_upper_path = config_dir() @@ -148,6 +205,7 @@ pub async fn download_video_pages( .join("upper") .join(upper_id.chars().next().unwrap().to_string()) .join(upper_id); + let is_single_page = video_model.single_page.unwrap(); // 对于单页视频,page 的下载已经足够 // 对于多页视频,page 下载仅包含了分集内容,需要额外补上视频的 poster 的 tvshow.nfo @@ -190,12 +248,31 @@ pub async fn download_video_pages( downloader, )), ]; - let results = futures::future::join_all(tasks).await; + let tasks: FuturesOrdered<_> = tasks.into_iter().collect(); + let results: Vec> = tasks.collect().await; status.update_status(&results); + results + .iter() + .take(4) + .zip(["poster", "video nfo", "upper face", "upper nfo"]) + .for_each(|(res, task_name)| { + if res.is_err() { + error!( + "Video {} {} failed: {}", + &video_model.bvid, + task_name, + res.as_ref().unwrap_err() + ); + } + }); + if let Err(e) = results.into_iter().nth(4).unwrap() { + if let Ok(e) = e.downcast::() { + bail!(e); + } + } let mut video_active_model: video::ActiveModel = video_model.into(); video_active_model.download_status = Set(status.into()); - video_active_model = video_active_model.save(connection).await?; - Ok(video_active_model.try_into_model()?) + Ok(video_active_model) } pub async fn dispatch_download_page( @@ -211,51 +288,44 @@ pub async fn dispatch_download_page( } // 对于视频的分页,允许同时下载三个同时下载(绝大部分是单页视频) let child_semaphore = Semaphore::new(5); - let mut tasks = FuturesUnordered::new(); - for page_model in pages { - tasks.push(download_page( - bili_client, - video_model, - page_model, - connection, - &child_semaphore, - downloader, - )); - } - let mut final_result = Ok(()); + let mut tasks = pages + .into_iter() + .map(|page_model| download_page(bili_client, video_model, page_model, &child_semaphore, downloader)) + .collect::>(); // 任务结束会返回 Result + + let mut models = Vec::with_capacity(10); while let Some(res) = tasks.next().await { match res { - Ok(page_model) => { - let page_status = PageStatus::new(page_model.download_status); - info!( - "Video {} page {} processed:\n\t {}", - &video_model.bvid, page_model.pid, &page_status - ); - if final_result.is_ok() && page_status.should_run().iter().any(|x| *x) { - final_result = Err(anyhow!("Some page item download failed")); - } + Ok(model) => { + models.push(model); } Err(e) => { - error!("Video {} processed failed: {e}", &video_model.bvid); - if final_result.is_ok() { - // 只需要设置一次,作为一个状态标记 - final_result = Err(e); + if e.downcast_ref::().is_some() { + warn!("{e}"); + break; } } } + if models.len() == 10 { + page::Entity::insert_many(std::mem::replace(&mut models, Vec::with_capacity(10))) + .exec(connection) + .await?; + } } - final_result + if !models.is_empty() { + page::Entity::insert_many(models).exec(connection).await?; + } + Ok(()) } pub async fn download_page( bili_client: &BiliClient, video_model: &video::Model, page_model: page::Model, - connection: &DatabaseConnection, semaphore: &Semaphore, downloader: &Downloader, -) -> Result { +) -> Result { let permit = semaphore.acquire().await; if let Err(e) = permit { return Err(e.into()); @@ -313,13 +383,33 @@ pub async fn download_page( )), Box::pin(generate_page_nfo(seprate_status[2], video_model, &page_model, nfo_path)), ]; - let results = futures::future::join_all(tasks).await; + let tasks: FuturesOrdered<_> = tasks.into_iter().collect(); + let results: Vec> = tasks.collect().await; status.update_status(&results); + results + .iter() + .zip(["poster", "video", "nfo"]) + .for_each(|(res, task_name)| { + if res.is_err() { + error!( + "Video {} page {} {} failed: {}", + &video_model.bvid, + page_model.pid, + task_name, + res.as_ref().unwrap_err() + ); + } + }); + // 查看下载视频的状态,该状态会影响上层是否 break + if let Err(e) = results.into_iter().nth(1).unwrap() { + if let Ok(e) = e.downcast::() { + bail!(e); + } + } let mut page_active_model: page::ActiveModel = page_model.into(); page_active_model.download_status = Set(status.into()); page_active_model.path = Set(Some(video_path.to_str().unwrap().to_string())); - page_active_model = page_active_model.save(connection).await?; - Ok(page_active_model.try_into_model().unwrap()) + Ok(page_active_model) } pub async fn fetch_page_poster( diff --git a/src/core/utils.rs b/src/core/utils.rs index 0696b6b..65e3bd1 100644 --- a/src/core/utils.rs +++ b/src/core/utils.rs @@ -152,32 +152,36 @@ pub async fn create_videos( Ok(()) } -/// 筛选所有符合条件的视频 -pub async fn filter_videos( - videos_info: &[VideoInfo], +pub async fn total_video_count(favorite_model: &favorite::Model, connection: &DatabaseConnection) -> Result { + Ok(video::Entity::find() + .filter(video::Column::FavoriteId.eq(favorite_model.id)) + .count(connection) + .await?) +} + +/// 筛选所有未 +pub async fn filter_unfilled_videos( favorite_model: &favorite::Model, - only_unhandled: bool, - only_no_page: bool, connection: &DatabaseConnection, ) -> Result> { - let bvids = videos_info.iter().map(|v| v.bvid.clone()).collect::>(); - let mut condition = video::Column::FavoriteId - .eq(favorite_model.id) - .and(video::Column::Bvid.is_in(bvids)) - .and(video::Column::Valid.eq(true)); - if only_unhandled { - condition = condition.and(video::Column::DownloadStatus.lt(Status::handled())); - } - if only_no_page { - condition = condition.and(video::Column::SinglePage.is_null()); - } - Ok(video::Entity::find().filter(condition).all(connection).await?) + Ok(video::Entity::find() + .filter( + video::Column::FavoriteId + .eq(favorite_model.id) + .and(video::Column::Valid.eq(true)) + .and(video::Column::DownloadStatus.eq(0)) + .and(video::Column::Category.eq(2)) + .and(video::Column::SinglePage.is_null()), + ) + .all(connection) + .await?) } + /// 创建视频的所有分 P pub async fn create_video_pages( pages_info: &[PageInfo], video_model: &video::Model, - connection: &DatabaseConnection, + connection: &impl ConnectionTrait, ) -> Result<()> { let page_models = pages_info .iter()