diff --git a/crates/bili_sync/src/api/response.rs b/crates/bili_sync/src/api/response.rs index 41387bb..cd97791 100644 --- a/crates/bili_sync/src/api/response.rs +++ b/crates/bili_sync/src/api/response.rs @@ -159,7 +159,7 @@ pub struct DashBoardResponse { pub videos_by_day: Vec, } -#[derive(Serialize)] +#[derive(Serialize, Clone, Copy)] pub struct SysInfo { pub total_memory: u64, pub used_memory: u64, diff --git a/crates/bili_sync/src/api/routes/ws/log_helper.rs b/crates/bili_sync/src/api/routes/ws/log_helper.rs index 4a9bdec..6c7de25 100644 --- a/crates/bili_sync/src/api/routes/ws/log_helper.rs +++ b/crates/bili_sync/src/api/routes/ws/log_helper.rs @@ -1,7 +1,7 @@ use std::collections::VecDeque; use std::sync::Arc; -use parking_lot::Mutex; +use parking_lot::RwLock; use tokio::sync::broadcast; use tracing_subscriber::fmt::MakeWriter; @@ -10,11 +10,11 @@ pub const MAX_HISTORY_LOGS: usize = 30; /// LogHelper 维护了日志发送器和一个日志历史记录的缓冲区 pub struct LogHelper { pub sender: broadcast::Sender, - pub log_history: Arc>>, + pub log_history: Arc>>, } impl LogHelper { - pub fn new(sender: broadcast::Sender, log_history: Arc>>) -> Self { + pub fn new(sender: broadcast::Sender, log_history: Arc>>) -> Self { LogHelper { sender, log_history } } } @@ -31,7 +31,7 @@ impl std::io::Write for LogHelper { fn write(&mut self, buf: &[u8]) -> std::io::Result { let log_message = String::from_utf8_lossy(buf).to_string(); let _ = self.sender.send(log_message.clone()); - let mut history = self.log_history.lock(); + let mut history = self.log_history.write(); history.push_back(log_message); if history.len() > MAX_HISTORY_LOGS { history.pop_front(); diff --git a/crates/bili_sync/src/api/routes/ws/mod.rs b/crates/bili_sync/src/api/routes/ws/mod.rs index c81c49d..7afc3e8 100644 --- a/crates/bili_sync/src/api/routes/ws/mod.rs +++ b/crates/bili_sync/src/api/routes/ws/mod.rs @@ -15,11 +15,14 @@ pub use log_helper::{LogHelper, MAX_HISTORY_LOGS}; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use sysinfo::{ - CpuRefreshKind, DiskRefreshKind, Disks, MemoryRefreshKind, ProcessRefreshKind, RefreshKind, System, get_current_pid, + CpuRefreshKind, DiskRefreshKind, Disks, MemoryRefreshKind, Pid, ProcessRefreshKind, ProcessesToUpdate, System, + get_current_pid, }; -use tokio::pin; -use tokio::task::JoinHandle; -use tokio_stream::wrappers::{BroadcastStream, IntervalStream, WatchStream}; +use tokio::sync::mpsc; +use tokio::{pin, select}; +use tokio_stream::wrappers::{BroadcastStream, WatchStream}; +use tokio_util::future::FutureExt; +use tokio_util::sync::CancellationToken; use uuid::Uuid; use crate::api::response::SysInfo; @@ -55,191 +58,238 @@ enum ClientEvent { #[serde(rename_all = "camelCase")] enum ServerEvent { Logs(String), - Tasks(Arc), - SysInfo(Arc), + Tasks(TaskStatus), + SysInfo(SysInfo), } struct WebSocketHandler { - sysinfo_subscribers: Arc>>, - sysinfo_handles: RwLock>>, + sysinfo_subscribers: Arc>>, + sysinfo_cancel: RwLock>, } impl WebSocketHandler { fn new() -> Self { Self { sysinfo_subscribers: Arc::new(DashMap::new()), - sysinfo_handles: RwLock::new(None), + sysinfo_cancel: RwLock::new(None), } } - async fn handle_sender( - &self, - mut sender: SplitSink, - mut rx: tokio::sync::mpsc::Receiver, - ) { + /// 向客户端推送信息 + async fn handle_sender(&self, mut sender: SplitSink, mut rx: mpsc::Receiver) { while let Some(event) = rx.recv().await { - match serde_json::to_string(&event) { - Ok(text) => { - if let Err(e) = sender.send(Message::Text(text.into())).await { - error!("Failed to send message: {:?}", e); - break; - } - } + let text = match serde_json::to_string(&event) { + Ok(text) => text, Err(e) => { error!("Failed to serialize event: {:?}", e); + continue; } + }; + if let Err(e) = sender.send(Message::Text(text.into())).await { + error!("Failed to send message: {:?}", e); + break; } } } + /// 从客户端接收信息 async fn handle_receiver( &self, mut receiver: SplitStream, - tx: tokio::sync::mpsc::Sender, + tx: mpsc::Sender, uuid: Uuid, log_writer: LogHelper, ) { // 日志和任务状态的处理本身就是由 stream 驱动的,可以直接为每个 ws 连接维护独立的任务处理器 // 系统信息是服务端轮询然后推送的,如果单独维护会导致每个连接都独立轮询系统信息,造成不必要的浪费 // 因此采用了全局的订阅者管理,所有连接共享同一个系统信息轮询任务 - let (mut log_handle, mut task_handle) = (None, None); + let (mut log_cancel, mut task_cancel) = (None, None); while let Some(Ok(msg)) = receiver.next().await { - if let Message::Text(text) = msg { - match serde_json::from_str::(&text) { - Ok(ClientEvent::Subscribe(event_type)) => match event_type { - EventType::Logs => { - if log_handle.as_ref().is_none_or(|h: &JoinHandle<()>| h.is_finished()) { - let log_writer_clone = log_writer.clone(); - let tx_clone = tx.clone(); - let history = log_writer_clone.log_history.lock(); - let history_logs: Vec = history.iter().cloned().collect(); - drop(history); - log_handle = Some(tokio::spawn(async move { - let rx = log_writer_clone.sender.subscribe(); - let log_stream = futures::stream::iter(history_logs.into_iter()) - .chain(BroadcastStream::new(rx).filter_map(async |msg| msg.ok())) - .map(ServerEvent::Logs); - pin!(log_stream); - while let Some(event) = log_stream.next().await { - if let Err(e) = tx_clone.send(event).await { - error!("Failed to send log event: {:?}", e); - break; - } - } - })); - } - } - EventType::Tasks => { - if task_handle.as_ref().is_none_or(|h: &JoinHandle<()>| h.is_finished()) { - let tx_clone = tx.clone(); - task_handle = Some(tokio::spawn(async move { - let mut stream = - WatchStream::new(TASK_STATUS_NOTIFIER.subscribe()).map(ServerEvent::Tasks); - while let Some(event) = stream.next().await { - if let Err(e) = tx_clone.send(event).await { - error!("Failed to send task status: {:?}", e); - break; - } - } - })); - } - } - EventType::SysInfo => self.add_sysinfo_subscriber(uuid, tx.clone()).await, - }, - Ok(ClientEvent::Unsubscribe(event_type)) => match event_type { - EventType::Logs => { - if let Some(handle) = log_handle.take() { - handle.abort(); - } - } - EventType::Tasks => { - if let Some(handle) = task_handle.take() { - handle.abort(); - } - } - EventType::SysInfo => { - self.remove_sysinfo_subscriber(uuid).await; - } - }, - Err(e) => { - error!("Failed to parse client message: {:?}", e); + let Message::Text(text) = msg else { + continue; + }; + let client_event = match serde_json::from_str::(&text) { + Ok(event) => event, + Err(e) => { + error!("Failed to parse client message: {:?}, error: {:?}", text, e); + continue; + } + }; + match client_event { + ClientEvent::Subscribe(EventType::Logs) => { + if log_cancel.is_none() { + log_cancel = Some(self.new_log_handler(tx.clone(), &log_writer)); + } + } + ClientEvent::Unsubscribe(EventType::Logs) => { + if let Some(cancel) = log_cancel.take() { + cancel.cancel(); + } + } + ClientEvent::Subscribe(EventType::Tasks) => { + if task_cancel.is_none() { + task_cancel = Some(self.new_task_handler(tx.clone())); + } + } + ClientEvent::Unsubscribe(EventType::Tasks) => { + if let Some(cancel) = task_cancel.take() { + cancel.cancel(); + } + } + ClientEvent::Subscribe(EventType::SysInfo) => { + self.add_sysinfo_subscriber(uuid, tx.clone()); + } + ClientEvent::Unsubscribe(EventType::SysInfo) => { + self.remove_sysinfo_subscriber(uuid); + } + } + } + // 连接关闭,清除仍然残留的任务 + if let Some(cancel) = log_cancel { + cancel.cancel(); + } + if let Some(cancel) = task_cancel { + cancel.cancel(); + } + self.remove_sysinfo_subscriber(uuid); + } + + /// 添加全局系统信息订阅者 + fn add_sysinfo_subscriber(&self, uuid: Uuid, sender: mpsc::Sender) { + self.sysinfo_subscribers.insert(uuid, sender); + if self.sysinfo_cancel.read().is_none() { + let mut sys_info_cancel = self.sysinfo_cancel.write(); + if sys_info_cancel.is_some() { + return; + } + *sys_info_cancel = Some(self.new_sysinfo_handler(self.sysinfo_subscribers.clone())); + } + } + + /// 移除全局系统信息订阅者 + fn remove_sysinfo_subscriber(&self, uuid: Uuid) { + self.sysinfo_subscribers.remove(&uuid); + if self.sysinfo_subscribers.is_empty() + && let Some(token) = self.sysinfo_cancel.write().take() + { + token.cancel(); + } + } + + /// 创建异步日志推送任务,返回任务的取消令牌 + fn new_log_handler(&self, tx: mpsc::Sender, log_writer: &LogHelper) -> CancellationToken { + let cancel_token = CancellationToken::new(); + // 读取历史日志 + let history = log_writer.log_history.read(); + let history_logs = history.iter().cloned().collect::>(); + drop(history); + // 获取日志广播接收器 + let log_rx = log_writer.sender.subscribe(); + tokio::spawn( + async move { + // 合并历史日志和实时日志流 + let log_stream = futures::stream::iter(history_logs) + .chain(BroadcastStream::new(log_rx).filter_map(async |msg| msg.ok())) + .map(ServerEvent::Logs); + pin!(log_stream); + while let Some(event) = log_stream.next().await { + if let Err(e) = tx.send(event).await { + error!("Failed to send log event: {:?}", e); + break; } } } - } - if let Some(handle) = log_handle { - handle.abort(); - } - if let Some(handle) = task_handle { - handle.abort(); - } - self.remove_sysinfo_subscriber(uuid).await; + .with_cancellation_token_owned(cancel_token.clone()), + ); + cancel_token } - // 添加订阅者 - async fn add_sysinfo_subscriber(&self, uuid: Uuid, sender: tokio::sync::mpsc::Sender) { - self.sysinfo_subscribers.insert(uuid, sender); - if !self.sysinfo_subscribers.is_empty() - && self - .sysinfo_handles - .read() - .as_ref() - .is_none_or(|h: &JoinHandle<()>| h.is_finished()) - { - let sysinfo_subscribers = self.sysinfo_subscribers.clone(); - let mut write_guard = self.sysinfo_handles.write(); - if write_guard.as_ref().is_some_and(|h: &JoinHandle<()>| !h.is_finished()) { - return; + /// 创建异步任务状态推送任务,返回任务的取消令牌 + fn new_task_handler(&self, tx: mpsc::Sender) -> CancellationToken { + let cancel_token = CancellationToken::new(); + tokio::spawn( + async move { + let mut stream = WatchStream::new(TASK_STATUS_NOTIFIER.subscribe()).map(ServerEvent::Tasks); + while let Some(event) = stream.next().await { + if let Err(e) = tx.send(event).await { + error!("Failed to send task status: {:?}", e); + break; + } + } } - *write_guard = Some(tokio::spawn(async move { - let mut system = System::new(); - let mut disks = Disks::new(); - let sys_refresh_kind = sys_refresh_kind(); - let disk_refresh_kind = disk_refresh_kind(); + .with_cancellation_token_owned(cancel_token.clone()), + ); + cancel_token + } + + /// 创建异步系统信息推送任务,返回任务的取消令牌 + fn new_sysinfo_handler( + &self, + sysinfo_subscribers: Arc>>, + ) -> CancellationToken { + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + tokio::spawn(async move { + let (tx, mut rx) = mpsc::channel(10); + let (tick_tx, mut tick_rx) = mpsc::channel(3); + // 在阻塞线程中轮询系统信息,防止阻塞异步运行时 + tokio::task::spawn_blocking(move || { // 对于 linux/mac/windows 平台,该方法永远返回 Some(pid),expect 基本是安全的 let self_pid = get_current_pid().expect("Unsupported platform"); - let mut stream = - IntervalStream::new(tokio::time::interval(Duration::from_secs(2))).filter_map(move |_| { - system.refresh_specifics(sys_refresh_kind); - disks.refresh_specifics(true, disk_refresh_kind); - let process = match system.process(self_pid) { - Some(p) => p, - None => return futures::future::ready(None), - }; - futures::future::ready(Some(SysInfo { - total_memory: system.total_memory(), - used_memory: system.used_memory(), - process_memory: process.memory(), - used_cpu: system.global_cpu_usage(), - process_cpu: process.cpu_usage() / system.cpus().len() as f32, - total_disk: disks.iter().map(|d| d.total_space()).sum(), - available_disk: disks.iter().map(|d| d.available_space()).sum(), - })) - }); - while let Some(sys_info) = stream.next().await { - let sys_info = Arc::new(sys_info); - future::join_all(sysinfo_subscribers.iter().map(async |subscriber| { - if let Err(e) = subscriber.send(ServerEvent::SysInfo(sys_info.clone())).await { - error!( - "Failed to send sysinfo event to subscriber {}: {:?}", - subscriber.key(), - e - ); - } - })) - .await; + let mut system = System::new(); + let mut disks = Disks::new(); + // system 需要初始进行一次刷新并等待一小会儿,因为有些数据是根据 diff 计算的 + system.refresh_needed(self_pid); + std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL); + while tick_rx.blocking_recv().is_some() { + system.refresh_needed(self_pid); + disks.refresh_needed(self_pid); + let process = match system.process(self_pid) { + Some(p) => p, + None => continue, + }; + let sys_info = SysInfo { + total_memory: system.total_memory(), + used_memory: system.used_memory(), + process_memory: process.memory(), + used_cpu: system.global_cpu_usage(), + process_cpu: process.cpu_usage() / system.cpus().len() as f32, + total_disk: disks.iter().map(|d| d.total_space()).sum(), + available_disk: disks.iter().map(|d| d.available_space()).sum(), + }; + if tx.blocking_send(sys_info).is_err() { + break; + } } - })); - } - } - - async fn remove_sysinfo_subscriber(&self, uuid: Uuid) { - self.sysinfo_subscribers.remove(&uuid); - if self.sysinfo_subscribers.is_empty() - && let Some(handle) = self.sysinfo_handles.write().take() - { - handle.abort(); - } + }); + // 异步部分负责获取由阻塞线程发送过来的系统信息,并推送给所有订阅者 + // 收到取消信号时,设置标志位,确保阻塞线程正常退出 + let mut interval = tokio::time::interval(Duration::from_secs(2)); + loop { + select! { + _ = cancel_token_clone.cancelled() => { + drop(tick_tx); + break; + } + _ = interval.tick() => { + let _ = tick_tx.send(()).await; + } + Some(sys_info) = rx.recv() => { + future::join_all(sysinfo_subscribers.iter().map(async |subscriber| { + if let Err(e) = subscriber.send(ServerEvent::SysInfo(sys_info)).await { + error!( + "Failed to send sysinfo event to subscriber {}: {:?}", + subscriber.key(), + e + ); + } + })) + .await; + } + } + } + }); + cancel_token } } @@ -251,13 +301,24 @@ async fn handle_socket(socket: WebSocket, log_writer: LogHelper) { tokio::spawn(WEBSOCKET_HANDLER.handle_receiver(ws_receiver, tx, uuid, log_writer)); } -fn sys_refresh_kind() -> RefreshKind { - RefreshKind::nothing() - .with_cpu(CpuRefreshKind::nothing().with_cpu_usage()) - .with_memory(MemoryRefreshKind::nothing().with_ram()) - .with_processes(ProcessRefreshKind::nothing().with_cpu().with_memory()) +trait SysInfoExt { + fn refresh_needed(&mut self, self_pid: Pid); } -fn disk_refresh_kind() -> DiskRefreshKind { - DiskRefreshKind::nothing().with_storage() +impl SysInfoExt for System { + fn refresh_needed(&mut self, self_pid: Pid) { + self.refresh_memory_specifics(MemoryRefreshKind::nothing().with_ram()); + self.refresh_cpu_specifics(CpuRefreshKind::nothing().with_cpu_usage()); + self.refresh_processes_specifics( + ProcessesToUpdate::Some(&[self_pid]), + true, + ProcessRefreshKind::nothing().with_cpu().with_memory(), + ); + } +} + +impl SysInfoExt for Disks { + fn refresh_needed(&mut self, _self_pid: Pid) { + self.refresh_specifics(false, DiskRefreshKind::nothing().with_storage()); + } } diff --git a/crates/bili_sync/src/main.rs b/crates/bili_sync/src/main.rs index 85339aa..fa3bc97 100644 --- a/crates/bili_sync/src/main.rs +++ b/crates/bili_sync/src/main.rs @@ -18,7 +18,7 @@ use std::future::Future; use std::sync::Arc; use bilibili::BiliClient; -use parking_lot::Mutex; +use parking_lot::RwLock; use sea_orm::DatabaseConnection; use task::{http_server, video_downloader}; use tokio_util::sync::CancellationToken; @@ -79,7 +79,7 @@ fn spawn_task( /// 初始化日志系统、打印欢迎信息,初始化数据库连接和全局配置 async fn init() -> (DatabaseConnection, LogHelper) { let (tx, _rx) = tokio::sync::broadcast::channel(30); - let log_history = Arc::new(Mutex::new(VecDeque::with_capacity(MAX_HISTORY_LOGS + 1))); + let log_history = Arc::new(RwLock::new(VecDeque::with_capacity(MAX_HISTORY_LOGS + 1))); let log_writer = LogHelper::new(tx, log_history.clone()); init_logger(&ARGS.log_level, Some(log_writer.clone())); diff --git a/crates/bili_sync/src/utils/task_notifier.rs b/crates/bili_sync/src/utils/task_notifier.rs index 21b0809..5396bc2 100644 --- a/crates/bili_sync/src/utils/task_notifier.rs +++ b/crates/bili_sync/src/utils/task_notifier.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, LazyLock}; +use std::sync::LazyLock; use serde::Serialize; use tokio::sync::MutexGuard; @@ -7,7 +7,7 @@ use crate::config::VersionedConfig; pub static TASK_STATUS_NOTIFIER: LazyLock = LazyLock::new(TaskStatusNotifier::new); -#[derive(Serialize, Default)] +#[derive(Serialize, Default, Clone, Copy)] pub struct TaskStatus { is_running: bool, last_run: Option>, @@ -17,13 +17,13 @@ pub struct TaskStatus { pub struct TaskStatusNotifier { mutex: tokio::sync::Mutex<()>, - tx: tokio::sync::watch::Sender>, - rx: tokio::sync::watch::Receiver>, + tx: tokio::sync::watch::Sender, + rx: tokio::sync::watch::Receiver, } impl TaskStatusNotifier { pub fn new() -> Self { - let (tx, rx) = tokio::sync::watch::channel(Arc::new(TaskStatus::default())); + let (tx, rx) = tokio::sync::watch::channel(TaskStatus::default()); Self { mutex: tokio::sync::Mutex::const_new(()), tx, @@ -33,12 +33,12 @@ impl TaskStatusNotifier { pub async fn start_running(&'_ self) -> MutexGuard<'_, ()> { let lock = self.mutex.lock().await; - let _ = self.tx.send(Arc::new(TaskStatus { + let _ = self.tx.send(TaskStatus { is_running: true, last_run: Some(chrono::Local::now()), last_finish: None, next_run: None, - })); + }); lock } @@ -49,12 +49,12 @@ impl TaskStatusNotifier { let config = VersionedConfig::get().load(); let now = chrono::Local::now(); - let _ = self.tx.send(Arc::new(TaskStatus { + let _ = self.tx.send(TaskStatus { is_running: false, last_run, last_finish: Some(now), next_run: now.checked_add_signed(chrono::Duration::seconds(config.interval as i64)), - })); + }); } /// 精确探测任务执行状态,保证如果读取到“未运行”,那么在锁释放之前任务不会被执行 @@ -62,7 +62,7 @@ impl TaskStatusNotifier { self.mutex.try_lock().ok() } - pub fn subscribe(&self) -> tokio::sync::watch::Receiver> { + pub fn subscribe(&self) -> tokio::sync::watch::Receiver { self.rx.clone() } }