feat: 确保 video stream 在出现错误时返回 Err (#231)

This commit is contained in:
ᴀᴍᴛᴏᴀᴇʀ
2025-01-24 13:17:12 +08:00
committed by GitHub
parent 7c220f0d2b
commit 9e5a8b0573
11 changed files with 149 additions and 115 deletions

View File

@@ -98,7 +98,10 @@ pub(super) async fn collection_from<'a>(
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(Box<dyn VideoListModel>, Pin<Box<dyn Stream<Item = VideoInfo> + 'a>>)> {
) -> Result<(
Box<dyn VideoListModel>,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a>>,
)> {
let collection = Collection::new(bili_client, collection_item);
let collection_info = collection.get_info().await?;
collection::Entity::insert(collection::ActiveModel {
@@ -133,6 +136,6 @@ pub(super) async fn collection_from<'a>(
.await?
.context("collection not found")?,
),
Box::pin(collection.into_simple_video_stream()),
Box::pin(collection.into_video_stream()),
))
}

View File

@@ -70,7 +70,10 @@ pub(super) async fn favorite_from<'a>(
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(Box<dyn VideoListModel>, Pin<Box<dyn Stream<Item = VideoInfo> + 'a>>)> {
) -> Result<(
Box<dyn VideoListModel>,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a>>,
)> {
let favorite = FavoriteList::new(bili_client, fid.to_owned());
let favorite_info = favorite.get_info().await?;
favorite::Entity::insert(favorite::ActiveModel {

View File

@@ -67,7 +67,10 @@ pub async fn video_list_from<'a>(
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(Box<dyn VideoListModel>, Pin<Box<dyn Stream<Item = VideoInfo> + 'a>>)> {
) -> Result<(
Box<dyn VideoListModel>,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a>>,
)> {
match args {
Args::Favorite { fid } => favorite_from(fid, path, bili_client, connection).await,
Args::Collection { collection_item } => collection_from(collection_item, path, bili_client, connection).await,

View File

@@ -82,7 +82,10 @@ pub(super) async fn submission_from<'a>(
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(Box<dyn VideoListModel>, Pin<Box<dyn Stream<Item = VideoInfo> + 'a>>)> {
) -> Result<(
Box<dyn VideoListModel>,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a>>,
)> {
let submission = Submission::new(bili_client, upper_id.to_owned());
let upper = submission.get_info().await?;
submission::Entity::insert(submission::ActiveModel {

View File

@@ -66,7 +66,10 @@ pub(super) async fn watch_later_from<'a>(
path: &Path,
bili_client: &'a BiliClient,
connection: &DatabaseConnection,
) -> Result<(Box<dyn VideoListModel>, Pin<Box<dyn Stream<Item = VideoInfo> + 'a>>)> {
) -> Result<(
Box<dyn VideoListModel>,
Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a>>,
)> {
let watch_later = WatchLater::new(bili_client);
watch_later::Entity::insert(watch_later::ActiveModel {
id: Set(1),

View File

@@ -1,7 +1,7 @@
use std::fmt::{Display, Formatter};
use anyhow::Result;
use async_stream::stream;
use anyhow::{anyhow, Context, Result};
use async_stream::try_stream;
use futures::Stream;
use reqwest::Method;
use serde::Deserialize;
@@ -162,35 +162,30 @@ impl<'a> Collection<'a> {
.validate()
}
pub fn into_simple_video_stream(self) -> impl Stream<Item = VideoInfo> + 'a {
stream! {
pub fn into_video_stream(self) -> impl Stream<Item = Result<VideoInfo>> + 'a {
try_stream! {
let mut page = 1;
loop {
let mut videos = match self.get_videos(page).await {
Ok(v) => v,
Err(e) => {
error!(
"failed to get videos of collection {:?} page {}: {}",
self.collection, page, e
);
break;
}
};
let mut videos = self.get_videos(page).await.with_context(|| {
format!(
"failed to get videos of collection {:?} page {}",
self.collection, page
)
})?;
let archives = &mut videos["data"]["archives"];
if archives.as_array().is_none_or(|v| v.is_empty()) {
error!("no videos found in collection {:?} page {}", self.collection, page);
break;
Err(anyhow!(
"no videos found in collection {:?} page {}",
self.collection,
page
))?;
}
let videos_info: Vec<VideoInfo> = match serde_json::from_value(archives.take()) {
Ok(v) => v,
Err(e) => {
error!(
"failed to parse videos of collection {:?} page {}: {}",
self.collection, page, e
);
break;
}
};
let videos_info: Vec<VideoInfo> = serde_json::from_value(archives.take()).with_context(|| {
format!(
"failed to parse videos of collection {:?} page {}",
self.collection, page
)
})?;
for video_info in videos_info {
yield video_info;
}
@@ -199,17 +194,23 @@ impl<'a> Collection<'a> {
CollectionType::Series => ["num", "size", "total"],
CollectionType::Season => ["page_num", "page_size", "total"],
};
let values = fields.iter().map(|f| page_info[f].as_i64()).collect::<Vec<Option<i64>>>();
let values = fields
.iter()
.map(|f| page_info[f].as_i64())
.collect::<Vec<Option<i64>>>();
if let [Some(num), Some(size), Some(total)] = values[..] {
if num * size < total {
page += 1;
continue;
}
} else {
error!(
Err(anyhow!(
"invalid page info of collection {:?} page {}: read {:?} from {}",
self.collection, page, fields, page_info
);
self.collection,
page,
fields,
page_info
))?;
}
break;
}

View File

@@ -1,5 +1,5 @@
use anyhow::Result;
use async_stream::stream;
use anyhow::{anyhow, Context, Result};
use async_stream::try_stream;
use futures::Stream;
use serde_json::Value;
@@ -62,36 +62,31 @@ impl<'a> FavoriteList<'a> {
}
// 拿到收藏夹的所有权,返回一个收藏夹下的视频流
pub fn into_video_stream(self) -> impl Stream<Item = VideoInfo> + 'a {
stream! {
pub fn into_video_stream(self) -> impl Stream<Item = Result<VideoInfo>> + 'a {
try_stream! {
let mut page = 1;
loop {
let mut videos = match self.get_videos(page).await {
Ok(v) => v,
Err(e) => {
error!("failed to get videos of favorite {} page {}: {}", self.fid, page, e);
break;
}
};
let mut videos = self
.get_videos(page)
.await
.with_context(|| format!("failed to get videos of favorite {} page {}", self.fid, page))?;
let medias = &mut videos["data"]["medias"];
if medias.as_array().is_none_or(|v| v.is_empty()) {
error!("no medias found in favorite {} page {}", self.fid, page);
break;
Err(anyhow!("no medias found in favorite {} page {}", self.fid, page))?;
}
let videos_info: Vec<VideoInfo> = match serde_json::from_value(medias.take()) {
Ok(v) => v,
Err(e) => {
error!("failed to parse videos of favorite {} page {}: {}", self.fid, page, e);
break;
}
};
let videos_info: Vec<VideoInfo> = serde_json::from_value(medias.take())
.with_context(|| format!("failed to parse videos of favorite {} page {}", self.fid, page))?;
for video_info in videos_info {
yield video_info;
}
let has_more = &videos["data"]["has_more"];
if has_more.as_bool().is_some_and(|v| v) {
page += 1;
continue;
if let Some(v) = has_more.as_bool() {
if v {
page += 1;
continue;
}
} else {
Err(anyhow!("has_more is not a bool"))?;
}
break;
}

View File

@@ -158,22 +158,42 @@ mod tests {
collection_type: CollectionType::Season,
};
let collection = Collection::new(&bili_client, &collection_item);
let videos = collection.into_simple_video_stream().take(20).collect::<Vec<_>>().await;
let videos = collection
.into_video_stream()
.take(20)
.filter_map(|v| futures::future::ready(v.ok()))
.collect::<Vec<_>>()
.await;
assert!(videos.iter().all(|v| matches!(v, VideoInfo::Collection { .. })));
assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime()));
// 测试收藏夹
let favorite = FavoriteList::new(&bili_client, "3144336058".to_string());
let videos = favorite.into_video_stream().take(20).collect::<Vec<_>>().await;
let videos = favorite
.into_video_stream()
.take(20)
.filter_map(|v| futures::future::ready(v.ok()))
.collect::<Vec<_>>()
.await;
assert!(videos.iter().all(|v| matches!(v, VideoInfo::Favorite { .. })));
assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime()));
// 测试稍后再看
let watch_later = WatchLater::new(&bili_client);
let videos = watch_later.into_video_stream().take(20).collect::<Vec<_>>().await;
let videos = watch_later
.into_video_stream()
.take(20)
.filter_map(|v| futures::future::ready(v.ok()))
.collect::<Vec<_>>()
.await;
assert!(videos.iter().all(|v| matches!(v, VideoInfo::WatchLater { .. })));
assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime()));
// 测试投稿
let submission = Submission::new(&bili_client, "956761".to_string());
let videos = submission.into_video_stream().take(20).collect::<Vec<_>>().await;
let videos = submission
.into_video_stream()
.take(20)
.filter_map(|v| futures::future::ready(v.ok()))
.collect::<Vec<_>>()
.await;
assert!(videos.iter().all(|v| matches!(v, VideoInfo::Submission { .. })));
assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime()));
}

View File

@@ -1,6 +1,6 @@
use anyhow::Result;
use anyhow::{anyhow, Context, Result};
use arc_swap::access::Access;
use async_stream::stream;
use async_stream::try_stream;
use futures::Stream;
use reqwest::Method;
use serde_json::Value;
@@ -57,36 +57,31 @@ impl<'a> Submission<'a> {
.validate()
}
pub fn into_video_stream(self) -> impl Stream<Item = VideoInfo> + 'a {
stream! {
pub fn into_video_stream(self) -> impl Stream<Item = Result<VideoInfo>> + 'a {
try_stream! {
let mut page = 1;
loop {
let mut videos = match self.get_videos(page).await {
Ok(v) => v,
Err(e) => {
error!("failed to get videos of upper {} page {}: {}", self.upper_id, page, e);
break;
}
};
let mut videos = self
.get_videos(page)
.await
.with_context(|| format!("failed to get videos of upper {} page {}", self.upper_id, page))?;
let vlist = &mut videos["data"]["list"]["vlist"];
if vlist.as_array().is_none_or(|v| v.is_empty()) {
error!("no medias found in upper {} page {}", self.upper_id, page);
break;
Err(anyhow!("no medias found in upper {} page {}", self.upper_id, page))?;
}
let videos_info: Vec<VideoInfo> = match serde_json::from_value(vlist.take()) {
Ok(v) => v,
Err(e) => {
error!("failed to parse videos of upper {} page {}: {}", self.upper_id, page, e);
break;
}
};
let videos_info: Vec<VideoInfo> = serde_json::from_value(vlist.take())
.with_context(|| format!("failed to parse videos of upper {} page {}", self.upper_id, page))?;
for video_info in videos_info {
yield video_info;
}
let count = &videos["data"]["page"]["count"];
if count.as_i64().is_some_and(|v| v > (page * 30) as i64) {
page += 1;
continue;
if let Some(v) = count.as_i64() {
if v > (page * 30) as i64 {
page += 1;
continue;
}
} else {
Err(anyhow!("count is not an i64"))?;
}
break;
}

View File

@@ -1,5 +1,5 @@
use anyhow::Result;
use async_stream::stream;
use anyhow::{anyhow, Context, Result};
use async_stream::try_stream;
use futures::Stream;
use serde_json::Value;
@@ -25,24 +25,20 @@ impl<'a> WatchLater<'a> {
.validate()
}
pub fn into_video_stream(self) -> impl Stream<Item = VideoInfo> + 'a {
stream! {
let Ok(mut videos) = self.get_videos().await else {
error!("Failed to get watch later list");
return;
};
if !videos["data"]["list"].is_array() {
error!("Watch later list is not an array");
pub fn into_video_stream(self) -> impl Stream<Item = Result<VideoInfo>> + 'a {
try_stream! {
let mut videos = self
.get_videos()
.await
.with_context(|| "Failed to get watch later list")?;
let list = &mut videos["data"]["list"];
if list.as_array().is_none_or(|v| v.is_empty()) {
Err(anyhow!("No videos found in watch later list"))?;
}
let videos_info = match serde_json::from_value::<Vec<VideoInfo>>(videos["data"]["list"].take()) {
Ok(v) => v,
Err(e) => {
error!("Failed to parse watch later list: {}", e);
return;
}
};
for video in videos_info {
yield video;
let videos_info: Vec<VideoInfo> =
serde_json::from_value(list.take()).with_context(|| "Failed to parse watch later list")?;
for video_info in videos_info {
yield video_info;
}
}
}

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use anyhow::{bail, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use bili_sync_entity::*;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{Future, Stream, StreamExt};
@@ -50,29 +50,41 @@ pub async fn process_video_list(
/// 请求接口,获取视频列表中所有新添加的视频信息,将其写入数据库
pub async fn refresh_video_list<'a>(
video_list_model: &dyn VideoListModel,
video_streams: Pin<Box<dyn Stream<Item = VideoInfo> + 'a>>,
video_streams: Pin<Box<dyn Stream<Item = Result<VideoInfo>> + 'a>>,
connection: &DatabaseConnection,
) -> Result<()> {
video_list_model.log_refresh_video_start();
let latest_row_at = video_list_model.get_latest_row_at().and_utc();
let mut max_datetime = latest_row_at;
let mut error = Ok(());
let mut video_streams = video_streams
.take_while(|v| {
// 虽然 video_streams 是从新到旧的,但由于此处是分页请求,极端情况下可能发生访问完第一页时插入了两整页视频的情况
// 此时获取到的第二页视频比第一页的还要新,因此为了确保正确,理应对每一页的第一个视频进行时间比较
// 但在 streams 的抽象下,无法判断具体是在哪里分页的,所以暂且对每个视频都进行比较,应该不会有太大性能损失
let release_datetime = v.release_datetime();
if release_datetime > &max_datetime {
max_datetime = *release_datetime;
.take_while(|res| {
match res {
Err(e) => {
error = Err(anyhow!(e.to_string()));
futures::future::ready(false)
}
Ok(v) => {
// 虽然 video_streams 是从新到旧的,但由于此处是分页请求,极端情况下可能发生访问完第一页时插入了两整页视频的情况
// 此时获取到的第二页视频比第一页的还要新,因此为了确保正确,理应对每一页的第一个视频进行时间比较
// 但在 streams 的抽象下,无法判断具体是在哪里分页的,所以暂且对每个视频都进行比较,应该不会有太大性能损失
let release_datetime = v.release_datetime();
if release_datetime > &max_datetime {
max_datetime = *release_datetime;
}
futures::future::ready(release_datetime > &latest_row_at)
}
}
futures::future::ready(release_datetime > &latest_row_at)
})
.filter_map(|res| futures::future::ready(res.ok()))
.chunks(10);
let mut count = 0;
while let Some(videos_info) = video_streams.next().await {
count += videos_info.len();
create_videos(videos_info, video_list_model, connection).await?;
}
// 如果获取视频分页过程中发生了错误,直接在此处返回,不更新 latest_row_at
error?;
if max_datetime != latest_row_at {
video_list_model
.update_latest_row_at(max_datetime.naive_utc())