fix: 增加 busy_timeout、最小化事务块、增加每批处理 page 量 (#420)
This commit is contained in:
@@ -1,6 +1,10 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use bili_sync_migration::{Migrator, MigratorTrait};
|
||||
use sea_orm::{ConnectOptions, Database, DatabaseConnection};
|
||||
use sea_orm::sqlx::sqlite::SqliteConnectOptions;
|
||||
use sea_orm::sqlx::{ConnectOptions as SqlxConnectOptions, Sqlite};
|
||||
use sea_orm::{ConnectOptions, Database, DatabaseConnection, SqlxSqliteConnector};
|
||||
|
||||
use crate::config::CONFIG_DIR;
|
||||
|
||||
@@ -13,8 +17,19 @@ async fn database_connection() -> Result<DatabaseConnection> {
|
||||
option
|
||||
.max_connections(100)
|
||||
.min_connections(5)
|
||||
.acquire_timeout(std::time::Duration::from_secs(90));
|
||||
Ok(Database::connect(option).await?)
|
||||
.acquire_timeout(Duration::from_secs(90));
|
||||
let connect_option = option
|
||||
.get_url()
|
||||
.parse::<SqliteConnectOptions>()
|
||||
.context("Failed to parse database URL")?
|
||||
.disable_statement_logging()
|
||||
.busy_timeout(Duration::from_secs(90));
|
||||
Ok(SqlxSqliteConnector::from_sqlx_sqlite_pool(
|
||||
option
|
||||
.sqlx_pool_options::<Sqlite>()
|
||||
.connect_with(connect_option)
|
||||
.await?,
|
||||
))
|
||||
}
|
||||
|
||||
async fn migrate_database() -> Result<()> {
|
||||
|
||||
@@ -152,10 +152,7 @@ impl VideoInfo {
|
||||
}
|
||||
|
||||
impl PageInfo {
|
||||
pub fn into_active_model(
|
||||
self,
|
||||
video_model: &bili_sync_entity::video::Model,
|
||||
) -> bili_sync_entity::page::ActiveModel {
|
||||
pub fn into_active_model(self, video_model_id: i32) -> bili_sync_entity::page::ActiveModel {
|
||||
let (width, height) = match &self.dimension {
|
||||
Some(d) => {
|
||||
if d.rotate == 0 {
|
||||
@@ -167,7 +164,7 @@ impl PageInfo {
|
||||
None => (None, None),
|
||||
};
|
||||
bili_sync_entity::page::ActiveModel {
|
||||
video_id: Set(video_model.id),
|
||||
video_id: Set(video_model_id),
|
||||
cid: Set(self.cid),
|
||||
pid: Set(self.page),
|
||||
name: Set(self.name),
|
||||
|
||||
@@ -6,7 +6,7 @@ use sea_orm::sea_query::{OnConflict, SimpleExpr};
|
||||
use sea_orm::{DatabaseTransaction, TransactionTrait};
|
||||
|
||||
use crate::adapter::{VideoSource, VideoSourceEnum};
|
||||
use crate::bilibili::{PageInfo, VideoInfo};
|
||||
use crate::bilibili::VideoInfo;
|
||||
use crate::config::{Config, LegacyConfig};
|
||||
use crate::utils::status::STATUS_COMPLETED;
|
||||
|
||||
@@ -73,16 +73,8 @@ pub async fn create_videos(
|
||||
}
|
||||
|
||||
/// 尝试创建 Page Model,如果发生冲突则忽略
|
||||
pub async fn create_pages(
|
||||
pages_info: Vec<PageInfo>,
|
||||
video_model: &bili_sync_entity::video::Model,
|
||||
connection: &DatabaseTransaction,
|
||||
) -> Result<()> {
|
||||
let page_models = pages_info
|
||||
.into_iter()
|
||||
.map(|p| p.into_active_model(video_model))
|
||||
.collect::<Vec<page::ActiveModel>>();
|
||||
for page_chunk in page_models.chunks(50) {
|
||||
pub async fn create_pages(pages_model: Vec<page::ActiveModel>, connection: &DatabaseTransaction) -> Result<()> {
|
||||
for page_chunk in pages_model.chunks(200) {
|
||||
page::Entity::insert_many(page_chunk.to_vec())
|
||||
.on_conflict(
|
||||
OnConflict::columns([page::Column::VideoId, page::Column::Pid])
|
||||
|
||||
@@ -106,42 +106,44 @@ pub async fn fetch_video_details(
|
||||
let semaphore_ref = &semaphore;
|
||||
let tasks = videos_model
|
||||
.into_iter()
|
||||
.map(|video_model| {
|
||||
async move {
|
||||
let _permit = semaphore_ref.acquire().await.context("acquire semaphore failed")?;
|
||||
let video = Video::new(bili_client, video_model.bvid.clone());
|
||||
let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await;
|
||||
match info {
|
||||
Err(e) => {
|
||||
error!(
|
||||
"获取视频 {} - {} 的详细信息失败,错误为:{:#}",
|
||||
&video_model.bvid, &video_model.name, e
|
||||
);
|
||||
if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::<BiliError>() {
|
||||
let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into();
|
||||
video_active_model.valid = Set(false);
|
||||
video_active_model.save(connection).await?;
|
||||
}
|
||||
.map(|video_model| async move {
|
||||
let _permit = semaphore_ref.acquire().await.context("acquire semaphore failed")?;
|
||||
let video = Video::new(bili_client, video_model.bvid.clone());
|
||||
let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await;
|
||||
match info {
|
||||
Err(e) => {
|
||||
error!(
|
||||
"获取视频 {} - {} 的详细信息失败,错误为:{:#}",
|
||||
&video_model.bvid, &video_model.name, e
|
||||
);
|
||||
if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::<BiliError>() {
|
||||
let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into();
|
||||
video_active_model.valid = Set(false);
|
||||
video_active_model.save(connection).await?;
|
||||
}
|
||||
Ok((tags, mut view_info)) => {
|
||||
let VideoInfo::Detail { pages, .. } = &mut view_info else {
|
||||
unreachable!()
|
||||
};
|
||||
let pages = std::mem::take(pages);
|
||||
let pages_len = pages.len();
|
||||
let txn = connection.begin().await?;
|
||||
// 将分页信息写入数据库
|
||||
create_pages(pages, &video_model, &txn).await?;
|
||||
let mut video_active_model = view_info.into_detail_model(video_model);
|
||||
video_source.set_relation_id(&mut video_active_model);
|
||||
video_active_model.single_page = Set(Some(pages_len == 1));
|
||||
video_active_model.tags = Set(Some(serde_json::to_value(tags)?));
|
||||
video_active_model.save(&txn).await?;
|
||||
txn.commit().await?;
|
||||
}
|
||||
};
|
||||
Ok::<_, anyhow::Error>(())
|
||||
}
|
||||
}
|
||||
Ok((tags, mut view_info)) => {
|
||||
let VideoInfo::Detail { pages, .. } = &mut view_info else {
|
||||
unreachable!()
|
||||
};
|
||||
// 构造 page model
|
||||
let pages = std::mem::take(pages);
|
||||
let pages = pages
|
||||
.into_iter()
|
||||
.map(|p| p.into_active_model(video_model.id))
|
||||
.collect::<Vec<page::ActiveModel>>();
|
||||
// 更新 video model 的各项有关属性
|
||||
let mut video_active_model = view_info.into_detail_model(video_model);
|
||||
video_source.set_relation_id(&mut video_active_model);
|
||||
video_active_model.single_page = Set(Some(pages.len() == 1));
|
||||
video_active_model.tags = Set(Some(serde_json::to_value(tags)?));
|
||||
let txn = connection.begin().await?;
|
||||
create_pages(pages, &txn).await?;
|
||||
video_active_model.save(&txn).await?;
|
||||
txn.commit().await?;
|
||||
}
|
||||
};
|
||||
Ok::<_, anyhow::Error>(())
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
tasks.try_collect::<Vec<_>>().await?;
|
||||
|
||||
Reference in New Issue
Block a user