feat: 完成大部分下载功能,移除许多无意义的 Arc 使用

This commit is contained in:
amtoaer
2024-03-30 01:44:00 +08:00
parent 5ddb0335fc
commit fadb122ec8
4 changed files with 182 additions and 85 deletions

View File

@@ -1,5 +1,3 @@
use std::sync::Arc;
use async_stream::stream;
use chrono::serde::ts_seconds;
use chrono::{DateTime, Utc};
@@ -8,8 +6,8 @@ use serde_json::Value;
use crate::bilibili::BiliClient;
use crate::Result;
pub struct FavoriteList {
client: Arc<BiliClient>,
pub struct FavoriteList<'a> {
client: &'a BiliClient,
fid: String,
}
@@ -43,8 +41,8 @@ pub struct Upper {
pub name: String,
pub face: String,
}
impl FavoriteList {
pub fn new(client: Arc<BiliClient>, fid: String) -> Self {
impl<'a> FavoriteList<'a> {
pub fn new(client: &'a BiliClient, fid: String) -> Self {
Self { client, fid }
}
@@ -89,7 +87,7 @@ impl FavoriteList {
}
// 拿到收藏夹的所有权,返回一个收藏夹下的视频流
pub fn into_video_stream(self) -> impl Stream<Item = VideoInfo> {
pub fn into_video_stream(self) -> impl Stream<Item = VideoInfo> + 'a {
stream! {
let mut page = 1;
loop {

View File

@@ -1,5 +1,3 @@
use std::sync::Arc;
use reqwest::Method;
use crate::bilibili::analyzer::PageAnalyzer;
@@ -16,8 +14,8 @@ static DATA: &[char] = &[
'f',
];
pub struct Video {
client: Arc<BiliClient>,
pub struct Video<'a> {
client: &'a BiliClient,
pub aid: String,
pub bvid: String,
}
@@ -36,18 +34,17 @@ impl serde::Serialize for Tag {
}
}
#[derive(Debug, serde::Deserialize)]
#[derive(Debug, serde::Deserialize, Default)]
pub struct PageInfo {
pub cid: i32,
pub page: i32,
#[serde(rename = "part")]
pub name: String,
#[serde(default = "String::new")]
pub first_frame: String, // 可能不存在,默认填充为空
pub first_frame: Option<String>,
}
impl Video {
pub fn new(client: Arc<BiliClient>, bvid: String) -> Self {
impl<'a> Video<'a> {
pub fn new(client: &'a BiliClient, bvid: String) -> Self {
let aid = bvid_to_aid(&bvid).to_string();
Self { client, aid, bvid }
}

View File

@@ -1,3 +1,4 @@
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
@@ -9,63 +10,68 @@ use log::{error, info};
use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use sea_orm::TryIntoModel;
use serde::Serialize;
use tinytemplate::TinyTemplate;
use tokio::fs;
use tokio::sync::Semaphore;
use super::status::Status;
use super::utils::unhandled_videos_pages;
use crate::bilibili::{BiliClient, FavoriteList, Video};
use super::utils::{unhandled_videos_pages, ModelWrapper, NFOMode, NFOSerializer};
use crate::bilibili::{BestStream, BiliClient, FavoriteList, FilterOption, PageInfo, Video};
use crate::core::utils::{
create_video_pages, create_videos, exist_labels, filter_videos, handle_favorite_info,
};
use crate::downloader::Downloader;
use crate::Result;
/// 用来拼接路径名称
#[derive(Serialize)]
struct Context<'a> {
bvid: &'a str,
name: &'a str,
pid: &'a str,
}
pub async fn process_favorite(
bili_client: Arc<BiliClient>,
bili_client: &BiliClient,
fid: &str,
connection: Arc<DatabaseConnection>,
connection: &DatabaseConnection,
) -> Result<()> {
let favorite_model = refresh_favorite(bili_client.clone(), fid, connection.clone()).await?;
download_favorite(bili_client.clone(), favorite_model, connection.clone()).await?;
let favorite_model = refresh_favorite(bili_client, fid, connection).await?;
download_favorite(bili_client, favorite_model, connection).await?;
Ok(())
}
pub async fn refresh_favorite(
bili_client: Arc<BiliClient>,
bili_client: &BiliClient,
fid: &str,
connection: Arc<DatabaseConnection>,
connection: &DatabaseConnection,
) -> Result<favorite::Model> {
let bili_favorite_list = FavoriteList::new(bili_client.clone(), fid.to_owned());
let bili_favorite_list = FavoriteList::new(bili_client, fid.to_owned());
let favorite_list_info = bili_favorite_list.get_info().await?;
let favorite_model = handle_favorite_info(&favorite_list_info, connection.as_ref()).await?;
let favorite_model = handle_favorite_info(&favorite_list_info, connection).await?;
info!("Scan the favorite: {fid}");
let video_stream = bili_favorite_list.into_video_stream().chunks(10);
pin_mut!(video_stream);
while let Some(videos_info) = video_stream.next().await {
info!("handle videos: {}", videos_info.len());
let exist_labels = exist_labels(&videos_info, &favorite_model, connection.as_ref()).await?;
let exist_labels = exist_labels(&videos_info, &favorite_model, connection).await?;
let should_break = videos_info
.iter()
.any(|v| exist_labels.contains(&(v.bvid.clone(), v.fav_time.naive_utc())));
create_videos(&videos_info, &favorite_model, connection.as_ref()).await?;
let unrefreshed_video_models = filter_videos(
&videos_info,
&favorite_model,
true,
true,
connection.as_ref(),
)
.await?;
create_videos(&videos_info, &favorite_model, connection).await?;
let unrefreshed_video_models =
filter_videos(&videos_info, &favorite_model, true, true, connection).await?;
if !unrefreshed_video_models.is_empty() {
for video_model in unrefreshed_video_models {
let bili_video = Video::new(bili_client.clone(), video_model.bvid.clone());
let bili_video = Video::new(bili_client, video_model.bvid.clone());
let tags = bili_video.get_tags().await?;
let pages_info = bili_video.get_pages().await?;
create_video_pages(&pages_info, &video_model, connection.as_ref()).await?;
create_video_pages(&pages_info, &video_model, connection).await?;
let mut video_active_model: video::ActiveModel = video_model.into();
video_active_model.single_page = Set(Some(pages_info.len() == 1));
video_active_model.tags = Set(Some(serde_json::to_value(tags).unwrap()));
video_active_model.save(connection.as_ref()).await?;
video_active_model.save(connection).await?;
}
}
if should_break {
@@ -75,24 +81,23 @@ pub async fn refresh_favorite(
Ok(favorite_model)
}
#[allow(unused_variables)]
pub async fn download_favorite(
bili_client: Arc<BiliClient>,
bili_client: &BiliClient,
favorite_model: favorite::Model,
connection: Arc<DatabaseConnection>,
connection: &DatabaseConnection,
) -> Result<()> {
let unhandled_videos_pages =
unhandled_videos_pages(&favorite_model, connection.as_ref()).await?;
let unhandled_videos_pages = unhandled_videos_pages(&favorite_model, connection).await?;
let semaphore = Arc::new(Semaphore::new(3));
let downloader = Arc::new(Downloader::default());
let downloader = Downloader::default();
let mut tasks = FuturesUnordered::new();
for (video_model, pages) in unhandled_videos_pages {
tasks.push(Box::pin(download_video_pages(
bili_client.clone(),
bili_client,
video_model,
pages,
connection.clone(),
connection,
semaphore.clone(),
&downloader,
)));
}
while let Some(res) = tasks.next().await {
@@ -104,25 +109,29 @@ pub async fn download_favorite(
}
pub async fn download_video_pages(
bili_client: Arc<BiliClient>,
bili_client: &BiliClient,
video_model: video::Model,
pages: Vec<page::Model>,
connection: Arc<DatabaseConnection>,
connection: &DatabaseConnection,
semaphore: Arc<Semaphore>,
downloader: &Downloader,
) -> Result<()> {
let permit = semaphore.acquire().await;
if let Err(e) = permit {
return Err(e.into());
}
let mut template = TinyTemplate::new();
let _ = template.add_template("video", "{bvid}");
let child_semaphore = Arc::new(Semaphore::new(5));
let mut tasks = FuturesUnordered::new();
for page_model in pages {
tasks.push(Box::pin(download_page(
bili_client.clone(),
bili_client,
&video_model,
page_model,
connection.clone(),
connection,
child_semaphore.clone(),
downloader,
)));
}
while let Some(res) = tasks.next().await {
@@ -134,11 +143,12 @@ pub async fn download_video_pages(
}
pub async fn download_page(
bili_client: Arc<BiliClient>,
bili_client: &BiliClient,
video_model: &video::Model,
page_model: page::Model,
connection: Arc<DatabaseConnection>,
connection: &DatabaseConnection,
semaphore: Arc<Semaphore>,
downloader: &Downloader,
) -> Result<page::Model> {
let permit = semaphore.acquire().await;
if let Err(e) = permit {
@@ -146,74 +156,159 @@ pub async fn download_page(
}
let mut status = Status::new(page_model.download_status);
let seprate_status = status.should_run();
let is_single_page = video_model.single_page.unwrap();
let base_path = Path::new(&video_model.path);
let mut template = TinyTemplate::new();
// 这个文件名模板支持自定义
let _ = template.add_template("video", "{bvid}");
let base_name = template.render(
"video",
&Context {
bvid: &video_model.bvid,
name: &video_model.name,
pid: &page_model.pid.to_string(),
},
)?;
let (poster_path, video_path, nfo_path) = if is_single_page {
(
base_path.join(format!("{}-poster.jpg", &base_name)),
base_path.join(format!("{}.mp4", &base_name)),
base_path.join(format!("{}.nfo", &base_name)),
)
} else {
(
base_path.join("Season 1").join(format!(
"{} - S01E{:2}-thumb.jpg",
&base_name, page_model.pid
)),
base_path
.join("Season 1")
.join(format!("{} - S01E{:2}.mp4", &base_name, page_model.pid)),
base_path
.join("Season 1")
.join(format!("{} - S01E{:2}.nfo", &base_name, page_model.pid)),
)
};
let tasks: Vec<Pin<Box<dyn Future<Output = Result<()>>>>> = vec![
// 暂时不支持下载字幕
Box::pin(download_poster(
seprate_status[0],
&bili_client,
video_model,
&page_model,
)),
Box::pin(download_upper(
seprate_status[1],
&bili_client,
video_model,
&page_model,
downloader,
poster_path,
)),
Box::pin(download_video(
seprate_status[2],
&bili_client,
seprate_status[1],
bili_client,
video_model,
&page_model,
downloader,
video_path,
)),
Box::pin(generate_nfo(
seprate_status[3],
&bili_client,
seprate_status[2],
video_model,
&page_model,
nfo_path,
)),
];
let results = futures::future::join_all(tasks).await;
status.update_status(&results);
let mut page_active_model: page::ActiveModel = page_model.into();
page_active_model.download_status = Set(status.into());
page_active_model = page_active_model.save(connection.as_ref()).await?;
page_active_model = page_active_model.save(connection).await?;
Ok(page_active_model.try_into_model().unwrap())
}
#[allow(unused_variables)]
pub async fn download_poster(
should_run: bool,
bili_client: &Arc<BiliClient>,
video_model: &video::Model,
page_model: &page::Model,
downloader: &Downloader,
poster_path: PathBuf,
) -> Result<()> {
if !should_run {
return Ok(());
}
// 如果单页没有封面,就使用视频的封面
let url = match &page_model.image {
Some(url) => url.as_str(),
None => video_model.cover.as_str(),
};
downloader.fetch(url, &poster_path).await?;
Ok(())
}
#[allow(unused_variables)]
pub async fn download_upper(
should_run: bool,
bili_client: &Arc<BiliClient>,
video_model: &video::Model,
page_model: &page::Model,
) -> Result<()> {
Ok(())
}
#[allow(unused_variables)]
pub async fn download_video(
should_run: bool,
bili_client: &Arc<BiliClient>,
bili_client: &BiliClient,
video_model: &video::Model,
page_model: &page::Model,
downloader: &Downloader,
page_path: PathBuf,
) -> Result<()> {
if !should_run {
return Ok(());
}
let bili_video = Video::new(bili_client, video_model.bvid.clone());
let streams = bili_video
.get_page_analyzer(&PageInfo {
cid: page_model.cid,
..Default::default()
})
.await?
.best_stream(&FilterOption::default())?;
match streams {
BestStream::Mixed(mix_stream) => {
downloader.fetch(mix_stream.url(), &page_path).await?;
}
BestStream::VideoAudio {
video: video_stream,
audio: None,
} => {
downloader.fetch(video_stream.url(), &page_path).await?;
}
BestStream::VideoAudio {
video: video_stream,
audio: Some(audio_stream),
} => {
let (tmp_video_path, tmp_audio_path) = (
page_path.with_extension("tmp_video"),
page_path.with_extension("tmp_audio"),
);
downloader
.fetch(video_stream.url(), &tmp_video_path)
.await?;
downloader
.fetch(audio_stream.url(), &tmp_audio_path)
.await?;
downloader
.merge(&tmp_video_path, &tmp_audio_path, &page_path)
.await?;
}
}
Ok(())
}
#[allow(unused_variables)]
pub async fn generate_nfo(
should_run: bool,
bili_client: &Arc<BiliClient>,
video_model: &video::Model,
page_model: &page::Model,
nfo_path: PathBuf,
) -> Result<()> {
if !should_run {
return Ok(());
}
let single_page = video_model.single_page.unwrap();
let nfo_serializer = if single_page {
NFOSerializer(ModelWrapper::Video(video_model), NFOMode::MOVIE)
} else {
NFOSerializer(ModelWrapper::Page(page_model), NFOMode::EPOSODE)
};
if let Some(parent) = nfo_path.parent() {
fs::create_dir_all(parent).await?;
}
fs::write(nfo_path, nfo_serializer.generate_nfo().await?.as_bytes()).await?;
Ok(())
}

View File

@@ -1,5 +1,3 @@
use std::sync::Arc;
use bili_sync::bilibili::BiliClient;
use bili_sync::core::command::process_favorite;
use bili_sync::database::database_connection;
@@ -8,11 +6,20 @@ use log::error;
#[tokio::main]
async fn main() -> ! {
env_logger::init();
let connection = Arc::new(database_connection().await.unwrap());
let bili_client = Arc::new(BiliClient::new(None));
let mut today = chrono::Local::now().date_naive();
let mut bili_client = BiliClient::new(None);
let connection = database_connection().await.unwrap();
loop {
if today != chrono::Local::now().date_naive() {
if let Err(e) = bili_client.check_refresh().await {
error!("Error: {e}");
tokio::time::sleep(std::time::Duration::from_secs(600)).await;
continue;
}
today = chrono::Local::now().date_naive();
}
for fid in ["52642258"] {
let res = process_favorite(bili_client.clone(), fid, connection.clone()).await;
let res = process_favorite(&bili_client, fid, &connection).await;
if let Err(e) = res {
error!("Error: {e}");
}