From 854d39cf88d62f0c051635830144c094d301c880 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=B4=80=E1=B4=8D=E1=B4=9B=E1=B4=8F=E1=B4=80=E1=B4=87?= =?UTF-8?q?=CA=80?= Date: Thu, 6 Nov 2025 17:25:26 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E5=AF=B9=E5=85=A8?= =?UTF-8?q?=E5=B1=80=E9=85=8D=E7=BD=AE=E7=9A=84=E5=A4=84=E7=90=86=EF=BC=8C?= =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=B8=8B=E8=BD=BD=E8=B7=AF=E5=BE=84=E5=A1=AB?= =?UTF-8?q?=E5=85=85=E9=80=BB=E8=BE=91=20(#523)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/bili_sync/src/adapter/collection.rs | 4 +- crates/bili_sync/src/adapter/favorite.rs | 5 +- crates/bili_sync/src/adapter/mod.rs | 3 +- crates/bili_sync/src/adapter/submission.rs | 5 +- crates/bili_sync/src/adapter/watch_later.rs | 5 +- crates/bili_sync/src/api/routes/config/mod.rs | 9 +- crates/bili_sync/src/api/routes/me/mod.rs | 10 +- crates/bili_sync/src/api/routes/mod.rs | 2 +- .../src/api/routes/video_sources/mod.rs | 16 +- crates/bili_sync/src/bilibili/analyzer.rs | 12 +- crates/bili_sync/src/bilibili/client.rs | 88 ++++-- crates/bili_sync/src/bilibili/collection.rs | 20 +- .../bili_sync/src/bilibili/danmaku/writer.rs | 8 +- crates/bili_sync/src/bilibili/dynamic.rs | 12 +- .../bili_sync/src/bilibili/favorite_list.rs | 23 +- crates/bili_sync/src/bilibili/me.rs | 56 ++-- crates/bili_sync/src/bilibili/mod.rs | 46 ++- crates/bili_sync/src/bilibili/submission.rs | 25 +- crates/bili_sync/src/bilibili/video.rs | 43 ++- crates/bili_sync/src/bilibili/watch_later.rs | 13 +- crates/bili_sync/src/config/current.rs | 8 - crates/bili_sync/src/config/item.rs | 2 +- crates/bili_sync/src/config/mod.rs | 2 +- .../bili_sync/src/config/versioned_cache.rs | 74 ++--- .../bili_sync/src/config/versioned_config.rs | 65 ++-- crates/bili_sync/src/downloader.rs | 62 ++-- crates/bili_sync/src/task/http_server.rs | 6 +- crates/bili_sync/src/task/video_downloader.rs | 98 +++--- .../bili_sync/src/utils/download_context.rs | 36 +++ crates/bili_sync/src/utils/format_arg.rs | 15 +- crates/bili_sync/src/utils/mod.rs | 1 + crates/bili_sync/src/utils/nfo.rs | 94 +++--- crates/bili_sync/src/utils/task_notifier.rs | 21 +- crates/bili_sync/src/workflow.rs | 283 ++++++++++-------- 34 files changed, 706 insertions(+), 466 deletions(-) create mode 100644 crates/bili_sync/src/utils/download_context.rs diff --git a/crates/bili_sync/src/adapter/collection.rs b/crates/bili_sync/src/adapter/collection.rs index 96526f0..0f1433d 100644 --- a/crates/bili_sync/src/adapter/collection.rs +++ b/crates/bili_sync/src/adapter/collection.rs @@ -13,7 +13,7 @@ use sea_orm::sea_query::SimpleExpr; use sea_orm::{DatabaseConnection, Unchanged}; use crate::adapter::{_ActiveModel, VideoSource, VideoSourceEnum}; -use crate::bilibili::{BiliClient, Collection, CollectionItem, CollectionType, VideoInfo}; +use crate::bilibili::{BiliClient, Collection, CollectionItem, CollectionType, Credential, VideoInfo}; impl VideoSource for collection::Model { fn display_name(&self) -> Cow<'static, str> { @@ -77,6 +77,7 @@ impl VideoSource for collection::Model { async fn refresh<'a>( self, bili_client: &'a BiliClient, + credential: &'a Credential, connection: &'a DatabaseConnection, ) -> Result<( VideoSourceEnum, @@ -89,6 +90,7 @@ impl VideoSource for collection::Model { mid: self.m_id.to_string(), collection_type: CollectionType::from_expected(self.r#type), }, + credential, ); let collection_info = collection.get_info().await?; ensure!( diff --git a/crates/bili_sync/src/adapter/favorite.rs b/crates/bili_sync/src/adapter/favorite.rs index edb70ae..a50bee8 100644 --- a/crates/bili_sync/src/adapter/favorite.rs +++ b/crates/bili_sync/src/adapter/favorite.rs @@ -12,7 +12,7 @@ use sea_orm::sea_query::SimpleExpr; use sea_orm::{DatabaseConnection, Unchanged}; use crate::adapter::{_ActiveModel, VideoSource, VideoSourceEnum}; -use crate::bilibili::{BiliClient, FavoriteList, VideoInfo}; +use crate::bilibili::{BiliClient, Credential, FavoriteList, VideoInfo}; impl VideoSource for favorite::Model { fn display_name(&self) -> Cow<'static, str> { @@ -50,12 +50,13 @@ impl VideoSource for favorite::Model { async fn refresh<'a>( self, bili_client: &'a BiliClient, + credential: &'a Credential, connection: &'a DatabaseConnection, ) -> Result<( VideoSourceEnum, Pin> + Send + 'a>>, )> { - let favorite = FavoriteList::new(bili_client, self.f_id.to_string()); + let favorite = FavoriteList::new(bili_client, self.f_id.to_string(), credential); let favorite_info = favorite.get_info().await?; ensure!( favorite_info.id == self.f_id, diff --git a/crates/bili_sync/src/adapter/mod.rs b/crates/bili_sync/src/adapter/mod.rs index b591ec2..db95911 100644 --- a/crates/bili_sync/src/adapter/mod.rs +++ b/crates/bili_sync/src/adapter/mod.rs @@ -23,7 +23,7 @@ use bili_sync_entity::rule::Rule; use bili_sync_entity::submission::Model as Submission; use bili_sync_entity::watch_later::Model as WatchLater; -use crate::bilibili::{BiliClient, VideoInfo}; +use crate::bilibili::{BiliClient, Credential, VideoInfo}; #[enum_dispatch] pub enum VideoSourceEnum { @@ -104,6 +104,7 @@ pub trait VideoSource { async fn refresh<'a>( self, bili_client: &'a BiliClient, + credential: &'a Credential, connection: &'a DatabaseConnection, ) -> Result<( VideoSourceEnum, diff --git a/crates/bili_sync/src/adapter/submission.rs b/crates/bili_sync/src/adapter/submission.rs index 7f82a43..155fbbc 100644 --- a/crates/bili_sync/src/adapter/submission.rs +++ b/crates/bili_sync/src/adapter/submission.rs @@ -11,7 +11,7 @@ use sea_orm::sea_query::SimpleExpr; use sea_orm::{DatabaseConnection, Unchanged}; use crate::adapter::{_ActiveModel, VideoSource, VideoSourceEnum}; -use crate::bilibili::{BiliClient, Dynamic, Submission, VideoInfo}; +use crate::bilibili::{BiliClient, Credential, Dynamic, Submission, VideoInfo}; impl VideoSource for submission::Model { fn display_name(&self) -> std::borrow::Cow<'static, str> { @@ -85,12 +85,13 @@ impl VideoSource for submission::Model { async fn refresh<'a>( self, bili_client: &'a BiliClient, + credential: &'a Credential, connection: &'a DatabaseConnection, ) -> Result<( VideoSourceEnum, Pin> + Send + 'a>>, )> { - let submission = Submission::new(bili_client, self.upper_id.to_string()); + let submission = Submission::new(bili_client, self.upper_id.to_string(), credential); let upper = submission.get_info().await?; ensure!( upper.mid == submission.upper_id, diff --git a/crates/bili_sync/src/adapter/watch_later.rs b/crates/bili_sync/src/adapter/watch_later.rs index 32f63f2..e572cfc 100644 --- a/crates/bili_sync/src/adapter/watch_later.rs +++ b/crates/bili_sync/src/adapter/watch_later.rs @@ -11,7 +11,7 @@ use sea_orm::sea_query::SimpleExpr; use sea_orm::{DatabaseConnection, Unchanged}; use crate::adapter::{_ActiveModel, VideoSource, VideoSourceEnum}; -use crate::bilibili::{BiliClient, VideoInfo, WatchLater}; +use crate::bilibili::{BiliClient, Credential, VideoInfo, WatchLater}; impl VideoSource for watch_later::Model { fn display_name(&self) -> std::borrow::Cow<'static, str> { @@ -49,12 +49,13 @@ impl VideoSource for watch_later::Model { async fn refresh<'a>( self, bili_client: &'a BiliClient, + credential: &'a Credential, _connection: &'a DatabaseConnection, ) -> Result<( VideoSourceEnum, Pin> + Send + 'a>>, )> { - let watch_later = WatchLater::new(bili_client); + let watch_later = WatchLater::new(bili_client, credential); Ok((self.into(), Box::pin(watch_later.into_video_stream()))) } } diff --git a/crates/bili_sync/src/api/routes/config/mod.rs b/crates/bili_sync/src/api/routes/config/mod.rs index abb1d35..d301a52 100644 --- a/crates/bili_sync/src/api/routes/config/mod.rs +++ b/crates/bili_sync/src/api/routes/config/mod.rs @@ -6,10 +6,8 @@ use axum::extract::Extension; use axum::routing::get; use sea_orm::DatabaseConnection; -use crate::api::error::InnerApiError; use crate::api::wrapper::{ApiError, ApiResponse, ValidatedJson}; use crate::config::{Config, VersionedConfig}; -use crate::utils::task_notifier::TASK_STATUS_NOTIFIER; pub(super) fn router() -> Router { Router::new().route("/config", get(get_config).put(update_config)) @@ -17,7 +15,7 @@ pub(super) fn router() -> Router { /// 获取全局配置 pub async fn get_config() -> Result>, ApiError> { - Ok(ApiResponse::ok(VersionedConfig::get().load_full())) + Ok(ApiResponse::ok(VersionedConfig::get().snapshot())) } /// 更新全局配置 @@ -25,12 +23,7 @@ pub async fn update_config( Extension(db): Extension, ValidatedJson(config): ValidatedJson, ) -> Result>, ApiError> { - let Some(_lock) = TASK_STATUS_NOTIFIER.detect_running() else { - // 简单避免一下可能的不一致现象 - return Err(InnerApiError::BadRequest("下载任务正在运行,无法修改配置".to_string()).into()); - }; config.check()?; let new_config = VersionedConfig::get().update(config, &db).await?; - drop(_lock); Ok(ApiResponse::ok(new_config)) } diff --git a/crates/bili_sync/src/api/routes/me/mod.rs b/crates/bili_sync/src/api/routes/me/mod.rs index b29e7cf..fc18df5 100644 --- a/crates/bili_sync/src/api/routes/me/mod.rs +++ b/crates/bili_sync/src/api/routes/me/mod.rs @@ -15,6 +15,7 @@ use crate::api::response::{ }; use crate::api::wrapper::{ApiError, ApiResponse}; use crate::bilibili::{BiliClient, Me}; +use crate::config::VersionedConfig; pub(super) fn router() -> Router { Router::new() @@ -28,7 +29,8 @@ pub async fn get_created_favorites( Extension(db): Extension, Extension(bili_client): Extension>, ) -> Result, ApiError> { - let me = Me::new(bili_client.as_ref()); + let credential = &VersionedConfig::get().read().credential; + let me = Me::new(bili_client.as_ref(), credential); let bili_favorites = me.get_created_favorites().await?; let favorites = if let Some(bili_favorites) = bili_favorites { @@ -68,7 +70,8 @@ pub async fn get_followed_collections( Extension(bili_client): Extension>, Query(params): Query, ) -> Result, ApiError> { - let me = Me::new(bili_client.as_ref()); + let credential = &VersionedConfig::get().read().credential; + let me = Me::new(bili_client.as_ref(), credential); let (page_num, page_size) = (params.page_num.unwrap_or(1), params.page_size.unwrap_or(50)); let bili_collections = me.get_followed_collections(page_num, page_size).await?; @@ -110,7 +113,8 @@ pub async fn get_followed_uppers( Extension(bili_client): Extension>, Query(params): Query, ) -> Result, ApiError> { - let me = Me::new(bili_client.as_ref()); + let credential = &VersionedConfig::get().read().credential; + let me = Me::new(bili_client.as_ref(), credential); let (page_num, page_size) = (params.page_num.unwrap_or(1), params.page_size.unwrap_or(20)); let bili_uppers = me.get_followed_uppers(page_num, page_size).await?; diff --git a/crates/bili_sync/src/api/routes/mod.rs b/crates/bili_sync/src/api/routes/mod.rs index d79afdc..0739f4d 100644 --- a/crates/bili_sync/src/api/routes/mod.rs +++ b/crates/bili_sync/src/api/routes/mod.rs @@ -34,7 +34,7 @@ pub fn router() -> Router { /// 中间件:使用 auth token 对请求进行身份验证 pub async fn auth(mut headers: HeaderMap, request: Request, next: Next) -> Result { - let config = VersionedConfig::get().load(); + let config = VersionedConfig::get().read(); let token = config.auth_token.as_str(); if headers .get("Authorization") diff --git a/crates/bili_sync/src/api/routes/video_sources/mod.rs b/crates/bili_sync/src/api/routes/video_sources/mod.rs index b2e7c0e..059343a 100644 --- a/crates/bili_sync/src/api/routes/video_sources/mod.rs +++ b/crates/bili_sync/src/api/routes/video_sources/mod.rs @@ -22,7 +22,7 @@ use crate::api::response::{ }; use crate::api::wrapper::{ApiError, ApiResponse, ValidatedJson}; use crate::bilibili::{BiliClient, Collection, CollectionItem, FavoriteList, Submission}; -use crate::config::{PathSafeTemplate, TEMPLATE}; +use crate::config::{PathSafeTemplate, TEMPLATE, VersionedConfig}; use crate::utils::rule::FieldEvaluatable; pub(super) fn router() -> Router { @@ -170,8 +170,10 @@ pub async fn get_video_sources_default_path( "submissions" => "submission_default_path", _ => return Err(InnerApiError::BadRequest("Invalid video source type".to_string()).into()), }; - let (template, params) = (TEMPLATE.load(), serde_json::to_value(params)?); - Ok(ApiResponse::ok(template.path_safe_render(template_name, ¶ms)?)) + let template = TEMPLATE.read(); + Ok(ApiResponse::ok( + template.path_safe_render(template_name, &serde_json::to_value(params)?)?, + )) } /// 更新视频来源 @@ -323,7 +325,8 @@ pub async fn insert_favorite( Extension(bili_client): Extension>, ValidatedJson(request): ValidatedJson, ) -> Result, ApiError> { - let favorite = FavoriteList::new(bili_client.as_ref(), request.fid.to_string()); + let credential = &VersionedConfig::get().read().credential; + let favorite = FavoriteList::new(bili_client.as_ref(), request.fid.to_string(), credential); let favorite_info = favorite.get_info().await?; favorite::Entity::insert(favorite::ActiveModel { f_id: Set(favorite_info.id), @@ -343,6 +346,7 @@ pub async fn insert_collection( Extension(bili_client): Extension>, ValidatedJson(request): ValidatedJson, ) -> Result, ApiError> { + let credential = &VersionedConfig::get().read().credential; let collection = Collection::new( bili_client.as_ref(), CollectionItem { @@ -350,6 +354,7 @@ pub async fn insert_collection( mid: request.mid.to_string(), collection_type: request.collection_type, }, + credential, ); let collection_info = collection.get_info().await?; collection::Entity::insert(collection::ActiveModel { @@ -373,7 +378,8 @@ pub async fn insert_submission( Extension(bili_client): Extension>, ValidatedJson(request): ValidatedJson, ) -> Result, ApiError> { - let submission = Submission::new(bili_client.as_ref(), request.upper_id.to_string()); + let credential = &VersionedConfig::get().read().credential; + let submission = Submission::new(bili_client.as_ref(), request.upper_id.to_string(), credential); let upper = submission.get_info().await?; submission::Entity::insert(submission::ActiveModel { upper_id: Set(upper.mid.parse()?), diff --git a/crates/bili_sync/src/bilibili/analyzer.rs b/crates/bili_sync/src/bilibili/analyzer.rs index fd3b664..a54cd45 100644 --- a/crates/bili_sync/src/bilibili/analyzer.rs +++ b/crates/bili_sync/src/bilibili/analyzer.rs @@ -2,7 +2,6 @@ use anyhow::{Context, Result, bail}; use serde::{Deserialize, Serialize}; use crate::bilibili::error::BiliError; -use crate::config::VersionedConfig; pub struct PageAnalyzer { info: serde_json::Value, @@ -131,14 +130,14 @@ pub enum Stream { // 通用的获取流链接的方法,交由 Downloader 使用 impl Stream { - pub fn urls(&self) -> Vec<&str> { + pub fn urls(&self, enable_cdn_sorting: bool) -> Vec<&str> { match self { Self::Flv(url) | Self::Html5Mp4(url) | Self::EpisodeTryMp4(url) => vec![url], Self::DashVideo { url, backup_url, .. } | Self::DashAudio { url, backup_url, .. } => { let mut urls = std::iter::once(url.as_str()) .chain(backup_url.iter().map(|s| s.as_str())) .collect::>(); - if VersionedConfig::get().load().cdn_sorting { + if enable_cdn_sorting { urls.sort_by_key(|u| { if u.contains("upos-") { 0 // 服务商 cdn @@ -424,16 +423,17 @@ mod tests { Some(AudioQuality::Quality192k), ), ]; + let config = VersionedConfig::get().read(); for (bvid, video_quality, video_codec, audio_quality) in testcases.into_iter() { let client = BiliClient::new(); - let video = Video::new(&client, bvid.to_owned()); + let video = Video::new(&client, bvid.to_owned(), &config.credential); let pages = video.get_pages().await.expect("failed to get pages"); let first_page = pages.into_iter().next().expect("no page found"); let best_stream = video .get_page_analyzer(&first_page) .await .expect("failed to get page analyzer") - .best_stream(&VersionedConfig::get().load().filter_option) + .best_stream(&config.filter_option) .expect("failed to get best stream"); dbg!(bvid, &best_stream); match best_stream { @@ -469,7 +469,7 @@ mod tests { codecs: VideoCodecs::AVC, }; assert_eq!( - stream.urls(), + stream.urls(true), vec![ "https://upos-sz-mirrorcos.bilivideo.com", "https://cn-tj-cu-01-11.bilivideo.com", diff --git a/crates/bili_sync/src/bilibili/client.rs b/crates/bili_sync/src/bilibili/client.rs index 762af6f..ffeb2a9 100644 --- a/crates/bili_sync/src/bilibili/client.rs +++ b/crates/bili_sync/src/bilibili/client.rs @@ -1,14 +1,14 @@ +use std::sync::Arc; use std::time::Duration; -use anyhow::Result; +use anyhow::{Result, bail}; use leaky_bucket::RateLimiter; use reqwest::{Method, header}; -use sea_orm::DatabaseConnection; use ua_generator::ua; use crate::bilibili::Credential; use crate::bilibili::credential::WbiImg; -use crate::config::{RateLimit, VersionedCache, VersionedConfig}; +use crate::config::{RateLimit, VersionedCache}; // 一个对 reqwest::Client 的简单封装,用于 Bilibili 请求 #[derive(Clone)] @@ -60,56 +60,78 @@ impl Default for Client { } } +enum Limiter { + Latest(VersionedCache>), + Snapshot(Arc>), +} + pub struct BiliClient { pub client: Client, - limiter: VersionedCache>, + limiter: Limiter, } impl BiliClient { pub fn new() -> Self { let client = Client::new(); - let limiter = VersionedCache::new(|config| { - Ok(config - .concurrent_limit - .rate_limit - .as_ref() - .map(|RateLimit { limit, duration }| { - RateLimiter::builder() - .initial(*limit) - .refill(*limit) - .max(*limit) - .interval(Duration::from_millis(*duration)) - .build() - })) - }) - .expect("failed to create rate limiter"); + let limiter = Limiter::Latest( + VersionedCache::new(|config| { + Ok(config + .concurrent_limit + .rate_limit + .as_ref() + .map(|RateLimit { limit, duration }| { + RateLimiter::builder() + .initial(*limit) + .refill(*limit) + .max(*limit) + .interval(Duration::from_millis(*duration)) + .build() + })) + }) + .expect("failed to create rate limiter"), + ); Self { client, limiter } } + /// 获取当前 BiliClient 的快照,快照中的限流器固定不变 + pub fn snapshot(&self) -> Result { + let Limiter::Latest(inner) = &self.limiter else { + // 语法上没问题,但语义上不允许对快照进行快照 + bail!("cannot snapshot a snapshot BiliClient"); + }; + Ok(Self { + client: self.client.clone(), + limiter: Limiter::Snapshot(inner.snapshot()), + }) + } + /// 获取一个预构建的请求,通过该方法获取请求时会检查并等待速率限制 - pub async fn request(&self, method: Method, url: &str) -> reqwest::RequestBuilder { - if let Some(limiter) = self.limiter.load().as_ref() { - limiter.acquire_one().await; + pub async fn request(&self, method: Method, url: &str, credential: &Credential) -> reqwest::RequestBuilder { + match &self.limiter { + Limiter::Latest(inner) => { + if let Some(limiter) = inner.read().as_ref() { + limiter.acquire_one().await; + } + } + Limiter::Snapshot(inner) => { + if let Some(limiter) = inner.as_ref() { + limiter.acquire_one().await; + } + } } - let credential = &VersionedConfig::get().load().credential; self.client.request(method, url, Some(credential)) } - pub async fn check_refresh(&self, connection: &DatabaseConnection) -> Result<()> { - let credential = &VersionedConfig::get().load().credential; + /// 检查并刷新 Credential,不需要刷新返回 Ok(None),需要刷新返回 Ok(Some(new_credential)) + pub async fn check_refresh(&self, credential: &Credential) -> Result> { if !credential.need_refresh(&self.client).await? { - return Ok(()); + return Ok(None); } - let new_credential = credential.refresh(&self.client).await?; - VersionedConfig::get() - .update_credential(new_credential, connection) - .await?; - Ok(()) + Ok(Some(credential.refresh(&self.client).await?)) } /// 获取 wbi img,用于生成请求签名 - pub async fn wbi_img(&self) -> Result { - let credential = &VersionedConfig::get().load().credential; + pub async fn wbi_img(&self, credential: &Credential) -> Result { credential.wbi_img(&self.client).await } } diff --git a/crates/bili_sync/src/bilibili/collection.rs b/crates/bili_sync/src/bilibili/collection.rs index 9f59b36..65093ad 100644 --- a/crates/bili_sync/src/bilibili/collection.rs +++ b/crates/bili_sync/src/bilibili/collection.rs @@ -7,7 +7,7 @@ use reqwest::Method; use serde::Deserialize; use serde_json::Value; -use crate::bilibili::{BiliClient, Validate, VideoInfo}; +use crate::bilibili::{BiliClient, Credential, Validate, VideoInfo}; #[derive(PartialEq, Eq, Hash, Clone, Debug, Default, Copy)] pub enum CollectionType { @@ -73,6 +73,7 @@ pub struct CollectionItem { pub struct Collection<'a> { client: &'a BiliClient, pub collection: CollectionItem, + credential: &'a Credential, } #[derive(Debug, PartialEq)] @@ -111,8 +112,12 @@ impl<'de> Deserialize<'de> for CollectionInfo { } impl<'a> Collection<'a> { - pub fn new(client: &'a BiliClient, collection: CollectionItem) -> Self { - Self { client, collection } + pub fn new(client: &'a BiliClient, collection: CollectionItem, credential: &'a Credential) -> Self { + Self { + client, + collection, + credential, + } } pub async fn get_info(&self) -> Result { @@ -126,7 +131,7 @@ impl<'a> Collection<'a> { async fn get_series_info(&self) -> Result { self.client - .request(Method::GET, "https://api.bilibili.com/x/series/series") + .request(Method::GET, "https://api.bilibili.com/x/series/series", self.credential) .await .query(&[("series_id", self.collection.sid.as_str())]) .send() @@ -141,7 +146,11 @@ impl<'a> Collection<'a> { let req = match self.collection.collection_type { CollectionType::Series => self .client - .request(Method::GET, "https://api.bilibili.com/x/series/archives") + .request( + Method::GET, + "https://api.bilibili.com/x/series/archives", + self.credential, + ) .await .query(&[("pn", page)]) .query(&[ @@ -156,6 +165,7 @@ impl<'a> Collection<'a> { .request( Method::GET, "https://api.bilibili.com/x/polymer/web-space/seasons_archives_list", + self.credential, ) .await .query(&[("page_num", page)]) diff --git a/crates/bili_sync/src/bilibili/danmaku/writer.rs b/crates/bili_sync/src/bilibili/danmaku/writer.rs index e7bf70d..d6ff7a0 100644 --- a/crates/bili_sync/src/bilibili/danmaku/writer.rs +++ b/crates/bili_sync/src/bilibili/danmaku/writer.rs @@ -3,10 +3,9 @@ use std::path::PathBuf; use anyhow::Result; use tokio::fs::{self, File}; -use crate::bilibili::PageInfo; use crate::bilibili::danmaku::canvas::CanvasConfig; use crate::bilibili::danmaku::{AssWriter, Danmu}; -use crate::config::VersionedConfig; +use crate::bilibili::{DanmakuOption, PageInfo}; pub struct DanmakuWriter<'a> { page: &'a PageInfo, @@ -18,12 +17,11 @@ impl<'a> DanmakuWriter<'a> { DanmakuWriter { page, danmaku } } - pub async fn write(self, path: PathBuf) -> Result<()> { + pub async fn write(self, path: PathBuf, danmaku_option: &DanmakuOption) -> Result<()> { if let Some(parent) = path.parent() { fs::create_dir_all(parent).await?; } - let config = VersionedConfig::get().load_full(); - let canvas_config = CanvasConfig::new(&config.danmaku_option, self.page); + let canvas_config = CanvasConfig::new(danmaku_option, self.page); let mut writer = AssWriter::construct(File::create(path).await?, self.page.name.clone(), canvas_config.clone()).await?; let mut canvas = canvas_config.canvas(); diff --git a/crates/bili_sync/src/bilibili/dynamic.rs b/crates/bili_sync/src/bilibili/dynamic.rs index 53c0f0e..4176b5d 100644 --- a/crates/bili_sync/src/bilibili/dynamic.rs +++ b/crates/bili_sync/src/bilibili/dynamic.rs @@ -6,11 +6,12 @@ use futures::Stream; use reqwest::Method; use serde_json::Value; -use crate::bilibili::{BiliClient, MIXIN_KEY, Validate, VideoInfo, WbiSign}; +use crate::bilibili::{BiliClient, Credential, MIXIN_KEY, Validate, VideoInfo, WbiSign}; pub struct Dynamic<'a> { client: &'a BiliClient, pub upper_id: String, + credential: &'a Credential, } #[derive(Debug, serde::Deserialize)] @@ -20,8 +21,12 @@ pub struct DynamicItemPublished { } impl<'a> Dynamic<'a> { - pub fn new(client: &'a BiliClient, upper_id: String) -> Self { - Self { client, upper_id } + pub fn new(client: &'a BiliClient, upper_id: String, credential: &'a Credential) -> Self { + Self { + client, + upper_id, + credential, + } } pub async fn get_dynamics(&self, offset: Option) -> Result { @@ -29,6 +34,7 @@ impl<'a> Dynamic<'a> { .request( Method::GET, "https://api.bilibili.com/x/polymer/web-dynamic/v1/feed/space", + self.credential, ) .await .query(&[ diff --git a/crates/bili_sync/src/bilibili/favorite_list.rs b/crates/bili_sync/src/bilibili/favorite_list.rs index 40a6bb3..cd307c4 100644 --- a/crates/bili_sync/src/bilibili/favorite_list.rs +++ b/crates/bili_sync/src/bilibili/favorite_list.rs @@ -3,10 +3,11 @@ use async_stream::try_stream; use futures::Stream; use serde_json::Value; -use crate::bilibili::{BiliClient, Validate, VideoInfo}; +use crate::bilibili::{BiliClient, Credential, Validate, VideoInfo}; pub struct FavoriteList<'a> { client: &'a BiliClient, fid: String, + credential: &'a Credential, } #[derive(Debug, serde::Deserialize)] @@ -22,14 +23,22 @@ pub struct Upper { pub face: String, } impl<'a> FavoriteList<'a> { - pub fn new(client: &'a BiliClient, fid: String) -> Self { - Self { client, fid } + pub fn new(client: &'a BiliClient, fid: String, credential: &'a Credential) -> Self { + Self { + client, + fid, + credential, + } } pub async fn get_info(&self) -> Result { let mut res = self .client - .request(reqwest::Method::GET, "https://api.bilibili.com/x/v3/fav/folder/info") + .request( + reqwest::Method::GET, + "https://api.bilibili.com/x/v3/fav/folder/info", + self.credential, + ) .await .query(&[("media_id", &self.fid)]) .send() @@ -43,7 +52,11 @@ impl<'a> FavoriteList<'a> { async fn get_videos(&self, page: u32) -> Result { self.client - .request(reqwest::Method::GET, "https://api.bilibili.com/x/v3/fav/resource/list") + .request( + reqwest::Method::GET, + "https://api.bilibili.com/x/v3/fav/resource/list", + self.credential, + ) .await .query(&[ ("media_id", self.fid.as_str()), diff --git a/crates/bili_sync/src/bilibili/me.rs b/crates/bili_sync/src/bilibili/me.rs index 1f2ddde..18e579a 100644 --- a/crates/bili_sync/src/bilibili/me.rs +++ b/crates/bili_sync/src/bilibili/me.rs @@ -1,28 +1,32 @@ use anyhow::{Result, ensure}; use reqwest::Method; -use crate::bilibili::{BiliClient, Validate}; -use crate::config::VersionedConfig; +use crate::bilibili::{BiliClient, Credential, Validate}; + pub struct Me<'a> { client: &'a BiliClient, - mid: String, + credential: &'a Credential, } impl<'a> Me<'a> { - pub fn new(client: &'a BiliClient) -> Self { - Self { - client, - mid: Self::my_id(), - } + pub fn new(client: &'a BiliClient, credential: &'a Credential) -> Self { + Self { client, credential } } pub async fn get_created_favorites(&self) -> Result>> { - ensure!(!self.mid.is_empty(), "未获取到用户 ID,请确保填写设置中的 B 站认证信息"); + ensure!( + !self.mid().is_empty(), + "未获取到用户 ID,请确保填写设置中的 B 站认证信息" + ); let mut resp = self .client - .request(Method::GET, "https://api.bilibili.com/x/v3/fav/folder/created/list-all") + .request( + Method::GET, + "https://api.bilibili.com/x/v3/fav/folder/created/list-all", + self.credential, + ) .await - .query(&[("up_mid", &self.mid)]) + .query(&[("up_mid", &self.mid())]) .send() .await? .error_for_status()? @@ -33,12 +37,19 @@ impl<'a> Me<'a> { } pub async fn get_followed_collections(&self, page_num: i32, page_size: i32) -> Result { - ensure!(!self.mid.is_empty(), "未获取到用户 ID,请确保填写设置中的 B 站认证信息"); + ensure!( + !self.mid().is_empty(), + "未获取到用户 ID,请确保填写设置中的 B 站认证信息" + ); let mut resp = self .client - .request(Method::GET, "https://api.bilibili.com/x/v3/fav/folder/collected/list") + .request( + Method::GET, + "https://api.bilibili.com/x/v3/fav/folder/collected/list", + self.credential, + ) .await - .query(&[("up_mid", self.mid.as_str()), ("platform", "web")]) + .query(&[("up_mid", self.mid()), ("platform", "web")]) .query(&[("pn", page_num), ("ps", page_size)]) .send() .await? @@ -50,12 +61,19 @@ impl<'a> Me<'a> { } pub async fn get_followed_uppers(&self, page_num: i32, page_size: i32) -> Result { - ensure!(!self.mid.is_empty(), "未获取到用户 ID,请确保填写设置中的 B 站认证信息"); + ensure!( + !self.mid().is_empty(), + "未获取到用户 ID,请确保填写设置中的 B 站认证信息" + ); let mut resp = self .client - .request(Method::GET, "https://api.bilibili.com/x/relation/followings") + .request( + Method::GET, + "https://api.bilibili.com/x/relation/followings", + self.credential, + ) .await - .query(&[("vmid", self.mid.as_str())]) + .query(&[("vmid", self.mid())]) .query(&[("pn", page_num), ("ps", page_size)]) .send() .await? @@ -66,8 +84,8 @@ impl<'a> Me<'a> { Ok(serde_json::from_value(resp["data"].take())?) } - fn my_id() -> String { - VersionedConfig::get().load().credential.dedeuserid.clone() + fn mid(&self) -> &str { + &self.credential.dedeuserid } } diff --git a/crates/bili_sync/src/bilibili/mod.rs b/crates/bili_sync/src/bilibili/mod.rs index 1f18026..a228dd3 100644 --- a/crates/bili_sync/src/bilibili/mod.rs +++ b/crates/bili_sync/src/bilibili/mod.rs @@ -8,7 +8,7 @@ use chrono::serde::ts_seconds; use chrono::{DateTime, Utc}; pub use client::{BiliClient, Client}; pub use collection::{Collection, CollectionItem, CollectionType}; -pub use credential::{Credential, WbiImg}; +pub use credential::Credential; pub use danmaku::DanmakuOption; pub use dynamic::Dynamic; pub use error::BiliError; @@ -208,10 +208,15 @@ mod tests { #[tokio::test] async fn test_video_info_type() -> Result<()> { VersionedConfig::init_for_test(&setup_database(Path::new("./test.sqlite")).await?).await?; + let credential = &VersionedConfig::get().read().credential; init_logger("None,bili_sync=debug", None); let bili_client = BiliClient::new(); // 请求 UP 主视频必须要获取 mixin key,使用 key 计算请求参数的签名,否则直接提示权限不足返回空 - let mixin_key = bili_client.wbi_img().await?.into_mixin_key().context("no mixin key")?; + let mixin_key = bili_client + .wbi_img(credential) + .await? + .into_mixin_key() + .context("no mixin key")?; set_global_mixin_key(mixin_key); let collection = Collection::new( &bili_client, @@ -220,6 +225,7 @@ mod tests { sid: "4523".to_string(), collection_type: CollectionType::Season, }, + &credential, ); let videos = collection .into_video_stream() @@ -230,7 +236,7 @@ mod tests { assert!(videos.iter().all(|v| matches!(v, VideoInfo::Collection { .. }))); assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime())); // 测试收藏夹 - let favorite = FavoriteList::new(&bili_client, "3144336058".to_string()); + let favorite = FavoriteList::new(&bili_client, "3144336058".to_string(), &credential); let videos = favorite .into_video_stream() .take(20) @@ -240,7 +246,7 @@ mod tests { assert!(videos.iter().all(|v| matches!(v, VideoInfo::Favorite { .. }))); assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime())); // 测试稍后再看 - let watch_later = WatchLater::new(&bili_client); + let watch_later = WatchLater::new(&bili_client, &credential); let videos = watch_later .into_video_stream() .take(20) @@ -250,7 +256,7 @@ mod tests { assert!(videos.iter().all(|v| matches!(v, VideoInfo::WatchLater { .. }))); assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime())); // 测试投稿 - let submission = Submission::new(&bili_client, "956761".to_string()); + let submission = Submission::new(&bili_client, "956761".to_string(), &credential); let videos = submission .into_video_stream() .take(20) @@ -260,7 +266,7 @@ mod tests { assert!(videos.iter().all(|v| matches!(v, VideoInfo::Submission { .. }))); assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime())); // 测试动态 - let dynamic = Dynamic::new(&bili_client, "659898".to_string()); + let dynamic = Dynamic::new(&bili_client, "659898".to_string(), &credential); let videos = dynamic .into_video_stream() .take(20) @@ -275,10 +281,16 @@ mod tests { #[ignore = "only for manual test"] #[tokio::test] async fn test_subtitle_parse() -> Result<()> { + VersionedConfig::init_for_test(&setup_database(Path::new("./test.sqlite")).await?).await?; + let credential = &VersionedConfig::get().read().credential; let bili_client = BiliClient::new(); - let mixin_key = bili_client.wbi_img().await?.into_mixin_key().context("no mixin key")?; + let mixin_key = bili_client + .wbi_img(credential) + .await? + .into_mixin_key() + .context("no mixin key")?; set_global_mixin_key(mixin_key); - let video = Video::new(&bili_client, "BV1gLfnY8E6D".to_string()); + let video = Video::new(&bili_client, "BV1gLfnY8E6D".to_string(), &credential); let pages = video.get_pages().await?; println!("pages: {:?}", pages); let subtitles = video.get_subtitles(&pages[0]).await?; @@ -296,15 +308,20 @@ mod tests { #[tokio::test] async fn test_upower_parse() -> Result<()> { VersionedConfig::init_for_test(&setup_database(Path::new("./test.sqlite")).await?).await?; + let credential = &VersionedConfig::get().read().credential; let bili_client = BiliClient::new(); - let mixin_key = bili_client.wbi_img().await?.into_mixin_key().context("no mixin key")?; + let mixin_key = bili_client + .wbi_img(credential) + .await? + .into_mixin_key() + .context("no mixin key")?; set_global_mixin_key(mixin_key); for (bvid, (upower_exclusive, upower_play)) in [ ("BV1HxXwYEEqt", (true, false)), // 充电专享且无权观看 ("BV16w41187fx", (true, true)), // 充电专享但有权观看 ("BV1n34jzPEYq", (false, false)), // 普通视频 ] { - let video = Video::new(&bili_client, bvid.to_string()); + let video = Video::new(&bili_client, bvid.to_string(), credential); let info = video.get_view_info().await?; let VideoInfo::Detail { is_upower_exclusive, @@ -324,15 +341,20 @@ mod tests { #[tokio::test] async fn test_ep_parse() -> Result<()> { VersionedConfig::init_for_test(&setup_database(Path::new("./test.sqlite")).await?).await?; + let credential = &VersionedConfig::get().read().credential; let bili_client = BiliClient::new(); - let mixin_key = bili_client.wbi_img().await?.into_mixin_key().context("no mixin key")?; + let mixin_key = bili_client + .wbi_img(credential) + .await? + .into_mixin_key() + .context("no mixin key")?; set_global_mixin_key(mixin_key); for (bvid, redirect_is_none) in [ ("BV1SF411g796", false), // EP ("BV13xtnzPEye", false), // 番剧 ("BV1kT4NzTEZj", true), // 普通视频 ] { - let video = Video::new(&bili_client, bvid.to_string()); + let video = Video::new(&bili_client, bvid.to_string(), credential); let info = video.get_view_info().await?; let VideoInfo::Detail { redirect_url, .. } = info else { unreachable!(); diff --git a/crates/bili_sync/src/bilibili/submission.rs b/crates/bili_sync/src/bilibili/submission.rs index 260154c..6febd03 100644 --- a/crates/bili_sync/src/bilibili/submission.rs +++ b/crates/bili_sync/src/bilibili/submission.rs @@ -5,27 +5,36 @@ use reqwest::Method; use serde_json::Value; use crate::bilibili::favorite_list::Upper; -use crate::bilibili::{BiliClient, Dynamic, MIXIN_KEY, Validate, VideoInfo, WbiSign}; +use crate::bilibili::{BiliClient, Credential, Dynamic, MIXIN_KEY, Validate, VideoInfo, WbiSign}; pub struct Submission<'a> { client: &'a BiliClient, pub upper_id: String, + credential: &'a Credential, } impl<'a> From> for Dynamic<'a> { fn from(submission: Submission<'a>) -> Self { - Dynamic::new(submission.client, submission.upper_id) + Dynamic::new(submission.client, submission.upper_id, submission.credential) } } impl<'a> Submission<'a> { - pub fn new(client: &'a BiliClient, upper_id: String) -> Self { - Self { client, upper_id } + pub fn new(client: &'a BiliClient, upper_id: String, credential: &'a Credential) -> Self { + Self { + client, + upper_id, + credential, + } } pub async fn get_info(&self) -> Result> { let mut res = self .client - .request(Method::GET, "https://api.bilibili.com/x/web-interface/card") + .request( + Method::GET, + "https://api.bilibili.com/x/web-interface/card", + self.credential, + ) .await .query(&[("mid", self.upper_id.as_str())]) .send() @@ -39,7 +48,11 @@ impl<'a> Submission<'a> { async fn get_videos(&self, page: i32) -> Result { self.client - .request(Method::GET, "https://api.bilibili.com/x/space/wbi/arc/search") + .request( + Method::GET, + "https://api.bilibili.com/x/space/wbi/arc/search", + self.credential, + ) .await .query(&[ ("mid", self.upper_id.as_str()), diff --git a/crates/bili_sync/src/bilibili/video.rs b/crates/bili_sync/src/bilibili/video.rs index 3ee3ed7..c9ebc6c 100644 --- a/crates/bili_sync/src/bilibili/video.rs +++ b/crates/bili_sync/src/bilibili/video.rs @@ -8,11 +8,12 @@ use crate::bilibili::analyzer::PageAnalyzer; use crate::bilibili::client::BiliClient; use crate::bilibili::danmaku::{DanmakuElem, DanmakuWriter, DmSegMobileReply}; use crate::bilibili::subtitle::{SubTitle, SubTitleBody, SubTitleInfo, SubTitlesInfo}; -use crate::bilibili::{MIXIN_KEY, Validate, VideoInfo, WbiSign}; +use crate::bilibili::{Credential, MIXIN_KEY, Validate, VideoInfo, WbiSign}; pub struct Video<'a> { client: &'a BiliClient, pub bvid: String, + credential: &'a Credential, } #[derive(Debug, serde::Deserialize, Default)] @@ -34,15 +35,23 @@ pub struct Dimension { } impl<'a> Video<'a> { - pub fn new(client: &'a BiliClient, bvid: String) -> Self { - Self { client, bvid } + pub fn new(client: &'a BiliClient, bvid: String, credential: &'a Credential) -> Self { + Self { + client, + bvid, + credential, + } } /// 直接调用视频信息接口获取详细的视频信息,视频信息中包含了视频的分页信息 pub async fn get_view_info(&self) -> Result { let mut res = self .client - .request(Method::GET, "https://api.bilibili.com/x/web-interface/wbi/view") + .request( + Method::GET, + "https://api.bilibili.com/x/web-interface/wbi/view", + self.credential, + ) .await .query(&[("bvid", &self.bvid)]) .wbi_sign(MIXIN_KEY.load().as_deref())? @@ -59,7 +68,11 @@ impl<'a> Video<'a> { pub async fn get_pages(&self) -> Result> { let mut res = self .client - .request(Method::GET, "https://api.bilibili.com/x/player/pagelist") + .request( + Method::GET, + "https://api.bilibili.com/x/player/pagelist", + self.credential, + ) .await .query(&[("bvid", &self.bvid)]) .send() @@ -74,7 +87,11 @@ impl<'a> Video<'a> { pub async fn get_tags(&self) -> Result> { let res = self .client - .request(Method::GET, "https://api.bilibili.com/x/web-interface/view/detail/tag") + .request( + Method::GET, + "https://api.bilibili.com/x/web-interface/view/detail/tag", + self.credential, + ) .await .query(&[("bvid", &self.bvid)]) .send() @@ -105,7 +122,11 @@ impl<'a> Video<'a> { async fn get_danmaku_segment(&self, page: &PageInfo, segment_idx: i64) -> Result> { let mut res = self .client - .request(Method::GET, "https://api.bilibili.com/x/v2/dm/wbi/web/seg.so") + .request( + Method::GET, + "https://api.bilibili.com/x/v2/dm/wbi/web/seg.so", + self.credential, + ) .await .query(&[("type", 1), ("oid", page.cid), ("segment_index", segment_idx)]) .wbi_sign(MIXIN_KEY.load().as_deref())? @@ -126,7 +147,11 @@ impl<'a> Video<'a> { pub async fn get_page_analyzer(&self, page: &PageInfo) -> Result { let mut res = self .client - .request(Method::GET, "https://api.bilibili.com/x/player/wbi/playurl") + .request( + Method::GET, + "https://api.bilibili.com/x/player/wbi/playurl", + self.credential, + ) .await .query(&[ ("bvid", self.bvid.as_str()), @@ -149,7 +174,7 @@ impl<'a> Video<'a> { pub async fn get_subtitles(&self, page: &PageInfo) -> Result> { let mut res = self .client - .request(Method::GET, "https://api.bilibili.com/x/player/wbi/v2") + .request(Method::GET, "https://api.bilibili.com/x/player/wbi/v2", self.credential) .await .query(&[("bvid", self.bvid.as_str())]) .query(&[("cid", page.cid)]) diff --git a/crates/bili_sync/src/bilibili/watch_later.rs b/crates/bili_sync/src/bilibili/watch_later.rs index 1c45ddd..f983539 100644 --- a/crates/bili_sync/src/bilibili/watch_later.rs +++ b/crates/bili_sync/src/bilibili/watch_later.rs @@ -3,19 +3,24 @@ use async_stream::try_stream; use futures::Stream; use serde_json::Value; -use crate::bilibili::{BiliClient, Validate, VideoInfo}; +use crate::bilibili::{BiliClient, Credential, Validate, VideoInfo}; pub struct WatchLater<'a> { client: &'a BiliClient, + credential: &'a Credential, } impl<'a> WatchLater<'a> { - pub fn new(client: &'a BiliClient) -> Self { - Self { client } + pub fn new(client: &'a BiliClient, credential: &'a Credential) -> Self { + Self { client, credential } } async fn get_videos(&self) -> Result { self.client - .request(reqwest::Method::GET, "https://api.bilibili.com/x/v2/history/toview") + .request( + reqwest::Method::GET, + "https://api.bilibili.com/x/v2/history/toview", + self.credential, + ) .await .send() .await? diff --git a/crates/bili_sync/src/config/current.rs b/crates/bili_sync/src/config/current.rs index 03b0763..9ff6ed3 100644 --- a/crates/bili_sync/src/config/current.rs +++ b/crates/bili_sync/src/config/current.rs @@ -85,14 +85,6 @@ impl Config { } Ok(()) } - - #[cfg(test)] - pub(super) fn test_default() -> Self { - Self { - cdn_sorting: true, - ..Default::default() - } - } } impl Default for Config { diff --git a/crates/bili_sync/src/config/item.rs b/crates/bili_sync/src/config/item.rs index 43bcf1f..7185792 100644 --- a/crates/bili_sync/src/config/item.rs +++ b/crates/bili_sync/src/config/item.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::utils::filenamify::filenamify; /// NFO 文件使用的时间类型 -#[derive(Serialize, Deserialize, Default, Clone)] +#[derive(Serialize, Deserialize, Default, Clone, Copy)] #[serde(rename_all = "lowercase")] pub enum NFOTimeType { #[default] diff --git a/crates/bili_sync/src/config/mod.rs b/crates/bili_sync/src/config/mod.rs index 2f56fc7..77fc77d 100644 --- a/crates/bili_sync/src/config/mod.rs +++ b/crates/bili_sync/src/config/mod.rs @@ -9,6 +9,6 @@ mod versioned_config; pub use crate::config::args::{ARGS, version}; pub use crate::config::current::{CONFIG_DIR, Config}; pub use crate::config::handlebar::TEMPLATE; -pub use crate::config::item::{NFOTimeType, PathSafeTemplate, RateLimit, SkipOption}; +pub use crate::config::item::{ConcurrentDownloadLimit, NFOTimeType, PathSafeTemplate, RateLimit}; pub use crate::config::versioned_cache::VersionedCache; pub use crate::config::versioned_config::VersionedConfig; diff --git a/crates/bili_sync/src/config/versioned_cache.rs b/crates/bili_sync/src/config/versioned_cache.rs index 98ff22e..685d5e7 100644 --- a/crates/bili_sync/src/config/versioned_cache.rs +++ b/crates/bili_sync/src/config/versioned_cache.rs @@ -1,54 +1,56 @@ use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; use anyhow::Result; use arc_swap::{ArcSwap, Guard}; +use tokio_util::future::FutureExt; +use tokio_util::sync::CancellationToken; use crate::config::{Config, VersionedConfig}; pub struct VersionedCache { - inner: ArcSwap, - version: AtomicU64, - builder: fn(&Config) -> Result, - mutex: parking_lot::Mutex<()>, + inner: Arc>, + cancel_token: CancellationToken, } -impl VersionedCache { +/// 一个跟随全局配置变化自动更新的缓存 +impl VersionedCache { pub fn new(builder: fn(&Config) -> Result) -> Result { - let current_config = VersionedConfig::get().load(); - let current_version = current_config.version; - let initial_value = builder(¤t_config)?; - Ok(Self { - inner: ArcSwap::from_pointee(initial_value), - version: AtomicU64::new(current_version), - builder, - mutex: parking_lot::Mutex::new(()), - }) + let mut rx = VersionedConfig::get().subscribe(); + let initial_value = builder(&rx.borrow_and_update())?; + let cancel_token = CancellationToken::new(); + let inner = Arc::new(ArcSwap::from_pointee(initial_value)); + let inner_clone = inner.clone(); + tokio::spawn( + async move { + while rx.changed().await.is_ok() { + match builder(&rx.borrow()) { + Ok(new_value) => { + inner_clone.store(Arc::new(new_value)); + } + Err(e) => { + error!("Failed to update versioned cache: {:?}", e); + } + } + } + } + .with_cancellation_token_owned(cancel_token.clone()), + ); + Ok(Self { inner, cancel_token }) } - pub fn load(&self) -> Guard> { - self.reload_if_needed(); + /// 获取一个临时的只读引用 + pub fn read(&self) -> Guard> { self.inner.load() } - fn reload_if_needed(&self) { - let current_config = VersionedConfig::get().load(); - let current_version = current_config.version; - let version = self.version.load(Ordering::Relaxed); - if version < current_version { - let _lock = self.mutex.lock(); - if self.version.load(Ordering::Relaxed) >= current_version { - return; - } - match (self.builder)(¤t_config) { - Err(e) => { - error!("Failed to rebuild versioned cache: {:?}", e); - } - Ok(new_value) => { - self.inner.store(Arc::new(new_value)); - self.version.store(current_version, Ordering::Relaxed); - } - } - } + /// 获取当前缓存的完整快照 + pub fn snapshot(&self) -> Arc { + self.inner.load_full() + } +} + +impl Drop for VersionedCache { + fn drop(&mut self) { + self.cancel_token.cancel(); } } diff --git a/crates/bili_sync/src/config/versioned_config.rs b/crates/bili_sync/src/config/versioned_config.rs index b8daafa..f6fc688 100644 --- a/crates/bili_sync/src/config/versioned_config.rs +++ b/crates/bili_sync/src/config/versioned_config.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use anyhow::{Result, anyhow, bail}; use arc_swap::{ArcSwap, Guard}; use sea_orm::DatabaseConnection; -use tokio::sync::OnceCell; +use tokio::sync::{OnceCell, watch}; use crate::bilibili::Credential; use crate::config::{CONFIG_DIR, Config}; @@ -13,6 +13,8 @@ pub static VERSIONED_CONFIG: OnceCell = OnceCell::const_new(); pub struct VersionedConfig { inner: ArcSwap, update_lock: tokio::sync::Mutex<()>, + tx: watch::Sender>, + rx: watch::Receiver>, } impl VersionedConfig { @@ -68,46 +70,51 @@ impl VersionedConfig { } #[cfg(test)] - /// 尝试获取全局的 `VersionedConfig`,如果未初始化则退回测试环境的默认配置 + /// 尝试获取全局的 `VersionedConfig`,如果未初始化则退回默认配置 pub fn get() -> &'static VersionedConfig { use std::sync::LazyLock; - static FALLBACK_CONFIG: LazyLock = - LazyLock::new(|| VersionedConfig::new(Config::test_default())); - // 优先从全局变量获取,未初始化则返回测试环境的默认配置 + static FALLBACK_CONFIG: LazyLock = LazyLock::new(|| VersionedConfig::new(Config::default())); + // 优先从全局变量获取,未初始化则退回默认配置 return VERSIONED_CONFIG.get().unwrap_or_else(|| &FALLBACK_CONFIG); } - pub fn new(config: Config) -> Self { + fn new(config: Config) -> Self { + let inner = ArcSwap::from_pointee(config); + let (tx, rx) = watch::channel(inner.load_full()); Self { - inner: ArcSwap::from_pointee(config), + inner, update_lock: tokio::sync::Mutex::new(()), + tx, + rx, } } - pub fn load(&self) -> Guard> { + pub fn read(&self) -> Guard> { self.inner.load() } - pub fn load_full(&self) -> Arc { + pub fn snapshot(&self) -> Arc { self.inner.load_full() } - pub async fn update_credential(&self, new_credential: Credential, connection: &DatabaseConnection) -> Result<()> { - // 确保更新内容与写入数据库的操作是原子性的 + pub fn subscribe(&self) -> watch::Receiver> { + self.rx.clone() + } + + pub async fn update_credential( + &self, + new_credential: Credential, + connection: &DatabaseConnection, + ) -> Result> { let _lock = self.update_lock.lock().await; - loop { - let old_config = self.inner.load(); - let mut new_config = old_config.as_ref().clone(); - new_config.credential = new_credential.clone(); - new_config.version += 1; - if Arc::ptr_eq( - &old_config, - &self.inner.compare_and_swap(&old_config, Arc::new(new_config)), - ) { - break; - } - } - self.inner.load().save_to_database(connection).await + let mut new_config = self.inner.load().as_ref().clone(); + new_config.credential = new_credential; + new_config.version += 1; + new_config.save_to_database(connection).await?; + let new_config = Arc::new(new_config); + self.inner.store(new_config.clone()); + self.tx.send(new_config.clone())?; + Ok(new_config) } /// 外部 API 会调用这个方法,如果更新失败直接返回错误 @@ -118,14 +125,10 @@ impl VersionedConfig { bail!("配置版本不匹配,请刷新页面修改后重新提交"); } new_config.version += 1; - let new_config = Arc::new(new_config); - if !Arc::ptr_eq( - &old_config, - &self.inner.compare_and_swap(&old_config, new_config.clone()), - ) { - bail!("配置版本不匹配,请刷新页面修改后重新提交"); - } new_config.save_to_database(connection).await?; + let new_config = Arc::new(new_config); + self.inner.store(new_config.clone()); + self.tx.send(new_config.clone())?; Ok(new_config) } } diff --git a/crates/bili_sync/src/downloader.rs b/crates/bili_sync/src/downloader.rs index 9c92e1f..762277c 100644 --- a/crates/bili_sync/src/downloader.rs +++ b/crates/bili_sync/src/downloader.rs @@ -14,7 +14,8 @@ use tokio::task::JoinSet; use tokio_util::io::StreamReader; use crate::bilibili::Client; -use crate::config::VersionedConfig; +use crate::config::ConcurrentDownloadLimit; + pub struct Downloader { client: Client, } @@ -27,9 +28,9 @@ impl Downloader { Self { client } } - pub async fn fetch(&self, url: &str, path: &Path) -> Result<()> { + pub async fn fetch(&self, url: &str, path: &Path, concurrent_download: &ConcurrentDownloadLimit) -> Result<()> { let mut temp_file = TempFile::new().await?; - self.fetch_internal(url, &mut temp_file).await?; + self.fetch_internal(url, &mut temp_file, concurrent_download).await?; if let Some(parent) = path.parent() { fs::create_dir_all(parent).await?; } @@ -41,8 +42,13 @@ impl Downloader { Ok(()) } - pub async fn multi_fetch(&self, urls: &[&str], path: &Path) -> Result<()> { - let temp_file = self.multi_fetch_internal(urls).await?; + pub async fn multi_fetch( + &self, + urls: &[&str], + path: &Path, + concurrent_download: &ConcurrentDownloadLimit, + ) -> Result<()> { + let temp_file = self.multi_fetch_internal(urls, concurrent_download).await?; if let Some(parent) = path.parent() { fs::create_dir_all(parent).await?; } @@ -51,10 +57,16 @@ impl Downloader { Ok(()) } - pub async fn multi_fetch_and_merge(&self, video_urls: &[&str], audio_urls: &[&str], path: &Path) -> Result<()> { + pub async fn multi_fetch_and_merge( + &self, + video_urls: &[&str], + audio_urls: &[&str], + path: &Path, + concurrent_download: &ConcurrentDownloadLimit, + ) -> Result<()> { let (video_temp_file, audio_temp_file) = tokio::try_join!( - self.multi_fetch_internal(video_urls), - self.multi_fetch_internal(audio_urls) + self.multi_fetch_internal(video_urls, concurrent_download), + self.multi_fetch_internal(audio_urls, concurrent_download) )?; let final_temp_file = TempFile::new().await?; let output = Command::new("ffmpeg") @@ -90,13 +102,17 @@ impl Downloader { Ok(()) } - async fn multi_fetch_internal(&self, urls: &[&str]) -> Result { + async fn multi_fetch_internal( + &self, + urls: &[&str], + concurrent_download: &ConcurrentDownloadLimit, + ) -> Result { if urls.is_empty() { bail!("no urls provided"); } let mut temp_file = TempFile::new().await?; for (idx, url) in urls.iter().enumerate() { - match self.fetch_internal(url, &mut temp_file).await { + match self.fetch_internal(url, &mut temp_file, concurrent_download).await { Ok(_) => return Ok(temp_file), Err(e) => { if idx == urls.len() - 1 { @@ -111,9 +127,14 @@ impl Downloader { unreachable!() } - async fn fetch_internal(&self, url: &str, file: &mut TempFile) -> Result<()> { - if VersionedConfig::get().load().concurrent_limit.download.enable { - self.fetch_parallel(url, file).await + async fn fetch_internal( + &self, + url: &str, + file: &mut TempFile, + concurrent_download: &ConcurrentDownloadLimit, + ) -> Result<()> { + if concurrent_download.enable { + self.fetch_parallel(url, file, concurrent_download).await } else { self.fetch_serial(url, file).await } @@ -141,14 +162,13 @@ impl Downloader { Ok(()) } - async fn fetch_parallel(&self, url: &str, file: &mut TempFile) -> Result<()> { - let (concurrency, threshold) = { - let config = VersionedConfig::get().load(); - ( - config.concurrent_limit.download.concurrency, - config.concurrent_limit.download.threshold, - ) - }; + async fn fetch_parallel( + &self, + url: &str, + file: &mut TempFile, + concurrent_download: &ConcurrentDownloadLimit, + ) -> Result<()> { + let (concurrency, threshold) = (concurrent_download.concurrency, concurrent_download.threshold); let resp = self .client .request(Method::HEAD, url, None) diff --git a/crates/bili_sync/src/task/http_server.rs b/crates/bili_sync/src/task/http_server.rs index d82415a..48091be 100644 --- a/crates/bili_sync/src/task/http_server.rs +++ b/crates/bili_sync/src/task/http_server.rs @@ -30,11 +30,11 @@ pub async fn http_server( .layer(Extension(database_connection)) .layer(Extension(bili_client)) .layer(Extension(log_writer)); - let config = VersionedConfig::get().load_full(); - let listener = tokio::net::TcpListener::bind(&config.bind_address) + let bind_address = VersionedConfig::get().read().bind_address.to_owned(); + let listener = tokio::net::TcpListener::bind(&bind_address) .await .context("bind address failed")?; - info!("开始运行管理页:http://{}", config.bind_address); + info!("开始运行管理页:http://{}", bind_address); Ok(axum::serve(listener, ServiceExt::::into_make_service(app)).await?) } diff --git a/crates/bili_sync/src/task/video_downloader.rs b/crates/bili_sync/src/task/video_downloader.rs index fa129cf..8a818cb 100644 --- a/crates/bili_sync/src/task/video_downloader.rs +++ b/crates/bili_sync/src/task/video_downloader.rs @@ -1,11 +1,13 @@ use std::sync::Arc; +use anyhow::{Context, Result, bail}; +use chrono::NaiveDate; use sea_orm::DatabaseConnection; use tokio::time; use crate::adapter::VideoSource; -use crate::bilibili::{self, BiliClient, WbiImg}; -use crate::config::VersionedConfig; +use crate::bilibili::{self, BiliClient}; +use crate::config::{Config, TEMPLATE, VersionedConfig}; use crate::utils::model::get_enabled_video_sources; use crate::utils::task_notifier::TASK_STATUS_NOTIFIER; use crate::workflow::process_video_source; @@ -14,49 +16,59 @@ use crate::workflow::process_video_source; pub async fn video_downloader(connection: DatabaseConnection, bili_client: Arc) { let mut anchor = chrono::Local::now().date_naive(); loop { - info!("开始执行本轮视频下载任务.."); let _lock = TASK_STATUS_NOTIFIER.start_running().await; - let config = VersionedConfig::get().load_full(); - 'inner: { - if let Err(e) = config.check() { - error!("配置检查失败,跳过本轮执行:\n{:#}", e); - break 'inner; - } - match bili_client.wbi_img().await.map(WbiImg::into_mixin_key) { - Ok(Some(mixin_key)) => bilibili::set_global_mixin_key(mixin_key), - Ok(_) => { - error!("解析 mixin key 失败,等待下一轮执行"); - break 'inner; - } - Err(e) => { - error!("获取 mixin key 遇到错误:{:#},等待下一轮执行", e); - break 'inner; - } - }; - if anchor != chrono::Local::now().date_naive() { - if let Err(e) = bili_client.check_refresh(&connection).await { - error!("检查刷新 Credential 遇到错误:{:#},等待下一轮执行", e); - break 'inner; - } - anchor = chrono::Local::now().date_naive(); - } - let Ok(video_sources) = get_enabled_video_sources(&connection).await else { - error!("获取视频源列表失败,等待下一轮执行"); - break 'inner; - }; - if video_sources.is_empty() { - info!("没有可用的视频源,等待下一轮执行"); - break 'inner; - } - for video_source in video_sources { - let display_name = video_source.display_name(); - if let Err(e) = process_video_source(video_source, &bili_client, &connection).await { - error!("处理 {} 时遇到错误:{:#},等待下一轮执行", display_name, e); - } - } - info!("本轮任务执行完毕,等待下一轮执行"); + let mut config = VersionedConfig::get().snapshot(); + info!("开始执行本轮视频下载任务.."); + if let Err(e) = download_all_video_sources(&connection, &bili_client, &mut config, &mut anchor).await { + error!("本轮视频下载任务执行遇到错误:{:#},跳过本轮执行", e); + } else { + info!("本轮视频下载任务执行完毕"); } - TASK_STATUS_NOTIFIER.finish_running(_lock); + TASK_STATUS_NOTIFIER.finish_running(_lock, config.interval as i64); time::sleep(time::Duration::from_secs(config.interval)).await; } } + +async fn download_all_video_sources( + connection: &DatabaseConnection, + bili_client: &BiliClient, + config: &mut Arc, + anchor: &mut NaiveDate, +) -> Result<()> { + config.check().context("配置检查失败")?; + let mixin_key = bili_client + .wbi_img(&config.credential) + .await + .context("获取 wbi_img 失败")? + .into_mixin_key() + .context("解析 mixin key 失败")?; + bilibili::set_global_mixin_key(mixin_key); + if *anchor != chrono::Local::now().date_naive() { + if let Some(new_credential) = bili_client + .check_refresh(&config.credential) + .await + .context("检查刷新 Credential 失败")? + { + *config = VersionedConfig::get() + .update_credential(new_credential, connection) + .await + .context("更新 Credential 失败")?; + } + *anchor = chrono::Local::now().date_naive(); + } + let template = TEMPLATE.snapshot(); + let bili_client = bili_client.snapshot()?; + let video_sources = get_enabled_video_sources(connection) + .await + .context("获取视频源列表失败")?; + if video_sources.is_empty() { + bail!("没有可用的视频源"); + } + for video_source in video_sources { + let display_name = video_source.display_name(); + if let Err(e) = process_video_source(video_source, &bili_client, connection, &template, config).await { + error!("处理 {} 时遇到错误:{:#},跳过该视频源", display_name, e); + } + } + Ok(()) +} diff --git a/crates/bili_sync/src/utils/download_context.rs b/crates/bili_sync/src/utils/download_context.rs new file mode 100644 index 0000000..9d9d5e8 --- /dev/null +++ b/crates/bili_sync/src/utils/download_context.rs @@ -0,0 +1,36 @@ +use sea_orm::DatabaseConnection; + +use crate::adapter::VideoSourceEnum; +use crate::bilibili::BiliClient; +use crate::config::Config; +use crate::downloader::Downloader; + +#[derive(Clone, Copy)] +pub struct DownloadContext<'a> { + pub bili_client: &'a BiliClient, + pub video_source: &'a VideoSourceEnum, + pub template: &'a handlebars::Handlebars<'a>, + pub connection: &'a DatabaseConnection, + pub downloader: &'a Downloader, + pub config: &'a Config, +} + +impl<'a> DownloadContext<'a> { + pub fn new( + bili_client: &'a BiliClient, + video_source: &'a VideoSourceEnum, + template: &'a handlebars::Handlebars<'a>, + connection: &'a DatabaseConnection, + downloader: &'a Downloader, + config: &'a Config, + ) -> Self { + Self { + bili_client, + video_source, + template, + connection, + downloader, + config, + } + } +} diff --git a/crates/bili_sync/src/utils/format_arg.rs b/crates/bili_sync/src/utils/format_arg.rs index f1bce26..1a82b88 100644 --- a/crates/bili_sync/src/utils/format_arg.rs +++ b/crates/bili_sync/src/utils/format_arg.rs @@ -1,24 +1,21 @@ use serde_json::json; -use crate::config::VersionedConfig; - -pub fn video_format_args(video_model: &bili_sync_entity::video::Model) -> serde_json::Value { - let config = VersionedConfig::get().load(); +pub fn video_format_args(video_model: &bili_sync_entity::video::Model, time_format: &str) -> serde_json::Value { json!({ "bvid": &video_model.bvid, "title": &video_model.name, "upper_name": &video_model.upper_name, "upper_mid": &video_model.upper_id, - "pubtime": &video_model.pubtime.and_utc().format(&config.time_format).to_string(), - "fav_time": &video_model.favtime.and_utc().format(&config.time_format).to_string(), + "pubtime": &video_model.pubtime.and_utc().format(time_format).to_string(), + "fav_time": &video_model.favtime.and_utc().format(time_format).to_string(), }) } pub fn page_format_args( video_model: &bili_sync_entity::video::Model, page_model: &bili_sync_entity::page::Model, + time_format: &str, ) -> serde_json::Value { - let config = VersionedConfig::get().load(); json!({ "bvid": &video_model.bvid, "title": &video_model.name, @@ -26,7 +23,7 @@ pub fn page_format_args( "upper_mid": &video_model.upper_id, "ptitle": &page_model.name, "pid": page_model.pid, - "pubtime": video_model.pubtime.and_utc().format(&config.time_format).to_string(), - "fav_time": video_model.favtime.and_utc().format(&config.time_format).to_string(), + "pubtime": video_model.pubtime.and_utc().format(time_format).to_string(), + "fav_time": video_model.favtime.and_utc().format(time_format).to_string(), }) } diff --git a/crates/bili_sync/src/utils/mod.rs b/crates/bili_sync/src/utils/mod.rs index 08dc5ac..415559f 100644 --- a/crates/bili_sync/src/utils/mod.rs +++ b/crates/bili_sync/src/utils/mod.rs @@ -1,4 +1,5 @@ pub mod convert; +pub mod download_context; pub mod filenamify; pub mod format_arg; pub mod model; diff --git a/crates/bili_sync/src/utils/nfo.rs b/crates/bili_sync/src/utils/nfo.rs index 8e719ac..2a86649 100644 --- a/crates/bili_sync/src/utils/nfo.rs +++ b/crates/bili_sync/src/utils/nfo.rs @@ -6,7 +6,7 @@ use quick_xml::events::{BytesCData, BytesText}; use quick_xml::writer::Writer; use tokio::io::{AsyncWriteExt, BufWriter}; -use crate::config::{NFOTimeType, VersionedConfig}; +use crate::config::NFOTimeType; #[allow(clippy::upper_case_acronyms)] pub enum NFO<'a> { @@ -265,7 +265,10 @@ mod tests { ..Default::default() }; assert_eq!( - NFO::Movie((&video).into()).generate_nfo().await.unwrap(), + NFO::Movie((&video).to_nfo(NFOTimeType::FavTime)) + .generate_nfo() + .await + .unwrap(), r#" BV1nWcSeeEkV

intro]]>
@@ -283,7 +286,10 @@ mod tests {
"#, ); assert_eq!( - NFO::TVShow((&video).into()).generate_nfo().await.unwrap(), + NFO::TVShow((&video).to_nfo(NFOTimeType::FavTime)) + .generate_nfo() + .await + .unwrap(), r#" BV1nWcSeeEkV

intro]]>
@@ -301,7 +307,10 @@ mod tests {
"#, ); assert_eq!( - NFO::Upper((&video).into()).generate_nfo().await.unwrap(), + NFO::Upper((&video).to_nfo(NFOTimeType::FavTime)) + .generate_nfo() + .await + .unwrap(), r#" @@ -318,7 +327,10 @@ mod tests { ..Default::default() }; assert_eq!( - NFO::Episode((&page).into()).generate_nfo().await.unwrap(), + NFO::Episode((&page).to_nfo(NFOTimeType::FavTime)) + .generate_nfo() + .await + .unwrap(), r#" @@ -331,54 +343,58 @@ mod tests { } } -impl<'a> From<&'a video::Model> for Movie<'a> { - fn from(video: &'a video::Model) -> Self { - Self { - name: &video.name, - intro: &video.intro, - bvid: &video.bvid, - upper_id: video.upper_id, - upper_name: &video.upper_name, - aired: match VersionedConfig::get().load().nfo_time_type { - NFOTimeType::FavTime => video.favtime, - NFOTimeType::PubTime => video.pubtime, +pub trait ToNFO<'a, T> { + fn to_nfo(&'a self, nfo_time_type: NFOTimeType) -> T; +} + +impl<'a> ToNFO<'a, Movie<'a>> for &'a video::Model { + fn to_nfo(&'a self, nfo_time_type: NFOTimeType) -> Movie<'a> { + Movie { + name: &self.name, + intro: &self.intro, + bvid: &self.bvid, + upper_id: self.upper_id, + upper_name: &self.upper_name, + aired: match nfo_time_type { + NFOTimeType::FavTime => self.favtime, + NFOTimeType::PubTime => self.pubtime, }, - tags: video.tags.as_ref().map(|tags| tags.clone().into()), + tags: self.tags.as_ref().map(|tags| tags.clone().into()), } } } -impl<'a> From<&'a video::Model> for TVShow<'a> { - fn from(video: &'a video::Model) -> Self { - Self { - name: &video.name, - intro: &video.intro, - bvid: &video.bvid, - upper_id: video.upper_id, - upper_name: &video.upper_name, - aired: match VersionedConfig::get().load().nfo_time_type { - NFOTimeType::FavTime => video.favtime, - NFOTimeType::PubTime => video.pubtime, +impl<'a> ToNFO<'a, TVShow<'a>> for &'a video::Model { + fn to_nfo(&'a self, nfo_time_type: NFOTimeType) -> TVShow<'a> { + TVShow { + name: &self.name, + intro: &self.intro, + bvid: &self.bvid, + upper_id: self.upper_id, + upper_name: &self.upper_name, + aired: match nfo_time_type { + NFOTimeType::FavTime => self.favtime, + NFOTimeType::PubTime => self.pubtime, }, - tags: video.tags.as_ref().map(|tags| tags.clone().into()), + tags: self.tags.as_ref().map(|tags| tags.clone().into()), } } } -impl<'a> From<&'a video::Model> for Upper { - fn from(video: &'a video::Model) -> Self { - Self { - upper_id: video.upper_id.to_string(), - pubtime: video.pubtime, +impl<'a> ToNFO<'a, Upper> for &'a video::Model { + fn to_nfo(&'a self, _nfo_time_type: NFOTimeType) -> Upper { + Upper { + upper_id: self.upper_id.to_string(), + pubtime: self.pubtime, } } } -impl<'a> From<&'a page::Model> for Episode<'a> { - fn from(page: &'a page::Model) -> Self { - Self { - name: &page.name, - pid: page.pid.to_string(), +impl<'a> ToNFO<'a, Episode<'a>> for &'a page::Model { + fn to_nfo(&'a self, _nfo_time_type: NFOTimeType) -> Episode<'a> { + Episode { + name: &self.name, + pid: self.pid.to_string(), } } } diff --git a/crates/bili_sync/src/utils/task_notifier.rs b/crates/bili_sync/src/utils/task_notifier.rs index 5396bc2..2248ad9 100644 --- a/crates/bili_sync/src/utils/task_notifier.rs +++ b/crates/bili_sync/src/utils/task_notifier.rs @@ -1,9 +1,7 @@ use std::sync::LazyLock; use serde::Serialize; -use tokio::sync::MutexGuard; - -use crate::config::VersionedConfig; +use tokio::sync::{MutexGuard, watch}; pub static TASK_STATUS_NOTIFIER: LazyLock = LazyLock::new(TaskStatusNotifier::new); @@ -17,13 +15,13 @@ pub struct TaskStatus { pub struct TaskStatusNotifier { mutex: tokio::sync::Mutex<()>, - tx: tokio::sync::watch::Sender, - rx: tokio::sync::watch::Receiver, + tx: watch::Sender, + rx: watch::Receiver, } impl TaskStatusNotifier { pub fn new() -> Self { - let (tx, rx) = tokio::sync::watch::channel(TaskStatus::default()); + let (tx, rx) = watch::channel(TaskStatus::default()); Self { mutex: tokio::sync::Mutex::const_new(()), tx, @@ -42,26 +40,19 @@ impl TaskStatusNotifier { lock } - pub fn finish_running(&self, _lock: MutexGuard<()>) { + pub fn finish_running(&self, _lock: MutexGuard<()>, interval: i64) { let last_status = self.tx.borrow(); let last_run = last_status.last_run; drop(last_status); - let config = VersionedConfig::get().load(); let now = chrono::Local::now(); - let _ = self.tx.send(TaskStatus { is_running: false, last_run, last_finish: Some(now), - next_run: now.checked_add_signed(chrono::Duration::seconds(config.interval as i64)), + next_run: now.checked_add_signed(chrono::Duration::seconds(interval)), }); } - /// 精确探测任务执行状态,保证如果读取到“未运行”,那么在锁释放之前任务不会被执行 - pub fn detect_running(&self) -> Option> { - self.mutex.try_lock().ok() - } - pub fn subscribe(&self) -> tokio::sync::watch::Receiver { self.rx.clone() } diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index 6369aea..542176b 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -14,15 +14,16 @@ use tokio::sync::Semaphore; use crate::adapter::{VideoSource, VideoSourceEnum}; use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Video, VideoInfo}; -use crate::config::{ARGS, PathSafeTemplate, SkipOption, TEMPLATE, VersionedConfig}; +use crate::config::{ARGS, Config, PathSafeTemplate}; use crate::downloader::Downloader; use crate::error::{DownloadAbortError, ExecutionStatus, ProcessPageError}; +use crate::utils::download_context::DownloadContext; use crate::utils::format_arg::{page_format_args, video_format_args}; use crate::utils::model::{ create_pages, create_videos, filter_unfilled_videos, filter_unhandled_video_pages, update_pages_model, update_videos_model, }; -use crate::utils::nfo::NFO; +use crate::utils::nfo::{NFO, ToNFO}; use crate::utils::rule::FieldEvaluatable; use crate::utils::status::{PageStatus, STATUS_OK, VideoStatus}; @@ -31,20 +32,24 @@ pub async fn process_video_source( video_source: VideoSourceEnum, bili_client: &BiliClient, connection: &DatabaseConnection, + template: &handlebars::Handlebars<'_>, + config: &Config, ) -> Result<()> { // 预创建视频源目录,提前检测目录是否可写 video_source.create_dir_all().await?; // 从参数中获取视频列表的 Model 与视频流 - let (video_source, video_streams) = video_source.refresh(bili_client, connection).await?; + let (video_source, video_streams) = video_source + .refresh(bili_client, &config.credential, connection) + .await?; // 从视频流中获取新视频的简要信息,写入数据库 refresh_video_source(&video_source, video_streams, connection).await?; // 单独请求视频详情接口,获取视频的详情信息与所有的分页,写入数据库 - fetch_video_details(bili_client, &video_source, connection).await?; + fetch_video_details(bili_client, &video_source, connection, config).await?; if ARGS.scan_only { warn!("已开启仅扫描模式,跳过视频下载.."); } else { // 从数据库中查找所有未下载的视频与分页,下载并处理 - download_unprocessed_videos(bili_client, &video_source, connection).await?; + download_unprocessed_videos(bili_client, &video_source, connection, template, config).await?; } Ok(()) } @@ -103,16 +108,17 @@ pub async fn fetch_video_details( bili_client: &BiliClient, video_source: &VideoSourceEnum, connection: &DatabaseConnection, + config: &Config, ) -> Result<()> { video_source.log_fetch_video_start(); let videos_model = filter_unfilled_videos(video_source.filter_expr(), connection).await?; - let semaphore = Semaphore::new(VersionedConfig::get().load().concurrent_limit.video); + let semaphore = Semaphore::new(config.concurrent_limit.video); let semaphore_ref = &semaphore; let tasks = videos_model .into_iter() .map(|video_model| async move { let _permit = semaphore_ref.acquire().await.context("acquire semaphore failed")?; - let video = Video::new(bili_client, video_model.bvid.clone()); + let video = Video::new(bili_client, video_model.bvid.clone(), &config.credential); let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await; match info { Err(e) => { @@ -161,12 +167,13 @@ pub async fn download_unprocessed_videos( bili_client: &BiliClient, video_source: &VideoSourceEnum, connection: &DatabaseConnection, + template: &handlebars::Handlebars<'_>, + config: &Config, ) -> Result<()> { video_source.log_download_video_start(); - let semaphore = Semaphore::new(VersionedConfig::get().load().concurrent_limit.video); + let semaphore = Semaphore::new(config.concurrent_limit.video); let downloader = Downloader::new(bili_client.client.clone()); - // 提前获取 skip_option,保证单个视频源在整个下载过程中配置不变 - let skip_option = &VersionedConfig::get().load().skip_option; + let cx = DownloadContext::new(bili_client, video_source, template, connection, &downloader, config); let unhandled_videos_pages = filter_unhandled_video_pages(video_source.filter_expr(), connection).await?; let mut assigned_upper = HashSet::new(); let tasks = unhandled_videos_pages @@ -174,17 +181,7 @@ pub async fn download_unprocessed_videos( .map(|(video_model, pages_model)| { let should_download_upper = !assigned_upper.contains(&video_model.upper_id); assigned_upper.insert(video_model.upper_id); - download_video_pages( - bili_client, - video_source, - video_model, - pages_model, - connection, - &semaphore, - &downloader, - should_download_upper, - skip_option, - ) + download_video_pages(video_model, pages_model, &semaphore, should_download_upper, cx) }) .collect::>(); let mut download_aborted = false; @@ -213,29 +210,28 @@ pub async fn download_unprocessed_videos( Ok(()) } -#[allow(clippy::too_many_arguments)] pub async fn download_video_pages( - bili_client: &BiliClient, - video_source: &VideoSourceEnum, video_model: video::Model, - pages: Vec, - connection: &DatabaseConnection, + page_models: Vec, semaphore: &Semaphore, - downloader: &Downloader, should_download_upper: bool, - skip_option: &SkipOption, + cx: DownloadContext<'_>, ) -> Result { let _permit = semaphore.acquire().await.context("acquire semaphore failed")?; let mut status = VideoStatus::from(video_model.download_status); let separate_status = status.should_run(); - let base_path = video_source.path().join( - TEMPLATE - .load() - .path_safe_render("video", &video_format_args(&video_model))?, - ); + // 未记录路径时填充,已经填充过路径时使用现有的 + let base_path = if !video_model.path.is_empty() { + PathBuf::from(&video_model.path) + } else { + cx.video_source.path().join( + cx.template + .path_safe_render("video", &video_format_args(&video_model, &cx.config.time_format))?, + ) + }; let upper_id = video_model.upper_id.to_string(); - let base_upper_path = VersionedConfig::get() - .load() + let base_upper_path = cx + .config .upper_path .join(upper_id.chars().next().context("upper_id is empty")?.to_string()) .join(upper_id); @@ -245,47 +241,37 @@ pub async fn download_video_pages( let (res_1, res_2, res_3, res_4, res_5) = tokio::join!( // 下载视频封面 fetch_video_poster( - separate_status[0] && !is_single_page && !skip_option.no_poster, + separate_status[0] && !is_single_page && !cx.config.skip_option.no_poster, &video_model, - downloader, base_path.join("poster.jpg"), base_path.join("fanart.jpg"), + cx ), // 生成视频信息的 nfo generate_video_nfo( - separate_status[1] && !is_single_page && !skip_option.no_video_nfo, + separate_status[1] && !is_single_page && !cx.config.skip_option.no_video_nfo, &video_model, base_path.join("tvshow.nfo"), + cx ), // 下载 Up 主头像 fetch_upper_face( - separate_status[2] && should_download_upper && !skip_option.no_upper, + separate_status[2] && should_download_upper && !cx.config.skip_option.no_upper, &video_model, - downloader, base_upper_path.join("folder.jpg"), + cx ), // 生成 Up 主信息的 nfo generate_upper_nfo( - separate_status[3] && should_download_upper && !skip_option.no_upper, + separate_status[3] && should_download_upper && !cx.config.skip_option.no_upper, &video_model, base_upper_path.join("person.nfo"), + cx, ), // 分发并执行分页下载的任务 - dispatch_download_page( - separate_status[4], - bili_client, - &video_model, - pages, - connection, - downloader, - &base_path, - skip_option, - ) + dispatch_download_page(separate_status[4], &video_model, page_models, &base_path, cx) ); - let results = [res_1, res_2, res_3, res_4, res_5] - .into_iter() - .map(Into::into) - .collect::>(); + let results = [res_1.into(), res_2.into(), res_3.into(), res_4.into(), res_5.into()]; status.update_status(&results); results .iter() @@ -316,34 +302,20 @@ pub async fn download_video_pages( } /// 分发并执行分页下载任务,当且仅当所有分页成功下载或达到最大重试次数时返回 Ok,否则根据失败原因返回对应的错误 -#[allow(clippy::too_many_arguments)] pub async fn dispatch_download_page( should_run: bool, - bili_client: &BiliClient, video_model: &video::Model, - pages: Vec, - connection: &DatabaseConnection, - downloader: &Downloader, + page_models: Vec, base_path: &Path, - skip_option: &SkipOption, + cx: DownloadContext<'_>, ) -> Result { if !should_run { return Ok(ExecutionStatus::Skipped); } - let child_semaphore = Semaphore::new(VersionedConfig::get().load().concurrent_limit.page); - let tasks = pages + let child_semaphore = Semaphore::new(cx.config.concurrent_limit.page); + let tasks = page_models .into_iter() - .map(|page_model| { - download_page( - bili_client, - video_model, - page_model, - &child_semaphore, - downloader, - base_path, - skip_option, - ) - }) + .map(|page_model| download_page(video_model, page_model, &child_semaphore, base_path, cx)) .collect::>(); let (mut download_aborted, mut target_status) = (false, STATUS_OK); let mut stream = tasks @@ -372,7 +344,7 @@ pub async fn dispatch_download_page( .filter_map(|res| futures::future::ready(res.ok())) .chunks(10); while let Some(models) = stream.next().await { - update_pages_model(models, connection).await?; + update_pages_model(models, cx.connection).await?; } if download_aborted { error!("下载视频「{}」的分页时触发风控,将异常向上传递..", &video_model.name); @@ -386,21 +358,54 @@ pub async fn dispatch_download_page( /// 下载某个分页,未发生风控且正常运行时返回 Ok(Page::ActiveModel),其中 status 字段存储了新的下载状态,发生风控时返回 DownloadAbortError pub async fn download_page( - bili_client: &BiliClient, video_model: &video::Model, page_model: page::Model, semaphore: &Semaphore, - downloader: &Downloader, base_path: &Path, - skip_option: &SkipOption, + cx: DownloadContext<'_>, ) -> Result { let _permit = semaphore.acquire().await.context("acquire semaphore failed")?; let mut status = PageStatus::from(page_model.download_status); let separate_status = status.should_run(); let is_single_page = video_model.single_page.context("single_page is null")?; - let base_name = TEMPLATE - .load() - .path_safe_render("page", &page_format_args(video_model, &page_model))?; + // 未记录路径时填充,已经填充过路径时使用现有的 + let (base_path, base_name) = if let Some(old_video_path) = &page_model.path + && !old_video_path.is_empty() + { + let old_video_path = Path::new(old_video_path); + let old_video_filename = old_video_path + .file_name() + .context("invalid page path format")? + .to_string_lossy(); + if is_single_page { + // 单页下的路径是 {base_path}/{base_name}.mp4 + ( + old_video_path.parent().context("invalid page path format")?, + old_video_filename.trim_end_matches(".mp4").to_string(), + ) + } else { + // 多页下的路径是 {base_path}/Season 1/{base_name} - S01Exx.mp4 + ( + old_video_path + .parent() + .and_then(|p| p.parent()) + .context("invalid page path format")?, + old_video_filename + .rsplit_once(" - ") + .context("invalid page path format")? + .0 + .to_string(), + ) + } + } else { + ( + base_path, + cx.template.path_safe_render( + "page", + &page_format_args(video_model, &page_model, &cx.config.time_format), + )?, + ) + }; let (poster_path, video_path, nfo_path, danmaku_path, fanart_path, subtitle_path) = if is_single_page { ( base_path.join(format!("{}-poster.jpg", &base_name)), @@ -448,50 +453,41 @@ pub async fn download_page( let (res_1, res_2, res_3, res_4, res_5) = tokio::join!( // 下载分页封面 fetch_page_poster( - separate_status[0] && !skip_option.no_poster, + separate_status[0] && !cx.config.skip_option.no_poster, video_model, &page_model, - downloader, poster_path, - fanart_path + fanart_path, + cx ), // 下载分页视频 - fetch_page_video( - separate_status[1], - bili_client, - video_model, - downloader, - &page_info, - &video_path - ), + fetch_page_video(separate_status[1], video_model, &page_info, &video_path, cx), // 生成分页视频信息的 nfo generate_page_nfo( - separate_status[2] && !skip_option.no_video_nfo, + separate_status[2] && !cx.config.skip_option.no_video_nfo, video_model, &page_model, - nfo_path + nfo_path, + cx, ), // 下载分页弹幕 fetch_page_danmaku( - separate_status[3] && !skip_option.no_danmaku, - bili_client, + separate_status[3] && !cx.config.skip_option.no_danmaku, video_model, &page_info, - danmaku_path + danmaku_path, + cx, ), // 下载分页字幕 fetch_page_subtitle( - separate_status[4] && !skip_option.no_subtitle, - bili_client, + separate_status[4] && !cx.config.skip_option.no_subtitle, video_model, &page_info, - &subtitle_path + &subtitle_path, + cx ) ); - let results = [res_1, res_2, res_3, res_4, res_5] - .into_iter() - .map(Into::into) - .collect::>(); + let results = [res_1.into(), res_2.into(), res_3.into(), res_4.into(), res_5.into()]; status.update_status(&results); results .iter() @@ -532,9 +528,9 @@ pub async fn fetch_page_poster( should_run: bool, video_model: &video::Model, page_model: &page::Model, - downloader: &Downloader, poster_path: PathBuf, fanart_path: Option, + cx: DownloadContext<'_>, ) -> Result { if !should_run { return Ok(ExecutionStatus::Skipped); @@ -550,7 +546,9 @@ pub async fn fetch_page_poster( None => video_model.cover.as_str(), } }; - downloader.fetch(url, &poster_path).await?; + cx.downloader + .fetch(url, &poster_path, &cx.config.concurrent_limit.download) + .await?; if let Some(fanart_path) = fanart_path { fs::copy(&poster_path, &fanart_path).await?; } @@ -559,32 +557,52 @@ pub async fn fetch_page_poster( pub async fn fetch_page_video( should_run: bool, - bili_client: &BiliClient, video_model: &video::Model, - downloader: &Downloader, page_info: &PageInfo, page_path: &Path, + cx: DownloadContext<'_>, ) -> Result { if !should_run { return Ok(ExecutionStatus::Skipped); } - let bili_video = Video::new(bili_client, video_model.bvid.clone()); + let bili_video = Video::new(cx.bili_client, video_model.bvid.clone(), &cx.config.credential); let streams = bili_video .get_page_analyzer(page_info) .await? - .best_stream(&VersionedConfig::get().load().filter_option)?; + .best_stream(&cx.config.filter_option)?; match streams { - BestStream::Mixed(mix_stream) => downloader.multi_fetch(&mix_stream.urls(), page_path).await?, + BestStream::Mixed(mix_stream) => { + cx.downloader + .multi_fetch( + &mix_stream.urls(cx.config.cdn_sorting), + page_path, + &cx.config.concurrent_limit.download, + ) + .await? + } BestStream::VideoAudio { video: video_stream, audio: None, - } => downloader.multi_fetch(&video_stream.urls(), page_path).await?, + } => { + cx.downloader + .multi_fetch( + &video_stream.urls(cx.config.cdn_sorting), + page_path, + &cx.config.concurrent_limit.download, + ) + .await? + } BestStream::VideoAudio { video: video_stream, audio: Some(audio_stream), } => { - downloader - .multi_fetch_and_merge(&video_stream.urls(), &audio_stream.urls(), page_path) + cx.downloader + .multi_fetch_and_merge( + &video_stream.urls(cx.config.cdn_sorting), + &audio_stream.urls(cx.config.cdn_sorting), + page_path, + &cx.config.concurrent_limit.download, + ) .await? } } @@ -593,34 +611,34 @@ pub async fn fetch_page_video( pub async fn fetch_page_danmaku( should_run: bool, - bili_client: &BiliClient, video_model: &video::Model, page_info: &PageInfo, danmaku_path: PathBuf, + cx: DownloadContext<'_>, ) -> Result { if !should_run { return Ok(ExecutionStatus::Skipped); } - let bili_video = Video::new(bili_client, video_model.bvid.clone()); + let bili_video = Video::new(cx.bili_client, video_model.bvid.clone(), &cx.config.credential); bili_video .get_danmaku_writer(page_info) .await? - .write(danmaku_path) + .write(danmaku_path, &cx.config.danmaku_option) .await?; Ok(ExecutionStatus::Succeeded) } pub async fn fetch_page_subtitle( should_run: bool, - bili_client: &BiliClient, video_model: &video::Model, page_info: &PageInfo, subtitle_path: &Path, + cx: DownloadContext<'_>, ) -> Result { if !should_run { return Ok(ExecutionStatus::Skipped); } - let bili_video = Video::new(bili_client, video_model.bvid.clone()); + let bili_video = Video::new(cx.bili_client, video_model.bvid.clone(), &cx.config.credential); let subtitles = bili_video.get_subtitles(page_info).await?; let tasks = subtitles .into_iter() @@ -638,15 +656,16 @@ pub async fn generate_page_nfo( video_model: &video::Model, page_model: &page::Model, nfo_path: PathBuf, + cx: DownloadContext<'_>, ) -> Result { if !should_run { return Ok(ExecutionStatus::Skipped); } let single_page = video_model.single_page.context("single_page is null")?; let nfo = if single_page { - NFO::Movie(video_model.into()) + NFO::Movie(video_model.to_nfo(cx.config.nfo_time_type)) } else { - NFO::Episode(page_model.into()) + NFO::Episode(page_model.to_nfo(cx.config.nfo_time_type)) }; generate_nfo(nfo, nfo_path).await?; Ok(ExecutionStatus::Succeeded) @@ -655,14 +674,16 @@ pub async fn generate_page_nfo( pub async fn fetch_video_poster( should_run: bool, video_model: &video::Model, - downloader: &Downloader, poster_path: PathBuf, fanart_path: PathBuf, + cx: DownloadContext<'_>, ) -> Result { if !should_run { return Ok(ExecutionStatus::Skipped); } - downloader.fetch(&video_model.cover, &poster_path).await?; + cx.downloader + .fetch(&video_model.cover, &poster_path, &cx.config.concurrent_limit.download) + .await?; fs::copy(&poster_path, &fanart_path).await?; Ok(ExecutionStatus::Succeeded) } @@ -670,13 +691,19 @@ pub async fn fetch_video_poster( pub async fn fetch_upper_face( should_run: bool, video_model: &video::Model, - downloader: &Downloader, upper_face_path: PathBuf, + cx: DownloadContext<'_>, ) -> Result { if !should_run { return Ok(ExecutionStatus::Skipped); } - downloader.fetch(&video_model.upper_face, &upper_face_path).await?; + cx.downloader + .fetch( + &video_model.upper_face, + &upper_face_path, + &cx.config.concurrent_limit.download, + ) + .await?; Ok(ExecutionStatus::Succeeded) } @@ -684,11 +711,12 @@ pub async fn generate_upper_nfo( should_run: bool, video_model: &video::Model, nfo_path: PathBuf, + cx: DownloadContext<'_>, ) -> Result { if !should_run { return Ok(ExecutionStatus::Skipped); } - generate_nfo(NFO::Upper(video_model.into()), nfo_path).await?; + generate_nfo(NFO::Upper(video_model.to_nfo(cx.config.nfo_time_type)), nfo_path).await?; Ok(ExecutionStatus::Succeeded) } @@ -696,11 +724,12 @@ pub async fn generate_video_nfo( should_run: bool, video_model: &video::Model, nfo_path: PathBuf, + cx: DownloadContext<'_>, ) -> Result { if !should_run { return Ok(ExecutionStatus::Skipped); } - generate_nfo(NFO::TVShow(video_model.into()), nfo_path).await?; + generate_nfo(NFO::TVShow(video_model.to_nfo(cx.config.nfo_time_type)), nfo_path).await?; Ok(ExecutionStatus::Succeeded) }