feat: 前端添加下载状态卡片 (#385)
This commit is contained in:
@@ -168,14 +168,6 @@ pub struct SysInfoResponse {
|
||||
pub available_disk: u64,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct TaskStatusResponse {
|
||||
pub running: bool,
|
||||
pub last_run: Option<String>,
|
||||
pub next_run: Option<String>,
|
||||
pub last_error: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, FromQueryResult)]
|
||||
pub struct VideoSourceDetail {
|
||||
pub id: i32,
|
||||
|
||||
@@ -9,7 +9,7 @@ use sea_orm::DatabaseConnection;
|
||||
use crate::api::error::InnerApiError;
|
||||
use crate::api::wrapper::{ApiError, ApiResponse, ValidatedJson};
|
||||
use crate::config::{Config, VersionedConfig};
|
||||
use crate::task::DOWNLOADER_TASK_RUNNING;
|
||||
use crate::utils::task_notifier::TASK_STATUS_NOTIFIER;
|
||||
|
||||
pub(super) fn router() -> Router {
|
||||
Router::new().route("/config", get(get_config).put(update_config))
|
||||
@@ -25,7 +25,7 @@ pub async fn update_config(
|
||||
Extension(db): Extension<Arc<DatabaseConnection>>,
|
||||
ValidatedJson(config): ValidatedJson<Config>,
|
||||
) -> Result<ApiResponse<Arc<Config>>, ApiError> {
|
||||
let Ok(_lock) = DOWNLOADER_TASK_RUNNING.try_lock() else {
|
||||
let Some(_lock) = TASK_STATUS_NOTIFIER.detect_running() else {
|
||||
// 简单避免一下可能的不一致现象
|
||||
return Err(InnerApiError::BadRequest("下载任务正在运行,无法修改配置".to_string()).into());
|
||||
};
|
||||
|
||||
@@ -1,27 +1,16 @@
|
||||
use std::convert::Infallible;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::response::Sse;
|
||||
use axum::response::sse::{Event, KeepAlive};
|
||||
use axum::routing::get;
|
||||
use axum::{Extension, Router};
|
||||
use bili_sync_entity::*;
|
||||
use futures::StreamExt;
|
||||
use sea_orm::entity::prelude::*;
|
||||
use sea_orm::{FromQueryResult, Statement};
|
||||
use sysinfo::{
|
||||
CpuRefreshKind, DiskRefreshKind, Disks, MemoryRefreshKind, ProcessRefreshKind, RefreshKind, System, get_current_pid,
|
||||
};
|
||||
use tokio_stream::wrappers::IntervalStream;
|
||||
|
||||
use crate::api::response::{DashBoardResponse, DayCountPair, SysInfoResponse};
|
||||
use crate::api::response::{DashBoardResponse, DayCountPair};
|
||||
use crate::api::wrapper::{ApiError, ApiResponse};
|
||||
|
||||
pub(super) fn router() -> Router {
|
||||
Router::new()
|
||||
.route("/dashboard", get(get_dashboard))
|
||||
.route("/dashboard/sysinfo", get(get_sysinfo))
|
||||
Router::new().route("/dashboard", get(get_dashboard))
|
||||
}
|
||||
|
||||
async fn get_dashboard(
|
||||
@@ -86,46 +75,3 @@ ORDER BY
|
||||
videos_by_day,
|
||||
}));
|
||||
}
|
||||
|
||||
async fn get_sysinfo() -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
|
||||
let sys_refresh_kind = sys_refresh_kind();
|
||||
let disk_refresh_kind = disk_refresh_kind();
|
||||
let mut system = System::new();
|
||||
let mut disks = Disks::new();
|
||||
// safety: this functions always returns Ok on Linux/MacOS/Windows
|
||||
let self_pid = get_current_pid().unwrap();
|
||||
let stream = IntervalStream::new(tokio::time::interval(Duration::from_secs(2)))
|
||||
.map(move |_| {
|
||||
system.refresh_specifics(sys_refresh_kind);
|
||||
disks.refresh_specifics(true, disk_refresh_kind);
|
||||
let process = match system.process(self_pid) {
|
||||
Some(p) => p,
|
||||
None => return None,
|
||||
};
|
||||
let info = SysInfoResponse {
|
||||
total_memory: system.total_memory(),
|
||||
used_memory: system.used_memory(),
|
||||
process_memory: process.memory(),
|
||||
used_cpu: system.global_cpu_usage(),
|
||||
process_cpu: process.cpu_usage() / system.cpus().len() as f32,
|
||||
total_disk: disks.iter().map(|d| d.total_space()).sum(),
|
||||
available_disk: disks.iter().map(|d| d.available_space()).sum(),
|
||||
};
|
||||
serde_json::to_string(&info).ok()
|
||||
})
|
||||
.take_while(|info| futures::future::ready(info.is_some()))
|
||||
// safety: after `take_while`, `info` is always Some
|
||||
.map(|info| Ok(Event::default().data(info.unwrap())));
|
||||
Sse::new(stream).keep_alive(KeepAlive::default())
|
||||
}
|
||||
|
||||
fn sys_refresh_kind() -> RefreshKind {
|
||||
RefreshKind::nothing()
|
||||
.with_cpu(CpuRefreshKind::nothing().with_cpu_usage())
|
||||
.with_memory(MemoryRefreshKind::nothing().with_ram())
|
||||
.with_processes(ProcessRefreshKind::nothing().with_cpu().with_memory())
|
||||
}
|
||||
|
||||
fn disk_refresh_kind() -> DiskRefreshKind {
|
||||
DiskRefreshKind::nothing().with_storage()
|
||||
}
|
||||
|
||||
@@ -1,32 +0,0 @@
|
||||
mod mpsc;
|
||||
use std::convert::Infallible;
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::response::sse::{Event, KeepAlive, Sse};
|
||||
use axum::routing::get;
|
||||
use axum::{Extension, Router};
|
||||
use futures::{Stream, StreamExt};
|
||||
pub use mpsc::{MAX_HISTORY_LOGS, MpscWriter};
|
||||
use tokio_stream::wrappers::BroadcastStream;
|
||||
|
||||
pub(super) fn router() -> Router {
|
||||
Router::new().route("/logs", get(logs))
|
||||
}
|
||||
|
||||
async fn logs(Extension(log_writer): Extension<MpscWriter>) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
|
||||
let history = log_writer.log_history.lock();
|
||||
let rx = log_writer.sender.subscribe();
|
||||
let history_logs: Vec<String> = history.iter().cloned().collect();
|
||||
drop(history);
|
||||
|
||||
let history_stream = { futures::stream::iter(history_logs.into_iter().map(|msg| Ok(Event::default().data(msg)))) };
|
||||
|
||||
let stream = BroadcastStream::new(rx).filter_map(async |msg| match msg {
|
||||
Ok(log_message) => Some(Ok(Event::default().data(log_message))),
|
||||
Err(e) => {
|
||||
error!("Broadcast stream error: {:?}", e);
|
||||
None
|
||||
}
|
||||
});
|
||||
Sse::new(history_stream.chain(stream)).keep_alive(KeepAlive::new().interval(Duration::from_secs(10)))
|
||||
}
|
||||
@@ -18,12 +18,12 @@ use crate::config::VersionedConfig;
|
||||
|
||||
mod config;
|
||||
mod dashboard;
|
||||
mod logs;
|
||||
mod me;
|
||||
mod sse;
|
||||
mod video_sources;
|
||||
mod videos;
|
||||
|
||||
pub use logs::{MAX_HISTORY_LOGS, MpscWriter};
|
||||
pub use sse::{MAX_HISTORY_LOGS, MpscWriter};
|
||||
|
||||
pub fn router() -> Router {
|
||||
Router::new().route("/image-proxy", get(image_proxy)).nest(
|
||||
@@ -33,7 +33,7 @@ pub fn router() -> Router {
|
||||
.merge(video_sources::router())
|
||||
.merge(videos::router())
|
||||
.merge(dashboard::router())
|
||||
.merge(logs::router())
|
||||
.merge(sse::router())
|
||||
.layer(middleware::from_fn(auth)),
|
||||
)
|
||||
}
|
||||
|
||||
98
crates/bili_sync/src/api/routes/sse/mod.rs
Normal file
98
crates/bili_sync/src/api/routes/sse/mod.rs
Normal file
@@ -0,0 +1,98 @@
|
||||
mod mpsc;
|
||||
|
||||
use std::convert::Infallible;
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::response::Sse;
|
||||
use axum::response::sse::{Event, KeepAlive};
|
||||
use axum::routing::get;
|
||||
use axum::{Extension, Router};
|
||||
use futures::{Stream, StreamExt};
|
||||
pub use mpsc::{MAX_HISTORY_LOGS, MpscWriter};
|
||||
use sysinfo::{
|
||||
CpuRefreshKind, DiskRefreshKind, Disks, MemoryRefreshKind, ProcessRefreshKind, RefreshKind, System, get_current_pid,
|
||||
};
|
||||
use tokio_stream::wrappers::{BroadcastStream, IntervalStream, WatchStream};
|
||||
|
||||
use crate::api::response::SysInfoResponse;
|
||||
use crate::utils::task_notifier::TASK_STATUS_NOTIFIER;
|
||||
|
||||
pub(super) fn router() -> Router {
|
||||
Router::new()
|
||||
.route("/sse/logs", get(logs))
|
||||
.route("/sse/tasks", get(get_tasks))
|
||||
.route("/sse/sysinfo", get(get_sysinfo))
|
||||
}
|
||||
|
||||
async fn get_tasks() -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
|
||||
let stream = WatchStream::new(TASK_STATUS_NOTIFIER.subscribe()).filter_map(|status| async move {
|
||||
match serde_json::to_string(&status) {
|
||||
Ok(status) => Some(Ok(Event::default().data(status))),
|
||||
Err(_) => None,
|
||||
}
|
||||
});
|
||||
Sse::new(stream).keep_alive(KeepAlive::default())
|
||||
}
|
||||
|
||||
async fn logs(Extension(log_writer): Extension<MpscWriter>) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
|
||||
let history = log_writer.log_history.lock();
|
||||
let rx = log_writer.sender.subscribe();
|
||||
let history_logs: Vec<String> = history.iter().cloned().collect();
|
||||
drop(history);
|
||||
|
||||
let history_stream = { futures::stream::iter(history_logs.into_iter().map(|msg| Ok(Event::default().data(msg)))) };
|
||||
|
||||
let stream = BroadcastStream::new(rx).filter_map(async |msg| match msg {
|
||||
Ok(log_message) => Some(Ok(Event::default().data(log_message))),
|
||||
Err(e) => {
|
||||
error!("Log stream error: {:?}", e);
|
||||
None
|
||||
}
|
||||
});
|
||||
Sse::new(history_stream.chain(stream)).keep_alive(KeepAlive::default())
|
||||
}
|
||||
|
||||
async fn get_sysinfo() -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
|
||||
let sys_refresh_kind = sys_refresh_kind();
|
||||
let disk_refresh_kind = disk_refresh_kind();
|
||||
let mut system = System::new();
|
||||
let mut disks = Disks::new();
|
||||
// safety: this functions always returns Ok on Linux/MacOS/Windows
|
||||
let self_pid = get_current_pid().unwrap();
|
||||
let stream = IntervalStream::new(tokio::time::interval(Duration::from_secs(2))).filter_map(move |_| {
|
||||
system.refresh_specifics(sys_refresh_kind);
|
||||
disks.refresh_specifics(true, disk_refresh_kind);
|
||||
let process = match system.process(self_pid) {
|
||||
Some(p) => p,
|
||||
None => return futures::future::ready(None),
|
||||
};
|
||||
let info = SysInfoResponse {
|
||||
total_memory: system.total_memory(),
|
||||
used_memory: system.used_memory(),
|
||||
process_memory: process.memory(),
|
||||
used_cpu: system.global_cpu_usage(),
|
||||
process_cpu: process.cpu_usage() / system.cpus().len() as f32,
|
||||
total_disk: disks.iter().map(|d| d.total_space()).sum(),
|
||||
available_disk: disks.iter().map(|d| d.available_space()).sum(),
|
||||
};
|
||||
match serde_json::to_string(&info) {
|
||||
Ok(json) => futures::future::ready(Some(Ok(Event::default().data(json)))),
|
||||
Err(_) => {
|
||||
error!("Failed to serialize system info");
|
||||
futures::future::ready(None)
|
||||
}
|
||||
}
|
||||
});
|
||||
Sse::new(stream).keep_alive(KeepAlive::default())
|
||||
}
|
||||
|
||||
fn sys_refresh_kind() -> RefreshKind {
|
||||
RefreshKind::nothing()
|
||||
.with_cpu(CpuRefreshKind::nothing().with_cpu_usage())
|
||||
.with_memory(MemoryRefreshKind::nothing().with_ram())
|
||||
.with_processes(ProcessRefreshKind::nothing().with_cpu().with_memory())
|
||||
}
|
||||
|
||||
fn disk_refresh_kind() -> DiskRefreshKind {
|
||||
DiskRefreshKind::nothing().with_storage()
|
||||
}
|
||||
@@ -2,4 +2,4 @@ mod http_server;
|
||||
mod video_downloader;
|
||||
|
||||
pub use http_server::http_server;
|
||||
pub use video_downloader::{DOWNLOADER_TASK_RUNNING, video_downloader};
|
||||
pub use video_downloader::video_downloader;
|
||||
|
||||
@@ -6,16 +6,15 @@ use tokio::time;
|
||||
use crate::bilibili::{self, BiliClient};
|
||||
use crate::config::VersionedConfig;
|
||||
use crate::utils::model::get_enabled_video_sources;
|
||||
use crate::utils::task_notifier::TASK_STATUS_NOTIFIER;
|
||||
use crate::workflow::process_video_source;
|
||||
|
||||
pub static DOWNLOADER_TASK_RUNNING: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
|
||||
|
||||
/// 启动周期下载视频的任务
|
||||
pub async fn video_downloader(connection: Arc<DatabaseConnection>, bili_client: Arc<BiliClient>) {
|
||||
let mut anchor = chrono::Local::now().date_naive();
|
||||
loop {
|
||||
info!("开始执行本轮视频下载任务..");
|
||||
let _lock = DOWNLOADER_TASK_RUNNING.lock().await;
|
||||
let _lock = TASK_STATUS_NOTIFIER.start_running().await;
|
||||
let config = VersionedConfig::get().load_full();
|
||||
'inner: {
|
||||
if let Err(e) = config.check() {
|
||||
@@ -55,7 +54,7 @@ pub async fn video_downloader(connection: Arc<DatabaseConnection>, bili_client:
|
||||
}
|
||||
info!("本轮任务执行完毕,等待下一轮执行");
|
||||
}
|
||||
drop(_lock);
|
||||
TASK_STATUS_NOTIFIER.finish_running(_lock);
|
||||
time::sleep(time::Duration::from_secs(config.interval)).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ pub mod model;
|
||||
pub mod nfo;
|
||||
pub mod signal;
|
||||
pub mod status;
|
||||
pub mod task_notifier;
|
||||
pub mod validation;
|
||||
use tracing_subscriber::fmt;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
|
||||
79
crates/bili_sync/src/utils/task_notifier.rs
Normal file
79
crates/bili_sync/src/utils/task_notifier.rs
Normal file
@@ -0,0 +1,79 @@
|
||||
use std::sync::{Arc, LazyLock};
|
||||
|
||||
use serde::Serialize;
|
||||
use tokio::sync::MutexGuard;
|
||||
|
||||
use crate::config::VersionedConfig;
|
||||
|
||||
pub static TASK_STATUS_NOTIFIER: LazyLock<TaskStatusNotifier> = LazyLock::new(TaskStatusNotifier::new);
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct TaskStatus {
|
||||
is_running: bool,
|
||||
last_run: Option<chrono::DateTime<chrono::Local>>,
|
||||
last_finish: Option<chrono::DateTime<chrono::Local>>,
|
||||
next_run: Option<chrono::DateTime<chrono::Local>>,
|
||||
}
|
||||
|
||||
pub struct TaskStatusNotifier {
|
||||
mutex: tokio::sync::Mutex<()>,
|
||||
tx: tokio::sync::watch::Sender<Arc<TaskStatus>>,
|
||||
rx: tokio::sync::watch::Receiver<Arc<TaskStatus>>,
|
||||
}
|
||||
|
||||
impl Default for TaskStatus {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
is_running: false,
|
||||
last_run: None,
|
||||
last_finish: None,
|
||||
next_run: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TaskStatusNotifier {
|
||||
pub fn new() -> Self {
|
||||
let (tx, rx) = tokio::sync::watch::channel(Arc::new(TaskStatus::default()));
|
||||
Self {
|
||||
mutex: tokio::sync::Mutex::const_new(()),
|
||||
tx,
|
||||
rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_running(&self) -> MutexGuard<()> {
|
||||
let lock = self.mutex.lock().await;
|
||||
let _ = self.tx.send(Arc::new(TaskStatus {
|
||||
is_running: true,
|
||||
last_run: Some(chrono::Local::now()),
|
||||
last_finish: None,
|
||||
next_run: None,
|
||||
}));
|
||||
lock
|
||||
}
|
||||
|
||||
pub fn finish_running(&self, _lock: MutexGuard<()>) {
|
||||
let last_status = self.tx.borrow();
|
||||
let last_run = last_status.last_run.clone();
|
||||
drop(last_status);
|
||||
let config = VersionedConfig::get().load();
|
||||
let now = chrono::Local::now();
|
||||
|
||||
let _ = self.tx.send(Arc::new(TaskStatus {
|
||||
is_running: false,
|
||||
last_run,
|
||||
last_finish: Some(now),
|
||||
next_run: now.checked_add_signed(chrono::Duration::seconds(config.interval as i64)),
|
||||
}));
|
||||
}
|
||||
|
||||
/// 精确探测任务执行状态,保证如果读取到“未运行”,那么在锁释放之前任务不会被执行
|
||||
pub fn detect_running(&self) -> Option<MutexGuard<'_, ()>> {
|
||||
self.mutex.try_lock().ok()
|
||||
}
|
||||
|
||||
pub fn subscribe(&self) -> tokio::sync::watch::Receiver<Arc<TaskStatus>> {
|
||||
self.rx.clone()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user