feat: 重构优化部分 API,支持重置全体失败的任务 (#351)

This commit is contained in:
ᴀᴍᴛᴏᴀᴇʀ
2025-06-04 17:04:15 +08:00
committed by GitHub
parent 45849957ff
commit c528152986
9 changed files with 380 additions and 154 deletions

View File

@@ -1,27 +1,30 @@
use std::collections::HashSet;
use std::sync::Arc;
use anyhow::{Result, anyhow};
use anyhow::Result;
use axum::extract::{Extension, Path, Query};
use bili_sync_entity::*;
use bili_sync_migration::{Expr, OnConflict};
use bili_sync_migration::Expr;
use sea_orm::{
ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter, QueryOrder,
QuerySelect, Set, TransactionTrait, Unchanged,
ColumnTrait, DatabaseConnection, EntityTrait, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect,
TransactionTrait,
};
use utoipa::OpenApi;
use crate::api::auth::OpenAPIAuth;
use crate::api::error::InnerApiError;
use crate::api::helper::{update_page_download_status, update_video_download_status};
use crate::api::request::VideosRequest;
use crate::api::response::{
PageInfo, ResetVideoResponse, VideoInfo, VideoResponse, VideoSource, VideoSourcesResponse, VideosResponse,
PageInfo, ResetAllVideosResponse, ResetVideoResponse, VideoInfo, VideoResponse, VideoSource, VideoSourcesResponse,
VideosResponse,
};
use crate::api::wrapper::{ApiError, ApiResponse};
use crate::utils::status::{PageStatus, VideoStatus};
#[derive(OpenApi)]
#[openapi(
paths(get_video_sources, get_videos, get_video, reset_video),
paths(get_video_sources, get_videos, get_video, reset_video, reset_all_videos),
modifiers(&OpenAPIAuth),
security(
("Token" = []),
@@ -40,33 +43,35 @@ pub struct ApiDoc;
pub async fn get_video_sources(
Extension(db): Extension<Arc<DatabaseConnection>>,
) -> Result<ApiResponse<VideoSourcesResponse>, ApiError> {
Ok(ApiResponse::ok(VideoSourcesResponse {
collection: collection::Entity::find()
let (collection, favorite, submission, watch_later) = tokio::try_join!(
collection::Entity::find()
.select_only()
.columns([collection::Column::Id, collection::Column::Name])
.into_model::<VideoSource>()
.all(db.as_ref())
.await?,
favorite: favorite::Entity::find()
.all(db.as_ref()),
favorite::Entity::find()
.select_only()
.columns([favorite::Column::Id, favorite::Column::Name])
.into_model::<VideoSource>()
.all(db.as_ref())
.await?,
submission: submission::Entity::find()
.all(db.as_ref()),
submission::Entity::find()
.select_only()
.column(submission::Column::Id)
.column_as(submission::Column::UpperName, "name")
.into_model::<VideoSource>()
.all(db.as_ref())
.await?,
watch_later: watch_later::Entity::find()
.all(db.as_ref()),
watch_later::Entity::find()
.select_only()
.column(watch_later::Column::Id)
.column_as(Expr::value("稍后再看"), "name")
.into_model::<VideoSource>()
.all(db.as_ref())
.await?,
)?;
Ok(ApiResponse::ok(VideoSourcesResponse {
collection,
favorite,
submission,
watch_later,
}))
}
@@ -103,25 +108,15 @@ pub async fn get_videos(
let (page, page_size) = if let (Some(page), Some(page_size)) = (params.page, params.page_size) {
(page, page_size)
} else {
(1, 10)
(0, 10)
};
Ok(ApiResponse::ok(VideosResponse {
videos: query
.order_by_desc(video::Column::Id)
.select_only()
.columns([
video::Column::Id,
video::Column::Name,
video::Column::UpperName,
video::Column::DownloadStatus,
])
.into_tuple::<(i32, String, String, u32)>()
.into_partial_model::<VideoInfo>()
.paginate(db.as_ref(), page_size)
.fetch_page(page)
.await?
.into_iter()
.map(VideoInfo::from)
.collect(),
.await?,
total_count,
}))
}
@@ -138,40 +133,22 @@ pub async fn get_video(
Path(id): Path<i32>,
Extension(db): Extension<Arc<DatabaseConnection>>,
) -> Result<ApiResponse<VideoResponse>, ApiError> {
let video_info = video::Entity::find_by_id(id)
.select_only()
.columns([
video::Column::Id,
video::Column::Name,
video::Column::UpperName,
video::Column::DownloadStatus,
])
.into_tuple::<(i32, String, String, u32)>()
.one(db.as_ref())
.await?
.map(VideoInfo::from);
let (video_info, pages_info) = tokio::try_join!(
video::Entity::find_by_id(id)
.into_partial_model::<VideoInfo>()
.one(db.as_ref()),
page::Entity::find()
.filter(page::Column::VideoId.eq(id))
.order_by_asc(page::Column::Cid)
.into_partial_model::<PageInfo>()
.all(db.as_ref())
)?;
let Some(video_info) = video_info else {
return Err(InnerApiError::NotFound(id).into());
};
let pages = page::Entity::find()
.filter(page::Column::VideoId.eq(id))
.order_by_asc(page::Column::Pid)
.select_only()
.columns([
page::Column::Id,
page::Column::Pid,
page::Column::Name,
page::Column::DownloadStatus,
])
.into_tuple::<(i32, i32, String, u32)>()
.all(db.as_ref())
.await?
.into_iter()
.map(PageInfo::from)
.collect();
Ok(ApiResponse::ok(VideoResponse {
video: video_info,
pages,
pages: pages_info,
}))
}
@@ -180,73 +157,129 @@ pub async fn get_video(
post,
path = "/api/videos/{id}/reset",
responses(
(status = 200, body = ApiResponse<ResetVideoResponse> ),
(status = 200, body = ApiResponse<ResetVideoResponse>),
)
)]
pub async fn reset_video(
Path(id): Path<i32>,
Extension(db): Extension<Arc<DatabaseConnection>>,
) -> Result<ApiResponse<ResetVideoResponse>, ApiError> {
let txn = db.begin().await?;
let video_status: Option<u32> = video::Entity::find_by_id(id)
.select_only()
.column(video::Column::DownloadStatus)
.into_tuple()
.one(&txn)
.await?;
let Some(video_status) = video_status else {
return Err(anyhow!(InnerApiError::NotFound(id)).into());
let (video_info, pages_info) = tokio::try_join!(
video::Entity::find_by_id(id)
.into_partial_model::<VideoInfo>()
.one(db.as_ref()),
page::Entity::find()
.filter(page::Column::VideoId.eq(id))
.order_by_asc(page::Column::Cid)
.into_partial_model::<PageInfo>()
.all(db.as_ref())
)?;
let Some(mut video_info) = video_info else {
return Err(InnerApiError::NotFound(id).into());
};
let resetted_pages_model: Vec<_> = page::Entity::find()
.filter(page::Column::VideoId.eq(id))
.all(&txn)
.await?
let resetted_pages_info = pages_info
.into_iter()
.filter_map(|mut model| {
let mut page_status = PageStatus::from(model.download_status);
.filter_map(|mut page_info| {
let mut page_status = PageStatus::from(page_info.download_status);
if page_status.reset_failed() {
model.download_status = page_status.into();
Some(model)
page_info.download_status = page_status.into();
Some(page_info)
} else {
None
}
})
.collect();
let mut video_status = VideoStatus::from(video_status);
let mut should_update_video = video_status.reset_failed();
if !resetted_pages_model.is_empty() {
// 视频状态标志的第 5 位表示是否有分 P 下载失败,如果有需要重置的分页,需要同时重置视频的该状态
video_status.set(4, 0);
should_update_video = true;
.collect::<Vec<_>>();
let mut video_status = VideoStatus::from(video_info.download_status);
let mut video_resetted = video_status.reset_failed();
if !resetted_pages_info.is_empty() {
video_status.set(4, 0); // 将“分P下载”重置为 0
video_resetted = true;
}
if should_update_video {
video::Entity::update(video::ActiveModel {
id: Unchanged(id),
download_status: Set(video_status.into()),
..Default::default()
})
.exec(&txn)
.await?;
let resetted_videos_info = if video_resetted {
video_info.download_status = video_status.into();
vec![video_info.clone()]
} else {
vec![]
};
let resetted = !resetted_videos_info.is_empty() || !resetted_pages_info.is_empty();
if resetted {
let txn = db.begin().await?;
if !resetted_videos_info.is_empty() {
// 只可能有 1 个元素,所以不用 batch
update_video_download_status(&txn, &resetted_videos_info, None).await?;
}
if !resetted_pages_info.is_empty() {
update_page_download_status(&txn, &resetted_pages_info, Some(500)).await?;
}
txn.commit().await?;
}
let resetted_pages_id: Vec<_> = resetted_pages_model.iter().map(|model| model.id).collect();
let resetted_pages_model: Vec<page::ActiveModel> = resetted_pages_model
.into_iter()
.map(|model| model.into_active_model())
.collect();
for page_trunk in resetted_pages_model.chunks(50) {
page::Entity::insert_many(page_trunk.to_vec())
.on_conflict(
OnConflict::column(page::Column::Id)
.update_column(page::Column::DownloadStatus)
.to_owned(),
)
.exec(&txn)
.await?;
}
txn.commit().await?;
Ok(ApiResponse::ok(ResetVideoResponse {
resetted: should_update_video,
video: id,
pages: resetted_pages_id,
resetted,
video: video_info,
pages: resetted_pages_info,
}))
}
/// 重置所有视频和页面的失败状态为未下载状态,这样在下次下载任务中会触发重试
#[utoipa::path(
post,
path = "/api/videos/reset-all",
responses(
(status = 200, body = ApiResponse<ResetAllVideosResponse>),
)
)]
pub async fn reset_all_videos(
Extension(db): Extension<Arc<DatabaseConnection>>,
) -> Result<ApiResponse<ResetAllVideosResponse>, ApiError> {
// 先查询所有视频和页面数据
let (all_videos, all_pages) = tokio::try_join!(
video::Entity::find().into_partial_model::<VideoInfo>().all(db.as_ref()),
page::Entity::find().into_partial_model::<PageInfo>().all(db.as_ref())
)?;
let resetted_pages_info = all_pages
.into_iter()
.filter_map(|mut page_info| {
let mut page_status = PageStatus::from(page_info.download_status);
if page_status.reset_failed() {
page_info.download_status = page_status.into();
Some(page_info)
} else {
None
}
})
.collect::<Vec<_>>();
let video_ids_with_resetted_pages: HashSet<i32> = resetted_pages_info.iter().map(|page| page.video_id).collect();
let resetted_videos_info = all_videos
.into_iter()
.filter_map(|mut video_info| {
let mut video_status = VideoStatus::from(video_info.download_status);
let mut video_resetted = video_status.reset_failed();
if video_ids_with_resetted_pages.contains(&video_info.id) {
video_status.set(4, 0); // 将"分P下载"重置为 0
video_resetted = true;
}
if video_resetted {
video_info.download_status = video_status.into();
Some(video_info)
} else {
None
}
})
.collect::<Vec<_>>();
let resetted = !(resetted_videos_info.is_empty() && resetted_pages_info.is_empty());
if resetted {
let txn = db.begin().await?;
if !resetted_videos_info.is_empty() {
update_video_download_status(&txn, &resetted_videos_info, Some(500)).await?;
}
if !resetted_pages_info.is_empty() {
update_page_download_status(&txn, &resetted_pages_info, Some(500)).await?;
}
txn.commit().await?;
}
Ok(ApiResponse::ok(ResetAllVideosResponse {
resetted,
resetted_videos_count: resetted_videos_info.len(),
resetted_pages_count: resetted_pages_info.len(),
}))
}

View File

@@ -0,0 +1,79 @@
use sea_orm::{ConnectionTrait, DatabaseTransaction};
use crate::api::response::{PageInfo, VideoInfo};
pub async fn update_video_download_status(
txn: &DatabaseTransaction,
videos: &[VideoInfo],
batch_size: Option<usize>,
) -> Result<(), sea_orm::DbErr> {
if videos.is_empty() {
return Ok(());
}
if let Some(size) = batch_size {
for chunk in videos.chunks(size) {
execute_video_update_batch(txn, chunk).await?;
}
} else {
execute_video_update_batch(txn, videos).await?;
}
Ok(())
}
pub async fn update_page_download_status(
txn: &DatabaseTransaction,
pages: &[PageInfo],
batch_size: Option<usize>,
) -> Result<(), sea_orm::DbErr> {
if pages.is_empty() {
return Ok(());
}
if let Some(size) = batch_size {
for chunk in pages.chunks(size) {
execute_page_update_batch(txn, chunk).await?;
}
} else {
execute_page_update_batch(txn, pages).await?;
}
Ok(())
}
async fn execute_video_update_batch(txn: &DatabaseTransaction, videos: &[VideoInfo]) -> Result<(), sea_orm::DbErr> {
if videos.is_empty() {
return Ok(());
}
let sql = format!(
"WITH tempdata(id, download_status) AS (VALUES {}) \
UPDATE video \
SET download_status = tempdata.download_status \
FROM tempdata \
WHERE video.id = tempdata.id",
videos
.iter()
.map(|v| format!("({}, {})", v.id, v.download_status))
.collect::<Vec<_>>()
.join(", ")
);
txn.execute_unprepared(&sql).await?;
Ok(())
}
async fn execute_page_update_batch(txn: &DatabaseTransaction, pages: &[PageInfo]) -> Result<(), sea_orm::DbErr> {
if pages.is_empty() {
return Ok(());
}
let sql = format!(
"WITH tempdata(id, download_status) AS (VALUES {}) \
UPDATE page \
SET download_status = tempdata.download_status \
FROM tempdata \
WHERE page.id = tempdata.id",
pages
.iter()
.map(|p| format!("({}, {})", p.id, p.download_status))
.collect::<Vec<_>>()
.join(", ")
);
txn.execute_unprepared(&sql).await?;
Ok(())
}

View File

@@ -2,6 +2,7 @@ pub mod auth;
pub mod handler;
mod error;
mod helper;
mod request;
mod response;
mod wrapper;

View File

@@ -1,4 +1,5 @@
use sea_orm::FromQueryResult;
use bili_sync_entity::*;
use sea_orm::{DerivePartialModel, FromQueryResult};
use serde::Serialize;
use utoipa::ToSchema;
@@ -27,8 +28,15 @@ pub struct VideoResponse {
#[derive(Serialize, ToSchema)]
pub struct ResetVideoResponse {
pub resetted: bool,
pub video: i32,
pub pages: Vec<i32>,
pub video: VideoInfo,
pub pages: Vec<PageInfo>,
}
#[derive(Serialize, ToSchema)]
pub struct ResetAllVideosResponse {
pub resetted: bool,
pub resetted_videos_count: usize,
pub resetted_pages_count: usize,
}
#[derive(FromQueryResult, Serialize, ToSchema)]
@@ -37,40 +45,39 @@ pub struct VideoSource {
name: String,
}
#[derive(Serialize, ToSchema)]
pub struct PageInfo {
pub id: i32,
pub pid: i32,
pub name: String,
pub download_status: [u32; 5],
}
impl From<(i32, i32, String, u32)> for PageInfo {
fn from((id, pid, name, download_status): (i32, i32, String, u32)) -> Self {
Self {
id,
pid,
name,
download_status: PageStatus::from(download_status).into(),
}
}
}
#[derive(Serialize, ToSchema)]
#[derive(Serialize, ToSchema, DerivePartialModel, FromQueryResult, Clone)]
#[sea_orm(entity = "video::Entity")]
pub struct VideoInfo {
pub id: i32,
pub name: String,
pub upper_name: String,
pub download_status: [u32; 5],
#[serde(serialize_with = "serde_video_download_status")]
pub download_status: u32,
}
impl From<(i32, String, String, u32)> for VideoInfo {
fn from((id, name, upper_name, download_status): (i32, String, String, u32)) -> Self {
Self {
id,
name,
upper_name,
download_status: VideoStatus::from(download_status).into(),
}
}
#[derive(Serialize, ToSchema, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "page::Entity")]
pub struct PageInfo {
pub id: i32,
pub video_id: i32,
pub pid: i32,
pub name: String,
#[serde(serialize_with = "serde_page_download_status")]
pub download_status: u32,
}
fn serde_video_download_status<S>(status: &u32, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let status: [u32; 5] = VideoStatus::from(*status).into();
status.serialize(serializer)
}
fn serde_page_download_status<S>(status: &u32, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let status: [u32; 5] = PageStatus::from(*status).into();
status.serialize(serializer)
}

View File

@@ -13,7 +13,7 @@ use utoipa::OpenApi;
use utoipa_swagger_ui::{Config, SwaggerUi};
use crate::api::auth;
use crate::api::handler::{ApiDoc, get_video, get_video_sources, get_videos, reset_video};
use crate::api::handler::{ApiDoc, get_video, get_video_sources, get_videos, reset_all_videos, reset_video};
use crate::config::CONFIG;
#[derive(Embed)]
@@ -26,6 +26,7 @@ pub async fn http_server(database_connection: Arc<DatabaseConnection>) -> Result
.route("/api/videos", get(get_videos))
.route("/api/videos/{id}", get(get_video))
.route("/api/videos/{id}/reset", post(reset_video))
.route("/api/videos/reset-all", post(reset_all_videos))
.merge(
SwaggerUi::new("/swagger-ui/")
.url("/api-docs/openapi.json", ApiDoc::openapi())

View File

@@ -5,6 +5,7 @@ import type {
VideosResponse,
VideoResponse,
ResetVideoResponse,
ResetAllVideosResponse,
ApiError
} from './types';
@@ -146,6 +147,13 @@ class ApiClient {
async resetVideo(id: number): Promise<ApiResponse<ResetVideoResponse>> {
return this.post<ResetVideoResponse>(`/videos/${id}/reset`);
}
/**
* 重置所有视频下载状态
*/
async resetAllVideos(): Promise<ApiResponse<ResetAllVideosResponse>> {
return this.post<ResetAllVideosResponse>('/videos/reset-all');
}
}
// 创建默认的 API 客户端实例
@@ -173,6 +181,11 @@ export const api = {
*/
resetVideo: (id: number) => apiClient.resetVideo(id),
/**
* 重置所有视频下载状态
*/
resetAllVideos: () => apiClient.resetAllVideos(),
/**
* 设置认证 token
*/

View File

@@ -61,8 +61,15 @@ export interface VideoResponse {
// 重置视频响应类型
export interface ResetVideoResponse {
resetted: boolean;
video: number;
pages: number[];
video: VideoInfo,
pages: PageInfo[];
}
// 重置所有视频响应类型
export interface ResetAllVideosResponse {
resetted: boolean;
resetted_videos_count: number;
resetted_pages_count: number;
}
// API 错误类型

View File

@@ -2,8 +2,16 @@
import VideoCard from '$lib/components/video-card.svelte';
import FilterBadge from '$lib/components/filter-badge.svelte';
import Pagination from '$lib/components/pagination.svelte';
import { Button } from '$lib/components/ui/button/index.js';
import * as AlertDialog from '$lib/components/ui/alert-dialog/index.js';
import RotateCcwIcon from '@lucide/svelte/icons/rotate-ccw';
import api from '$lib/api';
import type { VideosResponse, VideoSourcesResponse, ApiError } from '$lib/types';
import type {
VideosResponse,
VideoSourcesResponse,
ApiError,
ResetAllVideosResponse
} from '$lib/types';
import { onMount } from 'svelte';
import { page } from '$app/stores';
import { goto } from '$app/navigation';
@@ -26,6 +34,10 @@
let currentFilter: { type: string; id: string } | null = null;
let lastSearch: string | null = null;
// 重置所有视频相关状态
let resetAllDialogOpen = false;
let resettingAll = false;
// 从URL参数获取筛选条件
function getFilterFromURL(searchParams: URLSearchParams) {
for (const source of Object.values(VIDEO_SOURCES)) {
@@ -113,6 +125,33 @@
goto(`/${ToQuery($appStateStore)}`);
}
async function handleResetAllVideos() {
resettingAll = true;
try {
const result = await api.resetAllVideos();
const data = result.data;
if (data.resetted) {
toast.success('重置成功', {
description: `已重置 ${data.resetted_videos_count} 个视频和 ${data.resetted_pages_count} 个分页`
});
// 重新加载当前页面的视频数据
const query = $page.url.searchParams.get('query');
loadVideos(query || '', currentPage, currentFilter);
} else {
toast.info('没有需要重置的视频');
}
} catch (error) {
console.error('重置失败:', error);
toast.error('重置失败', {
description: (error as ApiError).message
});
} finally {
resettingAll = false;
resetAllDialogOpen = false;
}
}
$: if ($page.url.search !== lastSearch) {
lastSearch = $page.url.search;
handleSearchParamsChange();
@@ -141,11 +180,25 @@
<!-- 统计信息 -->
{#if videosData}
<div class="mb-6 flex items-center justify-between">
<div class="text-muted-foreground text-sm">
{videosData.total_count} 个视频
<div class="flex items-center gap-4">
<div class="text-muted-foreground text-sm">
{videosData.total_count} 个视频
</div>
<div class="text-muted-foreground text-sm">
{totalPages}
</div>
</div>
<div class="text-muted-foreground text-sm">
{totalPages}
<div class="flex items-center gap-2">
<Button
size="sm"
variant="outline"
class="text-xs"
onclick={() => (resetAllDialogOpen = true)}
disabled={resettingAll || loading}
>
<RotateCcwIcon class="mr-1.5 h-3 w-3 {resettingAll ? 'animate-spin' : ''}" />
重置所有视频
</Button>
</div>
</div>
{/if}
@@ -176,3 +229,32 @@
</div>
</div>
{/if}
<!-- 重置所有视频确认对话框 -->
<AlertDialog.Root bind:open={resetAllDialogOpen}>
<AlertDialog.Content>
<AlertDialog.Header>
<AlertDialog.Title>重置所有视频</AlertDialog.Title>
<AlertDialog.Description>
此操作将重置所有视频和分页的失败状态为未下载状态,使它们在下次下载任务中重新尝试。
<br />
<strong class="text-destructive">此操作不可撤销,确定要继续吗?</strong>
</AlertDialog.Description>
</AlertDialog.Header>
<AlertDialog.Footer>
<AlertDialog.Cancel disabled={resettingAll}>取消</AlertDialog.Cancel>
<AlertDialog.Action
onclick={handleResetAllVideos}
disabled={resettingAll}
class="bg-destructive text-destructive-foreground hover:bg-destructive/90"
>
{#if resettingAll}
<RotateCcwIcon class="mr-2 h-4 w-4 animate-spin" />
重置中...
{:else}
确认重置
{/if}
</AlertDialog.Action>
</AlertDialog.Footer>
</AlertDialog.Content>
</AlertDialog.Root>

View File

@@ -115,7 +115,10 @@
try {
const result = await api.resetVideo((videoData as VideoResponse).video.id);
if (result.data.resetted) {
await loadVideoDetail();
videoData = {
video: result.data.video,
pages: result.data.pages
};
toast.success('重置成功');
}
} catch (error) {