From 4539e9379de4881a84066df3cd60c085d5559775 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=B4=80=E1=B4=8D=E1=B4=9B=E1=B4=8F=E1=B4=80=E1=B4=87?= =?UTF-8?q?=CA=80?= Date: Tue, 17 Jun 2025 02:15:11 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=BF=81=E7=A7=BB=E6=89=80=E6=9C=89?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E5=88=B0=E6=95=B0=E6=8D=AE=E5=BA=93=EF=BC=8C?= =?UTF-8?q?=E5=B9=B6=E6=94=AF=E6=8C=81=E8=BF=90=E8=A1=8C=E6=97=B6=E9=87=8D?= =?UTF-8?q?=E8=BD=BD=20(#364)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 11 + Cargo.toml | 1 + crates/bili_sync/Cargo.toml | 1 + crates/bili_sync/src/adapter/collection.rs | 95 ++++---- crates/bili_sync/src/adapter/favorite.rs | 66 +++--- crates/bili_sync/src/adapter/mod.rs | 38 +--- crates/bili_sync/src/adapter/submission.rs | 66 +++--- crates/bili_sync/src/adapter/watch_later.rs | 46 ++-- crates/bili_sync/src/api/auth.rs | 6 +- crates/bili_sync/src/api/handler.rs | 15 +- crates/bili_sync/src/bilibili/analyzer.rs | 8 +- crates/bili_sync/src/bilibili/client.rs | 56 ++--- crates/bili_sync/src/bilibili/collection.rs | 6 +- .../src/bilibili/danmaku/ass_writer.rs | 10 +- .../src/bilibili/danmaku/canvas/lane.rs | 4 +- .../src/bilibili/danmaku/canvas/mod.rs | 18 +- .../bili_sync/src/bilibili/danmaku/danmu.rs | 2 +- .../bili_sync/src/bilibili/danmaku/writer.rs | 5 +- crates/bili_sync/src/bilibili/me.rs | 9 +- crates/bili_sync/src/bilibili/mod.rs | 15 +- crates/bili_sync/src/bilibili/submission.rs | 1 - crates/bili_sync/src/bilibili/video.rs | 2 +- .../bili_sync/src/config/{clap.rs => args.rs} | 3 + crates/bili_sync/src/config/current.rs | 133 ++++++++++++ crates/bili_sync/src/config/default.rs | 18 ++ crates/bili_sync/src/config/flag.rs | 3 + crates/bili_sync/src/config/global.rs | 62 ------ crates/bili_sync/src/config/handlebar.rs | 90 ++++++++ crates/bili_sync/src/config/item.rs | 78 +------ crates/bili_sync/src/config/legacy.rs | 135 ++++++++++++ crates/bili_sync/src/config/mod.rs | 205 ++---------------- .../bili_sync/src/config/versioned_cache.rs | 49 +++++ .../bili_sync/src/config/versioned_config.rs | 91 ++++++++ crates/bili_sync/src/database.rs | 8 +- crates/bili_sync/src/downloader.rs | 19 +- crates/bili_sync/src/main.rs | 18 +- crates/bili_sync/src/task/http_server.rs | 37 ++-- crates/bili_sync/src/task/video_downloader.rs | 29 ++- crates/bili_sync/src/utils/format_arg.rs | 12 +- crates/bili_sync/src/utils/model.rs | 126 ++++++++++- crates/bili_sync/src/utils/nfo.rs | 6 +- crates/bili_sync/src/workflow.rs | 103 ++------- .../bili_sync_entity/src/entities/config.rs | 17 ++ crates/bili_sync_entity/src/entities/mod.rs | 1 + crates/bili_sync_migration/src/lib.rs | 2 + .../src/m20250613_043257_add_config.rs | 44 ++++ 46 files changed, 1055 insertions(+), 715 deletions(-) rename crates/bili_sync/src/config/{clap.rs => args.rs} (92%) create mode 100644 crates/bili_sync/src/config/current.rs create mode 100644 crates/bili_sync/src/config/default.rs create mode 100644 crates/bili_sync/src/config/flag.rs delete mode 100644 crates/bili_sync/src/config/global.rs create mode 100644 crates/bili_sync/src/config/handlebar.rs create mode 100644 crates/bili_sync/src/config/legacy.rs create mode 100644 crates/bili_sync/src/config/versioned_cache.rs create mode 100644 crates/bili_sync/src/config/versioned_config.rs create mode 100644 crates/bili_sync_entity/src/entities/config.rs create mode 100644 crates/bili_sync_migration/src/m20250613_043257_add_config.rs diff --git a/Cargo.lock b/Cargo.lock index 382adb2..3d8696e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -518,6 +518,7 @@ dependencies = [ "toml", "tower", "tracing", + "tracing-panic", "tracing-subscriber", "utoipa", "utoipa-swagger-ui", @@ -3942,6 +3943,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-panic" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bf1298a179837099f9309243af3b554e840f7f67f65e9f55294913299bd4cc5" +dependencies = [ + "tracing", + "tracing-subscriber", +] + [[package]] name = "tracing-subscriber" version = "0.3.19" diff --git a/Cargo.toml b/Cargo.toml index 239439d..bb141c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ tokio-util = { version = "0.7.15", features = ["io", "rt"] } toml = "0.8.22" tower = "0.5.2" tracing = "0.1.41" +tracing-panic = "0.1.2" tracing-subscriber = { version = "0.3.19", features = ["chrono"] } utoipa = { git = "https://github.com/amtoaer/utoipa.git", tag = "v1.0.1", features = [ "axum_extras", diff --git a/crates/bili_sync/Cargo.toml b/crates/bili_sync/Cargo.toml index 1cdfe27..2c7cc42 100644 --- a/crates/bili_sync/Cargo.toml +++ b/crates/bili_sync/Cargo.toml @@ -48,6 +48,7 @@ tokio-util = { workspace = true } toml = { workspace = true } tower = { workspace = true } tracing = { workspace = true } +tracing-panic = { workspace = true } tracing-subscriber = { workspace = true } utoipa = { workspace = true } utoipa-swagger-ui = { workspace = true } diff --git a/crates/bili_sync/src/adapter/collection.rs b/crates/bili_sync/src/adapter/collection.rs index f63241a..3d22727 100644 --- a/crates/bili_sync/src/adapter/collection.rs +++ b/crates/bili_sync/src/adapter/collection.rs @@ -90,50 +90,55 @@ impl VideoSource for collection::Model { fn log_download_video_end(&self) { info!("下载{}「{}」视频完成", CollectionType::from(self.r#type), self.name); } -} -pub(super) async fn collection_from<'a>( - collection_item: &'a CollectionItem, - path: &Path, - bili_client: &'a BiliClient, - connection: &DatabaseConnection, -) -> Result<( - VideoSourceEnum, - Pin> + 'a + Send>>, -)> { - let collection = Collection::new(bili_client, collection_item); - let collection_info = collection.get_info().await?; - collection::Entity::insert(collection::ActiveModel { - s_id: Set(collection_info.sid), - m_id: Set(collection_info.mid), - r#type: Set(collection_info.collection_type.into()), - name: Set(collection_info.name.clone()), - path: Set(path.to_string_lossy().to_string()), - ..Default::default() - }) - .on_conflict( - OnConflict::columns([ - collection::Column::SId, - collection::Column::MId, - collection::Column::Type, - ]) - .update_columns([collection::Column::Name, collection::Column::Path]) - .to_owned(), - ) - .exec(connection) - .await?; - Ok(( - collection::Entity::find() - .filter( - collection::Column::SId - .eq(collection_item.sid.clone()) - .and(collection::Column::MId.eq(collection_item.mid.clone())) - .and(collection::Column::Type.eq(Into::::into(collection_item.collection_type.clone()))), - ) - .one(connection) - .await? - .context("collection not found")? - .into(), - Box::pin(collection.into_video_stream()), - )) + async fn refresh<'a>( + self, + bili_client: &'a BiliClient, + connection: &'a DatabaseConnection, + ) -> Result<( + VideoSourceEnum, + Pin> + Send + 'a>>, + )> { + let collection = Collection::new( + bili_client, + CollectionItem { + sid: self.s_id.to_string(), + mid: self.m_id.to_string(), + collection_type: CollectionType::from(self.r#type), + }, + ); + let collection_info = collection.get_info().await?; + collection::Entity::insert(collection::ActiveModel { + s_id: Set(collection_info.sid), + m_id: Set(collection_info.mid), + r#type: Set(collection_info.collection_type.into()), + name: Set(collection_info.name.clone()), + ..Default::default() + }) + .on_conflict( + OnConflict::columns([ + collection::Column::SId, + collection::Column::MId, + collection::Column::Type, + ]) + .update_column(collection::Column::Name) + .to_owned(), + ) + .exec(connection) + .await?; + Ok(( + collection::Entity::find() + .filter( + collection::Column::SId + .eq(self.s_id) + .and(collection::Column::MId.eq(self.m_id)) + .and(collection::Column::Type.eq(self.r#type)), + ) + .one(connection) + .await? + .context("collection not found")? + .into(), + Box::pin(collection.into_video_stream()), + )) + } } diff --git a/crates/bili_sync/src/adapter/favorite.rs b/crates/bili_sync/src/adapter/favorite.rs index 740ca5d..3ce8403 100644 --- a/crates/bili_sync/src/adapter/favorite.rs +++ b/crates/bili_sync/src/adapter/favorite.rs @@ -60,39 +60,37 @@ impl VideoSource for favorite::Model { fn log_download_video_end(&self) { info!("下载收藏夹「{}」视频完成", self.name); } -} -pub(super) async fn favorite_from<'a>( - fid: &str, - path: &Path, - bili_client: &'a BiliClient, - connection: &DatabaseConnection, -) -> Result<( - VideoSourceEnum, - Pin> + 'a + Send>>, -)> { - let favorite = FavoriteList::new(bili_client, fid.to_owned()); - let favorite_info = favorite.get_info().await?; - favorite::Entity::insert(favorite::ActiveModel { - f_id: Set(favorite_info.id), - name: Set(favorite_info.title.clone()), - path: Set(path.to_string_lossy().to_string()), - ..Default::default() - }) - .on_conflict( - OnConflict::column(favorite::Column::FId) - .update_columns([favorite::Column::Name, favorite::Column::Path]) - .to_owned(), - ) - .exec(connection) - .await?; - Ok(( - favorite::Entity::find() - .filter(favorite::Column::FId.eq(favorite_info.id)) - .one(connection) - .await? - .context("favorite not found")? - .into(), - Box::pin(favorite.into_video_stream()), - )) + async fn refresh<'a>( + self, + bili_client: &'a BiliClient, + connection: &'a DatabaseConnection, + ) -> Result<( + VideoSourceEnum, + Pin> + Send + 'a>>, + )> { + let favorite = FavoriteList::new(bili_client, self.f_id.to_string()); + let favorite_info = favorite.get_info().await?; + favorite::Entity::insert(favorite::ActiveModel { + f_id: Set(favorite_info.id), + name: Set(favorite_info.title.clone()), + ..Default::default() + }) + .on_conflict( + OnConflict::column(favorite::Column::FId) + .update_column(favorite::Column::Name) + .to_owned(), + ) + .exec(connection) + .await?; + Ok(( + favorite::Entity::find() + .filter(favorite::Column::FId.eq(favorite_info.id)) + .one(connection) + .await? + .context("favorite not found")? + .into(), + Box::pin(favorite.into_video_stream()), + )) + } } diff --git a/crates/bili_sync/src/adapter/mod.rs b/crates/bili_sync/src/adapter/mod.rs index 84edcdd..6d32d55 100644 --- a/crates/bili_sync/src/adapter/mod.rs +++ b/crates/bili_sync/src/adapter/mod.rs @@ -20,11 +20,7 @@ use bili_sync_entity::favorite::Model as Favorite; use bili_sync_entity::submission::Model as Submission; use bili_sync_entity::watch_later::Model as WatchLater; -use crate::adapter::collection::collection_from; -use crate::adapter::favorite::favorite_from; -use crate::adapter::submission::submission_from; -use crate::adapter::watch_later::watch_later_from; -use crate::bilibili::{BiliClient, CollectionItem, VideoInfo}; +use crate::bilibili::{BiliClient, VideoInfo}; #[enum_dispatch] pub enum VideoSourceEnum { @@ -84,31 +80,15 @@ pub trait VideoSource { /// 结束下载视频 fn log_download_video_end(&self); -} -#[derive(Clone, Copy, Debug)] -pub enum Args<'a> { - Favorite { fid: &'a str }, - Collection { collection_item: &'a CollectionItem }, - WatchLater, - Submission { upper_id: &'a str }, -} - -pub async fn video_source_from<'a>( - args: Args<'a>, - path: &Path, - bili_client: &'a BiliClient, - connection: &DatabaseConnection, -) -> Result<( - VideoSourceEnum, - Pin> + 'a + Send>>, -)> { - match args { - Args::Favorite { fid } => favorite_from(fid, path, bili_client, connection).await, - Args::Collection { collection_item } => collection_from(collection_item, path, bili_client, connection).await, - Args::WatchLater => watch_later_from(path, bili_client, connection).await, - Args::Submission { upper_id } => submission_from(upper_id, path, bili_client, connection).await, - } + async fn refresh<'a>( + self, + bili_client: &'a BiliClient, + connection: &'a DatabaseConnection, + ) -> Result<( + VideoSourceEnum, + Pin> + Send + 'a>>, + )>; } pub enum _ActiveModel { diff --git a/crates/bili_sync/src/adapter/submission.rs b/crates/bili_sync/src/adapter/submission.rs index 1e4c006..4f9906f 100644 --- a/crates/bili_sync/src/adapter/submission.rs +++ b/crates/bili_sync/src/adapter/submission.rs @@ -60,39 +60,37 @@ impl VideoSource for submission::Model { fn log_download_video_end(&self) { info!("下载「{}」投稿视频完成", self.upper_name); } -} -pub(super) async fn submission_from<'a>( - upper_id: &str, - path: &Path, - bili_client: &'a BiliClient, - connection: &DatabaseConnection, -) -> Result<( - VideoSourceEnum, - Pin> + 'a + Send>>, -)> { - let submission = Submission::new(bili_client, upper_id.to_owned()); - let upper = submission.get_info().await?; - submission::Entity::insert(submission::ActiveModel { - upper_id: Set(upper.mid.parse()?), - upper_name: Set(upper.name), - path: Set(path.to_string_lossy().to_string()), - ..Default::default() - }) - .on_conflict( - OnConflict::column(submission::Column::UpperId) - .update_columns([submission::Column::UpperName, submission::Column::Path]) - .to_owned(), - ) - .exec(connection) - .await?; - Ok(( - submission::Entity::find() - .filter(submission::Column::UpperId.eq(upper.mid)) - .one(connection) - .await? - .context("submission not found")? - .into(), - Box::pin(submission.into_video_stream()), - )) + async fn refresh<'a>( + self, + bili_client: &'a BiliClient, + connection: &'a DatabaseConnection, + ) -> Result<( + VideoSourceEnum, + Pin> + Send + 'a>>, + )> { + let submission = Submission::new(bili_client, self.upper_id.to_string()); + let upper = submission.get_info().await?; + submission::Entity::insert(submission::ActiveModel { + upper_id: Set(upper.mid.parse()?), + upper_name: Set(upper.name), + ..Default::default() + }) + .on_conflict( + OnConflict::column(submission::Column::UpperId) + .update_column(submission::Column::UpperName) + .to_owned(), + ) + .exec(connection) + .await?; + Ok(( + submission::Entity::find() + .filter(submission::Column::UpperId.eq(upper.mid)) + .one(connection) + .await? + .context("submission not found")? + .into(), + Box::pin(submission.into_video_stream()), + )) + } } diff --git a/crates/bili_sync/src/adapter/watch_later.rs b/crates/bili_sync/src/adapter/watch_later.rs index 9ba4314..fe1d667 100644 --- a/crates/bili_sync/src/adapter/watch_later.rs +++ b/crates/bili_sync/src/adapter/watch_later.rs @@ -1,12 +1,12 @@ use std::path::Path; use std::pin::Pin; -use anyhow::{Context, Result}; +use anyhow::Result; use bili_sync_entity::*; use futures::Stream; use sea_orm::ActiveValue::Set; use sea_orm::entity::prelude::*; -use sea_orm::sea_query::{OnConflict, SimpleExpr}; +use sea_orm::sea_query::SimpleExpr; use sea_orm::{DatabaseConnection, Unchanged}; use crate::adapter::{_ActiveModel, VideoSource, VideoSourceEnum}; @@ -60,36 +60,16 @@ impl VideoSource for watch_later::Model { fn log_download_video_end(&self) { info!("下载稍后再看视频完成"); } -} -pub(super) async fn watch_later_from<'a>( - path: &Path, - bili_client: &'a BiliClient, - connection: &DatabaseConnection, -) -> Result<( - VideoSourceEnum, - Pin> + 'a + Send>>, -)> { - let watch_later = WatchLater::new(bili_client); - watch_later::Entity::insert(watch_later::ActiveModel { - id: Set(1), - path: Set(path.to_string_lossy().to_string()), - ..Default::default() - }) - .on_conflict( - OnConflict::column(watch_later::Column::Id) - .update_column(watch_later::Column::Path) - .to_owned(), - ) - .exec(connection) - .await?; - Ok(( - watch_later::Entity::find() - .filter(watch_later::Column::Id.eq(1)) - .one(connection) - .await? - .context("watch_later not found")? - .into(), - Box::pin(watch_later.into_video_stream()), - )) + async fn refresh<'a>( + self, + bili_client: &'a BiliClient, + _connection: &'a DatabaseConnection, + ) -> Result<( + VideoSourceEnum, + Pin> + Send + 'a>>, + )> { + let watch_later = WatchLater::new(bili_client); + Ok((self.into(), Box::pin(watch_later.into_video_stream()))) + } } diff --git a/crates/bili_sync/src/api/auth.rs b/crates/bili_sync/src/api/auth.rs index 31490cd..b7bb827 100644 --- a/crates/bili_sync/src/api/auth.rs +++ b/crates/bili_sync/src/api/auth.rs @@ -7,10 +7,12 @@ use utoipa::Modify; use utoipa::openapi::security::{ApiKey, ApiKeyValue, SecurityScheme}; use crate::api::wrapper::ApiResponse; -use crate::config::CONFIG; +use crate::config::VersionedConfig; pub async fn auth(headers: HeaderMap, request: Request, next: Next) -> Result { - if request.uri().path().starts_with("/api/") && get_token(&headers) != CONFIG.auth_token { + if request.uri().path().starts_with("/api/") + && get_token(&headers).is_none_or(|token| token != VersionedConfig::get().load().auth_token) + { return Ok(ApiResponse::unauthorized(()).into_response()); } Ok(next.run(request).await) diff --git a/crates/bili_sync/src/api/handler.rs b/crates/bili_sync/src/api/handler.rs index bf19c7a..8ffb606 100644 --- a/crates/bili_sync/src/api/handler.rs +++ b/crates/bili_sync/src/api/handler.rs @@ -573,13 +573,14 @@ pub async fn upsert_collection( Extension(bili_client): Extension>, ValidatedJson(request): ValidatedJson, ) -> Result, ApiError> { - let collection_item = CollectionItem { - sid: request.id.to_string(), - mid: request.mid.to_string(), - collection_type: request.collection_type, - }; - - let collection = Collection::new(bili_client.as_ref(), &collection_item); + let collection = Collection::new( + bili_client.as_ref(), + CollectionItem { + sid: request.id.to_string(), + mid: request.mid.to_string(), + collection_type: request.collection_type, + }, + ); let collection_info = collection.get_info().await?; collection::Entity::insert(collection::ActiveModel { diff --git a/crates/bili_sync/src/bilibili/analyzer.rs b/crates/bili_sync/src/bilibili/analyzer.rs index 8e12600..9976fbe 100644 --- a/crates/bili_sync/src/bilibili/analyzer.rs +++ b/crates/bili_sync/src/bilibili/analyzer.rs @@ -2,7 +2,7 @@ use anyhow::{Context, Result, bail}; use serde::{Deserialize, Serialize}; use crate::bilibili::error::BiliError; -use crate::config::CONFIG; +use crate::config::VersionedConfig; pub struct PageAnalyzer { info: serde_json::Value, @@ -136,7 +136,7 @@ impl Stream { let mut urls = std::iter::once(url.as_str()) .chain(backup_url.iter().map(|s| s.as_str())) .collect::>(); - if CONFIG.cdn_sorting { + if VersionedConfig::get().load().cdn_sorting { urls.sort_by_key(|u| { if u.contains("upos-") { 0 // 服务商 cdn @@ -351,7 +351,7 @@ impl PageAnalyzer { mod tests { use super::*; use crate::bilibili::{BiliClient, Video}; - use crate::config::CONFIG; + use crate::config::VersionedConfig; #[test] fn test_quality_order() { @@ -433,7 +433,7 @@ mod tests { .get_page_analyzer(&first_page) .await .expect("failed to get page analyzer") - .best_stream(&CONFIG.filter_option) + .best_stream(&VersionedConfig::get().load().filter_option) .expect("failed to get best stream"); dbg!(bvid, &best_stream); match best_stream { diff --git a/crates/bili_sync/src/bilibili/client.rs b/crates/bili_sync/src/bilibili/client.rs index 1e9d7f5..4eae4e3 100644 --- a/crates/bili_sync/src/bilibili/client.rs +++ b/crates/bili_sync/src/bilibili/client.rs @@ -1,13 +1,14 @@ use std::sync::Arc; use std::time::Duration; -use anyhow::{Context, Result}; +use anyhow::Result; use leaky_bucket::RateLimiter; use reqwest::{Method, header}; +use sea_orm::DatabaseConnection; use crate::bilibili::Credential; use crate::bilibili::credential::WbiImg; -use crate::config::{CONFIG, RateLimit}; +use crate::config::{RateLimit, VersionedCache, VersionedConfig}; // 一个对 reqwest::Client 的简单封装,用于 Bilibili 请求 #[derive(Clone)] @@ -63,53 +64,54 @@ impl Default for Client { pub struct BiliClient { pub client: Client, - limiter: Option, + limiter: VersionedCache>, } impl BiliClient { pub fn new() -> Self { let client = Client::new(); - let limiter = CONFIG - .concurrent_limit - .rate_limit - .as_ref() - .map(|RateLimit { limit, duration }| { - RateLimiter::builder() - .initial(*limit) - .refill(*limit) - .max(*limit) - .interval(Duration::from_millis(*duration)) - .build() - }); + let limiter = VersionedCache::new(|config| { + Ok(config + .concurrent_limit + .rate_limit + .as_ref() + .map(|RateLimit { limit, duration }| { + RateLimiter::builder() + .initial(*limit) + .refill(*limit) + .max(*limit) + .interval(Duration::from_millis(*duration)) + .build() + })) + }) + .expect("failed to create rate limiter"); Self { client, limiter } } /// 获取一个预构建的请求,通过该方法获取请求时会检查并等待速率限制 pub async fn request(&self, method: Method, url: &str) -> reqwest::RequestBuilder { - if let Some(limiter) = &self.limiter { + if let Some(limiter) = self.limiter.load().as_ref() { limiter.acquire_one().await; } - let credential = CONFIG.credential.load(); - self.client.request(method, url, credential.as_deref()) + let credential = VersionedConfig::get().load().credential.load(); + self.client.request(method, url, Some(credential.as_ref())) } - pub async fn check_refresh(&self) -> Result<()> { - let credential = CONFIG.credential.load(); - let Some(credential) = credential.as_deref() else { - return Ok(()); - }; + pub async fn check_refresh(&self, connection: &DatabaseConnection) -> Result<()> { + let credential = VersionedConfig::get().load().credential.load(); if !credential.need_refresh(&self.client).await? { return Ok(()); } let new_credential = credential.refresh(&self.client).await?; - CONFIG.credential.store(Some(Arc::new(new_credential))); - CONFIG.save() + let config = VersionedConfig::get().load(); + config.credential.store(Arc::new(new_credential)); + config.save_to_database(connection).await?; + Ok(()) } /// 获取 wbi img,用于生成请求签名 pub async fn wbi_img(&self) -> Result { - let credential = CONFIG.credential.load(); - let credential = credential.as_deref().context("no credential found")?; + let credential = VersionedConfig::get().load().credential.load(); credential.wbi_img(&self.client).await } } diff --git a/crates/bili_sync/src/bilibili/collection.rs b/crates/bili_sync/src/bilibili/collection.rs index 0823eaa..10cb06e 100644 --- a/crates/bili_sync/src/bilibili/collection.rs +++ b/crates/bili_sync/src/bilibili/collection.rs @@ -10,7 +10,7 @@ use serde_json::Value; use crate::bilibili::credential::encoded_query; use crate::bilibili::{BiliClient, MIXIN_KEY, Validate, VideoInfo}; -#[derive(PartialEq, Eq, Hash, Clone, Debug, Deserialize, Default)] +#[derive(PartialEq, Eq, Hash, Clone, Debug, Deserialize, Default, Copy)] pub enum CollectionType { Series, #[default] @@ -55,7 +55,7 @@ pub struct CollectionItem { pub struct Collection<'a> { client: &'a BiliClient, - collection: &'a CollectionItem, + collection: CollectionItem, } #[derive(Debug, PartialEq)] @@ -94,7 +94,7 @@ impl<'de> Deserialize<'de> for CollectionInfo { } impl<'a> Collection<'a> { - pub fn new(client: &'a BiliClient, collection: &'a CollectionItem) -> Self { + pub fn new(client: &'a BiliClient, collection: CollectionItem) -> Self { Self { client, collection } } diff --git a/crates/bili_sync/src/bilibili/danmaku/ass_writer.rs b/crates/bili_sync/src/bilibili/danmaku/ass_writer.rs index a79a8a4..3adf1ae 100644 --- a/crates/bili_sync/src/bilibili/danmaku/ass_writer.rs +++ b/crates/bili_sync/src/bilibili/danmaku/ass_writer.rs @@ -88,14 +88,14 @@ impl fmt::Display for CanvasStyles { } } -pub struct AssWriter { +pub struct AssWriter<'a, W: AsyncWrite> { f: Pin>>, title: String, - canvas_config: CanvasConfig, + canvas_config: CanvasConfig<'a>, } -impl AssWriter { - pub fn new(f: W, title: String, canvas_config: CanvasConfig) -> Self { +impl<'a, W: AsyncWrite> AssWriter<'a, W> { + pub fn new(f: W, title: String, canvas_config: CanvasConfig<'a>) -> Self { AssWriter { // 对于 HDD、docker 之类的场景,磁盘 IO 是非常大的瓶颈。使用大缓存 f: Box::pin(BufWriter::with_capacity(10 << 20, f)), @@ -104,7 +104,7 @@ impl AssWriter { } } - pub async fn construct(f: W, title: String, canvas_config: CanvasConfig) -> Result { + pub async fn construct(f: W, title: String, canvas_config: CanvasConfig<'a>) -> Result { let mut res = Self::new(f, title, canvas_config); res.init().await?; Ok(res) diff --git a/crates/bili_sync/src/bilibili/danmaku/canvas/lane.rs b/crates/bili_sync/src/bilibili/danmaku/canvas/lane.rs index 803df08..12b2b5e 100644 --- a/crates/bili_sync/src/bilibili/danmaku/canvas/lane.rs +++ b/crates/bili_sync/src/bilibili/danmaku/canvas/lane.rs @@ -18,7 +18,7 @@ pub struct Lane { } impl Lane { - pub fn draw(danmu: &Danmu, config: &CanvasConfig) -> Self { + pub fn draw(danmu: &Danmu, config: &CanvasConfig<'_>) -> Self { Lane { last_shoot_time: danmu.timeline_s, last_length: danmu.length(config), @@ -26,7 +26,7 @@ impl Lane { } /// 这个槽位是否可以发射另外一条弹幕,返回可能的情形 - pub fn available_for(&self, other: &Danmu, config: &CanvasConfig) -> Collision { + pub fn available_for(&self, other: &Danmu, config: &CanvasConfig<'_>) -> Collision { #[allow(non_snake_case)] let T = config.danmaku_option.duration; #[allow(non_snake_case)] diff --git a/crates/bili_sync/src/bilibili/danmaku/canvas/mod.rs b/crates/bili_sync/src/bilibili/danmaku/canvas/mod.rs index bc16ee3..9a73425 100644 --- a/crates/bili_sync/src/bilibili/danmaku/canvas/mod.rs +++ b/crates/bili_sync/src/bilibili/danmaku/canvas/mod.rs @@ -10,7 +10,7 @@ use crate::bilibili::danmaku::canvas::lane::Collision; use crate::bilibili::danmaku::danmu::DanmuType; use crate::bilibili::danmaku::{Danmu, DrawEffect, Drawable}; -#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub struct DanmakuOption { pub duration: f64, pub font: String, @@ -54,13 +54,13 @@ impl Default for DanmakuOption { } #[derive(Clone)] -pub struct CanvasConfig { +pub struct CanvasConfig<'a> { pub width: u64, pub height: u64, - pub danmaku_option: &'static DanmakuOption, + pub danmaku_option: &'a DanmakuOption, } -impl CanvasConfig { - pub fn new(danmaku_option: &'static DanmakuOption, page: &PageInfo) -> Self { +impl<'a> CanvasConfig<'a> { + pub fn new(danmaku_option: &'a DanmakuOption, page: &PageInfo) -> Self { let (width, height) = Self::dimension(page); Self { width, @@ -86,7 +86,7 @@ impl CanvasConfig { ((720.0 / height as f64 * width as f64) as u64, 720) } - pub fn canvas(self) -> Canvas { + pub fn canvas(self) -> Canvas<'a> { let float_lanes_cnt = (self.danmaku_option.float_percentage * self.height as f64 / self.danmaku_option.lane_size as f64) as usize; @@ -97,12 +97,12 @@ impl CanvasConfig { } } -pub struct Canvas { - pub config: CanvasConfig, +pub struct Canvas<'a> { + pub config: CanvasConfig<'a>, pub float_lanes: Vec>, } -impl Canvas { +impl<'a> Canvas<'a> { pub fn draw(&mut self, mut danmu: Danmu) -> Result> { danmu.timeline_s += self.config.danmaku_option.time_offset; if danmu.timeline_s < 0.0 { diff --git a/crates/bili_sync/src/bilibili/danmaku/danmu.rs b/crates/bili_sync/src/bilibili/danmaku/danmu.rs index 4c426a1..8179a2d 100644 --- a/crates/bili_sync/src/bilibili/danmaku/danmu.rs +++ b/crates/bili_sync/src/bilibili/danmaku/danmu.rs @@ -40,7 +40,7 @@ impl Danmu { /// 计算弹幕的“像素长度”,会乘上一个缩放因子 /// /// 汉字算一个全宽,英文算2/3宽 - pub fn length(&self, config: &CanvasConfig) -> f64 { + pub fn length(&self, config: &CanvasConfig<'_>) -> f64 { let pts = config.danmaku_option.font_size * self .content diff --git a/crates/bili_sync/src/bilibili/danmaku/writer.rs b/crates/bili_sync/src/bilibili/danmaku/writer.rs index 2207989..e7bf70d 100644 --- a/crates/bili_sync/src/bilibili/danmaku/writer.rs +++ b/crates/bili_sync/src/bilibili/danmaku/writer.rs @@ -6,7 +6,7 @@ use tokio::fs::{self, File}; use crate::bilibili::PageInfo; use crate::bilibili::danmaku::canvas::CanvasConfig; use crate::bilibili::danmaku::{AssWriter, Danmu}; -use crate::config::CONFIG; +use crate::config::VersionedConfig; pub struct DanmakuWriter<'a> { page: &'a PageInfo, @@ -22,7 +22,8 @@ impl<'a> DanmakuWriter<'a> { if let Some(parent) = path.parent() { fs::create_dir_all(parent).await?; } - let canvas_config = CanvasConfig::new(&CONFIG.danmaku_option, self.page); + let config = VersionedConfig::get().load_full(); + let canvas_config = CanvasConfig::new(&config.danmaku_option, self.page); let mut writer = AssWriter::construct(File::create(path).await?, self.page.name.clone(), canvas_config.clone()).await?; let mut canvas = canvas_config.canvas(); diff --git a/crates/bili_sync/src/bilibili/me.rs b/crates/bili_sync/src/bilibili/me.rs index 5b63b9f..fd3431c 100644 --- a/crates/bili_sync/src/bilibili/me.rs +++ b/crates/bili_sync/src/bilibili/me.rs @@ -4,7 +4,7 @@ use anyhow::Result; use reqwest::Method; use crate::bilibili::{BiliClient, Validate}; -use crate::config::CONFIG; +use crate::config::VersionedConfig; pub struct Me<'a> { client: &'a BiliClient, mid: String, @@ -73,12 +73,7 @@ impl<'a> Me<'a> { } fn my_id() -> String { - CONFIG - .credential - .load() - .as_deref() - .map(|c| c.dedeuserid.clone()) - .unwrap() + VersionedConfig::get().load().credential.load().dedeuserid.clone() } } diff --git a/crates/bili_sync/src/bilibili/mod.rs b/crates/bili_sync/src/bilibili/mod.rs index fcf4512..de0c4d8 100644 --- a/crates/bili_sync/src/bilibili/mod.rs +++ b/crates/bili_sync/src/bilibili/mod.rs @@ -154,13 +154,14 @@ mod tests { panic!("获取 mixin key 失败"); }; set_global_mixin_key(mixin_key); - // 测试视频合集 - let collection_item = CollectionItem { - mid: "521722088".to_string(), - sid: "4523".to_string(), - collection_type: CollectionType::Season, - }; - let collection = Collection::new(&bili_client, &collection_item); + let collection = Collection::new( + &bili_client, + CollectionItem { + mid: "521722088".to_string(), + sid: "4523".to_string(), + collection_type: CollectionType::Season, + }, + ); let videos = collection .into_video_stream() .take(20) diff --git a/crates/bili_sync/src/bilibili/submission.rs b/crates/bili_sync/src/bilibili/submission.rs index cf90dc2..e01fd3a 100644 --- a/crates/bili_sync/src/bilibili/submission.rs +++ b/crates/bili_sync/src/bilibili/submission.rs @@ -1,5 +1,4 @@ use anyhow::{Context, Result, anyhow}; -use arc_swap::access::Access; use async_stream::try_stream; use futures::Stream; use reqwest::Method; diff --git a/crates/bili_sync/src/bilibili/video.rs b/crates/bili_sync/src/bilibili/video.rs index 6a05763..ed4981f 100644 --- a/crates/bili_sync/src/bilibili/video.rs +++ b/crates/bili_sync/src/bilibili/video.rs @@ -68,7 +68,7 @@ impl<'a> Video<'a> { Ok(serde_json::from_value(res["data"].take())?) } - #[allow(unused)] + #[allow(dead_code)] pub async fn get_pages(&self) -> Result> { let mut res = self .client diff --git a/crates/bili_sync/src/config/clap.rs b/crates/bili_sync/src/config/args.rs similarity index 92% rename from crates/bili_sync/src/config/clap.rs rename to crates/bili_sync/src/config/args.rs index d9e27c1..d98e244 100644 --- a/crates/bili_sync/src/config/clap.rs +++ b/crates/bili_sync/src/config/args.rs @@ -1,7 +1,10 @@ use std::borrow::Cow; +use std::sync::LazyLock; use clap::Parser; +pub static ARGS: LazyLock = LazyLock::new(Args::parse); + #[derive(Parser)] #[command(name = "Bili-Sync", version = detail_version(), about, long_about = None)] pub struct Args { diff --git a/crates/bili_sync/src/config/current.rs b/crates/bili_sync/src/config/current.rs new file mode 100644 index 0000000..206fd63 --- /dev/null +++ b/crates/bili_sync/src/config/current.rs @@ -0,0 +1,133 @@ +use std::path::PathBuf; +use std::sync::LazyLock; + +use anyhow::{Result, bail}; +use arc_swap::ArcSwap; +use sea_orm::DatabaseConnection; +use serde::{Deserialize, Serialize}; + +use crate::bilibili::{Credential, DanmakuOption, FilterOption}; +use crate::config::LegacyConfig; +use crate::config::default::{default_auth_token, default_bind_address, default_time_format}; +use crate::config::item::{ConcurrentLimit, NFOTimeType}; +use crate::utils::model::{load_db_config, save_db_config}; + +pub static CONFIG_DIR: LazyLock = + LazyLock::new(|| dirs::config_dir().expect("No config path found").join("bili-sync")); + +#[derive(Serialize, Deserialize)] +pub struct Config { + #[serde(default = "default_auth_token")] + pub auth_token: String, + #[serde(default = "default_bind_address")] + pub bind_address: String, + pub credential: ArcSwap, + pub filter_option: FilterOption, + #[serde(default)] + pub danmaku_option: DanmakuOption, + pub video_name: String, + pub page_name: String, + pub interval: u64, + pub upper_path: PathBuf, + #[serde(default)] + pub nfo_time_type: NFOTimeType, + #[serde(default)] + pub concurrent_limit: ConcurrentLimit, + #[serde(default = "default_time_format")] + pub time_format: String, + #[serde(default)] + pub cdn_sorting: bool, +} + +impl Config { + pub async fn load_from_database(connection: &DatabaseConnection) -> Result>> { + load_db_config(connection).await + } + + pub async fn save_to_database(&self, connection: &DatabaseConnection) -> Result<()> { + save_db_config(self, connection).await + } + + pub fn check(&self) -> Result<()> { + let mut errors = Vec::new(); + if !self.upper_path.is_absolute() { + errors.push("up 主头像保存的路径应为绝对路径"); + } + if self.video_name.is_empty() { + errors.push("未设置 video_name 模板"); + } + if self.page_name.is_empty() { + errors.push("未设置 page_name 模板"); + } + let credential = self.credential.load(); + if credential.sessdata.is_empty() + || credential.bili_jct.is_empty() + || credential.buvid3.is_empty() + || credential.dedeuserid.is_empty() + || credential.ac_time_value.is_empty() + { + errors.push("Credential 信息不完整,请确保填写完整"); + } + if !(self.concurrent_limit.video > 0 && self.concurrent_limit.page > 0) { + errors.push("video 和 page 允许的并发数必须大于 0"); + } + if !errors.is_empty() { + bail!( + errors + .into_iter() + .map(|e| format!("- {}", e)) + .collect::>() + .join("\n") + ); + } + Ok(()) + } + + #[cfg(test)] + pub(super) fn test_default() -> Self { + Self { + cdn_sorting: true, + ..Default::default() + } + } +} + +impl Default for Config { + fn default() -> Self { + Self { + auth_token: default_auth_token(), + bind_address: default_bind_address(), + credential: ArcSwap::from_pointee(Credential::default()), + filter_option: FilterOption::default(), + danmaku_option: DanmakuOption::default(), + video_name: "{{title}}".to_owned(), + page_name: "{{bvid}}".to_owned(), + interval: 1200, + upper_path: CONFIG_DIR.join("upper_face"), + nfo_time_type: NFOTimeType::FavTime, + concurrent_limit: ConcurrentLimit::default(), + time_format: default_time_format(), + cdn_sorting: false, + } + } +} + +impl From for Config { + fn from(legacy: LegacyConfig) -> Self { + Self { + auth_token: legacy.auth_token, + bind_address: legacy.bind_address, + credential: legacy.credential, + filter_option: legacy.filter_option, + danmaku_option: legacy.danmaku_option, + video_name: legacy.video_name, + page_name: legacy.page_name, + interval: legacy.interval, + upper_path: legacy.upper_path, + nfo_time_type: legacy.nfo_time_type, + concurrent_limit: legacy.concurrent_limit, + time_format: legacy.time_format, + cdn_sorting: legacy.cdn_sorting, + } + } +} diff --git a/crates/bili_sync/src/config/default.rs b/crates/bili_sync/src/config/default.rs new file mode 100644 index 0000000..667d2e1 --- /dev/null +++ b/crates/bili_sync/src/config/default.rs @@ -0,0 +1,18 @@ +use rand::seq::SliceRandom; + +pub(super) fn default_time_format() -> String { + "%Y-%m-%d".to_string() +} + +/// 默认的 auth_token 实现,生成随机 16 位字符串 +pub(super) fn default_auth_token() -> String { + let byte_choices = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()_+-="; + let mut rng = rand::thread_rng(); + (0..16) + .map(|_| *(byte_choices.choose(&mut rng).expect("choose byte failed")) as char) + .collect() +} + +pub(super) fn default_bind_address() -> String { + "0.0.0.0:12345".to_string() +} diff --git a/crates/bili_sync/src/config/flag.rs b/crates/bili_sync/src/config/flag.rs new file mode 100644 index 0000000..7b987e5 --- /dev/null +++ b/crates/bili_sync/src/config/flag.rs @@ -0,0 +1,3 @@ +use std::sync::atomic::AtomicBool; + +pub static DOWNLOADER_RUNNING: AtomicBool = AtomicBool::new(false); diff --git a/crates/bili_sync/src/config/global.rs b/crates/bili_sync/src/config/global.rs deleted file mode 100644 index cbb287b..0000000 --- a/crates/bili_sync/src/config/global.rs +++ /dev/null @@ -1,62 +0,0 @@ -use std::path::PathBuf; - -use clap::Parser; -use handlebars::handlebars_helper; -use once_cell::sync::Lazy; - -use crate::config::Config; -use crate::config::clap::Args; -use crate::config::item::PathSafeTemplate; - -/// 全局的 CONFIG,可以从中读取配置信息 -pub static CONFIG: Lazy = Lazy::new(load_config); - -/// 全局的 TEMPLATE,用来渲染 video_name 和 page_name 模板 -pub static TEMPLATE: Lazy = Lazy::new(|| { - let mut handlebars = handlebars::Handlebars::new(); - handlebars_helper!(truncate: |s: String, len: usize| { - if s.chars().count() > len { - s.chars().take(len).collect::() - } else { - s.to_string() - } - }); - handlebars.register_helper("truncate", Box::new(truncate)); - handlebars - .path_safe_register("video", &CONFIG.video_name) - .expect("failed to register video template"); - handlebars - .path_safe_register("page", &CONFIG.page_name) - .expect("failed to register page template"); - handlebars -}); - -/// 全局的 ARGS,用来解析命令行参数 -pub static ARGS: Lazy = Lazy::new(Args::parse); - -/// 全局的 CONFIG_DIR,表示配置文件夹的路径 -pub static CONFIG_DIR: Lazy = - Lazy::new(|| dirs::config_dir().expect("No config path found").join("bili-sync")); - -fn load_config() -> Config { - if cfg!(test) { - return Config::load(&CONFIG_DIR.join("test_config.toml")).unwrap_or(Config::test_default()); - } - info!("开始加载配置文件.."); - let config = Config::load(&CONFIG_DIR.join("config.toml")).unwrap_or_else(|err| { - if err - .downcast_ref::() - .is_none_or(|e| e.kind() != std::io::ErrorKind::NotFound) - { - panic!("加载配置文件失败,错误为: {err}"); - } - warn!("配置文件不存在,使用默认配置.."); - Config::default() - }); - info!("配置文件加载完毕,覆盖刷新原有配置"); - config.save().expect("保存默认配置时遇到错误"); - info!("检查配置文件.."); - config.check(); - info!("配置文件检查通过"); - config -} diff --git a/crates/bili_sync/src/config/handlebar.rs b/crates/bili_sync/src/config/handlebar.rs new file mode 100644 index 0000000..f9caafd --- /dev/null +++ b/crates/bili_sync/src/config/handlebar.rs @@ -0,0 +1,90 @@ +use std::sync::LazyLock; + +use anyhow::Result; +use handlebars::handlebars_helper; + +use crate::config::versioned_cache::VersionedCache; +use crate::config::{Config, PathSafeTemplate}; + +pub static TEMPLATE: LazyLock>> = + LazyLock::new(|| VersionedCache::new(create_template).expect("Failed to create handlebars template")); + +fn create_template(config: &Config) -> Result> { + let mut handlebars = handlebars::Handlebars::new(); + handlebars.register_helper("truncate", Box::new(truncate)); + handlebars.path_safe_register("video", config.video_name.to_owned())?; + Ok(handlebars) +} + +handlebars_helper!(truncate: |s: String, len: usize| { + if s.chars().count() > len { + s.chars().take(len).collect::() + } else { + s.to_string() + } +}); + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + + #[test] + fn test_template_usage() { + let mut template = handlebars::Handlebars::new(); + template.register_helper("truncate", Box::new(truncate)); + let _ = template.path_safe_register("video", "test{{bvid}}test"); + let _ = template.path_safe_register("test_truncate", "哈哈,{{ truncate title 30 }}"); + let _ = template.path_safe_register("test_path_unix", "{{ truncate title 7 }}/test/a"); + let _ = template.path_safe_register("test_path_windows", r"{{ truncate title 7 }}\\test\\a"); + #[cfg(not(windows))] + { + assert_eq!( + template + .path_safe_render("test_path_unix", &json!({"title": "关注/永雏塔菲喵"})) + .unwrap(), + "关注_永雏塔菲/test/a" + ); + assert_eq!( + template + .path_safe_render("test_path_windows", &json!({"title": "关注/永雏塔菲喵"})) + .unwrap(), + "关注_永雏塔菲_test_a" + ); + } + #[cfg(windows)] + { + assert_eq!( + template + .path_safe_render("test_path_unix", &json!({"title": "关注/永雏塔菲喵"})) + .unwrap(), + "关注_永雏塔菲_test_a" + ); + assert_eq!( + template + .path_safe_render("test_path_windows", &json!({"title": "关注/永雏塔菲喵"})) + .unwrap(), + r"关注_永雏塔菲\\test\\a" + ); + } + assert_eq!( + template + .path_safe_render("video", &json!({"bvid": "BV1b5411h7g7"})) + .unwrap(), + "testBV1b5411h7g7test" + ); + assert_eq!( + template + .path_safe_render( + "test_truncate", + &json!({"title": "你说得对,但是 Rust 是由 Mozilla 自主研发的一款全新的编译期格斗游戏。\ + 编译将发生在一个被称作「Cargo」的构建系统中。在这里,被引用的指针将被授予「生命周期」之力,导引对象安全。\ + 你将扮演一位名为「Rustacean」的神秘角色, 在与「Rustc」的搏斗中邂逅各种骨骼惊奇的傲娇报错。\ + 征服她们、通过编译同时,逐步发掘「C++」程序崩溃的真相。"}) + ) + .unwrap(), + "哈哈,你说得对,但是 Rust 是由 Mozilla 自主研发的一" + ); + } +} diff --git a/crates/bili_sync/src/config/item.rs b/crates/bili_sync/src/config/item.rs index e2b877c..b0fa6fa 100644 --- a/crates/bili_sync/src/config/item.rs +++ b/crates/bili_sync/src/config/item.rs @@ -1,12 +1,8 @@ -use std::collections::HashMap; use std::path::PathBuf; use anyhow::Result; -use serde::de::{Deserializer, MapAccess, Visitor}; -use serde::ser::SerializeMap; use serde::{Deserialize, Serialize}; -use crate::bilibili::{CollectionItem, CollectionType}; use crate::utils::filenamify::filenamify; /// 稍后再看的配置 @@ -74,13 +70,14 @@ impl Default for ConcurrentLimit { } pub trait PathSafeTemplate { - fn path_safe_register(&mut self, name: &'static str, template: &'static str) -> Result<()>; + fn path_safe_register(&mut self, name: &'static str, template: impl Into) -> Result<()>; fn path_safe_render(&self, name: &'static str, data: &serde_json::Value) -> Result; } /// 通过将模板字符串中的分隔符替换为自定义的字符串,使得模板字符串中的分隔符得以保留 impl PathSafeTemplate for handlebars::Handlebars<'_> { - fn path_safe_register(&mut self, name: &'static str, template: &'static str) -> Result<()> { + fn path_safe_register(&mut self, name: &'static str, template: impl Into) -> Result<()> { + let template = template.into(); Ok(self.register_template_string(name, template.replace(std::path::MAIN_SEPARATOR_STR, "__SEP__"))?) } @@ -88,72 +85,3 @@ impl PathSafeTemplate for handlebars::Handlebars<'_> { Ok(filenamify(&self.render(name, data)?).replace("__SEP__", std::path::MAIN_SEPARATOR_STR)) } } -/* 后面是用于自定义 Collection 的序列化、反序列化的样板代码 */ -pub(super) fn serialize_collection_list( - collection_list: &HashMap, - serializer: S, -) -> Result -where - S: serde::Serializer, -{ - let mut map = serializer.serialize_map(Some(collection_list.len()))?; - for (k, v) in collection_list { - let prefix = match k.collection_type { - CollectionType::Series => "series", - CollectionType::Season => "season", - }; - map.serialize_entry(&[prefix, &k.mid, &k.sid].join(":"), v)?; - } - map.end() -} - -pub(super) fn deserialize_collection_list<'de, D>(deserializer: D) -> Result, D::Error> -where - D: Deserializer<'de>, -{ - struct CollectionListVisitor; - - impl<'de> Visitor<'de> for CollectionListVisitor { - type Value = HashMap; - - fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { - formatter.write_str("a map of collection list") - } - - fn visit_map(self, mut map: A) -> Result - where - A: MapAccess<'de>, - { - let mut collection_list = HashMap::new(); - while let Some((key, value)) = map.next_entry::()? { - let collection_item = match key.split(':').collect::>().as_slice() { - [prefix, mid, sid] => { - let collection_type = match *prefix { - "series" => CollectionType::Series, - "season" => CollectionType::Season, - _ => { - return Err(serde::de::Error::custom( - "invalid collection type, should be series or season", - )); - } - }; - CollectionItem { - mid: mid.to_string(), - sid: sid.to_string(), - collection_type, - } - } - _ => { - return Err(serde::de::Error::custom( - "invalid collection key, should be series:mid:sid or season:mid:sid", - )); - } - }; - collection_list.insert(collection_item, value); - } - Ok(collection_list) - } - } - - deserializer.deserialize_map(CollectionListVisitor) -} diff --git a/crates/bili_sync/src/config/legacy.rs b/crates/bili_sync/src/config/legacy.rs new file mode 100644 index 0000000..329e193 --- /dev/null +++ b/crates/bili_sync/src/config/legacy.rs @@ -0,0 +1,135 @@ +use std::collections::HashMap; +use std::path::{Path, PathBuf}; + +use anyhow::Result; +use arc_swap::ArcSwap; +use sea_orm::DatabaseConnection; +use serde::de::{Deserializer, MapAccess, Visitor}; +use serde::ser::SerializeMap; +use serde::{Deserialize, Serialize}; + +use crate::bilibili::{CollectionItem, CollectionType, Credential, DanmakuOption, FilterOption}; +use crate::config::Config; +use crate::config::default::{default_auth_token, default_bind_address, default_time_format}; +use crate::config::item::{ConcurrentLimit, NFOTimeType, WatchLaterConfig}; +use crate::utils::model::migrate_legacy_config; + +#[derive(Serialize, Deserialize)] +pub struct LegacyConfig { + #[serde(default = "default_auth_token")] + pub auth_token: String, + #[serde(default = "default_bind_address")] + pub bind_address: String, + pub credential: ArcSwap, + pub filter_option: FilterOption, + #[serde(default)] + pub danmaku_option: DanmakuOption, + pub favorite_list: HashMap, + #[serde( + default, + serialize_with = "serialize_collection_list", + deserialize_with = "deserialize_collection_list" + )] + pub collection_list: HashMap, + #[serde(default)] + pub submission_list: HashMap, + #[serde(default)] + pub watch_later: WatchLaterConfig, + pub video_name: String, + pub page_name: String, + pub interval: u64, + pub upper_path: PathBuf, + #[serde(default)] + pub nfo_time_type: NFOTimeType, + #[serde(default)] + pub concurrent_limit: ConcurrentLimit, + #[serde(default = "default_time_format")] + pub time_format: String, + #[serde(default)] + pub cdn_sorting: bool, +} + +impl LegacyConfig { + async fn load_from_file(path: &Path) -> Result { + let legacy_config_str = tokio::fs::read_to_string(path).await?; + Ok(toml::from_str(&legacy_config_str)?) + } + + pub async fn migrate_from_file(path: &Path, connection: &DatabaseConnection) -> Result { + let legacy_config = Self::load_from_file(path).await?; + migrate_legacy_config(&legacy_config, connection).await?; + Ok(legacy_config.into()) + } +} + +/* +后面是用于自定义 Collection 的序列化、反序列化的样板代码 +*/ +pub(super) fn serialize_collection_list( + collection_list: &HashMap, + serializer: S, +) -> Result +where + S: serde::Serializer, +{ + let mut map = serializer.serialize_map(Some(collection_list.len()))?; + for (k, v) in collection_list { + let prefix = match k.collection_type { + CollectionType::Series => "series", + CollectionType::Season => "season", + }; + map.serialize_entry(&[prefix, &k.mid, &k.sid].join(":"), v)?; + } + map.end() +} + +pub(super) fn deserialize_collection_list<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + struct CollectionListVisitor; + + impl<'de> Visitor<'de> for CollectionListVisitor { + type Value = HashMap; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a map of collection list") + } + + fn visit_map(self, mut map: A) -> Result + where + A: MapAccess<'de>, + { + let mut collection_list = HashMap::new(); + while let Some((key, value)) = map.next_entry::()? { + let collection_item = match key.split(':').collect::>().as_slice() { + [prefix, mid, sid] => { + let collection_type = match *prefix { + "series" => CollectionType::Series, + "season" => CollectionType::Season, + _ => { + return Err(serde::de::Error::custom( + "invalid collection type, should be series or season", + )); + } + }; + CollectionItem { + mid: mid.to_string(), + sid: sid.to_string(), + collection_type, + } + } + _ => { + return Err(serde::de::Error::custom( + "invalid collection key, should be series:mid:sid or season:mid:sid", + )); + } + }; + collection_list.insert(collection_item, value); + } + Ok(collection_list) + } + } + + deserializer.deserialize_map(CollectionListVisitor) +} diff --git a/crates/bili_sync/src/config/mod.rs b/crates/bili_sync/src/config/mod.rs index 6cb0ea0..e0cf5c9 100644 --- a/crates/bili_sync/src/config/mod.rs +++ b/crates/bili_sync/src/config/mod.rs @@ -1,191 +1,18 @@ -use std::borrow::Cow; -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use anyhow::Result; -use arc_swap::ArcSwapOption; -use rand::seq::SliceRandom; -use serde::{Deserialize, Serialize}; - -mod clap; -mod global; +mod args; +mod current; +mod default; +mod flag; +mod handlebar; mod item; +mod legacy; +mod versioned_cache; +mod versioned_config; -use crate::adapter::Args; -use crate::bilibili::{CollectionItem, Credential, DanmakuOption, FilterOption}; -pub use crate::config::clap::version; -pub use crate::config::global::{ARGS, CONFIG, CONFIG_DIR, TEMPLATE}; -use crate::config::item::{ConcurrentLimit, deserialize_collection_list, serialize_collection_list}; -pub use crate::config::item::{NFOTimeType, PathSafeTemplate, RateLimit, WatchLaterConfig}; - -fn default_time_format() -> String { - "%Y-%m-%d".to_string() -} - -/// 默认的 auth_token 实现,生成随机 16 位字符串 -fn default_auth_token() -> Option { - let byte_choices = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()_+-="; - let mut rng = rand::thread_rng(); - Some( - (0..16) - .map(|_| *(byte_choices.choose(&mut rng).expect("choose byte failed")) as char) - .collect(), - ) -} - -fn default_bind_address() -> String { - "0.0.0.0:12345".to_string() -} - -#[derive(Serialize, Deserialize)] -pub struct Config { - #[serde(default = "default_auth_token")] - pub auth_token: Option, - #[serde(default = "default_bind_address")] - pub bind_address: String, - pub credential: ArcSwapOption, - pub filter_option: FilterOption, - #[serde(default)] - pub danmaku_option: DanmakuOption, - pub favorite_list: HashMap, - #[serde( - default, - serialize_with = "serialize_collection_list", - deserialize_with = "deserialize_collection_list" - )] - pub collection_list: HashMap, - #[serde(default)] - pub submission_list: HashMap, - #[serde(default)] - pub watch_later: WatchLaterConfig, - pub video_name: Cow<'static, str>, - pub page_name: Cow<'static, str>, - pub interval: u64, - pub upper_path: PathBuf, - #[serde(default)] - pub nfo_time_type: NFOTimeType, - #[serde(default)] - pub concurrent_limit: ConcurrentLimit, - #[serde(default = "default_time_format")] - pub time_format: String, - #[serde(default)] - pub cdn_sorting: bool, -} - -impl Default for Config { - fn default() -> Self { - Self { - auth_token: default_auth_token(), - bind_address: default_bind_address(), - credential: ArcSwapOption::from(Some(Arc::new(Credential::default()))), - filter_option: FilterOption::default(), - danmaku_option: DanmakuOption::default(), - favorite_list: HashMap::new(), - collection_list: HashMap::new(), - submission_list: HashMap::new(), - watch_later: Default::default(), - video_name: Cow::Borrowed("{{title}}"), - page_name: Cow::Borrowed("{{bvid}}"), - interval: 1200, - upper_path: CONFIG_DIR.join("upper_face"), - nfo_time_type: NFOTimeType::FavTime, - concurrent_limit: ConcurrentLimit::default(), - time_format: default_time_format(), - cdn_sorting: false, - } - } -} - -impl Config { - pub fn save(&self) -> Result<()> { - let config_path = CONFIG_DIR.join("config.toml"); - std::fs::create_dir_all(&*CONFIG_DIR)?; - std::fs::write(config_path, toml::to_string_pretty(self)?)?; - Ok(()) - } - - fn load(path: &Path) -> Result { - let config_content = std::fs::read_to_string(path)?; - Ok(toml::from_str(&config_content)?) - } - - pub fn as_video_sources(&self) -> Vec<(Args<'_>, &PathBuf)> { - let mut params = Vec::new(); - self.favorite_list - .iter() - .for_each(|(fid, path)| params.push((Args::Favorite { fid }, path))); - self.collection_list - .iter() - .for_each(|(collection_item, path)| params.push((Args::Collection { collection_item }, path))); - if self.watch_later.enabled { - params.push((Args::WatchLater, &self.watch_later.path)); - } - self.submission_list - .iter() - .for_each(|(upper_id, path)| params.push((Args::Submission { upper_id }, path))); - params - } - - pub fn check(&self) { - let mut ok = true; - let video_sources = self.as_video_sources(); - if video_sources.is_empty() { - ok = false; - error!("没有配置任何需要扫描的内容,程序空转没有意义"); - } - for (args, path) in video_sources { - if !path.is_absolute() { - ok = false; - error!("{:?} 保存的路径应为绝对路径,检测到: {}", args, path.display()); - } - } - if !self.upper_path.is_absolute() { - ok = false; - error!("up 主头像保存的路径应为绝对路径"); - } - if self.video_name.is_empty() { - ok = false; - error!("未设置 video_name 模板"); - } - if self.page_name.is_empty() { - ok = false; - error!("未设置 page_name 模板"); - } - let credential = self.credential.load(); - match credential.as_deref() { - Some(credential) => { - if credential.sessdata.is_empty() - || credential.bili_jct.is_empty() - || credential.buvid3.is_empty() - || credential.dedeuserid.is_empty() - || credential.ac_time_value.is_empty() - { - ok = false; - error!("Credential 信息不完整,请确保填写完整"); - } - } - None => { - ok = false; - error!("未设置 Credential 信息"); - } - } - if !(self.concurrent_limit.video > 0 && self.concurrent_limit.page > 0) { - ok = false; - error!("video 和 page 允许的并发数必须大于 0"); - } - if !ok { - panic!( - "位于 {} 的配置文件不合法,请参考提示信息修复后继续运行", - CONFIG_DIR.join("config.toml").display() - ); - } - } - - pub(super) fn test_default() -> Self { - Self { - cdn_sorting: true, - ..Default::default() - } - } -} +pub use crate::config::args::{ARGS, version}; +pub use crate::config::current::{CONFIG_DIR, Config}; +pub use crate::config::flag::DOWNLOADER_RUNNING; +pub use crate::config::handlebar::TEMPLATE; +pub use crate::config::item::{NFOTimeType, PathSafeTemplate, RateLimit}; +pub use crate::config::legacy::LegacyConfig; +pub use crate::config::versioned_cache::VersionedCache; +pub use crate::config::versioned_config::VersionedConfig; diff --git a/crates/bili_sync/src/config/versioned_cache.rs b/crates/bili_sync/src/config/versioned_cache.rs new file mode 100644 index 0000000..ca9ca99 --- /dev/null +++ b/crates/bili_sync/src/config/versioned_cache.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use anyhow::Result; +use arc_swap::{ArcSwap, Guard}; + +use crate::config::{Config, VersionedConfig}; + +pub struct VersionedCache { + inner: ArcSwap, + version: AtomicU64, + builder: fn(&Config) -> Result, +} + +impl VersionedCache { + pub fn new(builder: fn(&Config) -> Result) -> Result { + let current_config = VersionedConfig::get().load(); + let initial_value = builder(¤t_config)?; + Ok(Self { + inner: ArcSwap::from_pointee(initial_value), + version: AtomicU64::new(0), + builder, + }) + } + + pub fn load(&self) -> Guard> { + self.reload_if_needed(); + self.inner.load() + } + + #[allow(dead_code)] + pub fn load_full(&self) -> Arc { + self.reload_if_needed(); + self.inner.load_full() + } + + fn reload_if_needed(&self) { + let current_version = VersionedConfig::get().version(); + let cached_version = self.version.load(Ordering::Acquire); + + if current_version != cached_version { + let current_config = VersionedConfig::get().load(); + if let Ok(new_value) = (self.builder)(¤t_config) { + self.inner.store(Arc::new(new_value)); + self.version.store(current_version, Ordering::Release); + } + } + } +} diff --git a/crates/bili_sync/src/config/versioned_config.rs b/crates/bili_sync/src/config/versioned_config.rs new file mode 100644 index 0000000..143784d --- /dev/null +++ b/crates/bili_sync/src/config/versioned_config.rs @@ -0,0 +1,91 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use anyhow::{Result, anyhow, bail}; +use arc_swap::{ArcSwap, Guard}; +use sea_orm::DatabaseConnection; +use tokio::sync::OnceCell; + +use crate::config::{CONFIG_DIR, Config, LegacyConfig}; + +pub static VERSIONED_CONFIG: OnceCell = OnceCell::const_new(); + +pub struct VersionedConfig { + inner: ArcSwap, + version: AtomicU64, +} + +impl VersionedConfig { + /// 初始化全局的 `VersionedConfig`,初始化失败或者已初始化过则返回错误 + pub async fn init(connection: &DatabaseConnection) -> Result<()> { + let config = match Config::load_from_database(connection).await? { + Some(Ok(config)) => config, + Some(Err(e)) => bail!("解析数据库配置失败: {}", e), + None => { + let config = match LegacyConfig::migrate_from_file(&CONFIG_DIR.join("config.toml"), connection).await { + Ok(config) => config, + Err(e) => { + if e.downcast_ref::() + .is_none_or(|e| e.kind() != std::io::ErrorKind::NotFound) + { + bail!("未成功读取并迁移旧版本配置:{:#}", e); + } else { + let config = Config::default(); + warn!( + "生成 auth_token:{},可使用该 token 登录 web UI,该信息仅在首次运行时打印", + config.auth_token + ); + config + } + } + }; + config.save_to_database(connection).await?; + config + } + }; + let versioned_config = VersionedConfig::new(config); + VERSIONED_CONFIG + .set(versioned_config) + .map_err(|e| anyhow!("VERSIONED_CONFIG has already been initialized: {}", e))?; + Ok(()) + } + + #[cfg(test)] + /// 单元测试直接使用测试专用的配置即可 + pub fn get() -> &'static VersionedConfig { + use std::sync::LazyLock; + static TEST_CONFIG: LazyLock = LazyLock::new(|| VersionedConfig::new(Config::test_default())); + return &TEST_CONFIG; + } + + #[cfg(not(test))] + /// 获取全局的 `VersionedConfig`,如果未初始化则会 panic + pub fn get() -> &'static VersionedConfig { + VERSIONED_CONFIG.get().expect("VERSIONED_CONFIG is not initialized") + } + + pub fn new(config: Config) -> Self { + Self { + inner: ArcSwap::from_pointee(config), + version: AtomicU64::new(1), + } + } + + pub fn load(&self) -> Guard> { + self.inner.load() + } + + pub fn load_full(&self) -> Arc { + self.inner.load_full() + } + + pub fn version(&self) -> u64 { + self.version.load(Ordering::Acquire) + } + + #[allow(dead_code)] + pub fn update(&self, new_config: Config) { + self.inner.store(Arc::new(new_config)); + self.version.fetch_add(1, Ordering::AcqRel); + } +} diff --git a/crates/bili_sync/src/database.rs b/crates/bili_sync/src/database.rs index a552f75..b525037 100644 --- a/crates/bili_sync/src/database.rs +++ b/crates/bili_sync/src/database.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use bili_sync_migration::{Migrator, MigratorTrait}; use sea_orm::{ConnectOptions, Database, DatabaseConnection}; @@ -25,7 +25,7 @@ async fn migrate_database() -> Result<()> { } /// 进行数据库迁移并获取数据库连接,供外部使用 -pub async fn setup_database() -> DatabaseConnection { - migrate_database().await.expect("数据库迁移失败"); - database_connection().await.expect("获取数据库连接失败") +pub async fn setup_database() -> Result { + migrate_database().await.context("Failed to migrate database")?; + database_connection().await.context("Failed to connect to database") } diff --git a/crates/bili_sync/src/downloader.rs b/crates/bili_sync/src/downloader.rs index 3c77647..e17b7ee 100644 --- a/crates/bili_sync/src/downloader.rs +++ b/crates/bili_sync/src/downloader.rs @@ -12,7 +12,7 @@ use tokio::task::JoinSet; use tokio_util::io::StreamReader; use crate::bilibili::Client; -use crate::config::CONFIG; +use crate::config::VersionedConfig; pub struct Downloader { client: Client, } @@ -26,7 +26,7 @@ impl Downloader { } pub async fn fetch(&self, url: &str, path: &Path) -> Result<()> { - if CONFIG.concurrent_limit.download.enable { + if VersionedConfig::get().load().concurrent_limit.download.enable { self.fetch_parallel(url, path).await } else { self.fetch_serial(url, path).await @@ -60,6 +60,13 @@ impl Downloader { } async fn fetch_parallel(&self, url: &str, path: &Path) -> Result<()> { + let (concurrency, threshold) = { + let config = VersionedConfig::get().load(); + ( + config.concurrent_limit.download.concurrency, + config.concurrent_limit.download.threshold, + ) + }; let resp = self .client .request(Method::HEAD, url, None) @@ -67,12 +74,12 @@ impl Downloader { .await? .error_for_status()?; let file_size = resp.header_content_length().unwrap_or_default(); - let chunk_size = file_size / CONFIG.concurrent_limit.download.concurrency as u64; + let chunk_size = file_size / concurrency as u64; if resp .headers() .get(header::ACCEPT_RANGES) .is_none_or(|v| v.to_str().unwrap_or_default() == "none") // https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Accept-Ranges#none - || chunk_size < CONFIG.concurrent_limit.download.threshold + || chunk_size < threshold { return self.fetch_serial(url, path).await; } @@ -85,9 +92,9 @@ impl Downloader { let mut tasks = JoinSet::new(); let url = Arc::new(url.to_string()); let path = Arc::new(path.to_path_buf()); - for i in 0..CONFIG.concurrent_limit.download.concurrency { + for i in 0..concurrency { let start = i as u64 * chunk_size; - let end = if i == CONFIG.concurrent_limit.download.concurrency - 1 { + let end = if i == concurrency - 1 { file_size } else { start + chunk_size diff --git a/crates/bili_sync/src/main.rs b/crates/bili_sync/src/main.rs index 668b526..8bccb1f 100644 --- a/crates/bili_sync/src/main.rs +++ b/crates/bili_sync/src/main.rs @@ -17,21 +17,20 @@ use std::future::Future; use std::sync::Arc; use bilibili::BiliClient; -use once_cell::sync::Lazy; +use sea_orm::DatabaseConnection; use task::{http_server, video_downloader}; use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; -use crate::config::{ARGS, CONFIG}; +use crate::config::{ARGS, VersionedConfig}; use crate::database::setup_database; use crate::utils::init_logger; use crate::utils::signal::terminate; #[tokio::main] async fn main() { - init(); + let connection = init().await; let bili_client = Arc::new(BiliClient::new()); - let connection = Arc::new(setup_database().await); let token = CancellationToken::new(); let tracker = TaskTracker::new(); @@ -74,12 +73,17 @@ fn spawn_task( }); } -/// 初始化日志系统,打印欢迎信息,加载配置文件 -fn init() { +/// 初始化日志系统、打印欢迎信息,初始化数据库连接和全局配置,最终返回数据库连接 +async fn init() -> Arc { init_logger(&ARGS.log_level); info!("欢迎使用 Bili-Sync,当前程序版本:{}", config::version()); info!("项目地址:https://github.com/amtoaer/bili-sync"); - Lazy::force(&CONFIG); + let connection = Arc::new(setup_database().await.expect("数据库初始化失败")); + info!("数据库初始化完成"); + VersionedConfig::init(&connection).await.expect("配置初始化失败"); + info!("配置初始化完成"); + + connection } async fn handle_shutdown(tracker: TaskTracker, token: CancellationToken) { diff --git a/crates/bili_sync/src/task/http_server.rs b/crates/bili_sync/src/task/http_server.rs index e1308f4..18c96e3 100644 --- a/crates/bili_sync/src/task/http_server.rs +++ b/crates/bili_sync/src/task/http_server.rs @@ -1,9 +1,8 @@ use std::sync::Arc; use anyhow::{Context, Result}; -use axum::body::Body; use axum::extract::Request; -use axum::http::{Response, Uri, header}; +use axum::http::{Uri, header}; use axum::response::IntoResponse; use axum::routing::get; use axum::{Extension, Router, ServiceExt, middleware}; @@ -16,7 +15,7 @@ use utoipa_swagger_ui::{Config, SwaggerUi}; use crate::api::auth; use crate::api::handler::{ApiDoc, api_router}; use crate::bilibili::BiliClient; -use crate::config::CONFIG; +use crate::config::VersionedConfig; #[derive(RustEmbed)] #[preserve_source = false] @@ -41,10 +40,11 @@ pub async fn http_server(database_connection: Arc, bili_clie .layer(Extension(database_connection)) .layer(Extension(bili_client)) .layer(middleware::from_fn(auth::auth)); - let listener = tokio::net::TcpListener::bind(&CONFIG.bind_address) + let config = VersionedConfig::get().load_full(); + let listener = tokio::net::TcpListener::bind(&config.bind_address) .await .context("bind address failed")?; - info!("开始运行管理页: http://{}", CONFIG.bind_address); + info!("开始运行管理页: http://{}", config.bind_address); Ok(axum::serve(listener, ServiceExt::::into_make_service(app)).await?) } @@ -56,16 +56,21 @@ async fn frontend_files(uri: Uri) -> impl IntoResponse { let Some(content) = Asset::get(path) else { return (StatusCode::NOT_FOUND, "404 Not Found").into_response(); }; - Response::builder() - .status(StatusCode::OK) - .header( - header::CONTENT_TYPE, - content.mime_type().as_deref().unwrap_or("application/octet-stream"), + let mime_type = content.mime_type(); + let content_type = mime_type.as_deref().unwrap_or("application/octet-stream"); + if cfg!(debug_assertions) { + ( + [(header::CONTENT_TYPE, content_type)], + // safety: `RustEmbed` returns uncompressed files directly from the filesystem in debug mode + content.data().unwrap(), ) - .header(header::CONTENT_ENCODING, "br") - // safety: `RustEmbed` will always generate br-compressed files if the feature is enabled - .body(Body::from(content.data_br().unwrap())) - .unwrap_or_else(|_| { - return (StatusCode::INTERNAL_SERVER_ERROR, "500 Internal Server Error").into_response(); - }) + .into_response() + } else { + ( + [(header::CONTENT_TYPE, content_type), (header::CONTENT_ENCODING, "br")], + // safety: `RustEmbed` will always generate br-compressed files if the feature is enabled + content.data_br().unwrap(), + ) + .into_response() + } } diff --git a/crates/bili_sync/src/task/video_downloader.rs b/crates/bili_sync/src/task/video_downloader.rs index dd0bb00..0e848de 100644 --- a/crates/bili_sync/src/task/video_downloader.rs +++ b/crates/bili_sync/src/task/video_downloader.rs @@ -4,16 +4,22 @@ use sea_orm::DatabaseConnection; use tokio::time; use crate::bilibili::{self, BiliClient}; -use crate::config::CONFIG; +use crate::config::{DOWNLOADER_RUNNING, VersionedConfig}; +use crate::utils::model::get_enabled_video_sources; use crate::workflow::process_video_source; /// 启动周期下载视频的任务 pub async fn video_downloader(connection: Arc, bili_client: Arc) { let mut anchor = chrono::Local::now().date_naive(); - let video_sources = CONFIG.as_video_sources(); loop { info!("开始执行本轮视频下载任务.."); + DOWNLOADER_RUNNING.store(true, std::sync::atomic::Ordering::Relaxed); + let config = VersionedConfig::get().load_full(); 'inner: { + if let Err(e) = config.check() { + error!("配置检查失败,跳过本轮执行:\n{:#}", e); + break 'inner; + } match bili_client.wbi_img().await.map(|wbi_img| wbi_img.into()) { Ok(Some(mixin_key)) => bilibili::set_global_mixin_key(mixin_key), Ok(_) => { @@ -26,19 +32,28 @@ pub async fn video_downloader(connection: Arc, bili_client: } }; if anchor != chrono::Local::now().date_naive() { - if let Err(e) = bili_client.check_refresh().await { + if let Err(e) = bili_client.check_refresh(&connection).await { error!("检查刷新 Credential 遇到错误:{:#},等待下一轮执行", e); break 'inner; } anchor = chrono::Local::now().date_naive(); } - for (args, path) in &video_sources { - if let Err(e) = process_video_source(*args, &bili_client, path, &connection).await { - error!("处理过程遇到错误:{:#}", e); + let Ok(video_sources) = get_enabled_video_sources(&connection).await else { + error!("获取视频源列表失败,等待下一轮执行"); + break 'inner; + }; + if video_sources.is_empty() { + info!("没有可用的视频源,等待下一轮执行"); + break 'inner; + } + for video_source in video_sources { + if let Err(e) = process_video_source(video_source, &bili_client, &connection).await { + error!("处理 {} 时遇到错误:{:#},等待下一轮执行", "test", e); } } info!("本轮任务执行完毕,等待下一轮执行"); } - time::sleep(time::Duration::from_secs(CONFIG.interval)).await; + DOWNLOADER_RUNNING.store(false, std::sync::atomic::Ordering::Relaxed); + time::sleep(time::Duration::from_secs(config.interval)).await; } } diff --git a/crates/bili_sync/src/utils/format_arg.rs b/crates/bili_sync/src/utils/format_arg.rs index 69a3aaf..f1bce26 100644 --- a/crates/bili_sync/src/utils/format_arg.rs +++ b/crates/bili_sync/src/utils/format_arg.rs @@ -1,15 +1,16 @@ use serde_json::json; -use crate::config::CONFIG; +use crate::config::VersionedConfig; pub fn video_format_args(video_model: &bili_sync_entity::video::Model) -> serde_json::Value { + let config = VersionedConfig::get().load(); json!({ "bvid": &video_model.bvid, "title": &video_model.name, "upper_name": &video_model.upper_name, "upper_mid": &video_model.upper_id, - "pubtime": &video_model.pubtime.and_utc().format(&CONFIG.time_format).to_string(), - "fav_time": &video_model.favtime.and_utc().format(&CONFIG.time_format).to_string(), + "pubtime": &video_model.pubtime.and_utc().format(&config.time_format).to_string(), + "fav_time": &video_model.favtime.and_utc().format(&config.time_format).to_string(), }) } @@ -17,6 +18,7 @@ pub fn page_format_args( video_model: &bili_sync_entity::video::Model, page_model: &bili_sync_entity::page::Model, ) -> serde_json::Value { + let config = VersionedConfig::get().load(); json!({ "bvid": &video_model.bvid, "title": &video_model.name, @@ -24,7 +26,7 @@ pub fn page_format_args( "upper_mid": &video_model.upper_id, "ptitle": &page_model.name, "pid": page_model.pid, - "pubtime": video_model.pubtime.and_utc().format(&CONFIG.time_format).to_string(), - "fav_time": video_model.favtime.and_utc().format(&CONFIG.time_format).to_string(), + "pubtime": video_model.pubtime.and_utc().format(&config.time_format).to_string(), + "fav_time": video_model.favtime.and_utc().format(&config.time_format).to_string(), }) } diff --git a/crates/bili_sync/src/utils/model.rs b/crates/bili_sync/src/utils/model.rs index aaa5895..573ce1b 100644 --- a/crates/bili_sync/src/utils/model.rs +++ b/crates/bili_sync/src/utils/model.rs @@ -1,11 +1,13 @@ -use anyhow::{Context, Result}; +use anyhow::{Context, Result, anyhow}; use bili_sync_entity::*; -use sea_orm::DatabaseTransaction; +use sea_orm::ActiveValue::Set; use sea_orm::entity::prelude::*; use sea_orm::sea_query::{OnConflict, SimpleExpr}; +use sea_orm::{DatabaseTransaction, TransactionTrait}; use crate::adapter::{VideoSource, VideoSourceEnum}; use crate::bilibili::{PageInfo, VideoInfo}; +use crate::config::{Config, LegacyConfig}; use crate::utils::status::STATUS_COMPLETED; /// 筛选未填充的视频 @@ -117,3 +119,123 @@ pub async fn update_pages_model(pages: Vec, connection: &Data query.exec(connection).await?; Ok(()) } + +/// 获取所有已经启用的视频源 +pub async fn get_enabled_video_sources(connection: &DatabaseConnection) -> Result> { + let (favorite, watch_later, submission, collection) = tokio::try_join!( + favorite::Entity::find() + .filter(favorite::Column::Enabled.eq(true)) + .all(connection), + watch_later::Entity::find() + .filter(watch_later::Column::Enabled.eq(true)) + .all(connection), + submission::Entity::find() + .filter(submission::Column::Enabled.eq(true)) + .all(connection), + collection::Entity::find() + .filter(collection::Column::Enabled.eq(true)) + .all(connection), + )?; + let mut sources = Vec::with_capacity(favorite.len() + watch_later.len() + submission.len() + collection.len()); + sources.extend(favorite.into_iter().map(VideoSourceEnum::from)); + sources.extend(watch_later.into_iter().map(VideoSourceEnum::from)); + sources.extend(submission.into_iter().map(VideoSourceEnum::from)); + sources.extend(collection.into_iter().map(VideoSourceEnum::from)); + Ok(sources) +} + +/// 从数据库中加载配置 +pub async fn load_db_config(connection: &DatabaseConnection) -> Result>> { + Ok(bili_sync_entity::config::Entity::find_by_id(1) + .one(connection) + .await? + .map(|model| { + serde_json::from_str(&model.data).map_err(|e| anyhow!("Failed to deserialize config data: {}", e)) + })) +} + +/// 保存配置到数据库 +pub async fn save_db_config(config: &Config, connection: &DatabaseConnection) -> Result<()> { + let data = serde_json::to_string(config).context("Failed to serialize config data")?; + let model = bili_sync_entity::config::ActiveModel { + id: Set(1), + data: Set(data), + ..Default::default() + }; + bili_sync_entity::config::Entity::insert(model) + .on_conflict( + OnConflict::column(bili_sync_entity::config::Column::Id) + .update_column(bili_sync_entity::config::Column::Data) + .to_owned(), + ) + .exec(connection) + .await + .context("Failed to save config to database")?; + Ok(()) +} + +/// 迁移旧版本配置(即将所有相关联的内容设置为 enabled) +pub async fn migrate_legacy_config(config: &LegacyConfig, connection: &DatabaseConnection) -> Result<()> { + let transaction = connection.begin().await.context("Failed to begin transaction")?; + tokio::try_join!( + migrate_favorite(config, &transaction), + migrate_watch_later(config, &transaction), + migrate_submission(config, &transaction), + migrate_collection(config, &transaction) + )?; + transaction.commit().await.context("Failed to commit transaction")?; + Ok(()) +} + +async fn migrate_favorite(config: &LegacyConfig, connection: &DatabaseTransaction) -> Result<()> { + favorite::Entity::update_many() + .filter(favorite::Column::FId.is_in(config.favorite_list.keys().collect::>())) + .col_expr(favorite::Column::Enabled, Expr::value(true)) + .exec(connection) + .await + .context("Failed to migrate favorite config")?; + Ok(()) +} + +async fn migrate_watch_later(config: &LegacyConfig, connection: &DatabaseTransaction) -> Result<()> { + if config.watch_later.enabled { + watch_later::Entity::update_many() + .col_expr(watch_later::Column::Enabled, Expr::value(true)) + .exec(connection) + .await + .context("Failed to migrate watch later config")?; + } + Ok(()) +} + +async fn migrate_submission(config: &LegacyConfig, connection: &DatabaseTransaction) -> Result<()> { + submission::Entity::update_many() + .filter(submission::Column::UpperId.is_in(config.submission_list.keys().collect::>())) + .col_expr(submission::Column::Enabled, Expr::value(true)) + .exec(connection) + .await + .context("Failed to migrate submission config")?; + Ok(()) +} + +async fn migrate_collection(config: &LegacyConfig, connection: &DatabaseTransaction) -> Result<()> { + let tuples: Vec<(i64, i64, i32)> = config + .collection_list + .keys() + .filter_map(|key| Some((key.sid.parse().ok()?, key.mid.parse().ok()?, key.collection_type.into()))) + .collect(); + collection::Entity::update_many() + .filter( + Expr::tuple([ + Expr::column(collection::Column::SId), + Expr::column(collection::Column::MId), + Expr::column(collection::Column::Type), + ]) + .in_tuples(tuples), + ) + .col_expr(collection::Column::Enabled, Expr::value(true)) + .exec(connection) + .await + .context("Failed to migrate collection config")?; + Ok(()) +} diff --git a/crates/bili_sync/src/utils/nfo.rs b/crates/bili_sync/src/utils/nfo.rs index 58b38e1..3726584 100644 --- a/crates/bili_sync/src/utils/nfo.rs +++ b/crates/bili_sync/src/utils/nfo.rs @@ -6,7 +6,7 @@ use quick_xml::events::{BytesCData, BytesText}; use quick_xml::writer::Writer; use tokio::io::{AsyncWriteExt, BufWriter}; -use crate::config::{CONFIG, NFOTimeType}; +use crate::config::{NFOTimeType, VersionedConfig}; #[allow(clippy::upper_case_acronyms)] pub enum NFO<'a> { @@ -339,7 +339,7 @@ impl<'a> From<&'a video::Model> for Movie<'a> { bvid: &video.bvid, upper_id: video.upper_id, upper_name: &video.upper_name, - aired: match CONFIG.nfo_time_type { + aired: match VersionedConfig::get().load().nfo_time_type { NFOTimeType::FavTime => video.favtime, NFOTimeType::PubTime => video.pubtime, }, @@ -359,7 +359,7 @@ impl<'a> From<&'a video::Model> for TVShow<'a> { bvid: &video.bvid, upper_id: video.upper_id, upper_name: &video.upper_name, - aired: match CONFIG.nfo_time_type { + aired: match VersionedConfig::get().load().nfo_time_type { NFOTimeType::FavTime => video.favtime, NFOTimeType::PubTime => video.pubtime, }, diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index 34df96e..9bac9db 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -12,9 +12,9 @@ use sea_orm::entity::prelude::*; use tokio::fs; use tokio::sync::Semaphore; -use crate::adapter::{Args, VideoSource, VideoSourceEnum, video_source_from}; +use crate::adapter::{VideoSource, VideoSourceEnum}; use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Video, VideoInfo}; -use crate::config::{ARGS, CONFIG, PathSafeTemplate, TEMPLATE}; +use crate::config::{ARGS, PathSafeTemplate, TEMPLATE, VersionedConfig}; use crate::downloader::Downloader; use crate::error::{DownloadAbortError, ExecutionStatus, ProcessPageError}; use crate::utils::format_arg::{page_format_args, video_format_args}; @@ -27,13 +27,12 @@ use crate::utils::status::{PageStatus, STATUS_OK, VideoStatus}; /// 完整地处理某个视频来源 pub async fn process_video_source( - args: Args<'_>, + video_source: VideoSourceEnum, bili_client: &BiliClient, - path: &Path, connection: &DatabaseConnection, ) -> Result<()> { // 从参数中获取视频列表的 Model 与视频流 - let (video_source, video_streams) = video_source_from(args, path, bili_client, connection).await?; + let (video_source, video_streams) = video_source.refresh(bili_client, connection).await?; // 从视频流中获取新视频的简要信息,写入数据库 refresh_video_source(&video_source, video_streams, connection).await?; // 单独请求视频详情接口,获取视频的详情信息与所有的分页,写入数据库 @@ -154,7 +153,7 @@ pub async fn download_unprocessed_videos( connection: &DatabaseConnection, ) -> Result<()> { video_source.log_download_video_start(); - let semaphore = Semaphore::new(CONFIG.concurrent_limit.video); + let semaphore = Semaphore::new(VersionedConfig::get().load().concurrent_limit.video); let downloader = Downloader::new(bili_client.client.clone()); let unhandled_videos_pages = filter_unhandled_video_pages(video_source.filter_expr(), connection).await?; let mut assigned_upper = HashSet::new(); @@ -215,11 +214,14 @@ pub async fn download_video_pages( let _permit = semaphore.acquire().await.context("acquire semaphore failed")?; let mut status = VideoStatus::from(video_model.download_status); let separate_status = status.should_run(); - let base_path = video_source - .path() - .join(TEMPLATE.path_safe_render("video", &video_format_args(&video_model))?); + let base_path = video_source.path().join( + TEMPLATE + .load() + .path_safe_render("video", &video_format_args(&video_model))?, + ); let upper_id = video_model.upper_id.to_string(); - let base_upper_path = &CONFIG + let base_upper_path = VersionedConfig::get() + .load() .upper_path .join(upper_id.chars().next().context("upper_id is empty")?.to_string()) .join(upper_id); @@ -311,7 +313,7 @@ pub async fn dispatch_download_page( if !should_run { return Ok(ExecutionStatus::Skipped); } - let child_semaphore = Semaphore::new(CONFIG.concurrent_limit.page); + let child_semaphore = Semaphore::new(VersionedConfig::get().load().concurrent_limit.page); let tasks = pages .into_iter() .map(|page_model| { @@ -377,7 +379,9 @@ pub async fn download_page( let mut status = PageStatus::from(page_model.download_status); let separate_status = status.should_run(); let is_single_page = video_model.single_page.context("single_page is null")?; - let base_name = TEMPLATE.path_safe_render("page", &page_format_args(video_model, &page_model))?; + let base_name = TEMPLATE + .load() + .path_safe_render("page", &page_format_args(video_model, &page_model))?; let (poster_path, video_path, nfo_path, danmaku_path, fanart_path, subtitle_path) = if is_single_page { ( base_path.join(format!("{}-poster.jpg", &base_name)), @@ -532,7 +536,7 @@ pub async fn fetch_page_video( let streams = bili_video .get_page_analyzer(page_info) .await? - .best_stream(&CONFIG.filter_option)?; + .best_stream(&VersionedConfig::get().load().filter_option)?; match streams { BestStream::Mixed(mix_stream) => downloader.fetch_with_fallback(&mix_stream.urls(), page_path).await?, BestStream::VideoAudio { @@ -686,76 +690,3 @@ async fn generate_nfo(nfo: NFO<'_>, nfo_path: PathBuf) -> Result<()> { fs::write(nfo_path, nfo.generate_nfo().await?.as_bytes()).await?; Ok(()) } - -#[cfg(test)] -mod tests { - use handlebars::handlebars_helper; - use serde_json::json; - - use super::*; - - #[test] - fn test_template_usage() { - let mut template = handlebars::Handlebars::new(); - handlebars_helper!(truncate: |s: String, len: usize| { - if s.chars().count() > len { - s.chars().take(len).collect::() - } else { - s.to_string() - } - }); - template.register_helper("truncate", Box::new(truncate)); - let _ = template.path_safe_register("video", "test{{bvid}}test"); - let _ = template.path_safe_register("test_truncate", "哈哈,{{ truncate title 30 }}"); - let _ = template.path_safe_register("test_path_unix", "{{ truncate title 7 }}/test/a"); - let _ = template.path_safe_register("test_path_windows", r"{{ truncate title 7 }}\\test\\a"); - #[cfg(not(windows))] - { - assert_eq!( - template - .path_safe_render("test_path_unix", &json!({"title": "关注/永雏塔菲喵"})) - .unwrap(), - "关注_永雏塔菲/test/a" - ); - assert_eq!( - template - .path_safe_render("test_path_windows", &json!({"title": "关注/永雏塔菲喵"})) - .unwrap(), - "关注_永雏塔菲_test_a" - ); - } - #[cfg(windows)] - { - assert_eq!( - template - .path_safe_render("test_path_unix", &json!({"title": "关注/永雏塔菲喵"})) - .unwrap(), - "关注_永雏塔菲_test_a" - ); - assert_eq!( - template - .path_safe_render("test_path_windows", &json!({"title": "关注/永雏塔菲喵"})) - .unwrap(), - r"关注_永雏塔菲\\test\\a" - ); - } - assert_eq!( - template - .path_safe_render("video", &json!({"bvid": "BV1b5411h7g7"})) - .unwrap(), - "testBV1b5411h7g7test" - ); - assert_eq!( - template - .path_safe_render( - "test_truncate", - &json!({"title": "你说得对,但是 Rust 是由 Mozilla 自主研发的一款全新的编译期格斗游戏。\ - 编译将发生在一个被称作「Cargo」的构建系统中。在这里,被引用的指针将被授予「生命周期」之力,导引对象安全。\ - 你将扮演一位名为「Rustacean」的神秘角色, 在与「Rustc」的搏斗中邂逅各种骨骼惊奇的傲娇报错。\ - 征服她们、通过编译同时,逐步发掘「C++」程序崩溃的真相。"}) - ) - .unwrap(), - "哈哈,你说得对,但是 Rust 是由 Mozilla 自主研发的一" - ); - } -} diff --git a/crates/bili_sync_entity/src/entities/config.rs b/crates/bili_sync_entity/src/entities/config.rs new file mode 100644 index 0000000..71255cc --- /dev/null +++ b/crates/bili_sync_entity/src/entities/config.rs @@ -0,0 +1,17 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.15 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "config")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + pub data: String, + pub created_at: String, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/crates/bili_sync_entity/src/entities/mod.rs b/crates/bili_sync_entity/src/entities/mod.rs index 4721356..5e65b77 100644 --- a/crates/bili_sync_entity/src/entities/mod.rs +++ b/crates/bili_sync_entity/src/entities/mod.rs @@ -3,6 +3,7 @@ pub mod prelude; pub mod collection; +pub mod config; pub mod favorite; pub mod page; pub mod submission; diff --git a/crates/bili_sync_migration/src/lib.rs b/crates/bili_sync_migration/src/lib.rs index d415f13..e29a77c 100644 --- a/crates/bili_sync_migration/src/lib.rs +++ b/crates/bili_sync_migration/src/lib.rs @@ -6,6 +6,7 @@ mod m20240709_130914_watch_later; mod m20240724_161008_submission; mod m20250122_062926_add_latest_row_at; mod m20250612_090826_add_enabled; +mod m20250613_043257_add_config; pub struct Migrator; @@ -19,6 +20,7 @@ impl MigratorTrait for Migrator { Box::new(m20240724_161008_submission::Migration), Box::new(m20250122_062926_add_latest_row_at::Migration), Box::new(m20250612_090826_add_enabled::Migration), + Box::new(m20250613_043257_add_config::Migration), ] } } diff --git a/crates/bili_sync_migration/src/m20250613_043257_add_config.rs b/crates/bili_sync_migration/src/m20250613_043257_add_config.rs new file mode 100644 index 0000000..7753883 --- /dev/null +++ b/crates/bili_sync_migration/src/m20250613_043257_add_config.rs @@ -0,0 +1,44 @@ +use sea_orm_migration::prelude::*; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Config::Table) + .if_not_exists() + .col( + ColumnDef::new(Config::Id) + .integer() + .not_null() + .auto_increment() + .primary_key(), + ) + .col(ColumnDef::new(Config::Data).text().not_null()) + .col( + ColumnDef::new(Config::CreatedAt) + .timestamp() + .default(Expr::current_timestamp()) + .not_null(), + ) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager.drop_table(Table::drop().table(Config::Table).to_owned()).await + } +} + +#[derive(DeriveIden)] +enum Config { + Table, + Id, + Data, + CreatedAt, +}