From 7c220f0d2bf8d6bdab79a05b748cde8cba3646c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=B4=80=E1=B4=8D=E1=B4=9B=E1=B4=8F=E1=B4=80=E1=B4=87?= =?UTF-8?q?=CA=80?= Date: Fri, 24 Jan 2025 01:11:59 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E7=B2=BE=E7=AE=80=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=EF=BC=8C=E7=BB=9F=E4=B8=80=E9=80=BB=E8=BE=91=20(#229)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 1 - crates/bili_sync/Cargo.toml | 1 - crates/bili_sync/src/adapter/collection.rs | 84 ++------ crates/bili_sync/src/adapter/favorite.rs | 81 ++------ crates/bili_sync/src/adapter/helper/mod.rs | 112 ----------- crates/bili_sync/src/adapter/mod.rs | 116 ++++++----- crates/bili_sync/src/adapter/submission.rs | 85 ++------ crates/bili_sync/src/adapter/watch_later.rs | 82 ++------ crates/bili_sync/src/bilibili/analyzer.rs | 1 - crates/bili_sync/src/bilibili/credential.rs | 1 - crates/bili_sync/src/bilibili/mod.rs | 1 - crates/bili_sync/src/bilibili/video.rs | 3 +- crates/bili_sync/src/config/global.rs | 4 +- crates/bili_sync/src/utils/convert.rs | 202 +++++++++----------- crates/bili_sync/src/utils/format_arg.rs | 30 +++ crates/bili_sync/src/utils/mod.rs | 1 + crates/bili_sync/src/utils/model.rs | 81 +++++++- crates/bili_sync/src/workflow.rs | 105 ++++++---- 18 files changed, 387 insertions(+), 604 deletions(-) delete mode 100644 crates/bili_sync/src/adapter/helper/mod.rs create mode 100644 crates/bili_sync/src/utils/format_arg.rs diff --git a/Cargo.lock b/Cargo.lock index 1ee98ea..0809e73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -385,7 +385,6 @@ dependencies = [ "arc-swap", "assert_matches", "async-stream", - "async-trait", "bili_sync_entity", "bili_sync_migration", "chrono", diff --git a/crates/bili_sync/Cargo.toml b/crates/bili_sync/Cargo.toml index dfe8cea..be8f063 100644 --- a/crates/bili_sync/Cargo.toml +++ b/crates/bili_sync/Cargo.toml @@ -12,7 +12,6 @@ readme = "../../README.md" anyhow = { workspace = true } arc-swap = { workspace = true } async-stream = { workspace = true } -async-trait = { workspace = true } bili_sync_entity = { workspace = true } bili_sync_migration = { workspace = true } chrono = { workspace = true } diff --git a/crates/bili_sync/src/adapter/collection.rs b/crates/bili_sync/src/adapter/collection.rs index 58ab665..cdc4df3 100644 --- a/crates/bili_sync/src/adapter/collection.rs +++ b/crates/bili_sync/src/adapter/collection.rs @@ -2,99 +2,39 @@ use std::path::Path; use std::pin::Pin; use anyhow::{Context, Result}; -use async_trait::async_trait; use bili_sync_entity::*; use futures::Stream; use sea_orm::entity::prelude::*; -use sea_orm::sea_query::{IntoCondition, OnConflict}; +use sea_orm::sea_query::{OnConflict, SimpleExpr}; use sea_orm::ActiveValue::Set; -use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged}; +use sea_orm::{DatabaseConnection, Unchanged}; -use crate::adapter::{helper, VideoListModel}; -use crate::bilibili::{self, BiliClient, Collection, CollectionItem, CollectionType, VideoInfo}; -use crate::utils::status::STATUS_COMPLETED; +use crate::adapter::{VideoListModel, _ActiveModel}; +use crate::bilibili::{BiliClient, Collection, CollectionItem, CollectionType, VideoInfo}; -#[async_trait] impl VideoListModel for collection::Model { - async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result> { - helper::filter_videos( - video::Column::CollectionId - .eq(self.id) - .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.eq(0)) - .and(video::Column::Category.eq(2)) - .and(video::Column::SinglePage.is_null()) - .into_condition(), - connection, - ) - .await + fn filter_expr(&self) -> SimpleExpr { + video::Column::CollectionId.eq(self.id) } - async fn unhandled_video_pages( - &self, - connection: &DatabaseConnection, - ) -> Result)>> { - helper::filter_videos_with_pages( - video::Column::CollectionId - .eq(self.id) - .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.lt(STATUS_COMPLETED)) - .and(video::Column::Category.eq(2)) - .and(video::Column::SinglePage.is_not_null()) - .into_condition(), - connection, - ) - .await - } - - fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { - let mut video_model = video_info.to_model(base_model); + fn set_relation_id(&self, video_model: &mut video::ActiveModel) { video_model.collection_id = Set(Some(self.id)); - helper::video_with_path(video_model, &self.path, video_info) } - async fn fetch_videos_detail( - &self, - video: bilibili::Video<'_>, - video_model: video::Model, - connection: &DatabaseConnection, - ) -> Result<()> { - let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await; - match info { - Ok((tags, view_info)) => { - let VideoInfo::Detail { pages, .. } = &view_info else { - unreachable!("view_info must be VideoInfo::View") - }; - let txn = connection.begin().await?; - // 将分页信息写入数据库 - helper::create_video_pages(pages, &video_model, &txn).await?; - // 将页标记和 tag 写入数据库 - let mut video_active_model = self.video_model_by_info(&view_info, Some(video_model)); - video_active_model.single_page = Set(Some(pages.len() == 1)); - video_active_model.tags = Set(Some(serde_json::to_value(tags)?)); - video_active_model.save(&txn).await?; - txn.commit().await?; - } - Err(e) => { - helper::error_fetch_video_detail(e, video_model, connection).await?; - } - }; - Ok(()) + fn path(&self) -> &Path { + Path::new(self.path.as_str()) } fn get_latest_row_at(&self) -> DateTime { self.latest_row_at } - async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> { - collection::ActiveModel { + fn update_latest_row_at(&self, datetime: DateTime) -> _ActiveModel { + _ActiveModel::Collection(collection::ActiveModel { id: Unchanged(self.id), latest_row_at: Set(datetime), ..Default::default() - } - .update(connection) - .await?; - Ok(()) + }) } fn log_fetch_video_start(&self) { diff --git a/crates/bili_sync/src/adapter/favorite.rs b/crates/bili_sync/src/adapter/favorite.rs index 71b19e5..eb0ee3f 100644 --- a/crates/bili_sync/src/adapter/favorite.rs +++ b/crates/bili_sync/src/adapter/favorite.rs @@ -2,96 +2,39 @@ use std::path::Path; use std::pin::Pin; use anyhow::{Context, Result}; -use async_trait::async_trait; use bili_sync_entity::*; use futures::Stream; use sea_orm::entity::prelude::*; -use sea_orm::sea_query::{IntoCondition, OnConflict}; +use sea_orm::sea_query::{OnConflict, SimpleExpr}; use sea_orm::ActiveValue::Set; -use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged}; +use sea_orm::{DatabaseConnection, Unchanged}; -use crate::adapter::{helper, VideoListModel}; -use crate::bilibili::{self, BiliClient, FavoriteList, VideoInfo}; -use crate::utils::status::STATUS_COMPLETED; +use crate::adapter::{VideoListModel, _ActiveModel}; +use crate::bilibili::{BiliClient, FavoriteList, VideoInfo}; -#[async_trait] impl VideoListModel for favorite::Model { - async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result> { - helper::filter_videos( - video::Column::FavoriteId - .eq(self.id) - .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.eq(0)) - .and(video::Column::Category.eq(2)) - .and(video::Column::SinglePage.is_null()) - .into_condition(), - connection, - ) - .await + fn filter_expr(&self) -> SimpleExpr { + video::Column::FavoriteId.eq(self.id) } - async fn unhandled_video_pages( - &self, - connection: &DatabaseConnection, - ) -> Result)>> { - helper::filter_videos_with_pages( - video::Column::FavoriteId - .eq(self.id) - .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.lt(STATUS_COMPLETED)) - .and(video::Column::Category.eq(2)) - .and(video::Column::SinglePage.is_not_null()) - .into_condition(), - connection, - ) - .await - } - - fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { - let mut video_model = video_info.to_model(base_model); + fn set_relation_id(&self, video_model: &mut video::ActiveModel) { video_model.favorite_id = Set(Some(self.id)); - helper::video_with_path(video_model, &self.path, video_info) } - async fn fetch_videos_detail( - &self, - video: bilibili::Video<'_>, - video_model: video::Model, - connection: &DatabaseConnection, - ) -> Result<()> { - let info: Result<_> = async { Ok((video.get_tags().await?, video.get_pages().await?)) }.await; - match info { - Ok((tags, pages_info)) => { - let txn = connection.begin().await?; - // 将分页信息写入数据库 - helper::create_video_pages(&pages_info, &video_model, &txn).await?; - // 将页标记和 tag 写入数据库 - let mut video_active_model: video::ActiveModel = video_model.into(); - video_active_model.single_page = Set(Some(pages_info.len() == 1)); - video_active_model.tags = Set(Some(serde_json::to_value(tags)?)); - video_active_model.save(&txn).await?; - txn.commit().await?; - } - Err(e) => { - helper::error_fetch_video_detail(e, video_model, connection).await?; - } - }; - Ok(()) + fn path(&self) -> &Path { + Path::new(self.path.as_str()) } fn get_latest_row_at(&self) -> DateTime { self.latest_row_at } - async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> { - favorite::ActiveModel { + fn update_latest_row_at(&self, datetime: DateTime) -> _ActiveModel { + _ActiveModel::Favorite(favorite::ActiveModel { id: Unchanged(self.id), latest_row_at: Set(datetime), ..Default::default() - } - .update(connection) - .await?; - Ok(()) + }) } fn log_fetch_video_start(&self) { diff --git a/crates/bili_sync/src/adapter/helper/mod.rs b/crates/bili_sync/src/adapter/helper/mod.rs deleted file mode 100644 index 930db2e..0000000 --- a/crates/bili_sync/src/adapter/helper/mod.rs +++ /dev/null @@ -1,112 +0,0 @@ -use std::path::Path; - -use anyhow::Result; -use bili_sync_entity::*; -use sea_orm::entity::prelude::*; -use sea_orm::sea_query::OnConflict; -use sea_orm::ActiveValue::Set; -use sea_orm::{Condition, DatabaseTransaction}; - -use crate::bilibili::{BiliError, PageInfo, VideoInfo}; -use crate::config::{PathSafeTemplate, TEMPLATE}; - -/// 使用 condition 筛选视频,返回视频列表 -pub(super) async fn filter_videos(condition: Condition, conn: &DatabaseConnection) -> Result> { - Ok(video::Entity::find().filter(condition).all(conn).await?) -} - -/// 使用 condition 筛选视频,返回视频列表和相关的分 P 列表 -pub(super) async fn filter_videos_with_pages( - condition: Condition, - conn: &DatabaseConnection, -) -> Result)>> { - Ok(video::Entity::find() - .filter(condition) - .find_with_related(page::Entity) - .all(conn) - .await?) -} - -/// 返回设置了 path 的视频 -pub(super) fn video_with_path( - mut video_model: video::ActiveModel, - base_path: &str, - video_info: &VideoInfo, -) -> video::ActiveModel { - if let Some(fmt_args) = &video_info.to_fmt_args() { - video_model.path = Set(Path::new(base_path) - .join( - TEMPLATE - .path_safe_render("video", fmt_args) - .expect("template render failed"), - ) - .to_string_lossy() - .to_string()); - } - video_model -} - -/// 处理获取视频详细信息失败的情况 -pub(super) async fn error_fetch_video_detail( - e: anyhow::Error, - video_model: bili_sync_entity::video::Model, - connection: &DatabaseConnection, -) -> Result<()> { - error!( - "获取视频 {} - {} 的详细信息失败,错误为:{}", - &video_model.bvid, &video_model.name, e - ); - if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::() { - let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into(); - video_active_model.valid = Set(false); - video_active_model.save(connection).await?; - } - Ok(()) -} - -/// 创建视频的所有分 P -pub(crate) async fn create_video_pages( - pages_info: &[PageInfo], - video_model: &video::Model, - connection: &DatabaseTransaction, -) -> Result<()> { - let page_models = pages_info - .iter() - .map(move |p| { - let (width, height) = match &p.dimension { - Some(d) => { - if d.rotate == 0 { - (Some(d.width), Some(d.height)) - } else { - (Some(d.height), Some(d.width)) - } - } - None => (None, None), - }; - page::ActiveModel { - video_id: Set(video_model.id), - cid: Set(p.cid), - pid: Set(p.page), - name: Set(p.name.clone()), - width: Set(width), - height: Set(height), - duration: Set(p.duration), - image: Set(p.first_frame.clone()), - download_status: Set(0), - ..Default::default() - } - }) - .collect::>(); - for page_chunk in page_models.chunks(50) { - page::Entity::insert_many(page_chunk.to_vec()) - .on_conflict( - OnConflict::columns([page::Column::VideoId, page::Column::Pid]) - .do_nothing() - .to_owned(), - ) - .do_nothing() - .exec(connection) - .await?; - } - Ok(()) -} diff --git a/crates/bili_sync/src/adapter/mod.rs b/crates/bili_sync/src/adapter/mod.rs index 3d0a003..3e40aaf 100644 --- a/crates/bili_sync/src/adapter/mod.rs +++ b/crates/bili_sync/src/adapter/mod.rs @@ -1,6 +1,5 @@ mod collection; mod favorite; -mod helper; mod submission; mod watch_later; @@ -8,16 +7,53 @@ use std::path::Path; use std::pin::Pin; use anyhow::Result; -use async_trait::async_trait; use futures::Stream; use sea_orm::entity::prelude::*; +use sea_orm::sea_query::SimpleExpr; use sea_orm::DatabaseConnection; use crate::adapter::collection::collection_from; use crate::adapter::favorite::favorite_from; use crate::adapter::submission::submission_from; use crate::adapter::watch_later::watch_later_from; -use crate::bilibili::{self, BiliClient, CollectionItem, VideoInfo}; +use crate::bilibili::{BiliClient, CollectionItem, VideoInfo}; + +pub trait VideoListModel { + /// 获取特定视频列表的筛选条件 + fn filter_expr(&self) -> SimpleExpr; + + // 为 video_model 设置该视频列表的关联 id + fn set_relation_id(&self, video_model: &mut bili_sync_entity::video::ActiveModel); + + // 获取视频列表的保存路径 + fn path(&self) -> &Path; + + /// 获取视频 model 中记录的最新时间 + fn get_latest_row_at(&self) -> DateTime; + + /// 更新视频 model 中记录的最新时间,此处返回需要更新的 ActiveModel,接着调用 save 方法执行保存 + /// 不同 VideoListModel 返回的类型不同,为了 VideoListModel 的 object safety 不能使用 impl Trait + /// Box 又提示 ActiveModelTrait 没有 object safety,因此手写一个 Enum 静态分发 + fn update_latest_row_at(&self, datetime: DateTime) -> _ActiveModel; + + /// 开始获取视频 + fn log_fetch_video_start(&self); + + /// 结束获取视频 + fn log_fetch_video_end(&self); + + /// 开始下载视频 + fn log_download_video_start(&self); + + /// 结束下载视频 + fn log_download_video_end(&self); + + /// 开始刷新视频 + fn log_refresh_video_start(&self); + + /// 结束刷新视频 + fn log_refresh_video_end(&self, count: usize); +} pub enum Args<'a> { Favorite { fid: &'a str }, @@ -40,53 +76,29 @@ pub async fn video_list_from<'a>( } } -#[async_trait] -pub trait VideoListModel { - /// 未填充的视频 - async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result>; - - /// 未处理的视频和分页 - async fn unhandled_video_pages( - &self, - connection: &DatabaseConnection, - ) -> Result)>>; - - /// 视频信息对应的视频 model - fn video_model_by_info( - &self, - video_info: &VideoInfo, - base_model: Option, - ) -> bili_sync_entity::video::ActiveModel; - - /// 视频 model 中缺失的信息 - async fn fetch_videos_detail( - &self, - video: bilibili::Video<'_>, - video_model: bili_sync_entity::video::Model, - connection: &DatabaseConnection, - ) -> Result<()>; - - /// 获取视频 model 中记录的最新时间 - fn get_latest_row_at(&self) -> DateTime; - - /// 更新视频 model 中记录的最新时间 - async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()>; - - /// 开始获取视频 - fn log_fetch_video_start(&self); - - /// 结束获取视频 - fn log_fetch_video_end(&self); - - /// 开始下载视频 - fn log_download_video_start(&self); - - /// 结束下载视频 - fn log_download_video_end(&self); - - /// 开始刷新视频 - fn log_refresh_video_start(&self); - - /// 结束刷新视频 - fn log_refresh_video_end(&self, count: usize); +pub enum _ActiveModel { + Favorite(bili_sync_entity::favorite::ActiveModel), + Collection(bili_sync_entity::collection::ActiveModel), + Submission(bili_sync_entity::submission::ActiveModel), + WatchLater(bili_sync_entity::watch_later::ActiveModel), +} + +impl _ActiveModel { + pub async fn save(self, connection: &DatabaseConnection) -> Result<()> { + match self { + _ActiveModel::Favorite(model) => { + model.save(connection).await?; + } + _ActiveModel::Collection(model) => { + model.save(connection).await?; + } + _ActiveModel::Submission(model) => { + model.save(connection).await?; + } + _ActiveModel::WatchLater(model) => { + model.save(connection).await?; + } + } + Ok(()) + } } diff --git a/crates/bili_sync/src/adapter/submission.rs b/crates/bili_sync/src/adapter/submission.rs index db0e845..dcb98d6 100644 --- a/crates/bili_sync/src/adapter/submission.rs +++ b/crates/bili_sync/src/adapter/submission.rs @@ -2,100 +2,39 @@ use std::path::Path; use std::pin::Pin; use anyhow::{Context, Result}; -use async_trait::async_trait; use bili_sync_entity::*; use futures::Stream; use sea_orm::entity::prelude::*; -use sea_orm::sea_query::{IntoCondition, OnConflict}; +use sea_orm::sea_query::{OnConflict, SimpleExpr}; use sea_orm::ActiveValue::Set; -use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged}; +use sea_orm::{DatabaseConnection, Unchanged}; -use crate::adapter::helper::video_with_path; -use crate::adapter::{helper, VideoListModel}; -use crate::bilibili::{self, BiliClient, Submission, VideoInfo}; -use crate::utils::status::STATUS_COMPLETED; +use crate::adapter::{VideoListModel, _ActiveModel}; +use crate::bilibili::{BiliClient, Submission, VideoInfo}; -#[async_trait] impl VideoListModel for submission::Model { - async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result> { - helper::filter_videos( - video::Column::SubmissionId - .eq(self.id) - .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.eq(0)) - .and(video::Column::Category.eq(2)) - .and(video::Column::SinglePage.is_null()) - .into_condition(), - connection, - ) - .await + fn filter_expr(&self) -> SimpleExpr { + video::Column::SubmissionId.eq(self.id) } - async fn unhandled_video_pages( - &self, - connection: &DatabaseConnection, - ) -> Result)>> { - helper::filter_videos_with_pages( - video::Column::SubmissionId - .eq(self.id) - .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.lt(STATUS_COMPLETED)) - .and(video::Column::Category.eq(2)) - .and(video::Column::SinglePage.is_not_null()) - .into_condition(), - connection, - ) - .await - } - - fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { - let mut video_model = video_info.to_model(base_model); + fn set_relation_id(&self, video_model: &mut video::ActiveModel) { video_model.submission_id = Set(Some(self.id)); - video_with_path(video_model, &self.path, video_info) } - async fn fetch_videos_detail( - &self, - video: bilibili::Video<'_>, - video_model: video::Model, - connection: &DatabaseConnection, - ) -> Result<()> { - let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await; - match info { - Ok((tags, view_info)) => { - let VideoInfo::Detail { pages, .. } = &view_info else { - unreachable!("view_info must be VideoInfo::View") - }; - let txn = connection.begin().await?; - // 将分页信息写入数据库 - helper::create_video_pages(pages, &video_model, &txn).await?; - // 将页标记和 tag 写入数据库 - let mut video_active_model = self.video_model_by_info(&view_info, Some(video_model)); - video_active_model.single_page = Set(Some(pages.len() == 1)); - video_active_model.tags = Set(Some(serde_json::to_value(tags)?)); - video_active_model.save(&txn).await?; - txn.commit().await?; - } - Err(e) => { - helper::error_fetch_video_detail(e, video_model, connection).await?; - } - }; - Ok(()) + fn path(&self) -> &Path { + Path::new(self.path.as_str()) } fn get_latest_row_at(&self) -> DateTime { self.latest_row_at } - async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> { - submission::ActiveModel { + fn update_latest_row_at(&self, datetime: DateTime) -> _ActiveModel { + _ActiveModel::Submission(submission::ActiveModel { id: Unchanged(self.id), latest_row_at: Set(datetime), ..Default::default() - } - .update(connection) - .await?; - Ok(()) + }) } fn log_fetch_video_start(&self) { diff --git a/crates/bili_sync/src/adapter/watch_later.rs b/crates/bili_sync/src/adapter/watch_later.rs index 87cbbed..6fd20b5 100644 --- a/crates/bili_sync/src/adapter/watch_later.rs +++ b/crates/bili_sync/src/adapter/watch_later.rs @@ -2,97 +2,39 @@ use std::path::Path; use std::pin::Pin; use anyhow::{Context, Result}; -use async_trait::async_trait; use bili_sync_entity::*; use futures::Stream; use sea_orm::entity::prelude::*; -use sea_orm::sea_query::{IntoCondition, OnConflict}; +use sea_orm::sea_query::{OnConflict, SimpleExpr}; use sea_orm::ActiveValue::Set; -use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged}; +use sea_orm::{DatabaseConnection, Unchanged}; -use crate::adapter::helper::video_with_path; -use crate::adapter::{helper, VideoListModel}; -use crate::bilibili::{self, BiliClient, VideoInfo, WatchLater}; -use crate::utils::status::STATUS_COMPLETED; +use crate::adapter::{VideoListModel, _ActiveModel}; +use crate::bilibili::{BiliClient, VideoInfo, WatchLater}; -#[async_trait] impl VideoListModel for watch_later::Model { - async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result> { - helper::filter_videos( - video::Column::WatchLaterId - .eq(self.id) - .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.eq(0)) - .and(video::Column::Category.eq(2)) - .and(video::Column::SinglePage.is_null()) - .into_condition(), - connection, - ) - .await + fn filter_expr(&self) -> SimpleExpr { + video::Column::WatchLaterId.eq(self.id) } - async fn unhandled_video_pages( - &self, - connection: &DatabaseConnection, - ) -> Result)>> { - helper::filter_videos_with_pages( - video::Column::WatchLaterId - .eq(self.id) - .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.lt(STATUS_COMPLETED)) - .and(video::Column::Category.eq(2)) - .and(video::Column::SinglePage.is_not_null()) - .into_condition(), - connection, - ) - .await - } - - fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { - let mut video_model = video_info.to_model(base_model); + fn set_relation_id(&self, video_model: &mut video::ActiveModel) { video_model.watch_later_id = Set(Some(self.id)); - video_with_path(video_model, &self.path, video_info) } - async fn fetch_videos_detail( - &self, - video: bilibili::Video<'_>, - video_model: video::Model, - connection: &DatabaseConnection, - ) -> Result<()> { - let info: Result<_> = async { Ok((video.get_tags().await?, video.get_pages().await?)) }.await; - match info { - Ok((tags, pages_info)) => { - let txn = connection.begin().await?; - // 将分页信息写入数据库 - helper::create_video_pages(&pages_info, &video_model, &txn).await?; - // 将页标记和 tag 写入数据库 - let mut video_active_model: video::ActiveModel = video_model.into(); - video_active_model.single_page = Set(Some(pages_info.len() == 1)); - video_active_model.tags = Set(Some(serde_json::to_value(tags)?)); - video_active_model.save(&txn).await?; - txn.commit().await?; - } - Err(e) => { - helper::error_fetch_video_detail(e, video_model, connection).await?; - } - }; - Ok(()) + fn path(&self) -> &Path { + Path::new(self.path.as_str()) } fn get_latest_row_at(&self) -> DateTime { self.latest_row_at } - async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> { - watch_later::ActiveModel { + fn update_latest_row_at(&self, datetime: DateTime) -> _ActiveModel { + _ActiveModel::WatchLater(watch_later::ActiveModel { id: Unchanged(self.id), latest_row_at: Set(datetime), ..Default::default() - } - .update(connection) - .await?; - Ok(()) + }) } fn log_fetch_video_start(&self) { diff --git a/crates/bili_sync/src/bilibili/analyzer.rs b/crates/bili_sync/src/bilibili/analyzer.rs index 1f314d4..ccdf1c3 100644 --- a/crates/bili_sync/src/bilibili/analyzer.rs +++ b/crates/bili_sync/src/bilibili/analyzer.rs @@ -43,7 +43,6 @@ impl PartialOrd for AudioQuality { } impl AudioQuality { - #[inline] pub fn as_sort_key(&self) -> isize { match self { // 这可以让 Dolby 和 Hi-RES 排在 192k 之后,且 Dolby 和 Hi-RES 之间的顺序不变 diff --git a/crates/bili_sync/src/bilibili/credential.rs b/crates/bili_sync/src/bilibili/credential.rs index 68ae809..00e0c1f 100644 --- a/crates/bili_sync/src/bilibili/credential.rs +++ b/crates/bili_sync/src/bilibili/credential.rs @@ -222,7 +222,6 @@ pub fn encoded_query<'a>( } } -#[inline] fn _encoded_query<'a>( params: Vec<(&'a str, impl Into>)>, mixin_key: &str, diff --git a/crates/bili_sync/src/bilibili/mod.rs b/crates/bili_sync/src/bilibili/mod.rs index 1630c30..7206682 100644 --- a/crates/bili_sync/src/bilibili/mod.rs +++ b/crates/bili_sync/src/bilibili/mod.rs @@ -30,7 +30,6 @@ mod watch_later; static MIXIN_KEY: Lazy> = Lazy::new(Default::default); -#[inline] pub(crate) fn set_global_mixin_key(key: String) { MIXIN_KEY.store(Some(Arc::new(key))); } diff --git a/crates/bili_sync/src/bilibili/video.rs b/crates/bili_sync/src/bilibili/video.rs index e258892..d4ec0e6 100644 --- a/crates/bili_sync/src/bilibili/video.rs +++ b/crates/bili_sync/src/bilibili/video.rs @@ -62,7 +62,7 @@ impl<'a> Video<'a> { Self { client, aid, bvid } } - /// 直接调用视频信息接口获取详细的视频信息 + /// 直接调用视频信息接口获取详细的视频信息,视频信息中包含了视频的分页信息 pub async fn get_view_info(&self) -> Result { let mut res = self .client @@ -78,6 +78,7 @@ impl<'a> Video<'a> { Ok(serde_json::from_value(res["data"].take())?) } + #[allow(unused)] pub async fn get_pages(&self) -> Result> { let mut res = self .client diff --git a/crates/bili_sync/src/config/global.rs b/crates/bili_sync/src/config/global.rs index e131d3d..0ced6d2 100644 --- a/crates/bili_sync/src/config/global.rs +++ b/crates/bili_sync/src/config/global.rs @@ -39,7 +39,6 @@ pub static CONFIG_DIR: Lazy = Lazy::new(|| dirs::config_dir().expect("No config path found").join("bili-sync")); #[cfg(not(test))] -#[inline] fn load_config() -> Config { let config = Config::load().unwrap_or_else(|err| { if err @@ -57,11 +56,12 @@ fn load_config() -> Config { // 检查配置文件内容 info!("校验配置文件内容..."); config.check(); + info!("配置文件内容校验通过"); config } #[cfg(test)] -#[inline] + fn load_config() -> Config { let credential = match ( std::env::var("TEST_SESSDATA"), diff --git a/crates/bili_sync/src/utils/convert.rs b/crates/bili_sync/src/utils/convert.rs index bdfa68e..21d38e2 100644 --- a/crates/bili_sync/src/utils/convert.rs +++ b/crates/bili_sync/src/utils/convert.rs @@ -1,23 +1,17 @@ -use chrono::{DateTime, Utc}; +use chrono::{DateTime, NaiveDateTime, Utc}; use sea_orm::ActiveValue::{NotSet, Set}; use sea_orm::IntoActiveModel; -use serde_json::json; -use crate::bilibili::VideoInfo; -use crate::config::CONFIG; +use crate::bilibili::{PageInfo, VideoInfo}; impl VideoInfo { - /// 将 VideoInfo 转换为 ActiveModel - pub fn to_model(&self, base_model: Option) -> bili_sync_entity::video::ActiveModel { - let base_model = match base_model { - Some(base_model) => base_model.into_active_model(), - None => { - let mut tmp_model = bili_sync_entity::video::Model::default().into_active_model(); - // 注意此处要把 id 和 created_at 设置为 NotSet,方便在 sql 中忽略这些字段,交由数据库自动生成 - tmp_model.id = NotSet; - tmp_model.created_at = NotSet; - tmp_model - } + /// 在检测视频更新时,通过该方法将 VideoInfo 转换为简单的 ActiveModel,此处仅填充一些简单信息,后续会使用详情覆盖 + pub fn into_simple_model(self) -> bili_sync_entity::video::ActiveModel { + let default = bili_sync_entity::video::ActiveModel { + id: NotSet, + created_at: NotSet, + // 此处不使用 ActiveModel::default() 是为了让其它字段有默认值 + ..bili_sync_entity::video::Model::default().into_active_model() }; match self { VideoInfo::Collection { @@ -26,13 +20,13 @@ impl VideoInfo { ctime, pubtime, } => bili_sync_entity::video::ActiveModel { - bvid: Set(bvid.clone()), - cover: Set(cover.clone()), + bvid: Set(bvid), + cover: Set(cover), ctime: Set(ctime.naive_utc()), pubtime: Set(pubtime.naive_utc()), category: Set(2), // 视频合集里的内容类型肯定是视频 valid: Set(true), - ..base_model + ..default }, VideoInfo::Favorite { title, @@ -46,50 +40,20 @@ impl VideoInfo { pubtime, attr, } => bili_sync_entity::video::ActiveModel { - bvid: Set(bvid.clone()), - name: Set(title.clone()), - category: Set(*vtype), - intro: Set(intro.clone()), - cover: Set(cover.clone()), + bvid: Set(bvid), + name: Set(title), + category: Set(vtype), + intro: Set(intro), + cover: Set(cover), ctime: Set(ctime.naive_utc()), pubtime: Set(pubtime.naive_utc()), favtime: Set(fav_time.naive_utc()), download_status: Set(0), - valid: Set(*attr == 0), - tags: Set(None), - single_page: Set(None), + valid: Set(attr == 0), upper_id: Set(upper.mid), - upper_name: Set(upper.name.clone()), - upper_face: Set(upper.face.clone()), - ..base_model - }, - VideoInfo::Detail { - title, - bvid, - intro, - cover, - upper, - ctime, - pubtime, - state, - .. - } => bili_sync_entity::video::ActiveModel { - bvid: Set(bvid.clone()), - name: Set(title.clone()), - category: Set(2), // 视频合集里的内容类型肯定是视频 - intro: Set(intro.clone()), - cover: Set(cover.clone()), - ctime: Set(ctime.naive_utc()), - pubtime: Set(pubtime.naive_utc()), - favtime: Set(pubtime.naive_utc()), // 合集不包括 fav_time,使用发布时间代替 - download_status: Set(0), - valid: Set(*state == 0), - tags: Set(None), - single_page: Set(None), - upper_id: Set(upper.mid), - upper_name: Set(upper.name.clone()), - upper_face: Set(upper.face.clone()), - ..base_model + upper_name: Set(upper.name), + upper_face: Set(upper.face), + ..default }, VideoInfo::WatchLater { title, @@ -102,22 +66,20 @@ impl VideoInfo { pubtime, state, } => bili_sync_entity::video::ActiveModel { - bvid: Set(bvid.clone()), - name: Set(title.clone()), + bvid: Set(bvid), + name: Set(title), category: Set(2), // 稍后再看里的内容类型肯定是视频 - intro: Set(intro.clone()), - cover: Set(cover.clone()), + intro: Set(intro), + cover: Set(cover), ctime: Set(ctime.naive_utc()), pubtime: Set(pubtime.naive_utc()), favtime: Set(fav_time.naive_utc()), download_status: Set(0), - valid: Set(*state == 0), - tags: Set(None), - single_page: Set(None), + valid: Set(state == 0), upper_id: Set(upper.mid), - upper_name: Set(upper.name.clone()), - upper_face: Set(upper.face.clone()), - ..base_model + upper_name: Set(upper.name), + upper_face: Set(upper.face), + ..default }, VideoInfo::Submission { title, @@ -126,64 +88,58 @@ impl VideoInfo { cover, ctime, } => bili_sync_entity::video::ActiveModel { - bvid: Set(bvid.clone()), - name: Set(title.clone()), - intro: Set(intro.clone()), - cover: Set(cover.clone()), + bvid: Set(bvid), + name: Set(title), + intro: Set(intro), + cover: Set(cover), ctime: Set(ctime.naive_utc()), category: Set(2), // 投稿视频的内容类型肯定是视频 valid: Set(true), - ..base_model + ..default }, + _ => unreachable!(), } } - pub fn to_fmt_args(&self) -> Option { + /// 填充视频详情时调用,该方法会将视频详情附加到原有的 Model 上 + /// 特殊地,如果在检测视频更新时记录了 favtime,那么 favtime 会维持原样,否则会使用 pubtime 填充 + pub fn into_detail_model(self, base_model: bili_sync_entity::video::Model) -> bili_sync_entity::video::ActiveModel { match self { - VideoInfo::Collection { .. } | VideoInfo::Submission { .. } => None, // 不能从简单视频信息中构造格式化参数 - VideoInfo::Favorite { - title, - bvid, - upper, - pubtime, - fav_time, - .. - } - | VideoInfo::WatchLater { - title, - bvid, - upper, - pubtime, - fav_time, - .. - } => Some(json!({ - "bvid": &bvid, - "title": &title, - "upper_name": &upper.name, - "upper_mid": &upper.mid, - "pubtime": pubtime.format(&CONFIG.time_format).to_string(), - "fav_time": fav_time.format(&CONFIG.time_format).to_string(), - })), VideoInfo::Detail { title, bvid, + intro, + cover, upper, + ctime, pubtime, + state, .. - } => { - let pubtime = pubtime.format(&CONFIG.time_format).to_string(); - Some(json!({ - "bvid": &bvid, - "title": &title, - "upper_name": &upper.name, - "upper_mid": &upper.mid, - "pubtime": &pubtime, - "fav_time": &pubtime, - })) - } + } => bili_sync_entity::video::ActiveModel { + bvid: Set(bvid), + name: Set(title), + category: Set(2), + intro: Set(intro), + cover: Set(cover), + ctime: Set(ctime.naive_utc()), + pubtime: Set(pubtime.naive_utc()), + favtime: if base_model.favtime != NaiveDateTime::default() { + NotSet // 之前设置了 favtime,不覆盖 + } else { + Set(pubtime.naive_utc()) // 未设置过 favtime,使用 pubtime 填充 + }, + download_status: Set(0), + valid: Set(state == 0), + upper_id: Set(upper.mid), + upper_name: Set(upper.name), + upper_face: Set(upper.face), + ..base_model.into_active_model() + }, + _ => unreachable!(), } } + /// 获取视频的发布时间,用于对时间做筛选检查新视频 pub fn release_datetime(&self) -> &DateTime { match self { VideoInfo::Collection { pubtime: time, .. } @@ -194,3 +150,33 @@ impl VideoInfo { } } } + +impl PageInfo { + pub fn into_active_model( + self, + video_model: &bili_sync_entity::video::Model, + ) -> bili_sync_entity::page::ActiveModel { + let (width, height) = match &self.dimension { + Some(d) => { + if d.rotate == 0 { + (Some(d.width), Some(d.height)) + } else { + (Some(d.height), Some(d.width)) + } + } + None => (None, None), + }; + bili_sync_entity::page::ActiveModel { + video_id: Set(video_model.id), + cid: Set(self.cid), + pid: Set(self.page), + name: Set(self.name), + width: Set(width), + height: Set(height), + duration: Set(self.duration), + image: Set(self.first_frame), + download_status: Set(0), + ..Default::default() + } + } +} diff --git a/crates/bili_sync/src/utils/format_arg.rs b/crates/bili_sync/src/utils/format_arg.rs new file mode 100644 index 0000000..69a3aaf --- /dev/null +++ b/crates/bili_sync/src/utils/format_arg.rs @@ -0,0 +1,30 @@ +use serde_json::json; + +use crate::config::CONFIG; + +pub fn video_format_args(video_model: &bili_sync_entity::video::Model) -> serde_json::Value { + json!({ + "bvid": &video_model.bvid, + "title": &video_model.name, + "upper_name": &video_model.upper_name, + "upper_mid": &video_model.upper_id, + "pubtime": &video_model.pubtime.and_utc().format(&CONFIG.time_format).to_string(), + "fav_time": &video_model.favtime.and_utc().format(&CONFIG.time_format).to_string(), + }) +} + +pub fn page_format_args( + video_model: &bili_sync_entity::video::Model, + page_model: &bili_sync_entity::page::Model, +) -> serde_json::Value { + json!({ + "bvid": &video_model.bvid, + "title": &video_model.name, + "upper_name": &video_model.upper_name, + "upper_mid": &video_model.upper_id, + "ptitle": &page_model.name, + "pid": page_model.pid, + "pubtime": video_model.pubtime.and_utc().format(&CONFIG.time_format).to_string(), + "fav_time": video_model.favtime.and_utc().format(&CONFIG.time_format).to_string(), + }) +} diff --git a/crates/bili_sync/src/utils/mod.rs b/crates/bili_sync/src/utils/mod.rs index a765e88..d92f0c3 100644 --- a/crates/bili_sync/src/utils/mod.rs +++ b/crates/bili_sync/src/utils/mod.rs @@ -1,5 +1,6 @@ pub mod convert; pub mod filenamify; +pub mod format_arg; pub mod model; pub mod nfo; pub mod status; diff --git a/crates/bili_sync/src/utils/model.rs b/crates/bili_sync/src/utils/model.rs index b836166..ec0f95d 100644 --- a/crates/bili_sync/src/utils/model.rs +++ b/crates/bili_sync/src/utils/model.rs @@ -1,20 +1,65 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use bili_sync_entity::*; use sea_orm::entity::prelude::*; -use sea_orm::sea_query::OnConflict; +use sea_orm::sea_query::{OnConflict, SimpleExpr}; +use sea_orm::DatabaseTransaction; use crate::adapter::VideoListModel; -use crate::bilibili::VideoInfo; +use crate::bilibili::{PageInfo, VideoInfo}; +use crate::utils::status::STATUS_COMPLETED; + +/// 筛选未填充的视频 +pub async fn filter_unfilled_videos( + additional_expr: SimpleExpr, + conn: &DatabaseConnection, +) -> Result> { + video::Entity::find() + .filter( + video::Column::Valid + .eq(true) + .and(video::Column::DownloadStatus.eq(0)) + .and(video::Column::Category.eq(2)) + .and(video::Column::SinglePage.is_null()) + .and(additional_expr), + ) + .all(conn) + .await + .context("filter unfilled videos failed") +} + +/// 筛选未处理完成的视频和视频页 +pub async fn filter_unhandled_video_pages( + additional_expr: SimpleExpr, + connection: &DatabaseConnection, +) -> Result)>> { + video::Entity::find() + .filter( + video::Column::Valid + .eq(true) + .and(video::Column::DownloadStatus.lt(STATUS_COMPLETED)) + .and(video::Column::Category.eq(2)) + .and(video::Column::SinglePage.is_not_null()) + .and(additional_expr), + ) + .find_with_related(page::Entity) + .all(connection) + .await + .context("filter unhandled video pages failed") +} /// 尝试创建 Video Model,如果发生冲突则忽略 pub async fn create_videos( - videos_info: &[VideoInfo], + videos_info: Vec, video_list_model: &dyn VideoListModel, connection: &DatabaseConnection, ) -> Result<()> { let video_models = videos_info - .iter() - .map(|v| video_list_model.video_model_by_info(v, None)) + .into_iter() + .map(|v| { + let mut model = v.into_simple_model(); + video_list_model.set_relation_id(&mut model); + model + }) .collect::>(); video::Entity::insert_many(video_models) // 这里想表达的是 on 索引名,但 sea-orm 的 api 似乎只支持列名而不支持索引名,好在留空可以达到相同的目的 @@ -25,6 +70,30 @@ pub async fn create_videos( Ok(()) } +/// 尝试创建 Page Model,如果发生冲突则忽略 +pub async fn create_pages( + pages_info: Vec, + video_model: &bili_sync_entity::video::Model, + connection: &DatabaseTransaction, +) -> Result<()> { + let page_models = pages_info + .into_iter() + .map(|p| p.into_active_model(video_model)) + .collect::>(); + for page_chunk in page_models.chunks(50) { + page::Entity::insert_many(page_chunk.to_vec()) + .on_conflict( + OnConflict::columns([page::Column::VideoId, page::Column::Pid]) + .do_nothing() + .to_owned(), + ) + .do_nothing() + .exec(connection) + .await?; + } + Ok(()) +} + /// 更新视频 model 的下载状态 pub async fn update_videos_model(videos: Vec, connection: &DatabaseConnection) -> Result<()> { video::Entity::insert_many(videos) diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index 9ca0cab..4335c83 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -8,7 +8,7 @@ use futures::stream::{FuturesOrdered, FuturesUnordered}; use futures::{Future, Stream, StreamExt}; use sea_orm::entity::prelude::*; use sea_orm::ActiveValue::Set; -use serde_json::json; +use sea_orm::TransactionTrait; use tokio::fs; use tokio::sync::{Mutex, Semaphore}; @@ -17,10 +17,15 @@ use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Vi use crate::config::{PathSafeTemplate, ARGS, CONFIG, TEMPLATE}; use crate::downloader::Downloader; use crate::error::{DownloadAbortError, ProcessPageError}; -use crate::utils::model::{create_videos, update_pages_model, update_videos_model}; +use crate::utils::format_arg::{page_format_args, video_format_args}; +use crate::utils::model::{ + create_pages, create_videos, filter_unfilled_videos, filter_unhandled_video_pages, update_pages_model, + update_videos_model, +}; use crate::utils::nfo::{ModelWrapper, NFOMode, NFOSerializer}; use crate::utils::status::{PageStatus, VideoStatus}; +/// 完整地处理某个视频列表 pub async fn process_video_list( args: Args<'_>, bili_client: &BiliClient, @@ -55,7 +60,7 @@ pub async fn refresh_video_list<'a>( .take_while(|v| { // 虽然 video_streams 是从新到旧的,但由于此处是分页请求,极端情况下可能发生访问完第一页时插入了两整页视频的情况 // 此时获取到的第二页视频比第一页的还要新,因此为了确保正确,理应对每一页的第一个视频进行时间比较 - // 但在 streams 的抽象下,无法判断具体是在哪里分页的,所以暂且对每个视频都进行比较,希望不会有太大性能损失 + // 但在 streams 的抽象下,无法判断具体是在哪里分页的,所以暂且对每个视频都进行比较,应该不会有太大性能损失 let release_datetime = v.release_datetime(); if release_datetime > &max_datetime { max_datetime = *release_datetime; @@ -66,11 +71,12 @@ pub async fn refresh_video_list<'a>( let mut count = 0; while let Some(videos_info) = video_streams.next().await { count += videos_info.len(); - create_videos(&videos_info, video_list_model, connection).await?; + create_videos(videos_info, video_list_model, connection).await?; } if max_datetime != latest_row_at { video_list_model - .update_latest_row_at(max_datetime.naive_utc(), connection) + .update_latest_row_at(max_datetime.naive_utc()) + .save(connection) .await?; } video_list_model.log_refresh_video_end(count); @@ -84,12 +90,39 @@ pub async fn fetch_video_details( connection: &DatabaseConnection, ) -> Result<()> { video_list_model.log_fetch_video_start(); - let videos_model = video_list_model.unfilled_videos(connection).await?; + let videos_model = filter_unfilled_videos(video_list_model.filter_expr(), connection).await?; for video_model in videos_model { let video = Video::new(bili_client, video_model.bvid.clone()); - video_list_model - .fetch_videos_detail(video, video_model, connection) - .await?; + let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await; + match info { + Err(e) => { + error!( + "获取视频 {} - {} 的详细信息失败,错误为:{}", + &video_model.bvid, &video_model.name, e + ); + if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::() { + let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into(); + video_active_model.valid = Set(false); + video_active_model.save(connection).await?; + } + } + Ok((tags, mut view_info)) => { + let VideoInfo::Detail { pages, .. } = &mut view_info else { + unreachable!() + }; + let pages = std::mem::take(pages); + let pages_len = pages.len(); + let txn = connection.begin().await?; + // 将分页信息写入数据库 + create_pages(pages, &video_model, &txn).await?; + let mut video_active_model = view_info.into_detail_model(video_model); + video_list_model.set_relation_id(&mut video_active_model); + video_active_model.single_page = Set(Some(pages_len == 1)); + video_active_model.tags = Set(Some(serde_json::to_value(tags)?)); + video_active_model.save(&txn).await?; + txn.commit().await?; + } + }; } video_list_model.log_fetch_video_end(); Ok(()) @@ -105,7 +138,7 @@ pub async fn download_unprocessed_videos( let semaphore = Semaphore::new(CONFIG.concurrent_limit.video); let downloader = Downloader::new(bili_client.client.clone()); let mut uppers_mutex: HashMap, Mutex<()>)> = HashMap::new(); - let unhandled_videos_pages = video_list_model.unhandled_video_pages(connection).await?; + let unhandled_videos_pages = filter_unhandled_video_pages(video_list_model.filter_expr(), connection).await?; for (video_model, _) in &unhandled_videos_pages { uppers_mutex .entry(video_model.upper_id) @@ -117,12 +150,12 @@ pub async fn download_unprocessed_videos( let upper_mutex = uppers_mutex.get(&video_model.upper_id).expect("upper mutex not found"); download_video_pages( bili_client, + video_list_model, video_model, pages_model, connection, &semaphore, &downloader, - &CONFIG.upper_path, upper_mutex, ) }) @@ -156,20 +189,23 @@ pub async fn download_unprocessed_videos( #[allow(clippy::too_many_arguments)] pub async fn download_video_pages( bili_client: &BiliClient, + video_list_model: &dyn VideoListModel, video_model: video::Model, pages: Vec, connection: &DatabaseConnection, semaphore: &Semaphore, downloader: &Downloader, - upper_path: &Path, upper_mutex: &(Mutex<()>, Mutex<()>), ) -> Result { let _permit = semaphore.acquire().await.context("acquire semaphore failed")?; let mut status = VideoStatus::new(video_model.download_status); let seprate_status = status.should_run(); - let base_path = Path::new(&video_model.path); + let base_path = video_list_model + .path() + .join(TEMPLATE.path_safe_render("video", &video_format_args(&video_model))?); let upper_id = video_model.upper_id.to_string(); - let base_upper_path = upper_path + let base_upper_path = &CONFIG + .upper_path .join(upper_id.chars().next().context("upper_id is empty")?.to_string()) .join(upper_id); let is_single_page = video_model.single_page.context("single_page is null")?; @@ -213,6 +249,7 @@ pub async fn download_video_pages( pages, connection, downloader, + &base_path, )), ]; let tasks: FuturesOrdered<_> = tasks.into_iter().collect(); @@ -239,6 +276,7 @@ pub async fn download_video_pages( } let mut video_active_model: video::ActiveModel = video_model.into(); video_active_model.download_status = Set(status.into()); + video_active_model.path = Set(base_path.to_string_lossy().to_string()); Ok(video_active_model) } @@ -250,6 +288,7 @@ pub async fn dispatch_download_page( pages: Vec, connection: &DatabaseConnection, downloader: &Downloader, + base_path: &Path, ) -> Result<()> { if !should_run { return Ok(()); @@ -257,7 +296,16 @@ pub async fn dispatch_download_page( let child_semaphore = Semaphore::new(CONFIG.concurrent_limit.page); let tasks = pages .into_iter() - .map(|page_model| download_page(bili_client, video_model, page_model, &child_semaphore, downloader)) + .map(|page_model| { + download_page( + bili_client, + video_model, + page_model, + &child_semaphore, + downloader, + base_path, + ) + }) .collect::>(); let (mut download_aborted, mut error_occurred) = (false, false); let mut stream = tasks @@ -311,25 +359,13 @@ pub async fn download_page( page_model: page::Model, semaphore: &Semaphore, downloader: &Downloader, + base_path: &Path, ) -> Result { let _permit = semaphore.acquire().await.context("acquire semaphore failed")?; let mut status = PageStatus::new(page_model.download_status); let seprate_status = status.should_run(); let is_single_page = video_model.single_page.context("single_page is null")?; - let base_path = Path::new(&video_model.path); - let base_name = TEMPLATE.path_safe_render( - "page", - &json!({ - "bvid": &video_model.bvid, - "title": &video_model.name, - "upper_name": &video_model.upper_name, - "upper_mid": &video_model.upper_id, - "ptitle": &page_model.name, - "pid": page_model.pid, - "pubtime": video_model.pubtime.format(&CONFIG.time_format).to_string(), - "fav_time": video_model.favtime.format(&CONFIG.time_format).to_string(), - }), - )?; + let base_name = TEMPLATE.path_safe_render("page", &page_format_args(video_model, &page_model))?; let (poster_path, video_path, nfo_path, danmaku_path, fanart_path) = if is_single_page { ( base_path.join(format!("{}-poster.jpg", &base_name)), @@ -385,7 +421,7 @@ pub async fn download_page( video_model, downloader, &page_info, - video_path.clone(), + &video_path, )), Box::pin(generate_page_nfo(seprate_status[2], video_model, &page_model, nfo_path)), Box::pin(fetch_page_danmaku( @@ -459,7 +495,7 @@ pub async fn fetch_page_video( video_model: &video::Model, downloader: &Downloader, page_info: &PageInfo, - page_path: PathBuf, + page_path: &Path, ) -> Result<()> { if !should_run { return Ok(()); @@ -470,11 +506,11 @@ pub async fn fetch_page_video( .await? .best_stream(&CONFIG.filter_option)?; match streams { - BestStream::Mixed(mix_stream) => downloader.fetch(mix_stream.url(), &page_path).await, + BestStream::Mixed(mix_stream) => downloader.fetch(mix_stream.url(), page_path).await, BestStream::VideoAudio { video: video_stream, audio: None, - } => downloader.fetch(video_stream.url(), &page_path).await, + } => downloader.fetch(video_stream.url(), page_path).await, BestStream::VideoAudio { video: video_stream, audio: Some(audio_stream), @@ -486,7 +522,7 @@ pub async fn fetch_page_video( let res = async { downloader.fetch(video_stream.url(), &tmp_video_path).await?; downloader.fetch(audio_stream.url(), &tmp_audio_path).await?; - downloader.merge(&tmp_video_path, &tmp_audio_path, &page_path).await + downloader.merge(&tmp_video_path, &tmp_audio_path, page_path).await } .await; let _ = fs::remove_file(tmp_video_path).await; @@ -606,6 +642,7 @@ async fn generate_nfo(serializer: NFOSerializer<'_>, nfo_path: PathBuf) -> Resul #[cfg(test)] mod tests { use handlebars::handlebars_helper; + use serde_json::json; use super::*;