feat: 优化对全局配置的处理,调整下载路径填充逻辑 (#523)
This commit is contained in:
@@ -13,7 +13,7 @@ use sea_orm::sea_query::SimpleExpr;
|
||||
use sea_orm::{DatabaseConnection, Unchanged};
|
||||
|
||||
use crate::adapter::{_ActiveModel, VideoSource, VideoSourceEnum};
|
||||
use crate::bilibili::{BiliClient, Collection, CollectionItem, CollectionType, VideoInfo};
|
||||
use crate::bilibili::{BiliClient, Collection, CollectionItem, CollectionType, Credential, VideoInfo};
|
||||
|
||||
impl VideoSource for collection::Model {
|
||||
fn display_name(&self) -> Cow<'static, str> {
|
||||
@@ -77,6 +77,7 @@ impl VideoSource for collection::Model {
|
||||
async fn refresh<'a>(
|
||||
self,
|
||||
bili_client: &'a BiliClient,
|
||||
credential: &'a Credential,
|
||||
connection: &'a DatabaseConnection,
|
||||
) -> Result<(
|
||||
VideoSourceEnum,
|
||||
@@ -89,6 +90,7 @@ impl VideoSource for collection::Model {
|
||||
mid: self.m_id.to_string(),
|
||||
collection_type: CollectionType::from_expected(self.r#type),
|
||||
},
|
||||
credential,
|
||||
);
|
||||
let collection_info = collection.get_info().await?;
|
||||
ensure!(
|
||||
|
||||
@@ -12,7 +12,7 @@ use sea_orm::sea_query::SimpleExpr;
|
||||
use sea_orm::{DatabaseConnection, Unchanged};
|
||||
|
||||
use crate::adapter::{_ActiveModel, VideoSource, VideoSourceEnum};
|
||||
use crate::bilibili::{BiliClient, FavoriteList, VideoInfo};
|
||||
use crate::bilibili::{BiliClient, Credential, FavoriteList, VideoInfo};
|
||||
|
||||
impl VideoSource for favorite::Model {
|
||||
fn display_name(&self) -> Cow<'static, str> {
|
||||
@@ -50,12 +50,13 @@ impl VideoSource for favorite::Model {
|
||||
async fn refresh<'a>(
|
||||
self,
|
||||
bili_client: &'a BiliClient,
|
||||
credential: &'a Credential,
|
||||
connection: &'a DatabaseConnection,
|
||||
) -> Result<(
|
||||
VideoSourceEnum,
|
||||
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + Send + 'a>>,
|
||||
)> {
|
||||
let favorite = FavoriteList::new(bili_client, self.f_id.to_string());
|
||||
let favorite = FavoriteList::new(bili_client, self.f_id.to_string(), credential);
|
||||
let favorite_info = favorite.get_info().await?;
|
||||
ensure!(
|
||||
favorite_info.id == self.f_id,
|
||||
|
||||
@@ -23,7 +23,7 @@ use bili_sync_entity::rule::Rule;
|
||||
use bili_sync_entity::submission::Model as Submission;
|
||||
use bili_sync_entity::watch_later::Model as WatchLater;
|
||||
|
||||
use crate::bilibili::{BiliClient, VideoInfo};
|
||||
use crate::bilibili::{BiliClient, Credential, VideoInfo};
|
||||
|
||||
#[enum_dispatch]
|
||||
pub enum VideoSourceEnum {
|
||||
@@ -104,6 +104,7 @@ pub trait VideoSource {
|
||||
async fn refresh<'a>(
|
||||
self,
|
||||
bili_client: &'a BiliClient,
|
||||
credential: &'a Credential,
|
||||
connection: &'a DatabaseConnection,
|
||||
) -> Result<(
|
||||
VideoSourceEnum,
|
||||
|
||||
@@ -11,7 +11,7 @@ use sea_orm::sea_query::SimpleExpr;
|
||||
use sea_orm::{DatabaseConnection, Unchanged};
|
||||
|
||||
use crate::adapter::{_ActiveModel, VideoSource, VideoSourceEnum};
|
||||
use crate::bilibili::{BiliClient, Dynamic, Submission, VideoInfo};
|
||||
use crate::bilibili::{BiliClient, Credential, Dynamic, Submission, VideoInfo};
|
||||
|
||||
impl VideoSource for submission::Model {
|
||||
fn display_name(&self) -> std::borrow::Cow<'static, str> {
|
||||
@@ -85,12 +85,13 @@ impl VideoSource for submission::Model {
|
||||
async fn refresh<'a>(
|
||||
self,
|
||||
bili_client: &'a BiliClient,
|
||||
credential: &'a Credential,
|
||||
connection: &'a DatabaseConnection,
|
||||
) -> Result<(
|
||||
VideoSourceEnum,
|
||||
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + Send + 'a>>,
|
||||
)> {
|
||||
let submission = Submission::new(bili_client, self.upper_id.to_string());
|
||||
let submission = Submission::new(bili_client, self.upper_id.to_string(), credential);
|
||||
let upper = submission.get_info().await?;
|
||||
ensure!(
|
||||
upper.mid == submission.upper_id,
|
||||
|
||||
@@ -11,7 +11,7 @@ use sea_orm::sea_query::SimpleExpr;
|
||||
use sea_orm::{DatabaseConnection, Unchanged};
|
||||
|
||||
use crate::adapter::{_ActiveModel, VideoSource, VideoSourceEnum};
|
||||
use crate::bilibili::{BiliClient, VideoInfo, WatchLater};
|
||||
use crate::bilibili::{BiliClient, Credential, VideoInfo, WatchLater};
|
||||
|
||||
impl VideoSource for watch_later::Model {
|
||||
fn display_name(&self) -> std::borrow::Cow<'static, str> {
|
||||
@@ -49,12 +49,13 @@ impl VideoSource for watch_later::Model {
|
||||
async fn refresh<'a>(
|
||||
self,
|
||||
bili_client: &'a BiliClient,
|
||||
credential: &'a Credential,
|
||||
_connection: &'a DatabaseConnection,
|
||||
) -> Result<(
|
||||
VideoSourceEnum,
|
||||
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + Send + 'a>>,
|
||||
)> {
|
||||
let watch_later = WatchLater::new(bili_client);
|
||||
let watch_later = WatchLater::new(bili_client, credential);
|
||||
Ok((self.into(), Box::pin(watch_later.into_video_stream())))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,10 +6,8 @@ use axum::extract::Extension;
|
||||
use axum::routing::get;
|
||||
use sea_orm::DatabaseConnection;
|
||||
|
||||
use crate::api::error::InnerApiError;
|
||||
use crate::api::wrapper::{ApiError, ApiResponse, ValidatedJson};
|
||||
use crate::config::{Config, VersionedConfig};
|
||||
use crate::utils::task_notifier::TASK_STATUS_NOTIFIER;
|
||||
|
||||
pub(super) fn router() -> Router {
|
||||
Router::new().route("/config", get(get_config).put(update_config))
|
||||
@@ -17,7 +15,7 @@ pub(super) fn router() -> Router {
|
||||
|
||||
/// 获取全局配置
|
||||
pub async fn get_config() -> Result<ApiResponse<Arc<Config>>, ApiError> {
|
||||
Ok(ApiResponse::ok(VersionedConfig::get().load_full()))
|
||||
Ok(ApiResponse::ok(VersionedConfig::get().snapshot()))
|
||||
}
|
||||
|
||||
/// 更新全局配置
|
||||
@@ -25,12 +23,7 @@ pub async fn update_config(
|
||||
Extension(db): Extension<DatabaseConnection>,
|
||||
ValidatedJson(config): ValidatedJson<Config>,
|
||||
) -> Result<ApiResponse<Arc<Config>>, ApiError> {
|
||||
let Some(_lock) = TASK_STATUS_NOTIFIER.detect_running() else {
|
||||
// 简单避免一下可能的不一致现象
|
||||
return Err(InnerApiError::BadRequest("下载任务正在运行,无法修改配置".to_string()).into());
|
||||
};
|
||||
config.check()?;
|
||||
let new_config = VersionedConfig::get().update(config, &db).await?;
|
||||
drop(_lock);
|
||||
Ok(ApiResponse::ok(new_config))
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ use crate::api::response::{
|
||||
};
|
||||
use crate::api::wrapper::{ApiError, ApiResponse};
|
||||
use crate::bilibili::{BiliClient, Me};
|
||||
use crate::config::VersionedConfig;
|
||||
|
||||
pub(super) fn router() -> Router {
|
||||
Router::new()
|
||||
@@ -28,7 +29,8 @@ pub async fn get_created_favorites(
|
||||
Extension(db): Extension<DatabaseConnection>,
|
||||
Extension(bili_client): Extension<Arc<BiliClient>>,
|
||||
) -> Result<ApiResponse<FavoritesResponse>, ApiError> {
|
||||
let me = Me::new(bili_client.as_ref());
|
||||
let credential = &VersionedConfig::get().read().credential;
|
||||
let me = Me::new(bili_client.as_ref(), credential);
|
||||
let bili_favorites = me.get_created_favorites().await?;
|
||||
|
||||
let favorites = if let Some(bili_favorites) = bili_favorites {
|
||||
@@ -68,7 +70,8 @@ pub async fn get_followed_collections(
|
||||
Extension(bili_client): Extension<Arc<BiliClient>>,
|
||||
Query(params): Query<FollowedCollectionsRequest>,
|
||||
) -> Result<ApiResponse<CollectionsResponse>, ApiError> {
|
||||
let me = Me::new(bili_client.as_ref());
|
||||
let credential = &VersionedConfig::get().read().credential;
|
||||
let me = Me::new(bili_client.as_ref(), credential);
|
||||
let (page_num, page_size) = (params.page_num.unwrap_or(1), params.page_size.unwrap_or(50));
|
||||
let bili_collections = me.get_followed_collections(page_num, page_size).await?;
|
||||
|
||||
@@ -110,7 +113,8 @@ pub async fn get_followed_uppers(
|
||||
Extension(bili_client): Extension<Arc<BiliClient>>,
|
||||
Query(params): Query<FollowedUppersRequest>,
|
||||
) -> Result<ApiResponse<UppersResponse>, ApiError> {
|
||||
let me = Me::new(bili_client.as_ref());
|
||||
let credential = &VersionedConfig::get().read().credential;
|
||||
let me = Me::new(bili_client.as_ref(), credential);
|
||||
let (page_num, page_size) = (params.page_num.unwrap_or(1), params.page_size.unwrap_or(20));
|
||||
let bili_uppers = me.get_followed_uppers(page_num, page_size).await?;
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ pub fn router() -> Router {
|
||||
|
||||
/// 中间件:使用 auth token 对请求进行身份验证
|
||||
pub async fn auth(mut headers: HeaderMap, request: Request, next: Next) -> Result<Response, StatusCode> {
|
||||
let config = VersionedConfig::get().load();
|
||||
let config = VersionedConfig::get().read();
|
||||
let token = config.auth_token.as_str();
|
||||
if headers
|
||||
.get("Authorization")
|
||||
|
||||
@@ -22,7 +22,7 @@ use crate::api::response::{
|
||||
};
|
||||
use crate::api::wrapper::{ApiError, ApiResponse, ValidatedJson};
|
||||
use crate::bilibili::{BiliClient, Collection, CollectionItem, FavoriteList, Submission};
|
||||
use crate::config::{PathSafeTemplate, TEMPLATE};
|
||||
use crate::config::{PathSafeTemplate, TEMPLATE, VersionedConfig};
|
||||
use crate::utils::rule::FieldEvaluatable;
|
||||
|
||||
pub(super) fn router() -> Router {
|
||||
@@ -170,8 +170,10 @@ pub async fn get_video_sources_default_path(
|
||||
"submissions" => "submission_default_path",
|
||||
_ => return Err(InnerApiError::BadRequest("Invalid video source type".to_string()).into()),
|
||||
};
|
||||
let (template, params) = (TEMPLATE.load(), serde_json::to_value(params)?);
|
||||
Ok(ApiResponse::ok(template.path_safe_render(template_name, ¶ms)?))
|
||||
let template = TEMPLATE.read();
|
||||
Ok(ApiResponse::ok(
|
||||
template.path_safe_render(template_name, &serde_json::to_value(params)?)?,
|
||||
))
|
||||
}
|
||||
|
||||
/// 更新视频来源
|
||||
@@ -323,7 +325,8 @@ pub async fn insert_favorite(
|
||||
Extension(bili_client): Extension<Arc<BiliClient>>,
|
||||
ValidatedJson(request): ValidatedJson<InsertFavoriteRequest>,
|
||||
) -> Result<ApiResponse<bool>, ApiError> {
|
||||
let favorite = FavoriteList::new(bili_client.as_ref(), request.fid.to_string());
|
||||
let credential = &VersionedConfig::get().read().credential;
|
||||
let favorite = FavoriteList::new(bili_client.as_ref(), request.fid.to_string(), credential);
|
||||
let favorite_info = favorite.get_info().await?;
|
||||
favorite::Entity::insert(favorite::ActiveModel {
|
||||
f_id: Set(favorite_info.id),
|
||||
@@ -343,6 +346,7 @@ pub async fn insert_collection(
|
||||
Extension(bili_client): Extension<Arc<BiliClient>>,
|
||||
ValidatedJson(request): ValidatedJson<InsertCollectionRequest>,
|
||||
) -> Result<ApiResponse<bool>, ApiError> {
|
||||
let credential = &VersionedConfig::get().read().credential;
|
||||
let collection = Collection::new(
|
||||
bili_client.as_ref(),
|
||||
CollectionItem {
|
||||
@@ -350,6 +354,7 @@ pub async fn insert_collection(
|
||||
mid: request.mid.to_string(),
|
||||
collection_type: request.collection_type,
|
||||
},
|
||||
credential,
|
||||
);
|
||||
let collection_info = collection.get_info().await?;
|
||||
collection::Entity::insert(collection::ActiveModel {
|
||||
@@ -373,7 +378,8 @@ pub async fn insert_submission(
|
||||
Extension(bili_client): Extension<Arc<BiliClient>>,
|
||||
ValidatedJson(request): ValidatedJson<InsertSubmissionRequest>,
|
||||
) -> Result<ApiResponse<bool>, ApiError> {
|
||||
let submission = Submission::new(bili_client.as_ref(), request.upper_id.to_string());
|
||||
let credential = &VersionedConfig::get().read().credential;
|
||||
let submission = Submission::new(bili_client.as_ref(), request.upper_id.to_string(), credential);
|
||||
let upper = submission.get_info().await?;
|
||||
submission::Entity::insert(submission::ActiveModel {
|
||||
upper_id: Set(upper.mid.parse()?),
|
||||
|
||||
@@ -2,7 +2,6 @@ use anyhow::{Context, Result, bail};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::bilibili::error::BiliError;
|
||||
use crate::config::VersionedConfig;
|
||||
|
||||
pub struct PageAnalyzer {
|
||||
info: serde_json::Value,
|
||||
@@ -131,14 +130,14 @@ pub enum Stream {
|
||||
|
||||
// 通用的获取流链接的方法,交由 Downloader 使用
|
||||
impl Stream {
|
||||
pub fn urls(&self) -> Vec<&str> {
|
||||
pub fn urls(&self, enable_cdn_sorting: bool) -> Vec<&str> {
|
||||
match self {
|
||||
Self::Flv(url) | Self::Html5Mp4(url) | Self::EpisodeTryMp4(url) => vec![url],
|
||||
Self::DashVideo { url, backup_url, .. } | Self::DashAudio { url, backup_url, .. } => {
|
||||
let mut urls = std::iter::once(url.as_str())
|
||||
.chain(backup_url.iter().map(|s| s.as_str()))
|
||||
.collect::<Vec<_>>();
|
||||
if VersionedConfig::get().load().cdn_sorting {
|
||||
if enable_cdn_sorting {
|
||||
urls.sort_by_key(|u| {
|
||||
if u.contains("upos-") {
|
||||
0 // 服务商 cdn
|
||||
@@ -424,16 +423,17 @@ mod tests {
|
||||
Some(AudioQuality::Quality192k),
|
||||
),
|
||||
];
|
||||
let config = VersionedConfig::get().read();
|
||||
for (bvid, video_quality, video_codec, audio_quality) in testcases.into_iter() {
|
||||
let client = BiliClient::new();
|
||||
let video = Video::new(&client, bvid.to_owned());
|
||||
let video = Video::new(&client, bvid.to_owned(), &config.credential);
|
||||
let pages = video.get_pages().await.expect("failed to get pages");
|
||||
let first_page = pages.into_iter().next().expect("no page found");
|
||||
let best_stream = video
|
||||
.get_page_analyzer(&first_page)
|
||||
.await
|
||||
.expect("failed to get page analyzer")
|
||||
.best_stream(&VersionedConfig::get().load().filter_option)
|
||||
.best_stream(&config.filter_option)
|
||||
.expect("failed to get best stream");
|
||||
dbg!(bvid, &best_stream);
|
||||
match best_stream {
|
||||
@@ -469,7 +469,7 @@ mod tests {
|
||||
codecs: VideoCodecs::AVC,
|
||||
};
|
||||
assert_eq!(
|
||||
stream.urls(),
|
||||
stream.urls(true),
|
||||
vec![
|
||||
"https://upos-sz-mirrorcos.bilivideo.com",
|
||||
"https://cn-tj-cu-01-11.bilivideo.com",
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
use anyhow::{Result, bail};
|
||||
use leaky_bucket::RateLimiter;
|
||||
use reqwest::{Method, header};
|
||||
use sea_orm::DatabaseConnection;
|
||||
use ua_generator::ua;
|
||||
|
||||
use crate::bilibili::Credential;
|
||||
use crate::bilibili::credential::WbiImg;
|
||||
use crate::config::{RateLimit, VersionedCache, VersionedConfig};
|
||||
use crate::config::{RateLimit, VersionedCache};
|
||||
|
||||
// 一个对 reqwest::Client 的简单封装,用于 Bilibili 请求
|
||||
#[derive(Clone)]
|
||||
@@ -60,56 +60,78 @@ impl Default for Client {
|
||||
}
|
||||
}
|
||||
|
||||
enum Limiter {
|
||||
Latest(VersionedCache<Option<RateLimiter>>),
|
||||
Snapshot(Arc<Option<RateLimiter>>),
|
||||
}
|
||||
|
||||
pub struct BiliClient {
|
||||
pub client: Client,
|
||||
limiter: VersionedCache<Option<RateLimiter>>,
|
||||
limiter: Limiter,
|
||||
}
|
||||
|
||||
impl BiliClient {
|
||||
pub fn new() -> Self {
|
||||
let client = Client::new();
|
||||
let limiter = VersionedCache::new(|config| {
|
||||
Ok(config
|
||||
.concurrent_limit
|
||||
.rate_limit
|
||||
.as_ref()
|
||||
.map(|RateLimit { limit, duration }| {
|
||||
RateLimiter::builder()
|
||||
.initial(*limit)
|
||||
.refill(*limit)
|
||||
.max(*limit)
|
||||
.interval(Duration::from_millis(*duration))
|
||||
.build()
|
||||
}))
|
||||
})
|
||||
.expect("failed to create rate limiter");
|
||||
let limiter = Limiter::Latest(
|
||||
VersionedCache::new(|config| {
|
||||
Ok(config
|
||||
.concurrent_limit
|
||||
.rate_limit
|
||||
.as_ref()
|
||||
.map(|RateLimit { limit, duration }| {
|
||||
RateLimiter::builder()
|
||||
.initial(*limit)
|
||||
.refill(*limit)
|
||||
.max(*limit)
|
||||
.interval(Duration::from_millis(*duration))
|
||||
.build()
|
||||
}))
|
||||
})
|
||||
.expect("failed to create rate limiter"),
|
||||
);
|
||||
Self { client, limiter }
|
||||
}
|
||||
|
||||
/// 获取当前 BiliClient 的快照,快照中的限流器固定不变
|
||||
pub fn snapshot(&self) -> Result<Self> {
|
||||
let Limiter::Latest(inner) = &self.limiter else {
|
||||
// 语法上没问题,但语义上不允许对快照进行快照
|
||||
bail!("cannot snapshot a snapshot BiliClient");
|
||||
};
|
||||
Ok(Self {
|
||||
client: self.client.clone(),
|
||||
limiter: Limiter::Snapshot(inner.snapshot()),
|
||||
})
|
||||
}
|
||||
|
||||
/// 获取一个预构建的请求,通过该方法获取请求时会检查并等待速率限制
|
||||
pub async fn request(&self, method: Method, url: &str) -> reqwest::RequestBuilder {
|
||||
if let Some(limiter) = self.limiter.load().as_ref() {
|
||||
limiter.acquire_one().await;
|
||||
pub async fn request(&self, method: Method, url: &str, credential: &Credential) -> reqwest::RequestBuilder {
|
||||
match &self.limiter {
|
||||
Limiter::Latest(inner) => {
|
||||
if let Some(limiter) = inner.read().as_ref() {
|
||||
limiter.acquire_one().await;
|
||||
}
|
||||
}
|
||||
Limiter::Snapshot(inner) => {
|
||||
if let Some(limiter) = inner.as_ref() {
|
||||
limiter.acquire_one().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
let credential = &VersionedConfig::get().load().credential;
|
||||
self.client.request(method, url, Some(credential))
|
||||
}
|
||||
|
||||
pub async fn check_refresh(&self, connection: &DatabaseConnection) -> Result<()> {
|
||||
let credential = &VersionedConfig::get().load().credential;
|
||||
/// 检查并刷新 Credential,不需要刷新返回 Ok(None),需要刷新返回 Ok(Some(new_credential))
|
||||
pub async fn check_refresh(&self, credential: &Credential) -> Result<Option<Credential>> {
|
||||
if !credential.need_refresh(&self.client).await? {
|
||||
return Ok(());
|
||||
return Ok(None);
|
||||
}
|
||||
let new_credential = credential.refresh(&self.client).await?;
|
||||
VersionedConfig::get()
|
||||
.update_credential(new_credential, connection)
|
||||
.await?;
|
||||
Ok(())
|
||||
Ok(Some(credential.refresh(&self.client).await?))
|
||||
}
|
||||
|
||||
/// 获取 wbi img,用于生成请求签名
|
||||
pub async fn wbi_img(&self) -> Result<WbiImg> {
|
||||
let credential = &VersionedConfig::get().load().credential;
|
||||
pub async fn wbi_img(&self, credential: &Credential) -> Result<WbiImg> {
|
||||
credential.wbi_img(&self.client).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ use reqwest::Method;
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::bilibili::{BiliClient, Validate, VideoInfo};
|
||||
use crate::bilibili::{BiliClient, Credential, Validate, VideoInfo};
|
||||
|
||||
#[derive(PartialEq, Eq, Hash, Clone, Debug, Default, Copy)]
|
||||
pub enum CollectionType {
|
||||
@@ -73,6 +73,7 @@ pub struct CollectionItem {
|
||||
pub struct Collection<'a> {
|
||||
client: &'a BiliClient,
|
||||
pub collection: CollectionItem,
|
||||
credential: &'a Credential,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
@@ -111,8 +112,12 @@ impl<'de> Deserialize<'de> for CollectionInfo {
|
||||
}
|
||||
|
||||
impl<'a> Collection<'a> {
|
||||
pub fn new(client: &'a BiliClient, collection: CollectionItem) -> Self {
|
||||
Self { client, collection }
|
||||
pub fn new(client: &'a BiliClient, collection: CollectionItem, credential: &'a Credential) -> Self {
|
||||
Self {
|
||||
client,
|
||||
collection,
|
||||
credential,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_info(&self) -> Result<CollectionInfo> {
|
||||
@@ -126,7 +131,7 @@ impl<'a> Collection<'a> {
|
||||
|
||||
async fn get_series_info(&self) -> Result<Value> {
|
||||
self.client
|
||||
.request(Method::GET, "https://api.bilibili.com/x/series/series")
|
||||
.request(Method::GET, "https://api.bilibili.com/x/series/series", self.credential)
|
||||
.await
|
||||
.query(&[("series_id", self.collection.sid.as_str())])
|
||||
.send()
|
||||
@@ -141,7 +146,11 @@ impl<'a> Collection<'a> {
|
||||
let req = match self.collection.collection_type {
|
||||
CollectionType::Series => self
|
||||
.client
|
||||
.request(Method::GET, "https://api.bilibili.com/x/series/archives")
|
||||
.request(
|
||||
Method::GET,
|
||||
"https://api.bilibili.com/x/series/archives",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[("pn", page)])
|
||||
.query(&[
|
||||
@@ -156,6 +165,7 @@ impl<'a> Collection<'a> {
|
||||
.request(
|
||||
Method::GET,
|
||||
"https://api.bilibili.com/x/polymer/web-space/seasons_archives_list",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[("page_num", page)])
|
||||
|
||||
@@ -3,10 +3,9 @@ use std::path::PathBuf;
|
||||
use anyhow::Result;
|
||||
use tokio::fs::{self, File};
|
||||
|
||||
use crate::bilibili::PageInfo;
|
||||
use crate::bilibili::danmaku::canvas::CanvasConfig;
|
||||
use crate::bilibili::danmaku::{AssWriter, Danmu};
|
||||
use crate::config::VersionedConfig;
|
||||
use crate::bilibili::{DanmakuOption, PageInfo};
|
||||
|
||||
pub struct DanmakuWriter<'a> {
|
||||
page: &'a PageInfo,
|
||||
@@ -18,12 +17,11 @@ impl<'a> DanmakuWriter<'a> {
|
||||
DanmakuWriter { page, danmaku }
|
||||
}
|
||||
|
||||
pub async fn write(self, path: PathBuf) -> Result<()> {
|
||||
pub async fn write(self, path: PathBuf, danmaku_option: &DanmakuOption) -> Result<()> {
|
||||
if let Some(parent) = path.parent() {
|
||||
fs::create_dir_all(parent).await?;
|
||||
}
|
||||
let config = VersionedConfig::get().load_full();
|
||||
let canvas_config = CanvasConfig::new(&config.danmaku_option, self.page);
|
||||
let canvas_config = CanvasConfig::new(danmaku_option, self.page);
|
||||
let mut writer =
|
||||
AssWriter::construct(File::create(path).await?, self.page.name.clone(), canvas_config.clone()).await?;
|
||||
let mut canvas = canvas_config.canvas();
|
||||
|
||||
@@ -6,11 +6,12 @@ use futures::Stream;
|
||||
use reqwest::Method;
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::bilibili::{BiliClient, MIXIN_KEY, Validate, VideoInfo, WbiSign};
|
||||
use crate::bilibili::{BiliClient, Credential, MIXIN_KEY, Validate, VideoInfo, WbiSign};
|
||||
|
||||
pub struct Dynamic<'a> {
|
||||
client: &'a BiliClient,
|
||||
pub upper_id: String,
|
||||
credential: &'a Credential,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
@@ -20,8 +21,12 @@ pub struct DynamicItemPublished {
|
||||
}
|
||||
|
||||
impl<'a> Dynamic<'a> {
|
||||
pub fn new(client: &'a BiliClient, upper_id: String) -> Self {
|
||||
Self { client, upper_id }
|
||||
pub fn new(client: &'a BiliClient, upper_id: String, credential: &'a Credential) -> Self {
|
||||
Self {
|
||||
client,
|
||||
upper_id,
|
||||
credential,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_dynamics(&self, offset: Option<String>) -> Result<Value> {
|
||||
@@ -29,6 +34,7 @@ impl<'a> Dynamic<'a> {
|
||||
.request(
|
||||
Method::GET,
|
||||
"https://api.bilibili.com/x/polymer/web-dynamic/v1/feed/space",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[
|
||||
|
||||
@@ -3,10 +3,11 @@ use async_stream::try_stream;
|
||||
use futures::Stream;
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::bilibili::{BiliClient, Validate, VideoInfo};
|
||||
use crate::bilibili::{BiliClient, Credential, Validate, VideoInfo};
|
||||
pub struct FavoriteList<'a> {
|
||||
client: &'a BiliClient,
|
||||
fid: String,
|
||||
credential: &'a Credential,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize)]
|
||||
@@ -22,14 +23,22 @@ pub struct Upper<T> {
|
||||
pub face: String,
|
||||
}
|
||||
impl<'a> FavoriteList<'a> {
|
||||
pub fn new(client: &'a BiliClient, fid: String) -> Self {
|
||||
Self { client, fid }
|
||||
pub fn new(client: &'a BiliClient, fid: String, credential: &'a Credential) -> Self {
|
||||
Self {
|
||||
client,
|
||||
fid,
|
||||
credential,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_info(&self) -> Result<FavoriteListInfo> {
|
||||
let mut res = self
|
||||
.client
|
||||
.request(reqwest::Method::GET, "https://api.bilibili.com/x/v3/fav/folder/info")
|
||||
.request(
|
||||
reqwest::Method::GET,
|
||||
"https://api.bilibili.com/x/v3/fav/folder/info",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[("media_id", &self.fid)])
|
||||
.send()
|
||||
@@ -43,7 +52,11 @@ impl<'a> FavoriteList<'a> {
|
||||
|
||||
async fn get_videos(&self, page: u32) -> Result<Value> {
|
||||
self.client
|
||||
.request(reqwest::Method::GET, "https://api.bilibili.com/x/v3/fav/resource/list")
|
||||
.request(
|
||||
reqwest::Method::GET,
|
||||
"https://api.bilibili.com/x/v3/fav/resource/list",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[
|
||||
("media_id", self.fid.as_str()),
|
||||
|
||||
@@ -1,28 +1,32 @@
|
||||
use anyhow::{Result, ensure};
|
||||
use reqwest::Method;
|
||||
|
||||
use crate::bilibili::{BiliClient, Validate};
|
||||
use crate::config::VersionedConfig;
|
||||
use crate::bilibili::{BiliClient, Credential, Validate};
|
||||
|
||||
pub struct Me<'a> {
|
||||
client: &'a BiliClient,
|
||||
mid: String,
|
||||
credential: &'a Credential,
|
||||
}
|
||||
|
||||
impl<'a> Me<'a> {
|
||||
pub fn new(client: &'a BiliClient) -> Self {
|
||||
Self {
|
||||
client,
|
||||
mid: Self::my_id(),
|
||||
}
|
||||
pub fn new(client: &'a BiliClient, credential: &'a Credential) -> Self {
|
||||
Self { client, credential }
|
||||
}
|
||||
|
||||
pub async fn get_created_favorites(&self) -> Result<Option<Vec<FavoriteItem>>> {
|
||||
ensure!(!self.mid.is_empty(), "未获取到用户 ID,请确保填写设置中的 B 站认证信息");
|
||||
ensure!(
|
||||
!self.mid().is_empty(),
|
||||
"未获取到用户 ID,请确保填写设置中的 B 站认证信息"
|
||||
);
|
||||
let mut resp = self
|
||||
.client
|
||||
.request(Method::GET, "https://api.bilibili.com/x/v3/fav/folder/created/list-all")
|
||||
.request(
|
||||
Method::GET,
|
||||
"https://api.bilibili.com/x/v3/fav/folder/created/list-all",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[("up_mid", &self.mid)])
|
||||
.query(&[("up_mid", &self.mid())])
|
||||
.send()
|
||||
.await?
|
||||
.error_for_status()?
|
||||
@@ -33,12 +37,19 @@ impl<'a> Me<'a> {
|
||||
}
|
||||
|
||||
pub async fn get_followed_collections(&self, page_num: i32, page_size: i32) -> Result<Collections> {
|
||||
ensure!(!self.mid.is_empty(), "未获取到用户 ID,请确保填写设置中的 B 站认证信息");
|
||||
ensure!(
|
||||
!self.mid().is_empty(),
|
||||
"未获取到用户 ID,请确保填写设置中的 B 站认证信息"
|
||||
);
|
||||
let mut resp = self
|
||||
.client
|
||||
.request(Method::GET, "https://api.bilibili.com/x/v3/fav/folder/collected/list")
|
||||
.request(
|
||||
Method::GET,
|
||||
"https://api.bilibili.com/x/v3/fav/folder/collected/list",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[("up_mid", self.mid.as_str()), ("platform", "web")])
|
||||
.query(&[("up_mid", self.mid()), ("platform", "web")])
|
||||
.query(&[("pn", page_num), ("ps", page_size)])
|
||||
.send()
|
||||
.await?
|
||||
@@ -50,12 +61,19 @@ impl<'a> Me<'a> {
|
||||
}
|
||||
|
||||
pub async fn get_followed_uppers(&self, page_num: i32, page_size: i32) -> Result<FollowedUppers> {
|
||||
ensure!(!self.mid.is_empty(), "未获取到用户 ID,请确保填写设置中的 B 站认证信息");
|
||||
ensure!(
|
||||
!self.mid().is_empty(),
|
||||
"未获取到用户 ID,请确保填写设置中的 B 站认证信息"
|
||||
);
|
||||
let mut resp = self
|
||||
.client
|
||||
.request(Method::GET, "https://api.bilibili.com/x/relation/followings")
|
||||
.request(
|
||||
Method::GET,
|
||||
"https://api.bilibili.com/x/relation/followings",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[("vmid", self.mid.as_str())])
|
||||
.query(&[("vmid", self.mid())])
|
||||
.query(&[("pn", page_num), ("ps", page_size)])
|
||||
.send()
|
||||
.await?
|
||||
@@ -66,8 +84,8 @@ impl<'a> Me<'a> {
|
||||
Ok(serde_json::from_value(resp["data"].take())?)
|
||||
}
|
||||
|
||||
fn my_id() -> String {
|
||||
VersionedConfig::get().load().credential.dedeuserid.clone()
|
||||
fn mid(&self) -> &str {
|
||||
&self.credential.dedeuserid
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ use chrono::serde::ts_seconds;
|
||||
use chrono::{DateTime, Utc};
|
||||
pub use client::{BiliClient, Client};
|
||||
pub use collection::{Collection, CollectionItem, CollectionType};
|
||||
pub use credential::{Credential, WbiImg};
|
||||
pub use credential::Credential;
|
||||
pub use danmaku::DanmakuOption;
|
||||
pub use dynamic::Dynamic;
|
||||
pub use error::BiliError;
|
||||
@@ -208,10 +208,15 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_video_info_type() -> Result<()> {
|
||||
VersionedConfig::init_for_test(&setup_database(Path::new("./test.sqlite")).await?).await?;
|
||||
let credential = &VersionedConfig::get().read().credential;
|
||||
init_logger("None,bili_sync=debug", None);
|
||||
let bili_client = BiliClient::new();
|
||||
// 请求 UP 主视频必须要获取 mixin key,使用 key 计算请求参数的签名,否则直接提示权限不足返回空
|
||||
let mixin_key = bili_client.wbi_img().await?.into_mixin_key().context("no mixin key")?;
|
||||
let mixin_key = bili_client
|
||||
.wbi_img(credential)
|
||||
.await?
|
||||
.into_mixin_key()
|
||||
.context("no mixin key")?;
|
||||
set_global_mixin_key(mixin_key);
|
||||
let collection = Collection::new(
|
||||
&bili_client,
|
||||
@@ -220,6 +225,7 @@ mod tests {
|
||||
sid: "4523".to_string(),
|
||||
collection_type: CollectionType::Season,
|
||||
},
|
||||
&credential,
|
||||
);
|
||||
let videos = collection
|
||||
.into_video_stream()
|
||||
@@ -230,7 +236,7 @@ mod tests {
|
||||
assert!(videos.iter().all(|v| matches!(v, VideoInfo::Collection { .. })));
|
||||
assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime()));
|
||||
// 测试收藏夹
|
||||
let favorite = FavoriteList::new(&bili_client, "3144336058".to_string());
|
||||
let favorite = FavoriteList::new(&bili_client, "3144336058".to_string(), &credential);
|
||||
let videos = favorite
|
||||
.into_video_stream()
|
||||
.take(20)
|
||||
@@ -240,7 +246,7 @@ mod tests {
|
||||
assert!(videos.iter().all(|v| matches!(v, VideoInfo::Favorite { .. })));
|
||||
assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime()));
|
||||
// 测试稍后再看
|
||||
let watch_later = WatchLater::new(&bili_client);
|
||||
let watch_later = WatchLater::new(&bili_client, &credential);
|
||||
let videos = watch_later
|
||||
.into_video_stream()
|
||||
.take(20)
|
||||
@@ -250,7 +256,7 @@ mod tests {
|
||||
assert!(videos.iter().all(|v| matches!(v, VideoInfo::WatchLater { .. })));
|
||||
assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime()));
|
||||
// 测试投稿
|
||||
let submission = Submission::new(&bili_client, "956761".to_string());
|
||||
let submission = Submission::new(&bili_client, "956761".to_string(), &credential);
|
||||
let videos = submission
|
||||
.into_video_stream()
|
||||
.take(20)
|
||||
@@ -260,7 +266,7 @@ mod tests {
|
||||
assert!(videos.iter().all(|v| matches!(v, VideoInfo::Submission { .. })));
|
||||
assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime()));
|
||||
// 测试动态
|
||||
let dynamic = Dynamic::new(&bili_client, "659898".to_string());
|
||||
let dynamic = Dynamic::new(&bili_client, "659898".to_string(), &credential);
|
||||
let videos = dynamic
|
||||
.into_video_stream()
|
||||
.take(20)
|
||||
@@ -275,10 +281,16 @@ mod tests {
|
||||
#[ignore = "only for manual test"]
|
||||
#[tokio::test]
|
||||
async fn test_subtitle_parse() -> Result<()> {
|
||||
VersionedConfig::init_for_test(&setup_database(Path::new("./test.sqlite")).await?).await?;
|
||||
let credential = &VersionedConfig::get().read().credential;
|
||||
let bili_client = BiliClient::new();
|
||||
let mixin_key = bili_client.wbi_img().await?.into_mixin_key().context("no mixin key")?;
|
||||
let mixin_key = bili_client
|
||||
.wbi_img(credential)
|
||||
.await?
|
||||
.into_mixin_key()
|
||||
.context("no mixin key")?;
|
||||
set_global_mixin_key(mixin_key);
|
||||
let video = Video::new(&bili_client, "BV1gLfnY8E6D".to_string());
|
||||
let video = Video::new(&bili_client, "BV1gLfnY8E6D".to_string(), &credential);
|
||||
let pages = video.get_pages().await?;
|
||||
println!("pages: {:?}", pages);
|
||||
let subtitles = video.get_subtitles(&pages[0]).await?;
|
||||
@@ -296,15 +308,20 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_upower_parse() -> Result<()> {
|
||||
VersionedConfig::init_for_test(&setup_database(Path::new("./test.sqlite")).await?).await?;
|
||||
let credential = &VersionedConfig::get().read().credential;
|
||||
let bili_client = BiliClient::new();
|
||||
let mixin_key = bili_client.wbi_img().await?.into_mixin_key().context("no mixin key")?;
|
||||
let mixin_key = bili_client
|
||||
.wbi_img(credential)
|
||||
.await?
|
||||
.into_mixin_key()
|
||||
.context("no mixin key")?;
|
||||
set_global_mixin_key(mixin_key);
|
||||
for (bvid, (upower_exclusive, upower_play)) in [
|
||||
("BV1HxXwYEEqt", (true, false)), // 充电专享且无权观看
|
||||
("BV16w41187fx", (true, true)), // 充电专享但有权观看
|
||||
("BV1n34jzPEYq", (false, false)), // 普通视频
|
||||
] {
|
||||
let video = Video::new(&bili_client, bvid.to_string());
|
||||
let video = Video::new(&bili_client, bvid.to_string(), credential);
|
||||
let info = video.get_view_info().await?;
|
||||
let VideoInfo::Detail {
|
||||
is_upower_exclusive,
|
||||
@@ -324,15 +341,20 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_ep_parse() -> Result<()> {
|
||||
VersionedConfig::init_for_test(&setup_database(Path::new("./test.sqlite")).await?).await?;
|
||||
let credential = &VersionedConfig::get().read().credential;
|
||||
let bili_client = BiliClient::new();
|
||||
let mixin_key = bili_client.wbi_img().await?.into_mixin_key().context("no mixin key")?;
|
||||
let mixin_key = bili_client
|
||||
.wbi_img(credential)
|
||||
.await?
|
||||
.into_mixin_key()
|
||||
.context("no mixin key")?;
|
||||
set_global_mixin_key(mixin_key);
|
||||
for (bvid, redirect_is_none) in [
|
||||
("BV1SF411g796", false), // EP
|
||||
("BV13xtnzPEye", false), // 番剧
|
||||
("BV1kT4NzTEZj", true), // 普通视频
|
||||
] {
|
||||
let video = Video::new(&bili_client, bvid.to_string());
|
||||
let video = Video::new(&bili_client, bvid.to_string(), credential);
|
||||
let info = video.get_view_info().await?;
|
||||
let VideoInfo::Detail { redirect_url, .. } = info else {
|
||||
unreachable!();
|
||||
|
||||
@@ -5,27 +5,36 @@ use reqwest::Method;
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::bilibili::favorite_list::Upper;
|
||||
use crate::bilibili::{BiliClient, Dynamic, MIXIN_KEY, Validate, VideoInfo, WbiSign};
|
||||
use crate::bilibili::{BiliClient, Credential, Dynamic, MIXIN_KEY, Validate, VideoInfo, WbiSign};
|
||||
pub struct Submission<'a> {
|
||||
client: &'a BiliClient,
|
||||
pub upper_id: String,
|
||||
credential: &'a Credential,
|
||||
}
|
||||
|
||||
impl<'a> From<Submission<'a>> for Dynamic<'a> {
|
||||
fn from(submission: Submission<'a>) -> Self {
|
||||
Dynamic::new(submission.client, submission.upper_id)
|
||||
Dynamic::new(submission.client, submission.upper_id, submission.credential)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Submission<'a> {
|
||||
pub fn new(client: &'a BiliClient, upper_id: String) -> Self {
|
||||
Self { client, upper_id }
|
||||
pub fn new(client: &'a BiliClient, upper_id: String, credential: &'a Credential) -> Self {
|
||||
Self {
|
||||
client,
|
||||
upper_id,
|
||||
credential,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_info(&self) -> Result<Upper<String>> {
|
||||
let mut res = self
|
||||
.client
|
||||
.request(Method::GET, "https://api.bilibili.com/x/web-interface/card")
|
||||
.request(
|
||||
Method::GET,
|
||||
"https://api.bilibili.com/x/web-interface/card",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[("mid", self.upper_id.as_str())])
|
||||
.send()
|
||||
@@ -39,7 +48,11 @@ impl<'a> Submission<'a> {
|
||||
|
||||
async fn get_videos(&self, page: i32) -> Result<Value> {
|
||||
self.client
|
||||
.request(Method::GET, "https://api.bilibili.com/x/space/wbi/arc/search")
|
||||
.request(
|
||||
Method::GET,
|
||||
"https://api.bilibili.com/x/space/wbi/arc/search",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[
|
||||
("mid", self.upper_id.as_str()),
|
||||
|
||||
@@ -8,11 +8,12 @@ use crate::bilibili::analyzer::PageAnalyzer;
|
||||
use crate::bilibili::client::BiliClient;
|
||||
use crate::bilibili::danmaku::{DanmakuElem, DanmakuWriter, DmSegMobileReply};
|
||||
use crate::bilibili::subtitle::{SubTitle, SubTitleBody, SubTitleInfo, SubTitlesInfo};
|
||||
use crate::bilibili::{MIXIN_KEY, Validate, VideoInfo, WbiSign};
|
||||
use crate::bilibili::{Credential, MIXIN_KEY, Validate, VideoInfo, WbiSign};
|
||||
|
||||
pub struct Video<'a> {
|
||||
client: &'a BiliClient,
|
||||
pub bvid: String,
|
||||
credential: &'a Credential,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Deserialize, Default)]
|
||||
@@ -34,15 +35,23 @@ pub struct Dimension {
|
||||
}
|
||||
|
||||
impl<'a> Video<'a> {
|
||||
pub fn new(client: &'a BiliClient, bvid: String) -> Self {
|
||||
Self { client, bvid }
|
||||
pub fn new(client: &'a BiliClient, bvid: String, credential: &'a Credential) -> Self {
|
||||
Self {
|
||||
client,
|
||||
bvid,
|
||||
credential,
|
||||
}
|
||||
}
|
||||
|
||||
/// 直接调用视频信息接口获取详细的视频信息,视频信息中包含了视频的分页信息
|
||||
pub async fn get_view_info(&self) -> Result<VideoInfo> {
|
||||
let mut res = self
|
||||
.client
|
||||
.request(Method::GET, "https://api.bilibili.com/x/web-interface/wbi/view")
|
||||
.request(
|
||||
Method::GET,
|
||||
"https://api.bilibili.com/x/web-interface/wbi/view",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[("bvid", &self.bvid)])
|
||||
.wbi_sign(MIXIN_KEY.load().as_deref())?
|
||||
@@ -59,7 +68,11 @@ impl<'a> Video<'a> {
|
||||
pub async fn get_pages(&self) -> Result<Vec<PageInfo>> {
|
||||
let mut res = self
|
||||
.client
|
||||
.request(Method::GET, "https://api.bilibili.com/x/player/pagelist")
|
||||
.request(
|
||||
Method::GET,
|
||||
"https://api.bilibili.com/x/player/pagelist",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[("bvid", &self.bvid)])
|
||||
.send()
|
||||
@@ -74,7 +87,11 @@ impl<'a> Video<'a> {
|
||||
pub async fn get_tags(&self) -> Result<Vec<String>> {
|
||||
let res = self
|
||||
.client
|
||||
.request(Method::GET, "https://api.bilibili.com/x/web-interface/view/detail/tag")
|
||||
.request(
|
||||
Method::GET,
|
||||
"https://api.bilibili.com/x/web-interface/view/detail/tag",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[("bvid", &self.bvid)])
|
||||
.send()
|
||||
@@ -105,7 +122,11 @@ impl<'a> Video<'a> {
|
||||
async fn get_danmaku_segment(&self, page: &PageInfo, segment_idx: i64) -> Result<Vec<DanmakuElem>> {
|
||||
let mut res = self
|
||||
.client
|
||||
.request(Method::GET, "https://api.bilibili.com/x/v2/dm/wbi/web/seg.so")
|
||||
.request(
|
||||
Method::GET,
|
||||
"https://api.bilibili.com/x/v2/dm/wbi/web/seg.so",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[("type", 1), ("oid", page.cid), ("segment_index", segment_idx)])
|
||||
.wbi_sign(MIXIN_KEY.load().as_deref())?
|
||||
@@ -126,7 +147,11 @@ impl<'a> Video<'a> {
|
||||
pub async fn get_page_analyzer(&self, page: &PageInfo) -> Result<PageAnalyzer> {
|
||||
let mut res = self
|
||||
.client
|
||||
.request(Method::GET, "https://api.bilibili.com/x/player/wbi/playurl")
|
||||
.request(
|
||||
Method::GET,
|
||||
"https://api.bilibili.com/x/player/wbi/playurl",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.query(&[
|
||||
("bvid", self.bvid.as_str()),
|
||||
@@ -149,7 +174,7 @@ impl<'a> Video<'a> {
|
||||
pub async fn get_subtitles(&self, page: &PageInfo) -> Result<Vec<SubTitle>> {
|
||||
let mut res = self
|
||||
.client
|
||||
.request(Method::GET, "https://api.bilibili.com/x/player/wbi/v2")
|
||||
.request(Method::GET, "https://api.bilibili.com/x/player/wbi/v2", self.credential)
|
||||
.await
|
||||
.query(&[("bvid", self.bvid.as_str())])
|
||||
.query(&[("cid", page.cid)])
|
||||
|
||||
@@ -3,19 +3,24 @@ use async_stream::try_stream;
|
||||
use futures::Stream;
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::bilibili::{BiliClient, Validate, VideoInfo};
|
||||
use crate::bilibili::{BiliClient, Credential, Validate, VideoInfo};
|
||||
pub struct WatchLater<'a> {
|
||||
client: &'a BiliClient,
|
||||
credential: &'a Credential,
|
||||
}
|
||||
|
||||
impl<'a> WatchLater<'a> {
|
||||
pub fn new(client: &'a BiliClient) -> Self {
|
||||
Self { client }
|
||||
pub fn new(client: &'a BiliClient, credential: &'a Credential) -> Self {
|
||||
Self { client, credential }
|
||||
}
|
||||
|
||||
async fn get_videos(&self) -> Result<Value> {
|
||||
self.client
|
||||
.request(reqwest::Method::GET, "https://api.bilibili.com/x/v2/history/toview")
|
||||
.request(
|
||||
reqwest::Method::GET,
|
||||
"https://api.bilibili.com/x/v2/history/toview",
|
||||
self.credential,
|
||||
)
|
||||
.await
|
||||
.send()
|
||||
.await?
|
||||
|
||||
@@ -85,14 +85,6 @@ impl Config {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(super) fn test_default() -> Self {
|
||||
Self {
|
||||
cdn_sorting: true,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Config {
|
||||
|
||||
@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
|
||||
use crate::utils::filenamify::filenamify;
|
||||
|
||||
/// NFO 文件使用的时间类型
|
||||
#[derive(Serialize, Deserialize, Default, Clone)]
|
||||
#[derive(Serialize, Deserialize, Default, Clone, Copy)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum NFOTimeType {
|
||||
#[default]
|
||||
|
||||
@@ -9,6 +9,6 @@ mod versioned_config;
|
||||
pub use crate::config::args::{ARGS, version};
|
||||
pub use crate::config::current::{CONFIG_DIR, Config};
|
||||
pub use crate::config::handlebar::TEMPLATE;
|
||||
pub use crate::config::item::{NFOTimeType, PathSafeTemplate, RateLimit, SkipOption};
|
||||
pub use crate::config::item::{ConcurrentDownloadLimit, NFOTimeType, PathSafeTemplate, RateLimit};
|
||||
pub use crate::config::versioned_cache::VersionedCache;
|
||||
pub use crate::config::versioned_config::VersionedConfig;
|
||||
|
||||
@@ -1,54 +1,56 @@
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
use anyhow::Result;
|
||||
use arc_swap::{ArcSwap, Guard};
|
||||
use tokio_util::future::FutureExt;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::config::{Config, VersionedConfig};
|
||||
|
||||
pub struct VersionedCache<T> {
|
||||
inner: ArcSwap<T>,
|
||||
version: AtomicU64,
|
||||
builder: fn(&Config) -> Result<T>,
|
||||
mutex: parking_lot::Mutex<()>,
|
||||
inner: Arc<ArcSwap<T>>,
|
||||
cancel_token: CancellationToken,
|
||||
}
|
||||
|
||||
impl<T> VersionedCache<T> {
|
||||
/// 一个跟随全局配置变化自动更新的缓存
|
||||
impl<T: Send + Sync + 'static> VersionedCache<T> {
|
||||
pub fn new(builder: fn(&Config) -> Result<T>) -> Result<Self> {
|
||||
let current_config = VersionedConfig::get().load();
|
||||
let current_version = current_config.version;
|
||||
let initial_value = builder(¤t_config)?;
|
||||
Ok(Self {
|
||||
inner: ArcSwap::from_pointee(initial_value),
|
||||
version: AtomicU64::new(current_version),
|
||||
builder,
|
||||
mutex: parking_lot::Mutex::new(()),
|
||||
})
|
||||
let mut rx = VersionedConfig::get().subscribe();
|
||||
let initial_value = builder(&rx.borrow_and_update())?;
|
||||
let cancel_token = CancellationToken::new();
|
||||
let inner = Arc::new(ArcSwap::from_pointee(initial_value));
|
||||
let inner_clone = inner.clone();
|
||||
tokio::spawn(
|
||||
async move {
|
||||
while rx.changed().await.is_ok() {
|
||||
match builder(&rx.borrow()) {
|
||||
Ok(new_value) => {
|
||||
inner_clone.store(Arc::new(new_value));
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to update versioned cache: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
.with_cancellation_token_owned(cancel_token.clone()),
|
||||
);
|
||||
Ok(Self { inner, cancel_token })
|
||||
}
|
||||
|
||||
pub fn load(&self) -> Guard<Arc<T>> {
|
||||
self.reload_if_needed();
|
||||
/// 获取一个临时的只读引用
|
||||
pub fn read(&self) -> Guard<Arc<T>> {
|
||||
self.inner.load()
|
||||
}
|
||||
|
||||
fn reload_if_needed(&self) {
|
||||
let current_config = VersionedConfig::get().load();
|
||||
let current_version = current_config.version;
|
||||
let version = self.version.load(Ordering::Relaxed);
|
||||
if version < current_version {
|
||||
let _lock = self.mutex.lock();
|
||||
if self.version.load(Ordering::Relaxed) >= current_version {
|
||||
return;
|
||||
}
|
||||
match (self.builder)(¤t_config) {
|
||||
Err(e) => {
|
||||
error!("Failed to rebuild versioned cache: {:?}", e);
|
||||
}
|
||||
Ok(new_value) => {
|
||||
self.inner.store(Arc::new(new_value));
|
||||
self.version.store(current_version, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
/// 获取当前缓存的完整快照
|
||||
pub fn snapshot(&self) -> Arc<T> {
|
||||
self.inner.load_full()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for VersionedCache<T> {
|
||||
fn drop(&mut self) {
|
||||
self.cancel_token.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::sync::Arc;
|
||||
use anyhow::{Result, anyhow, bail};
|
||||
use arc_swap::{ArcSwap, Guard};
|
||||
use sea_orm::DatabaseConnection;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio::sync::{OnceCell, watch};
|
||||
|
||||
use crate::bilibili::Credential;
|
||||
use crate::config::{CONFIG_DIR, Config};
|
||||
@@ -13,6 +13,8 @@ pub static VERSIONED_CONFIG: OnceCell<VersionedConfig> = OnceCell::const_new();
|
||||
pub struct VersionedConfig {
|
||||
inner: ArcSwap<Config>,
|
||||
update_lock: tokio::sync::Mutex<()>,
|
||||
tx: watch::Sender<Arc<Config>>,
|
||||
rx: watch::Receiver<Arc<Config>>,
|
||||
}
|
||||
|
||||
impl VersionedConfig {
|
||||
@@ -68,46 +70,51 @@ impl VersionedConfig {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
/// 尝试获取全局的 `VersionedConfig`,如果未初始化则退回测试环境的默认配置
|
||||
/// 尝试获取全局的 `VersionedConfig`,如果未初始化则退回默认配置
|
||||
pub fn get() -> &'static VersionedConfig {
|
||||
use std::sync::LazyLock;
|
||||
static FALLBACK_CONFIG: LazyLock<VersionedConfig> =
|
||||
LazyLock::new(|| VersionedConfig::new(Config::test_default()));
|
||||
// 优先从全局变量获取,未初始化则返回测试环境的默认配置
|
||||
static FALLBACK_CONFIG: LazyLock<VersionedConfig> = LazyLock::new(|| VersionedConfig::new(Config::default()));
|
||||
// 优先从全局变量获取,未初始化则退回默认配置
|
||||
return VERSIONED_CONFIG.get().unwrap_or_else(|| &FALLBACK_CONFIG);
|
||||
}
|
||||
|
||||
pub fn new(config: Config) -> Self {
|
||||
fn new(config: Config) -> Self {
|
||||
let inner = ArcSwap::from_pointee(config);
|
||||
let (tx, rx) = watch::channel(inner.load_full());
|
||||
Self {
|
||||
inner: ArcSwap::from_pointee(config),
|
||||
inner,
|
||||
update_lock: tokio::sync::Mutex::new(()),
|
||||
tx,
|
||||
rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn load(&self) -> Guard<Arc<Config>> {
|
||||
pub fn read(&self) -> Guard<Arc<Config>> {
|
||||
self.inner.load()
|
||||
}
|
||||
|
||||
pub fn load_full(&self) -> Arc<Config> {
|
||||
pub fn snapshot(&self) -> Arc<Config> {
|
||||
self.inner.load_full()
|
||||
}
|
||||
|
||||
pub async fn update_credential(&self, new_credential: Credential, connection: &DatabaseConnection) -> Result<()> {
|
||||
// 确保更新内容与写入数据库的操作是原子性的
|
||||
pub fn subscribe(&self) -> watch::Receiver<Arc<Config>> {
|
||||
self.rx.clone()
|
||||
}
|
||||
|
||||
pub async fn update_credential(
|
||||
&self,
|
||||
new_credential: Credential,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<Arc<Config>> {
|
||||
let _lock = self.update_lock.lock().await;
|
||||
loop {
|
||||
let old_config = self.inner.load();
|
||||
let mut new_config = old_config.as_ref().clone();
|
||||
new_config.credential = new_credential.clone();
|
||||
new_config.version += 1;
|
||||
if Arc::ptr_eq(
|
||||
&old_config,
|
||||
&self.inner.compare_and_swap(&old_config, Arc::new(new_config)),
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
self.inner.load().save_to_database(connection).await
|
||||
let mut new_config = self.inner.load().as_ref().clone();
|
||||
new_config.credential = new_credential;
|
||||
new_config.version += 1;
|
||||
new_config.save_to_database(connection).await?;
|
||||
let new_config = Arc::new(new_config);
|
||||
self.inner.store(new_config.clone());
|
||||
self.tx.send(new_config.clone())?;
|
||||
Ok(new_config)
|
||||
}
|
||||
|
||||
/// 外部 API 会调用这个方法,如果更新失败直接返回错误
|
||||
@@ -118,14 +125,10 @@ impl VersionedConfig {
|
||||
bail!("配置版本不匹配,请刷新页面修改后重新提交");
|
||||
}
|
||||
new_config.version += 1;
|
||||
let new_config = Arc::new(new_config);
|
||||
if !Arc::ptr_eq(
|
||||
&old_config,
|
||||
&self.inner.compare_and_swap(&old_config, new_config.clone()),
|
||||
) {
|
||||
bail!("配置版本不匹配,请刷新页面修改后重新提交");
|
||||
}
|
||||
new_config.save_to_database(connection).await?;
|
||||
let new_config = Arc::new(new_config);
|
||||
self.inner.store(new_config.clone());
|
||||
self.tx.send(new_config.clone())?;
|
||||
Ok(new_config)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,7 +14,8 @@ use tokio::task::JoinSet;
|
||||
use tokio_util::io::StreamReader;
|
||||
|
||||
use crate::bilibili::Client;
|
||||
use crate::config::VersionedConfig;
|
||||
use crate::config::ConcurrentDownloadLimit;
|
||||
|
||||
pub struct Downloader {
|
||||
client: Client,
|
||||
}
|
||||
@@ -27,9 +28,9 @@ impl Downloader {
|
||||
Self { client }
|
||||
}
|
||||
|
||||
pub async fn fetch(&self, url: &str, path: &Path) -> Result<()> {
|
||||
pub async fn fetch(&self, url: &str, path: &Path, concurrent_download: &ConcurrentDownloadLimit) -> Result<()> {
|
||||
let mut temp_file = TempFile::new().await?;
|
||||
self.fetch_internal(url, &mut temp_file).await?;
|
||||
self.fetch_internal(url, &mut temp_file, concurrent_download).await?;
|
||||
if let Some(parent) = path.parent() {
|
||||
fs::create_dir_all(parent).await?;
|
||||
}
|
||||
@@ -41,8 +42,13 @@ impl Downloader {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn multi_fetch(&self, urls: &[&str], path: &Path) -> Result<()> {
|
||||
let temp_file = self.multi_fetch_internal(urls).await?;
|
||||
pub async fn multi_fetch(
|
||||
&self,
|
||||
urls: &[&str],
|
||||
path: &Path,
|
||||
concurrent_download: &ConcurrentDownloadLimit,
|
||||
) -> Result<()> {
|
||||
let temp_file = self.multi_fetch_internal(urls, concurrent_download).await?;
|
||||
if let Some(parent) = path.parent() {
|
||||
fs::create_dir_all(parent).await?;
|
||||
}
|
||||
@@ -51,10 +57,16 @@ impl Downloader {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn multi_fetch_and_merge(&self, video_urls: &[&str], audio_urls: &[&str], path: &Path) -> Result<()> {
|
||||
pub async fn multi_fetch_and_merge(
|
||||
&self,
|
||||
video_urls: &[&str],
|
||||
audio_urls: &[&str],
|
||||
path: &Path,
|
||||
concurrent_download: &ConcurrentDownloadLimit,
|
||||
) -> Result<()> {
|
||||
let (video_temp_file, audio_temp_file) = tokio::try_join!(
|
||||
self.multi_fetch_internal(video_urls),
|
||||
self.multi_fetch_internal(audio_urls)
|
||||
self.multi_fetch_internal(video_urls, concurrent_download),
|
||||
self.multi_fetch_internal(audio_urls, concurrent_download)
|
||||
)?;
|
||||
let final_temp_file = TempFile::new().await?;
|
||||
let output = Command::new("ffmpeg")
|
||||
@@ -90,13 +102,17 @@ impl Downloader {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn multi_fetch_internal(&self, urls: &[&str]) -> Result<TempFile> {
|
||||
async fn multi_fetch_internal(
|
||||
&self,
|
||||
urls: &[&str],
|
||||
concurrent_download: &ConcurrentDownloadLimit,
|
||||
) -> Result<TempFile> {
|
||||
if urls.is_empty() {
|
||||
bail!("no urls provided");
|
||||
}
|
||||
let mut temp_file = TempFile::new().await?;
|
||||
for (idx, url) in urls.iter().enumerate() {
|
||||
match self.fetch_internal(url, &mut temp_file).await {
|
||||
match self.fetch_internal(url, &mut temp_file, concurrent_download).await {
|
||||
Ok(_) => return Ok(temp_file),
|
||||
Err(e) => {
|
||||
if idx == urls.len() - 1 {
|
||||
@@ -111,9 +127,14 @@ impl Downloader {
|
||||
unreachable!()
|
||||
}
|
||||
|
||||
async fn fetch_internal(&self, url: &str, file: &mut TempFile) -> Result<()> {
|
||||
if VersionedConfig::get().load().concurrent_limit.download.enable {
|
||||
self.fetch_parallel(url, file).await
|
||||
async fn fetch_internal(
|
||||
&self,
|
||||
url: &str,
|
||||
file: &mut TempFile,
|
||||
concurrent_download: &ConcurrentDownloadLimit,
|
||||
) -> Result<()> {
|
||||
if concurrent_download.enable {
|
||||
self.fetch_parallel(url, file, concurrent_download).await
|
||||
} else {
|
||||
self.fetch_serial(url, file).await
|
||||
}
|
||||
@@ -141,14 +162,13 @@ impl Downloader {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn fetch_parallel(&self, url: &str, file: &mut TempFile) -> Result<()> {
|
||||
let (concurrency, threshold) = {
|
||||
let config = VersionedConfig::get().load();
|
||||
(
|
||||
config.concurrent_limit.download.concurrency,
|
||||
config.concurrent_limit.download.threshold,
|
||||
)
|
||||
};
|
||||
async fn fetch_parallel(
|
||||
&self,
|
||||
url: &str,
|
||||
file: &mut TempFile,
|
||||
concurrent_download: &ConcurrentDownloadLimit,
|
||||
) -> Result<()> {
|
||||
let (concurrency, threshold) = (concurrent_download.concurrency, concurrent_download.threshold);
|
||||
let resp = self
|
||||
.client
|
||||
.request(Method::HEAD, url, None)
|
||||
|
||||
@@ -30,11 +30,11 @@ pub async fn http_server(
|
||||
.layer(Extension(database_connection))
|
||||
.layer(Extension(bili_client))
|
||||
.layer(Extension(log_writer));
|
||||
let config = VersionedConfig::get().load_full();
|
||||
let listener = tokio::net::TcpListener::bind(&config.bind_address)
|
||||
let bind_address = VersionedConfig::get().read().bind_address.to_owned();
|
||||
let listener = tokio::net::TcpListener::bind(&bind_address)
|
||||
.await
|
||||
.context("bind address failed")?;
|
||||
info!("开始运行管理页:http://{}", config.bind_address);
|
||||
info!("开始运行管理页:http://{}", bind_address);
|
||||
Ok(axum::serve(listener, ServiceExt::<Request>::into_make_service(app)).await?)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{Context, Result, bail};
|
||||
use chrono::NaiveDate;
|
||||
use sea_orm::DatabaseConnection;
|
||||
use tokio::time;
|
||||
|
||||
use crate::adapter::VideoSource;
|
||||
use crate::bilibili::{self, BiliClient, WbiImg};
|
||||
use crate::config::VersionedConfig;
|
||||
use crate::bilibili::{self, BiliClient};
|
||||
use crate::config::{Config, TEMPLATE, VersionedConfig};
|
||||
use crate::utils::model::get_enabled_video_sources;
|
||||
use crate::utils::task_notifier::TASK_STATUS_NOTIFIER;
|
||||
use crate::workflow::process_video_source;
|
||||
@@ -14,49 +16,59 @@ use crate::workflow::process_video_source;
|
||||
pub async fn video_downloader(connection: DatabaseConnection, bili_client: Arc<BiliClient>) {
|
||||
let mut anchor = chrono::Local::now().date_naive();
|
||||
loop {
|
||||
info!("开始执行本轮视频下载任务..");
|
||||
let _lock = TASK_STATUS_NOTIFIER.start_running().await;
|
||||
let config = VersionedConfig::get().load_full();
|
||||
'inner: {
|
||||
if let Err(e) = config.check() {
|
||||
error!("配置检查失败,跳过本轮执行:\n{:#}", e);
|
||||
break 'inner;
|
||||
}
|
||||
match bili_client.wbi_img().await.map(WbiImg::into_mixin_key) {
|
||||
Ok(Some(mixin_key)) => bilibili::set_global_mixin_key(mixin_key),
|
||||
Ok(_) => {
|
||||
error!("解析 mixin key 失败,等待下一轮执行");
|
||||
break 'inner;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("获取 mixin key 遇到错误:{:#},等待下一轮执行", e);
|
||||
break 'inner;
|
||||
}
|
||||
};
|
||||
if anchor != chrono::Local::now().date_naive() {
|
||||
if let Err(e) = bili_client.check_refresh(&connection).await {
|
||||
error!("检查刷新 Credential 遇到错误:{:#},等待下一轮执行", e);
|
||||
break 'inner;
|
||||
}
|
||||
anchor = chrono::Local::now().date_naive();
|
||||
}
|
||||
let Ok(video_sources) = get_enabled_video_sources(&connection).await else {
|
||||
error!("获取视频源列表失败,等待下一轮执行");
|
||||
break 'inner;
|
||||
};
|
||||
if video_sources.is_empty() {
|
||||
info!("没有可用的视频源,等待下一轮执行");
|
||||
break 'inner;
|
||||
}
|
||||
for video_source in video_sources {
|
||||
let display_name = video_source.display_name();
|
||||
if let Err(e) = process_video_source(video_source, &bili_client, &connection).await {
|
||||
error!("处理 {} 时遇到错误:{:#},等待下一轮执行", display_name, e);
|
||||
}
|
||||
}
|
||||
info!("本轮任务执行完毕,等待下一轮执行");
|
||||
let mut config = VersionedConfig::get().snapshot();
|
||||
info!("开始执行本轮视频下载任务..");
|
||||
if let Err(e) = download_all_video_sources(&connection, &bili_client, &mut config, &mut anchor).await {
|
||||
error!("本轮视频下载任务执行遇到错误:{:#},跳过本轮执行", e);
|
||||
} else {
|
||||
info!("本轮视频下载任务执行完毕");
|
||||
}
|
||||
TASK_STATUS_NOTIFIER.finish_running(_lock);
|
||||
TASK_STATUS_NOTIFIER.finish_running(_lock, config.interval as i64);
|
||||
time::sleep(time::Duration::from_secs(config.interval)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn download_all_video_sources(
|
||||
connection: &DatabaseConnection,
|
||||
bili_client: &BiliClient,
|
||||
config: &mut Arc<Config>,
|
||||
anchor: &mut NaiveDate,
|
||||
) -> Result<()> {
|
||||
config.check().context("配置检查失败")?;
|
||||
let mixin_key = bili_client
|
||||
.wbi_img(&config.credential)
|
||||
.await
|
||||
.context("获取 wbi_img 失败")?
|
||||
.into_mixin_key()
|
||||
.context("解析 mixin key 失败")?;
|
||||
bilibili::set_global_mixin_key(mixin_key);
|
||||
if *anchor != chrono::Local::now().date_naive() {
|
||||
if let Some(new_credential) = bili_client
|
||||
.check_refresh(&config.credential)
|
||||
.await
|
||||
.context("检查刷新 Credential 失败")?
|
||||
{
|
||||
*config = VersionedConfig::get()
|
||||
.update_credential(new_credential, connection)
|
||||
.await
|
||||
.context("更新 Credential 失败")?;
|
||||
}
|
||||
*anchor = chrono::Local::now().date_naive();
|
||||
}
|
||||
let template = TEMPLATE.snapshot();
|
||||
let bili_client = bili_client.snapshot()?;
|
||||
let video_sources = get_enabled_video_sources(connection)
|
||||
.await
|
||||
.context("获取视频源列表失败")?;
|
||||
if video_sources.is_empty() {
|
||||
bail!("没有可用的视频源");
|
||||
}
|
||||
for video_source in video_sources {
|
||||
let display_name = video_source.display_name();
|
||||
if let Err(e) = process_video_source(video_source, &bili_client, connection, &template, config).await {
|
||||
error!("处理 {} 时遇到错误:{:#},跳过该视频源", display_name, e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
36
crates/bili_sync/src/utils/download_context.rs
Normal file
36
crates/bili_sync/src/utils/download_context.rs
Normal file
@@ -0,0 +1,36 @@
|
||||
use sea_orm::DatabaseConnection;
|
||||
|
||||
use crate::adapter::VideoSourceEnum;
|
||||
use crate::bilibili::BiliClient;
|
||||
use crate::config::Config;
|
||||
use crate::downloader::Downloader;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct DownloadContext<'a> {
|
||||
pub bili_client: &'a BiliClient,
|
||||
pub video_source: &'a VideoSourceEnum,
|
||||
pub template: &'a handlebars::Handlebars<'a>,
|
||||
pub connection: &'a DatabaseConnection,
|
||||
pub downloader: &'a Downloader,
|
||||
pub config: &'a Config,
|
||||
}
|
||||
|
||||
impl<'a> DownloadContext<'a> {
|
||||
pub fn new(
|
||||
bili_client: &'a BiliClient,
|
||||
video_source: &'a VideoSourceEnum,
|
||||
template: &'a handlebars::Handlebars<'a>,
|
||||
connection: &'a DatabaseConnection,
|
||||
downloader: &'a Downloader,
|
||||
config: &'a Config,
|
||||
) -> Self {
|
||||
Self {
|
||||
bili_client,
|
||||
video_source,
|
||||
template,
|
||||
connection,
|
||||
downloader,
|
||||
config,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,24 +1,21 @@
|
||||
use serde_json::json;
|
||||
|
||||
use crate::config::VersionedConfig;
|
||||
|
||||
pub fn video_format_args(video_model: &bili_sync_entity::video::Model) -> serde_json::Value {
|
||||
let config = VersionedConfig::get().load();
|
||||
pub fn video_format_args(video_model: &bili_sync_entity::video::Model, time_format: &str) -> serde_json::Value {
|
||||
json!({
|
||||
"bvid": &video_model.bvid,
|
||||
"title": &video_model.name,
|
||||
"upper_name": &video_model.upper_name,
|
||||
"upper_mid": &video_model.upper_id,
|
||||
"pubtime": &video_model.pubtime.and_utc().format(&config.time_format).to_string(),
|
||||
"fav_time": &video_model.favtime.and_utc().format(&config.time_format).to_string(),
|
||||
"pubtime": &video_model.pubtime.and_utc().format(time_format).to_string(),
|
||||
"fav_time": &video_model.favtime.and_utc().format(time_format).to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn page_format_args(
|
||||
video_model: &bili_sync_entity::video::Model,
|
||||
page_model: &bili_sync_entity::page::Model,
|
||||
time_format: &str,
|
||||
) -> serde_json::Value {
|
||||
let config = VersionedConfig::get().load();
|
||||
json!({
|
||||
"bvid": &video_model.bvid,
|
||||
"title": &video_model.name,
|
||||
@@ -26,7 +23,7 @@ pub fn page_format_args(
|
||||
"upper_mid": &video_model.upper_id,
|
||||
"ptitle": &page_model.name,
|
||||
"pid": page_model.pid,
|
||||
"pubtime": video_model.pubtime.and_utc().format(&config.time_format).to_string(),
|
||||
"fav_time": video_model.favtime.and_utc().format(&config.time_format).to_string(),
|
||||
"pubtime": video_model.pubtime.and_utc().format(time_format).to_string(),
|
||||
"fav_time": video_model.favtime.and_utc().format(time_format).to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
pub mod convert;
|
||||
pub mod download_context;
|
||||
pub mod filenamify;
|
||||
pub mod format_arg;
|
||||
pub mod model;
|
||||
|
||||
@@ -6,7 +6,7 @@ use quick_xml::events::{BytesCData, BytesText};
|
||||
use quick_xml::writer::Writer;
|
||||
use tokio::io::{AsyncWriteExt, BufWriter};
|
||||
|
||||
use crate::config::{NFOTimeType, VersionedConfig};
|
||||
use crate::config::NFOTimeType;
|
||||
|
||||
#[allow(clippy::upper_case_acronyms)]
|
||||
pub enum NFO<'a> {
|
||||
@@ -265,7 +265,10 @@ mod tests {
|
||||
..Default::default()
|
||||
};
|
||||
assert_eq!(
|
||||
NFO::Movie((&video).into()).generate_nfo().await.unwrap(),
|
||||
NFO::Movie((&video).to_nfo(NFOTimeType::FavTime))
|
||||
.generate_nfo()
|
||||
.await
|
||||
.unwrap(),
|
||||
r#"<?xml version="1.0" encoding="utf-8" standalone="yes"?>
|
||||
<movie>
|
||||
<plot><![CDATA[原始视频:<a href="https://www.bilibili.com/video/BV1nWcSeeEkV/">BV1nWcSeeEkV</a><br/><br/>intro]]></plot>
|
||||
@@ -283,7 +286,10 @@ mod tests {
|
||||
</movie>"#,
|
||||
);
|
||||
assert_eq!(
|
||||
NFO::TVShow((&video).into()).generate_nfo().await.unwrap(),
|
||||
NFO::TVShow((&video).to_nfo(NFOTimeType::FavTime))
|
||||
.generate_nfo()
|
||||
.await
|
||||
.unwrap(),
|
||||
r#"<?xml version="1.0" encoding="utf-8" standalone="yes"?>
|
||||
<tvshow>
|
||||
<plot><![CDATA[原始视频:<a href="https://www.bilibili.com/video/BV1nWcSeeEkV/">BV1nWcSeeEkV</a><br/><br/>intro]]></plot>
|
||||
@@ -301,7 +307,10 @@ mod tests {
|
||||
</tvshow>"#,
|
||||
);
|
||||
assert_eq!(
|
||||
NFO::Upper((&video).into()).generate_nfo().await.unwrap(),
|
||||
NFO::Upper((&video).to_nfo(NFOTimeType::FavTime))
|
||||
.generate_nfo()
|
||||
.await
|
||||
.unwrap(),
|
||||
r#"<?xml version="1.0" encoding="utf-8" standalone="yes"?>
|
||||
<person>
|
||||
<plot/>
|
||||
@@ -318,7 +327,10 @@ mod tests {
|
||||
..Default::default()
|
||||
};
|
||||
assert_eq!(
|
||||
NFO::Episode((&page).into()).generate_nfo().await.unwrap(),
|
||||
NFO::Episode((&page).to_nfo(NFOTimeType::FavTime))
|
||||
.generate_nfo()
|
||||
.await
|
||||
.unwrap(),
|
||||
r#"<?xml version="1.0" encoding="utf-8" standalone="yes"?>
|
||||
<episodedetails>
|
||||
<plot/>
|
||||
@@ -331,54 +343,58 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a video::Model> for Movie<'a> {
|
||||
fn from(video: &'a video::Model) -> Self {
|
||||
Self {
|
||||
name: &video.name,
|
||||
intro: &video.intro,
|
||||
bvid: &video.bvid,
|
||||
upper_id: video.upper_id,
|
||||
upper_name: &video.upper_name,
|
||||
aired: match VersionedConfig::get().load().nfo_time_type {
|
||||
NFOTimeType::FavTime => video.favtime,
|
||||
NFOTimeType::PubTime => video.pubtime,
|
||||
pub trait ToNFO<'a, T> {
|
||||
fn to_nfo(&'a self, nfo_time_type: NFOTimeType) -> T;
|
||||
}
|
||||
|
||||
impl<'a> ToNFO<'a, Movie<'a>> for &'a video::Model {
|
||||
fn to_nfo(&'a self, nfo_time_type: NFOTimeType) -> Movie<'a> {
|
||||
Movie {
|
||||
name: &self.name,
|
||||
intro: &self.intro,
|
||||
bvid: &self.bvid,
|
||||
upper_id: self.upper_id,
|
||||
upper_name: &self.upper_name,
|
||||
aired: match nfo_time_type {
|
||||
NFOTimeType::FavTime => self.favtime,
|
||||
NFOTimeType::PubTime => self.pubtime,
|
||||
},
|
||||
tags: video.tags.as_ref().map(|tags| tags.clone().into()),
|
||||
tags: self.tags.as_ref().map(|tags| tags.clone().into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a video::Model> for TVShow<'a> {
|
||||
fn from(video: &'a video::Model) -> Self {
|
||||
Self {
|
||||
name: &video.name,
|
||||
intro: &video.intro,
|
||||
bvid: &video.bvid,
|
||||
upper_id: video.upper_id,
|
||||
upper_name: &video.upper_name,
|
||||
aired: match VersionedConfig::get().load().nfo_time_type {
|
||||
NFOTimeType::FavTime => video.favtime,
|
||||
NFOTimeType::PubTime => video.pubtime,
|
||||
impl<'a> ToNFO<'a, TVShow<'a>> for &'a video::Model {
|
||||
fn to_nfo(&'a self, nfo_time_type: NFOTimeType) -> TVShow<'a> {
|
||||
TVShow {
|
||||
name: &self.name,
|
||||
intro: &self.intro,
|
||||
bvid: &self.bvid,
|
||||
upper_id: self.upper_id,
|
||||
upper_name: &self.upper_name,
|
||||
aired: match nfo_time_type {
|
||||
NFOTimeType::FavTime => self.favtime,
|
||||
NFOTimeType::PubTime => self.pubtime,
|
||||
},
|
||||
tags: video.tags.as_ref().map(|tags| tags.clone().into()),
|
||||
tags: self.tags.as_ref().map(|tags| tags.clone().into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a video::Model> for Upper {
|
||||
fn from(video: &'a video::Model) -> Self {
|
||||
Self {
|
||||
upper_id: video.upper_id.to_string(),
|
||||
pubtime: video.pubtime,
|
||||
impl<'a> ToNFO<'a, Upper> for &'a video::Model {
|
||||
fn to_nfo(&'a self, _nfo_time_type: NFOTimeType) -> Upper {
|
||||
Upper {
|
||||
upper_id: self.upper_id.to_string(),
|
||||
pubtime: self.pubtime,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a page::Model> for Episode<'a> {
|
||||
fn from(page: &'a page::Model) -> Self {
|
||||
Self {
|
||||
name: &page.name,
|
||||
pid: page.pid.to_string(),
|
||||
impl<'a> ToNFO<'a, Episode<'a>> for &'a page::Model {
|
||||
fn to_nfo(&'a self, _nfo_time_type: NFOTimeType) -> Episode<'a> {
|
||||
Episode {
|
||||
name: &self.name,
|
||||
pid: self.pid.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
use std::sync::LazyLock;
|
||||
|
||||
use serde::Serialize;
|
||||
use tokio::sync::MutexGuard;
|
||||
|
||||
use crate::config::VersionedConfig;
|
||||
use tokio::sync::{MutexGuard, watch};
|
||||
|
||||
pub static TASK_STATUS_NOTIFIER: LazyLock<TaskStatusNotifier> = LazyLock::new(TaskStatusNotifier::new);
|
||||
|
||||
@@ -17,13 +15,13 @@ pub struct TaskStatus {
|
||||
|
||||
pub struct TaskStatusNotifier {
|
||||
mutex: tokio::sync::Mutex<()>,
|
||||
tx: tokio::sync::watch::Sender<TaskStatus>,
|
||||
rx: tokio::sync::watch::Receiver<TaskStatus>,
|
||||
tx: watch::Sender<TaskStatus>,
|
||||
rx: watch::Receiver<TaskStatus>,
|
||||
}
|
||||
|
||||
impl TaskStatusNotifier {
|
||||
pub fn new() -> Self {
|
||||
let (tx, rx) = tokio::sync::watch::channel(TaskStatus::default());
|
||||
let (tx, rx) = watch::channel(TaskStatus::default());
|
||||
Self {
|
||||
mutex: tokio::sync::Mutex::const_new(()),
|
||||
tx,
|
||||
@@ -42,26 +40,19 @@ impl TaskStatusNotifier {
|
||||
lock
|
||||
}
|
||||
|
||||
pub fn finish_running(&self, _lock: MutexGuard<()>) {
|
||||
pub fn finish_running(&self, _lock: MutexGuard<()>, interval: i64) {
|
||||
let last_status = self.tx.borrow();
|
||||
let last_run = last_status.last_run;
|
||||
drop(last_status);
|
||||
let config = VersionedConfig::get().load();
|
||||
let now = chrono::Local::now();
|
||||
|
||||
let _ = self.tx.send(TaskStatus {
|
||||
is_running: false,
|
||||
last_run,
|
||||
last_finish: Some(now),
|
||||
next_run: now.checked_add_signed(chrono::Duration::seconds(config.interval as i64)),
|
||||
next_run: now.checked_add_signed(chrono::Duration::seconds(interval)),
|
||||
});
|
||||
}
|
||||
|
||||
/// 精确探测任务执行状态,保证如果读取到“未运行”,那么在锁释放之前任务不会被执行
|
||||
pub fn detect_running(&self) -> Option<MutexGuard<'_, ()>> {
|
||||
self.mutex.try_lock().ok()
|
||||
}
|
||||
|
||||
pub fn subscribe(&self) -> tokio::sync::watch::Receiver<TaskStatus> {
|
||||
self.rx.clone()
|
||||
}
|
||||
|
||||
@@ -14,15 +14,16 @@ use tokio::sync::Semaphore;
|
||||
|
||||
use crate::adapter::{VideoSource, VideoSourceEnum};
|
||||
use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Video, VideoInfo};
|
||||
use crate::config::{ARGS, PathSafeTemplate, SkipOption, TEMPLATE, VersionedConfig};
|
||||
use crate::config::{ARGS, Config, PathSafeTemplate};
|
||||
use crate::downloader::Downloader;
|
||||
use crate::error::{DownloadAbortError, ExecutionStatus, ProcessPageError};
|
||||
use crate::utils::download_context::DownloadContext;
|
||||
use crate::utils::format_arg::{page_format_args, video_format_args};
|
||||
use crate::utils::model::{
|
||||
create_pages, create_videos, filter_unfilled_videos, filter_unhandled_video_pages, update_pages_model,
|
||||
update_videos_model,
|
||||
};
|
||||
use crate::utils::nfo::NFO;
|
||||
use crate::utils::nfo::{NFO, ToNFO};
|
||||
use crate::utils::rule::FieldEvaluatable;
|
||||
use crate::utils::status::{PageStatus, STATUS_OK, VideoStatus};
|
||||
|
||||
@@ -31,20 +32,24 @@ pub async fn process_video_source(
|
||||
video_source: VideoSourceEnum,
|
||||
bili_client: &BiliClient,
|
||||
connection: &DatabaseConnection,
|
||||
template: &handlebars::Handlebars<'_>,
|
||||
config: &Config,
|
||||
) -> Result<()> {
|
||||
// 预创建视频源目录,提前检测目录是否可写
|
||||
video_source.create_dir_all().await?;
|
||||
// 从参数中获取视频列表的 Model 与视频流
|
||||
let (video_source, video_streams) = video_source.refresh(bili_client, connection).await?;
|
||||
let (video_source, video_streams) = video_source
|
||||
.refresh(bili_client, &config.credential, connection)
|
||||
.await?;
|
||||
// 从视频流中获取新视频的简要信息,写入数据库
|
||||
refresh_video_source(&video_source, video_streams, connection).await?;
|
||||
// 单独请求视频详情接口,获取视频的详情信息与所有的分页,写入数据库
|
||||
fetch_video_details(bili_client, &video_source, connection).await?;
|
||||
fetch_video_details(bili_client, &video_source, connection, config).await?;
|
||||
if ARGS.scan_only {
|
||||
warn!("已开启仅扫描模式,跳过视频下载..");
|
||||
} else {
|
||||
// 从数据库中查找所有未下载的视频与分页,下载并处理
|
||||
download_unprocessed_videos(bili_client, &video_source, connection).await?;
|
||||
download_unprocessed_videos(bili_client, &video_source, connection, template, config).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -103,16 +108,17 @@ pub async fn fetch_video_details(
|
||||
bili_client: &BiliClient,
|
||||
video_source: &VideoSourceEnum,
|
||||
connection: &DatabaseConnection,
|
||||
config: &Config,
|
||||
) -> Result<()> {
|
||||
video_source.log_fetch_video_start();
|
||||
let videos_model = filter_unfilled_videos(video_source.filter_expr(), connection).await?;
|
||||
let semaphore = Semaphore::new(VersionedConfig::get().load().concurrent_limit.video);
|
||||
let semaphore = Semaphore::new(config.concurrent_limit.video);
|
||||
let semaphore_ref = &semaphore;
|
||||
let tasks = videos_model
|
||||
.into_iter()
|
||||
.map(|video_model| async move {
|
||||
let _permit = semaphore_ref.acquire().await.context("acquire semaphore failed")?;
|
||||
let video = Video::new(bili_client, video_model.bvid.clone());
|
||||
let video = Video::new(bili_client, video_model.bvid.clone(), &config.credential);
|
||||
let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await;
|
||||
match info {
|
||||
Err(e) => {
|
||||
@@ -161,12 +167,13 @@ pub async fn download_unprocessed_videos(
|
||||
bili_client: &BiliClient,
|
||||
video_source: &VideoSourceEnum,
|
||||
connection: &DatabaseConnection,
|
||||
template: &handlebars::Handlebars<'_>,
|
||||
config: &Config,
|
||||
) -> Result<()> {
|
||||
video_source.log_download_video_start();
|
||||
let semaphore = Semaphore::new(VersionedConfig::get().load().concurrent_limit.video);
|
||||
let semaphore = Semaphore::new(config.concurrent_limit.video);
|
||||
let downloader = Downloader::new(bili_client.client.clone());
|
||||
// 提前获取 skip_option,保证单个视频源在整个下载过程中配置不变
|
||||
let skip_option = &VersionedConfig::get().load().skip_option;
|
||||
let cx = DownloadContext::new(bili_client, video_source, template, connection, &downloader, config);
|
||||
let unhandled_videos_pages = filter_unhandled_video_pages(video_source.filter_expr(), connection).await?;
|
||||
let mut assigned_upper = HashSet::new();
|
||||
let tasks = unhandled_videos_pages
|
||||
@@ -174,17 +181,7 @@ pub async fn download_unprocessed_videos(
|
||||
.map(|(video_model, pages_model)| {
|
||||
let should_download_upper = !assigned_upper.contains(&video_model.upper_id);
|
||||
assigned_upper.insert(video_model.upper_id);
|
||||
download_video_pages(
|
||||
bili_client,
|
||||
video_source,
|
||||
video_model,
|
||||
pages_model,
|
||||
connection,
|
||||
&semaphore,
|
||||
&downloader,
|
||||
should_download_upper,
|
||||
skip_option,
|
||||
)
|
||||
download_video_pages(video_model, pages_model, &semaphore, should_download_upper, cx)
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
let mut download_aborted = false;
|
||||
@@ -213,29 +210,28 @@ pub async fn download_unprocessed_videos(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn download_video_pages(
|
||||
bili_client: &BiliClient,
|
||||
video_source: &VideoSourceEnum,
|
||||
video_model: video::Model,
|
||||
pages: Vec<page::Model>,
|
||||
connection: &DatabaseConnection,
|
||||
page_models: Vec<page::Model>,
|
||||
semaphore: &Semaphore,
|
||||
downloader: &Downloader,
|
||||
should_download_upper: bool,
|
||||
skip_option: &SkipOption,
|
||||
cx: DownloadContext<'_>,
|
||||
) -> Result<video::ActiveModel> {
|
||||
let _permit = semaphore.acquire().await.context("acquire semaphore failed")?;
|
||||
let mut status = VideoStatus::from(video_model.download_status);
|
||||
let separate_status = status.should_run();
|
||||
let base_path = video_source.path().join(
|
||||
TEMPLATE
|
||||
.load()
|
||||
.path_safe_render("video", &video_format_args(&video_model))?,
|
||||
);
|
||||
// 未记录路径时填充,已经填充过路径时使用现有的
|
||||
let base_path = if !video_model.path.is_empty() {
|
||||
PathBuf::from(&video_model.path)
|
||||
} else {
|
||||
cx.video_source.path().join(
|
||||
cx.template
|
||||
.path_safe_render("video", &video_format_args(&video_model, &cx.config.time_format))?,
|
||||
)
|
||||
};
|
||||
let upper_id = video_model.upper_id.to_string();
|
||||
let base_upper_path = VersionedConfig::get()
|
||||
.load()
|
||||
let base_upper_path = cx
|
||||
.config
|
||||
.upper_path
|
||||
.join(upper_id.chars().next().context("upper_id is empty")?.to_string())
|
||||
.join(upper_id);
|
||||
@@ -245,47 +241,37 @@ pub async fn download_video_pages(
|
||||
let (res_1, res_2, res_3, res_4, res_5) = tokio::join!(
|
||||
// 下载视频封面
|
||||
fetch_video_poster(
|
||||
separate_status[0] && !is_single_page && !skip_option.no_poster,
|
||||
separate_status[0] && !is_single_page && !cx.config.skip_option.no_poster,
|
||||
&video_model,
|
||||
downloader,
|
||||
base_path.join("poster.jpg"),
|
||||
base_path.join("fanart.jpg"),
|
||||
cx
|
||||
),
|
||||
// 生成视频信息的 nfo
|
||||
generate_video_nfo(
|
||||
separate_status[1] && !is_single_page && !skip_option.no_video_nfo,
|
||||
separate_status[1] && !is_single_page && !cx.config.skip_option.no_video_nfo,
|
||||
&video_model,
|
||||
base_path.join("tvshow.nfo"),
|
||||
cx
|
||||
),
|
||||
// 下载 Up 主头像
|
||||
fetch_upper_face(
|
||||
separate_status[2] && should_download_upper && !skip_option.no_upper,
|
||||
separate_status[2] && should_download_upper && !cx.config.skip_option.no_upper,
|
||||
&video_model,
|
||||
downloader,
|
||||
base_upper_path.join("folder.jpg"),
|
||||
cx
|
||||
),
|
||||
// 生成 Up 主信息的 nfo
|
||||
generate_upper_nfo(
|
||||
separate_status[3] && should_download_upper && !skip_option.no_upper,
|
||||
separate_status[3] && should_download_upper && !cx.config.skip_option.no_upper,
|
||||
&video_model,
|
||||
base_upper_path.join("person.nfo"),
|
||||
cx,
|
||||
),
|
||||
// 分发并执行分页下载的任务
|
||||
dispatch_download_page(
|
||||
separate_status[4],
|
||||
bili_client,
|
||||
&video_model,
|
||||
pages,
|
||||
connection,
|
||||
downloader,
|
||||
&base_path,
|
||||
skip_option,
|
||||
)
|
||||
dispatch_download_page(separate_status[4], &video_model, page_models, &base_path, cx)
|
||||
);
|
||||
let results = [res_1, res_2, res_3, res_4, res_5]
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect::<Vec<_>>();
|
||||
let results = [res_1.into(), res_2.into(), res_3.into(), res_4.into(), res_5.into()];
|
||||
status.update_status(&results);
|
||||
results
|
||||
.iter()
|
||||
@@ -316,34 +302,20 @@ pub async fn download_video_pages(
|
||||
}
|
||||
|
||||
/// 分发并执行分页下载任务,当且仅当所有分页成功下载或达到最大重试次数时返回 Ok,否则根据失败原因返回对应的错误
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn dispatch_download_page(
|
||||
should_run: bool,
|
||||
bili_client: &BiliClient,
|
||||
video_model: &video::Model,
|
||||
pages: Vec<page::Model>,
|
||||
connection: &DatabaseConnection,
|
||||
downloader: &Downloader,
|
||||
page_models: Vec<page::Model>,
|
||||
base_path: &Path,
|
||||
skip_option: &SkipOption,
|
||||
cx: DownloadContext<'_>,
|
||||
) -> Result<ExecutionStatus> {
|
||||
if !should_run {
|
||||
return Ok(ExecutionStatus::Skipped);
|
||||
}
|
||||
let child_semaphore = Semaphore::new(VersionedConfig::get().load().concurrent_limit.page);
|
||||
let tasks = pages
|
||||
let child_semaphore = Semaphore::new(cx.config.concurrent_limit.page);
|
||||
let tasks = page_models
|
||||
.into_iter()
|
||||
.map(|page_model| {
|
||||
download_page(
|
||||
bili_client,
|
||||
video_model,
|
||||
page_model,
|
||||
&child_semaphore,
|
||||
downloader,
|
||||
base_path,
|
||||
skip_option,
|
||||
)
|
||||
})
|
||||
.map(|page_model| download_page(video_model, page_model, &child_semaphore, base_path, cx))
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
let (mut download_aborted, mut target_status) = (false, STATUS_OK);
|
||||
let mut stream = tasks
|
||||
@@ -372,7 +344,7 @@ pub async fn dispatch_download_page(
|
||||
.filter_map(|res| futures::future::ready(res.ok()))
|
||||
.chunks(10);
|
||||
while let Some(models) = stream.next().await {
|
||||
update_pages_model(models, connection).await?;
|
||||
update_pages_model(models, cx.connection).await?;
|
||||
}
|
||||
if download_aborted {
|
||||
error!("下载视频「{}」的分页时触发风控,将异常向上传递..", &video_model.name);
|
||||
@@ -386,21 +358,54 @@ pub async fn dispatch_download_page(
|
||||
|
||||
/// 下载某个分页,未发生风控且正常运行时返回 Ok(Page::ActiveModel),其中 status 字段存储了新的下载状态,发生风控时返回 DownloadAbortError
|
||||
pub async fn download_page(
|
||||
bili_client: &BiliClient,
|
||||
video_model: &video::Model,
|
||||
page_model: page::Model,
|
||||
semaphore: &Semaphore,
|
||||
downloader: &Downloader,
|
||||
base_path: &Path,
|
||||
skip_option: &SkipOption,
|
||||
cx: DownloadContext<'_>,
|
||||
) -> Result<page::ActiveModel> {
|
||||
let _permit = semaphore.acquire().await.context("acquire semaphore failed")?;
|
||||
let mut status = PageStatus::from(page_model.download_status);
|
||||
let separate_status = status.should_run();
|
||||
let is_single_page = video_model.single_page.context("single_page is null")?;
|
||||
let base_name = TEMPLATE
|
||||
.load()
|
||||
.path_safe_render("page", &page_format_args(video_model, &page_model))?;
|
||||
// 未记录路径时填充,已经填充过路径时使用现有的
|
||||
let (base_path, base_name) = if let Some(old_video_path) = &page_model.path
|
||||
&& !old_video_path.is_empty()
|
||||
{
|
||||
let old_video_path = Path::new(old_video_path);
|
||||
let old_video_filename = old_video_path
|
||||
.file_name()
|
||||
.context("invalid page path format")?
|
||||
.to_string_lossy();
|
||||
if is_single_page {
|
||||
// 单页下的路径是 {base_path}/{base_name}.mp4
|
||||
(
|
||||
old_video_path.parent().context("invalid page path format")?,
|
||||
old_video_filename.trim_end_matches(".mp4").to_string(),
|
||||
)
|
||||
} else {
|
||||
// 多页下的路径是 {base_path}/Season 1/{base_name} - S01Exx.mp4
|
||||
(
|
||||
old_video_path
|
||||
.parent()
|
||||
.and_then(|p| p.parent())
|
||||
.context("invalid page path format")?,
|
||||
old_video_filename
|
||||
.rsplit_once(" - ")
|
||||
.context("invalid page path format")?
|
||||
.0
|
||||
.to_string(),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
(
|
||||
base_path,
|
||||
cx.template.path_safe_render(
|
||||
"page",
|
||||
&page_format_args(video_model, &page_model, &cx.config.time_format),
|
||||
)?,
|
||||
)
|
||||
};
|
||||
let (poster_path, video_path, nfo_path, danmaku_path, fanart_path, subtitle_path) = if is_single_page {
|
||||
(
|
||||
base_path.join(format!("{}-poster.jpg", &base_name)),
|
||||
@@ -448,50 +453,41 @@ pub async fn download_page(
|
||||
let (res_1, res_2, res_3, res_4, res_5) = tokio::join!(
|
||||
// 下载分页封面
|
||||
fetch_page_poster(
|
||||
separate_status[0] && !skip_option.no_poster,
|
||||
separate_status[0] && !cx.config.skip_option.no_poster,
|
||||
video_model,
|
||||
&page_model,
|
||||
downloader,
|
||||
poster_path,
|
||||
fanart_path
|
||||
fanart_path,
|
||||
cx
|
||||
),
|
||||
// 下载分页视频
|
||||
fetch_page_video(
|
||||
separate_status[1],
|
||||
bili_client,
|
||||
video_model,
|
||||
downloader,
|
||||
&page_info,
|
||||
&video_path
|
||||
),
|
||||
fetch_page_video(separate_status[1], video_model, &page_info, &video_path, cx),
|
||||
// 生成分页视频信息的 nfo
|
||||
generate_page_nfo(
|
||||
separate_status[2] && !skip_option.no_video_nfo,
|
||||
separate_status[2] && !cx.config.skip_option.no_video_nfo,
|
||||
video_model,
|
||||
&page_model,
|
||||
nfo_path
|
||||
nfo_path,
|
||||
cx,
|
||||
),
|
||||
// 下载分页弹幕
|
||||
fetch_page_danmaku(
|
||||
separate_status[3] && !skip_option.no_danmaku,
|
||||
bili_client,
|
||||
separate_status[3] && !cx.config.skip_option.no_danmaku,
|
||||
video_model,
|
||||
&page_info,
|
||||
danmaku_path
|
||||
danmaku_path,
|
||||
cx,
|
||||
),
|
||||
// 下载分页字幕
|
||||
fetch_page_subtitle(
|
||||
separate_status[4] && !skip_option.no_subtitle,
|
||||
bili_client,
|
||||
separate_status[4] && !cx.config.skip_option.no_subtitle,
|
||||
video_model,
|
||||
&page_info,
|
||||
&subtitle_path
|
||||
&subtitle_path,
|
||||
cx
|
||||
)
|
||||
);
|
||||
let results = [res_1, res_2, res_3, res_4, res_5]
|
||||
.into_iter()
|
||||
.map(Into::into)
|
||||
.collect::<Vec<_>>();
|
||||
let results = [res_1.into(), res_2.into(), res_3.into(), res_4.into(), res_5.into()];
|
||||
status.update_status(&results);
|
||||
results
|
||||
.iter()
|
||||
@@ -532,9 +528,9 @@ pub async fn fetch_page_poster(
|
||||
should_run: bool,
|
||||
video_model: &video::Model,
|
||||
page_model: &page::Model,
|
||||
downloader: &Downloader,
|
||||
poster_path: PathBuf,
|
||||
fanart_path: Option<PathBuf>,
|
||||
cx: DownloadContext<'_>,
|
||||
) -> Result<ExecutionStatus> {
|
||||
if !should_run {
|
||||
return Ok(ExecutionStatus::Skipped);
|
||||
@@ -550,7 +546,9 @@ pub async fn fetch_page_poster(
|
||||
None => video_model.cover.as_str(),
|
||||
}
|
||||
};
|
||||
downloader.fetch(url, &poster_path).await?;
|
||||
cx.downloader
|
||||
.fetch(url, &poster_path, &cx.config.concurrent_limit.download)
|
||||
.await?;
|
||||
if let Some(fanart_path) = fanart_path {
|
||||
fs::copy(&poster_path, &fanart_path).await?;
|
||||
}
|
||||
@@ -559,32 +557,52 @@ pub async fn fetch_page_poster(
|
||||
|
||||
pub async fn fetch_page_video(
|
||||
should_run: bool,
|
||||
bili_client: &BiliClient,
|
||||
video_model: &video::Model,
|
||||
downloader: &Downloader,
|
||||
page_info: &PageInfo,
|
||||
page_path: &Path,
|
||||
cx: DownloadContext<'_>,
|
||||
) -> Result<ExecutionStatus> {
|
||||
if !should_run {
|
||||
return Ok(ExecutionStatus::Skipped);
|
||||
}
|
||||
let bili_video = Video::new(bili_client, video_model.bvid.clone());
|
||||
let bili_video = Video::new(cx.bili_client, video_model.bvid.clone(), &cx.config.credential);
|
||||
let streams = bili_video
|
||||
.get_page_analyzer(page_info)
|
||||
.await?
|
||||
.best_stream(&VersionedConfig::get().load().filter_option)?;
|
||||
.best_stream(&cx.config.filter_option)?;
|
||||
match streams {
|
||||
BestStream::Mixed(mix_stream) => downloader.multi_fetch(&mix_stream.urls(), page_path).await?,
|
||||
BestStream::Mixed(mix_stream) => {
|
||||
cx.downloader
|
||||
.multi_fetch(
|
||||
&mix_stream.urls(cx.config.cdn_sorting),
|
||||
page_path,
|
||||
&cx.config.concurrent_limit.download,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
BestStream::VideoAudio {
|
||||
video: video_stream,
|
||||
audio: None,
|
||||
} => downloader.multi_fetch(&video_stream.urls(), page_path).await?,
|
||||
} => {
|
||||
cx.downloader
|
||||
.multi_fetch(
|
||||
&video_stream.urls(cx.config.cdn_sorting),
|
||||
page_path,
|
||||
&cx.config.concurrent_limit.download,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
BestStream::VideoAudio {
|
||||
video: video_stream,
|
||||
audio: Some(audio_stream),
|
||||
} => {
|
||||
downloader
|
||||
.multi_fetch_and_merge(&video_stream.urls(), &audio_stream.urls(), page_path)
|
||||
cx.downloader
|
||||
.multi_fetch_and_merge(
|
||||
&video_stream.urls(cx.config.cdn_sorting),
|
||||
&audio_stream.urls(cx.config.cdn_sorting),
|
||||
page_path,
|
||||
&cx.config.concurrent_limit.download,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
}
|
||||
@@ -593,34 +611,34 @@ pub async fn fetch_page_video(
|
||||
|
||||
pub async fn fetch_page_danmaku(
|
||||
should_run: bool,
|
||||
bili_client: &BiliClient,
|
||||
video_model: &video::Model,
|
||||
page_info: &PageInfo,
|
||||
danmaku_path: PathBuf,
|
||||
cx: DownloadContext<'_>,
|
||||
) -> Result<ExecutionStatus> {
|
||||
if !should_run {
|
||||
return Ok(ExecutionStatus::Skipped);
|
||||
}
|
||||
let bili_video = Video::new(bili_client, video_model.bvid.clone());
|
||||
let bili_video = Video::new(cx.bili_client, video_model.bvid.clone(), &cx.config.credential);
|
||||
bili_video
|
||||
.get_danmaku_writer(page_info)
|
||||
.await?
|
||||
.write(danmaku_path)
|
||||
.write(danmaku_path, &cx.config.danmaku_option)
|
||||
.await?;
|
||||
Ok(ExecutionStatus::Succeeded)
|
||||
}
|
||||
|
||||
pub async fn fetch_page_subtitle(
|
||||
should_run: bool,
|
||||
bili_client: &BiliClient,
|
||||
video_model: &video::Model,
|
||||
page_info: &PageInfo,
|
||||
subtitle_path: &Path,
|
||||
cx: DownloadContext<'_>,
|
||||
) -> Result<ExecutionStatus> {
|
||||
if !should_run {
|
||||
return Ok(ExecutionStatus::Skipped);
|
||||
}
|
||||
let bili_video = Video::new(bili_client, video_model.bvid.clone());
|
||||
let bili_video = Video::new(cx.bili_client, video_model.bvid.clone(), &cx.config.credential);
|
||||
let subtitles = bili_video.get_subtitles(page_info).await?;
|
||||
let tasks = subtitles
|
||||
.into_iter()
|
||||
@@ -638,15 +656,16 @@ pub async fn generate_page_nfo(
|
||||
video_model: &video::Model,
|
||||
page_model: &page::Model,
|
||||
nfo_path: PathBuf,
|
||||
cx: DownloadContext<'_>,
|
||||
) -> Result<ExecutionStatus> {
|
||||
if !should_run {
|
||||
return Ok(ExecutionStatus::Skipped);
|
||||
}
|
||||
let single_page = video_model.single_page.context("single_page is null")?;
|
||||
let nfo = if single_page {
|
||||
NFO::Movie(video_model.into())
|
||||
NFO::Movie(video_model.to_nfo(cx.config.nfo_time_type))
|
||||
} else {
|
||||
NFO::Episode(page_model.into())
|
||||
NFO::Episode(page_model.to_nfo(cx.config.nfo_time_type))
|
||||
};
|
||||
generate_nfo(nfo, nfo_path).await?;
|
||||
Ok(ExecutionStatus::Succeeded)
|
||||
@@ -655,14 +674,16 @@ pub async fn generate_page_nfo(
|
||||
pub async fn fetch_video_poster(
|
||||
should_run: bool,
|
||||
video_model: &video::Model,
|
||||
downloader: &Downloader,
|
||||
poster_path: PathBuf,
|
||||
fanart_path: PathBuf,
|
||||
cx: DownloadContext<'_>,
|
||||
) -> Result<ExecutionStatus> {
|
||||
if !should_run {
|
||||
return Ok(ExecutionStatus::Skipped);
|
||||
}
|
||||
downloader.fetch(&video_model.cover, &poster_path).await?;
|
||||
cx.downloader
|
||||
.fetch(&video_model.cover, &poster_path, &cx.config.concurrent_limit.download)
|
||||
.await?;
|
||||
fs::copy(&poster_path, &fanart_path).await?;
|
||||
Ok(ExecutionStatus::Succeeded)
|
||||
}
|
||||
@@ -670,13 +691,19 @@ pub async fn fetch_video_poster(
|
||||
pub async fn fetch_upper_face(
|
||||
should_run: bool,
|
||||
video_model: &video::Model,
|
||||
downloader: &Downloader,
|
||||
upper_face_path: PathBuf,
|
||||
cx: DownloadContext<'_>,
|
||||
) -> Result<ExecutionStatus> {
|
||||
if !should_run {
|
||||
return Ok(ExecutionStatus::Skipped);
|
||||
}
|
||||
downloader.fetch(&video_model.upper_face, &upper_face_path).await?;
|
||||
cx.downloader
|
||||
.fetch(
|
||||
&video_model.upper_face,
|
||||
&upper_face_path,
|
||||
&cx.config.concurrent_limit.download,
|
||||
)
|
||||
.await?;
|
||||
Ok(ExecutionStatus::Succeeded)
|
||||
}
|
||||
|
||||
@@ -684,11 +711,12 @@ pub async fn generate_upper_nfo(
|
||||
should_run: bool,
|
||||
video_model: &video::Model,
|
||||
nfo_path: PathBuf,
|
||||
cx: DownloadContext<'_>,
|
||||
) -> Result<ExecutionStatus> {
|
||||
if !should_run {
|
||||
return Ok(ExecutionStatus::Skipped);
|
||||
}
|
||||
generate_nfo(NFO::Upper(video_model.into()), nfo_path).await?;
|
||||
generate_nfo(NFO::Upper(video_model.to_nfo(cx.config.nfo_time_type)), nfo_path).await?;
|
||||
Ok(ExecutionStatus::Succeeded)
|
||||
}
|
||||
|
||||
@@ -696,11 +724,12 @@ pub async fn generate_video_nfo(
|
||||
should_run: bool,
|
||||
video_model: &video::Model,
|
||||
nfo_path: PathBuf,
|
||||
cx: DownloadContext<'_>,
|
||||
) -> Result<ExecutionStatus> {
|
||||
if !should_run {
|
||||
return Ok(ExecutionStatus::Skipped);
|
||||
}
|
||||
generate_nfo(NFO::TVShow(video_model.into()), nfo_path).await?;
|
||||
generate_nfo(NFO::TVShow(video_model.to_nfo(cx.config.nfo_time_type)), nfo_path).await?;
|
||||
Ok(ExecutionStatus::Succeeded)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user