diff --git a/crates/bili_sync/src/adapter/collection.rs b/crates/bili_sync/src/adapter/collection.rs index cdc4df3..a5e29a2 100644 --- a/crates/bili_sync/src/adapter/collection.rs +++ b/crates/bili_sync/src/adapter/collection.rs @@ -98,7 +98,10 @@ pub(super) async fn collection_from<'a>( path: &Path, bili_client: &'a BiliClient, connection: &DatabaseConnection, -) -> Result<(Box, Pin + 'a>>)> { +) -> Result<( + Box, + Pin> + 'a>>, +)> { let collection = Collection::new(bili_client, collection_item); let collection_info = collection.get_info().await?; collection::Entity::insert(collection::ActiveModel { @@ -133,6 +136,6 @@ pub(super) async fn collection_from<'a>( .await? .context("collection not found")?, ), - Box::pin(collection.into_simple_video_stream()), + Box::pin(collection.into_video_stream()), )) } diff --git a/crates/bili_sync/src/adapter/favorite.rs b/crates/bili_sync/src/adapter/favorite.rs index eb0ee3f..6a8fe4e 100644 --- a/crates/bili_sync/src/adapter/favorite.rs +++ b/crates/bili_sync/src/adapter/favorite.rs @@ -70,7 +70,10 @@ pub(super) async fn favorite_from<'a>( path: &Path, bili_client: &'a BiliClient, connection: &DatabaseConnection, -) -> Result<(Box, Pin + 'a>>)> { +) -> Result<( + Box, + Pin> + 'a>>, +)> { let favorite = FavoriteList::new(bili_client, fid.to_owned()); let favorite_info = favorite.get_info().await?; favorite::Entity::insert(favorite::ActiveModel { diff --git a/crates/bili_sync/src/adapter/mod.rs b/crates/bili_sync/src/adapter/mod.rs index 3e40aaf..3e89a3e 100644 --- a/crates/bili_sync/src/adapter/mod.rs +++ b/crates/bili_sync/src/adapter/mod.rs @@ -67,7 +67,10 @@ pub async fn video_list_from<'a>( path: &Path, bili_client: &'a BiliClient, connection: &DatabaseConnection, -) -> Result<(Box, Pin + 'a>>)> { +) -> Result<( + Box, + Pin> + 'a>>, +)> { match args { Args::Favorite { fid } => favorite_from(fid, path, bili_client, connection).await, Args::Collection { collection_item } => collection_from(collection_item, path, bili_client, connection).await, diff --git a/crates/bili_sync/src/adapter/submission.rs b/crates/bili_sync/src/adapter/submission.rs index dcb98d6..18dffe3 100644 --- a/crates/bili_sync/src/adapter/submission.rs +++ b/crates/bili_sync/src/adapter/submission.rs @@ -82,7 +82,10 @@ pub(super) async fn submission_from<'a>( path: &Path, bili_client: &'a BiliClient, connection: &DatabaseConnection, -) -> Result<(Box, Pin + 'a>>)> { +) -> Result<( + Box, + Pin> + 'a>>, +)> { let submission = Submission::new(bili_client, upper_id.to_owned()); let upper = submission.get_info().await?; submission::Entity::insert(submission::ActiveModel { diff --git a/crates/bili_sync/src/adapter/watch_later.rs b/crates/bili_sync/src/adapter/watch_later.rs index 6fd20b5..f65bb9c 100644 --- a/crates/bili_sync/src/adapter/watch_later.rs +++ b/crates/bili_sync/src/adapter/watch_later.rs @@ -66,7 +66,10 @@ pub(super) async fn watch_later_from<'a>( path: &Path, bili_client: &'a BiliClient, connection: &DatabaseConnection, -) -> Result<(Box, Pin + 'a>>)> { +) -> Result<( + Box, + Pin> + 'a>>, +)> { let watch_later = WatchLater::new(bili_client); watch_later::Entity::insert(watch_later::ActiveModel { id: Set(1), diff --git a/crates/bili_sync/src/bilibili/collection.rs b/crates/bili_sync/src/bilibili/collection.rs index 20ac038..5c0f67d 100644 --- a/crates/bili_sync/src/bilibili/collection.rs +++ b/crates/bili_sync/src/bilibili/collection.rs @@ -1,7 +1,7 @@ use std::fmt::{Display, Formatter}; -use anyhow::Result; -use async_stream::stream; +use anyhow::{anyhow, Context, Result}; +use async_stream::try_stream; use futures::Stream; use reqwest::Method; use serde::Deserialize; @@ -162,35 +162,30 @@ impl<'a> Collection<'a> { .validate() } - pub fn into_simple_video_stream(self) -> impl Stream + 'a { - stream! { + pub fn into_video_stream(self) -> impl Stream> + 'a { + try_stream! { let mut page = 1; loop { - let mut videos = match self.get_videos(page).await { - Ok(v) => v, - Err(e) => { - error!( - "failed to get videos of collection {:?} page {}: {}", - self.collection, page, e - ); - break; - } - }; + let mut videos = self.get_videos(page).await.with_context(|| { + format!( + "failed to get videos of collection {:?} page {}", + self.collection, page + ) + })?; let archives = &mut videos["data"]["archives"]; if archives.as_array().is_none_or(|v| v.is_empty()) { - error!("no videos found in collection {:?} page {}", self.collection, page); - break; + Err(anyhow!( + "no videos found in collection {:?} page {}", + self.collection, + page + ))?; } - let videos_info: Vec = match serde_json::from_value(archives.take()) { - Ok(v) => v, - Err(e) => { - error!( - "failed to parse videos of collection {:?} page {}: {}", - self.collection, page, e - ); - break; - } - }; + let videos_info: Vec = serde_json::from_value(archives.take()).with_context(|| { + format!( + "failed to parse videos of collection {:?} page {}", + self.collection, page + ) + })?; for video_info in videos_info { yield video_info; } @@ -199,17 +194,23 @@ impl<'a> Collection<'a> { CollectionType::Series => ["num", "size", "total"], CollectionType::Season => ["page_num", "page_size", "total"], }; - let values = fields.iter().map(|f| page_info[f].as_i64()).collect::>>(); + let values = fields + .iter() + .map(|f| page_info[f].as_i64()) + .collect::>>(); if let [Some(num), Some(size), Some(total)] = values[..] { if num * size < total { page += 1; continue; } } else { - error!( + Err(anyhow!( "invalid page info of collection {:?} page {}: read {:?} from {}", - self.collection, page, fields, page_info - ); + self.collection, + page, + fields, + page_info + ))?; } break; } diff --git a/crates/bili_sync/src/bilibili/favorite_list.rs b/crates/bili_sync/src/bilibili/favorite_list.rs index 304b16f..ca47f69 100644 --- a/crates/bili_sync/src/bilibili/favorite_list.rs +++ b/crates/bili_sync/src/bilibili/favorite_list.rs @@ -1,5 +1,5 @@ -use anyhow::Result; -use async_stream::stream; +use anyhow::{anyhow, Context, Result}; +use async_stream::try_stream; use futures::Stream; use serde_json::Value; @@ -62,36 +62,31 @@ impl<'a> FavoriteList<'a> { } // 拿到收藏夹的所有权,返回一个收藏夹下的视频流 - pub fn into_video_stream(self) -> impl Stream + 'a { - stream! { + pub fn into_video_stream(self) -> impl Stream> + 'a { + try_stream! { let mut page = 1; loop { - let mut videos = match self.get_videos(page).await { - Ok(v) => v, - Err(e) => { - error!("failed to get videos of favorite {} page {}: {}", self.fid, page, e); - break; - } - }; + let mut videos = self + .get_videos(page) + .await + .with_context(|| format!("failed to get videos of favorite {} page {}", self.fid, page))?; let medias = &mut videos["data"]["medias"]; if medias.as_array().is_none_or(|v| v.is_empty()) { - error!("no medias found in favorite {} page {}", self.fid, page); - break; + Err(anyhow!("no medias found in favorite {} page {}", self.fid, page))?; } - let videos_info: Vec = match serde_json::from_value(medias.take()) { - Ok(v) => v, - Err(e) => { - error!("failed to parse videos of favorite {} page {}: {}", self.fid, page, e); - break; - } - }; + let videos_info: Vec = serde_json::from_value(medias.take()) + .with_context(|| format!("failed to parse videos of favorite {} page {}", self.fid, page))?; for video_info in videos_info { yield video_info; } let has_more = &videos["data"]["has_more"]; - if has_more.as_bool().is_some_and(|v| v) { - page += 1; - continue; + if let Some(v) = has_more.as_bool() { + if v { + page += 1; + continue; + } + } else { + Err(anyhow!("has_more is not a bool"))?; } break; } diff --git a/crates/bili_sync/src/bilibili/mod.rs b/crates/bili_sync/src/bilibili/mod.rs index 7206682..bc31c19 100644 --- a/crates/bili_sync/src/bilibili/mod.rs +++ b/crates/bili_sync/src/bilibili/mod.rs @@ -158,22 +158,42 @@ mod tests { collection_type: CollectionType::Season, }; let collection = Collection::new(&bili_client, &collection_item); - let videos = collection.into_simple_video_stream().take(20).collect::>().await; + let videos = collection + .into_video_stream() + .take(20) + .filter_map(|v| futures::future::ready(v.ok())) + .collect::>() + .await; assert!(videos.iter().all(|v| matches!(v, VideoInfo::Collection { .. }))); assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime())); // 测试收藏夹 let favorite = FavoriteList::new(&bili_client, "3144336058".to_string()); - let videos = favorite.into_video_stream().take(20).collect::>().await; + let videos = favorite + .into_video_stream() + .take(20) + .filter_map(|v| futures::future::ready(v.ok())) + .collect::>() + .await; assert!(videos.iter().all(|v| matches!(v, VideoInfo::Favorite { .. }))); assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime())); // 测试稍后再看 let watch_later = WatchLater::new(&bili_client); - let videos = watch_later.into_video_stream().take(20).collect::>().await; + let videos = watch_later + .into_video_stream() + .take(20) + .filter_map(|v| futures::future::ready(v.ok())) + .collect::>() + .await; assert!(videos.iter().all(|v| matches!(v, VideoInfo::WatchLater { .. }))); assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime())); // 测试投稿 let submission = Submission::new(&bili_client, "956761".to_string()); - let videos = submission.into_video_stream().take(20).collect::>().await; + let videos = submission + .into_video_stream() + .take(20) + .filter_map(|v| futures::future::ready(v.ok())) + .collect::>() + .await; assert!(videos.iter().all(|v| matches!(v, VideoInfo::Submission { .. }))); assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime())); } diff --git a/crates/bili_sync/src/bilibili/submission.rs b/crates/bili_sync/src/bilibili/submission.rs index 90e004a..f03b038 100644 --- a/crates/bili_sync/src/bilibili/submission.rs +++ b/crates/bili_sync/src/bilibili/submission.rs @@ -1,6 +1,6 @@ -use anyhow::Result; +use anyhow::{anyhow, Context, Result}; use arc_swap::access::Access; -use async_stream::stream; +use async_stream::try_stream; use futures::Stream; use reqwest::Method; use serde_json::Value; @@ -57,36 +57,31 @@ impl<'a> Submission<'a> { .validate() } - pub fn into_video_stream(self) -> impl Stream + 'a { - stream! { + pub fn into_video_stream(self) -> impl Stream> + 'a { + try_stream! { let mut page = 1; loop { - let mut videos = match self.get_videos(page).await { - Ok(v) => v, - Err(e) => { - error!("failed to get videos of upper {} page {}: {}", self.upper_id, page, e); - break; - } - }; + let mut videos = self + .get_videos(page) + .await + .with_context(|| format!("failed to get videos of upper {} page {}", self.upper_id, page))?; let vlist = &mut videos["data"]["list"]["vlist"]; if vlist.as_array().is_none_or(|v| v.is_empty()) { - error!("no medias found in upper {} page {}", self.upper_id, page); - break; + Err(anyhow!("no medias found in upper {} page {}", self.upper_id, page))?; } - let videos_info: Vec = match serde_json::from_value(vlist.take()) { - Ok(v) => v, - Err(e) => { - error!("failed to parse videos of upper {} page {}: {}", self.upper_id, page, e); - break; - } - }; + let videos_info: Vec = serde_json::from_value(vlist.take()) + .with_context(|| format!("failed to parse videos of upper {} page {}", self.upper_id, page))?; for video_info in videos_info { yield video_info; } let count = &videos["data"]["page"]["count"]; - if count.as_i64().is_some_and(|v| v > (page * 30) as i64) { - page += 1; - continue; + if let Some(v) = count.as_i64() { + if v > (page * 30) as i64 { + page += 1; + continue; + } + } else { + Err(anyhow!("count is not an i64"))?; } break; } diff --git a/crates/bili_sync/src/bilibili/watch_later.rs b/crates/bili_sync/src/bilibili/watch_later.rs index 43e1a75..231389d 100644 --- a/crates/bili_sync/src/bilibili/watch_later.rs +++ b/crates/bili_sync/src/bilibili/watch_later.rs @@ -1,5 +1,5 @@ -use anyhow::Result; -use async_stream::stream; +use anyhow::{anyhow, Context, Result}; +use async_stream::try_stream; use futures::Stream; use serde_json::Value; @@ -25,24 +25,20 @@ impl<'a> WatchLater<'a> { .validate() } - pub fn into_video_stream(self) -> impl Stream + 'a { - stream! { - let Ok(mut videos) = self.get_videos().await else { - error!("Failed to get watch later list"); - return; - }; - if !videos["data"]["list"].is_array() { - error!("Watch later list is not an array"); + pub fn into_video_stream(self) -> impl Stream> + 'a { + try_stream! { + let mut videos = self + .get_videos() + .await + .with_context(|| "Failed to get watch later list")?; + let list = &mut videos["data"]["list"]; + if list.as_array().is_none_or(|v| v.is_empty()) { + Err(anyhow!("No videos found in watch later list"))?; } - let videos_info = match serde_json::from_value::>(videos["data"]["list"].take()) { - Ok(v) => v, - Err(e) => { - error!("Failed to parse watch later list: {}", e); - return; - } - }; - for video in videos_info { - yield video; + let videos_info: Vec = + serde_json::from_value(list.take()).with_context(|| "Failed to parse watch later list")?; + for video_info in videos_info { + yield video_info; } } } diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index 4335c83..ef13f5b 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::pin::Pin; -use anyhow::{bail, Context, Result}; +use anyhow::{anyhow, bail, Context, Result}; use bili_sync_entity::*; use futures::stream::{FuturesOrdered, FuturesUnordered}; use futures::{Future, Stream, StreamExt}; @@ -50,29 +50,41 @@ pub async fn process_video_list( /// 请求接口,获取视频列表中所有新添加的视频信息,将其写入数据库 pub async fn refresh_video_list<'a>( video_list_model: &dyn VideoListModel, - video_streams: Pin + 'a>>, + video_streams: Pin> + 'a>>, connection: &DatabaseConnection, ) -> Result<()> { video_list_model.log_refresh_video_start(); let latest_row_at = video_list_model.get_latest_row_at().and_utc(); let mut max_datetime = latest_row_at; + let mut error = Ok(()); let mut video_streams = video_streams - .take_while(|v| { - // 虽然 video_streams 是从新到旧的,但由于此处是分页请求,极端情况下可能发生访问完第一页时插入了两整页视频的情况 - // 此时获取到的第二页视频比第一页的还要新,因此为了确保正确,理应对每一页的第一个视频进行时间比较 - // 但在 streams 的抽象下,无法判断具体是在哪里分页的,所以暂且对每个视频都进行比较,应该不会有太大性能损失 - let release_datetime = v.release_datetime(); - if release_datetime > &max_datetime { - max_datetime = *release_datetime; + .take_while(|res| { + match res { + Err(e) => { + error = Err(anyhow!(e.to_string())); + futures::future::ready(false) + } + Ok(v) => { + // 虽然 video_streams 是从新到旧的,但由于此处是分页请求,极端情况下可能发生访问完第一页时插入了两整页视频的情况 + // 此时获取到的第二页视频比第一页的还要新,因此为了确保正确,理应对每一页的第一个视频进行时间比较 + // 但在 streams 的抽象下,无法判断具体是在哪里分页的,所以暂且对每个视频都进行比较,应该不会有太大性能损失 + let release_datetime = v.release_datetime(); + if release_datetime > &max_datetime { + max_datetime = *release_datetime; + } + futures::future::ready(release_datetime > &latest_row_at) + } } - futures::future::ready(release_datetime > &latest_row_at) }) + .filter_map(|res| futures::future::ready(res.ok())) .chunks(10); let mut count = 0; while let Some(videos_info) = video_streams.next().await { count += videos_info.len(); create_videos(videos_info, video_list_model, connection).await?; } + // 如果获取视频分页过程中发生了错误,直接在此处返回,不更新 latest_row_at + error?; if max_datetime != latest_row_at { video_list_model .update_latest_row_at(max_datetime.naive_utc())