feat: 修改交互逻辑,支持前端查看日志 (#378)

This commit is contained in:
ᴀᴍᴛᴏᴀᴇʀ
2025-07-08 12:48:51 +08:00
committed by GitHub
parent 7c73a2f01a
commit 1affe4d594
71 changed files with 2049 additions and 1301 deletions

View File

@@ -5,4 +5,4 @@ mod response;
mod routes;
mod wrapper;
pub use routes::router;
pub use routes::{MpscWriter, router};

View File

@@ -0,0 +1,32 @@
mod mpsc;
use std::convert::Infallible;
use std::time::Duration;
use axum::response::sse::{Event, KeepAlive, Sse};
use axum::routing::get;
use axum::{Extension, Router};
use futures::{Stream, StreamExt};
pub use mpsc::MpscWriter;
use tokio_stream::wrappers::BroadcastStream;
pub(super) fn router() -> Router {
Router::new().route("/logs", get(logs))
}
async fn logs(Extension(log_writer): Extension<MpscWriter>) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let history = log_writer.log_history.lock();
let rx = log_writer.sender.subscribe();
let history_logs: Vec<String> = history.iter().cloned().collect();
drop(history);
let history_stream = { futures::stream::iter(history_logs.into_iter().map(|msg| Ok(Event::default().data(msg)))) };
let stream = BroadcastStream::new(rx).filter_map(async |msg| match msg {
Ok(log_message) => Some(Ok(Event::default().data(log_message))),
Err(e) => {
error!("Broadcast stream error: {:?}", e);
None
}
});
Sse::new(history_stream.chain(stream)).keep_alive(KeepAlive::new().interval(Duration::from_secs(10)))
}

View File

@@ -0,0 +1,53 @@
use std::collections::VecDeque;
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::sync::broadcast;
use tracing_subscriber::fmt::MakeWriter;
const MAX_HISTORY_LOGS: usize = 20;
pub struct MpscWriter {
pub sender: broadcast::Sender<String>,
pub log_history: Arc<Mutex<VecDeque<String>>>,
}
impl MpscWriter {
pub fn new(sender: broadcast::Sender<String>, log_history: Arc<Mutex<VecDeque<String>>>) -> Self {
MpscWriter { sender, log_history }
}
}
impl<'a> MakeWriter<'a> for MpscWriter {
type Writer = Self;
fn make_writer(&'a self) -> Self::Writer {
self.clone()
}
}
impl std::io::Write for MpscWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let log_message = String::from_utf8_lossy(buf).to_string();
let _ = self.sender.send(log_message.clone());
let mut history = self.log_history.lock();
history.push_back(log_message);
if history.len() > MAX_HISTORY_LOGS {
history.pop_front();
}
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
impl Clone for MpscWriter {
fn clone(&self) -> Self {
MpscWriter {
sender: self.sender.clone(),
log_history: self.log_history.clone(),
}
}
}

View File

@@ -18,10 +18,13 @@ use crate::config::VersionedConfig;
mod config;
mod dashboard;
mod logs;
mod me;
mod video_sources;
mod videos;
pub use logs::MpscWriter;
pub fn router() -> Router {
Router::new().route("/image-proxy", get(image_proxy)).nest(
"/api",
@@ -30,6 +33,7 @@ pub fn router() -> Router {
.merge(video_sources::router())
.merge(videos::router())
.merge(dashboard::router())
.merge(logs::router())
.layer(middleware::from_fn(auth)),
)
}

View File

@@ -147,7 +147,7 @@ mod tests {
#[ignore = "only for manual test"]
#[tokio::test]
async fn test_video_info_type() {
init_logger("None,bili_sync=debug");
init_logger("None,bili_sync=debug", None);
let bili_client = BiliClient::new();
// 请求 UP 主视频必须要获取 mixin key使用 key 计算请求参数的签名,否则直接提示权限不足返回空
let Ok(Some(mixin_key)) = bili_client.wbi_img().await.map(|wbi_img| wbi_img.into()) else {

View File

@@ -12,16 +12,19 @@ mod task;
mod utils;
mod workflow;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;
use bilibili::BiliClient;
use parking_lot::Mutex;
use sea_orm::DatabaseConnection;
use task::{http_server, video_downloader};
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use crate::api::MpscWriter;
use crate::config::{ARGS, VersionedConfig};
use crate::database::setup_database;
use crate::utils::init_logger;
@@ -29,7 +32,7 @@ use crate::utils::signal::terminate;
#[tokio::main]
async fn main() {
let connection = init().await;
let (connection, log_writer) = init().await;
let bili_client = Arc::new(BiliClient::new());
let token = CancellationToken::new();
@@ -37,7 +40,7 @@ async fn main() {
spawn_task(
"HTTP 服务",
http_server(connection.clone(), bili_client.clone()),
http_server(connection.clone(), bili_client.clone(), log_writer),
&tracker,
token.clone(),
);
@@ -73,9 +76,13 @@ fn spawn_task(
});
}
/// 初始化日志系统、打印欢迎信息,初始化数据库连接和全局配置,最终返回数据库连接
async fn init() -> Arc<DatabaseConnection> {
init_logger(&ARGS.log_level);
/// 初始化日志系统、打印欢迎信息,初始化数据库连接和全局配置
async fn init() -> (Arc<DatabaseConnection>, MpscWriter) {
let (tx, _rx) = tokio::sync::broadcast::channel(30);
let log_history = Arc::new(Mutex::new(VecDeque::with_capacity(20)));
let log_writer = MpscWriter::new(tx, log_history.clone());
init_logger(&ARGS.log_level, Some(log_writer.clone()));
info!("欢迎使用 Bili-Sync当前程序版本{}", config::version());
info!("项目地址https://github.com/amtoaer/bili-sync");
let connection = Arc::new(setup_database().await.expect("数据库初始化失败"));
@@ -83,7 +90,7 @@ async fn init() -> Arc<DatabaseConnection> {
VersionedConfig::init(&connection).await.expect("配置初始化失败");
info!("配置初始化完成");
connection
(connection, log_writer)
}
async fn handle_shutdown(tracker: TaskTracker, token: CancellationToken) {

View File

@@ -10,7 +10,7 @@ use reqwest::StatusCode;
use rust_embed_for_web::{EmbedableFile, RustEmbed};
use sea_orm::DatabaseConnection;
use crate::api::router;
use crate::api::{MpscWriter, router};
use crate::bilibili::BiliClient;
use crate::config::VersionedConfig;
@@ -20,11 +20,16 @@ use crate::config::VersionedConfig;
#[folder = "../../web/build"]
struct Asset;
pub async fn http_server(database_connection: Arc<DatabaseConnection>, bili_client: Arc<BiliClient>) -> Result<()> {
pub async fn http_server(
database_connection: Arc<DatabaseConnection>,
bili_client: Arc<BiliClient>,
log_writer: MpscWriter,
) -> Result<()> {
let app = router()
.fallback_service(get(frontend_files))
.layer(Extension(database_connection))
.layer(Extension(bili_client));
.layer(Extension(bili_client))
.layer(Extension(log_writer));
let config = VersionedConfig::get().load_full();
let listener = tokio::net::TcpListener::bind(&config.bind_address)
.await

View File

@@ -6,17 +6,35 @@ pub mod nfo;
pub mod signal;
pub mod status;
pub mod validation;
use tracing_subscriber::fmt;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
pub fn init_logger(log_level: &str) {
tracing_subscriber::fmt::Subscriber::builder()
use crate::api::MpscWriter;
pub fn init_logger(log_level: &str, log_writer: Option<MpscWriter>) {
let log = tracing_subscriber::fmt::Subscriber::builder()
.compact()
.with_env_filter(tracing_subscriber::EnvFilter::builder().parse_lossy(log_level))
.with_target(false)
.with_timer(tracing_subscriber::fmt::time::ChronoLocal::new(
"%b %d %H:%M:%S".to_owned(),
))
.finish()
.finish();
if let Some(writer) = log_writer {
log.with(
fmt::layer()
.with_ansi(false)
.with_timer(tracing_subscriber::fmt::time::ChronoLocal::new(
"%b %d %H:%M:%S".to_owned(),
))
.json()
.flatten_event(true)
.with_writer(writer),
)
.try_init()
.expect("初始化日志失败");
} else {
log.try_init().expect("初始化日志失败");
}
}