From 29bfc2efcefebbf44be2b8ec91d7b39309c8358c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=B4=80=E1=B4=8D=E1=B4=9B=E1=B4=8F=E1=B4=80=E1=B4=87?= =?UTF-8?q?=CA=80?= Date: Thu, 25 Jul 2024 00:05:29 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E9=87=8D=E6=9E=84=E9=83=A8?= =?UTF-8?q?=E5=88=86=E4=BB=A3=E7=A0=81=EF=BC=8C=E8=B0=83=E6=95=B4=E5=87=BD?= =?UTF-8?q?=E6=95=B0=E4=BD=8D=E7=BD=AE=20(#154)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/bili_sync/src/adapter/collection.rs | 182 ++++++++------------ crates/bili_sync/src/adapter/favorite.rs | 161 +++++++---------- crates/bili_sync/src/adapter/helper/mod.rs | 136 +++++++++++++++ crates/bili_sync/src/adapter/mod.rs | 46 ++--- crates/bili_sync/src/adapter/watch_later.rs | 155 +++++++---------- crates/bili_sync/src/utils/convert.rs | 4 +- crates/bili_sync/src/utils/model.rs | 50 +----- crates/bili_sync/src/workflow.rs | 2 +- 8 files changed, 358 insertions(+), 378 deletions(-) create mode 100644 crates/bili_sync/src/adapter/helper/mod.rs diff --git a/crates/bili_sync/src/adapter/collection.rs b/crates/bili_sync/src/adapter/collection.rs index c8431f0..27adf8c 100644 --- a/crates/bili_sync/src/adapter/collection.rs +++ b/crates/bili_sync/src/adapter/collection.rs @@ -3,105 +3,53 @@ use std::path::Path; use std::pin::Pin; use anyhow::Result; +use async_trait::async_trait; use bili_sync_entity::*; -use bili_sync_migration::OnConflict; -use filenamify::filenamify; use futures::Stream; use sea_orm::entity::prelude::*; +use sea_orm::sea_query::{IntoCondition, OnConflict}; use sea_orm::ActiveValue::Set; -use sea_orm::{DatabaseConnection, QuerySelect, TransactionTrait}; +use sea_orm::{DatabaseConnection, TransactionTrait}; -use crate::adapter::{error_fetch_video_detail, VideoListModel}; +use crate::adapter::{helper, VideoListModel}; use crate::bilibili::{self, BiliClient, Collection, CollectionItem, CollectionType, VideoInfo}; -use crate::config::TEMPLATE; -use crate::utils::id_time_key; -use crate::utils::model::create_video_pages; use crate::utils::status::Status; -pub async fn collection_from<'a>( - collection_item: &'a CollectionItem, - path: &Path, - bili_client: &'a BiliClient, - connection: &DatabaseConnection, -) -> Result<(Box, Pin + 'a>>)> { - let collection = Collection::new(bili_client, collection_item); - let collection_info = collection.get_info().await?; - collection::Entity::insert(collection::ActiveModel { - s_id: Set(collection_info.sid), - m_id: Set(collection_info.mid), - r#type: Set(collection_info.collection_type.into()), - name: Set(collection_info.name.clone()), - path: Set(path.to_string_lossy().to_string()), - ..Default::default() - }) - .on_conflict( - OnConflict::columns([ - collection::Column::SId, - collection::Column::MId, - collection::Column::Type, - ]) - .update_columns([collection::Column::Name, collection::Column::Path]) - .to_owned(), - ) - .exec(connection) - .await?; - Ok(( - Box::new( - collection::Entity::find() - .filter( - collection::Column::SId - .eq(collection_item.sid.clone()) - .and(collection::Column::MId.eq(collection_item.mid.clone())) - .and(collection::Column::Type.eq(Into::::into(collection_item.collection_type.clone()))), - ) - .one(connection) - .await? - .unwrap(), - ), - Box::pin(collection.into_simple_video_stream()), - )) -} -use async_trait::async_trait; - #[async_trait] impl VideoListModel for collection::Model { async fn video_count(&self, connection: &DatabaseConnection) -> Result { - Ok(video::Entity::find() - .filter(video::Column::CollectionId.eq(self.id)) - .count(connection) - .await?) + helper::count_videos(video::Column::CollectionId.eq(self.id).into_condition(), connection).await } async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result> { - Ok(video::Entity::find() - .filter( - video::Column::CollectionId - .eq(self.id) - .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.eq(0)) - .and(video::Column::Category.eq(2)) - .and(video::Column::SinglePage.is_null()), - ) - .all(connection) - .await?) + helper::filter_videos( + video::Column::CollectionId + .eq(self.id) + .and(video::Column::Valid.eq(true)) + .and(video::Column::DownloadStatus.eq(0)) + .and(video::Column::Category.eq(2)) + .and(video::Column::SinglePage.is_null()) + .into_condition(), + connection, + ) + .await } async fn unhandled_video_pages( &self, connection: &DatabaseConnection, ) -> Result)>> { - Ok(video::Entity::find() - .filter( - video::Column::CollectionId - .eq(self.id) - .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.lt(Status::handled())) - .and(video::Column::Category.eq(2)) - .and(video::Column::SinglePage.is_not_null()), - ) - .find_with_related(page::Entity) - .all(connection) - .await?) + helper::filter_videos_with_pages( + video::Column::CollectionId + .eq(self.id) + .and(video::Column::Valid.eq(true)) + .and(video::Column::DownloadStatus.lt(Status::handled())) + .and(video::Column::Category.eq(2)) + .and(video::Column::SinglePage.is_not_null()) + .into_condition(), + connection, + ) + .await } async fn exist_labels( @@ -109,37 +57,13 @@ impl VideoListModel for collection::Model { videos_info: &[VideoInfo], connection: &DatabaseConnection, ) -> Result> { - let bvids = videos_info.iter().map(|v| v.bvid().to_string()).collect::>(); - Ok(video::Entity::find() - .filter( - video::Column::CollectionId - .eq(self.id) - .and(video::Column::Bvid.is_in(bvids)), - ) - .select_only() - .columns([video::Column::Bvid, video::Column::Pubtime]) - .into_tuple() - .all(connection) - .await? - .into_iter() - .map(|(bvid, time)| id_time_key(&bvid, &time)) - .collect::>()) + helper::video_keys(videos_info, [video::Column::Bvid, video::Column::Pubtime], connection).await } fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { let mut video_model = video_info.to_model(base_model); video_model.collection_id = Set(Some(self.id)); - if let Some(fmt_args) = &video_info.to_fmt_args() { - video_model.path = Set(Path::new(&self.path) - .join(filenamify( - TEMPLATE - .render("video", fmt_args) - .unwrap_or_else(|_| video_info.bvid().to_string()), - )) - .to_string_lossy() - .to_string()); - } - video_model + helper::video_with_path(video_model, &self.path, video_info) } async fn fetch_videos_detail( @@ -156,7 +80,7 @@ impl VideoListModel for collection::Model { }; let txn = connection.begin().await?; // 将分页信息写入数据库 - create_video_pages(pages, &video_model, &txn).await?; + helper::create_video_pages(pages, &video_model, &txn).await?; // 将页标记和 tag 写入数据库 let mut video_active_model = self.video_model_by_info(&view_info, Some(video_model)); video_active_model.single_page = Set(Some(pages.len() == 1)); @@ -165,7 +89,7 @@ impl VideoListModel for collection::Model { txn.commit().await?; } Err(e) => { - error_fetch_video_detail(e, video_model, connection).await?; + helper::error_fetch_video_detail(e, video_model, connection).await?; } }; Ok(()) @@ -227,3 +151,47 @@ impl VideoListModel for collection::Model { ); } } + +pub(super) async fn collection_from<'a>( + collection_item: &'a CollectionItem, + path: &Path, + bili_client: &'a BiliClient, + connection: &DatabaseConnection, +) -> Result<(Box, Pin + 'a>>)> { + let collection = Collection::new(bili_client, collection_item); + let collection_info = collection.get_info().await?; + collection::Entity::insert(collection::ActiveModel { + s_id: Set(collection_info.sid), + m_id: Set(collection_info.mid), + r#type: Set(collection_info.collection_type.into()), + name: Set(collection_info.name.clone()), + path: Set(path.to_string_lossy().to_string()), + ..Default::default() + }) + .on_conflict( + OnConflict::columns([ + collection::Column::SId, + collection::Column::MId, + collection::Column::Type, + ]) + .update_columns([collection::Column::Name, collection::Column::Path]) + .to_owned(), + ) + .exec(connection) + .await?; + Ok(( + Box::new( + collection::Entity::find() + .filter( + collection::Column::SId + .eq(collection_item.sid.clone()) + .and(collection::Column::MId.eq(collection_item.mid.clone())) + .and(collection::Column::Type.eq(Into::::into(collection_item.collection_type.clone()))), + ) + .one(connection) + .await? + .unwrap(), + ), + Box::pin(collection.into_simple_video_stream()), + )) +} diff --git a/crates/bili_sync/src/adapter/favorite.rs b/crates/bili_sync/src/adapter/favorite.rs index fe73224..18281a6 100644 --- a/crates/bili_sync/src/adapter/favorite.rs +++ b/crates/bili_sync/src/adapter/favorite.rs @@ -3,95 +3,53 @@ use std::path::Path; use std::pin::Pin; use anyhow::Result; +use async_trait::async_trait; use bili_sync_entity::*; -use bili_sync_migration::OnConflict; -use filenamify::filenamify; use futures::Stream; use sea_orm::entity::prelude::*; +use sea_orm::sea_query::{IntoCondition, OnConflict}; use sea_orm::ActiveValue::Set; -use sea_orm::{DatabaseConnection, QuerySelect, TransactionTrait}; +use sea_orm::{DatabaseConnection, TransactionTrait}; -use crate::adapter::{error_fetch_video_detail, VideoListModel}; +use crate::adapter::{helper, VideoListModel}; use crate::bilibili::{self, BiliClient, FavoriteList, VideoInfo}; -use crate::config::TEMPLATE; -use crate::utils::id_time_key; -use crate::utils::model::create_video_pages; use crate::utils::status::Status; -pub async fn favorite_from<'a>( - fid: &str, - path: &Path, - bili_client: &'a BiliClient, - connection: &DatabaseConnection, -) -> Result<(Box, Pin + 'a>>)> { - let favorite = FavoriteList::new(bili_client, fid.to_owned()); - let favorite_info = favorite.get_info().await?; - favorite::Entity::insert(favorite::ActiveModel { - f_id: Set(favorite_info.id), - name: Set(favorite_info.title.clone()), - path: Set(path.to_string_lossy().to_string()), - ..Default::default() - }) - .on_conflict( - OnConflict::column(favorite::Column::FId) - .update_columns([favorite::Column::Name, favorite::Column::Path]) - .to_owned(), - ) - .exec(connection) - .await?; - Ok(( - Box::new( - favorite::Entity::find() - .filter(favorite::Column::FId.eq(favorite_info.id)) - .one(connection) - .await? - .unwrap(), - ), - Box::pin(favorite.into_video_stream()), - )) -} - -use async_trait::async_trait; - #[async_trait] impl VideoListModel for favorite::Model { async fn video_count(&self, connection: &DatabaseConnection) -> Result { - Ok(video::Entity::find() - .filter(video::Column::FavoriteId.eq(self.id)) - .count(connection) - .await?) + helper::count_videos(video::Column::FavoriteId.eq(self.id).into_condition(), connection).await } async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result> { - Ok(video::Entity::find() - .filter( - video::Column::FavoriteId - .eq(self.id) - .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.eq(0)) - .and(video::Column::Category.eq(2)) - .and(video::Column::SinglePage.is_null()), - ) - .all(connection) - .await?) + helper::filter_videos( + video::Column::FavoriteId + .eq(self.id) + .and(video::Column::Valid.eq(true)) + .and(video::Column::DownloadStatus.eq(0)) + .and(video::Column::Category.eq(2)) + .and(video::Column::SinglePage.is_null()) + .into_condition(), + connection, + ) + .await } async fn unhandled_video_pages( &self, connection: &DatabaseConnection, ) -> Result)>> { - Ok(video::Entity::find() - .filter( - video::Column::FavoriteId - .eq(self.id) - .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.lt(Status::handled())) - .and(video::Column::Category.eq(2)) - .and(video::Column::SinglePage.is_not_null()), - ) - .find_with_related(page::Entity) - .all(connection) - .await?) + helper::filter_videos_with_pages( + video::Column::FavoriteId + .eq(self.id) + .and(video::Column::Valid.eq(true)) + .and(video::Column::DownloadStatus.lt(Status::handled())) + .and(video::Column::Category.eq(2)) + .and(video::Column::SinglePage.is_not_null()) + .into_condition(), + connection, + ) + .await } async fn exist_labels( @@ -99,37 +57,13 @@ impl VideoListModel for favorite::Model { videos_info: &[VideoInfo], connection: &DatabaseConnection, ) -> Result> { - let bvids = videos_info.iter().map(|v| v.bvid().to_string()).collect::>(); - Ok(video::Entity::find() - .filter( - video::Column::FavoriteId - .eq(self.id) - .and(video::Column::Bvid.is_in(bvids)), - ) - .select_only() - .columns([video::Column::Bvid, video::Column::Favtime]) - .into_tuple() - .all(connection) - .await? - .into_iter() - .map(|(bvid, time)| id_time_key(&bvid, &time)) - .collect::>()) + helper::video_keys(videos_info, [video::Column::Bvid, video::Column::Favtime], connection).await } fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { let mut video_model = video_info.to_model(base_model); video_model.favorite_id = Set(Some(self.id)); - if let Some(fmt_args) = &video_info.to_fmt_args() { - video_model.path = Set(Path::new(&self.path) - .join(filenamify( - TEMPLATE - .render("video", fmt_args) - .unwrap_or_else(|_| video_info.bvid().to_string()), - )) - .to_string_lossy() - .to_string()); - } - video_model + helper::video_with_path(video_model, &self.path, video_info) } async fn fetch_videos_detail( @@ -143,7 +77,7 @@ impl VideoListModel for favorite::Model { Ok((tags, pages_info)) => { let txn = connection.begin().await?; // 将分页信息写入数据库 - create_video_pages(&pages_info, &video_model, &txn).await?; + helper::create_video_pages(&pages_info, &video_model, &txn).await?; // 将页标记和 tag 写入数据库 let mut video_active_model: video::ActiveModel = video_model.into(); video_active_model.single_page = Set(Some(pages_info.len() == 1)); @@ -152,7 +86,7 @@ impl VideoListModel for favorite::Model { txn.commit().await?; } Err(e) => { - error_fetch_video_detail(e, video_model, connection).await?; + helper::error_fetch_video_detail(e, video_model, connection).await?; } }; Ok(()) @@ -185,3 +119,36 @@ impl VideoListModel for favorite::Model { ); } } + +pub(super) async fn favorite_from<'a>( + fid: &str, + path: &Path, + bili_client: &'a BiliClient, + connection: &DatabaseConnection, +) -> Result<(Box, Pin + 'a>>)> { + let favorite = FavoriteList::new(bili_client, fid.to_owned()); + let favorite_info = favorite.get_info().await?; + favorite::Entity::insert(favorite::ActiveModel { + f_id: Set(favorite_info.id), + name: Set(favorite_info.title.clone()), + path: Set(path.to_string_lossy().to_string()), + ..Default::default() + }) + .on_conflict( + OnConflict::column(favorite::Column::FId) + .update_columns([favorite::Column::Name, favorite::Column::Path]) + .to_owned(), + ) + .exec(connection) + .await?; + Ok(( + Box::new( + favorite::Entity::find() + .filter(favorite::Column::FId.eq(favorite_info.id)) + .one(connection) + .await? + .unwrap(), + ), + Box::pin(favorite.into_video_stream()), + )) +} diff --git a/crates/bili_sync/src/adapter/helper/mod.rs b/crates/bili_sync/src/adapter/helper/mod.rs new file mode 100644 index 0000000..2c39e3a --- /dev/null +++ b/crates/bili_sync/src/adapter/helper/mod.rs @@ -0,0 +1,136 @@ +use std::collections::HashSet; +use std::path::Path; + +use anyhow::Result; +use bili_sync_entity::*; +use filenamify::filenamify; +use sea_orm::entity::prelude::*; +use sea_orm::sea_query::OnConflict; +use sea_orm::ActiveValue::Set; +use sea_orm::{Condition, QuerySelect}; + +use crate::bilibili::{BiliError, PageInfo, VideoInfo}; +use crate::config::TEMPLATE; +use crate::utils::id_time_key; + +/// 使用 condition 筛选视频,返回视频数量 +pub(super) async fn count_videos(condition: Condition, conn: &DatabaseConnection) -> Result { + Ok(video::Entity::find().filter(condition).count(conn).await?) +} + +/// 使用 condition 筛选视频,返回视频列表 +pub(super) async fn filter_videos(condition: Condition, conn: &DatabaseConnection) -> Result> { + Ok(video::Entity::find().filter(condition).all(conn).await?) +} + +/// 使用 condition 筛选视频,返回视频列表和相关的分 P 列表 +pub(super) async fn filter_videos_with_pages( + condition: Condition, + conn: &DatabaseConnection, +) -> Result)>> { + Ok(video::Entity::find() + .filter(condition) + .find_with_related(page::Entity) + .all(conn) + .await?) +} + +/// 返回 videos_info 存在于视频表里那部分对应的 key +pub(super) async fn video_keys( + videos_info: &[VideoInfo], + columns: [video::Column; 2], + conn: &DatabaseConnection, +) -> Result> { + Ok(video::Entity::find() + .filter(video::Column::Bvid.is_in(videos_info.iter().map(|v| v.bvid().to_string()))) + .select_only() + .columns(columns) + .into_tuple() + .all(conn) + .await? + .into_iter() + .map(|(bvid, time)| id_time_key(&bvid, &time)) + .collect()) +} + +/// 返回设置了 path 的视频 +pub(super) fn video_with_path( + mut video_model: video::ActiveModel, + base_path: &str, + video_info: &VideoInfo, +) -> video::ActiveModel { + if let Some(fmt_args) = &video_info.to_fmt_args() { + video_model.path = Set(Path::new(base_path) + .join(filenamify( + TEMPLATE + .render("video", fmt_args) + .unwrap_or_else(|_| video_info.bvid().to_string()), + )) + .to_string_lossy() + .to_string()); + } + video_model +} + +/// 处理获取视频详细信息失败的情况 +pub(super) async fn error_fetch_video_detail( + e: anyhow::Error, + video_model: bili_sync_entity::video::Model, + connection: &DatabaseConnection, +) -> Result<()> { + error!( + "获取视频 {} - {} 的详细信息失败,错误为:{}", + &video_model.bvid, &video_model.name, e + ); + if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::() { + let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into(); + video_active_model.valid = Set(false); + video_active_model.save(connection).await?; + } + Ok(()) +} + +/// 创建视频的所有分 P +pub(crate) async fn create_video_pages( + pages_info: &[PageInfo], + video_model: &video::Model, + connection: &impl ConnectionTrait, +) -> Result<()> { + let page_models = pages_info + .iter() + .map(move |p| { + let (width, height) = match &p.dimension { + Some(d) => { + if d.rotate == 0 { + (Some(d.width), Some(d.height)) + } else { + (Some(d.height), Some(d.width)) + } + } + None => (None, None), + }; + page::ActiveModel { + video_id: Set(video_model.id), + cid: Set(p.cid), + pid: Set(p.page), + name: Set(p.name.clone()), + width: Set(width), + height: Set(height), + duration: Set(p.duration), + image: Set(p.first_frame.clone()), + download_status: Set(0), + ..Default::default() + } + }) + .collect::>(); + page::Entity::insert_many(page_models) + .on_conflict( + OnConflict::columns([page::Column::VideoId, page::Column::Pid]) + .do_nothing() + .to_owned(), + ) + .do_nothing() + .exec(connection) + .await?; + Ok(()) +} diff --git a/crates/bili_sync/src/adapter/mod.rs b/crates/bili_sync/src/adapter/mod.rs index dce2959..42ef454 100644 --- a/crates/bili_sync/src/adapter/mod.rs +++ b/crates/bili_sync/src/adapter/mod.rs @@ -1,5 +1,6 @@ mod collection; mod favorite; +mod helper; mod watch_later; use std::collections::HashSet; @@ -8,15 +9,14 @@ use std::pin::Pin; use anyhow::Result; use async_trait::async_trait; -pub use collection::collection_from; -pub use favorite::favorite_from; +use collection::collection_from; +use favorite::favorite_from; use futures::Stream; use sea_orm::entity::prelude::*; -use sea_orm::ActiveValue::Set; use sea_orm::DatabaseConnection; use watch_later::watch_later_from; -use crate::bilibili::{self, BiliClient, BiliError, CollectionItem, VideoInfo}; +use crate::bilibili::{self, BiliClient, CollectionItem, VideoInfo}; pub enum Args<'a> { Favorite { fid: &'a str }, @@ -37,51 +37,32 @@ pub async fn video_list_from<'a>( } } -async fn error_fetch_video_detail( - e: anyhow::Error, - video_model: bili_sync_entity::video::Model, - connection: &DatabaseConnection, -) -> Result<()> { - error!( - "获取视频 {} - {} 的详细信息失败,错误为:{}", - &video_model.bvid, &video_model.name, e - ); - if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::() { - let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into(); - video_active_model.valid = Set(false); - video_active_model.save(connection).await?; - } - Ok(()) -} - #[async_trait] pub trait VideoListModel { - /* 逻辑相关 */ - - /// 获取与视频列表关联的视频总数 + /// 与视频列表关联的视频总数 async fn video_count(&self, connection: &DatabaseConnection) -> Result; - /// 获取未填充的视频 + /// 未填充的视频 async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result>; - /// 获取未处理的视频和分页 + /// 未处理的视频和分页 async fn unhandled_video_pages( &self, connection: &DatabaseConnection, ) -> Result)>>; - /// 获取该批次视频的存在标记 + /// 该批次视频的存在标记 async fn exist_labels(&self, videos_info: &[VideoInfo], connection: &DatabaseConnection) -> Result>; - /// 获取视频信息对应的视频 model + /// 视频信息对应的视频 model fn video_model_by_info( &self, video_info: &VideoInfo, base_model: Option, ) -> bili_sync_entity::video::ActiveModel; - /// 获取视频 model 中缺失的信息 + /// 视频 model 中缺失的信息 async fn fetch_videos_detail( &self, video: bilibili::Video<'_>, @@ -89,16 +70,21 @@ pub trait VideoListModel { connection: &DatabaseConnection, ) -> Result<()>; - /* 日志相关 */ + /// 开始获取视频 fn log_fetch_video_start(&self); + /// 结束获取视频 fn log_fetch_video_end(&self); + /// 开始下载视频 fn log_download_video_start(&self); + /// 结束下载视频 fn log_download_video_end(&self); + /// 开始刷新视频 fn log_refresh_video_start(&self); + /// 结束刷新视频 fn log_refresh_video_end(&self, got_count: usize, new_count: u64); } diff --git a/crates/bili_sync/src/adapter/watch_later.rs b/crates/bili_sync/src/adapter/watch_later.rs index 53a1354..1189c52 100644 --- a/crates/bili_sync/src/adapter/watch_later.rs +++ b/crates/bili_sync/src/adapter/watch_later.rs @@ -3,91 +3,54 @@ use std::path::Path; use std::pin::Pin; use anyhow::Result; +use async_trait::async_trait; use bili_sync_entity::*; -use bili_sync_migration::OnConflict; -use filenamify::filenamify; use futures::Stream; use sea_orm::entity::prelude::*; +use sea_orm::sea_query::{IntoCondition, OnConflict}; use sea_orm::ActiveValue::Set; -use sea_orm::{DatabaseConnection, QuerySelect, TransactionTrait}; +use sea_orm::{DatabaseConnection, TransactionTrait}; -use crate::adapter::{error_fetch_video_detail, VideoListModel}; +use crate::adapter::helper::video_with_path; +use crate::adapter::{helper, VideoListModel}; use crate::bilibili::{self, BiliClient, VideoInfo, WatchLater}; -use crate::config::TEMPLATE; -use crate::utils::id_time_key; -use crate::utils::model::create_video_pages; use crate::utils::status::Status; -pub async fn watch_later_from<'a>( - path: &Path, - bili_client: &'a BiliClient, - connection: &DatabaseConnection, -) -> Result<(Box, Pin + 'a>>)> { - let watch_later = WatchLater::new(bili_client); - watch_later::Entity::insert(watch_later::ActiveModel { - id: Set(1), - path: Set(path.to_string_lossy().to_string()), - ..Default::default() - }) - .on_conflict( - OnConflict::column(watch_later::Column::Id) - .update_column(watch_later::Column::Path) - .to_owned(), - ) - .exec(connection) - .await?; - Ok(( - Box::new( - watch_later::Entity::find() - .filter(watch_later::Column::Id.eq(1)) - .one(connection) - .await? - .unwrap(), - ), - Box::pin(watch_later.into_video_stream()), - )) -} -use async_trait::async_trait; - #[async_trait] impl VideoListModel for watch_later::Model { async fn video_count(&self, connection: &DatabaseConnection) -> Result { - Ok(video::Entity::find() - .filter(video::Column::WatchLaterId.eq(self.id)) - .count(connection) - .await?) + helper::count_videos(video::Column::WatchLaterId.eq(self.id).into_condition(), connection).await } async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result> { - Ok(video::Entity::find() - .filter( - video::Column::WatchLaterId - .eq(self.id) - .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.eq(0)) - .and(video::Column::Category.eq(2)) - .and(video::Column::SinglePage.is_null()), - ) - .all(connection) - .await?) + helper::filter_videos( + video::Column::WatchLaterId + .eq(self.id) + .and(video::Column::Valid.eq(true)) + .and(video::Column::DownloadStatus.eq(0)) + .and(video::Column::Category.eq(2)) + .and(video::Column::SinglePage.is_null()) + .into_condition(), + connection, + ) + .await } async fn unhandled_video_pages( &self, connection: &DatabaseConnection, ) -> Result)>> { - Ok(video::Entity::find() - .filter( - video::Column::WatchLaterId - .eq(self.id) - .and(video::Column::Valid.eq(true)) - .and(video::Column::DownloadStatus.lt(Status::handled())) - .and(video::Column::Category.eq(2)) - .and(video::Column::SinglePage.is_not_null()), - ) - .find_with_related(page::Entity) - .all(connection) - .await?) + helper::filter_videos_with_pages( + video::Column::WatchLaterId + .eq(self.id) + .and(video::Column::Valid.eq(true)) + .and(video::Column::DownloadStatus.lt(Status::handled())) + .and(video::Column::Category.eq(2)) + .and(video::Column::SinglePage.is_not_null()) + .into_condition(), + connection, + ) + .await } async fn exist_labels( @@ -95,37 +58,13 @@ impl VideoListModel for watch_later::Model { videos_info: &[VideoInfo], connection: &DatabaseConnection, ) -> Result> { - let bvids = videos_info.iter().map(|v| v.bvid().to_string()).collect::>(); - Ok(video::Entity::find() - .filter( - video::Column::WatchLaterId - .eq(self.id) - .and(video::Column::Bvid.is_in(bvids)), - ) - .select_only() - .columns([video::Column::Bvid, video::Column::Favtime]) - .into_tuple() - .all(connection) - .await? - .into_iter() - .map(|(bvid, time)| id_time_key(&bvid, &time)) - .collect::>()) + helper::video_keys(videos_info, [video::Column::Bvid, video::Column::Favtime], connection).await } fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option) -> video::ActiveModel { let mut video_model = video_info.to_model(base_model); video_model.watch_later_id = Set(Some(self.id)); - if let Some(fmt_args) = &video_info.to_fmt_args() { - video_model.path = Set(Path::new(&self.path) - .join(filenamify( - TEMPLATE - .render("video", fmt_args) - .unwrap_or_else(|_| video_info.bvid().to_string()), - )) - .to_string_lossy() - .to_string()); - } - video_model + video_with_path(video_model, &self.path, video_info) } async fn fetch_videos_detail( @@ -139,7 +78,7 @@ impl VideoListModel for watch_later::Model { Ok((tags, pages_info)) => { let txn = connection.begin().await?; // 将分页信息写入数据库 - create_video_pages(&pages_info, &video_model, &txn).await?; + helper::create_video_pages(&pages_info, &video_model, &txn).await?; // 将页标记和 tag 写入数据库 let mut video_active_model: video::ActiveModel = video_model.into(); video_active_model.single_page = Set(Some(pages_info.len() == 1)); @@ -148,7 +87,7 @@ impl VideoListModel for watch_later::Model { txn.commit().await?; } Err(e) => { - error_fetch_video_detail(e, video_model, connection).await?; + helper::error_fetch_video_detail(e, video_model, connection).await?; } }; Ok(()) @@ -181,3 +120,33 @@ impl VideoListModel for watch_later::Model { ); } } + +pub(super) async fn watch_later_from<'a>( + path: &Path, + bili_client: &'a BiliClient, + connection: &DatabaseConnection, +) -> Result<(Box, Pin + 'a>>)> { + let watch_later = WatchLater::new(bili_client); + watch_later::Entity::insert(watch_later::ActiveModel { + id: Set(1), + path: Set(path.to_string_lossy().to_string()), + ..Default::default() + }) + .on_conflict( + OnConflict::column(watch_later::Column::Id) + .update_column(watch_later::Column::Path) + .to_owned(), + ) + .exec(connection) + .await?; + Ok(( + Box::new( + watch_later::Entity::find() + .filter(watch_later::Column::Id.eq(1)) + .one(connection) + .await? + .unwrap(), + ), + Box::pin(watch_later.into_video_stream()), + )) +} diff --git a/crates/bili_sync/src/utils/convert.rs b/crates/bili_sync/src/utils/convert.rs index 5dd5620..b01782d 100644 --- a/crates/bili_sync/src/utils/convert.rs +++ b/crates/bili_sync/src/utils/convert.rs @@ -1,5 +1,5 @@ -use sea_orm::ActiveValue::NotSet; -use sea_orm::{IntoActiveModel, Set}; +use sea_orm::ActiveValue::{NotSet, Set}; +use sea_orm::IntoActiveModel; use serde_json::json; use crate::bilibili::VideoInfo; diff --git a/crates/bili_sync/src/utils/model.rs b/crates/bili_sync/src/utils/model.rs index 039b971..b836166 100644 --- a/crates/bili_sync/src/utils/model.rs +++ b/crates/bili_sync/src/utils/model.rs @@ -1,11 +1,10 @@ use anyhow::Result; use bili_sync_entity::*; -use bili_sync_migration::OnConflict; use sea_orm::entity::prelude::*; -use sea_orm::ActiveValue::Set; +use sea_orm::sea_query::OnConflict; use crate::adapter::VideoListModel; -use crate::bilibili::{PageInfo, VideoInfo}; +use crate::bilibili::VideoInfo; /// 尝试创建 Video Model,如果发生冲突则忽略 pub async fn create_videos( @@ -26,51 +25,6 @@ pub async fn create_videos( Ok(()) } -/// 创建视频的所有分 P -pub async fn create_video_pages( - pages_info: &[PageInfo], - video_model: &video::Model, - connection: &impl ConnectionTrait, -) -> Result<()> { - let page_models = pages_info - .iter() - .map(move |p| { - let (width, height) = match &p.dimension { - Some(d) => { - if d.rotate == 0 { - (Some(d.width), Some(d.height)) - } else { - (Some(d.height), Some(d.width)) - } - } - None => (None, None), - }; - page::ActiveModel { - video_id: Set(video_model.id), - cid: Set(p.cid), - pid: Set(p.page), - name: Set(p.name.clone()), - width: Set(width), - height: Set(height), - duration: Set(p.duration), - image: Set(p.first_frame.clone()), - download_status: Set(0), - ..Default::default() - } - }) - .collect::>(); - page::Entity::insert_many(page_models) - .on_conflict( - OnConflict::columns([page::Column::VideoId, page::Column::Pid]) - .do_nothing() - .to_owned(), - ) - .do_nothing() - .exec(connection) - .await?; - Ok(()) -} - /// 更新视频 model 的下载状态 pub async fn update_videos_model(videos: Vec, connection: &DatabaseConnection) -> Result<()> { video::Entity::insert_many(videos) diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index 15552fc..6f876d6 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -3,7 +3,7 @@ use std::path::{Path, PathBuf}; use std::pin::Pin; use anyhow::{bail, Result}; -use bili_sync_entity::{page, video}; +use bili_sync_entity::*; use filenamify::filenamify; use futures::stream::{FuturesOrdered, FuturesUnordered}; use futures::{Future, Stream, StreamExt};