feat: 迁移所有配置到数据库,并支持运行时重载 (#364)

This commit is contained in:
ᴀᴍᴛᴏᴀᴇʀ
2025-06-17 02:15:11 +08:00
committed by GitHub
parent a46c2572b1
commit 4539e9379d
46 changed files with 1055 additions and 715 deletions

11
Cargo.lock generated
View File

@@ -518,6 +518,7 @@ dependencies = [
"toml",
"tower",
"tracing",
"tracing-panic",
"tracing-subscriber",
"utoipa",
"utoipa-swagger-ui",
@@ -3942,6 +3943,16 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-panic"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bf1298a179837099f9309243af3b554e840f7f67f65e9f55294913299bd4cc5"
dependencies = [
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.19"

View File

@@ -69,6 +69,7 @@ tokio-util = { version = "0.7.15", features = ["io", "rt"] }
toml = "0.8.22"
tower = "0.5.2"
tracing = "0.1.41"
tracing-panic = "0.1.2"
tracing-subscriber = { version = "0.3.19", features = ["chrono"] }
utoipa = { git = "https://github.com/amtoaer/utoipa.git", tag = "v1.0.1", features = [
"axum_extras",

View File

@@ -48,6 +48,7 @@ tokio-util = { workspace = true }
toml = { workspace = true }
tower = { workspace = true }
tracing = { workspace = true }
tracing-panic = { workspace = true }
tracing-subscriber = { workspace = true }
utoipa = { workspace = true }
utoipa-swagger-ui = { workspace = true }

View File

@@ -90,50 +90,55 @@ impl VideoSource for collection::Model {
fn log_download_video_end(&self) {
info!("下载{}「{}」视频完成", CollectionType::from(self.r#type), self.name);
}
}
pub(super) async fn collection_from<'a>(
collection_item: &'a CollectionItem,
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(
VideoSourceEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a + Send>>,
)> {
let collection = Collection::new(bili_client, collection_item);
let collection_info = collection.get_info().await?;
collection::Entity::insert(collection::ActiveModel {
s_id: Set(collection_info.sid),
m_id: Set(collection_info.mid),
r#type: Set(collection_info.collection_type.into()),
name: Set(collection_info.name.clone()),
path: Set(path.to_string_lossy().to_string()),
..Default::default()
})
.on_conflict(
OnConflict::columns([
collection::Column::SId,
collection::Column::MId,
collection::Column::Type,
])
.update_columns([collection::Column::Name, collection::Column::Path])
.to_owned(),
)
.exec(connection)
.await?;
Ok((
collection::Entity::find()
.filter(
collection::Column::SId
.eq(collection_item.sid.clone())
.and(collection::Column::MId.eq(collection_item.mid.clone()))
.and(collection::Column::Type.eq(Into::<i32>::into(collection_item.collection_type.clone()))),
)
.one(connection)
.await?
.context("collection not found")?
.into(),
Box::pin(collection.into_video_stream()),
))
async fn refresh<'a>(
self,
bili_client: &'a BiliClient,
connection: &'a DatabaseConnection,
) -> Result<(
VideoSourceEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + Send + 'a>>,
)> {
let collection = Collection::new(
bili_client,
CollectionItem {
sid: self.s_id.to_string(),
mid: self.m_id.to_string(),
collection_type: CollectionType::from(self.r#type),
},
);
let collection_info = collection.get_info().await?;
collection::Entity::insert(collection::ActiveModel {
s_id: Set(collection_info.sid),
m_id: Set(collection_info.mid),
r#type: Set(collection_info.collection_type.into()),
name: Set(collection_info.name.clone()),
..Default::default()
})
.on_conflict(
OnConflict::columns([
collection::Column::SId,
collection::Column::MId,
collection::Column::Type,
])
.update_column(collection::Column::Name)
.to_owned(),
)
.exec(connection)
.await?;
Ok((
collection::Entity::find()
.filter(
collection::Column::SId
.eq(self.s_id)
.and(collection::Column::MId.eq(self.m_id))
.and(collection::Column::Type.eq(self.r#type)),
)
.one(connection)
.await?
.context("collection not found")?
.into(),
Box::pin(collection.into_video_stream()),
))
}
}

View File

@@ -60,39 +60,37 @@ impl VideoSource for favorite::Model {
fn log_download_video_end(&self) {
info!("下载收藏夹「{}」视频完成", self.name);
}
}
pub(super) async fn favorite_from<'a>(
fid: &str,
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(
VideoSourceEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a + Send>>,
)> {
let favorite = FavoriteList::new(bili_client, fid.to_owned());
let favorite_info = favorite.get_info().await?;
favorite::Entity::insert(favorite::ActiveModel {
f_id: Set(favorite_info.id),
name: Set(favorite_info.title.clone()),
path: Set(path.to_string_lossy().to_string()),
..Default::default()
})
.on_conflict(
OnConflict::column(favorite::Column::FId)
.update_columns([favorite::Column::Name, favorite::Column::Path])
.to_owned(),
)
.exec(connection)
.await?;
Ok((
favorite::Entity::find()
.filter(favorite::Column::FId.eq(favorite_info.id))
.one(connection)
.await?
.context("favorite not found")?
.into(),
Box::pin(favorite.into_video_stream()),
))
async fn refresh<'a>(
self,
bili_client: &'a BiliClient,
connection: &'a DatabaseConnection,
) -> Result<(
VideoSourceEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + Send + 'a>>,
)> {
let favorite = FavoriteList::new(bili_client, self.f_id.to_string());
let favorite_info = favorite.get_info().await?;
favorite::Entity::insert(favorite::ActiveModel {
f_id: Set(favorite_info.id),
name: Set(favorite_info.title.clone()),
..Default::default()
})
.on_conflict(
OnConflict::column(favorite::Column::FId)
.update_column(favorite::Column::Name)
.to_owned(),
)
.exec(connection)
.await?;
Ok((
favorite::Entity::find()
.filter(favorite::Column::FId.eq(favorite_info.id))
.one(connection)
.await?
.context("favorite not found")?
.into(),
Box::pin(favorite.into_video_stream()),
))
}
}

View File

@@ -20,11 +20,7 @@ use bili_sync_entity::favorite::Model as Favorite;
use bili_sync_entity::submission::Model as Submission;
use bili_sync_entity::watch_later::Model as WatchLater;
use crate::adapter::collection::collection_from;
use crate::adapter::favorite::favorite_from;
use crate::adapter::submission::submission_from;
use crate::adapter::watch_later::watch_later_from;
use crate::bilibili::{BiliClient, CollectionItem, VideoInfo};
use crate::bilibili::{BiliClient, VideoInfo};
#[enum_dispatch]
pub enum VideoSourceEnum {
@@ -84,31 +80,15 @@ pub trait VideoSource {
/// 结束下载视频
fn log_download_video_end(&self);
}
#[derive(Clone, Copy, Debug)]
pub enum Args<'a> {
Favorite { fid: &'a str },
Collection { collection_item: &'a CollectionItem },
WatchLater,
Submission { upper_id: &'a str },
}
pub async fn video_source_from<'a>(
args: Args<'a>,
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(
VideoSourceEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a + Send>>,
)> {
match args {
Args::Favorite { fid } => favorite_from(fid, path, bili_client, connection).await,
Args::Collection { collection_item } => collection_from(collection_item, path, bili_client, connection).await,
Args::WatchLater => watch_later_from(path, bili_client, connection).await,
Args::Submission { upper_id } => submission_from(upper_id, path, bili_client, connection).await,
}
async fn refresh<'a>(
self,
bili_client: &'a BiliClient,
connection: &'a DatabaseConnection,
) -> Result<(
VideoSourceEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + Send + 'a>>,
)>;
}
pub enum _ActiveModel {

View File

@@ -60,39 +60,37 @@ impl VideoSource for submission::Model {
fn log_download_video_end(&self) {
info!("下载「{}」投稿视频完成", self.upper_name);
}
}
pub(super) async fn submission_from<'a>(
upper_id: &str,
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(
VideoSourceEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a + Send>>,
)> {
let submission = Submission::new(bili_client, upper_id.to_owned());
let upper = submission.get_info().await?;
submission::Entity::insert(submission::ActiveModel {
upper_id: Set(upper.mid.parse()?),
upper_name: Set(upper.name),
path: Set(path.to_string_lossy().to_string()),
..Default::default()
})
.on_conflict(
OnConflict::column(submission::Column::UpperId)
.update_columns([submission::Column::UpperName, submission::Column::Path])
.to_owned(),
)
.exec(connection)
.await?;
Ok((
submission::Entity::find()
.filter(submission::Column::UpperId.eq(upper.mid))
.one(connection)
.await?
.context("submission not found")?
.into(),
Box::pin(submission.into_video_stream()),
))
async fn refresh<'a>(
self,
bili_client: &'a BiliClient,
connection: &'a DatabaseConnection,
) -> Result<(
VideoSourceEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + Send + 'a>>,
)> {
let submission = Submission::new(bili_client, self.upper_id.to_string());
let upper = submission.get_info().await?;
submission::Entity::insert(submission::ActiveModel {
upper_id: Set(upper.mid.parse()?),
upper_name: Set(upper.name),
..Default::default()
})
.on_conflict(
OnConflict::column(submission::Column::UpperId)
.update_column(submission::Column::UpperName)
.to_owned(),
)
.exec(connection)
.await?;
Ok((
submission::Entity::find()
.filter(submission::Column::UpperId.eq(upper.mid))
.one(connection)
.await?
.context("submission not found")?
.into(),
Box::pin(submission.into_video_stream()),
))
}
}

View File

@@ -1,12 +1,12 @@
use std::path::Path;
use std::pin::Pin;
use anyhow::{Context, Result};
use anyhow::Result;
use bili_sync_entity::*;
use futures::Stream;
use sea_orm::ActiveValue::Set;
use sea_orm::entity::prelude::*;
use sea_orm::sea_query::{OnConflict, SimpleExpr};
use sea_orm::sea_query::SimpleExpr;
use sea_orm::{DatabaseConnection, Unchanged};
use crate::adapter::{_ActiveModel, VideoSource, VideoSourceEnum};
@@ -60,36 +60,16 @@ impl VideoSource for watch_later::Model {
fn log_download_video_end(&self) {
info!("下载稍后再看视频完成");
}
}
pub(super) async fn watch_later_from<'a>(
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(
VideoSourceEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a + Send>>,
)> {
let watch_later = WatchLater::new(bili_client);
watch_later::Entity::insert(watch_later::ActiveModel {
id: Set(1),
path: Set(path.to_string_lossy().to_string()),
..Default::default()
})
.on_conflict(
OnConflict::column(watch_later::Column::Id)
.update_column(watch_later::Column::Path)
.to_owned(),
)
.exec(connection)
.await?;
Ok((
watch_later::Entity::find()
.filter(watch_later::Column::Id.eq(1))
.one(connection)
.await?
.context("watch_later not found")?
.into(),
Box::pin(watch_later.into_video_stream()),
))
async fn refresh<'a>(
self,
bili_client: &'a BiliClient,
_connection: &'a DatabaseConnection,
) -> Result<(
VideoSourceEnum,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + Send + 'a>>,
)> {
let watch_later = WatchLater::new(bili_client);
Ok((self.into(), Box::pin(watch_later.into_video_stream())))
}
}

View File

@@ -7,10 +7,12 @@ use utoipa::Modify;
use utoipa::openapi::security::{ApiKey, ApiKeyValue, SecurityScheme};
use crate::api::wrapper::ApiResponse;
use crate::config::CONFIG;
use crate::config::VersionedConfig;
pub async fn auth(headers: HeaderMap, request: Request, next: Next) -> Result<Response, StatusCode> {
if request.uri().path().starts_with("/api/") && get_token(&headers) != CONFIG.auth_token {
if request.uri().path().starts_with("/api/")
&& get_token(&headers).is_none_or(|token| token != VersionedConfig::get().load().auth_token)
{
return Ok(ApiResponse::unauthorized(()).into_response());
}
Ok(next.run(request).await)

View File

@@ -573,13 +573,14 @@ pub async fn upsert_collection(
Extension(bili_client): Extension<Arc<BiliClient>>,
ValidatedJson(request): ValidatedJson<UpsertCollectionRequest>,
) -> Result<ApiResponse<bool>, ApiError> {
let collection_item = CollectionItem {
sid: request.id.to_string(),
mid: request.mid.to_string(),
collection_type: request.collection_type,
};
let collection = Collection::new(bili_client.as_ref(), &collection_item);
let collection = Collection::new(
bili_client.as_ref(),
CollectionItem {
sid: request.id.to_string(),
mid: request.mid.to_string(),
collection_type: request.collection_type,
},
);
let collection_info = collection.get_info().await?;
collection::Entity::insert(collection::ActiveModel {

View File

@@ -2,7 +2,7 @@ use anyhow::{Context, Result, bail};
use serde::{Deserialize, Serialize};
use crate::bilibili::error::BiliError;
use crate::config::CONFIG;
use crate::config::VersionedConfig;
pub struct PageAnalyzer {
info: serde_json::Value,
@@ -136,7 +136,7 @@ impl Stream {
let mut urls = std::iter::once(url.as_str())
.chain(backup_url.iter().map(|s| s.as_str()))
.collect::<Vec<_>>();
if CONFIG.cdn_sorting {
if VersionedConfig::get().load().cdn_sorting {
urls.sort_by_key(|u| {
if u.contains("upos-") {
0 // 服务商 cdn
@@ -351,7 +351,7 @@ impl PageAnalyzer {
mod tests {
use super::*;
use crate::bilibili::{BiliClient, Video};
use crate::config::CONFIG;
use crate::config::VersionedConfig;
#[test]
fn test_quality_order() {
@@ -433,7 +433,7 @@ mod tests {
.get_page_analyzer(&first_page)
.await
.expect("failed to get page analyzer")
.best_stream(&CONFIG.filter_option)
.best_stream(&VersionedConfig::get().load().filter_option)
.expect("failed to get best stream");
dbg!(bvid, &best_stream);
match best_stream {

View File

@@ -1,13 +1,14 @@
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use anyhow::Result;
use leaky_bucket::RateLimiter;
use reqwest::{Method, header};
use sea_orm::DatabaseConnection;
use crate::bilibili::Credential;
use crate::bilibili::credential::WbiImg;
use crate::config::{CONFIG, RateLimit};
use crate::config::{RateLimit, VersionedCache, VersionedConfig};
// 一个对 reqwest::Client 的简单封装,用于 Bilibili 请求
#[derive(Clone)]
@@ -63,53 +64,54 @@ impl Default for Client {
pub struct BiliClient {
pub client: Client,
limiter: Option<RateLimiter>,
limiter: VersionedCache<Option<RateLimiter>>,
}
impl BiliClient {
pub fn new() -> Self {
let client = Client::new();
let limiter = CONFIG
.concurrent_limit
.rate_limit
.as_ref()
.map(|RateLimit { limit, duration }| {
RateLimiter::builder()
.initial(*limit)
.refill(*limit)
.max(*limit)
.interval(Duration::from_millis(*duration))
.build()
});
let limiter = VersionedCache::new(|config| {
Ok(config
.concurrent_limit
.rate_limit
.as_ref()
.map(|RateLimit { limit, duration }| {
RateLimiter::builder()
.initial(*limit)
.refill(*limit)
.max(*limit)
.interval(Duration::from_millis(*duration))
.build()
}))
})
.expect("failed to create rate limiter");
Self { client, limiter }
}
/// 获取一个预构建的请求,通过该方法获取请求时会检查并等待速率限制
pub async fn request(&self, method: Method, url: &str) -> reqwest::RequestBuilder {
if let Some(limiter) = &self.limiter {
if let Some(limiter) = self.limiter.load().as_ref() {
limiter.acquire_one().await;
}
let credential = CONFIG.credential.load();
self.client.request(method, url, credential.as_deref())
let credential = VersionedConfig::get().load().credential.load();
self.client.request(method, url, Some(credential.as_ref()))
}
pub async fn check_refresh(&self) -> Result<()> {
let credential = CONFIG.credential.load();
let Some(credential) = credential.as_deref() else {
return Ok(());
};
pub async fn check_refresh(&self, connection: &DatabaseConnection) -> Result<()> {
let credential = VersionedConfig::get().load().credential.load();
if !credential.need_refresh(&self.client).await? {
return Ok(());
}
let new_credential = credential.refresh(&self.client).await?;
CONFIG.credential.store(Some(Arc::new(new_credential)));
CONFIG.save()
let config = VersionedConfig::get().load();
config.credential.store(Arc::new(new_credential));
config.save_to_database(connection).await?;
Ok(())
}
/// 获取 wbi img用于生成请求签名
pub async fn wbi_img(&self) -> Result<WbiImg> {
let credential = CONFIG.credential.load();
let credential = credential.as_deref().context("no credential found")?;
let credential = VersionedConfig::get().load().credential.load();
credential.wbi_img(&self.client).await
}
}

View File

@@ -10,7 +10,7 @@ use serde_json::Value;
use crate::bilibili::credential::encoded_query;
use crate::bilibili::{BiliClient, MIXIN_KEY, Validate, VideoInfo};
#[derive(PartialEq, Eq, Hash, Clone, Debug, Deserialize, Default)]
#[derive(PartialEq, Eq, Hash, Clone, Debug, Deserialize, Default, Copy)]
pub enum CollectionType {
Series,
#[default]
@@ -55,7 +55,7 @@ pub struct CollectionItem {
pub struct Collection<'a> {
client: &'a BiliClient,
collection: &'a CollectionItem,
collection: CollectionItem,
}
#[derive(Debug, PartialEq)]
@@ -94,7 +94,7 @@ impl<'de> Deserialize<'de> for CollectionInfo {
}
impl<'a> Collection<'a> {
pub fn new(client: &'a BiliClient, collection: &'a CollectionItem) -> Self {
pub fn new(client: &'a BiliClient, collection: CollectionItem) -> Self {
Self { client, collection }
}

View File

@@ -88,14 +88,14 @@ impl fmt::Display for CanvasStyles {
}
}
pub struct AssWriter<W: AsyncWrite> {
pub struct AssWriter<'a, W: AsyncWrite> {
f: Pin<Box<BufWriter<W>>>,
title: String,
canvas_config: CanvasConfig,
canvas_config: CanvasConfig<'a>,
}
impl<W: AsyncWrite> AssWriter<W> {
pub fn new(f: W, title: String, canvas_config: CanvasConfig) -> Self {
impl<'a, W: AsyncWrite> AssWriter<'a, W> {
pub fn new(f: W, title: String, canvas_config: CanvasConfig<'a>) -> Self {
AssWriter {
// 对于 HDD、docker 之类的场景,磁盘 IO 是非常大的瓶颈。使用大缓存
f: Box::pin(BufWriter::with_capacity(10 << 20, f)),
@@ -104,7 +104,7 @@ impl<W: AsyncWrite> AssWriter<W> {
}
}
pub async fn construct(f: W, title: String, canvas_config: CanvasConfig) -> Result<Self> {
pub async fn construct(f: W, title: String, canvas_config: CanvasConfig<'a>) -> Result<Self> {
let mut res = Self::new(f, title, canvas_config);
res.init().await?;
Ok(res)

View File

@@ -18,7 +18,7 @@ pub struct Lane {
}
impl Lane {
pub fn draw(danmu: &Danmu, config: &CanvasConfig) -> Self {
pub fn draw(danmu: &Danmu, config: &CanvasConfig<'_>) -> Self {
Lane {
last_shoot_time: danmu.timeline_s,
last_length: danmu.length(config),
@@ -26,7 +26,7 @@ impl Lane {
}
/// 这个槽位是否可以发射另外一条弹幕,返回可能的情形
pub fn available_for(&self, other: &Danmu, config: &CanvasConfig) -> Collision {
pub fn available_for(&self, other: &Danmu, config: &CanvasConfig<'_>) -> Collision {
#[allow(non_snake_case)]
let T = config.danmaku_option.duration;
#[allow(non_snake_case)]

View File

@@ -10,7 +10,7 @@ use crate::bilibili::danmaku::canvas::lane::Collision;
use crate::bilibili::danmaku::danmu::DanmuType;
use crate::bilibili::danmaku::{Danmu, DrawEffect, Drawable};
#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DanmakuOption {
pub duration: f64,
pub font: String,
@@ -54,13 +54,13 @@ impl Default for DanmakuOption {
}
#[derive(Clone)]
pub struct CanvasConfig {
pub struct CanvasConfig<'a> {
pub width: u64,
pub height: u64,
pub danmaku_option: &'static DanmakuOption,
pub danmaku_option: &'a DanmakuOption,
}
impl CanvasConfig {
pub fn new(danmaku_option: &'static DanmakuOption, page: &PageInfo) -> Self {
impl<'a> CanvasConfig<'a> {
pub fn new(danmaku_option: &'a DanmakuOption, page: &PageInfo) -> Self {
let (width, height) = Self::dimension(page);
Self {
width,
@@ -86,7 +86,7 @@ impl CanvasConfig {
((720.0 / height as f64 * width as f64) as u64, 720)
}
pub fn canvas(self) -> Canvas {
pub fn canvas(self) -> Canvas<'a> {
let float_lanes_cnt =
(self.danmaku_option.float_percentage * self.height as f64 / self.danmaku_option.lane_size as f64) as usize;
@@ -97,12 +97,12 @@ impl CanvasConfig {
}
}
pub struct Canvas {
pub config: CanvasConfig,
pub struct Canvas<'a> {
pub config: CanvasConfig<'a>,
pub float_lanes: Vec<Option<Lane>>,
}
impl Canvas {
impl<'a> Canvas<'a> {
pub fn draw(&mut self, mut danmu: Danmu) -> Result<Option<Drawable>> {
danmu.timeline_s += self.config.danmaku_option.time_offset;
if danmu.timeline_s < 0.0 {

View File

@@ -40,7 +40,7 @@ impl Danmu {
/// 计算弹幕的“像素长度”,会乘上一个缩放因子
///
/// 汉字算一个全宽英文算2/3宽
pub fn length(&self, config: &CanvasConfig) -> f64 {
pub fn length(&self, config: &CanvasConfig<'_>) -> f64 {
let pts = config.danmaku_option.font_size
* self
.content

View File

@@ -6,7 +6,7 @@ use tokio::fs::{self, File};
use crate::bilibili::PageInfo;
use crate::bilibili::danmaku::canvas::CanvasConfig;
use crate::bilibili::danmaku::{AssWriter, Danmu};
use crate::config::CONFIG;
use crate::config::VersionedConfig;
pub struct DanmakuWriter<'a> {
page: &'a PageInfo,
@@ -22,7 +22,8 @@ impl<'a> DanmakuWriter<'a> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
let canvas_config = CanvasConfig::new(&CONFIG.danmaku_option, self.page);
let config = VersionedConfig::get().load_full();
let canvas_config = CanvasConfig::new(&config.danmaku_option, self.page);
let mut writer =
AssWriter::construct(File::create(path).await?, self.page.name.clone(), canvas_config.clone()).await?;
let mut canvas = canvas_config.canvas();

View File

@@ -4,7 +4,7 @@ use anyhow::Result;
use reqwest::Method;
use crate::bilibili::{BiliClient, Validate};
use crate::config::CONFIG;
use crate::config::VersionedConfig;
pub struct Me<'a> {
client: &'a BiliClient,
mid: String,
@@ -73,12 +73,7 @@ impl<'a> Me<'a> {
}
fn my_id() -> String {
CONFIG
.credential
.load()
.as_deref()
.map(|c| c.dedeuserid.clone())
.unwrap()
VersionedConfig::get().load().credential.load().dedeuserid.clone()
}
}

View File

@@ -154,13 +154,14 @@ mod tests {
panic!("获取 mixin key 失败");
};
set_global_mixin_key(mixin_key);
// 测试视频合集
let collection_item = CollectionItem {
mid: "521722088".to_string(),
sid: "4523".to_string(),
collection_type: CollectionType::Season,
};
let collection = Collection::new(&bili_client, &collection_item);
let collection = Collection::new(
&bili_client,
CollectionItem {
mid: "521722088".to_string(),
sid: "4523".to_string(),
collection_type: CollectionType::Season,
},
);
let videos = collection
.into_video_stream()
.take(20)

View File

@@ -1,5 +1,4 @@
use anyhow::{Context, Result, anyhow};
use arc_swap::access::Access;
use async_stream::try_stream;
use futures::Stream;
use reqwest::Method;

View File

@@ -68,7 +68,7 @@ impl<'a> Video<'a> {
Ok(serde_json::from_value(res["data"].take())?)
}
#[allow(unused)]
#[allow(dead_code)]
pub async fn get_pages(&self) -> Result<Vec<PageInfo>> {
let mut res = self
.client

View File

@@ -1,7 +1,10 @@
use std::borrow::Cow;
use std::sync::LazyLock;
use clap::Parser;
pub static ARGS: LazyLock<Args> = LazyLock::new(Args::parse);
#[derive(Parser)]
#[command(name = "Bili-Sync", version = detail_version(), about, long_about = None)]
pub struct Args {

View File

@@ -0,0 +1,133 @@
use std::path::PathBuf;
use std::sync::LazyLock;
use anyhow::{Result, bail};
use arc_swap::ArcSwap;
use sea_orm::DatabaseConnection;
use serde::{Deserialize, Serialize};
use crate::bilibili::{Credential, DanmakuOption, FilterOption};
use crate::config::LegacyConfig;
use crate::config::default::{default_auth_token, default_bind_address, default_time_format};
use crate::config::item::{ConcurrentLimit, NFOTimeType};
use crate::utils::model::{load_db_config, save_db_config};
pub static CONFIG_DIR: LazyLock<PathBuf> =
LazyLock::new(|| dirs::config_dir().expect("No config path found").join("bili-sync"));
#[derive(Serialize, Deserialize)]
pub struct Config {
#[serde(default = "default_auth_token")]
pub auth_token: String,
#[serde(default = "default_bind_address")]
pub bind_address: String,
pub credential: ArcSwap<Credential>,
pub filter_option: FilterOption,
#[serde(default)]
pub danmaku_option: DanmakuOption,
pub video_name: String,
pub page_name: String,
pub interval: u64,
pub upper_path: PathBuf,
#[serde(default)]
pub nfo_time_type: NFOTimeType,
#[serde(default)]
pub concurrent_limit: ConcurrentLimit,
#[serde(default = "default_time_format")]
pub time_format: String,
#[serde(default)]
pub cdn_sorting: bool,
}
impl Config {
pub async fn load_from_database(connection: &DatabaseConnection) -> Result<Option<Result<Self>>> {
load_db_config(connection).await
}
pub async fn save_to_database(&self, connection: &DatabaseConnection) -> Result<()> {
save_db_config(self, connection).await
}
pub fn check(&self) -> Result<()> {
let mut errors = Vec::new();
if !self.upper_path.is_absolute() {
errors.push("up 主头像保存的路径应为绝对路径");
}
if self.video_name.is_empty() {
errors.push("未设置 video_name 模板");
}
if self.page_name.is_empty() {
errors.push("未设置 page_name 模板");
}
let credential = self.credential.load();
if credential.sessdata.is_empty()
|| credential.bili_jct.is_empty()
|| credential.buvid3.is_empty()
|| credential.dedeuserid.is_empty()
|| credential.ac_time_value.is_empty()
{
errors.push("Credential 信息不完整,请确保填写完整");
}
if !(self.concurrent_limit.video > 0 && self.concurrent_limit.page > 0) {
errors.push("video 和 page 允许的并发数必须大于 0");
}
if !errors.is_empty() {
bail!(
errors
.into_iter()
.map(|e| format!("- {}", e))
.collect::<Vec<_>>()
.join("\n")
);
}
Ok(())
}
#[cfg(test)]
pub(super) fn test_default() -> Self {
Self {
cdn_sorting: true,
..Default::default()
}
}
}
impl Default for Config {
fn default() -> Self {
Self {
auth_token: default_auth_token(),
bind_address: default_bind_address(),
credential: ArcSwap::from_pointee(Credential::default()),
filter_option: FilterOption::default(),
danmaku_option: DanmakuOption::default(),
video_name: "{{title}}".to_owned(),
page_name: "{{bvid}}".to_owned(),
interval: 1200,
upper_path: CONFIG_DIR.join("upper_face"),
nfo_time_type: NFOTimeType::FavTime,
concurrent_limit: ConcurrentLimit::default(),
time_format: default_time_format(),
cdn_sorting: false,
}
}
}
impl From<LegacyConfig> for Config {
fn from(legacy: LegacyConfig) -> Self {
Self {
auth_token: legacy.auth_token,
bind_address: legacy.bind_address,
credential: legacy.credential,
filter_option: legacy.filter_option,
danmaku_option: legacy.danmaku_option,
video_name: legacy.video_name,
page_name: legacy.page_name,
interval: legacy.interval,
upper_path: legacy.upper_path,
nfo_time_type: legacy.nfo_time_type,
concurrent_limit: legacy.concurrent_limit,
time_format: legacy.time_format,
cdn_sorting: legacy.cdn_sorting,
}
}
}

View File

@@ -0,0 +1,18 @@
use rand::seq::SliceRandom;
pub(super) fn default_time_format() -> String {
"%Y-%m-%d".to_string()
}
/// 默认的 auth_token 实现,生成随机 16 位字符串
pub(super) fn default_auth_token() -> String {
let byte_choices = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()_+-=";
let mut rng = rand::thread_rng();
(0..16)
.map(|_| *(byte_choices.choose(&mut rng).expect("choose byte failed")) as char)
.collect()
}
pub(super) fn default_bind_address() -> String {
"0.0.0.0:12345".to_string()
}

View File

@@ -0,0 +1,3 @@
use std::sync::atomic::AtomicBool;
pub static DOWNLOADER_RUNNING: AtomicBool = AtomicBool::new(false);

View File

@@ -1,62 +0,0 @@
use std::path::PathBuf;
use clap::Parser;
use handlebars::handlebars_helper;
use once_cell::sync::Lazy;
use crate::config::Config;
use crate::config::clap::Args;
use crate::config::item::PathSafeTemplate;
/// 全局的 CONFIG可以从中读取配置信息
pub static CONFIG: Lazy<Config> = Lazy::new(load_config);
/// 全局的 TEMPLATE用来渲染 video_name 和 page_name 模板
pub static TEMPLATE: Lazy<handlebars::Handlebars> = Lazy::new(|| {
let mut handlebars = handlebars::Handlebars::new();
handlebars_helper!(truncate: |s: String, len: usize| {
if s.chars().count() > len {
s.chars().take(len).collect::<String>()
} else {
s.to_string()
}
});
handlebars.register_helper("truncate", Box::new(truncate));
handlebars
.path_safe_register("video", &CONFIG.video_name)
.expect("failed to register video template");
handlebars
.path_safe_register("page", &CONFIG.page_name)
.expect("failed to register page template");
handlebars
});
/// 全局的 ARGS用来解析命令行参数
pub static ARGS: Lazy<Args> = Lazy::new(Args::parse);
/// 全局的 CONFIG_DIR表示配置文件夹的路径
pub static CONFIG_DIR: Lazy<PathBuf> =
Lazy::new(|| dirs::config_dir().expect("No config path found").join("bili-sync"));
fn load_config() -> Config {
if cfg!(test) {
return Config::load(&CONFIG_DIR.join("test_config.toml")).unwrap_or(Config::test_default());
}
info!("开始加载配置文件..");
let config = Config::load(&CONFIG_DIR.join("config.toml")).unwrap_or_else(|err| {
if err
.downcast_ref::<std::io::Error>()
.is_none_or(|e| e.kind() != std::io::ErrorKind::NotFound)
{
panic!("加载配置文件失败,错误为: {err}");
}
warn!("配置文件不存在,使用默认配置..");
Config::default()
});
info!("配置文件加载完毕,覆盖刷新原有配置");
config.save().expect("保存默认配置时遇到错误");
info!("检查配置文件..");
config.check();
info!("配置文件检查通过");
config
}

View File

@@ -0,0 +1,90 @@
use std::sync::LazyLock;
use anyhow::Result;
use handlebars::handlebars_helper;
use crate::config::versioned_cache::VersionedCache;
use crate::config::{Config, PathSafeTemplate};
pub static TEMPLATE: LazyLock<VersionedCache<handlebars::Handlebars<'static>>> =
LazyLock::new(|| VersionedCache::new(create_template).expect("Failed to create handlebars template"));
fn create_template(config: &Config) -> Result<handlebars::Handlebars<'static>> {
let mut handlebars = handlebars::Handlebars::new();
handlebars.register_helper("truncate", Box::new(truncate));
handlebars.path_safe_register("video", config.video_name.to_owned())?;
Ok(handlebars)
}
handlebars_helper!(truncate: |s: String, len: usize| {
if s.chars().count() > len {
s.chars().take(len).collect::<String>()
} else {
s.to_string()
}
});
#[cfg(test)]
mod tests {
use serde_json::json;
use super::*;
#[test]
fn test_template_usage() {
let mut template = handlebars::Handlebars::new();
template.register_helper("truncate", Box::new(truncate));
let _ = template.path_safe_register("video", "test{{bvid}}test");
let _ = template.path_safe_register("test_truncate", "哈哈,{{ truncate title 30 }}");
let _ = template.path_safe_register("test_path_unix", "{{ truncate title 7 }}/test/a");
let _ = template.path_safe_register("test_path_windows", r"{{ truncate title 7 }}\\test\\a");
#[cfg(not(windows))]
{
assert_eq!(
template
.path_safe_render("test_path_unix", &json!({"title": "关注/永雏塔菲喵"}))
.unwrap(),
"关注_永雏塔菲/test/a"
);
assert_eq!(
template
.path_safe_render("test_path_windows", &json!({"title": "关注/永雏塔菲喵"}))
.unwrap(),
"关注_永雏塔菲_test_a"
);
}
#[cfg(windows)]
{
assert_eq!(
template
.path_safe_render("test_path_unix", &json!({"title": "关注/永雏塔菲喵"}))
.unwrap(),
"关注_永雏塔菲_test_a"
);
assert_eq!(
template
.path_safe_render("test_path_windows", &json!({"title": "关注/永雏塔菲喵"}))
.unwrap(),
r"关注_永雏塔菲\\test\\a"
);
}
assert_eq!(
template
.path_safe_render("video", &json!({"bvid": "BV1b5411h7g7"}))
.unwrap(),
"testBV1b5411h7g7test"
);
assert_eq!(
template
.path_safe_render(
"test_truncate",
&json!({"title": "你说得对,但是 Rust 是由 Mozilla 自主研发的一款全新的编译期格斗游戏。\
编译将发生在一个被称作「Cargo」的构建系统中。在这里被引用的指针将被授予「生命周期」之力导引对象安全。\
你将扮演一位名为「Rustacean」的神秘角色, 在与「Rustc」的搏斗中邂逅各种骨骼惊奇的傲娇报错。\
征服她们、通过编译同时逐步发掘「C++」程序崩溃的真相。"})
)
.unwrap(),
"哈哈,你说得对,但是 Rust 是由 Mozilla 自主研发的一"
);
}
}

View File

@@ -1,12 +1,8 @@
use std::collections::HashMap;
use std::path::PathBuf;
use anyhow::Result;
use serde::de::{Deserializer, MapAccess, Visitor};
use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize};
use crate::bilibili::{CollectionItem, CollectionType};
use crate::utils::filenamify::filenamify;
/// 稍后再看的配置
@@ -74,13 +70,14 @@ impl Default for ConcurrentLimit {
}
pub trait PathSafeTemplate {
fn path_safe_register(&mut self, name: &'static str, template: &'static str) -> Result<()>;
fn path_safe_register(&mut self, name: &'static str, template: impl Into<String>) -> Result<()>;
fn path_safe_render(&self, name: &'static str, data: &serde_json::Value) -> Result<String>;
}
/// 通过将模板字符串中的分隔符替换为自定义的字符串,使得模板字符串中的分隔符得以保留
impl PathSafeTemplate for handlebars::Handlebars<'_> {
fn path_safe_register(&mut self, name: &'static str, template: &'static str) -> Result<()> {
fn path_safe_register(&mut self, name: &'static str, template: impl Into<String>) -> Result<()> {
let template = template.into();
Ok(self.register_template_string(name, template.replace(std::path::MAIN_SEPARATOR_STR, "__SEP__"))?)
}
@@ -88,72 +85,3 @@ impl PathSafeTemplate for handlebars::Handlebars<'_> {
Ok(filenamify(&self.render(name, data)?).replace("__SEP__", std::path::MAIN_SEPARATOR_STR))
}
}
/* 后面是用于自定义 Collection 的序列化、反序列化的样板代码 */
pub(super) fn serialize_collection_list<S>(
collection_list: &HashMap<CollectionItem, PathBuf>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut map = serializer.serialize_map(Some(collection_list.len()))?;
for (k, v) in collection_list {
let prefix = match k.collection_type {
CollectionType::Series => "series",
CollectionType::Season => "season",
};
map.serialize_entry(&[prefix, &k.mid, &k.sid].join(":"), v)?;
}
map.end()
}
pub(super) fn deserialize_collection_list<'de, D>(deserializer: D) -> Result<HashMap<CollectionItem, PathBuf>, D::Error>
where
D: Deserializer<'de>,
{
struct CollectionListVisitor;
impl<'de> Visitor<'de> for CollectionListVisitor {
type Value = HashMap<CollectionItem, PathBuf>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a map of collection list")
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
let mut collection_list = HashMap::new();
while let Some((key, value)) = map.next_entry::<String, PathBuf>()? {
let collection_item = match key.split(':').collect::<Vec<&str>>().as_slice() {
[prefix, mid, sid] => {
let collection_type = match *prefix {
"series" => CollectionType::Series,
"season" => CollectionType::Season,
_ => {
return Err(serde::de::Error::custom(
"invalid collection type, should be series or season",
));
}
};
CollectionItem {
mid: mid.to_string(),
sid: sid.to_string(),
collection_type,
}
}
_ => {
return Err(serde::de::Error::custom(
"invalid collection key, should be series:mid:sid or season:mid:sid",
));
}
};
collection_list.insert(collection_item, value);
}
Ok(collection_list)
}
}
deserializer.deserialize_map(CollectionListVisitor)
}

View File

@@ -0,0 +1,135 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use anyhow::Result;
use arc_swap::ArcSwap;
use sea_orm::DatabaseConnection;
use serde::de::{Deserializer, MapAccess, Visitor};
use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize};
use crate::bilibili::{CollectionItem, CollectionType, Credential, DanmakuOption, FilterOption};
use crate::config::Config;
use crate::config::default::{default_auth_token, default_bind_address, default_time_format};
use crate::config::item::{ConcurrentLimit, NFOTimeType, WatchLaterConfig};
use crate::utils::model::migrate_legacy_config;
#[derive(Serialize, Deserialize)]
pub struct LegacyConfig {
#[serde(default = "default_auth_token")]
pub auth_token: String,
#[serde(default = "default_bind_address")]
pub bind_address: String,
pub credential: ArcSwap<Credential>,
pub filter_option: FilterOption,
#[serde(default)]
pub danmaku_option: DanmakuOption,
pub favorite_list: HashMap<String, PathBuf>,
#[serde(
default,
serialize_with = "serialize_collection_list",
deserialize_with = "deserialize_collection_list"
)]
pub collection_list: HashMap<CollectionItem, PathBuf>,
#[serde(default)]
pub submission_list: HashMap<String, PathBuf>,
#[serde(default)]
pub watch_later: WatchLaterConfig,
pub video_name: String,
pub page_name: String,
pub interval: u64,
pub upper_path: PathBuf,
#[serde(default)]
pub nfo_time_type: NFOTimeType,
#[serde(default)]
pub concurrent_limit: ConcurrentLimit,
#[serde(default = "default_time_format")]
pub time_format: String,
#[serde(default)]
pub cdn_sorting: bool,
}
impl LegacyConfig {
async fn load_from_file(path: &Path) -> Result<Self> {
let legacy_config_str = tokio::fs::read_to_string(path).await?;
Ok(toml::from_str(&legacy_config_str)?)
}
pub async fn migrate_from_file(path: &Path, connection: &DatabaseConnection) -> Result<Config> {
let legacy_config = Self::load_from_file(path).await?;
migrate_legacy_config(&legacy_config, connection).await?;
Ok(legacy_config.into())
}
}
/*
后面是用于自定义 Collection 的序列化、反序列化的样板代码
*/
pub(super) fn serialize_collection_list<S>(
collection_list: &HashMap<CollectionItem, PathBuf>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut map = serializer.serialize_map(Some(collection_list.len()))?;
for (k, v) in collection_list {
let prefix = match k.collection_type {
CollectionType::Series => "series",
CollectionType::Season => "season",
};
map.serialize_entry(&[prefix, &k.mid, &k.sid].join(":"), v)?;
}
map.end()
}
pub(super) fn deserialize_collection_list<'de, D>(deserializer: D) -> Result<HashMap<CollectionItem, PathBuf>, D::Error>
where
D: Deserializer<'de>,
{
struct CollectionListVisitor;
impl<'de> Visitor<'de> for CollectionListVisitor {
type Value = HashMap<CollectionItem, PathBuf>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a map of collection list")
}
fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
where
A: MapAccess<'de>,
{
let mut collection_list = HashMap::new();
while let Some((key, value)) = map.next_entry::<String, PathBuf>()? {
let collection_item = match key.split(':').collect::<Vec<&str>>().as_slice() {
[prefix, mid, sid] => {
let collection_type = match *prefix {
"series" => CollectionType::Series,
"season" => CollectionType::Season,
_ => {
return Err(serde::de::Error::custom(
"invalid collection type, should be series or season",
));
}
};
CollectionItem {
mid: mid.to_string(),
sid: sid.to_string(),
collection_type,
}
}
_ => {
return Err(serde::de::Error::custom(
"invalid collection key, should be series:mid:sid or season:mid:sid",
));
}
};
collection_list.insert(collection_item, value);
}
Ok(collection_list)
}
}
deserializer.deserialize_map(CollectionListVisitor)
}

View File

@@ -1,191 +1,18 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::Result;
use arc_swap::ArcSwapOption;
use rand::seq::SliceRandom;
use serde::{Deserialize, Serialize};
mod clap;
mod global;
mod args;
mod current;
mod default;
mod flag;
mod handlebar;
mod item;
mod legacy;
mod versioned_cache;
mod versioned_config;
use crate::adapter::Args;
use crate::bilibili::{CollectionItem, Credential, DanmakuOption, FilterOption};
pub use crate::config::clap::version;
pub use crate::config::global::{ARGS, CONFIG, CONFIG_DIR, TEMPLATE};
use crate::config::item::{ConcurrentLimit, deserialize_collection_list, serialize_collection_list};
pub use crate::config::item::{NFOTimeType, PathSafeTemplate, RateLimit, WatchLaterConfig};
fn default_time_format() -> String {
"%Y-%m-%d".to_string()
}
/// 默认的 auth_token 实现,生成随机 16 位字符串
fn default_auth_token() -> Option<String> {
let byte_choices = b"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()_+-=";
let mut rng = rand::thread_rng();
Some(
(0..16)
.map(|_| *(byte_choices.choose(&mut rng).expect("choose byte failed")) as char)
.collect(),
)
}
fn default_bind_address() -> String {
"0.0.0.0:12345".to_string()
}
#[derive(Serialize, Deserialize)]
pub struct Config {
#[serde(default = "default_auth_token")]
pub auth_token: Option<String>,
#[serde(default = "default_bind_address")]
pub bind_address: String,
pub credential: ArcSwapOption<Credential>,
pub filter_option: FilterOption,
#[serde(default)]
pub danmaku_option: DanmakuOption,
pub favorite_list: HashMap<String, PathBuf>,
#[serde(
default,
serialize_with = "serialize_collection_list",
deserialize_with = "deserialize_collection_list"
)]
pub collection_list: HashMap<CollectionItem, PathBuf>,
#[serde(default)]
pub submission_list: HashMap<String, PathBuf>,
#[serde(default)]
pub watch_later: WatchLaterConfig,
pub video_name: Cow<'static, str>,
pub page_name: Cow<'static, str>,
pub interval: u64,
pub upper_path: PathBuf,
#[serde(default)]
pub nfo_time_type: NFOTimeType,
#[serde(default)]
pub concurrent_limit: ConcurrentLimit,
#[serde(default = "default_time_format")]
pub time_format: String,
#[serde(default)]
pub cdn_sorting: bool,
}
impl Default for Config {
fn default() -> Self {
Self {
auth_token: default_auth_token(),
bind_address: default_bind_address(),
credential: ArcSwapOption::from(Some(Arc::new(Credential::default()))),
filter_option: FilterOption::default(),
danmaku_option: DanmakuOption::default(),
favorite_list: HashMap::new(),
collection_list: HashMap::new(),
submission_list: HashMap::new(),
watch_later: Default::default(),
video_name: Cow::Borrowed("{{title}}"),
page_name: Cow::Borrowed("{{bvid}}"),
interval: 1200,
upper_path: CONFIG_DIR.join("upper_face"),
nfo_time_type: NFOTimeType::FavTime,
concurrent_limit: ConcurrentLimit::default(),
time_format: default_time_format(),
cdn_sorting: false,
}
}
}
impl Config {
pub fn save(&self) -> Result<()> {
let config_path = CONFIG_DIR.join("config.toml");
std::fs::create_dir_all(&*CONFIG_DIR)?;
std::fs::write(config_path, toml::to_string_pretty(self)?)?;
Ok(())
}
fn load(path: &Path) -> Result<Self> {
let config_content = std::fs::read_to_string(path)?;
Ok(toml::from_str(&config_content)?)
}
pub fn as_video_sources(&self) -> Vec<(Args<'_>, &PathBuf)> {
let mut params = Vec::new();
self.favorite_list
.iter()
.for_each(|(fid, path)| params.push((Args::Favorite { fid }, path)));
self.collection_list
.iter()
.for_each(|(collection_item, path)| params.push((Args::Collection { collection_item }, path)));
if self.watch_later.enabled {
params.push((Args::WatchLater, &self.watch_later.path));
}
self.submission_list
.iter()
.for_each(|(upper_id, path)| params.push((Args::Submission { upper_id }, path)));
params
}
pub fn check(&self) {
let mut ok = true;
let video_sources = self.as_video_sources();
if video_sources.is_empty() {
ok = false;
error!("没有配置任何需要扫描的内容,程序空转没有意义");
}
for (args, path) in video_sources {
if !path.is_absolute() {
ok = false;
error!("{:?} 保存的路径应为绝对路径,检测到: {}", args, path.display());
}
}
if !self.upper_path.is_absolute() {
ok = false;
error!("up 主头像保存的路径应为绝对路径");
}
if self.video_name.is_empty() {
ok = false;
error!("未设置 video_name 模板");
}
if self.page_name.is_empty() {
ok = false;
error!("未设置 page_name 模板");
}
let credential = self.credential.load();
match credential.as_deref() {
Some(credential) => {
if credential.sessdata.is_empty()
|| credential.bili_jct.is_empty()
|| credential.buvid3.is_empty()
|| credential.dedeuserid.is_empty()
|| credential.ac_time_value.is_empty()
{
ok = false;
error!("Credential 信息不完整,请确保填写完整");
}
}
None => {
ok = false;
error!("未设置 Credential 信息");
}
}
if !(self.concurrent_limit.video > 0 && self.concurrent_limit.page > 0) {
ok = false;
error!("video 和 page 允许的并发数必须大于 0");
}
if !ok {
panic!(
"位于 {} 的配置文件不合法,请参考提示信息修复后继续运行",
CONFIG_DIR.join("config.toml").display()
);
}
}
pub(super) fn test_default() -> Self {
Self {
cdn_sorting: true,
..Default::default()
}
}
}
pub use crate::config::args::{ARGS, version};
pub use crate::config::current::{CONFIG_DIR, Config};
pub use crate::config::flag::DOWNLOADER_RUNNING;
pub use crate::config::handlebar::TEMPLATE;
pub use crate::config::item::{NFOTimeType, PathSafeTemplate, RateLimit};
pub use crate::config::legacy::LegacyConfig;
pub use crate::config::versioned_cache::VersionedCache;
pub use crate::config::versioned_config::VersionedConfig;

View File

@@ -0,0 +1,49 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use anyhow::Result;
use arc_swap::{ArcSwap, Guard};
use crate::config::{Config, VersionedConfig};
pub struct VersionedCache<T> {
inner: ArcSwap<T>,
version: AtomicU64,
builder: fn(&Config) -> Result<T>,
}
impl<T> VersionedCache<T> {
pub fn new(builder: fn(&Config) -> Result<T>) -> Result<Self> {
let current_config = VersionedConfig::get().load();
let initial_value = builder(&current_config)?;
Ok(Self {
inner: ArcSwap::from_pointee(initial_value),
version: AtomicU64::new(0),
builder,
})
}
pub fn load(&self) -> Guard<Arc<T>> {
self.reload_if_needed();
self.inner.load()
}
#[allow(dead_code)]
pub fn load_full(&self) -> Arc<T> {
self.reload_if_needed();
self.inner.load_full()
}
fn reload_if_needed(&self) {
let current_version = VersionedConfig::get().version();
let cached_version = self.version.load(Ordering::Acquire);
if current_version != cached_version {
let current_config = VersionedConfig::get().load();
if let Ok(new_value) = (self.builder)(&current_config) {
self.inner.store(Arc::new(new_value));
self.version.store(current_version, Ordering::Release);
}
}
}
}

View File

@@ -0,0 +1,91 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use anyhow::{Result, anyhow, bail};
use arc_swap::{ArcSwap, Guard};
use sea_orm::DatabaseConnection;
use tokio::sync::OnceCell;
use crate::config::{CONFIG_DIR, Config, LegacyConfig};
pub static VERSIONED_CONFIG: OnceCell<VersionedConfig> = OnceCell::const_new();
pub struct VersionedConfig {
inner: ArcSwap<Config>,
version: AtomicU64,
}
impl VersionedConfig {
/// 初始化全局的 `VersionedConfig`,初始化失败或者已初始化过则返回错误
pub async fn init(connection: &DatabaseConnection) -> Result<()> {
let config = match Config::load_from_database(connection).await? {
Some(Ok(config)) => config,
Some(Err(e)) => bail!("解析数据库配置失败: {}", e),
None => {
let config = match LegacyConfig::migrate_from_file(&CONFIG_DIR.join("config.toml"), connection).await {
Ok(config) => config,
Err(e) => {
if e.downcast_ref::<std::io::Error>()
.is_none_or(|e| e.kind() != std::io::ErrorKind::NotFound)
{
bail!("未成功读取并迁移旧版本配置:{:#}", e);
} else {
let config = Config::default();
warn!(
"生成 auth_token{},可使用该 token 登录 web UI该信息仅在首次运行时打印",
config.auth_token
);
config
}
}
};
config.save_to_database(connection).await?;
config
}
};
let versioned_config = VersionedConfig::new(config);
VERSIONED_CONFIG
.set(versioned_config)
.map_err(|e| anyhow!("VERSIONED_CONFIG has already been initialized: {}", e))?;
Ok(())
}
#[cfg(test)]
/// 单元测试直接使用测试专用的配置即可
pub fn get() -> &'static VersionedConfig {
use std::sync::LazyLock;
static TEST_CONFIG: LazyLock<VersionedConfig> = LazyLock::new(|| VersionedConfig::new(Config::test_default()));
return &TEST_CONFIG;
}
#[cfg(not(test))]
/// 获取全局的 `VersionedConfig`,如果未初始化则会 panic
pub fn get() -> &'static VersionedConfig {
VERSIONED_CONFIG.get().expect("VERSIONED_CONFIG is not initialized")
}
pub fn new(config: Config) -> Self {
Self {
inner: ArcSwap::from_pointee(config),
version: AtomicU64::new(1),
}
}
pub fn load(&self) -> Guard<Arc<Config>> {
self.inner.load()
}
pub fn load_full(&self) -> Arc<Config> {
self.inner.load_full()
}
pub fn version(&self) -> u64 {
self.version.load(Ordering::Acquire)
}
#[allow(dead_code)]
pub fn update(&self, new_config: Config) {
self.inner.store(Arc::new(new_config));
self.version.fetch_add(1, Ordering::AcqRel);
}
}

View File

@@ -1,4 +1,4 @@
use anyhow::Result;
use anyhow::{Context, Result};
use bili_sync_migration::{Migrator, MigratorTrait};
use sea_orm::{ConnectOptions, Database, DatabaseConnection};
@@ -25,7 +25,7 @@ async fn migrate_database() -> Result<()> {
}
/// 进行数据库迁移并获取数据库连接,供外部使用
pub async fn setup_database() -> DatabaseConnection {
migrate_database().await.expect("数据库迁移失败");
database_connection().await.expect("获取数据库连接失败")
pub async fn setup_database() -> Result<DatabaseConnection> {
migrate_database().await.context("Failed to migrate database")?;
database_connection().await.context("Failed to connect to database")
}

View File

@@ -12,7 +12,7 @@ use tokio::task::JoinSet;
use tokio_util::io::StreamReader;
use crate::bilibili::Client;
use crate::config::CONFIG;
use crate::config::VersionedConfig;
pub struct Downloader {
client: Client,
}
@@ -26,7 +26,7 @@ impl Downloader {
}
pub async fn fetch(&self, url: &str, path: &Path) -> Result<()> {
if CONFIG.concurrent_limit.download.enable {
if VersionedConfig::get().load().concurrent_limit.download.enable {
self.fetch_parallel(url, path).await
} else {
self.fetch_serial(url, path).await
@@ -60,6 +60,13 @@ impl Downloader {
}
async fn fetch_parallel(&self, url: &str, path: &Path) -> Result<()> {
let (concurrency, threshold) = {
let config = VersionedConfig::get().load();
(
config.concurrent_limit.download.concurrency,
config.concurrent_limit.download.threshold,
)
};
let resp = self
.client
.request(Method::HEAD, url, None)
@@ -67,12 +74,12 @@ impl Downloader {
.await?
.error_for_status()?;
let file_size = resp.header_content_length().unwrap_or_default();
let chunk_size = file_size / CONFIG.concurrent_limit.download.concurrency as u64;
let chunk_size = file_size / concurrency as u64;
if resp
.headers()
.get(header::ACCEPT_RANGES)
.is_none_or(|v| v.to_str().unwrap_or_default() == "none") // https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Accept-Ranges#none
|| chunk_size < CONFIG.concurrent_limit.download.threshold
|| chunk_size < threshold
{
return self.fetch_serial(url, path).await;
}
@@ -85,9 +92,9 @@ impl Downloader {
let mut tasks = JoinSet::new();
let url = Arc::new(url.to_string());
let path = Arc::new(path.to_path_buf());
for i in 0..CONFIG.concurrent_limit.download.concurrency {
for i in 0..concurrency {
let start = i as u64 * chunk_size;
let end = if i == CONFIG.concurrent_limit.download.concurrency - 1 {
let end = if i == concurrency - 1 {
file_size
} else {
start + chunk_size

View File

@@ -17,21 +17,20 @@ use std::future::Future;
use std::sync::Arc;
use bilibili::BiliClient;
use once_cell::sync::Lazy;
use sea_orm::DatabaseConnection;
use task::{http_server, video_downloader};
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use crate::config::{ARGS, CONFIG};
use crate::config::{ARGS, VersionedConfig};
use crate::database::setup_database;
use crate::utils::init_logger;
use crate::utils::signal::terminate;
#[tokio::main]
async fn main() {
init();
let connection = init().await;
let bili_client = Arc::new(BiliClient::new());
let connection = Arc::new(setup_database().await);
let token = CancellationToken::new();
let tracker = TaskTracker::new();
@@ -74,12 +73,17 @@ fn spawn_task(
});
}
/// 初始化日志系统打印欢迎信息,加载配置文件
fn init() {
/// 初始化日志系统打印欢迎信息,初始化数据库连接和全局配置,最终返回数据库连接
async fn init() -> Arc<DatabaseConnection> {
init_logger(&ARGS.log_level);
info!("欢迎使用 Bili-Sync当前程序版本{}", config::version());
info!("项目地址https://github.com/amtoaer/bili-sync");
Lazy::force(&CONFIG);
let connection = Arc::new(setup_database().await.expect("数据库初始化失败"));
info!("数据库初始化完成");
VersionedConfig::init(&connection).await.expect("配置初始化失败");
info!("配置初始化完成");
connection
}
async fn handle_shutdown(tracker: TaskTracker, token: CancellationToken) {

View File

@@ -1,9 +1,8 @@
use std::sync::Arc;
use anyhow::{Context, Result};
use axum::body::Body;
use axum::extract::Request;
use axum::http::{Response, Uri, header};
use axum::http::{Uri, header};
use axum::response::IntoResponse;
use axum::routing::get;
use axum::{Extension, Router, ServiceExt, middleware};
@@ -16,7 +15,7 @@ use utoipa_swagger_ui::{Config, SwaggerUi};
use crate::api::auth;
use crate::api::handler::{ApiDoc, api_router};
use crate::bilibili::BiliClient;
use crate::config::CONFIG;
use crate::config::VersionedConfig;
#[derive(RustEmbed)]
#[preserve_source = false]
@@ -41,10 +40,11 @@ pub async fn http_server(database_connection: Arc<DatabaseConnection>, bili_clie
.layer(Extension(database_connection))
.layer(Extension(bili_client))
.layer(middleware::from_fn(auth::auth));
let listener = tokio::net::TcpListener::bind(&CONFIG.bind_address)
let config = VersionedConfig::get().load_full();
let listener = tokio::net::TcpListener::bind(&config.bind_address)
.await
.context("bind address failed")?;
info!("开始运行管理页: http://{}", CONFIG.bind_address);
info!("开始运行管理页: http://{}", config.bind_address);
Ok(axum::serve(listener, ServiceExt::<Request>::into_make_service(app)).await?)
}
@@ -56,16 +56,21 @@ async fn frontend_files(uri: Uri) -> impl IntoResponse {
let Some(content) = Asset::get(path) else {
return (StatusCode::NOT_FOUND, "404 Not Found").into_response();
};
Response::builder()
.status(StatusCode::OK)
.header(
header::CONTENT_TYPE,
content.mime_type().as_deref().unwrap_or("application/octet-stream"),
let mime_type = content.mime_type();
let content_type = mime_type.as_deref().unwrap_or("application/octet-stream");
if cfg!(debug_assertions) {
(
[(header::CONTENT_TYPE, content_type)],
// safety: `RustEmbed` returns uncompressed files directly from the filesystem in debug mode
content.data().unwrap(),
)
.header(header::CONTENT_ENCODING, "br")
// safety: `RustEmbed` will always generate br-compressed files if the feature is enabled
.body(Body::from(content.data_br().unwrap()))
.unwrap_or_else(|_| {
return (StatusCode::INTERNAL_SERVER_ERROR, "500 Internal Server Error").into_response();
})
.into_response()
} else {
(
[(header::CONTENT_TYPE, content_type), (header::CONTENT_ENCODING, "br")],
// safety: `RustEmbed` will always generate br-compressed files if the feature is enabled
content.data_br().unwrap(),
)
.into_response()
}
}

View File

@@ -4,16 +4,22 @@ use sea_orm::DatabaseConnection;
use tokio::time;
use crate::bilibili::{self, BiliClient};
use crate::config::CONFIG;
use crate::config::{DOWNLOADER_RUNNING, VersionedConfig};
use crate::utils::model::get_enabled_video_sources;
use crate::workflow::process_video_source;
/// 启动周期下载视频的任务
pub async fn video_downloader(connection: Arc<DatabaseConnection>, bili_client: Arc<BiliClient>) {
let mut anchor = chrono::Local::now().date_naive();
let video_sources = CONFIG.as_video_sources();
loop {
info!("开始执行本轮视频下载任务..");
DOWNLOADER_RUNNING.store(true, std::sync::atomic::Ordering::Relaxed);
let config = VersionedConfig::get().load_full();
'inner: {
if let Err(e) = config.check() {
error!("配置检查失败,跳过本轮执行:\n{:#}", e);
break 'inner;
}
match bili_client.wbi_img().await.map(|wbi_img| wbi_img.into()) {
Ok(Some(mixin_key)) => bilibili::set_global_mixin_key(mixin_key),
Ok(_) => {
@@ -26,19 +32,28 @@ pub async fn video_downloader(connection: Arc<DatabaseConnection>, bili_client:
}
};
if anchor != chrono::Local::now().date_naive() {
if let Err(e) = bili_client.check_refresh().await {
if let Err(e) = bili_client.check_refresh(&connection).await {
error!("检查刷新 Credential 遇到错误:{:#},等待下一轮执行", e);
break 'inner;
}
anchor = chrono::Local::now().date_naive();
}
for (args, path) in &video_sources {
if let Err(e) = process_video_source(*args, &bili_client, path, &connection).await {
error!("处理过程遇到错误:{:#}", e);
let Ok(video_sources) = get_enabled_video_sources(&connection).await else {
error!("获取视频源列表失败,等待下一轮执行");
break 'inner;
};
if video_sources.is_empty() {
info!("没有可用的视频源,等待下一轮执行");
break 'inner;
}
for video_source in video_sources {
if let Err(e) = process_video_source(video_source, &bili_client, &connection).await {
error!("处理 {} 时遇到错误:{:#},等待下一轮执行", "test", e);
}
}
info!("本轮任务执行完毕,等待下一轮执行");
}
time::sleep(time::Duration::from_secs(CONFIG.interval)).await;
DOWNLOADER_RUNNING.store(false, std::sync::atomic::Ordering::Relaxed);
time::sleep(time::Duration::from_secs(config.interval)).await;
}
}

View File

@@ -1,15 +1,16 @@
use serde_json::json;
use crate::config::CONFIG;
use crate::config::VersionedConfig;
pub fn video_format_args(video_model: &bili_sync_entity::video::Model) -> serde_json::Value {
let config = VersionedConfig::get().load();
json!({
"bvid": &video_model.bvid,
"title": &video_model.name,
"upper_name": &video_model.upper_name,
"upper_mid": &video_model.upper_id,
"pubtime": &video_model.pubtime.and_utc().format(&CONFIG.time_format).to_string(),
"fav_time": &video_model.favtime.and_utc().format(&CONFIG.time_format).to_string(),
"pubtime": &video_model.pubtime.and_utc().format(&config.time_format).to_string(),
"fav_time": &video_model.favtime.and_utc().format(&config.time_format).to_string(),
})
}
@@ -17,6 +18,7 @@ pub fn page_format_args(
video_model: &bili_sync_entity::video::Model,
page_model: &bili_sync_entity::page::Model,
) -> serde_json::Value {
let config = VersionedConfig::get().load();
json!({
"bvid": &video_model.bvid,
"title": &video_model.name,
@@ -24,7 +26,7 @@ pub fn page_format_args(
"upper_mid": &video_model.upper_id,
"ptitle": &page_model.name,
"pid": page_model.pid,
"pubtime": video_model.pubtime.and_utc().format(&CONFIG.time_format).to_string(),
"fav_time": video_model.favtime.and_utc().format(&CONFIG.time_format).to_string(),
"pubtime": video_model.pubtime.and_utc().format(&config.time_format).to_string(),
"fav_time": video_model.favtime.and_utc().format(&config.time_format).to_string(),
})
}

View File

@@ -1,11 +1,13 @@
use anyhow::{Context, Result};
use anyhow::{Context, Result, anyhow};
use bili_sync_entity::*;
use sea_orm::DatabaseTransaction;
use sea_orm::ActiveValue::Set;
use sea_orm::entity::prelude::*;
use sea_orm::sea_query::{OnConflict, SimpleExpr};
use sea_orm::{DatabaseTransaction, TransactionTrait};
use crate::adapter::{VideoSource, VideoSourceEnum};
use crate::bilibili::{PageInfo, VideoInfo};
use crate::config::{Config, LegacyConfig};
use crate::utils::status::STATUS_COMPLETED;
/// 筛选未填充的视频
@@ -117,3 +119,123 @@ pub async fn update_pages_model(pages: Vec<page::ActiveModel>, connection: &Data
query.exec(connection).await?;
Ok(())
}
/// 获取所有已经启用的视频源
pub async fn get_enabled_video_sources(connection: &DatabaseConnection) -> Result<Vec<VideoSourceEnum>> {
let (favorite, watch_later, submission, collection) = tokio::try_join!(
favorite::Entity::find()
.filter(favorite::Column::Enabled.eq(true))
.all(connection),
watch_later::Entity::find()
.filter(watch_later::Column::Enabled.eq(true))
.all(connection),
submission::Entity::find()
.filter(submission::Column::Enabled.eq(true))
.all(connection),
collection::Entity::find()
.filter(collection::Column::Enabled.eq(true))
.all(connection),
)?;
let mut sources = Vec::with_capacity(favorite.len() + watch_later.len() + submission.len() + collection.len());
sources.extend(favorite.into_iter().map(VideoSourceEnum::from));
sources.extend(watch_later.into_iter().map(VideoSourceEnum::from));
sources.extend(submission.into_iter().map(VideoSourceEnum::from));
sources.extend(collection.into_iter().map(VideoSourceEnum::from));
Ok(sources)
}
/// 从数据库中加载配置
pub async fn load_db_config(connection: &DatabaseConnection) -> Result<Option<Result<Config>>> {
Ok(bili_sync_entity::config::Entity::find_by_id(1)
.one(connection)
.await?
.map(|model| {
serde_json::from_str(&model.data).map_err(|e| anyhow!("Failed to deserialize config data: {}", e))
}))
}
/// 保存配置到数据库
pub async fn save_db_config(config: &Config, connection: &DatabaseConnection) -> Result<()> {
let data = serde_json::to_string(config).context("Failed to serialize config data")?;
let model = bili_sync_entity::config::ActiveModel {
id: Set(1),
data: Set(data),
..Default::default()
};
bili_sync_entity::config::Entity::insert(model)
.on_conflict(
OnConflict::column(bili_sync_entity::config::Column::Id)
.update_column(bili_sync_entity::config::Column::Data)
.to_owned(),
)
.exec(connection)
.await
.context("Failed to save config to database")?;
Ok(())
}
/// 迁移旧版本配置(即将所有相关联的内容设置为 enabled
pub async fn migrate_legacy_config(config: &LegacyConfig, connection: &DatabaseConnection) -> Result<()> {
let transaction = connection.begin().await.context("Failed to begin transaction")?;
tokio::try_join!(
migrate_favorite(config, &transaction),
migrate_watch_later(config, &transaction),
migrate_submission(config, &transaction),
migrate_collection(config, &transaction)
)?;
transaction.commit().await.context("Failed to commit transaction")?;
Ok(())
}
async fn migrate_favorite(config: &LegacyConfig, connection: &DatabaseTransaction) -> Result<()> {
favorite::Entity::update_many()
.filter(favorite::Column::FId.is_in(config.favorite_list.keys().collect::<Vec<_>>()))
.col_expr(favorite::Column::Enabled, Expr::value(true))
.exec(connection)
.await
.context("Failed to migrate favorite config")?;
Ok(())
}
async fn migrate_watch_later(config: &LegacyConfig, connection: &DatabaseTransaction) -> Result<()> {
if config.watch_later.enabled {
watch_later::Entity::update_many()
.col_expr(watch_later::Column::Enabled, Expr::value(true))
.exec(connection)
.await
.context("Failed to migrate watch later config")?;
}
Ok(())
}
async fn migrate_submission(config: &LegacyConfig, connection: &DatabaseTransaction) -> Result<()> {
submission::Entity::update_many()
.filter(submission::Column::UpperId.is_in(config.submission_list.keys().collect::<Vec<_>>()))
.col_expr(submission::Column::Enabled, Expr::value(true))
.exec(connection)
.await
.context("Failed to migrate submission config")?;
Ok(())
}
async fn migrate_collection(config: &LegacyConfig, connection: &DatabaseTransaction) -> Result<()> {
let tuples: Vec<(i64, i64, i32)> = config
.collection_list
.keys()
.filter_map(|key| Some((key.sid.parse().ok()?, key.mid.parse().ok()?, key.collection_type.into())))
.collect();
collection::Entity::update_many()
.filter(
Expr::tuple([
Expr::column(collection::Column::SId),
Expr::column(collection::Column::MId),
Expr::column(collection::Column::Type),
])
.in_tuples(tuples),
)
.col_expr(collection::Column::Enabled, Expr::value(true))
.exec(connection)
.await
.context("Failed to migrate collection config")?;
Ok(())
}

View File

@@ -6,7 +6,7 @@ use quick_xml::events::{BytesCData, BytesText};
use quick_xml::writer::Writer;
use tokio::io::{AsyncWriteExt, BufWriter};
use crate::config::{CONFIG, NFOTimeType};
use crate::config::{NFOTimeType, VersionedConfig};
#[allow(clippy::upper_case_acronyms)]
pub enum NFO<'a> {
@@ -339,7 +339,7 @@ impl<'a> From<&'a video::Model> for Movie<'a> {
bvid: &video.bvid,
upper_id: video.upper_id,
upper_name: &video.upper_name,
aired: match CONFIG.nfo_time_type {
aired: match VersionedConfig::get().load().nfo_time_type {
NFOTimeType::FavTime => video.favtime,
NFOTimeType::PubTime => video.pubtime,
},
@@ -359,7 +359,7 @@ impl<'a> From<&'a video::Model> for TVShow<'a> {
bvid: &video.bvid,
upper_id: video.upper_id,
upper_name: &video.upper_name,
aired: match CONFIG.nfo_time_type {
aired: match VersionedConfig::get().load().nfo_time_type {
NFOTimeType::FavTime => video.favtime,
NFOTimeType::PubTime => video.pubtime,
},

View File

@@ -12,9 +12,9 @@ use sea_orm::entity::prelude::*;
use tokio::fs;
use tokio::sync::Semaphore;
use crate::adapter::{Args, VideoSource, VideoSourceEnum, video_source_from};
use crate::adapter::{VideoSource, VideoSourceEnum};
use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Video, VideoInfo};
use crate::config::{ARGS, CONFIG, PathSafeTemplate, TEMPLATE};
use crate::config::{ARGS, PathSafeTemplate, TEMPLATE, VersionedConfig};
use crate::downloader::Downloader;
use crate::error::{DownloadAbortError, ExecutionStatus, ProcessPageError};
use crate::utils::format_arg::{page_format_args, video_format_args};
@@ -27,13 +27,12 @@ use crate::utils::status::{PageStatus, STATUS_OK, VideoStatus};
/// 完整地处理某个视频来源
pub async fn process_video_source(
args: Args<'_>,
video_source: VideoSourceEnum,
bili_client: &BiliClient,
path: &Path,
connection: &DatabaseConnection,
) -> Result<()> {
// 从参数中获取视频列表的 Model 与视频流
let (video_source, video_streams) = video_source_from(args, path, bili_client, connection).await?;
let (video_source, video_streams) = video_source.refresh(bili_client, connection).await?;
// 从视频流中获取新视频的简要信息,写入数据库
refresh_video_source(&video_source, video_streams, connection).await?;
// 单独请求视频详情接口,获取视频的详情信息与所有的分页,写入数据库
@@ -154,7 +153,7 @@ pub async fn download_unprocessed_videos(
connection: &DatabaseConnection,
) -> Result<()> {
video_source.log_download_video_start();
let semaphore = Semaphore::new(CONFIG.concurrent_limit.video);
let semaphore = Semaphore::new(VersionedConfig::get().load().concurrent_limit.video);
let downloader = Downloader::new(bili_client.client.clone());
let unhandled_videos_pages = filter_unhandled_video_pages(video_source.filter_expr(), connection).await?;
let mut assigned_upper = HashSet::new();
@@ -215,11 +214,14 @@ pub async fn download_video_pages(
let _permit = semaphore.acquire().await.context("acquire semaphore failed")?;
let mut status = VideoStatus::from(video_model.download_status);
let separate_status = status.should_run();
let base_path = video_source
.path()
.join(TEMPLATE.path_safe_render("video", &video_format_args(&video_model))?);
let base_path = video_source.path().join(
TEMPLATE
.load()
.path_safe_render("video", &video_format_args(&video_model))?,
);
let upper_id = video_model.upper_id.to_string();
let base_upper_path = &CONFIG
let base_upper_path = VersionedConfig::get()
.load()
.upper_path
.join(upper_id.chars().next().context("upper_id is empty")?.to_string())
.join(upper_id);
@@ -311,7 +313,7 @@ pub async fn dispatch_download_page(
if !should_run {
return Ok(ExecutionStatus::Skipped);
}
let child_semaphore = Semaphore::new(CONFIG.concurrent_limit.page);
let child_semaphore = Semaphore::new(VersionedConfig::get().load().concurrent_limit.page);
let tasks = pages
.into_iter()
.map(|page_model| {
@@ -377,7 +379,9 @@ pub async fn download_page(
let mut status = PageStatus::from(page_model.download_status);
let separate_status = status.should_run();
let is_single_page = video_model.single_page.context("single_page is null")?;
let base_name = TEMPLATE.path_safe_render("page", &page_format_args(video_model, &page_model))?;
let base_name = TEMPLATE
.load()
.path_safe_render("page", &page_format_args(video_model, &page_model))?;
let (poster_path, video_path, nfo_path, danmaku_path, fanart_path, subtitle_path) = if is_single_page {
(
base_path.join(format!("{}-poster.jpg", &base_name)),
@@ -532,7 +536,7 @@ pub async fn fetch_page_video(
let streams = bili_video
.get_page_analyzer(page_info)
.await?
.best_stream(&CONFIG.filter_option)?;
.best_stream(&VersionedConfig::get().load().filter_option)?;
match streams {
BestStream::Mixed(mix_stream) => downloader.fetch_with_fallback(&mix_stream.urls(), page_path).await?,
BestStream::VideoAudio {
@@ -686,76 +690,3 @@ async fn generate_nfo(nfo: NFO<'_>, nfo_path: PathBuf) -> Result<()> {
fs::write(nfo_path, nfo.generate_nfo().await?.as_bytes()).await?;
Ok(())
}
#[cfg(test)]
mod tests {
use handlebars::handlebars_helper;
use serde_json::json;
use super::*;
#[test]
fn test_template_usage() {
let mut template = handlebars::Handlebars::new();
handlebars_helper!(truncate: |s: String, len: usize| {
if s.chars().count() > len {
s.chars().take(len).collect::<String>()
} else {
s.to_string()
}
});
template.register_helper("truncate", Box::new(truncate));
let _ = template.path_safe_register("video", "test{{bvid}}test");
let _ = template.path_safe_register("test_truncate", "哈哈,{{ truncate title 30 }}");
let _ = template.path_safe_register("test_path_unix", "{{ truncate title 7 }}/test/a");
let _ = template.path_safe_register("test_path_windows", r"{{ truncate title 7 }}\\test\\a");
#[cfg(not(windows))]
{
assert_eq!(
template
.path_safe_render("test_path_unix", &json!({"title": "关注/永雏塔菲喵"}))
.unwrap(),
"关注_永雏塔菲/test/a"
);
assert_eq!(
template
.path_safe_render("test_path_windows", &json!({"title": "关注/永雏塔菲喵"}))
.unwrap(),
"关注_永雏塔菲_test_a"
);
}
#[cfg(windows)]
{
assert_eq!(
template
.path_safe_render("test_path_unix", &json!({"title": "关注/永雏塔菲喵"}))
.unwrap(),
"关注_永雏塔菲_test_a"
);
assert_eq!(
template
.path_safe_render("test_path_windows", &json!({"title": "关注/永雏塔菲喵"}))
.unwrap(),
r"关注_永雏塔菲\\test\\a"
);
}
assert_eq!(
template
.path_safe_render("video", &json!({"bvid": "BV1b5411h7g7"}))
.unwrap(),
"testBV1b5411h7g7test"
);
assert_eq!(
template
.path_safe_render(
"test_truncate",
&json!({"title": "你说得对,但是 Rust 是由 Mozilla 自主研发的一款全新的编译期格斗游戏。\
编译将发生在一个被称作「Cargo」的构建系统中。在这里被引用的指针将被授予「生命周期」之力导引对象安全。\
你将扮演一位名为「Rustacean」的神秘角色, 在与「Rustc」的搏斗中邂逅各种骨骼惊奇的傲娇报错。\
征服她们、通过编译同时逐步发掘「C++」程序崩溃的真相。"})
)
.unwrap(),
"哈哈,你说得对,但是 Rust 是由 Mozilla 自主研发的一"
);
}
}

View File

@@ -0,0 +1,17 @@
//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.15
use sea_orm::entity::prelude::*;
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
#[sea_orm(table_name = "config")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
pub data: String,
pub created_at: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}

View File

@@ -3,6 +3,7 @@
pub mod prelude;
pub mod collection;
pub mod config;
pub mod favorite;
pub mod page;
pub mod submission;

View File

@@ -6,6 +6,7 @@ mod m20240709_130914_watch_later;
mod m20240724_161008_submission;
mod m20250122_062926_add_latest_row_at;
mod m20250612_090826_add_enabled;
mod m20250613_043257_add_config;
pub struct Migrator;
@@ -19,6 +20,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240724_161008_submission::Migration),
Box::new(m20250122_062926_add_latest_row_at::Migration),
Box::new(m20250612_090826_add_enabled::Migration),
Box::new(m20250613_043257_add_config::Migration),
]
}
}

View File

@@ -0,0 +1,44 @@
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Config::Table)
.if_not_exists()
.col(
ColumnDef::new(Config::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(Config::Data).text().not_null())
.col(
ColumnDef::new(Config::CreatedAt)
.timestamp()
.default(Expr::current_timestamp())
.not_null(),
)
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager.drop_table(Table::drop().table(Config::Table).to_owned()).await
}
}
#[derive(DeriveIden)]
enum Config {
Table,
Id,
Data,
CreatedAt,
}