feat: 支持根据筛选条件批量编辑视频的下载状态 (#558)

This commit is contained in:
ᴀᴍᴛᴏᴀᴇʀ
2025-12-06 19:47:16 +08:00
committed by GitHub
parent 930660045f
commit f1703096fd
10 changed files with 761 additions and 114 deletions

View File

@@ -1,49 +1,90 @@
use std::borrow::Borrow;
use itertools::Itertools;
use sea_orm::{ConnectionTrait, DatabaseTransaction};
use crate::api::response::{PageInfo, VideoInfo};
use crate::api::response::{PageInfo, SimplePageInfo, SimpleVideoInfo, VideoInfo};
pub async fn update_video_download_status(
pub trait VideoRecord {
fn as_id_status_tuple(&self) -> (i32, u32);
}
pub trait PageRecord {
fn as_id_status_tuple(&self) -> (i32, u32);
}
impl VideoRecord for VideoInfo {
fn as_id_status_tuple(&self) -> (i32, u32) {
(self.id, self.download_status)
}
}
impl VideoRecord for SimpleVideoInfo {
fn as_id_status_tuple(&self) -> (i32, u32) {
(self.id, self.download_status)
}
}
impl PageRecord for PageInfo {
fn as_id_status_tuple(&self) -> (i32, u32) {
(self.id, self.download_status)
}
}
impl PageRecord for SimplePageInfo {
fn as_id_status_tuple(&self) -> (i32, u32) {
(self.id, self.download_status)
}
}
pub async fn update_video_download_status<T>(
txn: &DatabaseTransaction,
videos: &[impl Borrow<VideoInfo>],
videos: &[impl Borrow<T>],
batch_size: Option<usize>,
) -> Result<(), sea_orm::DbErr> {
) -> Result<(), sea_orm::DbErr>
where
T: VideoRecord,
{
if videos.is_empty() {
return Ok(());
}
let videos = videos.iter().map(|v| v.borrow()).collect::<Vec<_>>();
if let Some(size) = batch_size {
for chunk in videos.chunks(size) {
execute_video_update_batch(txn, chunk).await?;
execute_video_update_batch(txn, chunk.iter().map(|v| v.borrow().as_id_status_tuple())).await?;
}
} else {
execute_video_update_batch(txn, &videos).await?;
execute_video_update_batch(txn, videos.iter().map(|v| v.borrow().as_id_status_tuple())).await?;
}
Ok(())
}
pub async fn update_page_download_status(
pub async fn update_page_download_status<T>(
txn: &DatabaseTransaction,
pages: &[impl Borrow<PageInfo>],
pages: &[impl Borrow<T>],
batch_size: Option<usize>,
) -> Result<(), sea_orm::DbErr> {
) -> Result<(), sea_orm::DbErr>
where
T: PageRecord,
{
if pages.is_empty() {
return Ok(());
}
let pages = pages.iter().map(|v| v.borrow()).collect::<Vec<_>>();
if let Some(size) = batch_size {
for chunk in pages.chunks(size) {
execute_page_update_batch(txn, chunk).await?;
execute_page_update_batch(txn, chunk.iter().map(|v| v.borrow().as_id_status_tuple())).await?;
}
} else {
execute_page_update_batch(txn, &pages).await?;
execute_page_update_batch(txn, pages.iter().map(|v| v.borrow().as_id_status_tuple())).await?;
}
Ok(())
}
async fn execute_video_update_batch(txn: &DatabaseTransaction, videos: &[&VideoInfo]) -> Result<(), sea_orm::DbErr> {
if videos.is_empty() {
async fn execute_video_update_batch(
txn: &DatabaseTransaction,
videos: impl Iterator<Item = (i32, u32)>,
) -> Result<(), sea_orm::DbErr> {
let values = videos.map(|v| format!("({}, {})", v.0, v.1)).join(", ");
if values.is_empty() {
return Ok(());
}
let sql = format!(
@@ -52,18 +93,21 @@ async fn execute_video_update_batch(txn: &DatabaseTransaction, videos: &[&VideoI
SET download_status = tempdata.download_status \
FROM tempdata \
WHERE video.id = tempdata.id",
videos
.iter()
.map(|v| format!("({}, {})", v.id, v.download_status))
.collect::<Vec<_>>()
.join(", ")
values
);
txn.execute_unprepared(&sql).await?;
Ok(())
}
async fn execute_page_update_batch(txn: &DatabaseTransaction, pages: &[&PageInfo]) -> Result<(), sea_orm::DbErr> {
if pages.is_empty() {
async fn execute_page_update_batch(
txn: &DatabaseTransaction,
pages: impl Iterator<Item = (i32, u32)>,
) -> Result<(), sea_orm::DbErr> {
let values = pages
.map(|p| format!("({}, {})", p.0, p.1))
.collect::<Vec<_>>()
.join(", ");
if values.is_empty() {
return Ok(());
}
let sql = format!(
@@ -72,11 +116,7 @@ async fn execute_page_update_batch(txn: &DatabaseTransaction, pages: &[&PageInfo
SET download_status = tempdata.download_status \
FROM tempdata \
WHERE page.id = tempdata.id",
pages
.iter()
.map(|p| format!("({}, {})", p.id, p.download_status))
.collect::<Vec<_>>()
.join(", ")
values
);
txn.execute_unprepared(&sql).await?;
Ok(())

View File

@@ -16,7 +16,18 @@ pub struct VideosRequest {
}
#[derive(Deserialize)]
pub struct ResetRequest {
pub struct ResetVideoStatusRequest {
#[serde(default)]
pub force: bool,
}
#[derive(Deserialize)]
pub struct ResetFilteredVideoStatusRequest {
pub collection: Option<i32>,
pub favorite: Option<i32>,
pub submission: Option<i32>,
pub watch_later: Option<i32>,
pub query: Option<String>,
#[serde(default)]
pub force: bool,
}
@@ -46,6 +57,21 @@ pub struct UpdateVideoStatusRequest {
pub page_updates: Vec<PageStatusUpdate>,
}
#[derive(Deserialize, Validate)]
pub struct UpdateFilteredVideoStatusRequest {
pub collection: Option<i32>,
pub favorite: Option<i32>,
pub submission: Option<i32>,
pub watch_later: Option<i32>,
pub query: Option<String>,
#[serde(default)]
#[validate(nested)]
pub video_updates: Vec<StatusUpdate>,
#[serde(default)]
#[validate(nested)]
pub page_updates: Vec<StatusUpdate>,
}
#[derive(Deserialize)]
pub struct FollowedCollectionsRequest {
pub page_num: Option<i32>,

View File

@@ -33,7 +33,7 @@ pub struct ResetVideoResponse {
}
#[derive(Serialize)]
pub struct ResetAllVideosResponse {
pub struct ResetFilteredVideosResponse {
pub resetted: bool,
pub resetted_videos_count: usize,
pub resetted_pages_count: usize,
@@ -46,6 +46,13 @@ pub struct UpdateVideoStatusResponse {
pub pages: Vec<PageInfo>,
}
#[derive(Serialize)]
pub struct UpdateFilteredVideoStatusResponse {
pub success: bool,
pub updated_videos_count: usize,
pub updated_pages_count: usize,
}
#[derive(FromQueryResult, Serialize)]
pub struct VideoSource {
pub id: i32,
@@ -75,6 +82,21 @@ pub struct PageInfo {
pub download_status: u32,
}
#[derive(Serialize, DerivePartialModel, FromQueryResult, Clone, Copy)]
#[sea_orm(entity = "video::Entity")]
pub struct SimpleVideoInfo {
pub id: i32,
pub download_status: u32,
}
#[derive(Serialize, DerivePartialModel, FromQueryResult, Clone, Copy)]
#[sea_orm(entity = "page::Entity")]
pub struct SimplePageInfo {
pub id: i32,
pub video_id: i32,
pub download_status: u32,
}
fn serde_video_download_status<S>(status: &u32, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,

View File

@@ -11,10 +11,13 @@ use sea_orm::{
use crate::api::error::InnerApiError;
use crate::api::helper::{update_page_download_status, update_video_download_status};
use crate::api::request::{ResetRequest, UpdateVideoStatusRequest, VideosRequest};
use crate::api::request::{
ResetFilteredVideoStatusRequest, ResetVideoStatusRequest, UpdateFilteredVideoStatusRequest,
UpdateVideoStatusRequest, VideosRequest,
};
use crate::api::response::{
PageInfo, ResetAllVideosResponse, ResetVideoResponse, UpdateVideoStatusResponse, VideoInfo, VideoResponse,
VideosResponse,
PageInfo, ResetFilteredVideosResponse, ResetVideoResponse, SimplePageInfo, SimpleVideoInfo,
UpdateFilteredVideoStatusResponse, UpdateVideoStatusResponse, VideoInfo, VideoResponse, VideosResponse,
};
use crate::api::wrapper::{ApiError, ApiResponse, ValidatedJson};
use crate::utils::status::{PageStatus, VideoStatus};
@@ -23,9 +26,10 @@ pub(super) fn router() -> Router {
Router::new()
.route("/videos", get(get_videos))
.route("/videos/{id}", get(get_video))
.route("/videos/{id}/reset", post(reset_video))
.route("/videos/reset-all", post(reset_all_videos))
.route("/videos/{id}/reset-status", post(reset_video_status))
.route("/videos/{id}/update-status", post(update_video_status))
.route("/videos/reset-status", post(reset_filtered_video_status))
.route("/videos/update-status", post(update_filtered_video_status))
}
/// 列出视频的基本信息,支持根据视频来源筛选、名称查找和分页
@@ -89,10 +93,10 @@ pub async fn get_video(
}))
}
pub async fn reset_video(
pub async fn reset_video_status(
Path(id): Path<i32>,
Extension(db): Extension<DatabaseConnection>,
Json(request): Json<ResetRequest>,
Json(request): Json<ResetVideoStatusRequest>,
) -> Result<ApiResponse<ResetVideoResponse>, ApiError> {
let (video_info, pages_info) = tokio::try_join!(
video::Entity::find_by_id(id).into_partial_model::<VideoInfo>().one(&db),
@@ -134,7 +138,7 @@ pub async fn reset_video(
let txn = db.begin().await?;
if !resetted_videos_info.is_empty() {
// 只可能有 1 个元素,所以不用 batch
update_video_download_status(&txn, &resetted_videos_info, None).await?;
update_video_download_status::<VideoInfo>(&txn, &resetted_videos_info, None).await?;
}
if !resetted_pages_info.is_empty() {
update_page_download_status(&txn, &resetted_pages_info, Some(500)).await?;
@@ -148,15 +152,34 @@ pub async fn reset_video(
}))
}
pub async fn reset_all_videos(
pub async fn reset_filtered_video_status(
Extension(db): Extension<DatabaseConnection>,
Json(request): Json<ResetRequest>,
) -> Result<ApiResponse<ResetAllVideosResponse>, ApiError> {
// 先查询所有视频和页面数据
let (all_videos, all_pages) = tokio::try_join!(
video::Entity::find().into_partial_model::<VideoInfo>().all(&db),
page::Entity::find().into_partial_model::<PageInfo>().all(&db)
)?;
Json(request): Json<ResetFilteredVideoStatusRequest>,
) -> Result<ApiResponse<ResetFilteredVideosResponse>, ApiError> {
let mut query = video::Entity::find();
for (field, column) in [
(request.collection, video::Column::CollectionId),
(request.favorite, video::Column::FavoriteId),
(request.submission, video::Column::SubmissionId),
(request.watch_later, video::Column::WatchLaterId),
] {
if let Some(id) = field {
query = query.filter(column.eq(id));
}
}
if let Some(query_word) = request.query {
query = query.filter(
video::Column::Name
.contains(&query_word)
.or(video::Column::Bvid.contains(query_word)),
);
}
let all_videos = query.into_partial_model::<SimpleVideoInfo>().all(&db).await?;
let all_pages = page::Entity::find()
.filter(page::Column::VideoId.is_in(all_videos.iter().map(|v| v.id)))
.into_partial_model::<SimplePageInfo>()
.all(&db)
.await?;
let resetted_pages_info = all_pages
.into_iter()
.filter_map(|mut page_info| {
@@ -200,7 +223,7 @@ pub async fn reset_all_videos(
}
txn.commit().await?;
}
Ok(ApiResponse::ok(ResetAllVideosResponse {
Ok(ApiResponse::ok(ResetFilteredVideosResponse {
resetted: has_video_updates || has_page_updates,
resetted_videos_count: resetted_videos_info.len(),
resetted_pages_count: resetted_pages_info.len(),
@@ -248,10 +271,10 @@ pub async fn update_video_status(
if has_video_updates || has_page_updates {
let txn = db.begin().await?;
if has_video_updates {
update_video_download_status(&txn, &[&video_info], None).await?;
update_video_download_status::<VideoInfo>(&txn, &[&video_info], None).await?;
}
if has_page_updates {
update_page_download_status(&txn, &updated_pages_info, None).await?;
update_page_download_status::<PageInfo>(&txn, &updated_pages_info, None).await?;
}
txn.commit().await?;
}
@@ -261,3 +284,64 @@ pub async fn update_video_status(
pages: pages_info,
}))
}
pub async fn update_filtered_video_status(
Extension(db): Extension<DatabaseConnection>,
ValidatedJson(request): ValidatedJson<UpdateFilteredVideoStatusRequest>,
) -> Result<ApiResponse<UpdateFilteredVideoStatusResponse>, ApiError> {
let mut query = video::Entity::find();
for (field, column) in [
(request.collection, video::Column::CollectionId),
(request.favorite, video::Column::FavoriteId),
(request.submission, video::Column::SubmissionId),
(request.watch_later, video::Column::WatchLaterId),
] {
if let Some(id) = field {
query = query.filter(column.eq(id));
}
}
if let Some(query_word) = request.query {
query = query.filter(
video::Column::Name
.contains(&query_word)
.or(video::Column::Bvid.contains(query_word)),
);
}
let mut all_videos = query.into_partial_model::<SimpleVideoInfo>().all(&db).await?;
let mut all_pages = page::Entity::find()
.filter(page::Column::VideoId.is_in(all_videos.iter().map(|v| v.id)))
.into_partial_model::<SimplePageInfo>()
.all(&db)
.await?;
for video_info in all_videos.iter_mut() {
let mut video_status = VideoStatus::from(video_info.download_status);
for update in &request.video_updates {
video_status.set(update.status_index, update.status_value);
}
video_info.download_status = video_status.into();
}
for page_info in all_pages.iter_mut() {
let mut page_status = PageStatus::from(page_info.download_status);
for update in &request.page_updates {
page_status.set(update.status_index, update.status_value);
}
page_info.download_status = page_status.into();
}
let has_video_updates = !all_videos.is_empty();
let has_page_updates = !all_pages.is_empty();
if has_video_updates || has_page_updates {
let txn = db.begin().await?;
if has_video_updates {
update_video_download_status(&txn, &all_videos, Some(500)).await?;
}
if has_page_updates {
update_page_download_status(&txn, &all_pages, Some(500)).await?;
}
txn.commit().await?;
}
Ok(ApiResponse::ok(UpdateFilteredVideoStatusResponse {
success: has_video_updates || has_page_updates,
updated_videos_count: all_videos.len(),
updated_pages_count: all_pages.len(),
}))
}