refactor: 精简代码,统一逻辑 (#229)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -385,7 +385,6 @@ dependencies = [
|
||||
"arc-swap",
|
||||
"assert_matches",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"bili_sync_entity",
|
||||
"bili_sync_migration",
|
||||
"chrono",
|
||||
|
||||
@@ -12,7 +12,6 @@ readme = "../../README.md"
|
||||
anyhow = { workspace = true }
|
||||
arc-swap = { workspace = true }
|
||||
async-stream = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
bili_sync_entity = { workspace = true }
|
||||
bili_sync_migration = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
|
||||
@@ -2,99 +2,39 @@ use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use async_trait::async_trait;
|
||||
use bili_sync_entity::*;
|
||||
use futures::Stream;
|
||||
use sea_orm::entity::prelude::*;
|
||||
use sea_orm::sea_query::{IntoCondition, OnConflict};
|
||||
use sea_orm::sea_query::{OnConflict, SimpleExpr};
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged};
|
||||
use sea_orm::{DatabaseConnection, Unchanged};
|
||||
|
||||
use crate::adapter::{helper, VideoListModel};
|
||||
use crate::bilibili::{self, BiliClient, Collection, CollectionItem, CollectionType, VideoInfo};
|
||||
use crate::utils::status::STATUS_COMPLETED;
|
||||
use crate::adapter::{VideoListModel, _ActiveModel};
|
||||
use crate::bilibili::{BiliClient, Collection, CollectionItem, CollectionType, VideoInfo};
|
||||
|
||||
#[async_trait]
|
||||
impl VideoListModel for collection::Model {
|
||||
async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result<Vec<video::Model>> {
|
||||
helper::filter_videos(
|
||||
video::Column::CollectionId
|
||||
.eq(self.id)
|
||||
.and(video::Column::Valid.eq(true))
|
||||
.and(video::Column::DownloadStatus.eq(0))
|
||||
.and(video::Column::Category.eq(2))
|
||||
.and(video::Column::SinglePage.is_null())
|
||||
.into_condition(),
|
||||
connection,
|
||||
)
|
||||
.await
|
||||
fn filter_expr(&self) -> SimpleExpr {
|
||||
video::Column::CollectionId.eq(self.id)
|
||||
}
|
||||
|
||||
async fn unhandled_video_pages(
|
||||
&self,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<Vec<(video::Model, Vec<page::Model>)>> {
|
||||
helper::filter_videos_with_pages(
|
||||
video::Column::CollectionId
|
||||
.eq(self.id)
|
||||
.and(video::Column::Valid.eq(true))
|
||||
.and(video::Column::DownloadStatus.lt(STATUS_COMPLETED))
|
||||
.and(video::Column::Category.eq(2))
|
||||
.and(video::Column::SinglePage.is_not_null())
|
||||
.into_condition(),
|
||||
connection,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option<video::Model>) -> video::ActiveModel {
|
||||
let mut video_model = video_info.to_model(base_model);
|
||||
fn set_relation_id(&self, video_model: &mut video::ActiveModel) {
|
||||
video_model.collection_id = Set(Some(self.id));
|
||||
helper::video_with_path(video_model, &self.path, video_info)
|
||||
}
|
||||
|
||||
async fn fetch_videos_detail(
|
||||
&self,
|
||||
video: bilibili::Video<'_>,
|
||||
video_model: video::Model,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<()> {
|
||||
let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await;
|
||||
match info {
|
||||
Ok((tags, view_info)) => {
|
||||
let VideoInfo::Detail { pages, .. } = &view_info else {
|
||||
unreachable!("view_info must be VideoInfo::View")
|
||||
};
|
||||
let txn = connection.begin().await?;
|
||||
// 将分页信息写入数据库
|
||||
helper::create_video_pages(pages, &video_model, &txn).await?;
|
||||
// 将页标记和 tag 写入数据库
|
||||
let mut video_active_model = self.video_model_by_info(&view_info, Some(video_model));
|
||||
video_active_model.single_page = Set(Some(pages.len() == 1));
|
||||
video_active_model.tags = Set(Some(serde_json::to_value(tags)?));
|
||||
video_active_model.save(&txn).await?;
|
||||
txn.commit().await?;
|
||||
}
|
||||
Err(e) => {
|
||||
helper::error_fetch_video_detail(e, video_model, connection).await?;
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
fn path(&self) -> &Path {
|
||||
Path::new(self.path.as_str())
|
||||
}
|
||||
|
||||
fn get_latest_row_at(&self) -> DateTime {
|
||||
self.latest_row_at
|
||||
}
|
||||
|
||||
async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> {
|
||||
collection::ActiveModel {
|
||||
fn update_latest_row_at(&self, datetime: DateTime) -> _ActiveModel {
|
||||
_ActiveModel::Collection(collection::ActiveModel {
|
||||
id: Unchanged(self.id),
|
||||
latest_row_at: Set(datetime),
|
||||
..Default::default()
|
||||
}
|
||||
.update(connection)
|
||||
.await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn log_fetch_video_start(&self) {
|
||||
|
||||
@@ -2,96 +2,39 @@ use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use async_trait::async_trait;
|
||||
use bili_sync_entity::*;
|
||||
use futures::Stream;
|
||||
use sea_orm::entity::prelude::*;
|
||||
use sea_orm::sea_query::{IntoCondition, OnConflict};
|
||||
use sea_orm::sea_query::{OnConflict, SimpleExpr};
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged};
|
||||
use sea_orm::{DatabaseConnection, Unchanged};
|
||||
|
||||
use crate::adapter::{helper, VideoListModel};
|
||||
use crate::bilibili::{self, BiliClient, FavoriteList, VideoInfo};
|
||||
use crate::utils::status::STATUS_COMPLETED;
|
||||
use crate::adapter::{VideoListModel, _ActiveModel};
|
||||
use crate::bilibili::{BiliClient, FavoriteList, VideoInfo};
|
||||
|
||||
#[async_trait]
|
||||
impl VideoListModel for favorite::Model {
|
||||
async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result<Vec<video::Model>> {
|
||||
helper::filter_videos(
|
||||
video::Column::FavoriteId
|
||||
.eq(self.id)
|
||||
.and(video::Column::Valid.eq(true))
|
||||
.and(video::Column::DownloadStatus.eq(0))
|
||||
.and(video::Column::Category.eq(2))
|
||||
.and(video::Column::SinglePage.is_null())
|
||||
.into_condition(),
|
||||
connection,
|
||||
)
|
||||
.await
|
||||
fn filter_expr(&self) -> SimpleExpr {
|
||||
video::Column::FavoriteId.eq(self.id)
|
||||
}
|
||||
|
||||
async fn unhandled_video_pages(
|
||||
&self,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<Vec<(video::Model, Vec<page::Model>)>> {
|
||||
helper::filter_videos_with_pages(
|
||||
video::Column::FavoriteId
|
||||
.eq(self.id)
|
||||
.and(video::Column::Valid.eq(true))
|
||||
.and(video::Column::DownloadStatus.lt(STATUS_COMPLETED))
|
||||
.and(video::Column::Category.eq(2))
|
||||
.and(video::Column::SinglePage.is_not_null())
|
||||
.into_condition(),
|
||||
connection,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option<video::Model>) -> video::ActiveModel {
|
||||
let mut video_model = video_info.to_model(base_model);
|
||||
fn set_relation_id(&self, video_model: &mut video::ActiveModel) {
|
||||
video_model.favorite_id = Set(Some(self.id));
|
||||
helper::video_with_path(video_model, &self.path, video_info)
|
||||
}
|
||||
|
||||
async fn fetch_videos_detail(
|
||||
&self,
|
||||
video: bilibili::Video<'_>,
|
||||
video_model: video::Model,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<()> {
|
||||
let info: Result<_> = async { Ok((video.get_tags().await?, video.get_pages().await?)) }.await;
|
||||
match info {
|
||||
Ok((tags, pages_info)) => {
|
||||
let txn = connection.begin().await?;
|
||||
// 将分页信息写入数据库
|
||||
helper::create_video_pages(&pages_info, &video_model, &txn).await?;
|
||||
// 将页标记和 tag 写入数据库
|
||||
let mut video_active_model: video::ActiveModel = video_model.into();
|
||||
video_active_model.single_page = Set(Some(pages_info.len() == 1));
|
||||
video_active_model.tags = Set(Some(serde_json::to_value(tags)?));
|
||||
video_active_model.save(&txn).await?;
|
||||
txn.commit().await?;
|
||||
}
|
||||
Err(e) => {
|
||||
helper::error_fetch_video_detail(e, video_model, connection).await?;
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
fn path(&self) -> &Path {
|
||||
Path::new(self.path.as_str())
|
||||
}
|
||||
|
||||
fn get_latest_row_at(&self) -> DateTime {
|
||||
self.latest_row_at
|
||||
}
|
||||
|
||||
async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> {
|
||||
favorite::ActiveModel {
|
||||
fn update_latest_row_at(&self, datetime: DateTime) -> _ActiveModel {
|
||||
_ActiveModel::Favorite(favorite::ActiveModel {
|
||||
id: Unchanged(self.id),
|
||||
latest_row_at: Set(datetime),
|
||||
..Default::default()
|
||||
}
|
||||
.update(connection)
|
||||
.await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn log_fetch_video_start(&self) {
|
||||
|
||||
@@ -1,112 +0,0 @@
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::Result;
|
||||
use bili_sync_entity::*;
|
||||
use sea_orm::entity::prelude::*;
|
||||
use sea_orm::sea_query::OnConflict;
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use sea_orm::{Condition, DatabaseTransaction};
|
||||
|
||||
use crate::bilibili::{BiliError, PageInfo, VideoInfo};
|
||||
use crate::config::{PathSafeTemplate, TEMPLATE};
|
||||
|
||||
/// 使用 condition 筛选视频,返回视频列表
|
||||
pub(super) async fn filter_videos(condition: Condition, conn: &DatabaseConnection) -> Result<Vec<video::Model>> {
|
||||
Ok(video::Entity::find().filter(condition).all(conn).await?)
|
||||
}
|
||||
|
||||
/// 使用 condition 筛选视频,返回视频列表和相关的分 P 列表
|
||||
pub(super) async fn filter_videos_with_pages(
|
||||
condition: Condition,
|
||||
conn: &DatabaseConnection,
|
||||
) -> Result<Vec<(video::Model, Vec<page::Model>)>> {
|
||||
Ok(video::Entity::find()
|
||||
.filter(condition)
|
||||
.find_with_related(page::Entity)
|
||||
.all(conn)
|
||||
.await?)
|
||||
}
|
||||
|
||||
/// 返回设置了 path 的视频
|
||||
pub(super) fn video_with_path(
|
||||
mut video_model: video::ActiveModel,
|
||||
base_path: &str,
|
||||
video_info: &VideoInfo,
|
||||
) -> video::ActiveModel {
|
||||
if let Some(fmt_args) = &video_info.to_fmt_args() {
|
||||
video_model.path = Set(Path::new(base_path)
|
||||
.join(
|
||||
TEMPLATE
|
||||
.path_safe_render("video", fmt_args)
|
||||
.expect("template render failed"),
|
||||
)
|
||||
.to_string_lossy()
|
||||
.to_string());
|
||||
}
|
||||
video_model
|
||||
}
|
||||
|
||||
/// 处理获取视频详细信息失败的情况
|
||||
pub(super) async fn error_fetch_video_detail(
|
||||
e: anyhow::Error,
|
||||
video_model: bili_sync_entity::video::Model,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<()> {
|
||||
error!(
|
||||
"获取视频 {} - {} 的详细信息失败,错误为:{}",
|
||||
&video_model.bvid, &video_model.name, e
|
||||
);
|
||||
if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::<BiliError>() {
|
||||
let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into();
|
||||
video_active_model.valid = Set(false);
|
||||
video_active_model.save(connection).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 创建视频的所有分 P
|
||||
pub(crate) async fn create_video_pages(
|
||||
pages_info: &[PageInfo],
|
||||
video_model: &video::Model,
|
||||
connection: &DatabaseTransaction,
|
||||
) -> Result<()> {
|
||||
let page_models = pages_info
|
||||
.iter()
|
||||
.map(move |p| {
|
||||
let (width, height) = match &p.dimension {
|
||||
Some(d) => {
|
||||
if d.rotate == 0 {
|
||||
(Some(d.width), Some(d.height))
|
||||
} else {
|
||||
(Some(d.height), Some(d.width))
|
||||
}
|
||||
}
|
||||
None => (None, None),
|
||||
};
|
||||
page::ActiveModel {
|
||||
video_id: Set(video_model.id),
|
||||
cid: Set(p.cid),
|
||||
pid: Set(p.page),
|
||||
name: Set(p.name.clone()),
|
||||
width: Set(width),
|
||||
height: Set(height),
|
||||
duration: Set(p.duration),
|
||||
image: Set(p.first_frame.clone()),
|
||||
download_status: Set(0),
|
||||
..Default::default()
|
||||
}
|
||||
})
|
||||
.collect::<Vec<page::ActiveModel>>();
|
||||
for page_chunk in page_models.chunks(50) {
|
||||
page::Entity::insert_many(page_chunk.to_vec())
|
||||
.on_conflict(
|
||||
OnConflict::columns([page::Column::VideoId, page::Column::Pid])
|
||||
.do_nothing()
|
||||
.to_owned(),
|
||||
)
|
||||
.do_nothing()
|
||||
.exec(connection)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
mod collection;
|
||||
mod favorite;
|
||||
mod helper;
|
||||
mod submission;
|
||||
mod watch_later;
|
||||
|
||||
@@ -8,16 +7,53 @@ use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use futures::Stream;
|
||||
use sea_orm::entity::prelude::*;
|
||||
use sea_orm::sea_query::SimpleExpr;
|
||||
use sea_orm::DatabaseConnection;
|
||||
|
||||
use crate::adapter::collection::collection_from;
|
||||
use crate::adapter::favorite::favorite_from;
|
||||
use crate::adapter::submission::submission_from;
|
||||
use crate::adapter::watch_later::watch_later_from;
|
||||
use crate::bilibili::{self, BiliClient, CollectionItem, VideoInfo};
|
||||
use crate::bilibili::{BiliClient, CollectionItem, VideoInfo};
|
||||
|
||||
pub trait VideoListModel {
|
||||
/// 获取特定视频列表的筛选条件
|
||||
fn filter_expr(&self) -> SimpleExpr;
|
||||
|
||||
// 为 video_model 设置该视频列表的关联 id
|
||||
fn set_relation_id(&self, video_model: &mut bili_sync_entity::video::ActiveModel);
|
||||
|
||||
// 获取视频列表的保存路径
|
||||
fn path(&self) -> &Path;
|
||||
|
||||
/// 获取视频 model 中记录的最新时间
|
||||
fn get_latest_row_at(&self) -> DateTime;
|
||||
|
||||
/// 更新视频 model 中记录的最新时间,此处返回需要更新的 ActiveModel,接着调用 save 方法执行保存
|
||||
/// 不同 VideoListModel 返回的类型不同,为了 VideoListModel 的 object safety 不能使用 impl Trait
|
||||
/// Box<dyn ActiveModelTrait> 又提示 ActiveModelTrait 没有 object safety,因此手写一个 Enum 静态分发
|
||||
fn update_latest_row_at(&self, datetime: DateTime) -> _ActiveModel;
|
||||
|
||||
/// 开始获取视频
|
||||
fn log_fetch_video_start(&self);
|
||||
|
||||
/// 结束获取视频
|
||||
fn log_fetch_video_end(&self);
|
||||
|
||||
/// 开始下载视频
|
||||
fn log_download_video_start(&self);
|
||||
|
||||
/// 结束下载视频
|
||||
fn log_download_video_end(&self);
|
||||
|
||||
/// 开始刷新视频
|
||||
fn log_refresh_video_start(&self);
|
||||
|
||||
/// 结束刷新视频
|
||||
fn log_refresh_video_end(&self, count: usize);
|
||||
}
|
||||
|
||||
pub enum Args<'a> {
|
||||
Favorite { fid: &'a str },
|
||||
@@ -40,53 +76,29 @@ pub async fn video_list_from<'a>(
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait VideoListModel {
|
||||
/// 未填充的视频
|
||||
async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result<Vec<bili_sync_entity::video::Model>>;
|
||||
|
||||
/// 未处理的视频和分页
|
||||
async fn unhandled_video_pages(
|
||||
&self,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<Vec<(bili_sync_entity::video::Model, Vec<bili_sync_entity::page::Model>)>>;
|
||||
|
||||
/// 视频信息对应的视频 model
|
||||
fn video_model_by_info(
|
||||
&self,
|
||||
video_info: &VideoInfo,
|
||||
base_model: Option<bili_sync_entity::video::Model>,
|
||||
) -> bili_sync_entity::video::ActiveModel;
|
||||
|
||||
/// 视频 model 中缺失的信息
|
||||
async fn fetch_videos_detail(
|
||||
&self,
|
||||
video: bilibili::Video<'_>,
|
||||
video_model: bili_sync_entity::video::Model,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<()>;
|
||||
|
||||
/// 获取视频 model 中记录的最新时间
|
||||
fn get_latest_row_at(&self) -> DateTime;
|
||||
|
||||
/// 更新视频 model 中记录的最新时间
|
||||
async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()>;
|
||||
|
||||
/// 开始获取视频
|
||||
fn log_fetch_video_start(&self);
|
||||
|
||||
/// 结束获取视频
|
||||
fn log_fetch_video_end(&self);
|
||||
|
||||
/// 开始下载视频
|
||||
fn log_download_video_start(&self);
|
||||
|
||||
/// 结束下载视频
|
||||
fn log_download_video_end(&self);
|
||||
|
||||
/// 开始刷新视频
|
||||
fn log_refresh_video_start(&self);
|
||||
|
||||
/// 结束刷新视频
|
||||
fn log_refresh_video_end(&self, count: usize);
|
||||
pub enum _ActiveModel {
|
||||
Favorite(bili_sync_entity::favorite::ActiveModel),
|
||||
Collection(bili_sync_entity::collection::ActiveModel),
|
||||
Submission(bili_sync_entity::submission::ActiveModel),
|
||||
WatchLater(bili_sync_entity::watch_later::ActiveModel),
|
||||
}
|
||||
|
||||
impl _ActiveModel {
|
||||
pub async fn save(self, connection: &DatabaseConnection) -> Result<()> {
|
||||
match self {
|
||||
_ActiveModel::Favorite(model) => {
|
||||
model.save(connection).await?;
|
||||
}
|
||||
_ActiveModel::Collection(model) => {
|
||||
model.save(connection).await?;
|
||||
}
|
||||
_ActiveModel::Submission(model) => {
|
||||
model.save(connection).await?;
|
||||
}
|
||||
_ActiveModel::WatchLater(model) => {
|
||||
model.save(connection).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,100 +2,39 @@ use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use async_trait::async_trait;
|
||||
use bili_sync_entity::*;
|
||||
use futures::Stream;
|
||||
use sea_orm::entity::prelude::*;
|
||||
use sea_orm::sea_query::{IntoCondition, OnConflict};
|
||||
use sea_orm::sea_query::{OnConflict, SimpleExpr};
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged};
|
||||
use sea_orm::{DatabaseConnection, Unchanged};
|
||||
|
||||
use crate::adapter::helper::video_with_path;
|
||||
use crate::adapter::{helper, VideoListModel};
|
||||
use crate::bilibili::{self, BiliClient, Submission, VideoInfo};
|
||||
use crate::utils::status::STATUS_COMPLETED;
|
||||
use crate::adapter::{VideoListModel, _ActiveModel};
|
||||
use crate::bilibili::{BiliClient, Submission, VideoInfo};
|
||||
|
||||
#[async_trait]
|
||||
impl VideoListModel for submission::Model {
|
||||
async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result<Vec<video::Model>> {
|
||||
helper::filter_videos(
|
||||
video::Column::SubmissionId
|
||||
.eq(self.id)
|
||||
.and(video::Column::Valid.eq(true))
|
||||
.and(video::Column::DownloadStatus.eq(0))
|
||||
.and(video::Column::Category.eq(2))
|
||||
.and(video::Column::SinglePage.is_null())
|
||||
.into_condition(),
|
||||
connection,
|
||||
)
|
||||
.await
|
||||
fn filter_expr(&self) -> SimpleExpr {
|
||||
video::Column::SubmissionId.eq(self.id)
|
||||
}
|
||||
|
||||
async fn unhandled_video_pages(
|
||||
&self,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<Vec<(video::Model, Vec<page::Model>)>> {
|
||||
helper::filter_videos_with_pages(
|
||||
video::Column::SubmissionId
|
||||
.eq(self.id)
|
||||
.and(video::Column::Valid.eq(true))
|
||||
.and(video::Column::DownloadStatus.lt(STATUS_COMPLETED))
|
||||
.and(video::Column::Category.eq(2))
|
||||
.and(video::Column::SinglePage.is_not_null())
|
||||
.into_condition(),
|
||||
connection,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option<video::Model>) -> video::ActiveModel {
|
||||
let mut video_model = video_info.to_model(base_model);
|
||||
fn set_relation_id(&self, video_model: &mut video::ActiveModel) {
|
||||
video_model.submission_id = Set(Some(self.id));
|
||||
video_with_path(video_model, &self.path, video_info)
|
||||
}
|
||||
|
||||
async fn fetch_videos_detail(
|
||||
&self,
|
||||
video: bilibili::Video<'_>,
|
||||
video_model: video::Model,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<()> {
|
||||
let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await;
|
||||
match info {
|
||||
Ok((tags, view_info)) => {
|
||||
let VideoInfo::Detail { pages, .. } = &view_info else {
|
||||
unreachable!("view_info must be VideoInfo::View")
|
||||
};
|
||||
let txn = connection.begin().await?;
|
||||
// 将分页信息写入数据库
|
||||
helper::create_video_pages(pages, &video_model, &txn).await?;
|
||||
// 将页标记和 tag 写入数据库
|
||||
let mut video_active_model = self.video_model_by_info(&view_info, Some(video_model));
|
||||
video_active_model.single_page = Set(Some(pages.len() == 1));
|
||||
video_active_model.tags = Set(Some(serde_json::to_value(tags)?));
|
||||
video_active_model.save(&txn).await?;
|
||||
txn.commit().await?;
|
||||
}
|
||||
Err(e) => {
|
||||
helper::error_fetch_video_detail(e, video_model, connection).await?;
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
fn path(&self) -> &Path {
|
||||
Path::new(self.path.as_str())
|
||||
}
|
||||
|
||||
fn get_latest_row_at(&self) -> DateTime {
|
||||
self.latest_row_at
|
||||
}
|
||||
|
||||
async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> {
|
||||
submission::ActiveModel {
|
||||
fn update_latest_row_at(&self, datetime: DateTime) -> _ActiveModel {
|
||||
_ActiveModel::Submission(submission::ActiveModel {
|
||||
id: Unchanged(self.id),
|
||||
latest_row_at: Set(datetime),
|
||||
..Default::default()
|
||||
}
|
||||
.update(connection)
|
||||
.await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn log_fetch_video_start(&self) {
|
||||
|
||||
@@ -2,97 +2,39 @@ use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use async_trait::async_trait;
|
||||
use bili_sync_entity::*;
|
||||
use futures::Stream;
|
||||
use sea_orm::entity::prelude::*;
|
||||
use sea_orm::sea_query::{IntoCondition, OnConflict};
|
||||
use sea_orm::sea_query::{OnConflict, SimpleExpr};
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use sea_orm::{DatabaseConnection, TransactionTrait, Unchanged};
|
||||
use sea_orm::{DatabaseConnection, Unchanged};
|
||||
|
||||
use crate::adapter::helper::video_with_path;
|
||||
use crate::adapter::{helper, VideoListModel};
|
||||
use crate::bilibili::{self, BiliClient, VideoInfo, WatchLater};
|
||||
use crate::utils::status::STATUS_COMPLETED;
|
||||
use crate::adapter::{VideoListModel, _ActiveModel};
|
||||
use crate::bilibili::{BiliClient, VideoInfo, WatchLater};
|
||||
|
||||
#[async_trait]
|
||||
impl VideoListModel for watch_later::Model {
|
||||
async fn unfilled_videos(&self, connection: &DatabaseConnection) -> Result<Vec<video::Model>> {
|
||||
helper::filter_videos(
|
||||
video::Column::WatchLaterId
|
||||
.eq(self.id)
|
||||
.and(video::Column::Valid.eq(true))
|
||||
.and(video::Column::DownloadStatus.eq(0))
|
||||
.and(video::Column::Category.eq(2))
|
||||
.and(video::Column::SinglePage.is_null())
|
||||
.into_condition(),
|
||||
connection,
|
||||
)
|
||||
.await
|
||||
fn filter_expr(&self) -> SimpleExpr {
|
||||
video::Column::WatchLaterId.eq(self.id)
|
||||
}
|
||||
|
||||
async fn unhandled_video_pages(
|
||||
&self,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<Vec<(video::Model, Vec<page::Model>)>> {
|
||||
helper::filter_videos_with_pages(
|
||||
video::Column::WatchLaterId
|
||||
.eq(self.id)
|
||||
.and(video::Column::Valid.eq(true))
|
||||
.and(video::Column::DownloadStatus.lt(STATUS_COMPLETED))
|
||||
.and(video::Column::Category.eq(2))
|
||||
.and(video::Column::SinglePage.is_not_null())
|
||||
.into_condition(),
|
||||
connection,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn video_model_by_info(&self, video_info: &VideoInfo, base_model: Option<video::Model>) -> video::ActiveModel {
|
||||
let mut video_model = video_info.to_model(base_model);
|
||||
fn set_relation_id(&self, video_model: &mut video::ActiveModel) {
|
||||
video_model.watch_later_id = Set(Some(self.id));
|
||||
video_with_path(video_model, &self.path, video_info)
|
||||
}
|
||||
|
||||
async fn fetch_videos_detail(
|
||||
&self,
|
||||
video: bilibili::Video<'_>,
|
||||
video_model: video::Model,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<()> {
|
||||
let info: Result<_> = async { Ok((video.get_tags().await?, video.get_pages().await?)) }.await;
|
||||
match info {
|
||||
Ok((tags, pages_info)) => {
|
||||
let txn = connection.begin().await?;
|
||||
// 将分页信息写入数据库
|
||||
helper::create_video_pages(&pages_info, &video_model, &txn).await?;
|
||||
// 将页标记和 tag 写入数据库
|
||||
let mut video_active_model: video::ActiveModel = video_model.into();
|
||||
video_active_model.single_page = Set(Some(pages_info.len() == 1));
|
||||
video_active_model.tags = Set(Some(serde_json::to_value(tags)?));
|
||||
video_active_model.save(&txn).await?;
|
||||
txn.commit().await?;
|
||||
}
|
||||
Err(e) => {
|
||||
helper::error_fetch_video_detail(e, video_model, connection).await?;
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
fn path(&self) -> &Path {
|
||||
Path::new(self.path.as_str())
|
||||
}
|
||||
|
||||
fn get_latest_row_at(&self) -> DateTime {
|
||||
self.latest_row_at
|
||||
}
|
||||
|
||||
async fn update_latest_row_at(&self, datetime: DateTime, connection: &DatabaseConnection) -> Result<()> {
|
||||
watch_later::ActiveModel {
|
||||
fn update_latest_row_at(&self, datetime: DateTime) -> _ActiveModel {
|
||||
_ActiveModel::WatchLater(watch_later::ActiveModel {
|
||||
id: Unchanged(self.id),
|
||||
latest_row_at: Set(datetime),
|
||||
..Default::default()
|
||||
}
|
||||
.update(connection)
|
||||
.await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
fn log_fetch_video_start(&self) {
|
||||
|
||||
@@ -43,7 +43,6 @@ impl PartialOrd for AudioQuality {
|
||||
}
|
||||
|
||||
impl AudioQuality {
|
||||
#[inline]
|
||||
pub fn as_sort_key(&self) -> isize {
|
||||
match self {
|
||||
// 这可以让 Dolby 和 Hi-RES 排在 192k 之后,且 Dolby 和 Hi-RES 之间的顺序不变
|
||||
|
||||
@@ -222,7 +222,6 @@ pub fn encoded_query<'a>(
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn _encoded_query<'a>(
|
||||
params: Vec<(&'a str, impl Into<Cow<'a, str>>)>,
|
||||
mixin_key: &str,
|
||||
|
||||
@@ -30,7 +30,6 @@ mod watch_later;
|
||||
|
||||
static MIXIN_KEY: Lazy<ArcSwapOption<String>> = Lazy::new(Default::default);
|
||||
|
||||
#[inline]
|
||||
pub(crate) fn set_global_mixin_key(key: String) {
|
||||
MIXIN_KEY.store(Some(Arc::new(key)));
|
||||
}
|
||||
|
||||
@@ -62,7 +62,7 @@ impl<'a> Video<'a> {
|
||||
Self { client, aid, bvid }
|
||||
}
|
||||
|
||||
/// 直接调用视频信息接口获取详细的视频信息
|
||||
/// 直接调用视频信息接口获取详细的视频信息,视频信息中包含了视频的分页信息
|
||||
pub async fn get_view_info(&self) -> Result<VideoInfo> {
|
||||
let mut res = self
|
||||
.client
|
||||
@@ -78,6 +78,7 @@ impl<'a> Video<'a> {
|
||||
Ok(serde_json::from_value(res["data"].take())?)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub async fn get_pages(&self) -> Result<Vec<PageInfo>> {
|
||||
let mut res = self
|
||||
.client
|
||||
|
||||
@@ -39,7 +39,6 @@ pub static CONFIG_DIR: Lazy<PathBuf> =
|
||||
Lazy::new(|| dirs::config_dir().expect("No config path found").join("bili-sync"));
|
||||
|
||||
#[cfg(not(test))]
|
||||
#[inline]
|
||||
fn load_config() -> Config {
|
||||
let config = Config::load().unwrap_or_else(|err| {
|
||||
if err
|
||||
@@ -57,11 +56,12 @@ fn load_config() -> Config {
|
||||
// 检查配置文件内容
|
||||
info!("校验配置文件内容...");
|
||||
config.check();
|
||||
info!("配置文件内容校验通过");
|
||||
config
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[inline]
|
||||
|
||||
fn load_config() -> Config {
|
||||
let credential = match (
|
||||
std::env::var("TEST_SESSDATA"),
|
||||
|
||||
@@ -1,23 +1,17 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use chrono::{DateTime, NaiveDateTime, Utc};
|
||||
use sea_orm::ActiveValue::{NotSet, Set};
|
||||
use sea_orm::IntoActiveModel;
|
||||
use serde_json::json;
|
||||
|
||||
use crate::bilibili::VideoInfo;
|
||||
use crate::config::CONFIG;
|
||||
use crate::bilibili::{PageInfo, VideoInfo};
|
||||
|
||||
impl VideoInfo {
|
||||
/// 将 VideoInfo 转换为 ActiveModel
|
||||
pub fn to_model(&self, base_model: Option<bili_sync_entity::video::Model>) -> bili_sync_entity::video::ActiveModel {
|
||||
let base_model = match base_model {
|
||||
Some(base_model) => base_model.into_active_model(),
|
||||
None => {
|
||||
let mut tmp_model = bili_sync_entity::video::Model::default().into_active_model();
|
||||
// 注意此处要把 id 和 created_at 设置为 NotSet,方便在 sql 中忽略这些字段,交由数据库自动生成
|
||||
tmp_model.id = NotSet;
|
||||
tmp_model.created_at = NotSet;
|
||||
tmp_model
|
||||
}
|
||||
/// 在检测视频更新时,通过该方法将 VideoInfo 转换为简单的 ActiveModel,此处仅填充一些简单信息,后续会使用详情覆盖
|
||||
pub fn into_simple_model(self) -> bili_sync_entity::video::ActiveModel {
|
||||
let default = bili_sync_entity::video::ActiveModel {
|
||||
id: NotSet,
|
||||
created_at: NotSet,
|
||||
// 此处不使用 ActiveModel::default() 是为了让其它字段有默认值
|
||||
..bili_sync_entity::video::Model::default().into_active_model()
|
||||
};
|
||||
match self {
|
||||
VideoInfo::Collection {
|
||||
@@ -26,13 +20,13 @@ impl VideoInfo {
|
||||
ctime,
|
||||
pubtime,
|
||||
} => bili_sync_entity::video::ActiveModel {
|
||||
bvid: Set(bvid.clone()),
|
||||
cover: Set(cover.clone()),
|
||||
bvid: Set(bvid),
|
||||
cover: Set(cover),
|
||||
ctime: Set(ctime.naive_utc()),
|
||||
pubtime: Set(pubtime.naive_utc()),
|
||||
category: Set(2), // 视频合集里的内容类型肯定是视频
|
||||
valid: Set(true),
|
||||
..base_model
|
||||
..default
|
||||
},
|
||||
VideoInfo::Favorite {
|
||||
title,
|
||||
@@ -46,50 +40,20 @@ impl VideoInfo {
|
||||
pubtime,
|
||||
attr,
|
||||
} => bili_sync_entity::video::ActiveModel {
|
||||
bvid: Set(bvid.clone()),
|
||||
name: Set(title.clone()),
|
||||
category: Set(*vtype),
|
||||
intro: Set(intro.clone()),
|
||||
cover: Set(cover.clone()),
|
||||
bvid: Set(bvid),
|
||||
name: Set(title),
|
||||
category: Set(vtype),
|
||||
intro: Set(intro),
|
||||
cover: Set(cover),
|
||||
ctime: Set(ctime.naive_utc()),
|
||||
pubtime: Set(pubtime.naive_utc()),
|
||||
favtime: Set(fav_time.naive_utc()),
|
||||
download_status: Set(0),
|
||||
valid: Set(*attr == 0),
|
||||
tags: Set(None),
|
||||
single_page: Set(None),
|
||||
valid: Set(attr == 0),
|
||||
upper_id: Set(upper.mid),
|
||||
upper_name: Set(upper.name.clone()),
|
||||
upper_face: Set(upper.face.clone()),
|
||||
..base_model
|
||||
},
|
||||
VideoInfo::Detail {
|
||||
title,
|
||||
bvid,
|
||||
intro,
|
||||
cover,
|
||||
upper,
|
||||
ctime,
|
||||
pubtime,
|
||||
state,
|
||||
..
|
||||
} => bili_sync_entity::video::ActiveModel {
|
||||
bvid: Set(bvid.clone()),
|
||||
name: Set(title.clone()),
|
||||
category: Set(2), // 视频合集里的内容类型肯定是视频
|
||||
intro: Set(intro.clone()),
|
||||
cover: Set(cover.clone()),
|
||||
ctime: Set(ctime.naive_utc()),
|
||||
pubtime: Set(pubtime.naive_utc()),
|
||||
favtime: Set(pubtime.naive_utc()), // 合集不包括 fav_time,使用发布时间代替
|
||||
download_status: Set(0),
|
||||
valid: Set(*state == 0),
|
||||
tags: Set(None),
|
||||
single_page: Set(None),
|
||||
upper_id: Set(upper.mid),
|
||||
upper_name: Set(upper.name.clone()),
|
||||
upper_face: Set(upper.face.clone()),
|
||||
..base_model
|
||||
upper_name: Set(upper.name),
|
||||
upper_face: Set(upper.face),
|
||||
..default
|
||||
},
|
||||
VideoInfo::WatchLater {
|
||||
title,
|
||||
@@ -102,22 +66,20 @@ impl VideoInfo {
|
||||
pubtime,
|
||||
state,
|
||||
} => bili_sync_entity::video::ActiveModel {
|
||||
bvid: Set(bvid.clone()),
|
||||
name: Set(title.clone()),
|
||||
bvid: Set(bvid),
|
||||
name: Set(title),
|
||||
category: Set(2), // 稍后再看里的内容类型肯定是视频
|
||||
intro: Set(intro.clone()),
|
||||
cover: Set(cover.clone()),
|
||||
intro: Set(intro),
|
||||
cover: Set(cover),
|
||||
ctime: Set(ctime.naive_utc()),
|
||||
pubtime: Set(pubtime.naive_utc()),
|
||||
favtime: Set(fav_time.naive_utc()),
|
||||
download_status: Set(0),
|
||||
valid: Set(*state == 0),
|
||||
tags: Set(None),
|
||||
single_page: Set(None),
|
||||
valid: Set(state == 0),
|
||||
upper_id: Set(upper.mid),
|
||||
upper_name: Set(upper.name.clone()),
|
||||
upper_face: Set(upper.face.clone()),
|
||||
..base_model
|
||||
upper_name: Set(upper.name),
|
||||
upper_face: Set(upper.face),
|
||||
..default
|
||||
},
|
||||
VideoInfo::Submission {
|
||||
title,
|
||||
@@ -126,64 +88,58 @@ impl VideoInfo {
|
||||
cover,
|
||||
ctime,
|
||||
} => bili_sync_entity::video::ActiveModel {
|
||||
bvid: Set(bvid.clone()),
|
||||
name: Set(title.clone()),
|
||||
intro: Set(intro.clone()),
|
||||
cover: Set(cover.clone()),
|
||||
bvid: Set(bvid),
|
||||
name: Set(title),
|
||||
intro: Set(intro),
|
||||
cover: Set(cover),
|
||||
ctime: Set(ctime.naive_utc()),
|
||||
category: Set(2), // 投稿视频的内容类型肯定是视频
|
||||
valid: Set(true),
|
||||
..base_model
|
||||
..default
|
||||
},
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_fmt_args(&self) -> Option<serde_json::Value> {
|
||||
/// 填充视频详情时调用,该方法会将视频详情附加到原有的 Model 上
|
||||
/// 特殊地,如果在检测视频更新时记录了 favtime,那么 favtime 会维持原样,否则会使用 pubtime 填充
|
||||
pub fn into_detail_model(self, base_model: bili_sync_entity::video::Model) -> bili_sync_entity::video::ActiveModel {
|
||||
match self {
|
||||
VideoInfo::Collection { .. } | VideoInfo::Submission { .. } => None, // 不能从简单视频信息中构造格式化参数
|
||||
VideoInfo::Favorite {
|
||||
title,
|
||||
bvid,
|
||||
upper,
|
||||
pubtime,
|
||||
fav_time,
|
||||
..
|
||||
}
|
||||
| VideoInfo::WatchLater {
|
||||
title,
|
||||
bvid,
|
||||
upper,
|
||||
pubtime,
|
||||
fav_time,
|
||||
..
|
||||
} => Some(json!({
|
||||
"bvid": &bvid,
|
||||
"title": &title,
|
||||
"upper_name": &upper.name,
|
||||
"upper_mid": &upper.mid,
|
||||
"pubtime": pubtime.format(&CONFIG.time_format).to_string(),
|
||||
"fav_time": fav_time.format(&CONFIG.time_format).to_string(),
|
||||
})),
|
||||
VideoInfo::Detail {
|
||||
title,
|
||||
bvid,
|
||||
intro,
|
||||
cover,
|
||||
upper,
|
||||
ctime,
|
||||
pubtime,
|
||||
state,
|
||||
..
|
||||
} => {
|
||||
let pubtime = pubtime.format(&CONFIG.time_format).to_string();
|
||||
Some(json!({
|
||||
"bvid": &bvid,
|
||||
"title": &title,
|
||||
"upper_name": &upper.name,
|
||||
"upper_mid": &upper.mid,
|
||||
"pubtime": &pubtime,
|
||||
"fav_time": &pubtime,
|
||||
}))
|
||||
}
|
||||
} => bili_sync_entity::video::ActiveModel {
|
||||
bvid: Set(bvid),
|
||||
name: Set(title),
|
||||
category: Set(2),
|
||||
intro: Set(intro),
|
||||
cover: Set(cover),
|
||||
ctime: Set(ctime.naive_utc()),
|
||||
pubtime: Set(pubtime.naive_utc()),
|
||||
favtime: if base_model.favtime != NaiveDateTime::default() {
|
||||
NotSet // 之前设置了 favtime,不覆盖
|
||||
} else {
|
||||
Set(pubtime.naive_utc()) // 未设置过 favtime,使用 pubtime 填充
|
||||
},
|
||||
download_status: Set(0),
|
||||
valid: Set(state == 0),
|
||||
upper_id: Set(upper.mid),
|
||||
upper_name: Set(upper.name),
|
||||
upper_face: Set(upper.face),
|
||||
..base_model.into_active_model()
|
||||
},
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
/// 获取视频的发布时间,用于对时间做筛选检查新视频
|
||||
pub fn release_datetime(&self) -> &DateTime<Utc> {
|
||||
match self {
|
||||
VideoInfo::Collection { pubtime: time, .. }
|
||||
@@ -194,3 +150,33 @@ impl VideoInfo {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PageInfo {
|
||||
pub fn into_active_model(
|
||||
self,
|
||||
video_model: &bili_sync_entity::video::Model,
|
||||
) -> bili_sync_entity::page::ActiveModel {
|
||||
let (width, height) = match &self.dimension {
|
||||
Some(d) => {
|
||||
if d.rotate == 0 {
|
||||
(Some(d.width), Some(d.height))
|
||||
} else {
|
||||
(Some(d.height), Some(d.width))
|
||||
}
|
||||
}
|
||||
None => (None, None),
|
||||
};
|
||||
bili_sync_entity::page::ActiveModel {
|
||||
video_id: Set(video_model.id),
|
||||
cid: Set(self.cid),
|
||||
pid: Set(self.page),
|
||||
name: Set(self.name),
|
||||
width: Set(width),
|
||||
height: Set(height),
|
||||
duration: Set(self.duration),
|
||||
image: Set(self.first_frame),
|
||||
download_status: Set(0),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
30
crates/bili_sync/src/utils/format_arg.rs
Normal file
30
crates/bili_sync/src/utils/format_arg.rs
Normal file
@@ -0,0 +1,30 @@
|
||||
use serde_json::json;
|
||||
|
||||
use crate::config::CONFIG;
|
||||
|
||||
pub fn video_format_args(video_model: &bili_sync_entity::video::Model) -> serde_json::Value {
|
||||
json!({
|
||||
"bvid": &video_model.bvid,
|
||||
"title": &video_model.name,
|
||||
"upper_name": &video_model.upper_name,
|
||||
"upper_mid": &video_model.upper_id,
|
||||
"pubtime": &video_model.pubtime.and_utc().format(&CONFIG.time_format).to_string(),
|
||||
"fav_time": &video_model.favtime.and_utc().format(&CONFIG.time_format).to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn page_format_args(
|
||||
video_model: &bili_sync_entity::video::Model,
|
||||
page_model: &bili_sync_entity::page::Model,
|
||||
) -> serde_json::Value {
|
||||
json!({
|
||||
"bvid": &video_model.bvid,
|
||||
"title": &video_model.name,
|
||||
"upper_name": &video_model.upper_name,
|
||||
"upper_mid": &video_model.upper_id,
|
||||
"ptitle": &page_model.name,
|
||||
"pid": page_model.pid,
|
||||
"pubtime": video_model.pubtime.and_utc().format(&CONFIG.time_format).to_string(),
|
||||
"fav_time": video_model.favtime.and_utc().format(&CONFIG.time_format).to_string(),
|
||||
})
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
pub mod convert;
|
||||
pub mod filenamify;
|
||||
pub mod format_arg;
|
||||
pub mod model;
|
||||
pub mod nfo;
|
||||
pub mod status;
|
||||
|
||||
@@ -1,20 +1,65 @@
|
||||
use anyhow::Result;
|
||||
use anyhow::{Context, Result};
|
||||
use bili_sync_entity::*;
|
||||
use sea_orm::entity::prelude::*;
|
||||
use sea_orm::sea_query::OnConflict;
|
||||
use sea_orm::sea_query::{OnConflict, SimpleExpr};
|
||||
use sea_orm::DatabaseTransaction;
|
||||
|
||||
use crate::adapter::VideoListModel;
|
||||
use crate::bilibili::VideoInfo;
|
||||
use crate::bilibili::{PageInfo, VideoInfo};
|
||||
use crate::utils::status::STATUS_COMPLETED;
|
||||
|
||||
/// 筛选未填充的视频
|
||||
pub async fn filter_unfilled_videos(
|
||||
additional_expr: SimpleExpr,
|
||||
conn: &DatabaseConnection,
|
||||
) -> Result<Vec<video::Model>> {
|
||||
video::Entity::find()
|
||||
.filter(
|
||||
video::Column::Valid
|
||||
.eq(true)
|
||||
.and(video::Column::DownloadStatus.eq(0))
|
||||
.and(video::Column::Category.eq(2))
|
||||
.and(video::Column::SinglePage.is_null())
|
||||
.and(additional_expr),
|
||||
)
|
||||
.all(conn)
|
||||
.await
|
||||
.context("filter unfilled videos failed")
|
||||
}
|
||||
|
||||
/// 筛选未处理完成的视频和视频页
|
||||
pub async fn filter_unhandled_video_pages(
|
||||
additional_expr: SimpleExpr,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<Vec<(video::Model, Vec<page::Model>)>> {
|
||||
video::Entity::find()
|
||||
.filter(
|
||||
video::Column::Valid
|
||||
.eq(true)
|
||||
.and(video::Column::DownloadStatus.lt(STATUS_COMPLETED))
|
||||
.and(video::Column::Category.eq(2))
|
||||
.and(video::Column::SinglePage.is_not_null())
|
||||
.and(additional_expr),
|
||||
)
|
||||
.find_with_related(page::Entity)
|
||||
.all(connection)
|
||||
.await
|
||||
.context("filter unhandled video pages failed")
|
||||
}
|
||||
|
||||
/// 尝试创建 Video Model,如果发生冲突则忽略
|
||||
pub async fn create_videos(
|
||||
videos_info: &[VideoInfo],
|
||||
videos_info: Vec<VideoInfo>,
|
||||
video_list_model: &dyn VideoListModel,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<()> {
|
||||
let video_models = videos_info
|
||||
.iter()
|
||||
.map(|v| video_list_model.video_model_by_info(v, None))
|
||||
.into_iter()
|
||||
.map(|v| {
|
||||
let mut model = v.into_simple_model();
|
||||
video_list_model.set_relation_id(&mut model);
|
||||
model
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
video::Entity::insert_many(video_models)
|
||||
// 这里想表达的是 on 索引名,但 sea-orm 的 api 似乎只支持列名而不支持索引名,好在留空可以达到相同的目的
|
||||
@@ -25,6 +70,30 @@ pub async fn create_videos(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 尝试创建 Page Model,如果发生冲突则忽略
|
||||
pub async fn create_pages(
|
||||
pages_info: Vec<PageInfo>,
|
||||
video_model: &bili_sync_entity::video::Model,
|
||||
connection: &DatabaseTransaction,
|
||||
) -> Result<()> {
|
||||
let page_models = pages_info
|
||||
.into_iter()
|
||||
.map(|p| p.into_active_model(video_model))
|
||||
.collect::<Vec<page::ActiveModel>>();
|
||||
for page_chunk in page_models.chunks(50) {
|
||||
page::Entity::insert_many(page_chunk.to_vec())
|
||||
.on_conflict(
|
||||
OnConflict::columns([page::Column::VideoId, page::Column::Pid])
|
||||
.do_nothing()
|
||||
.to_owned(),
|
||||
)
|
||||
.do_nothing()
|
||||
.exec(connection)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// 更新视频 model 的下载状态
|
||||
pub async fn update_videos_model(videos: Vec<video::ActiveModel>, connection: &DatabaseConnection) -> Result<()> {
|
||||
video::Entity::insert_many(videos)
|
||||
|
||||
@@ -8,7 +8,7 @@ use futures::stream::{FuturesOrdered, FuturesUnordered};
|
||||
use futures::{Future, Stream, StreamExt};
|
||||
use sea_orm::entity::prelude::*;
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use serde_json::json;
|
||||
use sea_orm::TransactionTrait;
|
||||
use tokio::fs;
|
||||
use tokio::sync::{Mutex, Semaphore};
|
||||
|
||||
@@ -17,10 +17,15 @@ use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Vi
|
||||
use crate::config::{PathSafeTemplate, ARGS, CONFIG, TEMPLATE};
|
||||
use crate::downloader::Downloader;
|
||||
use crate::error::{DownloadAbortError, ProcessPageError};
|
||||
use crate::utils::model::{create_videos, update_pages_model, update_videos_model};
|
||||
use crate::utils::format_arg::{page_format_args, video_format_args};
|
||||
use crate::utils::model::{
|
||||
create_pages, create_videos, filter_unfilled_videos, filter_unhandled_video_pages, update_pages_model,
|
||||
update_videos_model,
|
||||
};
|
||||
use crate::utils::nfo::{ModelWrapper, NFOMode, NFOSerializer};
|
||||
use crate::utils::status::{PageStatus, VideoStatus};
|
||||
|
||||
/// 完整地处理某个视频列表
|
||||
pub async fn process_video_list(
|
||||
args: Args<'_>,
|
||||
bili_client: &BiliClient,
|
||||
@@ -55,7 +60,7 @@ pub async fn refresh_video_list<'a>(
|
||||
.take_while(|v| {
|
||||
// 虽然 video_streams 是从新到旧的,但由于此处是分页请求,极端情况下可能发生访问完第一页时插入了两整页视频的情况
|
||||
// 此时获取到的第二页视频比第一页的还要新,因此为了确保正确,理应对每一页的第一个视频进行时间比较
|
||||
// 但在 streams 的抽象下,无法判断具体是在哪里分页的,所以暂且对每个视频都进行比较,希望不会有太大性能损失
|
||||
// 但在 streams 的抽象下,无法判断具体是在哪里分页的,所以暂且对每个视频都进行比较,应该不会有太大性能损失
|
||||
let release_datetime = v.release_datetime();
|
||||
if release_datetime > &max_datetime {
|
||||
max_datetime = *release_datetime;
|
||||
@@ -66,11 +71,12 @@ pub async fn refresh_video_list<'a>(
|
||||
let mut count = 0;
|
||||
while let Some(videos_info) = video_streams.next().await {
|
||||
count += videos_info.len();
|
||||
create_videos(&videos_info, video_list_model, connection).await?;
|
||||
create_videos(videos_info, video_list_model, connection).await?;
|
||||
}
|
||||
if max_datetime != latest_row_at {
|
||||
video_list_model
|
||||
.update_latest_row_at(max_datetime.naive_utc(), connection)
|
||||
.update_latest_row_at(max_datetime.naive_utc())
|
||||
.save(connection)
|
||||
.await?;
|
||||
}
|
||||
video_list_model.log_refresh_video_end(count);
|
||||
@@ -84,12 +90,39 @@ pub async fn fetch_video_details(
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<()> {
|
||||
video_list_model.log_fetch_video_start();
|
||||
let videos_model = video_list_model.unfilled_videos(connection).await?;
|
||||
let videos_model = filter_unfilled_videos(video_list_model.filter_expr(), connection).await?;
|
||||
for video_model in videos_model {
|
||||
let video = Video::new(bili_client, video_model.bvid.clone());
|
||||
video_list_model
|
||||
.fetch_videos_detail(video, video_model, connection)
|
||||
.await?;
|
||||
let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await;
|
||||
match info {
|
||||
Err(e) => {
|
||||
error!(
|
||||
"获取视频 {} - {} 的详细信息失败,错误为:{}",
|
||||
&video_model.bvid, &video_model.name, e
|
||||
);
|
||||
if let Some(BiliError::RequestFailed(-404, _)) = e.downcast_ref::<BiliError>() {
|
||||
let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into();
|
||||
video_active_model.valid = Set(false);
|
||||
video_active_model.save(connection).await?;
|
||||
}
|
||||
}
|
||||
Ok((tags, mut view_info)) => {
|
||||
let VideoInfo::Detail { pages, .. } = &mut view_info else {
|
||||
unreachable!()
|
||||
};
|
||||
let pages = std::mem::take(pages);
|
||||
let pages_len = pages.len();
|
||||
let txn = connection.begin().await?;
|
||||
// 将分页信息写入数据库
|
||||
create_pages(pages, &video_model, &txn).await?;
|
||||
let mut video_active_model = view_info.into_detail_model(video_model);
|
||||
video_list_model.set_relation_id(&mut video_active_model);
|
||||
video_active_model.single_page = Set(Some(pages_len == 1));
|
||||
video_active_model.tags = Set(Some(serde_json::to_value(tags)?));
|
||||
video_active_model.save(&txn).await?;
|
||||
txn.commit().await?;
|
||||
}
|
||||
};
|
||||
}
|
||||
video_list_model.log_fetch_video_end();
|
||||
Ok(())
|
||||
@@ -105,7 +138,7 @@ pub async fn download_unprocessed_videos(
|
||||
let semaphore = Semaphore::new(CONFIG.concurrent_limit.video);
|
||||
let downloader = Downloader::new(bili_client.client.clone());
|
||||
let mut uppers_mutex: HashMap<i64, (Mutex<()>, Mutex<()>)> = HashMap::new();
|
||||
let unhandled_videos_pages = video_list_model.unhandled_video_pages(connection).await?;
|
||||
let unhandled_videos_pages = filter_unhandled_video_pages(video_list_model.filter_expr(), connection).await?;
|
||||
for (video_model, _) in &unhandled_videos_pages {
|
||||
uppers_mutex
|
||||
.entry(video_model.upper_id)
|
||||
@@ -117,12 +150,12 @@ pub async fn download_unprocessed_videos(
|
||||
let upper_mutex = uppers_mutex.get(&video_model.upper_id).expect("upper mutex not found");
|
||||
download_video_pages(
|
||||
bili_client,
|
||||
video_list_model,
|
||||
video_model,
|
||||
pages_model,
|
||||
connection,
|
||||
&semaphore,
|
||||
&downloader,
|
||||
&CONFIG.upper_path,
|
||||
upper_mutex,
|
||||
)
|
||||
})
|
||||
@@ -156,20 +189,23 @@ pub async fn download_unprocessed_videos(
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn download_video_pages(
|
||||
bili_client: &BiliClient,
|
||||
video_list_model: &dyn VideoListModel,
|
||||
video_model: video::Model,
|
||||
pages: Vec<page::Model>,
|
||||
connection: &DatabaseConnection,
|
||||
semaphore: &Semaphore,
|
||||
downloader: &Downloader,
|
||||
upper_path: &Path,
|
||||
upper_mutex: &(Mutex<()>, Mutex<()>),
|
||||
) -> Result<video::ActiveModel> {
|
||||
let _permit = semaphore.acquire().await.context("acquire semaphore failed")?;
|
||||
let mut status = VideoStatus::new(video_model.download_status);
|
||||
let seprate_status = status.should_run();
|
||||
let base_path = Path::new(&video_model.path);
|
||||
let base_path = video_list_model
|
||||
.path()
|
||||
.join(TEMPLATE.path_safe_render("video", &video_format_args(&video_model))?);
|
||||
let upper_id = video_model.upper_id.to_string();
|
||||
let base_upper_path = upper_path
|
||||
let base_upper_path = &CONFIG
|
||||
.upper_path
|
||||
.join(upper_id.chars().next().context("upper_id is empty")?.to_string())
|
||||
.join(upper_id);
|
||||
let is_single_page = video_model.single_page.context("single_page is null")?;
|
||||
@@ -213,6 +249,7 @@ pub async fn download_video_pages(
|
||||
pages,
|
||||
connection,
|
||||
downloader,
|
||||
&base_path,
|
||||
)),
|
||||
];
|
||||
let tasks: FuturesOrdered<_> = tasks.into_iter().collect();
|
||||
@@ -239,6 +276,7 @@ pub async fn download_video_pages(
|
||||
}
|
||||
let mut video_active_model: video::ActiveModel = video_model.into();
|
||||
video_active_model.download_status = Set(status.into());
|
||||
video_active_model.path = Set(base_path.to_string_lossy().to_string());
|
||||
Ok(video_active_model)
|
||||
}
|
||||
|
||||
@@ -250,6 +288,7 @@ pub async fn dispatch_download_page(
|
||||
pages: Vec<page::Model>,
|
||||
connection: &DatabaseConnection,
|
||||
downloader: &Downloader,
|
||||
base_path: &Path,
|
||||
) -> Result<()> {
|
||||
if !should_run {
|
||||
return Ok(());
|
||||
@@ -257,7 +296,16 @@ pub async fn dispatch_download_page(
|
||||
let child_semaphore = Semaphore::new(CONFIG.concurrent_limit.page);
|
||||
let tasks = pages
|
||||
.into_iter()
|
||||
.map(|page_model| download_page(bili_client, video_model, page_model, &child_semaphore, downloader))
|
||||
.map(|page_model| {
|
||||
download_page(
|
||||
bili_client,
|
||||
video_model,
|
||||
page_model,
|
||||
&child_semaphore,
|
||||
downloader,
|
||||
base_path,
|
||||
)
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
let (mut download_aborted, mut error_occurred) = (false, false);
|
||||
let mut stream = tasks
|
||||
@@ -311,25 +359,13 @@ pub async fn download_page(
|
||||
page_model: page::Model,
|
||||
semaphore: &Semaphore,
|
||||
downloader: &Downloader,
|
||||
base_path: &Path,
|
||||
) -> Result<page::ActiveModel> {
|
||||
let _permit = semaphore.acquire().await.context("acquire semaphore failed")?;
|
||||
let mut status = PageStatus::new(page_model.download_status);
|
||||
let seprate_status = status.should_run();
|
||||
let is_single_page = video_model.single_page.context("single_page is null")?;
|
||||
let base_path = Path::new(&video_model.path);
|
||||
let base_name = TEMPLATE.path_safe_render(
|
||||
"page",
|
||||
&json!({
|
||||
"bvid": &video_model.bvid,
|
||||
"title": &video_model.name,
|
||||
"upper_name": &video_model.upper_name,
|
||||
"upper_mid": &video_model.upper_id,
|
||||
"ptitle": &page_model.name,
|
||||
"pid": page_model.pid,
|
||||
"pubtime": video_model.pubtime.format(&CONFIG.time_format).to_string(),
|
||||
"fav_time": video_model.favtime.format(&CONFIG.time_format).to_string(),
|
||||
}),
|
||||
)?;
|
||||
let base_name = TEMPLATE.path_safe_render("page", &page_format_args(video_model, &page_model))?;
|
||||
let (poster_path, video_path, nfo_path, danmaku_path, fanart_path) = if is_single_page {
|
||||
(
|
||||
base_path.join(format!("{}-poster.jpg", &base_name)),
|
||||
@@ -385,7 +421,7 @@ pub async fn download_page(
|
||||
video_model,
|
||||
downloader,
|
||||
&page_info,
|
||||
video_path.clone(),
|
||||
&video_path,
|
||||
)),
|
||||
Box::pin(generate_page_nfo(seprate_status[2], video_model, &page_model, nfo_path)),
|
||||
Box::pin(fetch_page_danmaku(
|
||||
@@ -459,7 +495,7 @@ pub async fn fetch_page_video(
|
||||
video_model: &video::Model,
|
||||
downloader: &Downloader,
|
||||
page_info: &PageInfo,
|
||||
page_path: PathBuf,
|
||||
page_path: &Path,
|
||||
) -> Result<()> {
|
||||
if !should_run {
|
||||
return Ok(());
|
||||
@@ -470,11 +506,11 @@ pub async fn fetch_page_video(
|
||||
.await?
|
||||
.best_stream(&CONFIG.filter_option)?;
|
||||
match streams {
|
||||
BestStream::Mixed(mix_stream) => downloader.fetch(mix_stream.url(), &page_path).await,
|
||||
BestStream::Mixed(mix_stream) => downloader.fetch(mix_stream.url(), page_path).await,
|
||||
BestStream::VideoAudio {
|
||||
video: video_stream,
|
||||
audio: None,
|
||||
} => downloader.fetch(video_stream.url(), &page_path).await,
|
||||
} => downloader.fetch(video_stream.url(), page_path).await,
|
||||
BestStream::VideoAudio {
|
||||
video: video_stream,
|
||||
audio: Some(audio_stream),
|
||||
@@ -486,7 +522,7 @@ pub async fn fetch_page_video(
|
||||
let res = async {
|
||||
downloader.fetch(video_stream.url(), &tmp_video_path).await?;
|
||||
downloader.fetch(audio_stream.url(), &tmp_audio_path).await?;
|
||||
downloader.merge(&tmp_video_path, &tmp_audio_path, &page_path).await
|
||||
downloader.merge(&tmp_video_path, &tmp_audio_path, page_path).await
|
||||
}
|
||||
.await;
|
||||
let _ = fs::remove_file(tmp_video_path).await;
|
||||
@@ -606,6 +642,7 @@ async fn generate_nfo(serializer: NFOSerializer<'_>, nfo_path: PathBuf) -> Resul
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use handlebars::handlebars_helper;
|
||||
use serde_json::json;
|
||||
|
||||
use super::*;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user