diff --git a/Cargo.lock b/Cargo.lock index c5fac9c..d1f60c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -434,6 +434,7 @@ dependencies = [ "futures", "handlebars", "hex", + "leaky-bucket", "md5", "memchr", "once_cell", @@ -1524,6 +1525,17 @@ dependencies = [ "spin", ] +[[package]] +name = "leaky-bucket" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a396bb213c2d09ed6c5495fd082c991b6ab39c9daf4fff59e6727f85c73e4c5" +dependencies = [ + "parking_lot", + "pin-project-lite", + "tokio", +] + [[package]] name = "libc" version = "0.2.153" diff --git a/Cargo.toml b/Cargo.toml index 0426dbb..67ab934 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ float-ord = "0.3.2" futures = "0.3.30" handlebars = "6.0.0" hex = "0.4.3" +leaky-bucket = "1.1.2" md5 = "0.7.0" memchr = "2.7.4" once_cell = "1.19.0" diff --git a/crates/bili_sync/Cargo.toml b/crates/bili_sync/Cargo.toml index fc63ee7..87f0fcf 100644 --- a/crates/bili_sync/Cargo.toml +++ b/crates/bili_sync/Cargo.toml @@ -23,6 +23,7 @@ float-ord = { workspace = true } futures = { workspace = true } handlebars = { workspace = true } hex = { workspace = true } +leaky-bucket = { workspace = true } md5 = { workspace = true } memchr = { workspace = true } once_cell = { workspace = true } diff --git a/crates/bili_sync/src/bilibili/client.rs b/crates/bili_sync/src/bilibili/client.rs index e2d83a1..80097aa 100644 --- a/crates/bili_sync/src/bilibili/client.rs +++ b/crates/bili_sync/src/bilibili/client.rs @@ -1,11 +1,13 @@ use std::sync::Arc; +use std::time::Duration; use anyhow::{bail, Result}; +use leaky_bucket::RateLimiter; use reqwest::{header, Method}; use crate::bilibili::credential::WbiImg; use crate::bilibili::Credential; -use crate::config::CONFIG; +use crate::config::{RateLimit, CONFIG}; // 一个对 reqwest::Client 的简单封装,用于 Bilibili 请求 #[derive(Clone)] @@ -61,15 +63,31 @@ impl Default for Client { pub struct BiliClient { pub client: Client, + limiter: Option, } impl BiliClient { pub fn new() -> Self { let client = Client::new(); - Self { client } + let limiter = match CONFIG.concurrent_limit.rate_limit { + Some(RateLimit { limit, duration }) => Some( + RateLimiter::builder() + .initial(limit) + .refill(limit) + .max(limit) + .interval(Duration::from_secs(duration)) + .build(), + ), + None => None, + }; + Self { client, limiter } } - pub fn request(&self, method: Method, url: &str) -> reqwest::RequestBuilder { + /// 获取一个预构建的请求,通过该方法获取请求时会检查并等待速率限制 + pub async fn request(&self, method: Method, url: &str) -> reqwest::RequestBuilder { + if let Some(limiter) = &self.limiter { + limiter.acquire_one().await; + } let credential = CONFIG.credential.load(); self.client.request(method, url, credential.as_deref()) } diff --git a/crates/bili_sync/src/bilibili/collection.rs b/crates/bili_sync/src/bilibili/collection.rs index 0e62a91..c80bb15 100644 --- a/crates/bili_sync/src/bilibili/collection.rs +++ b/crates/bili_sync/src/bilibili/collection.rs @@ -109,6 +109,7 @@ impl<'a> Collection<'a> { async fn get_series_info(&self) -> Result { self.client .request(Method::GET, "https://api.bilibili.com/x/series/series") + .await .query(&[("series_id", self.collection.sid.as_str())]) .send() .await? @@ -151,6 +152,7 @@ impl<'a> Collection<'a> { }; self.client .request(Method::GET, url) + .await .query(&query) .send() .await? diff --git a/crates/bili_sync/src/bilibili/favorite_list.rs b/crates/bili_sync/src/bilibili/favorite_list.rs index 9cc6e9e..533a47e 100644 --- a/crates/bili_sync/src/bilibili/favorite_list.rs +++ b/crates/bili_sync/src/bilibili/favorite_list.rs @@ -30,6 +30,7 @@ impl<'a> FavoriteList<'a> { let mut res = self .client .request(reqwest::Method::GET, "https://api.bilibili.com/x/v3/fav/folder/info") + .await .query(&[("media_id", &self.fid)]) .send() .await? @@ -43,6 +44,7 @@ impl<'a> FavoriteList<'a> { async fn get_videos(&self, page: u32) -> Result { self.client .request(reqwest::Method::GET, "https://api.bilibili.com/x/v3/fav/resource/list") + .await .query(&[ ("media_id", self.fid.as_str()), ("pn", &page.to_string()), diff --git a/crates/bili_sync/src/bilibili/submission.rs b/crates/bili_sync/src/bilibili/submission.rs index 3a59e66..98db162 100644 --- a/crates/bili_sync/src/bilibili/submission.rs +++ b/crates/bili_sync/src/bilibili/submission.rs @@ -22,6 +22,7 @@ impl<'a> Submission<'a> { let mut res = self .client .request(Method::GET, "https://api.bilibili.com/x/web-interface/card") + .await .query(&[("mid", self.upper_id.as_str())]) .send() .await? @@ -35,6 +36,7 @@ impl<'a> Submission<'a> { async fn get_videos(&self, page: i32) -> Result { self.client .request(Method::GET, "https://api.bilibili.com/x/space/wbi/arc/search") + .await .query(&encoded_query( vec![ ("mid", self.upper_id.clone()), diff --git a/crates/bili_sync/src/bilibili/video.rs b/crates/bili_sync/src/bilibili/video.rs index 115b9c5..e0f9744 100644 --- a/crates/bili_sync/src/bilibili/video.rs +++ b/crates/bili_sync/src/bilibili/video.rs @@ -67,6 +67,7 @@ impl<'a> Video<'a> { let mut res = self .client .request(Method::GET, "https://api.bilibili.com/x/web-interface/view") + .await .query(&[("aid", &self.aid), ("bvid", &self.bvid)]) .send() .await? @@ -81,6 +82,7 @@ impl<'a> Video<'a> { let mut res = self .client .request(Method::GET, "https://api.bilibili.com/x/player/pagelist") + .await .query(&[("aid", &self.aid), ("bvid", &self.bvid)]) .send() .await? @@ -95,6 +97,7 @@ impl<'a> Video<'a> { let mut res = self .client .request(Method::GET, "https://api.bilibili.com/x/web-interface/view/detail/tag") + .await .query(&[("aid", &self.aid), ("bvid", &self.bvid)]) .send() .await? @@ -120,6 +123,7 @@ impl<'a> Video<'a> { let mut res = self .client .request(Method::GET, "http://api.bilibili.com/x/v2/dm/web/seg.so") + .await .query(&[("type", 1), ("oid", page.cid), ("segment_index", segment_idx)]) .send() .await? @@ -140,6 +144,7 @@ impl<'a> Video<'a> { let mut res = self .client .request(Method::GET, "https://api.bilibili.com/x/player/wbi/playurl") + .await .query(&encoded_query( vec![ ("avid", self.aid.as_str()), diff --git a/crates/bili_sync/src/bilibili/watch_later.rs b/crates/bili_sync/src/bilibili/watch_later.rs index 942629e..43e1a75 100644 --- a/crates/bili_sync/src/bilibili/watch_later.rs +++ b/crates/bili_sync/src/bilibili/watch_later.rs @@ -16,6 +16,7 @@ impl<'a> WatchLater<'a> { async fn get_videos(&self) -> Result { self.client .request(reqwest::Method::GET, "https://api.bilibili.com/x/v2/history/toview") + .await .send() .await? .error_for_status()? diff --git a/crates/bili_sync/src/config/item.rs b/crates/bili_sync/src/config/item.rs index f5b02c9..0ce5997 100644 --- a/crates/bili_sync/src/config/item.rs +++ b/crates/bili_sync/src/config/item.rs @@ -16,23 +16,6 @@ pub struct WatchLaterConfig { pub path: PathBuf, } -/// 每次执行操作后的延迟配置 -#[derive(Serialize, Deserialize, Default)] -pub struct DelayConfig { - pub refresh_video_list: Option, - pub fetch_video_detail: Option, - pub download_video: Option, - pub download_page: Option, -} - -/// 延迟的定义,支持固定时间和随机时间 -#[derive(Serialize, Deserialize)] -#[serde(untagged, rename_all = "lowercase")] -pub enum Delay { - Random { min: u64, max: u64 }, - Fixed(u64), -} - /// NFO 文件使用的时间类型 #[derive(Serialize, Deserialize, Default)] #[serde(rename_all = "lowercase")] @@ -47,7 +30,13 @@ pub enum NFOTimeType { pub struct ConcurrentLimit { pub video: usize, pub page: usize, - pub delay: DelayConfig, + pub rate_limit: Option, +} + +#[derive(Serialize, Deserialize)] +pub struct RateLimit { + pub limit: usize, + pub duration: u64, } impl Default for ConcurrentLimit { @@ -55,7 +44,7 @@ impl Default for ConcurrentLimit { Self { video: 3, page: 2, - delay: DelayConfig::default(), + rate_limit: None, } } } diff --git a/crates/bili_sync/src/config/mod.rs b/crates/bili_sync/src/config/mod.rs index 16bb8b1..1da9222 100644 --- a/crates/bili_sync/src/config/mod.rs +++ b/crates/bili_sync/src/config/mod.rs @@ -14,7 +14,7 @@ mod item; use crate::bilibili::{CollectionItem, Credential, DanmakuOption, FilterOption}; pub use crate::config::global::{ARGS, CONFIG, CONFIG_DIR, TEMPLATE}; use crate::config::item::{deserialize_collection_list, serialize_collection_list, ConcurrentLimit}; -pub use crate::config::item::{Delay, NFOTimeType, PathSafeTemplate, WatchLaterConfig}; +pub use crate::config::item::{NFOTimeType, PathSafeTemplate, RateLimit, WatchLaterConfig}; fn default_time_format() -> String { "%Y-%m-%d".to_string() @@ -136,22 +136,6 @@ impl Config { ok = false; error!("允许的并发数必须大于 0"); } - for delay_config in [ - &self.concurrent_limit.delay.refresh_video_list, - &self.concurrent_limit.delay.fetch_video_detail, - &self.concurrent_limit.delay.download_video, - &self.concurrent_limit.delay.download_page, - ] - .iter() - .filter_map(|x| x.as_ref()) - { - if let Delay::Random { min, max } = delay_config { - if min >= max { - ok = false; - error!("随机延迟的最小值应小于最大值"); - } - } - } if !ok { panic!( "位于 {} 的配置文件不合法,请参考提示信息修复后继续运行", diff --git a/crates/bili_sync/src/utils/mod.rs b/crates/bili_sync/src/utils/mod.rs index 6599fb2..a5ce98a 100644 --- a/crates/bili_sync/src/utils/mod.rs +++ b/crates/bili_sync/src/utils/mod.rs @@ -5,12 +5,8 @@ pub mod nfo; pub mod status; use chrono::{DateTime, Utc}; -use rand::Rng; -use tokio::time; use tracing_subscriber::util::SubscriberInitExt; -use crate::config::Delay; - pub fn init_logger(log_level: &str) { tracing_subscriber::fmt::Subscriber::builder() .with_env_filter(tracing_subscriber::EnvFilter::builder().parse_lossy(log_level)) @@ -26,19 +22,3 @@ pub fn init_logger(log_level: &str) { pub fn id_time_key(bvid: &String, time: &DateTime) -> String { format!("{}-{}", bvid, time.timestamp()) } - -pub(crate) async fn delay(delay: Option<&Delay>) { - match delay { - None => {} - Some(Delay::Random { min, max }) => { - let delay = { - let mut rng = rand::thread_rng(); - rng.gen_range(*min..=*max) - }; - time::sleep(time::Duration::from_millis(delay)).await; - } - Some(Delay::Fixed(delay)) => { - time::sleep(time::Duration::from_millis(*delay)).await; - } - } -} diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index 0e98155..d1c5aa9 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -17,7 +17,6 @@ use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Vi use crate::config::{PathSafeTemplate, ARGS, CONFIG, TEMPLATE}; use crate::downloader::Downloader; use crate::error::{DownloadAbortError, ProcessPageError}; -use crate::utils::delay; use crate::utils::model::{create_videos, update_pages_model, update_videos_model}; use crate::utils::nfo::{ModelWrapper, NFOMode, NFOSerializer}; use crate::utils::status::{PageStatus, VideoStatus}; @@ -59,7 +58,6 @@ pub async fn refresh_video_list<'a>( info!("到达上一次处理的位置,提前中止"); break; } - delay(CONFIG.concurrent_limit.delay.refresh_video_list.as_ref()).await; } new_count = video_list_model.video_count(connection).await? - new_count; video_list_model.log_refresh_video_end(got_count, new_count); @@ -79,7 +77,6 @@ pub async fn fetch_video_details( video_list_model .fetch_videos_detail(video, video_model, connection) .await?; - delay(CONFIG.concurrent_limit.delay.fetch_video_detail.as_ref()).await; } video_list_model.log_fetch_video_end(); Ok(video_list_model) @@ -230,7 +227,6 @@ pub async fn download_video_pages( } let mut video_active_model: video::ActiveModel = video_model.into(); video_active_model.download_status = Set(status.into()); - delay(CONFIG.concurrent_limit.delay.download_video.as_ref()).await; Ok(video_active_model) } @@ -417,7 +413,6 @@ pub async fn download_page( let mut page_active_model: page::ActiveModel = page_model.into(); page_active_model.download_status = Set(status.into()); page_active_model.path = Set(Some(video_path.to_str().unwrap().to_string())); - delay(CONFIG.concurrent_limit.delay.download_page.as_ref()).await; Ok(page_active_model) }