feat: 修改通知器,支持提示成功任务数量 (#672)

This commit is contained in:
ᴀᴍᴛᴏᴀᴇʀ
2026-03-15 03:31:41 +08:00
committed by GitHub
parent 2bd660efc9
commit e97fa73542
5 changed files with 178 additions and 19 deletions

View File

@@ -0,0 +1,67 @@
use bili_sync_entity::video;
use crate::utils::status::{STATUS_OK, VideoStatus};
pub enum DownloadNotifyInfo {
List {
source: String,
img_url: Option<String>,
titles: Vec<String>,
},
Summary {
source: String,
img_url: Option<String>,
count: usize,
},
}
impl DownloadNotifyInfo {
pub fn new(source: String) -> Self {
Self::List {
source,
img_url: None,
titles: Vec::with_capacity(10),
}
}
pub fn record(&mut self, models: &[video::ActiveModel]) {
let success_models = models
.iter()
.filter(|m| {
let sub_task_status: [u32; 5] = VideoStatus::from(*m.download_status.as_ref()).into();
sub_task_status.into_iter().all(|s| s == STATUS_OK)
})
.collect::<Vec<_>>();
match self {
Self::List {
source,
img_url,
titles,
} => {
let count = success_models.len() + titles.len();
if count > 10 {
*self = Self::Summary {
source: std::mem::take(source),
img_url: std::mem::take(img_url),
count,
};
} else {
if img_url.is_none() {
*img_url = success_models.first().map(|m| m.cover.as_ref().clone());
}
titles.extend(success_models.into_iter().map(|m| m.name.as_ref().clone()));
}
}
Self::Summary { count, .. } => *count += success_models.len(),
}
}
pub fn should_notify(&self) -> bool {
if let Self::List { titles, .. } = self
&& titles.is_empty()
{
return false;
}
true
}
}

View File

@@ -0,0 +1,59 @@
use std::borrow::Cow;
use itertools::Itertools;
use serde::Serialize;
use crate::notifier::DownloadNotifyInfo;
#[derive(Serialize)]
pub struct Message<'a> {
pub message: Cow<'a, str>,
pub image_url: Option<String>,
}
impl<'a> From<&'a str> for Message<'a> {
fn from(message: &'a str) -> Self {
Self {
message: Cow::Borrowed(message),
image_url: None,
}
}
}
impl From<String> for Message<'_> {
fn from(message: String) -> Self {
Self {
message: message.into(),
image_url: None,
}
}
}
impl From<DownloadNotifyInfo> for Message<'_> {
fn from(info: DownloadNotifyInfo) -> Self {
match info {
DownloadNotifyInfo::List {
source,
img_url,
titles,
} => Self {
message: format!(
"{}的 {} 条新视频已入库:\n{}",
source,
titles.len(),
titles
.into_iter()
.enumerate()
.map(|(i, title)| format!("{}. {title}", i + 1))
.join("\n")
)
.into(),
image_url: img_url,
},
DownloadNotifyInfo::Summary { source, img_url, count } => Self {
message: format!("{}的 {} 条新视频已入库,快去看看吧!", source, count).into(),
image_url: img_url,
},
}
}
}

View File

@@ -1,5 +1,10 @@
mod info;
mod message;
use anyhow::Result;
use futures::future;
pub use info::DownloadNotifyInfo;
pub use message::Message;
use reqwest::header;
use serde::{Deserialize, Serialize};
@@ -33,23 +38,38 @@ pub fn webhook_template_content(template: &Option<String>) -> &str {
}
pub trait NotifierAllExt {
async fn notify_all(&self, client: &reqwest::Client, message: &str) -> Result<()>;
async fn notify_all<'a>(&self, client: &reqwest::Client, message: impl Into<Message<'a>>) -> Result<()>;
}
impl NotifierAllExt for Vec<Notifier> {
async fn notify_all(&self, client: &reqwest::Client, message: &str) -> Result<()> {
future::join_all(self.iter().map(|notifier| notifier.notify(client, message))).await;
async fn notify_all<'a>(&self, client: &reqwest::Client, message: impl Into<Message<'a>>) -> Result<()> {
let message = message.into();
future::join_all(self.iter().map(|notifier| notifier.notify_internal(client, &message))).await;
Ok(())
}
}
impl Notifier {
pub async fn notify(&self, client: &reqwest::Client, message: &str) -> Result<()> {
pub async fn notify<'a>(&self, client: &reqwest::Client, message: impl Into<Message<'a>>) -> Result<()> {
self.notify_internal(client, &message.into()).await
}
async fn notify_internal<'a>(&self, client: &reqwest::Client, message: &Message<'a>) -> Result<()> {
match self {
Notifier::Telegram { bot_token, chat_id } => {
let url = format!("https://api.telegram.org/bot{}/sendMessage", bot_token);
let params = [("chat_id", chat_id.as_str()), ("text", message)];
client.post(&url).form(&params).send().await?;
if let Some(img_url) = &message.image_url {
let url = format!("https://api.telegram.org/bot{}/sendPhoto", bot_token);
let params = [
("chat_id", chat_id.as_str()),
("photo", img_url.as_str()),
("caption", message.message.as_ref()),
];
client.post(&url).form(&params).send().await?;
} else {
let url = format!("https://api.telegram.org/bot{}/sendMessage", bot_token);
let params = [("chat_id", chat_id.as_str()), ("text", message.message.as_ref())];
client.post(&url).form(&params).send().await?;
}
}
Notifier::Webhook {
url,
@@ -57,15 +77,10 @@ impl Notifier {
ignore_cache,
} => {
let key = webhook_template_key(url);
let data = serde_json::json!(
{
"message": message,
}
);
let handlebar = TEMPLATE.read();
let payload = match ignore_cache {
Some(_) => handlebar.render_template(webhook_template_content(template), &data)?,
None => handlebar.render(&key, &data)?,
Some(_) => handlebar.render_template(webhook_template_content(template), &message)?,
None => handlebar.render(&key, &message)?,
};
client
.post(url)

View File

@@ -1,6 +1,16 @@
use crate::bilibili::BiliClient;
use crate::config::Config;
use crate::notifier::NotifierAllExt;
use crate::notifier::{Message, NotifierAllExt};
pub fn notify(config: &Config, bili_client: &BiliClient, msg: impl Into<Message<'static>>) {
if let Some(notifiers) = &config.notifiers
&& !notifiers.is_empty()
{
let (notifiers, inner_client) = (notifiers.clone(), bili_client.inner_client().clone());
let msg = msg.into();
tokio::spawn(async move { notifiers.notify_all(&inner_client, msg).await });
}
}
pub fn error_and_notify(config: &Config, bili_client: &BiliClient, msg: String) {
error!("{msg}");
@@ -8,6 +18,6 @@ pub fn error_and_notify(config: &Config, bili_client: &BiliClient, msg: String)
&& !notifiers.is_empty()
{
let (notifiers, inner_client) = (notifiers.clone(), bili_client.inner_client().clone());
tokio::spawn(async move { notifiers.notify_all(&inner_client, msg.as_str()).await });
tokio::spawn(async move { notifiers.notify_all(&inner_client, msg).await });
}
}

View File

@@ -17,6 +17,7 @@ use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Vi
use crate::config::{ARGS, Config, PathSafeTemplate};
use crate::downloader::Downloader;
use crate::error::ExecutionStatus;
use crate::notifier::DownloadNotifyInfo;
use crate::utils::download_context::DownloadContext;
use crate::utils::format_arg::{page_format_args, video_format_args};
use crate::utils::model::{
@@ -24,6 +25,7 @@ use crate::utils::model::{
update_videos_model,
};
use crate::utils::nfo::{NFO, ToNFO};
use crate::utils::notify::notify;
use crate::utils::rule::FieldEvaluatable;
use crate::utils::status::{PageStatus, STATUS_OK, VideoStatus};
@@ -49,7 +51,11 @@ pub async fn process_video_source(
warn!("已开启仅扫描模式,跳过视频下载..");
} else {
// 从数据库中查找所有未下载的视频与分页,下载并处理
download_unprocessed_videos(bili_client, &video_source, connection, template, config).await?;
let download_notify_info =
download_unprocessed_videos(bili_client, &video_source, connection, template, config).await?;
if download_notify_info.should_notify() {
notify(config, bili_client, download_notify_info);
}
}
Ok(())
}
@@ -176,7 +182,7 @@ pub async fn download_unprocessed_videos(
connection: &DatabaseConnection,
template: &handlebars::Handlebars<'_>,
config: &Config,
) -> Result<()> {
) -> Result<DownloadNotifyInfo> {
video_source.log_download_video_start();
let semaphore = Semaphore::new(config.concurrent_limit.video);
let downloader = Downloader::new(bili_client.client.clone());
@@ -207,14 +213,16 @@ pub async fn download_unprocessed_videos(
.filter_map(|res| futures::future::ready(res.ok()))
// 将成功返回的 Model 按十个一组合并
.chunks(10);
let mut download_notify_info = DownloadNotifyInfo::new(video_source.display_name().into());
while let Some(models) = stream.next().await {
download_notify_info.record(&models);
update_videos_model(models, connection).await?;
}
if let Some(e) = risk_control_related_error {
bail!(e);
}
video_source.log_download_video_end();
Ok(())
Ok(download_notify_info)
}
pub async fn download_video_pages(