From dd23d1db582e44d0e2e79435ce916c3e840c48bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=B4=80=E1=B4=8D=E1=B4=9B=E1=B4=8F=E1=B4=80=E1=B4=87?= =?UTF-8?q?=CA=80?= Date: Fri, 11 Jul 2025 00:14:20 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BA=8B=E4=BB=B6=E6=8E=A8=E9=80=81?= =?UTF-8?q?=E7=94=B1=20SSE=20=E5=88=87=E6=8D=A2=E5=88=B0=20WebSocket=20(#3?= =?UTF-8?q?86)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 173 ++++++++++-- Cargo.toml | 5 +- crates/bili_sync/Cargo.toml | 3 +- crates/bili_sync/src/api/mod.rs | 2 +- crates/bili_sync/src/api/response.rs | 2 +- crates/bili_sync/src/api/routes/mod.rs | 12 +- crates/bili_sync/src/api/routes/sse/mod.rs | 98 ------- .../routes/{sse/mpsc.rs => ws/log_helper.rs} | 15 +- crates/bili_sync/src/api/routes/ws/mod.rs | 263 +++++++++++++++++ crates/bili_sync/src/main.rs | 6 +- crates/bili_sync/src/task/http_server.rs | 4 +- crates/bili_sync/src/utils/mod.rs | 4 +- web/src/lib/api.ts | 78 ++--- web/src/lib/components/video-card.svelte | 3 +- web/src/lib/stores/tasks.ts | 14 - web/src/lib/types.ts | 9 +- web/src/lib/ws.ts | 266 ++++++++++++++++++ web/src/routes/+layout.svelte | 25 -- web/src/routes/+page.svelte | 67 ++--- web/src/routes/logs/+page.svelte | 36 +-- web/vite.config.ts | 5 + 21 files changed, 783 insertions(+), 307 deletions(-) delete mode 100644 crates/bili_sync/src/api/routes/sse/mod.rs rename crates/bili_sync/src/api/routes/{sse/mpsc.rs => ws/log_helper.rs} (79%) create mode 100644 crates/bili_sync/src/api/routes/ws/mod.rs delete mode 100644 web/src/lib/stores/tasks.ts create mode 100644 web/src/lib/ws.ts diff --git a/Cargo.lock b/Cargo.lock index 5f53458..3575ecb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,7 +23,7 @@ version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" dependencies = [ - "getrandom", + "getrandom 0.2.12", "once_cell", "version_check", ] @@ -359,6 +359,7 @@ checksum = "021e862c184ae977658b36c4500f7feac3221ca5da43e3f25bd04ab6c79a29b5" dependencies = [ "axum-core", "axum-macros", + "base64", "bytes", "form_urlencoded", "futures-util", @@ -378,8 +379,10 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-layer", "tower-service", @@ -480,6 +483,7 @@ dependencies = [ "clap", "cookie", "cow-utils", + "dashmap", "dirs", "enum_dispatch", "float-ord", @@ -494,7 +498,7 @@ dependencies = [ "parking_lot", "prost", "quick-xml", - "rand", + "rand 0.8.5", "regex", "reqwest", "rsa", @@ -513,7 +517,7 @@ dependencies = [ "tower", "tracing", "tracing-subscriber", - "url", + "uuid", "validator", ] @@ -922,6 +926,26 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + +[[package]] +name = "data-encoding" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" + [[package]] name = "der" version = "0.7.9" @@ -1326,7 +1350,19 @@ checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", +] + +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", ] [[package]] @@ -1423,6 +1459,12 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.2" @@ -1992,7 +2034,7 @@ checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4" dependencies = [ "hermit-abi", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -2048,7 +2090,7 @@ dependencies = [ "num-integer", "num-iter", "num-traits", - "rand", + "rand 0.8.5", "smallvec", "zeroize", ] @@ -2537,7 +2579,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddf517c03a109db8100448a4be38d498df8a210a99fe0e1b9eaf39e78c640efe" dependencies = [ "bytes", - "rand", + "rand 0.8.5", "ring", "rustc-hash", "rustls", @@ -2569,6 +2611,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + [[package]] name = "radium" version = "0.7.0" @@ -2582,8 +2630,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", ] [[package]] @@ -2593,7 +2651,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", ] [[package]] @@ -2602,7 +2670,16 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.12", +] + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.3", ] [[package]] @@ -2629,7 +2706,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd6f9d3d47bdd2ad6945c5015a226ec6155d0bcdfd8f7cd29f86b71f8de99d2b" dependencies = [ - "getrandom", + "getrandom 0.2.12", "libredox", "thiserror 2.0.12", ] @@ -2745,7 +2822,7 @@ checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" dependencies = [ "cc", "cfg-if", - "getrandom", + "getrandom 0.2.12", "libc", "spin", "untrusted", @@ -2794,7 +2871,7 @@ dependencies = [ "num-traits", "pkcs1", "pkcs8", - "rand_core", + "rand_core 0.6.4", "sha2", "signature", "spki", @@ -2852,7 +2929,7 @@ dependencies = [ "borsh", "bytes", "num-traits", - "rand", + "rand 0.8.5", "rkyv", "serde", "serde_json", @@ -3241,7 +3318,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" dependencies = [ "digest", - "rand_core", + "rand_core 0.6.4", ] [[package]] @@ -3421,7 +3498,7 @@ dependencies = [ "memchr", "once_cell", "percent-encoding", - "rand", + "rand 0.8.5", "rsa", "rust_decimal", "serde", @@ -3465,7 +3542,7 @@ dependencies = [ "memchr", "num-bigint", "once_cell", - "rand", + "rand 0.8.5", "rust_decimal", "serde", "serde_json", @@ -3814,6 +3891,18 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a9daff607c6d2bf6c16fd681ccb7eecc83e4e2cdc1ca067ffaadfca5de7f084" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.15" @@ -3991,6 +4080,23 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13" +dependencies = [ + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand 0.9.1", + "sha1", + "thiserror 2.0.12", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -4047,6 +4153,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -4061,11 +4173,14 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "uuid" -version = "1.8.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +checksum = "3cf4199d1e5d15ddd86a694e4d0dffa9c323ce759fea589f00fef9d81cc1931d" dependencies = [ + "getrandom 0.3.3", + "js-sys", "serde", + "wasm-bindgen", ] [[package]] @@ -4147,6 +4262,15 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasite" version = "0.1.0" @@ -4666,6 +4790,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags 2.5.0", +] + [[package]] name = "writeable" version = "0.6.1" diff --git a/Cargo.toml b/Cargo.toml index 52ca798..24aa36f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,12 +21,13 @@ assert_matches = "1.5.0" async-std = { version = "1.13.1", features = ["attributes", "tokio1"] } async-stream = "0.3.6" async-trait = "0.1.88" -axum = { version = "0.8.4", features = ["macros"] } +axum = { version = "0.8.4", features = ["macros", "ws"] } built = { version = "0.7.7", features = ["git2", "chrono"] } chrono = { version = "0.4.41", features = ["serde"] } clap = { version = "4.5.38", features = ["env", "string"] } cookie = "0.18.1" cow-utils = "0.1.3" +dashmap = "6.1.0" dirs = "6.0.0" enum_dispatch = "0.3.13" float-ord = "0.3.2" @@ -73,7 +74,7 @@ toml = "0.8.22" tower = "0.5.2" tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["chrono", "json"] } -url = "2.5.4" +uuid = { version = "1.17.0", features = ["v4"] } validator = { version = "0.20.0", features = ["derive"] } [workspace.metadata.release] diff --git a/crates/bili_sync/Cargo.toml b/crates/bili_sync/Cargo.toml index 62e11d1..4083784 100644 --- a/crates/bili_sync/Cargo.toml +++ b/crates/bili_sync/Cargo.toml @@ -20,6 +20,7 @@ chrono = { workspace = true } clap = { workspace = true } cookie = { workspace = true } cow-utils = { workspace = true } +dashmap = { workspace = true } dirs = { workspace = true } enum_dispatch = { workspace = true } float-ord = { workspace = true } @@ -52,7 +53,7 @@ toml = { workspace = true } tower = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } -url = { workspace = true } +uuid = { workspace = true } validator = { workspace = true } [dev-dependencies] diff --git a/crates/bili_sync/src/api/mod.rs b/crates/bili_sync/src/api/mod.rs index 1221dbb..9e9b107 100644 --- a/crates/bili_sync/src/api/mod.rs +++ b/crates/bili_sync/src/api/mod.rs @@ -5,4 +5,4 @@ mod response; mod routes; mod wrapper; -pub use routes::{MAX_HISTORY_LOGS, MpscWriter, router}; +pub use routes::{LogHelper, MAX_HISTORY_LOGS, router}; diff --git a/crates/bili_sync/src/api/response.rs b/crates/bili_sync/src/api/response.rs index bbec3a1..2f57bf3 100644 --- a/crates/bili_sync/src/api/response.rs +++ b/crates/bili_sync/src/api/response.rs @@ -158,7 +158,7 @@ pub struct DashBoardResponse { } #[derive(Serialize)] -pub struct SysInfoResponse { +pub struct SysInfo { pub total_memory: u64, pub used_memory: u64, pub process_memory: u64, diff --git a/crates/bili_sync/src/api/routes/mod.rs b/crates/bili_sync/src/api/routes/mod.rs index ccaac2c..7355a9c 100644 --- a/crates/bili_sync/src/api/routes/mod.rs +++ b/crates/bili_sync/src/api/routes/mod.rs @@ -9,7 +9,6 @@ use axum::response::{IntoResponse, Response}; use axum::routing::get; use axum::{Router, middleware}; use reqwest::{Method, StatusCode, header}; -use url::Url; use super::request::ImageProxyParams; use crate::api::wrapper::ApiResponse; @@ -19,11 +18,11 @@ use crate::config::VersionedConfig; mod config; mod dashboard; mod me; -mod sse; mod video_sources; mod videos; +mod ws; -pub use sse::{MAX_HISTORY_LOGS, MpscWriter}; +pub use ws::{LogHelper, MAX_HISTORY_LOGS}; pub fn router() -> Router { Router::new().route("/image-proxy", get(image_proxy)).nest( @@ -33,7 +32,7 @@ pub fn router() -> Router { .merge(video_sources::router()) .merge(videos::router()) .merge(dashboard::router()) - .merge(sse::router()) + .merge(ws::router()) .layer(middleware::from_fn(auth)), ) } @@ -45,8 +44,9 @@ pub async fn auth(headers: HeaderMap, request: Request, next: Next) -> Result Router { - Router::new() - .route("/sse/logs", get(logs)) - .route("/sse/tasks", get(get_tasks)) - .route("/sse/sysinfo", get(get_sysinfo)) -} - -async fn get_tasks() -> Sse>> { - let stream = WatchStream::new(TASK_STATUS_NOTIFIER.subscribe()).filter_map(|status| async move { - match serde_json::to_string(&status) { - Ok(status) => Some(Ok(Event::default().data(status))), - Err(_) => None, - } - }); - Sse::new(stream).keep_alive(KeepAlive::default()) -} - -async fn logs(Extension(log_writer): Extension) -> Sse>> { - let history = log_writer.log_history.lock(); - let rx = log_writer.sender.subscribe(); - let history_logs: Vec = history.iter().cloned().collect(); - drop(history); - - let history_stream = { futures::stream::iter(history_logs.into_iter().map(|msg| Ok(Event::default().data(msg)))) }; - - let stream = BroadcastStream::new(rx).filter_map(async |msg| match msg { - Ok(log_message) => Some(Ok(Event::default().data(log_message))), - Err(e) => { - error!("Log stream error: {:?}", e); - None - } - }); - Sse::new(history_stream.chain(stream)).keep_alive(KeepAlive::default()) -} - -async fn get_sysinfo() -> Sse>> { - let sys_refresh_kind = sys_refresh_kind(); - let disk_refresh_kind = disk_refresh_kind(); - let mut system = System::new(); - let mut disks = Disks::new(); - // safety: this functions always returns Ok on Linux/MacOS/Windows - let self_pid = get_current_pid().unwrap(); - let stream = IntervalStream::new(tokio::time::interval(Duration::from_secs(2))).filter_map(move |_| { - system.refresh_specifics(sys_refresh_kind); - disks.refresh_specifics(true, disk_refresh_kind); - let process = match system.process(self_pid) { - Some(p) => p, - None => return futures::future::ready(None), - }; - let info = SysInfoResponse { - total_memory: system.total_memory(), - used_memory: system.used_memory(), - process_memory: process.memory(), - used_cpu: system.global_cpu_usage(), - process_cpu: process.cpu_usage() / system.cpus().len() as f32, - total_disk: disks.iter().map(|d| d.total_space()).sum(), - available_disk: disks.iter().map(|d| d.available_space()).sum(), - }; - match serde_json::to_string(&info) { - Ok(json) => futures::future::ready(Some(Ok(Event::default().data(json)))), - Err(_) => { - error!("Failed to serialize system info"); - futures::future::ready(None) - } - } - }); - Sse::new(stream).keep_alive(KeepAlive::default()) -} - -fn sys_refresh_kind() -> RefreshKind { - RefreshKind::nothing() - .with_cpu(CpuRefreshKind::nothing().with_cpu_usage()) - .with_memory(MemoryRefreshKind::nothing().with_ram()) - .with_processes(ProcessRefreshKind::nothing().with_cpu().with_memory()) -} - -fn disk_refresh_kind() -> DiskRefreshKind { - DiskRefreshKind::nothing().with_storage() -} diff --git a/crates/bili_sync/src/api/routes/sse/mpsc.rs b/crates/bili_sync/src/api/routes/ws/log_helper.rs similarity index 79% rename from crates/bili_sync/src/api/routes/sse/mpsc.rs rename to crates/bili_sync/src/api/routes/ws/log_helper.rs index 473f74a..4a9bdec 100644 --- a/crates/bili_sync/src/api/routes/sse/mpsc.rs +++ b/crates/bili_sync/src/api/routes/ws/log_helper.rs @@ -7,18 +7,19 @@ use tracing_subscriber::fmt::MakeWriter; pub const MAX_HISTORY_LOGS: usize = 30; -pub struct MpscWriter { +/// LogHelper 维护了日志发送器和一个日志历史记录的缓冲区 +pub struct LogHelper { pub sender: broadcast::Sender, pub log_history: Arc>>, } -impl MpscWriter { +impl LogHelper { pub fn new(sender: broadcast::Sender, log_history: Arc>>) -> Self { - MpscWriter { sender, log_history } + LogHelper { sender, log_history } } } -impl<'a> MakeWriter<'a> for MpscWriter { +impl<'a> MakeWriter<'a> for LogHelper { type Writer = Self; fn make_writer(&'a self) -> Self::Writer { @@ -26,7 +27,7 @@ impl<'a> MakeWriter<'a> for MpscWriter { } } -impl std::io::Write for MpscWriter { +impl std::io::Write for LogHelper { fn write(&mut self, buf: &[u8]) -> std::io::Result { let log_message = String::from_utf8_lossy(buf).to_string(); let _ = self.sender.send(log_message.clone()); @@ -43,9 +44,9 @@ impl std::io::Write for MpscWriter { } } -impl Clone for MpscWriter { +impl Clone for LogHelper { fn clone(&self) -> Self { - MpscWriter { + LogHelper { sender: self.sender.clone(), log_history: self.log_history.clone(), } diff --git a/crates/bili_sync/src/api/routes/ws/mod.rs b/crates/bili_sync/src/api/routes/ws/mod.rs new file mode 100644 index 0000000..5a62836 --- /dev/null +++ b/crates/bili_sync/src/api/routes/ws/mod.rs @@ -0,0 +1,263 @@ +mod log_helper; + +use std::sync::{Arc, LazyLock}; +use std::time::Duration; + +use axum::extract::WebSocketUpgrade; +use axum::extract::ws::{Message, WebSocket}; +use axum::response::IntoResponse; +use axum::routing::any; +use axum::{Extension, Router}; +use dashmap::DashMap; +use futures::stream::{SplitSink, SplitStream}; +use futures::{SinkExt, StreamExt, future}; +pub use log_helper::{LogHelper, MAX_HISTORY_LOGS}; +use parking_lot::RwLock; +use serde::{Deserialize, Serialize}; +use sysinfo::{ + CpuRefreshKind, DiskRefreshKind, Disks, MemoryRefreshKind, ProcessRefreshKind, RefreshKind, System, get_current_pid, +}; +use tokio::pin; +use tokio::task::JoinHandle; +use tokio_stream::wrappers::{BroadcastStream, IntervalStream, WatchStream}; +use uuid::Uuid; + +use crate::api::response::SysInfo; +use crate::utils::task_notifier::{TASK_STATUS_NOTIFIER, TaskStatus}; + +static WEBSOCKET_HANDLER: LazyLock = LazyLock::new(WebSocketHandler::new); + +pub(super) fn router() -> Router { + Router::new().route("/ws", any(websocket_handler)) +} + +async fn websocket_handler(ws: WebSocketUpgrade, Extension(log_writer): Extension) -> impl IntoResponse { + ws.on_upgrade(|socket| handle_socket(socket, log_writer)) +} + +// 事件类型枚举 +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +enum EventType { + Logs, + Tasks, + SysInfo, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +enum ClientEvent { + Subscribe(EventType), + Unsubscribe(EventType), +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +enum ServerEvent { + Logs(String), + Tasks(Arc), + SysInfo(Arc), +} + +struct WebSocketHandler { + sysinfo_subscribers: Arc>>, + sysinfo_handles: RwLock>>, +} + +impl WebSocketHandler { + fn new() -> Self { + Self { + sysinfo_subscribers: Arc::new(DashMap::new()), + sysinfo_handles: RwLock::new(None), + } + } + + async fn handle_sender( + &self, + mut sender: SplitSink, + mut rx: tokio::sync::mpsc::Receiver, + ) { + while let Some(event) = rx.recv().await { + match serde_json::to_string(&event) { + Ok(text) => { + if let Err(e) = sender.send(Message::Text(text.into())).await { + error!("Failed to send message: {:?}", e); + break; + } + } + Err(e) => { + error!("Failed to serialize event: {:?}", e); + } + } + } + } + + async fn handle_receiver( + &self, + mut receiver: SplitStream, + tx: tokio::sync::mpsc::Sender, + uuid: Uuid, + log_writer: LogHelper, + ) { + // 日志和任务状态的处理本身就是由 stream 驱动的,可以直接为每个 ws 连接维护独立的任务处理器 + // 系统信息是服务端轮询然后推送的,如果单独维护会导致每个连接都独立轮询系统信息,造成不必要的浪费 + // 因此采用了全局的订阅者管理,所有连接共享同一个系统信息轮询任务 + let (mut log_handle, mut task_handle) = (None, None); + while let Some(Ok(msg)) = receiver.next().await { + if let Message::Text(text) = msg { + match serde_json::from_str::(&text) { + Ok(ClientEvent::Subscribe(event_type)) => match event_type { + EventType::Logs => { + if log_handle.as_ref().is_none_or(|h: &JoinHandle<()>| h.is_finished()) { + let log_writer_clone = log_writer.clone(); + let tx_clone = tx.clone(); + let history = log_writer_clone.log_history.lock(); + let history_logs: Vec = history.iter().cloned().collect(); + drop(history); + log_handle = Some(tokio::spawn(async move { + let rx = log_writer_clone.sender.subscribe(); + let log_stream = futures::stream::iter(history_logs.into_iter()) + .chain(BroadcastStream::new(rx).filter_map(async |msg| msg.ok())) + .map(|msg| ServerEvent::Logs(msg)); + pin!(log_stream); + while let Some(event) = log_stream.next().await { + if let Err(e) = tx_clone.send(event).await { + error!("Failed to send log event: {:?}", e); + break; + } + } + })); + } + } + EventType::Tasks => { + if task_handle.as_ref().is_none_or(|h: &JoinHandle<()>| h.is_finished()) { + let tx_clone = tx.clone(); + task_handle = Some(tokio::spawn(async move { + let mut stream = WatchStream::new(TASK_STATUS_NOTIFIER.subscribe()) + .map(|status| ServerEvent::Tasks(status)); + while let Some(event) = stream.next().await { + if let Err(e) = tx_clone.send(event).await { + error!("Failed to send task status: {:?}", e); + break; + } + } + })); + } + } + EventType::SysInfo => self.add_sysinfo_subscriber(uuid, tx.clone()).await, + }, + Ok(ClientEvent::Unsubscribe(event_type)) => match event_type { + EventType::Logs => { + if let Some(handle) = log_handle.take() { + handle.abort(); + } + } + EventType::Tasks => { + if let Some(handle) = task_handle.take() { + handle.abort(); + } + } + EventType::SysInfo => { + self.remove_sysinfo_subscriber(uuid).await; + } + }, + Err(e) => { + error!("Failed to parse client message: {:?}", e); + } + } + } + } + if let Some(handle) = log_handle { + handle.abort(); + } + if let Some(handle) = task_handle { + handle.abort(); + } + self.remove_sysinfo_subscriber(uuid).await; + } + + // 添加订阅者 + async fn add_sysinfo_subscriber(&self, uuid: Uuid, sender: tokio::sync::mpsc::Sender) { + self.sysinfo_subscribers.insert(uuid, sender); + if self.sysinfo_subscribers.len() > 0 + && self + .sysinfo_handles + .read() + .as_ref() + .is_none_or(|h: &JoinHandle<()>| h.is_finished()) + { + let sysinfo_subscribers = self.sysinfo_subscribers.clone(); + let mut write_guard = self.sysinfo_handles.write(); + if write_guard.as_ref().is_some_and(|h: &JoinHandle<()>| !h.is_finished()) { + return; + } + *write_guard = Some(tokio::spawn(async move { + let mut system = System::new(); + let mut disks = Disks::new(); + let sys_refresh_kind = sys_refresh_kind(); + let disk_refresh_kind = disk_refresh_kind(); + // 对于 linux/mac/windows 平台,该方法永远返回 Some(pid),expect 基本是安全的 + let self_pid = get_current_pid().expect("Unsupported platform"); + let mut stream = + IntervalStream::new(tokio::time::interval(Duration::from_secs(2))).filter_map(move |_| { + system.refresh_specifics(sys_refresh_kind); + disks.refresh_specifics(true, disk_refresh_kind); + let process = match system.process(self_pid) { + Some(p) => p, + None => return futures::future::ready(None), + }; + futures::future::ready(Some(SysInfo { + total_memory: system.total_memory(), + used_memory: system.used_memory(), + process_memory: process.memory(), + used_cpu: system.global_cpu_usage(), + process_cpu: process.cpu_usage() / system.cpus().len() as f32, + total_disk: disks.iter().map(|d| d.total_space()).sum(), + available_disk: disks.iter().map(|d| d.available_space()).sum(), + })) + }); + while let Some(sys_info) = stream.next().await { + let sys_info = Arc::new(sys_info); + future::join_all(sysinfo_subscribers.iter().map(async |subscriber| { + if let Err(e) = subscriber.send(ServerEvent::SysInfo(sys_info.clone())).await { + error!( + "Failed to send sysinfo event to subscriber {}: {:?}", + subscriber.key(), + e + ); + } + })) + .await; + } + })); + } + } + + async fn remove_sysinfo_subscriber(&self, uuid: Uuid) { + self.sysinfo_subscribers.remove(&uuid); + if self.sysinfo_subscribers.is_empty() { + if let Some(handle) = self.sysinfo_handles.write().take() { + handle.abort(); + } + } + } +} + +async fn handle_socket(socket: WebSocket, log_writer: LogHelper) { + let (ws_sender, ws_receiver) = socket.split(); + let uuid = Uuid::new_v4(); + let (tx, rx) = tokio::sync::mpsc::channel(100); + tokio::spawn(WEBSOCKET_HANDLER.handle_sender(ws_sender, rx)); + tokio::spawn(WEBSOCKET_HANDLER.handle_receiver(ws_receiver, tx, uuid, log_writer)); +} + +fn sys_refresh_kind() -> RefreshKind { + RefreshKind::nothing() + .with_cpu(CpuRefreshKind::nothing().with_cpu_usage()) + .with_memory(MemoryRefreshKind::nothing().with_ram()) + .with_processes(ProcessRefreshKind::nothing().with_cpu().with_memory()) +} + +fn disk_refresh_kind() -> DiskRefreshKind { + DiskRefreshKind::nothing().with_storage() +} diff --git a/crates/bili_sync/src/main.rs b/crates/bili_sync/src/main.rs index 2326218..f30194b 100644 --- a/crates/bili_sync/src/main.rs +++ b/crates/bili_sync/src/main.rs @@ -24,7 +24,7 @@ use task::{http_server, video_downloader}; use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; -use crate::api::{MAX_HISTORY_LOGS, MpscWriter}; +use crate::api::{LogHelper, MAX_HISTORY_LOGS}; use crate::config::{ARGS, VersionedConfig}; use crate::database::setup_database; use crate::utils::init_logger; @@ -77,10 +77,10 @@ fn spawn_task( } /// 初始化日志系统、打印欢迎信息,初始化数据库连接和全局配置 -async fn init() -> (Arc, MpscWriter) { +async fn init() -> (Arc, LogHelper) { let (tx, _rx) = tokio::sync::broadcast::channel(30); let log_history = Arc::new(Mutex::new(VecDeque::with_capacity(MAX_HISTORY_LOGS + 1))); - let log_writer = MpscWriter::new(tx, log_history.clone()); + let log_writer = LogHelper::new(tx, log_history.clone()); init_logger(&ARGS.log_level, Some(log_writer.clone())); info!("欢迎使用 Bili-Sync,当前程序版本:{}", config::version()); diff --git a/crates/bili_sync/src/task/http_server.rs b/crates/bili_sync/src/task/http_server.rs index 1929b31..fde9db1 100644 --- a/crates/bili_sync/src/task/http_server.rs +++ b/crates/bili_sync/src/task/http_server.rs @@ -11,7 +11,7 @@ use reqwest::StatusCode; use rust_embed_for_web::{EmbedableFile, RustEmbed}; use sea_orm::DatabaseConnection; -use crate::api::{MpscWriter, router}; +use crate::api::{LogHelper, router}; use crate::bilibili::BiliClient; use crate::config::VersionedConfig; @@ -23,7 +23,7 @@ struct Asset; pub async fn http_server( database_connection: Arc, bili_client: Arc, - log_writer: MpscWriter, + log_writer: LogHelper, ) -> Result<()> { let app = router() .fallback_service(get(frontend_files)) diff --git a/crates/bili_sync/src/utils/mod.rs b/crates/bili_sync/src/utils/mod.rs index a3638a8..7bcf70e 100644 --- a/crates/bili_sync/src/utils/mod.rs +++ b/crates/bili_sync/src/utils/mod.rs @@ -11,9 +11,9 @@ use tracing_subscriber::fmt; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -use crate::api::MpscWriter; +use crate::api::LogHelper; -pub fn init_logger(log_level: &str, log_writer: Option) { +pub fn init_logger(log_level: &str, log_writer: Option) { let log = tracing_subscriber::fmt::Subscriber::builder() .compact() .with_env_filter(tracing_subscriber::EnvFilter::builder().parse_lossy(log_level)) diff --git a/web/src/lib/api.ts b/web/src/lib/api.ts index 1c9dcf2..ed71a9e 100644 --- a/web/src/lib/api.ts +++ b/web/src/lib/api.ts @@ -19,8 +19,10 @@ import type { UpdateVideoSourceRequest, Config, DashBoardResponse, - SysInfoResponse + SysInfo, + TaskStatus } from './types'; +import { wsManager } from './ws'; // API 基础配置 const API_BASE_URL = '/api'; @@ -56,6 +58,8 @@ class ApiClient { clearAuthToken() { delete this.defaultHeaders['Authorization']; localStorage.removeItem('authToken'); + // 断开WebSocket连接,因为token已经无效 + wsManager.disconnect(); } // 通用请求方法 @@ -222,58 +226,14 @@ class ApiClient { async getDashboard(): Promise> { return this.get('/dashboard'); } - - createLogStream( - onMessage: (data: string) => void, - onError?: (error: Event) => void - ): EventSource { - const token = localStorage.getItem('authToken'); - const url = `/api/sse/logs${token ? `?token=${encodeURIComponent(token)}` : ''}`; - const eventSource = new EventSource(url); - eventSource.onmessage = (event) => { - onMessage(event.data); - }; - if (onError) { - eventSource.onerror = onError; - } - return eventSource; + subscribeToLogs(onMessage: (data: string) => void) { + return wsManager.subscribeToLogs(onMessage); } - - createSysInfoStream( - onMessage: (data: SysInfoResponse) => void, - onError?: (error: Event) => void - ): EventSource { - const token = localStorage.getItem('authToken'); - const url = `/api/sse/sysinfo${token ? `?token=${encodeURIComponent(token)}` : ''}`; - const eventSource = new EventSource(url); - eventSource.onmessage = (event) => { - try { - const data = JSON.parse(event.data) as SysInfoResponse; - onMessage(data); - } catch (error) { - console.error('Failed to parse SSE data:', error); - } - }; - if (onError) { - eventSource.onerror = onError; - } - return eventSource; + subscribeToSysInfo(onMessage: (data: SysInfo) => void) { + return wsManager.subscribeToSysInfo(onMessage); } - - createTasksStream( - onMessage: (data: string) => void, - onError?: (error: Event) => void - ): EventSource { - const token = localStorage.getItem('authToken'); - const url = `/api/sse/tasks${token ? `?token=${encodeURIComponent(token)}` : ''}`; - const eventSource = new EventSource(url); - eventSource.onmessage = (event) => { - onMessage(event.data); - }; - if (onError) { - eventSource.onerror = onError; - } - return eventSource; + subscribeToTasks(onMessage: (data: TaskStatus) => void) { + return wsManager.subscribeToTasks(onMessage); } } @@ -303,14 +263,14 @@ const api = { getConfig: () => apiClient.getConfig(), updateConfig: (config: Config) => apiClient.updateConfig(config), getDashboard: () => apiClient.getDashboard(), - createSysInfoStream: ( - onMessage: (data: SysInfoResponse) => void, - onError?: (error: Event) => void - ) => apiClient.createSysInfoStream(onMessage, onError), - createLogStream: (onMessage: (data: string) => void, onError?: (error: Event) => void) => - apiClient.createLogStream(onMessage, onError), - createTasksStream: (onMessage: (data: string) => void, onError?: (error: Event) => void) => - apiClient.createTasksStream(onMessage, onError), + subscribeToSysInfo: (onMessage: (data: SysInfo) => void) => + apiClient.subscribeToSysInfo(onMessage), + + subscribeToLogs: (onMessage: (data: string) => void) => apiClient.subscribeToLogs(onMessage), + + subscribeToTasks: (onMessage: (data: TaskStatus) => void) => + apiClient.subscribeToTasks(onMessage), + setAuthToken: (token: string) => apiClient.setAuthToken(token), clearAuthToken: () => apiClient.clearAuthToken() }; diff --git a/web/src/lib/components/video-card.svelte b/web/src/lib/components/video-card.svelte index a53c9f7..1bb3fa1 100644 --- a/web/src/lib/components/video-card.svelte +++ b/web/src/lib/components/video-card.svelte @@ -13,7 +13,8 @@ import { goto } from '$app/navigation'; import * as Tooltip from '$lib/components/ui/tooltip/index.js'; - export let video: VideoInfo; + // 将 bvid 设置为可选属性,但保留 VideoInfo 的其它所有属性 + export let video: Omit & { bvid?: string }; export let showActions: boolean = true; // 控制是否显示操作按钮 export let mode: 'default' | 'detail' | 'page' = 'default'; // 卡片模式 export let customTitle: string = ''; // 自定义标题 diff --git a/web/src/lib/stores/tasks.ts b/web/src/lib/stores/tasks.ts deleted file mode 100644 index 9652cf6..0000000 --- a/web/src/lib/stores/tasks.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { writable } from 'svelte/store'; - -export interface TaskStatus { - is_running: boolean; - last_run: Date | null; - last_finish: Date | null; - next_run: Date | null; -} - -export const taskStatusStore = writable(undefined); - -export function setTaskStatus(status: TaskStatus) { - taskStatusStore.set(status); -} diff --git a/web/src/lib/types.ts b/web/src/lib/types.ts index 6d52b38..6e975b5 100644 --- a/web/src/lib/types.ts +++ b/web/src/lib/types.ts @@ -259,7 +259,7 @@ export interface DashBoardResponse { } // 系统信息响应类型 -export interface SysInfoResponse { +export interface SysInfo { total_memory: number; used_memory: number; process_memory: number; @@ -270,3 +270,10 @@ export interface SysInfoResponse { available_disk: number; uptime: number; } + +export interface TaskStatus { + is_running: boolean; + last_run: Date | null; + last_finish: Date | null; + next_run: Date | null; +} diff --git a/web/src/lib/ws.ts b/web/src/lib/ws.ts new file mode 100644 index 0000000..b730ae9 --- /dev/null +++ b/web/src/lib/ws.ts @@ -0,0 +1,266 @@ +import { toast } from 'svelte-sonner'; +import type { SysInfo, TaskStatus } from './types'; + +// 支持的事件类型 +export enum EventType { + Logs = 'logs', + Tasks = 'tasks', + SysInfo = 'sysInfo' +} + +// 服务器事件响应格式 +interface ServerEvent { + logs?: string; + tasks?: TaskStatus; + sysInfo?: SysInfo; +} + +// 客户端事件请求格式 +interface ClientEvent { + subscribe?: EventType; + unsubscribe?: EventType; +} + +// 回调函数类型定义 +type LogsCallback = (data: string) => void; +type TasksCallback = (data: TaskStatus) => void; +type SysInfoCallback = (data: SysInfo) => void; +type ErrorCallback = (error: Event) => void; + +export class WebSocketManager { + private static instance: WebSocketManager; + private socket: WebSocket | null = null; + private connected = false; + private connecting = false; + private reconnectTimer: ReturnType | null = null; + private reconnectAttempts = 0; + private maxReconnectAttempts = 5; + private baseReconnectDelay = 1000; + + private logsSubscribers: Set = new Set(); + private tasksSubscribers: Set = new Set(); + private sysInfoSubscribers: Set = new Set(); + private errorSubscribers: Set = new Set(); + + private subscribedEvents: Set = new Set(); + + private constructor() {} + + public static getInstance(): WebSocketManager { + if (!WebSocketManager.instance) { + WebSocketManager.instance = new WebSocketManager(); + } + return WebSocketManager.instance; + } + + // 连接 WebSocket + public connect(): void { + if (this.connected || this.connecting) return; + + this.connecting = true; + const token = localStorage.getItem('authToken') || ''; + + try { + const protocol = window.location.protocol === 'https:' ? 'wss://' : 'ws://'; + this.socket = new WebSocket(`${protocol}${window.location.host}/api/ws`, [token]); + + this.socket.onopen = () => { + this.connected = true; + this.connecting = false; + this.reconnectAttempts = 0; + + this.resubscribeEvents(); + }; + + this.socket.onmessage = this.handleMessage.bind(this); + + this.socket.onclose = () => { + this.connected = false; + this.connecting = false; + this.scheduleReconnect(); + }; + + this.socket.onerror = (error) => { + console.error('WebSocket error:', error); + toast.error('WebSocket 连接发生错误,请检查网络或稍后重试'); + }; + } catch (error) { + this.connecting = false; + console.error('Failed to create WebSocket:', error); + toast.error('创建 WebSocket 连接失败,请检查网络或稍后重试'); + this.scheduleReconnect(); + } + } + + private handleMessage(event: MessageEvent): void { + try { + const data = JSON.parse(event.data) as ServerEvent; + + if (data.logs !== undefined) { + this.notifyLogsSubscribers(data.logs); + } else if (data.tasks !== undefined) { + this.notifyTasksSubscribers(data.tasks); + } else if (data.sysInfo !== undefined) { + this.notifySysInfoSubscribers(data.sysInfo); + } + } catch (error) { + console.error('Failed to parse WebSocket message:', error, event.data); + toast.error('解析 WebSocket 消息失败', { + description: `消息内容: ${event.data}\n错误信息: ${error instanceof Error ? error.message : String(error)}` + }); + } + } + + private sendMessage(message: ClientEvent): void { + if (!this.connected || !this.socket) { + console.warn('Cannot send message: WebSocket not connected'); + return; + } + + try { + this.socket.send(JSON.stringify(message)); + } catch (error) { + console.error('Failed to send message:', error); + toast.error('发送 WebSocket 消息失败', { + description: `消息内容: ${JSON.stringify(message)}\n错误信息: ${error instanceof Error ? error.message : String(error)}` + }); + } + } + + private subscribe(eventType: EventType): void { + if (this.subscribedEvents.has(eventType)) return; + + this.sendMessage({ subscribe: eventType }); + this.subscribedEvents.add(eventType); + } + + // 取消订阅事件 + private unsubscribe(eventType: EventType): void { + if (!this.subscribedEvents.has(eventType)) return; + + this.sendMessage({ unsubscribe: eventType }); + this.subscribedEvents.delete(eventType); + } + + private resubscribeEvents(): void { + for (const eventType of this.subscribedEvents) { + this.sendMessage({ subscribe: eventType }); + } + } + + private scheduleReconnect(): void { + if (this.reconnectTimer !== null) { + clearTimeout(this.reconnectTimer); + } + + if (this.reconnectAttempts >= this.maxReconnectAttempts) { + console.log('Max reconnect attempts reached'); + return; + } + + const delay = this.baseReconnectDelay * Math.pow(2, this.reconnectAttempts); + console.log(`Scheduling reconnect in ${delay}ms`); + + this.reconnectTimer = setTimeout(() => { + this.reconnectAttempts++; + this.connect(); + }, delay); + } + + public subscribeToLogs(callback: LogsCallback): () => void { + this.connect(); + this.logsSubscribers.add(callback); + + if (this.logsSubscribers.size === 1) { + this.subscribe(EventType.Logs); + } + + return () => { + this.logsSubscribers.delete(callback); + if (this.logsSubscribers.size === 0) { + this.unsubscribe(EventType.Logs); + } + }; + } + + // 订阅任务状态 + public subscribeToTasks(callback: TasksCallback): () => void { + this.connect(); + this.tasksSubscribers.add(callback); + + if (this.tasksSubscribers.size === 1) { + this.subscribe(EventType.Tasks); + } + + return () => { + this.tasksSubscribers.delete(callback); + if (this.tasksSubscribers.size === 0) { + this.unsubscribe(EventType.Tasks); + } + }; + } + + public subscribeToSysInfo(callback: SysInfoCallback): () => void { + this.connect(); + this.sysInfoSubscribers.add(callback); + + if (this.sysInfoSubscribers.size === 1) { + this.subscribe(EventType.SysInfo); + } + + return () => { + this.sysInfoSubscribers.delete(callback); + if (this.sysInfoSubscribers.size === 0) { + this.unsubscribe(EventType.SysInfo); + } + }; + } + + private notifyLogsSubscribers(data: string): void { + this.logsSubscribers.forEach((callback) => { + try { + callback(data); + } catch (error) { + console.error('Error in logs subscriber callback:', error); + } + }); + } + + private notifyTasksSubscribers(data: TaskStatus): void { + this.tasksSubscribers.forEach((callback) => { + try { + callback(data); + } catch (error) { + console.error('Error in tasks subscriber callback:', error); + } + }); + } + + private notifySysInfoSubscribers(data: SysInfo): void { + this.sysInfoSubscribers.forEach((callback) => { + try { + callback(data); + } catch (error) { + console.error('Error in sysInfo subscriber callback:', error); + } + }); + } + + public disconnect(): void { + if (this.socket) { + this.socket.close(); + this.socket = null; + } + + if (this.reconnectTimer !== null) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + + this.connected = false; + this.connecting = false; + this.subscribedEvents.clear(); + } +} + +export const wsManager = WebSocketManager.getInstance(); diff --git a/web/src/routes/+layout.svelte b/web/src/routes/+layout.svelte index 9b3b1b1..ea50e53 100644 --- a/web/src/routes/+layout.svelte +++ b/web/src/routes/+layout.svelte @@ -6,31 +6,6 @@ import { breadcrumbStore } from '$lib/stores/breadcrumb'; import * as Sidebar from '$lib/components/ui/sidebar/index.js'; import { Toaster } from '$lib/components/ui/sonner/index.js'; - import { onMount } from 'svelte'; - import { setTaskStatus, type TaskStatus } from '$lib/stores/tasks'; - import api from '$lib/api'; - import { toast } from 'svelte-sonner'; - - let tasksStream: EventSource | undefined; - - onMount(() => { - tasksStream = api.createTasksStream( - (data: string) => { - const status: TaskStatus = JSON.parse(data); - setTaskStatus(status); - }, - (error: Event) => { - console.error('任务状态流错误:', error); - toast.error('任务状态流错误,请检查网络连接或稍后重试'); - } - ); - return () => { - if (tasksStream) { - tasksStream.close(); - tasksStream = undefined; - } - }; - }); diff --git a/web/src/routes/+page.svelte b/web/src/routes/+page.svelte index e375d42..3811a33 100644 --- a/web/src/routes/+page.svelte +++ b/web/src/routes/+page.svelte @@ -11,7 +11,7 @@ import { toast } from 'svelte-sonner'; import CloudDownloadIcon from '@lucide/svelte/icons/cloud-download'; import api from '$lib/api'; - import type { DashBoardResponse, SysInfoResponse, ApiError } from '$lib/types'; + import type { DashBoardResponse, SysInfo, ApiError, TaskStatus } from '$lib/types'; import DatabaseIcon from '@lucide/svelte/icons/database'; import HeartIcon from '@lucide/svelte/icons/heart'; import FolderIcon from '@lucide/svelte/icons/folder'; @@ -24,12 +24,13 @@ import PlayIcon from '@lucide/svelte/icons/play'; import CheckCircleIcon from '@lucide/svelte/icons/check-circle'; import CalendarIcon from '@lucide/svelte/icons/calendar'; - import { taskStatusStore } from '$lib/stores/tasks'; let dashboardData: DashBoardResponse | null = null; - let sysInfo: SysInfoResponse | null = null; + let sysInfo: SysInfo | null = null; + let taskStatus: TaskStatus | null = null; let loading = false; - let sysInfoEventSource: EventSource | null = null; + let unsubscribeSysInfo: (() => void) | null = null; + let unsubscribeTasks: (() => void) | null = null; function formatBytes(bytes: number): string { if (bytes === 0) return '0 B'; @@ -58,33 +59,25 @@ } } - // 启动系统信息流 - function startSysInfoStream() { - sysInfoEventSource = api.createSysInfoStream( - (data) => { - sysInfo = data; - }, - (error) => { - console.error('系统信息流错误:', error); - toast.error('系统信息流出现错误,请检查网络连接或稍后重试'); - } - ); - } - - // 停止系统信息流 - function stopSysInfoStream() { - if (sysInfoEventSource) { - sysInfoEventSource.close(); - sysInfoEventSource = null; - } - } - onMount(() => { setBreadcrumb([{ label: '仪表盘' }]); + + unsubscribeSysInfo = api.subscribeToSysInfo((data) => { + sysInfo = data; + }); + unsubscribeTasks = api.subscribeToTasks((data: TaskStatus) => { + taskStatus = data; + }); loadDashboard(); - startSysInfoStream(); return () => { - stopSysInfoStream(); + if (unsubscribeSysInfo) { + unsubscribeSysInfo(); + unsubscribeSysInfo = null; + } + if (unsubscribeTasks) { + unsubscribeTasks(); + unsubscribeTasks = null; + } }; }); @@ -236,7 +229,7 @@ {#if dashboardData && dashboardData.videos_by_day.length > 0}
- 近七日共新增视频 + 近七日新增视频 {dashboardData.videos_by_day.reduce((sum, v) => sum + v.cnt, 0)} 个 @@ -283,14 +276,14 @@ - {#if $taskStatusStore} + {#if taskStatus}
当前任务状态 - - {$taskStatusStore.is_running ? '运行中' : '未运行'} + + {taskStatus.is_running ? '运行中' : '未运行'}
@@ -300,8 +293,8 @@ 开始运行
- {$taskStatusStore.last_run - ? new Date($taskStatusStore.last_run).toLocaleString('en-US', { + {taskStatus.last_run + ? new Date(taskStatus.last_run).toLocaleString('en-US', { hour: '2-digit', minute: '2-digit', second: '2-digit', @@ -316,8 +309,8 @@ 运行结束
- {$taskStatusStore.last_finish - ? new Date($taskStatusStore.last_finish).toLocaleString('en-US', { + {taskStatus.last_finish + ? new Date(taskStatus.last_finish).toLocaleString('en-US', { hour: '2-digit', minute: '2-digit', second: '2-digit', @@ -332,8 +325,8 @@ 下次运行
- {$taskStatusStore.next_run - ? new Date($taskStatusStore.next_run).toLocaleString('en-US', { + {taskStatus.next_run + ? new Date(taskStatus.next_run).toLocaleString('en-US', { hour: '2-digit', minute: '2-digit', second: '2-digit', diff --git a/web/src/routes/logs/+page.svelte b/web/src/routes/logs/+page.svelte index beaeeb0..618cfaf 100644 --- a/web/src/routes/logs/+page.svelte +++ b/web/src/routes/logs/+page.svelte @@ -3,9 +3,8 @@ import { setBreadcrumb } from '$lib/stores/breadcrumb'; import { onMount } from 'svelte'; import { Badge } from '$lib/components/ui/badge'; - import { toast } from 'svelte-sonner'; - let logEventSource: EventSource | null = null; + let unsubscribeLog: (() => void) | null = null; let logs: Array<{ timestamp: string; level: string; message: string }> = []; let shouldAutoScroll = true; let main: HTMLElement | null = null; @@ -23,37 +22,20 @@ } } - function startLogStream() { - if (logEventSource) { - logEventSource.close(); - } - logEventSource = api.createLogStream( - (data: string) => { - logs = [...logs.slice(-200), JSON.parse(data)]; - setTimeout(scrollToBottom, 0); - }, - (error: Event) => { - console.error('日志流错误:', error); - toast.error('日志流出现错误,请检查网络连接或稍后重试'); - } - ); - } - - function stopLogStream() { - if (logEventSource) { - logEventSource.close(); - logEventSource = null; - } - } - onMount(() => { setBreadcrumb([{ label: '日志' }]); main = document.getElementById('main'); main?.addEventListener('scroll', checkScrollPosition); - startLogStream(); + unsubscribeLog = api.subscribeToLogs((data: string) => { + logs = [...logs.slice(-200), JSON.parse(data)]; + setTimeout(scrollToBottom, 0); + }); return () => { - stopLogStream(); main?.removeEventListener('scroll', checkScrollPosition); + if (unsubscribeLog) { + unsubscribeLog(); + unsubscribeLog = null; + } }; }); diff --git a/web/vite.config.ts b/web/vite.config.ts index 525781f..e4ecc78 100644 --- a/web/vite.config.ts +++ b/web/vite.config.ts @@ -6,6 +6,11 @@ export default defineConfig({ plugins: [tailwindcss(), sveltekit()], server: { proxy: { + '/api/ws': { + target: 'ws://localhost:12345', + ws: true, + rewriteWsOrigin: true + }, '/api': 'http://localhost:12345', '/image-proxy': 'http://localhost:12345' },