From 4f780faf64bbaa452f0ca54a7a4cb3dbe4b43b2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=B4=80=E1=B4=8D=E1=B4=9B=E1=B4=8F=E1=B4=80=E1=B4=87?= =?UTF-8?q?=CA=80?= Date: Wed, 6 Aug 2025 14:08:07 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=A2=9E=E5=8A=A0=20busy=5Ftimeout?= =?UTF-8?q?=E3=80=81=E6=9C=80=E5=B0=8F=E5=8C=96=E4=BA=8B=E5=8A=A1=E5=9D=97?= =?UTF-8?q?=E3=80=81=E5=A2=9E=E5=8A=A0=E6=AF=8F=E6=89=B9=E5=A4=84=E7=90=86?= =?UTF-8?q?=20page=20=E9=87=8F=20(#420)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/bili_sync/src/database.rs | 21 ++++++-- crates/bili_sync/src/utils/convert.rs | 7 +-- crates/bili_sync/src/utils/model.rs | 14 ++---- crates/bili_sync/src/workflow.rs | 72 ++++++++++++++------------- 4 files changed, 60 insertions(+), 54 deletions(-) diff --git a/crates/bili_sync/src/database.rs b/crates/bili_sync/src/database.rs index 6898e32..f99603a 100644 --- a/crates/bili_sync/src/database.rs +++ b/crates/bili_sync/src/database.rs @@ -1,6 +1,10 @@ +use std::time::Duration; + use anyhow::{Context, Result}; use bili_sync_migration::{Migrator, MigratorTrait}; -use sea_orm::{ConnectOptions, Database, DatabaseConnection}; +use sea_orm::sqlx::sqlite::SqliteConnectOptions; +use sea_orm::sqlx::{ConnectOptions as SqlxConnectOptions, Sqlite}; +use sea_orm::{ConnectOptions, Database, DatabaseConnection, SqlxSqliteConnector}; use crate::config::CONFIG_DIR; @@ -13,8 +17,19 @@ async fn database_connection() -> Result { option .max_connections(100) .min_connections(5) - .acquire_timeout(std::time::Duration::from_secs(90)); - Ok(Database::connect(option).await?) + .acquire_timeout(Duration::from_secs(90)); + let connect_option = option + .get_url() + .parse::() + .context("Failed to parse database URL")? + .disable_statement_logging() + .busy_timeout(Duration::from_secs(90)); + Ok(SqlxSqliteConnector::from_sqlx_sqlite_pool( + option + .sqlx_pool_options::() + .connect_with(connect_option) + .await?, + )) } async fn migrate_database() -> Result<()> { diff --git a/crates/bili_sync/src/utils/convert.rs b/crates/bili_sync/src/utils/convert.rs index 21d38e2..a8fa865 100644 --- a/crates/bili_sync/src/utils/convert.rs +++ b/crates/bili_sync/src/utils/convert.rs @@ -152,10 +152,7 @@ impl VideoInfo { } impl PageInfo { - pub fn into_active_model( - self, - video_model: &bili_sync_entity::video::Model, - ) -> bili_sync_entity::page::ActiveModel { + pub fn into_active_model(self, video_model_id: i32) -> bili_sync_entity::page::ActiveModel { let (width, height) = match &self.dimension { Some(d) => { if d.rotate == 0 { @@ -167,7 +164,7 @@ impl PageInfo { None => (None, None), }; bili_sync_entity::page::ActiveModel { - video_id: Set(video_model.id), + video_id: Set(video_model_id), cid: Set(self.cid), pid: Set(self.page), name: Set(self.name), diff --git a/crates/bili_sync/src/utils/model.rs b/crates/bili_sync/src/utils/model.rs index 573ce1b..78fc57c 100644 --- a/crates/bili_sync/src/utils/model.rs +++ b/crates/bili_sync/src/utils/model.rs @@ -6,7 +6,7 @@ use sea_orm::sea_query::{OnConflict, SimpleExpr}; use sea_orm::{DatabaseTransaction, TransactionTrait}; use crate::adapter::{VideoSource, VideoSourceEnum}; -use crate::bilibili::{PageInfo, VideoInfo}; +use crate::bilibili::VideoInfo; use crate::config::{Config, LegacyConfig}; use crate::utils::status::STATUS_COMPLETED; @@ -73,16 +73,8 @@ pub async fn create_videos( } /// 尝试创建 Page Model,如果发生冲突则忽略 -pub async fn create_pages( - pages_info: Vec, - video_model: &bili_sync_entity::video::Model, - connection: &DatabaseTransaction, -) -> Result<()> { - let page_models = pages_info - .into_iter() - .map(|p| p.into_active_model(video_model)) - .collect::>(); - for page_chunk in page_models.chunks(50) { +pub async fn create_pages(pages_model: Vec, connection: &DatabaseTransaction) -> Result<()> { + for page_chunk in pages_model.chunks(200) { page::Entity::insert_many(page_chunk.to_vec()) .on_conflict( OnConflict::columns([page::Column::VideoId, page::Column::Pid]) diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index 4fc34e7..af08ae0 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -106,42 +106,44 @@ pub async fn fetch_video_details( let semaphore_ref = &semaphore; let tasks = videos_model .into_iter() - .map(|video_model| { - async move { - let _permit = semaphore_ref.acquire().await.context("acquire semaphore failed")?; - let video = Video::new(bili_client, video_model.bvid.clone()); - let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await; - match info { - Err(e) => { - error!( - "获取视频 {} - {} 的详细信息失败,错误为:{:#}", - &video_model.bvid, &video_model.name, e - ); - if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::() { - let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into(); - video_active_model.valid = Set(false); - video_active_model.save(connection).await?; - } + .map(|video_model| async move { + let _permit = semaphore_ref.acquire().await.context("acquire semaphore failed")?; + let video = Video::new(bili_client, video_model.bvid.clone()); + let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await; + match info { + Err(e) => { + error!( + "获取视频 {} - {} 的详细信息失败,错误为:{:#}", + &video_model.bvid, &video_model.name, e + ); + if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::() { + let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into(); + video_active_model.valid = Set(false); + video_active_model.save(connection).await?; } - Ok((tags, mut view_info)) => { - let VideoInfo::Detail { pages, .. } = &mut view_info else { - unreachable!() - }; - let pages = std::mem::take(pages); - let pages_len = pages.len(); - let txn = connection.begin().await?; - // 将分页信息写入数据库 - create_pages(pages, &video_model, &txn).await?; - let mut video_active_model = view_info.into_detail_model(video_model); - video_source.set_relation_id(&mut video_active_model); - video_active_model.single_page = Set(Some(pages_len == 1)); - video_active_model.tags = Set(Some(serde_json::to_value(tags)?)); - video_active_model.save(&txn).await?; - txn.commit().await?; - } - }; - Ok::<_, anyhow::Error>(()) - } + } + Ok((tags, mut view_info)) => { + let VideoInfo::Detail { pages, .. } = &mut view_info else { + unreachable!() + }; + // 构造 page model + let pages = std::mem::take(pages); + let pages = pages + .into_iter() + .map(|p| p.into_active_model(video_model.id)) + .collect::>(); + // 更新 video model 的各项有关属性 + let mut video_active_model = view_info.into_detail_model(video_model); + video_source.set_relation_id(&mut video_active_model); + video_active_model.single_page = Set(Some(pages.len() == 1)); + video_active_model.tags = Set(Some(serde_json::to_value(tags)?)); + let txn = connection.begin().await?; + create_pages(pages, &txn).await?; + video_active_model.save(&txn).await?; + txn.commit().await?; + } + }; + Ok::<_, anyhow::Error>(()) }) .collect::>(); tasks.try_collect::>().await?;