From 5540c465417800d516422eda922f01f80e48925f Mon Sep 17 00:00:00 2001 From: amtoaer Date: Sun, 31 Mar 2024 00:43:25 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AE=9E=E7=8E=B0=E5=85=A8=E9=83=A8?= =?UTF-8?q?=E7=9A=84=E4=B8=8B=E8=BD=BD=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/core/command.rs | 173 +++++++++++++++++++++++++++++++++++++------- src/core/status.rs | 46 +++++++++++- src/core/utils.rs | 8 +- 3 files changed, 191 insertions(+), 36 deletions(-) diff --git a/src/core/command.rs b/src/core/command.rs index f6f32c3..8a1b8b5 100644 --- a/src/core/command.rs +++ b/src/core/command.rs @@ -1,6 +1,6 @@ +use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::pin::Pin; -use std::sync::Arc; use entity::{favorite, page, video}; use futures::stream::FuturesUnordered; @@ -13,9 +13,9 @@ use sea_orm::ActiveValue::Set; use sea_orm::TryIntoModel; use serde_json::json; use tokio::fs; -use tokio::sync::Semaphore; +use tokio::sync::{Mutex, Semaphore}; -use super::status::PageStatus; +use super::status::{PageStatus, VideoStatus}; use super::utils::{unhandled_videos_pages, ModelWrapper, NFOMode, NFOSerializer}; use crate::bilibili::{BestStream, BiliClient, FavoriteList, FilterOption, PageInfo, Video}; use crate::core::utils::{create_video_pages, create_videos, exist_labels, filter_videos, handle_favorite_info}; @@ -49,7 +49,7 @@ pub async fn refresh_favorite( let video_stream = bili_favorite_list.into_video_stream().chunks(10); pin_mut!(video_stream); while let Some(videos_info) = video_stream.next().await { - info!("handle videos: {}", videos_info.len()); + info!("got {} videos...", videos_info.len()); let exist_labels = exist_labels(&videos_info, &favorite_model, connection).await?; // 如果发现有视频的收藏时间和 bvid 和数据库中重合,说明到达了上次处理到的地方,可以直接退出 let should_break = videos_info @@ -77,7 +77,7 @@ pub async fn refresh_favorite( break; } } - info!("handle videos done"); + info!("refresh videos done"); Ok(favorite_model) } @@ -86,29 +86,44 @@ pub async fn download_favorite( favorite_model: favorite::Model, connection: &DatabaseConnection, ) -> Result<()> { - info!("start to download!"); + info!("start to download videos in favorite: {}", favorite_model.f_id); let unhandled_videos_pages = unhandled_videos_pages(&favorite_model, connection).await?; // 对于视频,允许五个同时下载(视频内还有分页、不同分页还有多种下载任务) - let semaphore = Arc::new(Semaphore::new(5)); + let semaphore = Semaphore::new(5); let downloader = Downloader::default(); + let mut uppers_mutex: HashMap, Mutex<()>)> = HashMap::new(); + for (video, _) in &unhandled_videos_pages { + uppers_mutex.insert(video.id, (Mutex::new(()), Mutex::new(()))); + } let mut tasks = FuturesUnordered::new(); for (video_model, pages) in unhandled_videos_pages { + let upper_mutex = uppers_mutex.get(&video_model.id).unwrap(); tasks.push(download_video_pages( bili_client, video_model, pages, connection, - semaphore.clone(), + &semaphore, &downloader, + upper_mutex, )); } // 创建好下载任务,等待下载任务运行即可,任务结束会返回 Result while let Some(res) = tasks.next().await { - if let Err(e) = res { - error!("Error: {e}"); + match res { + Ok(video_model) => { + info!( + "Video {} processed:\n\t {}", + &video_model.bvid, + VideoStatus::new(video_model.download_status) + ); + } + Err(e) => { + error!("Video processed failed: {e}"); + } } } - info!("download done."); + info!("Done."); Ok(()) } @@ -117,40 +132,107 @@ pub async fn download_video_pages( video_model: video::Model, pages: Vec, connection: &DatabaseConnection, - semaphore: Arc, + semaphore: &Semaphore, downloader: &Downloader, -) -> Result<()> { + upper_mutex: &(Mutex<()>, Mutex<()>), +) -> Result { let permit = semaphore.acquire().await; if let Err(e) = permit { return Err(e.into()); } + let mut status = VideoStatus::new(video_model.download_status); + let seprate_status = status.should_run(); + let base_path = Path::new(&video_model.path); + let is_single_page = video_model.single_page.unwrap(); + // 对于单页视频,page 的下载已经足够 + // 对于多页视频,page 下载仅包含了分集内容,需要额外补上视频的 poster 的 tvshow.nfo + let tasks: Vec>>>> = vec![ + // 下载视频封面 + Box::pin(fetch_video_poster( + seprate_status[0], + &video_model, + downloader, + base_path.join("poster.jpg"), + )), + // 分发分页下载的任务 + Box::pin(dispatch_download_page( + seprate_status[1], + bili_client, + &video_model, + pages, + connection, + downloader, + )), + // 生成视频信息的 nfo + Box::pin(generate_video_nfo( + seprate_status[2] && !is_single_page, + &video_model, + base_path.join("tvshow.nfo"), + )), + // 下载 Up 主头像 + Box::pin(fetch_upper_face( + seprate_status[3], + &video_model, + downloader, + &upper_mutex.0, + base_path.join("upper-face.jpg"), + )), + // 生成 Up 主信息的 nfo + Box::pin(generate_upper_nfo( + seprate_status[4], + &video_model, + &upper_mutex.1, + base_path.join("upper.nfo"), + )), + ]; + let results = futures::future::join_all(tasks).await; + status.update_status(&results); + let mut video_active_model: video::ActiveModel = video_model.into(); + video_active_model.download_status = Set(status.into()); + video_active_model = video_active_model.save(connection).await?; + Ok(video_active_model.try_into_model()?) +} + +pub async fn dispatch_download_page( + should_run: bool, + bili_client: &BiliClient, + video_model: &video::Model, + pages: Vec, + connection: &DatabaseConnection, + downloader: &Downloader, +) -> Result<()> { + if !should_run { + return Ok(()); + } // 对于视频的分页,允许同时下载三个同时下载(绝大部分是单页视频) - let child_semaphore = Arc::new(Semaphore::new(5)); + let child_semaphore = Semaphore::new(5); let mut tasks = FuturesUnordered::new(); for page_model in pages { tasks.push(download_page( bili_client, - &video_model, + video_model, page_model, connection, - child_semaphore.clone(), + &child_semaphore, downloader, )); } - // 同样创建好下载任务等待运行,任务结束会返回 Result + // 任务结束会返回 Result while let Some(res) = tasks.next().await { - if let Err(e) = res { - error!("Error: {e}"); + match res { + Ok(page_model) => { + info!( + "Video {} page {} processed:\n\t {}", + &video_model.bvid, + page_model.pid, + PageStatus::new(page_model.download_status) + ); + } + Err(e) => { + error!("Video {} processed failed: {e}", &video_model.bvid); + } } } - let is_single_page = video_model.single_page.unwrap(); - // 对于单页视频,page 的下载已经足够 - // 对于多页视频,page 下载仅包含了分集内容,需要额外补上视频的 poster 的 tvshow.nfo - if !is_single_page { - let base_path = Path::new(&video_model.path); - generate_video_nfo(true, &video_model, base_path.join("tvshow.nfo")).await?; - fetch_video_poster(true, &video_model, downloader, base_path.join("poster.jpg")).await?; - } Ok(()) } @@ -159,7 +241,7 @@ pub async fn download_page( video_model: &video::Model, page_model: page::Model, connection: &DatabaseConnection, - semaphore: Arc, + semaphore: &Semaphore, downloader: &Downloader, ) -> Result { let permit = semaphore.acquire().await; @@ -317,6 +399,41 @@ pub async fn fetch_video_poster( downloader.fetch(&video_model.cover, &poster_path).await } +pub async fn fetch_upper_face( + should_run: bool, + video_model: &video::Model, + downloader: &Downloader, + upper_face_mutex: &Mutex<()>, + upper_face_path: PathBuf, +) -> Result<()> { + if !should_run { + return Ok(()); + } + // 这个锁是为了避免多个视频同时下载同一个 up 主的头像 + let _permit = upper_face_mutex.lock().await; + if !upper_face_path.exists() { + return downloader.fetch(&video_model.upper_face, &upper_face_path).await; + } + Ok(()) +} + +pub async fn generate_upper_nfo( + should_run: bool, + video_model: &video::Model, + upper_nfo_mutex: &Mutex<()>, + nfo_path: PathBuf, +) -> Result<()> { + if !should_run { + return Ok(()); + } + let _permit = upper_nfo_mutex.lock().await; + if !nfo_path.exists() { + let nfo_serializer = NFOSerializer(ModelWrapper::Video(video_model), NFOMode::UPPER); + return generate_nfo(nfo_serializer, nfo_path).await; + } + Ok(()) +} + pub async fn generate_video_nfo(should_run: bool, video_model: &video::Model, nfo_path: PathBuf) -> Result<()> { if !should_run { return Ok(()); diff --git a/src/core/status.rs b/src/core/status.rs index af60d5f..4469a3d 100644 --- a/src/core/status.rs +++ b/src/core/status.rs @@ -1,3 +1,5 @@ +use core::fmt; + use crate::Result; static STATUS_MAX_RETRY: u32 = 0b100; @@ -75,7 +77,17 @@ impl Status { fn get_status(&self, offset: usize) -> u32 { let helper = !0u32; - self.0 & (helper << (offset * 3)) & (helper >> (32 - 3 * offset - 3)) + (self.0 & (helper << (offset * 3)) & (helper >> (32 - 3 * offset - 3))) >> (offset * 3) + } + + fn display_status(status: u32) -> String { + if status < STATUS_MAX_RETRY { + format!("retry {} times", status) + } else if status == STATUS_OK { + "ok".to_string() + } else { + "failed".to_string() + } } } @@ -85,7 +97,7 @@ impl From for u32 { } } -/// 从前到后分别表示:视频封面、分页下载、视频信息 +/// 从前到后分别表示:视频封面、分页下载、视频信息、Up 主头像、Up 主信息 #[derive(Clone)] pub struct VideoStatus(Status); @@ -95,15 +107,29 @@ impl VideoStatus { } pub fn should_run(&self) -> Vec { - self.0.should_run(3) + self.0.should_run(5) } pub fn update_status(&mut self, result: &[Result<()>]) { - assert!(result.len() == 3, "VideoStatus should have 3 status"); + assert!(result.len() == 5, "VideoStatus should have 5 status"); self.0.update_status(result) } } +impl fmt::Display for VideoStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Video Cover: {}, Page: {}, Video NFO: {}, Up Avatar: {}, Up NFO: {}", + Status::display_status(self.0.get_status(0)), + Status::display_status(self.0.get_status(1)), + Status::display_status(self.0.get_status(2)), + Status::display_status(self.0.get_status(3)), + Status::display_status(self.0.get_status(4)) + ) + } +} + impl From for u32 { fn from(status: VideoStatus) -> Self { status.0.into() @@ -129,6 +155,18 @@ impl PageStatus { } } +impl fmt::Display for PageStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "Page Cover: {}, Page Content: {}, Page NFO: {}", + Status::display_status(self.0.get_status(0)), + Status::display_status(self.0.get_status(1)), + Status::display_status(self.0.get_status(2)) + ) + } +} + impl From for u32 { fn from(status: PageStatus) -> Self { status.0.into() diff --git a/src/core/utils.rs b/src/core/utils.rs index 898dd3d..07eaef8 100644 --- a/src/core/utils.rs +++ b/src/core/utils.rs @@ -11,6 +11,7 @@ use sea_orm::ActiveValue::Set; use sea_orm::QuerySelect; use tokio::io::AsyncWriteExt; +use super::status::Status; use crate::bilibili::{FavoriteListInfo, PageInfo, VideoInfo}; use crate::Result; @@ -102,7 +103,7 @@ pub async fn create_videos( ctime: Set(v.ctime.naive_utc()), pubtime: Set(v.pubtime.naive_utc()), favtime: Set(v.fav_time.naive_utc()), - handled: Set(false), + download_status: Set(0), valid: Set(v.attr == 0), tags: Set(None), single_page: Set(None), @@ -138,7 +139,7 @@ pub async fn filter_videos( .and(video::Column::Bvid.is_in(bvids)) .and(video::Column::Valid.eq(true)); if only_unhandled { - condition = condition.and(video::Column::Handled.eq(false)); + condition = condition.and(video::Column::DownloadStatus.lt(Status::handled())); } if only_no_page { condition = condition.and(video::Column::SinglePage.is_null()); @@ -164,7 +165,6 @@ pub async fn create_video_pages( .unwrap() .to_string()), image: Set(p.first_frame.clone()), - valid: Set(video_model.valid), download_status: Set(0), ..Default::default() }) @@ -191,7 +191,7 @@ pub async fn unhandled_videos_pages( video::Column::FavoriteId .eq(favorite_model.id) .and(video::Column::Valid.eq(true)) - .and(video::Column::Handled.eq(false)) + .and(video::Column::DownloadStatus.lt(Status::handled())) .and(video::Column::SinglePage.is_not_null()), ) .find_with_related(page::Entity)