feat: 引入更健壮的新视频检测方法 (#228)

* feat: 为各个 video list 表添加 latest_row_at 字段

* chore: 为 model 引入新增的字段

* feat: 实现新版中断条件(待测试)

* test: 更新测试
This commit is contained in:
ᴀᴍᴛᴏᴀᴇʀ
2025-01-22 23:53:18 +08:00
committed by GitHub
parent b888db6a61
commit b4177d4ffc
16 changed files with 292 additions and 225 deletions

View File

@@ -1,4 +1,3 @@
use std::collections::HashSet;
use std::path::Path;
use std::pin::Pin;
@@ -9,7 +8,7 @@ use futures::Stream;
use sea_orm::entity::prelude::*;
use sea_orm::sea_query::{IntoCondition, OnConflict};
use sea_orm::ActiveValue::Set;
use sea_orm::{DatabaseConnection, TransactionTrait};
use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged};
use crate::adapter::{helper, VideoListModel};
use crate::bilibili::{self, BiliClient, Collection, CollectionItem, CollectionType, VideoInfo};
@@ -17,10 +16,6 @@ use crate::utils::status::Status;
#[async_trait]
impl VideoListModel for collection::Model {
async fn video_count(&self, connection: &DatabaseConnection) -> Result<u64> {
helper::count_videos(video::Column::CollectionId.eq(self.id).into_condition(), connection).await
}
async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result<Vec<video::Model>> {
helper::filter_videos(
video::Column::CollectionId
@@ -52,20 +47,6 @@ impl VideoListModel for collection::Model {
.await
}
async fn exist_labels(
&self,
videos_info: &[VideoInfo],
connection: &DatabaseConnection,
) -> Result<HashSet<String>> {
helper::video_keys(
video::Column::CollectionId.eq(self.id),
videos_info,
[video::Column::Bvid, video::Column::Pubtime],
connection,
)
.await
}
fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option<video::Model>) -> video::ActiveModel {
let mut video_model = video_info.to_model(base_model);
video_model.collection_id = Set(Some(self.id));
@@ -81,7 +62,7 @@ impl VideoListModel for collection::Model {
let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await;
match info {
Ok((tags, view_info)) => {
let VideoInfo::View { pages, .. } = &view_info else {
let VideoInfo::Detail { pages, .. } = &view_info else {
unreachable!("view_info must be VideoInfo::View")
};
let txn = connection.begin().await?;
@@ -101,6 +82,21 @@ impl VideoListModel for collection::Model {
Ok(())
}
/// Returns the `latest_row_at` value stored on this collection model.
fn get_latest_row_at(&self) -> DateTime {
self.latest_row_at
}
/// Writes `datetime` into the `latest_row_at` field of the collection row
/// identified by `self.id`.
///
/// Only `id` (as `Unchanged`) and `latest_row_at` (as `Set`) are given
/// explicitly; every other field of the active model is left at its
/// `Default` value. Any error returned by the update is propagated.
async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> {
    let pending = collection::ActiveModel {
        id: Unchanged(self.id),
        latest_row_at: Set(datetime),
        ..Default::default()
    };
    pending.update(connection).await?;
    Ok(())
}
fn log_fetch_video_start(&self) {
info!(
"开始获取{} {} - {} 的视频与分页信息...",
@@ -146,14 +142,13 @@ impl VideoListModel for collection::Model {
);
}
fn log_refresh_video_end(&self, got_count: usize, new_count: u64) {
fn log_refresh_video_end(&self, count: usize) {
info!(
"扫描{}: {} - {} 的新视频完成,获取了 {} 条新视频,其中有 {} 条新视频",
"扫描{}: {} - {} 的新视频完成,获取了 {} 条新视频",
CollectionType::from(self.r#type),
self.s_id,
self.name,
got_count,
new_count,
count,
);
}
}

View File

@@ -1,4 +1,3 @@
use std::collections::HashSet;
use std::path::Path;
use std::pin::Pin;
@@ -9,7 +8,7 @@ use futures::Stream;
use sea_orm::entity::prelude::*;
use sea_orm::sea_query::{IntoCondition, OnConflict};
use sea_orm::ActiveValue::Set;
use sea_orm::{DatabaseConnection, TransactionTrait};
use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged};
use crate::adapter::{helper, VideoListModel};
use crate::bilibili::{self, BiliClient, FavoriteList, VideoInfo};
@@ -17,10 +16,6 @@ use crate::utils::status::Status;
#[async_trait]
impl VideoListModel for favorite::Model {
async fn video_count(&self, connection: &DatabaseConnection) -> Result<u64> {
helper::count_videos(video::Column::FavoriteId.eq(self.id).into_condition(), connection).await
}
async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result<Vec<video::Model>> {
helper::filter_videos(
video::Column::FavoriteId
@@ -52,20 +47,6 @@ impl VideoListModel for favorite::Model {
.await
}
async fn exist_labels(
&self,
videos_info: &[VideoInfo],
connection: &DatabaseConnection,
) -> Result<HashSet<String>> {
helper::video_keys(
video::Column::FavoriteId.eq(self.id),
videos_info,
[video::Column::Bvid, video::Column::Favtime],
connection,
)
.await
}
fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option<video::Model>) -> video::ActiveModel {
let mut video_model = video_info.to_model(base_model);
video_model.favorite_id = Set(Some(self.id));
@@ -98,6 +79,21 @@ impl VideoListModel for favorite::Model {
Ok(())
}
/// Returns the `latest_row_at` value stored on this favorite model.
fn get_latest_row_at(&self) -> DateTime {
self.latest_row_at
}
/// Writes `datetime` into the `latest_row_at` field of the favorite row
/// identified by `self.id`.
///
/// Only `id` (as `Unchanged`) and `latest_row_at` (as `Set`) are given
/// explicitly; every other field of the active model is left at its
/// `Default` value. Any error returned by the update is propagated.
async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> {
    let pending = favorite::ActiveModel {
        id: Unchanged(self.id),
        latest_row_at: Set(datetime),
        ..Default::default()
    };
    pending.update(connection).await?;
    Ok(())
}
/// Emits an info-level log line announcing that fetching video and page
/// information for this favorite list has started; the message includes
/// the list's `f_id` and `name`.
fn log_fetch_video_start(&self) {
info!("开始获取收藏夹 {} - {} 的视频与分页信息...", self.f_id, self.name);
}
@@ -118,10 +114,10 @@ impl VideoListModel for favorite::Model {
info!("开始扫描收藏夹: {} - {} 的新视频...", self.f_id, self.name);
}
fn log_refresh_video_end(&self, got_count: usize, new_count: u64) {
fn log_refresh_video_end(&self, count: usize) {
info!(
"扫描收藏夹: {} - {} 的新视频完成,获取了 {} 条新视频,其中有 {} 条新视频",
self.f_id, self.name, got_count, new_count
"扫描收藏夹: {} - {} 的新视频完成,获取了 {} 条新视频",
self.f_id, self.name, count
);
}
}

View File

@@ -1,21 +1,14 @@
use std::collections::HashSet;
use std::path::Path;
use anyhow::Result;
use bili_sync_entity::*;
use sea_orm::entity::prelude::*;
use sea_orm::sea_query::{OnConflict, SimpleExpr};
use sea_orm::sea_query::OnConflict;
use sea_orm::ActiveValue::Set;
use sea_orm::{Condition, DatabaseTransaction, QuerySelect};
use sea_orm::{Condition, DatabaseTransaction};
use crate::bilibili::{BiliError, PageInfo, VideoInfo};
use crate::config::{PathSafeTemplate, TEMPLATE};
use crate::utils::id_time_key;
/// 使用 condition 筛选视频,返回视频数量
pub(super) async fn count_videos(condition: Condition, conn: &DatabaseConnection) -> Result<u64> {
Ok(video::Entity::find().filter(condition).count(conn).await?)
}
/// 使用 condition 筛选视频,返回视频列表
pub(super) async fn filter_videos(condition: Condition, conn: &DatabaseConnection) -> Result<Vec<video::Model>> {
@@ -34,29 +27,6 @@ pub(super) async fn filter_videos_with_pages(
.await?)
}
/// 返回 videos_info 存在于视频表里那部分对应的 key
pub(super) async fn video_keys(
expr: SimpleExpr,
videos_info: &[VideoInfo],
columns: [video::Column; 2],
conn: &DatabaseConnection,
) -> Result<HashSet<String>> {
Ok(video::Entity::find()
.filter(
video::Column::Bvid
.is_in(videos_info.iter().map(|v| v.bvid().to_string()))
.and(expr),
)
.select_only()
.columns(columns)
.into_tuple()
.all(conn)
.await?
.into_iter()
.map(|(bvid, time)| id_time_key(&bvid, &time))
.collect())
}
/// 返回设置了 path 的视频
pub(super) fn video_with_path(
mut video_model: video::ActiveModel,

View File

@@ -4,7 +4,6 @@ mod helper;
mod submission;
mod watch_later;
use std::collections::HashSet;
use std::path::Path;
use std::pin::Pin;
@@ -43,9 +42,6 @@ pub async fn video_list_from<'a>(
#[async_trait]
pub trait VideoListModel {
/// 与视频列表关联的视频总数
async fn video_count(&self, connection: &DatabaseConnection) -> Result<u64>;
/// 未填充的视频
async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result<Vec<bili_sync_entity::video::Model>>;
@@ -55,10 +51,6 @@ pub trait VideoListModel {
connection: &DatabaseConnection,
) -> Result<Vec<(bili_sync_entity::video::Model, Vec<bili_sync_entity::page::Model>)>>;
/// 该批次视频的存在标记
async fn exist_labels(&self, videos_info: &[VideoInfo], connection: &DatabaseConnection)
-> Result<HashSet<String>>;
/// 视频信息对应的视频 model
fn video_model_by_info(
&self,
@@ -74,6 +66,12 @@ pub trait VideoListModel {
connection: &DatabaseConnection,
) -> Result<()>;
/// 获取视频 model 中记录的最新时间
fn get_latest_row_at(&self) -> DateTime;
/// 更新视频 model 中记录的最新时间
async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()>;
/// 开始获取视频
fn log_fetch_video_start(&self);
@@ -90,5 +88,5 @@ pub trait VideoListModel {
fn log_refresh_video_start(&self);
/// 结束刷新视频
fn log_refresh_video_end(&self, got_count: usize, new_count: u64);
fn log_refresh_video_end(&self, count: usize);
}

View File

@@ -1,4 +1,3 @@
use std::collections::HashSet;
use std::path::Path;
use std::pin::Pin;
@@ -9,7 +8,7 @@ use futures::Stream;
use sea_orm::entity::prelude::*;
use sea_orm::sea_query::{IntoCondition, OnConflict};
use sea_orm::ActiveValue::Set;
use sea_orm::{DatabaseConnection, TransactionTrait};
use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged};
use crate::adapter::helper::video_with_path;
use crate::adapter::{helper, VideoListModel};
@@ -18,10 +17,6 @@ use crate::utils::status::Status;
#[async_trait]
impl VideoListModel for submission::Model {
async fn video_count(&self, connection: &DatabaseConnection) -> Result<u64> {
helper::count_videos(video::Column::SubmissionId.eq(self.id).into_condition(), connection).await
}
async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result<Vec<video::Model>> {
helper::filter_videos(
video::Column::SubmissionId
@@ -53,20 +48,6 @@ impl VideoListModel for submission::Model {
.await
}
async fn exist_labels(
&self,
videos_info: &[VideoInfo],
connection: &DatabaseConnection,
) -> Result<HashSet<String>> {
helper::video_keys(
video::Column::SubmissionId.eq(self.id),
videos_info,
[video::Column::Bvid, video::Column::Ctime],
connection,
)
.await
}
fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option<video::Model>) -> video::ActiveModel {
let mut video_model = video_info.to_model(base_model);
video_model.submission_id = Set(Some(self.id));
@@ -82,7 +63,7 @@ impl VideoListModel for submission::Model {
let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await;
match info {
Ok((tags, view_info)) => {
let VideoInfo::View { pages, .. } = &view_info else {
let VideoInfo::Detail { pages, .. } = &view_info else {
unreachable!("view_info must be VideoInfo::View")
};
let txn = connection.begin().await?;
@@ -102,6 +83,21 @@ impl VideoListModel for submission::Model {
Ok(())
}
/// Returns the `latest_row_at` value stored on this submission model.
fn get_latest_row_at(&self) -> DateTime {
self.latest_row_at
}
/// Writes `datetime` into the `latest_row_at` field of the submission row
/// identified by `self.id`.
///
/// Only `id` (as `Unchanged`) and `latest_row_at` (as `Set`) are given
/// explicitly; every other field of the active model is left at its
/// `Default` value. Any error returned by the update is propagated.
async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> {
    let pending = submission::ActiveModel {
        id: Unchanged(self.id),
        latest_row_at: Set(datetime),
        ..Default::default()
    };
    pending.update(connection).await?;
    Ok(())
}
fn log_fetch_video_start(&self) {
info!(
"开始获取 UP 主 {} - {} 投稿的视频与分页信息...",
@@ -134,10 +130,10 @@ impl VideoListModel for submission::Model {
info!("开始扫描 UP 主 {} - {} 投稿的新视频...", self.upper_id, self.upper_name);
}
fn log_refresh_video_end(&self, got_count: usize, new_count: u64) {
fn log_refresh_video_end(&self, count: usize) {
info!(
"扫描 UP 主 {} - {} 投稿的新视频完成,获取了 {} 条新视频,其中有 {} 条新视频",
self.upper_id, self.upper_name, got_count, new_count,
"扫描 UP 主 {} - {} 投稿的新视频完成,获取了 {} 条新视频",
self.upper_id, self.upper_name, count,
);
}
}

View File

@@ -1,4 +1,3 @@
use std::collections::HashSet;
use std::path::Path;
use std::pin::Pin;
@@ -9,7 +8,7 @@ use futures::Stream;
use sea_orm::entity::prelude::*;
use sea_orm::sea_query::{IntoCondition, OnConflict};
use sea_orm::ActiveValue::Set;
use sea_orm::{DatabaseConnection, TransactionTrait};
use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged};
use crate::adapter::helper::video_with_path;
use crate::adapter::{helper, VideoListModel};
@@ -18,10 +17,6 @@ use crate::utils::status::Status;
#[async_trait]
impl VideoListModel for watch_later::Model {
async fn video_count(&self, connection: &DatabaseConnection) -> Result<u64> {
helper::count_videos(video::Column::WatchLaterId.eq(self.id).into_condition(), connection).await
}
async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result<Vec<video::Model>> {
helper::filter_videos(
video::Column::WatchLaterId
@@ -53,20 +48,6 @@ impl VideoListModel for watch_later::Model {
.await
}
async fn exist_labels(
&self,
videos_info: &[VideoInfo],
connection: &DatabaseConnection,
) -> Result<HashSet<String>> {
helper::video_keys(
video::Column::WatchLaterId.eq(self.id),
videos_info,
[video::Column::Bvid, video::Column::Favtime],
connection,
)
.await
}
fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option<video::Model>) -> video::ActiveModel {
let mut video_model = video_info.to_model(base_model);
video_model.watch_later_id = Set(Some(self.id));
@@ -99,6 +80,21 @@ impl VideoListModel for watch_later::Model {
Ok(())
}
/// Returns the `latest_row_at` value stored on this watch-later model.
fn get_latest_row_at(&self) -> DateTime {
self.latest_row_at
}
/// Writes `datetime` into the `latest_row_at` field of the watch-later row
/// identified by `self.id`.
///
/// Only `id` (as `Unchanged`) and `latest_row_at` (as `Set`) are given
/// explicitly; every other field of the active model is left at its
/// `Default` value. Any error returned by the update is propagated.
async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> {
    let pending = watch_later::ActiveModel {
        id: Unchanged(self.id),
        latest_row_at: Set(datetime),
        ..Default::default()
    };
    pending.update(connection).await?;
    Ok(())
}
/// Emits an info-level log line announcing that fetching video and page
/// information for the watch-later list has started.
fn log_fetch_video_start(&self) {
info!("开始获取稍后再看的视频与分页信息...");
}
@@ -119,11 +115,8 @@ impl VideoListModel for watch_later::Model {
info!("开始扫描稍后再看的新视频...");
}
fn log_refresh_video_end(&self, got_count: usize, new_count: u64) {
info!(
"扫描稍后再看的新视频完成,获取了 {} 条新视频,其中有 {} 条新视频",
got_count, new_count,
);
fn log_refresh_video_end(&self, count: usize) {
info!("扫描稍后再看的新视频完成,获取了 {} 条新视频", count);
}
}

View File

@@ -61,7 +61,7 @@ impl Validate for serde_json::Value {
/// > Serde will try to match the data against each variant in order and the first one that deserializes successfully is the one returned.
pub enum VideoInfo {
/// 从视频详情接口获取的视频信息
View {
Detail {
title: String,
bvid: String,
#[serde(rename = "desc")]
@@ -77,8 +77,8 @@ pub enum VideoInfo {
pages: Vec<PageInfo>,
state: i32,
},
/// 从收藏夹获取的视频信息
Detail {
/// 从收藏夹接口获取的视频信息
Favorite {
title: String,
#[serde(rename = "type")]
vtype: i32,
@@ -94,7 +94,7 @@ pub enum VideoInfo {
pubtime: DateTime<Utc>,
attr: i32,
},
/// 从稍后再看获取的视频信息
/// 从稍后再看接口获取的视频信息
WatchLater {
title: String,
bvid: String,
@@ -112,8 +112,8 @@ pub enum VideoInfo {
pubtime: DateTime<Utc>,
state: i32,
},
/// 从视频列表中获取的视频信息
Simple {
/// 从视频合集/视频列表接口获取的视频信息
Collection {
bvid: String,
#[serde(rename = "pic")]
cover: String,
@@ -122,6 +122,7 @@ pub enum VideoInfo {
#[serde(rename = "pubdate", with = "ts_seconds")]
pubtime: DateTime<Utc>,
},
/// Video info fetched from the user submission API
Submission {
title: String,
bvid: String,
@@ -136,7 +137,7 @@ pub enum VideoInfo {
#[cfg(test)]
mod tests {
use futures::{pin_mut, StreamExt};
use futures::StreamExt;
use super::*;
use crate::utils::init_logger;
@@ -151,28 +152,30 @@ mod tests {
panic!("获取 mixin key 失败");
};
set_global_mixin_key(mixin_key);
let video = Video::new(&bili_client, "BV1Z54y1C7ZB".to_string());
assert!(matches!(video.get_view_info().await, Ok(VideoInfo::View { .. })));
// 测试视频合集
let collection_item = CollectionItem {
mid: "521722088".to_string(),
sid: "387214".to_string(),
collection_type: CollectionType::Series,
sid: "4523".to_string(),
collection_type: CollectionType::Season,
};
let collection = Collection::new(&bili_client, &collection_item);
let stream = collection.into_simple_video_stream();
pin_mut!(stream);
assert!(matches!(stream.next().await, Some(VideoInfo::Simple { .. })));
let favorite = FavoriteList::new(&bili_client, "3084505258".to_string());
let stream = favorite.into_video_stream();
pin_mut!(stream);
assert!(matches!(stream.next().await, Some(VideoInfo::Detail { .. })));
let videos = collection.into_simple_video_stream().take(20).collect::<Vec<_>>().await;
assert!(videos.iter().all(|v| matches!(v, VideoInfo::Collection { .. })));
assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime()));
// 测试收藏夹
let favorite = FavoriteList::new(&bili_client, "3144336058".to_string());
let videos = favorite.into_video_stream().take(20).collect::<Vec<_>>().await;
assert!(videos.iter().all(|v| matches!(v, VideoInfo::Favorite { .. })));
assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime()));
// 测试稍后再看
let watch_later = WatchLater::new(&bili_client);
let stream = watch_later.into_video_stream();
pin_mut!(stream);
assert!(matches!(stream.next().await, Some(VideoInfo::WatchLater { .. })));
let videos = watch_later.into_video_stream().take(20).collect::<Vec<_>>().await;
assert!(videos.iter().all(|v| matches!(v, VideoInfo::WatchLater { .. })));
assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime()));
// 测试投稿
let submission = Submission::new(&bili_client, "956761".to_string());
let stream = submission.into_video_stream();
pin_mut!(stream);
assert!(matches!(stream.next().await, Some(VideoInfo::Submission { .. })));
let videos = submission.into_video_stream().take(20).collect::<Vec<_>>().await;
assert!(videos.iter().all(|v| matches!(v, VideoInfo::Submission { .. })));
assert!(videos.iter().rev().is_sorted_by_key(|v| v.release_datetime()));
}
}

View File

@@ -1,10 +1,10 @@
use chrono::{DateTime, Utc};
use sea_orm::ActiveValue::{NotSet, Set};
use sea_orm::IntoActiveModel;
use serde_json::json;
use crate::bilibili::VideoInfo;
use crate::config::CONFIG;
use crate::utils::id_time_key;
impl VideoInfo {
/// 将 VideoInfo 转换为 ActiveModel
@@ -20,7 +20,7 @@ impl VideoInfo {
}
};
match self {
VideoInfo::Simple {
VideoInfo::Collection {
bvid,
cover,
ctime,
@@ -34,7 +34,7 @@ impl VideoInfo {
valid: Set(true),
..base_model
},
VideoInfo::Detail {
VideoInfo::Favorite {
title,
vtype,
bvid,
@@ -63,7 +63,7 @@ impl VideoInfo {
upper_face: Set(upper.face.clone()),
..base_model
},
VideoInfo::View {
VideoInfo::Detail {
title,
bvid,
intro,
@@ -140,8 +140,8 @@ impl VideoInfo {
pub fn to_fmt_args(&self) -> Option<serde_json::Value> {
match self {
VideoInfo::Simple { .. } | VideoInfo::Submission { .. } => None, // 不能从简单视频信息中构造格式化参数
VideoInfo::Detail {
VideoInfo::Collection { .. } | VideoInfo::Submission { .. } => None, // 不能从简单视频信息中构造格式化参数
VideoInfo::Favorite {
title,
bvid,
upper,
@@ -164,7 +164,7 @@ impl VideoInfo {
"pubtime": pubtime.format(&CONFIG.time_format).to_string(),
"fav_time": fav_time.format(&CONFIG.time_format).to_string(),
})),
VideoInfo::View {
VideoInfo::Detail {
title,
bvid,
upper,
@@ -184,31 +184,12 @@ impl VideoInfo {
}
}
pub fn video_key(&self) -> String {
pub fn release_datetime(&self) -> &DateTime<Utc> {
match self {
// 对于合集没有 fav_time只能用 pubtime 代替
VideoInfo::Simple {
bvid, pubtime: time, ..
}
| VideoInfo::Detail {
bvid, fav_time: time, ..
}
| VideoInfo::WatchLater {
bvid, fav_time: time, ..
}
| VideoInfo::Submission { bvid, ctime: time, .. } => id_time_key(bvid, time),
// 详情接口返回的数据仅用于填充详情,不会被作为 video_key
_ => unreachable!(),
}
}
pub fn bvid(&self) -> &str {
match self {
VideoInfo::Simple { bvid, .. }
| VideoInfo::Detail { bvid, .. }
| VideoInfo::WatchLater { bvid, .. }
| VideoInfo::Submission { bvid, .. } => bvid,
// 同上
VideoInfo::Collection { pubtime: time, .. }
| VideoInfo::Favorite { fav_time: time, .. }
| VideoInfo::WatchLater { fav_time: time, .. }
| VideoInfo::Submission { ctime: time, .. } => time,
_ => unreachable!(),
}
}

View File

@@ -4,7 +4,6 @@ pub mod model;
pub mod nfo;
pub mod status;
use chrono::{DateTime, Utc};
use tracing_subscriber::util::SubscriberInitExt;
pub fn init_logger(log_level: &str) {
@@ -17,8 +16,3 @@ pub fn init_logger(log_level: &str) {
.try_init()
.expect("初始化日志失败");
}
/// 生成视频的唯一标记,均由 bvid 和时间戳构成
pub fn id_time_key(bvid: &String, time: &DateTime<Utc>) -> String {
format!("{}-{}", bvid, time.timestamp())
}

View File

@@ -27,49 +27,62 @@ pub async fn process_video_list(
path: &Path,
connection: &DatabaseConnection,
) -> Result<()> {
// 从参数中获取视频列表的 Model 与视频流
let (video_list_model, video_streams) = video_list_from(args, path, bili_client, connection).await?;
let video_list_model = refresh_video_list(video_list_model, video_streams, connection).await?;
let video_list_model = fetch_video_details(bili_client, video_list_model, connection).await?;
// 从视频流中获取新视频的简要信息,写入数据库
refresh_video_list(video_list_model.as_ref(), video_streams, connection).await?;
// 单独请求视频详情接口,获取视频的详情信息与所有的分页,写入数据库
fetch_video_details(bili_client, video_list_model.as_ref(), connection).await?;
if ARGS.scan_only {
warn!("已开启仅扫描模式,跳过视频下载...");
return Ok(());
} else {
// 从数据库中查找所有未下载的视频与分页,下载并处理
download_unprocessed_videos(bili_client, video_list_model.as_ref(), connection).await?;
}
download_unprocessed_videos(bili_client, video_list_model, connection).await
Ok(())
}
/// 请求接口,获取视频列表中所有新添加的视频信息,将其写入数据库
pub async fn refresh_video_list<'a>(
video_list_model: Box<dyn VideoListModel>,
video_list_model: &dyn VideoListModel,
video_streams: Pin<Box<dyn Stream<Item = VideoInfo> + 'a>>,
connection: &DatabaseConnection,
) -> Result<Box<dyn VideoListModel>> {
) -> Result<()> {
video_list_model.log_refresh_video_start();
let mut video_streams = video_streams.chunks(10);
let mut got_count = 0;
let mut new_count = video_list_model.video_count(connection).await?;
let latest_row_at = video_list_model.get_latest_row_at().and_utc();
let mut max_datetime = latest_row_at;
let mut video_streams = video_streams
.take_while(|v| {
// 虽然 video_streams 是从新到旧的,但由于此处是分页请求,极端情况下可能发生访问完第一页时插入了两整页视频的情况
// 此时获取到的第二页视频比第一页的还要新,因此为了确保正确,理应对每一页的第一个视频进行时间比较
// 但在 streams 的抽象下,无法判断具体是在哪里分页的,所以暂且对每个视频都进行比较,希望不会有太大性能损失
let release_datetime = v.release_datetime();
if release_datetime > &max_datetime {
max_datetime = *release_datetime;
}
futures::future::ready(release_datetime > &latest_row_at)
})
.chunks(10);
let mut count = 0;
while let Some(videos_info) = video_streams.next().await {
got_count += videos_info.len();
let exist_labels = video_list_model.exist_labels(&videos_info, connection).await?;
// 如果发现有视频的收藏时间和 bvid 和数据库中重合,说明到达了上次处理到的地方,可以直接退出
let should_break = videos_info.iter().any(|v| exist_labels.contains(&v.video_key()));
// 将视频信息写入数据库
create_videos(&videos_info, video_list_model.as_ref(), connection).await?;
if should_break {
info!("到达上一次处理的位置,提前中止");
break;
}
count += videos_info.len();
create_videos(&videos_info, video_list_model, connection).await?;
}
new_count = video_list_model.video_count(connection).await? - new_count;
video_list_model.log_refresh_video_end(got_count, new_count);
Ok(video_list_model)
if max_datetime != latest_row_at {
video_list_model
.update_latest_row_at(max_datetime.naive_utc(), connection)
.await?;
}
video_list_model.log_refresh_video_end(count);
Ok(())
}
/// 筛选出所有未获取到全部信息的视频,尝试补充其详细信息
pub async fn fetch_video_details(
bili_client: &BiliClient,
video_list_model: Box<dyn VideoListModel>,
video_list_model: &dyn VideoListModel,
connection: &DatabaseConnection,
) -> Result<Box<dyn VideoListModel>> {
) -> Result<()> {
video_list_model.log_fetch_video_start();
let videos_model = video_list_model.unfilled_videos(connection).await?;
for video_model in videos_model {
@@ -79,13 +92,13 @@ pub async fn fetch_video_details(
.await?;
}
video_list_model.log_fetch_video_end();
Ok(video_list_model)
Ok(())
}
/// 下载所有未处理成功的视频
pub async fn download_unprocessed_videos(
bili_client: &BiliClient,
video_list_model: Box<dyn VideoListModel>,
video_list_model: &dyn VideoListModel,
connection: &DatabaseConnection,
) -> Result<()> {
video_list_model.log_download_video_start();

View File

@@ -13,6 +13,7 @@ pub struct Model {
pub r#type: i32,
pub path: String,
pub created_at: String,
pub latest_row_at: DateTime,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@@ -12,6 +12,7 @@ pub struct Model {
pub name: String,
pub path: String,
pub created_at: String,
pub latest_row_at: DateTime,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@@ -11,6 +11,7 @@ pub struct Model {
pub upper_name: String,
pub path: String,
pub created_at: String,
pub latest_row_at: DateTime,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@@ -9,6 +9,7 @@ pub struct Model {
pub id: i32,
pub path: String,
pub created_at: String,
pub latest_row_at: DateTime,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]

View File

@@ -4,6 +4,7 @@ mod m20240322_000001_create_table;
mod m20240505_130850_add_collection;
mod m20240709_130914_watch_later;
mod m20240724_161008_submission;
mod m20250122_062926_add_latest_row_at;
pub struct Migrator;
@@ -15,6 +16,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240505_130850_add_collection::Migration),
Box::new(m20240709_130914_watch_later::Migration),
Box::new(m20240724_161008_submission::Migration),
Box::new(m20250122_062926_add_latest_row_at::Migration),
]
}
}

View File

@@ -0,0 +1,122 @@
use sea_orm_migration::prelude::*;
use sea_orm_migration::schema::*;
/// Migration that adds a `latest_row_at` column to the favorite, collection,
/// watch-later and submission tables and backfills it from the `video` table.
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// 为四张 video list 表添加 latest_row_at 字段,表示该列表处理到的最新时间
manager
.alter_table(
Table::alter()
.table(Favorite::Table)
.add_column(timestamp(Favorite::LatestRowAt).default("1970-01-01 00:00:00"))
.to_owned(),
)
.await?;
manager
.alter_table(
Table::alter()
.table(Collection::Table)
.add_column(timestamp(Collection::LatestRowAt).default("1970-01-01 00:00:00"))
.to_owned(),
)
.await?;
manager
.alter_table(
Table::alter()
.table(WatchLater::Table)
.add_column(timestamp(WatchLater::LatestRowAt).default("1970-01-01 00:00:00"))
.to_owned(),
)
.await?;
manager
.alter_table(
Table::alter()
.table(Submission::Table)
.add_column(timestamp(Submission::LatestRowAt).default("1970-01-01 00:00:00"))
.to_owned(),
)
.await?;
// 手动写 SQL 更新这四张表的 latest 字段到当前取值
let db = manager.get_connection();
db.execute_unprepared(
"UPDATE favorite SET latest_row_at = (SELECT IFNULL(MAX(favtime), '1970-01-01 00:00:00') FROM video WHERE favorite_id = favorite.id)",
)
.await?;
db.execute_unprepared(
"UPDATE collection SET latest_row_at = (SELECT IFNULL(MAX(pubtime), '1970-01-01 00:00:00') FROM video WHERE collection_id = collection.id)",
)
.await?;
db.execute_unprepared(
"UPDATE watch_later SET latest_row_at = (SELECT IFNULL(MAX(favtime), '1970-01-01 00:00:00') FROM video WHERE watch_later_id = watch_later.id)",
)
.await?;
db.execute_unprepared(
"UPDATE submission SET latest_row_at = (SELECT IFNULL(MAX(ctime), '1970-01-01 00:00:00') FROM video WHERE submission_id = submission.id)",
)
.await?;
Ok(())
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Favorite::Table)
.drop_column(Favorite::LatestRowAt)
.to_owned(),
)
.await?;
manager
.alter_table(
Table::alter()
.table(Collection::Table)
.drop_column(Collection::LatestRowAt)
.to_owned(),
)
.await?;
manager
.alter_table(
Table::alter()
.table(WatchLater::Table)
.drop_column(WatchLater::LatestRowAt)
.to_owned(),
)
.await?;
manager
.alter_table(
Table::alter()
.table(Submission::Table)
.drop_column(Submission::LatestRowAt)
.to_owned(),
)
.await
}
}
/// Identifiers this migration uses for the favorite table and its
/// `latest_row_at` column.
#[derive(DeriveIden)]
enum Favorite {
Table,
LatestRowAt,
}
/// Identifiers this migration uses for the collection table and its
/// `latest_row_at` column.
#[derive(DeriveIden)]
enum Collection {
Table,
LatestRowAt,
}
/// Identifiers this migration uses for the watch-later table and its
/// `latest_row_at` column.
#[derive(DeriveIden)]
enum WatchLater {
Table,
LatestRowAt,
}
/// Identifiers this migration uses for the submission table and its
/// `latest_row_at` column.
#[derive(DeriveIden)]
enum Submission {
Table,
LatestRowAt,
}