chore: 支持使用 leaky-bucket 限制请求频率 (#211)

* chore: 移除之前引入的 delay

* feat: 支持为 b 站请求配置频率限制
This commit is contained in:
ᴀᴍᴛᴏᴀᴇʀ
2025-01-11 23:24:01 +08:00
committed by GitHub
parent 66a7b1394e
commit 0113bf704d
13 changed files with 57 additions and 65 deletions

14
Cargo.lock generated
View File

@@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
version = 4
[[package]]
name = "addr2line"
@@ -434,6 +434,7 @@ dependencies = [
"futures",
"handlebars",
"hex",
"leaky-bucket",
"md5",
"memchr",
"once_cell",
@@ -1524,6 +1525,17 @@ dependencies = [
"spin",
]
[[package]]
name = "leaky-bucket"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a396bb213c2d09ed6c5495fd082c991b6ab39c9daf4fff59e6727f85c73e4c5"
dependencies = [
"parking_lot",
"pin-project-lite",
"tokio",
]
[[package]]
name = "libc"
version = "0.2.153"

View File

@@ -28,6 +28,7 @@ float-ord = "0.3.2"
futures = "0.3.30"
handlebars = "6.0.0"
hex = "0.4.3"
leaky-bucket = "1.1.2"
md5 = "0.7.0"
memchr = "2.7.4"
once_cell = "1.19.0"

View File

@@ -23,6 +23,7 @@ float-ord = { workspace = true }
futures = { workspace = true }
handlebars = { workspace = true }
hex = { workspace = true }
leaky-bucket = { workspace = true }
md5 = { workspace = true }
memchr = { workspace = true }
once_cell = { workspace = true }

View File

@@ -1,11 +1,13 @@
use std::sync::Arc;
use std::time::Duration;
use anyhow::{bail, Result};
use leaky_bucket::RateLimiter;
use reqwest::{header, Method};
use crate::bilibili::credential::WbiImg;
use crate::bilibili::Credential;
use crate::config::CONFIG;
use crate::config::{RateLimit, CONFIG};
// 一个对 reqwest::Client 的简单封装,用于 Bilibili 请求
#[derive(Clone)]
@@ -61,15 +63,31 @@ impl Default for Client {
/// Client used for Bilibili requests: an inner `Client` plus an optional rate limiter.
pub struct BiliClient {
/// Inner client wrapper that actually builds the requests.
pub client: Client,
/// Limiter awaited by `request` before a request is built; `None` when no rate limit is configured.
limiter: Option<RateLimiter>,
}
impl BiliClient {
/// Creates a `BiliClient`, building a rate limiter when
/// `CONFIG.concurrent_limit.rate_limit` is set.
pub fn new() -> Self {
let client = Client::new();
Self { client }
// With a configured rate limit, `limit` is used as the limiter's initial, refill and
// maximum amount, and `duration` (in seconds) as its interval. Without one, no limiter
// is created.
let limiter = match CONFIG.concurrent_limit.rate_limit {
Some(RateLimit { limit, duration }) => Some(
RateLimiter::builder()
.initial(limit)
.refill(limit)
.max(limit)
.interval(Duration::from_secs(duration))
.build(),
),
None => None,
};
Self { client, limiter }
}
pub fn request(&self, method: Method, url: &str) -> reqwest::RequestBuilder {
/// Returns a pre-built request; obtaining a request through this method first checks
/// and waits on the rate limit.
pub async fn request(&self, method: Method, url: &str) -> reqwest::RequestBuilder {
if let Some(limiter) = &self.limiter {
// Wait for the limiter to grant one permit before building the request.
limiter.acquire_one().await;
}
// Load the current credential from the global config and pass it to the inner client.
let credential = CONFIG.credential.load();
self.client.request(method, url, credential.as_deref())
}

View File

@@ -109,6 +109,7 @@ impl<'a> Collection<'a> {
async fn get_series_info(&self) -> Result<Value> {
self.client
.request(Method::GET, "https://api.bilibili.com/x/series/series")
.await
.query(&[("series_id", self.collection.sid.as_str())])
.send()
.await?
@@ -151,6 +152,7 @@ impl<'a> Collection<'a> {
};
self.client
.request(Method::GET, url)
.await
.query(&query)
.send()
.await?

View File

@@ -30,6 +30,7 @@ impl<'a> FavoriteList<'a> {
let mut res = self
.client
.request(reqwest::Method::GET, "https://api.bilibili.com/x/v3/fav/folder/info")
.await
.query(&[("media_id", &self.fid)])
.send()
.await?
@@ -43,6 +44,7 @@ impl<'a> FavoriteList<'a> {
async fn get_videos(&self, page: u32) -> Result<Value> {
self.client
.request(reqwest::Method::GET, "https://api.bilibili.com/x/v3/fav/resource/list")
.await
.query(&[
("media_id", self.fid.as_str()),
("pn", &page.to_string()),

View File

@@ -22,6 +22,7 @@ impl<'a> Submission<'a> {
let mut res = self
.client
.request(Method::GET, "https://api.bilibili.com/x/web-interface/card")
.await
.query(&[("mid", self.upper_id.as_str())])
.send()
.await?
@@ -35,6 +36,7 @@ impl<'a> Submission<'a> {
async fn get_videos(&self, page: i32) -> Result<Value> {
self.client
.request(Method::GET, "https://api.bilibili.com/x/space/wbi/arc/search")
.await
.query(&encoded_query(
vec![
("mid", self.upper_id.clone()),

View File

@@ -67,6 +67,7 @@ impl<'a> Video<'a> {
let mut res = self
.client
.request(Method::GET, "https://api.bilibili.com/x/web-interface/view")
.await
.query(&[("aid", &self.aid), ("bvid", &self.bvid)])
.send()
.await?
@@ -81,6 +82,7 @@ impl<'a> Video<'a> {
let mut res = self
.client
.request(Method::GET, "https://api.bilibili.com/x/player/pagelist")
.await
.query(&[("aid", &self.aid), ("bvid", &self.bvid)])
.send()
.await?
@@ -95,6 +97,7 @@ impl<'a> Video<'a> {
let mut res = self
.client
.request(Method::GET, "https://api.bilibili.com/x/web-interface/view/detail/tag")
.await
.query(&[("aid", &self.aid), ("bvid", &self.bvid)])
.send()
.await?
@@ -120,6 +123,7 @@ impl<'a> Video<'a> {
let mut res = self
.client
.request(Method::GET, "http://api.bilibili.com/x/v2/dm/web/seg.so")
.await
.query(&[("type", 1), ("oid", page.cid), ("segment_index", segment_idx)])
.send()
.await?
@@ -140,6 +144,7 @@ impl<'a> Video<'a> {
let mut res = self
.client
.request(Method::GET, "https://api.bilibili.com/x/player/wbi/playurl")
.await
.query(&encoded_query(
vec![
("avid", self.aid.as_str()),

View File

@@ -16,6 +16,7 @@ impl<'a> WatchLater<'a> {
async fn get_videos(&self) -> Result<Value> {
self.client
.request(reqwest::Method::GET, "https://api.bilibili.com/x/v2/history/toview")
.await
.send()
.await?
.error_for_status()?

View File

@@ -16,23 +16,6 @@ pub struct WatchLaterConfig {
pub path: PathBuf,
}
/// Delay configuration applied after each operation.
///
/// Every field is optional; the `delay` helper does nothing for a `None` entry.
#[derive(Serialize, Deserialize, Default)]
pub struct DelayConfig {
/// Delay awaited inside the loop of `refresh_video_list`.
pub refresh_video_list: Option<Delay>,
/// Delay awaited after each `fetch_videos_detail` call in `fetch_video_details`.
pub fetch_video_detail: Option<Delay>,
/// Delay awaited at the end of `download_video_pages`, just before it returns.
pub download_video: Option<Delay>,
/// Delay awaited at the end of `download_page`, just before it returns.
pub download_page: Option<Delay>,
}
/// Definition of a delay; supports both a fixed duration and a random one.
///
/// Values are interpreted as milliseconds by the `delay` helper.
#[derive(Serialize, Deserialize)]
#[serde(untagged, rename_all = "lowercase")]
pub enum Delay {
/// Random delay picked from the inclusive range `min..=max` (milliseconds).
Random { min: u64, max: u64 },
/// Fixed delay of the given number of milliseconds.
Fixed(u64),
}
/// NFO 文件使用的时间类型
#[derive(Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
@@ -47,7 +30,13 @@ pub enum NFOTimeType {
/// Concurrency-related limits from the configuration.
pub struct ConcurrentLimit {
// Presumably the number of videos processed concurrently (default 3) — confirm against usage.
pub video: usize,
// Presumably the number of pages processed concurrently (default 2) — confirm against usage.
pub page: usize,
/// Delays awaited after individual operations.
pub delay: DelayConfig,
/// Optional request rate limit; defaults to `None`, in which case `BiliClient::new`
/// creates no limiter.
pub rate_limit: Option<RateLimit>,
}
/// Rate-limit settings for Bilibili requests.
///
/// `BiliClient::new` builds its limiter with `limit` as the initial, refill and maximum
/// amount and `duration` seconds as the interval.
// NOTE(review): no validation of zero values is visible here — confirm the limiter
// builder accepts `limit == 0` / `duration == 0` or add a config check.
#[derive(Serialize, Deserialize)]
pub struct RateLimit {
/// Number of requests permitted per interval.
pub limit: usize,
/// Length of the interval, in seconds.
pub duration: u64,
}
impl Default for ConcurrentLimit {
@@ -55,7 +44,7 @@ impl Default for ConcurrentLimit {
Self {
video: 3,
page: 2,
delay: DelayConfig::default(),
rate_limit: None,
}
}
}

View File

@@ -14,7 +14,7 @@ mod item;
use crate::bilibili::{CollectionItem, Credential, DanmakuOption, FilterOption};
pub use crate::config::global::{ARGS, CONFIG, CONFIG_DIR, TEMPLATE};
use crate::config::item::{deserialize_collection_list, serialize_collection_list, ConcurrentLimit};
pub use crate::config::item::{Delay, NFOTimeType, PathSafeTemplate, WatchLaterConfig};
pub use crate::config::item::{NFOTimeType, PathSafeTemplate, RateLimit, WatchLaterConfig};
fn default_time_format() -> String {
"%Y-%m-%d".to_string()
@@ -136,22 +136,6 @@ impl Config {
ok = false;
error!("允许的并发数必须大于 0");
}
for delay_config in [
&self.concurrent_limit.delay.refresh_video_list,
&self.concurrent_limit.delay.fetch_video_detail,
&self.concurrent_limit.delay.download_video,
&self.concurrent_limit.delay.download_page,
]
.iter()
.filter_map(|x| x.as_ref())
{
if let Delay::Random { min, max } = delay_config {
if min >= max {
ok = false;
error!("随机延迟的最小值应小于最大值");
}
}
}
if !ok {
panic!(
"位于 {} 的配置文件不合法,请参考提示信息修复后继续运行",

View File

@@ -5,12 +5,8 @@ pub mod nfo;
pub mod status;
use chrono::{DateTime, Utc};
use rand::Rng;
use tokio::time;
use tracing_subscriber::util::SubscriberInitExt;
use crate::config::Delay;
pub fn init_logger(log_level: &str) {
tracing_subscriber::fmt::Subscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::builder().parse_lossy(log_level))
@@ -26,19 +22,3 @@ pub fn init_logger(log_level: &str) {
/// Builds a lookup key of the form `<bvid>-<unix timestamp>`.
///
/// `bvid` is taken as `&str` so callers can pass either a `&String` (via deref
/// coercion) or a string slice without allocating.
///
/// Returns the key as an owned `String`; the timestamp is `time.timestamp()`.
pub fn id_time_key(bvid: &str, time: &DateTime<Utc>) -> String {
    format!("{}-{}", bvid, time.timestamp())
}
/// Sleeps for the configured delay, if any.
///
/// * `None` returns immediately without sleeping.
/// * `Delay::Fixed(ms)` sleeps for exactly `ms` milliseconds.
/// * `Delay::Random { min, max }` sleeps for a number of milliseconds drawn from the
///   inclusive range `min..=max`.
pub(crate) async fn delay(delay: Option<&Delay>) {
    let Some(spec) = delay else {
        return;
    };
    // The thread-local RNG is a temporary of this statement, so it is dropped before
    // the await below and never held across the suspension point.
    let millis = match spec {
        Delay::Fixed(fixed) => *fixed,
        Delay::Random { min, max } => rand::thread_rng().gen_range(*min..=*max),
    };
    time::sleep(time::Duration::from_millis(millis)).await;
}

View File

@@ -17,7 +17,6 @@ use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Vi
use crate::config::{PathSafeTemplate, ARGS, CONFIG, TEMPLATE};
use crate::downloader::Downloader;
use crate::error::{DownloadAbortError, ProcessPageError};
use crate::utils::delay;
use crate::utils::model::{create_videos, update_pages_model, update_videos_model};
use crate::utils::nfo::{ModelWrapper, NFOMode, NFOSerializer};
use crate::utils::status::{PageStatus, VideoStatus};
@@ -59,7 +58,6 @@ pub async fn refresh_video_list<'a>(
info!("到达上一次处理的位置,提前中止");
break;
}
delay(CONFIG.concurrent_limit.delay.refresh_video_list.as_ref()).await;
}
new_count = video_list_model.video_count(connection).await? - new_count;
video_list_model.log_refresh_video_end(got_count, new_count);
@@ -79,7 +77,6 @@ pub async fn fetch_video_details(
video_list_model
.fetch_videos_detail(video, video_model, connection)
.await?;
delay(CONFIG.concurrent_limit.delay.fetch_video_detail.as_ref()).await;
}
video_list_model.log_fetch_video_end();
Ok(video_list_model)
@@ -230,7 +227,6 @@ pub async fn download_video_pages(
}
let mut video_active_model: video::ActiveModel = video_model.into();
video_active_model.download_status = Set(status.into());
delay(CONFIG.concurrent_limit.delay.download_video.as_ref()).await;
Ok(video_active_model)
}
@@ -417,7 +413,6 @@ pub async fn download_page(
let mut page_active_model: page::ActiveModel = page_model.into();
page_active_model.download_status = Set(status.into());
page_active_model.path = Set(Some(video_path.to_str().unwrap().to_string()));
delay(CONFIG.concurrent_limit.delay.download_page.as_ref()).await;
Ok(page_active_model)
}