diff --git a/crates/bili_sync/src/database.rs b/crates/bili_sync/src/database.rs index 6898e32..f99603a 100644 --- a/crates/bili_sync/src/database.rs +++ b/crates/bili_sync/src/database.rs @@ -1,6 +1,10 @@ +use std::time::Duration; + use anyhow::{Context, Result}; use bili_sync_migration::{Migrator, MigratorTrait}; -use sea_orm::{ConnectOptions, Database, DatabaseConnection}; +use sea_orm::sqlx::sqlite::SqliteConnectOptions; +use sea_orm::sqlx::{ConnectOptions as SqlxConnectOptions, Sqlite}; +use sea_orm::{ConnectOptions, Database, DatabaseConnection, SqlxSqliteConnector}; use crate::config::CONFIG_DIR; @@ -13,8 +17,19 @@ async fn database_connection() -> Result { option .max_connections(100) .min_connections(5) - .acquire_timeout(std::time::Duration::from_secs(90)); - Ok(Database::connect(option).await?) + .acquire_timeout(Duration::from_secs(90)); + let connect_option = option + .get_url() + .parse::() + .context("Failed to parse database URL")? + .disable_statement_logging() + .busy_timeout(Duration::from_secs(90)); + Ok(SqlxSqliteConnector::from_sqlx_sqlite_pool( + option + .sqlx_pool_options::() + .connect_with(connect_option) + .await?, + )) } async fn migrate_database() -> Result<()> { diff --git a/crates/bili_sync/src/utils/convert.rs b/crates/bili_sync/src/utils/convert.rs index 21d38e2..a8fa865 100644 --- a/crates/bili_sync/src/utils/convert.rs +++ b/crates/bili_sync/src/utils/convert.rs @@ -152,10 +152,7 @@ impl VideoInfo { } impl PageInfo { - pub fn into_active_model( - self, - video_model: &bili_sync_entity::video::Model, - ) -> bili_sync_entity::page::ActiveModel { + pub fn into_active_model(self, video_model_id: i32) -> bili_sync_entity::page::ActiveModel { let (width, height) = match &self.dimension { Some(d) => { if d.rotate == 0 { @@ -167,7 +164,7 @@ impl PageInfo { None => (None, None), }; bili_sync_entity::page::ActiveModel { - video_id: Set(video_model.id), + video_id: Set(video_model_id), cid: Set(self.cid), pid: Set(self.page), name: Set(self.name), diff --git a/crates/bili_sync/src/utils/model.rs b/crates/bili_sync/src/utils/model.rs index 573ce1b..78fc57c 100644 --- a/crates/bili_sync/src/utils/model.rs +++ b/crates/bili_sync/src/utils/model.rs @@ -6,7 +6,7 @@ use sea_orm::sea_query::{OnConflict, SimpleExpr}; use sea_orm::{DatabaseTransaction, TransactionTrait}; use crate::adapter::{VideoSource, VideoSourceEnum}; -use crate::bilibili::{PageInfo, VideoInfo}; +use crate::bilibili::VideoInfo; use crate::config::{Config, LegacyConfig}; use crate::utils::status::STATUS_COMPLETED; @@ -73,16 +73,8 @@ pub async fn create_videos( } /// 尝试创建 Page Model,如果发生冲突则忽略 -pub async fn create_pages( - pages_info: Vec, - video_model: &bili_sync_entity::video::Model, - connection: &DatabaseTransaction, -) -> Result<()> { - let page_models = pages_info - .into_iter() - .map(|p| p.into_active_model(video_model)) - .collect::>(); - for page_chunk in page_models.chunks(50) { +pub async fn create_pages(pages_model: Vec, connection: &DatabaseTransaction) -> Result<()> { + for page_chunk in pages_model.chunks(200) { page::Entity::insert_many(page_chunk.to_vec()) .on_conflict( OnConflict::columns([page::Column::VideoId, page::Column::Pid]) diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index 4fc34e7..af08ae0 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -106,42 +106,44 @@ pub async fn fetch_video_details( let semaphore_ref = &semaphore; let tasks = videos_model .into_iter() - .map(|video_model| { - async move { - let _permit = semaphore_ref.acquire().await.context("acquire semaphore failed")?; - let video = Video::new(bili_client, video_model.bvid.clone()); - let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await; - match info { - Err(e) => { - error!( - "获取视频 {} - {} 的详细信息失败,错误为:{:#}", - &video_model.bvid, &video_model.name, e - ); - if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::() { - let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into(); - video_active_model.valid = Set(false); - video_active_model.save(connection).await?; - } + .map(|video_model| async move { + let _permit = semaphore_ref.acquire().await.context("acquire semaphore failed")?; + let video = Video::new(bili_client, video_model.bvid.clone()); + let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await; + match info { + Err(e) => { + error!( + "获取视频 {} - {} 的详细信息失败,错误为:{:#}", + &video_model.bvid, &video_model.name, e + ); + if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::() { + let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into(); + video_active_model.valid = Set(false); + video_active_model.save(connection).await?; } - Ok((tags, mut view_info)) => { - let VideoInfo::Detail { pages, .. } = &mut view_info else { - unreachable!() - }; - let pages = std::mem::take(pages); - let pages_len = pages.len(); - let txn = connection.begin().await?; - // 将分页信息写入数据库 - create_pages(pages, &video_model, &txn).await?; - let mut video_active_model = view_info.into_detail_model(video_model); - video_source.set_relation_id(&mut video_active_model); - video_active_model.single_page = Set(Some(pages_len == 1)); - video_active_model.tags = Set(Some(serde_json::to_value(tags)?)); - video_active_model.save(&txn).await?; - txn.commit().await?; - } - }; - Ok::<_, anyhow::Error>(()) - } + } + Ok((tags, mut view_info)) => { + let VideoInfo::Detail { pages, .. } = &mut view_info else { + unreachable!() + }; + // 构造 page model + let pages = std::mem::take(pages); + let pages = pages + .into_iter() + .map(|p| p.into_active_model(video_model.id)) + .collect::>(); + // 更新 video model 的各项有关属性 + let mut video_active_model = view_info.into_detail_model(video_model); + video_source.set_relation_id(&mut video_active_model); + video_active_model.single_page = Set(Some(pages.len() == 1)); + video_active_model.tags = Set(Some(serde_json::to_value(tags)?)); + let txn = connection.begin().await?; + create_pages(pages, &txn).await?; + video_active_model.save(&txn).await?; + txn.commit().await?; + } + }; + Ok::<_, anyhow::Error>(()) }) .collect::>(); tasks.try_collect::>().await?;