From 40ad37befc7ab5f4f6ef4bcfbfed9898b3ee07f9 Mon Sep 17 00:00:00 2001 From: amtoaer Date: Fri, 29 Mar 2024 00:30:09 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=90=AD=E8=B5=B7=E4=B8=8B=E8=BD=BD?= =?UTF-8?q?=E6=B5=81=E7=A8=8B=E7=9A=84=E6=A1=86=E6=9E=B6=EF=BC=8C=E5=BE=85?= =?UTF-8?q?=E5=A1=AB=E5=85=85=E5=85=B7=E4=BD=93=E4=B8=8B=E8=BD=BD=E8=BF=87?= =?UTF-8?q?=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- entity/src/entities/page.rs | 2 +- src/core/command.rs | 158 ++++++++++++++++++++++++++++++++++-- src/core/mod.rs | 1 + src/core/status.rs | 62 ++++++++++++++ src/core/utils.rs | 25 +----- 5 files changed, 218 insertions(+), 30 deletions(-) create mode 100644 src/core/status.rs diff --git a/entity/src/entities/page.rs b/entity/src/entities/page.rs index b181e47..a534b14 100644 --- a/entity/src/entities/page.rs +++ b/entity/src/entities/page.rs @@ -14,7 +14,7 @@ pub struct Model { pub path: String, pub image: String, pub valid: bool, - pub download_status: i32, + pub download_status: u32, pub created_at: String, } diff --git a/src/core/command.rs b/src/core/command.rs index aada26e..e0ef508 100644 --- a/src/core/command.rs +++ b/src/core/command.rs @@ -1,15 +1,23 @@ +use std::pin::Pin; use std::sync::Arc; -use entity::video; +use entity::{favorite, page, video}; +use futures::stream::FuturesUnordered; +use futures::Future; use futures_util::{pin_mut, StreamExt}; -use log::info; +use log::{error, info}; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; +use sea_orm::TryIntoModel; +use tokio::sync::Semaphore; +use super::status::Status; +use super::utils::unhandled_videos_pages; use crate::bilibili::{BiliClient, FavoriteList, Video}; use crate::core::utils::{ create_video_pages, create_videos, exist_labels, filter_videos, handle_favorite_info, }; +use crate::downloader::Downloader; use crate::Result; pub async fn process_favorite( @@ -17,8 +25,8 @@ pub async fn process_favorite( fid: &str, connection: Arc, ) -> Result<()> { - refresh_favorite(bili_client.clone(), fid, connection.clone()).await?; - download_favorite(bili_client.clone(), fid, connection.clone()).await?; + let favorite_model = refresh_favorite(bili_client.clone(), fid, connection.clone()).await?; + download_favorite(bili_client.clone(), favorite_model, connection.clone()).await?; Ok(()) } @@ -26,7 +34,7 @@ pub async fn refresh_favorite( bili_client: Arc, fid: &str, connection: Arc, -) -> Result<()> { +) -> Result { let bili_favorite_list = FavoriteList::new(bili_client.clone(), fid.to_owned()); let favorite_list_info = bili_favorite_list.get_info().await?; let favorite_model = handle_favorite_info(&favorite_list_info, connection.as_ref()).await?; @@ -64,14 +72,148 @@ pub async fn refresh_favorite( break; } } - Ok(()) + Ok(favorite_model) } #[allow(unused_variables)] pub async fn download_favorite( bili_client: Arc, - fid: &str, + favorite_model: favorite::Model, connection: Arc, ) -> Result<()> { - todo!(); + let unhandled_videos_pages = + unhandled_videos_pages(&favorite_model, connection.as_ref()).await?; + let semaphore = Arc::new(Semaphore::new(3)); + let downloader = Arc::new(Downloader::default()); + let mut tasks = FuturesUnordered::new(); + for (video_model, pages) in unhandled_videos_pages { + tasks.push(Box::pin(download_video_pages( + bili_client.clone(), + video_model, + pages, + connection.clone(), + semaphore.clone(), + ))); + } + while let Some(res) = tasks.next().await { + if let Err(e) = res { + error!("Error: {e}"); + } + } + Ok(()) +} + +pub async fn download_video_pages( + bili_client: Arc, + video_model: video::Model, + pages: Vec, + connection: Arc, + semaphore: Arc, +) -> Result<()> { + let permit = semaphore.acquire().await; + if let Err(e) = permit { + return Err(e.into()); + } + let child_semaphore = Arc::new(Semaphore::new(5)); + let mut tasks = FuturesUnordered::new(); + for page_model in pages { + tasks.push(Box::pin(download_page( + bili_client.clone(), + &video_model, + page_model, + connection.clone(), + child_semaphore.clone(), + ))); + } + while let Some(res) = tasks.next().await { + if let Err(e) = res { + error!("Error: {e}"); + } + } + Ok(()) +} + +pub async fn download_page( + bili_client: Arc, + video_model: &video::Model, + page_model: page::Model, + connection: Arc, + semaphore: Arc, +) -> Result { + let permit = semaphore.acquire().await; + if let Err(e) = permit { + return Err(e.into()); + } + let mut status = Status::new(page_model.download_status); + let seprate_status = status.should_run(); + let tasks: Vec>>>> = vec![ + // 暂时不支持下载字幕 + Box::pin(download_poster( + seprate_status[0], + &bili_client, + video_model, + &page_model, + )), + Box::pin(download_upper( + seprate_status[1], + &bili_client, + video_model, + &page_model, + )), + Box::pin(download_video( + seprate_status[2], + &bili_client, + video_model, + &page_model, + )), + Box::pin(generate_nfo( + seprate_status[3], + &bili_client, + video_model, + &page_model, + )), + ]; + let results = futures::future::join_all(tasks).await; + status.update_status(&results); + let mut page_active_model: page::ActiveModel = page_model.into(); + page_active_model.download_status = Set(status.into()); + page_active_model = page_active_model.save(connection.as_ref()).await?; + Ok(page_active_model.try_into_model().unwrap()) +} + +#[allow(unused_variables)] +pub async fn download_poster( + should_run: bool, + bili_client: &Arc, + video_model: &video::Model, + page_model: &page::Model, +) -> Result<()> { + Ok(()) +} +#[allow(unused_variables)] +pub async fn download_upper( + should_run: bool, + bili_client: &Arc, + video_model: &video::Model, + page_model: &page::Model, +) -> Result<()> { + Ok(()) +} +#[allow(unused_variables)] +pub async fn download_video( + should_run: bool, + bili_client: &Arc, + video_model: &video::Model, + page_model: &page::Model, +) -> Result<()> { + Ok(()) +} +#[allow(unused_variables)] +pub async fn generate_nfo( + should_run: bool, + bili_client: &Arc, + video_model: &video::Model, + page_model: &page::Model, +) -> Result<()> { + Ok(()) } diff --git a/src/core/mod.rs b/src/core/mod.rs index 3b143e0..6806150 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1,2 +1,3 @@ pub mod command; +pub mod status; pub mod utils; diff --git a/src/core/status.rs b/src/core/status.rs new file mode 100644 index 0000000..8f3fd16 --- /dev/null +++ b/src/core/status.rs @@ -0,0 +1,62 @@ +use crate::Result; + +static STATUS_MAX_RETRY: u32 = 0b100; +static STATUS_OK: u32 = 0b111; + +/// 用来表示下载的状态,不想写太多列了,所以仅使用一个 u32 表示 +/// 从低位开始,固定每三位表示一种数据的状态 +pub struct Status(u32); + +impl Status { + pub fn new(status: u32) -> Self { + Self(status) + } + + pub fn should_run(&self) -> [bool; 4] { + let mut result = [false; 4]; + for (i, res) in result.iter_mut().enumerate() { + *res = self.check_continue(i); + } + result + } + + pub fn update_status(&mut self, result: &[Result<()>]) { + assert!(result.len() >= 4, "result length must be 4"); + for (i, res) in result.iter().enumerate().take(4) { + self.set_result(res, i); + } + if self.should_run().iter().all(|x| !x) { + // 所有任务都成功或者由于尝试次数过多失败,为 status 最高位打上标记,将来不再重试 + self.0 |= 1 << 31; + } + } + + fn check_continue(&self, offset: usize) -> bool { + assert!(offset < 10, "u32 can only store 10 status"); + let helper = !0u32; + let sub_status = self.0 & (helper << (offset * 3)) & (helper >> (32 - 3 * offset - 3)); + sub_status < STATUS_MAX_RETRY + } + + fn set_result(&mut self, result: &Result<()>, offset: usize) { + if result.is_ok() { + self.set_ok(offset); + } else { + self.plus_one(offset); + } + } + + fn plus_one(&mut self, offset: usize) { + self.0 += 1 << (3 * offset); + } + + fn set_ok(&mut self, offset: usize) { + self.0 |= STATUS_OK << (3 * offset); + } +} + +impl From for u32 { + fn from(status: Status) -> Self { + status.0 + } +} diff --git a/src/core/utils.rs b/src/core/utils.rs index 443cf4e..9056cad 100644 --- a/src/core/utils.rs +++ b/src/core/utils.rs @@ -179,33 +179,16 @@ pub async fn create_video_pages( pub async fn unhandled_videos_pages( favorite_model: &favorite::Model, connection: &DatabaseConnection, -) -> Result> { +) -> Result)>> { Ok(video::Entity::find() .filter( video::Column::FavoriteId .eq(favorite_model.id) .and(video::Column::Valid.eq(true)) - .and(video::Column::Handled.eq(false)), + .and(video::Column::Handled.eq(false)) + .and(video::Column::SinglePage.is_not_null()), ) + .find_with_related(page::Entity) .all(connection) .await?) } - -#[cfg(test)] -mod test { - use entity::{page, video}; - use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; - - #[ignore = "just for manual test"] - #[tokio::test] - async fn test_join() { - let video_with_pages: Vec<(video::Model, Vec)> = video::Entity::find() - .filter(video::Column::Handled.eq(false)) - .filter(video::Column::SinglePage.eq(false)) - .find_with_related(page::Entity) - .all(&crate::database::database_connection().await.unwrap()) - .await - .unwrap(); - dbg!(video_with_pages); - } -}