diff --git a/crates/bili_sync/src/notifier/info.rs b/crates/bili_sync/src/notifier/info.rs new file mode 100644 index 0000000..6833230 --- /dev/null +++ b/crates/bili_sync/src/notifier/info.rs @@ -0,0 +1,67 @@ +use bili_sync_entity::video; + +use crate::utils::status::{STATUS_OK, VideoStatus}; + +pub enum DownloadNotifyInfo { + List { + source: String, + img_url: Option, + titles: Vec, + }, + Summary { + source: String, + img_url: Option, + count: usize, + }, +} + +impl DownloadNotifyInfo { + pub fn new(source: String) -> Self { + Self::List { + source, + img_url: None, + titles: Vec::with_capacity(10), + } + } + + pub fn record(&mut self, models: &[video::ActiveModel]) { + let success_models = models + .iter() + .filter(|m| { + let sub_task_status: [u32; 5] = VideoStatus::from(*m.download_status.as_ref()).into(); + sub_task_status.into_iter().all(|s| s == STATUS_OK) + }) + .collect::>(); + match self { + Self::List { + source, + img_url, + titles, + } => { + let count = success_models.len() + titles.len(); + if count > 10 { + *self = Self::Summary { + source: std::mem::take(source), + img_url: std::mem::take(img_url), + count, + }; + } else { + if img_url.is_none() { + *img_url = success_models.first().map(|m| m.cover.as_ref().clone()); + } + titles.extend(success_models.into_iter().map(|m| m.name.as_ref().clone())); + } + } + Self::Summary { count, .. } => *count += success_models.len(), + } + } + + pub fn should_notify(&self) -> bool { + if let Self::List { titles, .. } = self + && titles.is_empty() + { + return false; + } + true + } +} diff --git a/crates/bili_sync/src/notifier/message.rs b/crates/bili_sync/src/notifier/message.rs new file mode 100644 index 0000000..8609bc1 --- /dev/null +++ b/crates/bili_sync/src/notifier/message.rs @@ -0,0 +1,59 @@ +use std::borrow::Cow; + +use itertools::Itertools; +use serde::Serialize; + +use crate::notifier::DownloadNotifyInfo; + +#[derive(Serialize)] +pub struct Message<'a> { + pub message: Cow<'a, str>, + pub image_url: Option, +} + +impl<'a> From<&'a str> for Message<'a> { + fn from(message: &'a str) -> Self { + Self { + message: Cow::Borrowed(message), + image_url: None, + } + } +} + +impl From for Message<'_> { + fn from(message: String) -> Self { + Self { + message: message.into(), + image_url: None, + } + } +} + +impl From for Message<'_> { + fn from(info: DownloadNotifyInfo) -> Self { + match info { + DownloadNotifyInfo::List { + source, + img_url, + titles, + } => Self { + message: format!( + "{}的 {} 条新视频已入库:\n{}", + source, + titles.len(), + titles + .into_iter() + .enumerate() + .map(|(i, title)| format!("{}. {title}", i + 1)) + .join("\n") + ) + .into(), + image_url: img_url, + }, + DownloadNotifyInfo::Summary { source, img_url, count } => Self { + message: format!("{}的 {} 条新视频已入库,快去看看吧!", source, count).into(), + image_url: img_url, + }, + } + } +} diff --git a/crates/bili_sync/src/notifier/mod.rs b/crates/bili_sync/src/notifier/mod.rs index 411246d..da2759f 100644 --- a/crates/bili_sync/src/notifier/mod.rs +++ b/crates/bili_sync/src/notifier/mod.rs @@ -1,5 +1,10 @@ +mod info; +mod message; + use anyhow::Result; use futures::future; +pub use info::DownloadNotifyInfo; +pub use message::Message; use reqwest::header; use serde::{Deserialize, Serialize}; @@ -33,23 +38,38 @@ pub fn webhook_template_content(template: &Option) -> &str { } pub trait NotifierAllExt { - async fn notify_all(&self, client: &reqwest::Client, message: &str) -> Result<()>; + async fn notify_all<'a>(&self, client: &reqwest::Client, message: impl Into>) -> Result<()>; } impl NotifierAllExt for Vec { - async fn notify_all(&self, client: &reqwest::Client, message: &str) -> Result<()> { - future::join_all(self.iter().map(|notifier| notifier.notify(client, message))).await; + async fn notify_all<'a>(&self, client: &reqwest::Client, message: impl Into>) -> Result<()> { + let message = message.into(); + future::join_all(self.iter().map(|notifier| notifier.notify_internal(client, &message))).await; Ok(()) } } impl Notifier { - pub async fn notify(&self, client: &reqwest::Client, message: &str) -> Result<()> { + pub async fn notify<'a>(&self, client: &reqwest::Client, message: impl Into>) -> Result<()> { + self.notify_internal(client, &message.into()).await + } + + async fn notify_internal<'a>(&self, client: &reqwest::Client, message: &Message<'a>) -> Result<()> { match self { Notifier::Telegram { bot_token, chat_id } => { - let url = format!("https://api.telegram.org/bot{}/sendMessage", bot_token); - let params = [("chat_id", chat_id.as_str()), ("text", message)]; - client.post(&url).form(¶ms).send().await?; + if let Some(img_url) = &message.image_url { + let url = format!("https://api.telegram.org/bot{}/sendPhoto", bot_token); + let params = [ + ("chat_id", chat_id.as_str()), + ("photo", img_url.as_str()), + ("caption", message.message.as_ref()), + ]; + client.post(&url).form(¶ms).send().await?; + } else { + let url = format!("https://api.telegram.org/bot{}/sendMessage", bot_token); + let params = [("chat_id", chat_id.as_str()), ("text", message.message.as_ref())]; + client.post(&url).form(¶ms).send().await?; + } } Notifier::Webhook { url, @@ -57,15 +77,10 @@ impl Notifier { ignore_cache, } => { let key = webhook_template_key(url); - let data = serde_json::json!( - { - "message": message, - } - ); let handlebar = TEMPLATE.read(); let payload = match ignore_cache { - Some(_) => handlebar.render_template(webhook_template_content(template), &data)?, - None => handlebar.render(&key, &data)?, + Some(_) => handlebar.render_template(webhook_template_content(template), &message)?, + None => handlebar.render(&key, &message)?, }; client .post(url) diff --git a/crates/bili_sync/src/utils/notify.rs b/crates/bili_sync/src/utils/notify.rs index 128ef91..05cd770 100644 --- a/crates/bili_sync/src/utils/notify.rs +++ b/crates/bili_sync/src/utils/notify.rs @@ -1,6 +1,16 @@ use crate::bilibili::BiliClient; use crate::config::Config; -use crate::notifier::NotifierAllExt; +use crate::notifier::{Message, NotifierAllExt}; + +pub fn notify(config: &Config, bili_client: &BiliClient, msg: impl Into>) { + if let Some(notifiers) = &config.notifiers + && !notifiers.is_empty() + { + let (notifiers, inner_client) = (notifiers.clone(), bili_client.inner_client().clone()); + let msg = msg.into(); + tokio::spawn(async move { notifiers.notify_all(&inner_client, msg).await }); + } +} pub fn error_and_notify(config: &Config, bili_client: &BiliClient, msg: String) { error!("{msg}"); @@ -8,6 +18,6 @@ pub fn error_and_notify(config: &Config, bili_client: &BiliClient, msg: String) && !notifiers.is_empty() { let (notifiers, inner_client) = (notifiers.clone(), bili_client.inner_client().clone()); - tokio::spawn(async move { notifiers.notify_all(&inner_client, msg.as_str()).await }); + tokio::spawn(async move { notifiers.notify_all(&inner_client, msg).await }); } } diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index 6fe8da8..672be9a 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -17,6 +17,7 @@ use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Vi use crate::config::{ARGS, Config, PathSafeTemplate}; use crate::downloader::Downloader; use crate::error::ExecutionStatus; +use crate::notifier::DownloadNotifyInfo; use crate::utils::download_context::DownloadContext; use crate::utils::format_arg::{page_format_args, video_format_args}; use crate::utils::model::{ @@ -24,6 +25,7 @@ use crate::utils::model::{ update_videos_model, }; use crate::utils::nfo::{NFO, ToNFO}; +use crate::utils::notify::notify; use crate::utils::rule::FieldEvaluatable; use crate::utils::status::{PageStatus, STATUS_OK, VideoStatus}; @@ -49,7 +51,11 @@ pub async fn process_video_source( warn!("已开启仅扫描模式,跳过视频下载.."); } else { // 从数据库中查找所有未下载的视频与分页,下载并处理 - download_unprocessed_videos(bili_client, &video_source, connection, template, config).await?; + let download_notify_info = + download_unprocessed_videos(bili_client, &video_source, connection, template, config).await?; + if download_notify_info.should_notify() { + notify(config, bili_client, download_notify_info); + } } Ok(()) } @@ -176,7 +182,7 @@ pub async fn download_unprocessed_videos( connection: &DatabaseConnection, template: &handlebars::Handlebars<'_>, config: &Config, -) -> Result<()> { +) -> Result { video_source.log_download_video_start(); let semaphore = Semaphore::new(config.concurrent_limit.video); let downloader = Downloader::new(bili_client.client.clone()); @@ -207,14 +213,16 @@ pub async fn download_unprocessed_videos( .filter_map(|res| futures::future::ready(res.ok())) // 将成功返回的 Model 按十个一组合并 .chunks(10); + let mut download_notify_info = DownloadNotifyInfo::new(video_source.display_name().into()); while let Some(models) = stream.next().await { + download_notify_info.record(&models); update_videos_model(models, connection).await?; } if let Some(e) = risk_control_related_error { bail!(e); } video_source.log_download_video_end(); - Ok(()) + Ok(download_notify_info) } pub async fn download_video_pages(