feat: 实现全部的下载逻辑

This commit is contained in:
amtoaer
2024-03-31 00:43:25 +08:00
parent 937ae2c157
commit 5540c46541
3 changed files with 191 additions and 36 deletions

View File

@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use entity::{favorite, page, video};
use futures::stream::FuturesUnordered;
@@ -13,9 +13,9 @@ use sea_orm::ActiveValue::Set;
use sea_orm::TryIntoModel;
use serde_json::json;
use tokio::fs;
use tokio::sync::Semaphore;
use tokio::sync::{Mutex, Semaphore};
use super::status::PageStatus;
use super::status::{PageStatus, VideoStatus};
use super::utils::{unhandled_videos_pages, ModelWrapper, NFOMode, NFOSerializer};
use crate::bilibili::{BestStream, BiliClient, FavoriteList, FilterOption, PageInfo, Video};
use crate::core::utils::{create_video_pages, create_videos, exist_labels, filter_videos, handle_favorite_info};
@@ -49,7 +49,7 @@ pub async fn refresh_favorite(
let video_stream = bili_favorite_list.into_video_stream().chunks(10);
pin_mut!(video_stream);
while let Some(videos_info) = video_stream.next().await {
info!("handle videos: {}", videos_info.len());
info!("got {} videos...", videos_info.len());
let exist_labels = exist_labels(&videos_info, &favorite_model, connection).await?;
// 如果发现有视频的收藏时间和 bvid 和数据库中重合,说明到达了上次处理到的地方,可以直接退出
let should_break = videos_info
@@ -77,7 +77,7 @@ pub async fn refresh_favorite(
break;
}
}
info!("handle videos done");
info!("refresh videos done");
Ok(favorite_model)
}
@@ -86,29 +86,44 @@ pub async fn download_favorite(
favorite_model: favorite::Model,
connection: &DatabaseConnection,
) -> Result<()> {
info!("start to download!");
info!("start to download videos in favorite: {}", favorite_model.f_id);
let unhandled_videos_pages = unhandled_videos_pages(&favorite_model, connection).await?;
// 对于视频,允许五个同时下载(视频内还有分页、不同分页还有多种下载任务)
let semaphore = Arc::new(Semaphore::new(5));
let semaphore = Semaphore::new(5);
let downloader = Downloader::default();
let mut uppers_mutex: HashMap<i32, (Mutex<()>, Mutex<()>)> = HashMap::new();
for (video, _) in &unhandled_videos_pages {
uppers_mutex.insert(video.id, (Mutex::new(()), Mutex::new(())));
}
let mut tasks = FuturesUnordered::new();
for (video_model, pages) in unhandled_videos_pages {
let upper_mutex = uppers_mutex.get(&video_model.id).unwrap();
tasks.push(download_video_pages(
bili_client,
video_model,
pages,
connection,
semaphore.clone(),
&semaphore,
&downloader,
upper_mutex,
));
}
// 创建好下载任务,等待下载任务运行即可,任务结束会返回 Result<VideoModel>
while let Some(res) = tasks.next().await {
if let Err(e) = res {
error!("Error: {e}");
match res {
Ok(video_model) => {
info!(
"Video {} processed:\n\t {}",
&video_model.bvid,
VideoStatus::new(video_model.download_status)
);
}
Err(e) => {
error!("Video processed failed: {e}");
}
}
}
info!("download done.");
info!("Done.");
Ok(())
}
@@ -117,40 +132,107 @@ pub async fn download_video_pages(
video_model: video::Model,
pages: Vec<page::Model>,
connection: &DatabaseConnection,
semaphore: Arc<Semaphore>,
semaphore: &Semaphore,
downloader: &Downloader,
) -> Result<()> {
upper_mutex: &(Mutex<()>, Mutex<()>),
) -> Result<video::Model> {
let permit = semaphore.acquire().await;
if let Err(e) = permit {
return Err(e.into());
}
let mut status = VideoStatus::new(video_model.download_status);
let seprate_status = status.should_run();
let base_path = Path::new(&video_model.path);
let is_single_page = video_model.single_page.unwrap();
// 对于单页视频page 的下载已经足够
// 对于多页视频page 下载仅包含了分集内容,需要额外补上视频的 poster 的 tvshow.nfo
let tasks: Vec<Pin<Box<dyn Future<Output = Result<()>>>>> = vec![
// 下载视频封面
Box::pin(fetch_video_poster(
seprate_status[0],
&video_model,
downloader,
base_path.join("poster.jpg"),
)),
// 分发分页下载的任务
Box::pin(dispatch_download_page(
seprate_status[1],
bili_client,
&video_model,
pages,
connection,
downloader,
)),
// 生成视频信息的 nfo
Box::pin(generate_video_nfo(
seprate_status[2] && !is_single_page,
&video_model,
base_path.join("tvshow.nfo"),
)),
// 下载 Up 主头像
Box::pin(fetch_upper_face(
seprate_status[3],
&video_model,
downloader,
&upper_mutex.0,
base_path.join("upper-face.jpg"),
)),
// 生成 Up 主信息的 nfo
Box::pin(generate_upper_nfo(
seprate_status[4],
&video_model,
&upper_mutex.1,
base_path.join("upper.nfo"),
)),
];
let results = futures::future::join_all(tasks).await;
status.update_status(&results);
let mut video_active_model: video::ActiveModel = video_model.into();
video_active_model.download_status = Set(status.into());
video_active_model = video_active_model.save(connection).await?;
Ok(video_active_model.try_into_model()?)
}
pub async fn dispatch_download_page(
should_run: bool,
bili_client: &BiliClient,
video_model: &video::Model,
pages: Vec<page::Model>,
connection: &DatabaseConnection,
downloader: &Downloader,
) -> Result<()> {
if !should_run {
return Ok(());
}
// 对于视频的分页,允许同时下载三个同时下载(绝大部分是单页视频)
let child_semaphore = Arc::new(Semaphore::new(5));
let child_semaphore = Semaphore::new(5);
let mut tasks = FuturesUnordered::new();
for page_model in pages {
tasks.push(download_page(
bili_client,
&video_model,
video_model,
page_model,
connection,
child_semaphore.clone(),
&child_semaphore,
downloader,
));
}
// 同样创建好下载任务等待运行,任务结束会返回 Result<PageModel>
// 任务结束会返回 Result<PageModel>
while let Some(res) = tasks.next().await {
if let Err(e) = res {
error!("Error: {e}");
match res {
Ok(page_model) => {
info!(
"Video {} page {} processed:\n\t {}",
&video_model.bvid,
page_model.pid,
PageStatus::new(page_model.download_status)
);
}
Err(e) => {
error!("Video {} processed failed: {e}", &video_model.bvid);
}
}
}
let is_single_page = video_model.single_page.unwrap();
// 对于单页视频page 的下载已经足够
// 对于多页视频page 下载仅包含了分集内容,需要额外补上视频的 poster 的 tvshow.nfo
if !is_single_page {
let base_path = Path::new(&video_model.path);
generate_video_nfo(true, &video_model, base_path.join("tvshow.nfo")).await?;
fetch_video_poster(true, &video_model, downloader, base_path.join("poster.jpg")).await?;
}
Ok(())
}
@@ -159,7 +241,7 @@ pub async fn download_page(
video_model: &video::Model,
page_model: page::Model,
connection: &DatabaseConnection,
semaphore: Arc<Semaphore>,
semaphore: &Semaphore,
downloader: &Downloader,
) -> Result<page::Model> {
let permit = semaphore.acquire().await;
@@ -317,6 +399,41 @@ pub async fn fetch_video_poster(
downloader.fetch(&video_model.cover, &poster_path).await
}
pub async fn fetch_upper_face(
should_run: bool,
video_model: &video::Model,
downloader: &Downloader,
upper_face_mutex: &Mutex<()>,
upper_face_path: PathBuf,
) -> Result<()> {
if !should_run {
return Ok(());
}
// 这个锁是为了避免多个视频同时下载同一个 up 主的头像
let _permit = upper_face_mutex.lock().await;
if !upper_face_path.exists() {
return downloader.fetch(&video_model.upper_face, &upper_face_path).await;
}
Ok(())
}
pub async fn generate_upper_nfo(
should_run: bool,
video_model: &video::Model,
upper_nfo_mutex: &Mutex<()>,
nfo_path: PathBuf,
) -> Result<()> {
if !should_run {
return Ok(());
}
let _permit = upper_nfo_mutex.lock().await;
if !nfo_path.exists() {
let nfo_serializer = NFOSerializer(ModelWrapper::Video(video_model), NFOMode::UPPER);
return generate_nfo(nfo_serializer, nfo_path).await;
}
Ok(())
}
pub async fn generate_video_nfo(should_run: bool, video_model: &video::Model, nfo_path: PathBuf) -> Result<()> {
if !should_run {
return Ok(());

View File

@@ -1,3 +1,5 @@
use core::fmt;
use crate::Result;
static STATUS_MAX_RETRY: u32 = 0b100;
@@ -75,7 +77,17 @@ impl Status {
fn get_status(&self, offset: usize) -> u32 {
let helper = !0u32;
self.0 & (helper << (offset * 3)) & (helper >> (32 - 3 * offset - 3))
(self.0 & (helper << (offset * 3)) & (helper >> (32 - 3 * offset - 3))) >> (offset * 3)
}
fn display_status(status: u32) -> String {
if status < STATUS_MAX_RETRY {
format!("retry {} times", status)
} else if status == STATUS_OK {
"ok".to_string()
} else {
"failed".to_string()
}
}
}
@@ -85,7 +97,7 @@ impl From<Status> for u32 {
}
}
/// 从前到后分别表示:视频封面、分页下载、视频信息
/// 从前到后分别表示:视频封面、分页下载、视频信息、Up 主头像、Up 主信息
#[derive(Clone)]
pub struct VideoStatus(Status);
@@ -95,15 +107,29 @@ impl VideoStatus {
}
pub fn should_run(&self) -> Vec<bool> {
self.0.should_run(3)
self.0.should_run(5)
}
pub fn update_status(&mut self, result: &[Result<()>]) {
assert!(result.len() == 3, "VideoStatus should have 3 status");
assert!(result.len() == 5, "VideoStatus should have 5 status");
self.0.update_status(result)
}
}
impl fmt::Display for VideoStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Video Cover: {}, Page: {}, Video NFO: {}, Up Avatar: {}, Up NFO: {}",
Status::display_status(self.0.get_status(0)),
Status::display_status(self.0.get_status(1)),
Status::display_status(self.0.get_status(2)),
Status::display_status(self.0.get_status(3)),
Status::display_status(self.0.get_status(4))
)
}
}
impl From<VideoStatus> for u32 {
fn from(status: VideoStatus) -> Self {
status.0.into()
@@ -129,6 +155,18 @@ impl PageStatus {
}
}
impl fmt::Display for PageStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Page Cover: {}, Page Content: {}, Page NFO: {}",
Status::display_status(self.0.get_status(0)),
Status::display_status(self.0.get_status(1)),
Status::display_status(self.0.get_status(2))
)
}
}
impl From<PageStatus> for u32 {
fn from(status: PageStatus) -> Self {
status.0.into()

View File

@@ -11,6 +11,7 @@ use sea_orm::ActiveValue::Set;
use sea_orm::QuerySelect;
use tokio::io::AsyncWriteExt;
use super::status::Status;
use crate::bilibili::{FavoriteListInfo, PageInfo, VideoInfo};
use crate::Result;
@@ -102,7 +103,7 @@ pub async fn create_videos(
ctime: Set(v.ctime.naive_utc()),
pubtime: Set(v.pubtime.naive_utc()),
favtime: Set(v.fav_time.naive_utc()),
handled: Set(false),
download_status: Set(0),
valid: Set(v.attr == 0),
tags: Set(None),
single_page: Set(None),
@@ -138,7 +139,7 @@ pub async fn filter_videos(
.and(video::Column::Bvid.is_in(bvids))
.and(video::Column::Valid.eq(true));
if only_unhandled {
condition = condition.and(video::Column::Handled.eq(false));
condition = condition.and(video::Column::DownloadStatus.lt(Status::handled()));
}
if only_no_page {
condition = condition.and(video::Column::SinglePage.is_null());
@@ -164,7 +165,6 @@ pub async fn create_video_pages(
.unwrap()
.to_string()),
image: Set(p.first_frame.clone()),
valid: Set(video_model.valid),
download_status: Set(0),
..Default::default()
})
@@ -191,7 +191,7 @@ pub async fn unhandled_videos_pages(
video::Column::FavoriteId
.eq(favorite_model.id)
.and(video::Column::Valid.eq(true))
.and(video::Column::Handled.eq(false))
.and(video::Column::DownloadStatus.lt(Status::handled()))
.and(video::Column::SinglePage.is_not_null()),
)
.find_with_related(page::Entity)