feat: 搭起下载流程的框架,待填充具体下载过程
This commit is contained in:
@@ -14,7 +14,7 @@ pub struct Model {
|
||||
pub path: String,
|
||||
pub image: String,
|
||||
pub valid: bool,
|
||||
pub download_status: i32,
|
||||
pub download_status: u32,
|
||||
pub created_at: String,
|
||||
}
|
||||
|
||||
|
||||
@@ -1,15 +1,23 @@
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use entity::video;
|
||||
use entity::{favorite, page, video};
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::Future;
|
||||
use futures_util::{pin_mut, StreamExt};
|
||||
use log::info;
|
||||
use log::{error, info};
|
||||
use sea_orm::entity::prelude::*;
|
||||
use sea_orm::ActiveValue::Set;
|
||||
use sea_orm::TryIntoModel;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use super::status::Status;
|
||||
use super::utils::unhandled_videos_pages;
|
||||
use crate::bilibili::{BiliClient, FavoriteList, Video};
|
||||
use crate::core::utils::{
|
||||
create_video_pages, create_videos, exist_labels, filter_videos, handle_favorite_info,
|
||||
};
|
||||
use crate::downloader::Downloader;
|
||||
use crate::Result;
|
||||
|
||||
pub async fn process_favorite(
|
||||
@@ -17,8 +25,8 @@ pub async fn process_favorite(
|
||||
fid: &str,
|
||||
connection: Arc<DatabaseConnection>,
|
||||
) -> Result<()> {
|
||||
refresh_favorite(bili_client.clone(), fid, connection.clone()).await?;
|
||||
download_favorite(bili_client.clone(), fid, connection.clone()).await?;
|
||||
let favorite_model = refresh_favorite(bili_client.clone(), fid, connection.clone()).await?;
|
||||
download_favorite(bili_client.clone(), favorite_model, connection.clone()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -26,7 +34,7 @@ pub async fn refresh_favorite(
|
||||
bili_client: Arc<BiliClient>,
|
||||
fid: &str,
|
||||
connection: Arc<DatabaseConnection>,
|
||||
) -> Result<()> {
|
||||
) -> Result<favorite::Model> {
|
||||
let bili_favorite_list = FavoriteList::new(bili_client.clone(), fid.to_owned());
|
||||
let favorite_list_info = bili_favorite_list.get_info().await?;
|
||||
let favorite_model = handle_favorite_info(&favorite_list_info, connection.as_ref()).await?;
|
||||
@@ -64,14 +72,148 @@ pub async fn refresh_favorite(
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
Ok(favorite_model)
|
||||
}
|
||||
|
||||
#[allow(unused_variables)]
|
||||
pub async fn download_favorite(
|
||||
bili_client: Arc<BiliClient>,
|
||||
fid: &str,
|
||||
favorite_model: favorite::Model,
|
||||
connection: Arc<DatabaseConnection>,
|
||||
) -> Result<()> {
|
||||
todo!();
|
||||
let unhandled_videos_pages =
|
||||
unhandled_videos_pages(&favorite_model, connection.as_ref()).await?;
|
||||
let semaphore = Arc::new(Semaphore::new(3));
|
||||
let downloader = Arc::new(Downloader::default());
|
||||
let mut tasks = FuturesUnordered::new();
|
||||
for (video_model, pages) in unhandled_videos_pages {
|
||||
tasks.push(Box::pin(download_video_pages(
|
||||
bili_client.clone(),
|
||||
video_model,
|
||||
pages,
|
||||
connection.clone(),
|
||||
semaphore.clone(),
|
||||
)));
|
||||
}
|
||||
while let Some(res) = tasks.next().await {
|
||||
if let Err(e) = res {
|
||||
error!("Error: {e}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn download_video_pages(
|
||||
bili_client: Arc<BiliClient>,
|
||||
video_model: video::Model,
|
||||
pages: Vec<page::Model>,
|
||||
connection: Arc<DatabaseConnection>,
|
||||
semaphore: Arc<Semaphore>,
|
||||
) -> Result<()> {
|
||||
let permit = semaphore.acquire().await;
|
||||
if let Err(e) = permit {
|
||||
return Err(e.into());
|
||||
}
|
||||
let child_semaphore = Arc::new(Semaphore::new(5));
|
||||
let mut tasks = FuturesUnordered::new();
|
||||
for page_model in pages {
|
||||
tasks.push(Box::pin(download_page(
|
||||
bili_client.clone(),
|
||||
&video_model,
|
||||
page_model,
|
||||
connection.clone(),
|
||||
child_semaphore.clone(),
|
||||
)));
|
||||
}
|
||||
while let Some(res) = tasks.next().await {
|
||||
if let Err(e) = res {
|
||||
error!("Error: {e}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn download_page(
|
||||
bili_client: Arc<BiliClient>,
|
||||
video_model: &video::Model,
|
||||
page_model: page::Model,
|
||||
connection: Arc<DatabaseConnection>,
|
||||
semaphore: Arc<Semaphore>,
|
||||
) -> Result<page::Model> {
|
||||
let permit = semaphore.acquire().await;
|
||||
if let Err(e) = permit {
|
||||
return Err(e.into());
|
||||
}
|
||||
let mut status = Status::new(page_model.download_status);
|
||||
let seprate_status = status.should_run();
|
||||
let tasks: Vec<Pin<Box<dyn Future<Output = Result<()>>>>> = vec![
|
||||
// 暂时不支持下载字幕
|
||||
Box::pin(download_poster(
|
||||
seprate_status[0],
|
||||
&bili_client,
|
||||
video_model,
|
||||
&page_model,
|
||||
)),
|
||||
Box::pin(download_upper(
|
||||
seprate_status[1],
|
||||
&bili_client,
|
||||
video_model,
|
||||
&page_model,
|
||||
)),
|
||||
Box::pin(download_video(
|
||||
seprate_status[2],
|
||||
&bili_client,
|
||||
video_model,
|
||||
&page_model,
|
||||
)),
|
||||
Box::pin(generate_nfo(
|
||||
seprate_status[3],
|
||||
&bili_client,
|
||||
video_model,
|
||||
&page_model,
|
||||
)),
|
||||
];
|
||||
let results = futures::future::join_all(tasks).await;
|
||||
status.update_status(&results);
|
||||
let mut page_active_model: page::ActiveModel = page_model.into();
|
||||
page_active_model.download_status = Set(status.into());
|
||||
page_active_model = page_active_model.save(connection.as_ref()).await?;
|
||||
Ok(page_active_model.try_into_model().unwrap())
|
||||
}
|
||||
|
||||
#[allow(unused_variables)]
|
||||
pub async fn download_poster(
|
||||
should_run: bool,
|
||||
bili_client: &Arc<BiliClient>,
|
||||
video_model: &video::Model,
|
||||
page_model: &page::Model,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
#[allow(unused_variables)]
|
||||
pub async fn download_upper(
|
||||
should_run: bool,
|
||||
bili_client: &Arc<BiliClient>,
|
||||
video_model: &video::Model,
|
||||
page_model: &page::Model,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
#[allow(unused_variables)]
|
||||
pub async fn download_video(
|
||||
should_run: bool,
|
||||
bili_client: &Arc<BiliClient>,
|
||||
video_model: &video::Model,
|
||||
page_model: &page::Model,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
#[allow(unused_variables)]
|
||||
pub async fn generate_nfo(
|
||||
should_run: bool,
|
||||
bili_client: &Arc<BiliClient>,
|
||||
video_model: &video::Model,
|
||||
page_model: &page::Model,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
pub mod command;
|
||||
pub mod status;
|
||||
pub mod utils;
|
||||
|
||||
62
src/core/status.rs
Normal file
62
src/core/status.rs
Normal file
@@ -0,0 +1,62 @@
|
||||
use crate::Result;
|
||||
|
||||
static STATUS_MAX_RETRY: u32 = 0b100;
|
||||
static STATUS_OK: u32 = 0b111;
|
||||
|
||||
/// 用来表示下载的状态,不想写太多列了,所以仅使用一个 u32 表示
|
||||
/// 从低位开始,固定每三位表示一种数据的状态
|
||||
pub struct Status(u32);
|
||||
|
||||
impl Status {
|
||||
pub fn new(status: u32) -> Self {
|
||||
Self(status)
|
||||
}
|
||||
|
||||
pub fn should_run(&self) -> [bool; 4] {
|
||||
let mut result = [false; 4];
|
||||
for (i, res) in result.iter_mut().enumerate() {
|
||||
*res = self.check_continue(i);
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
pub fn update_status(&mut self, result: &[Result<()>]) {
|
||||
assert!(result.len() >= 4, "result length must be 4");
|
||||
for (i, res) in result.iter().enumerate().take(4) {
|
||||
self.set_result(res, i);
|
||||
}
|
||||
if self.should_run().iter().all(|x| !x) {
|
||||
// 所有任务都成功或者由于尝试次数过多失败,为 status 最高位打上标记,将来不再重试
|
||||
self.0 |= 1 << 31;
|
||||
}
|
||||
}
|
||||
|
||||
fn check_continue(&self, offset: usize) -> bool {
|
||||
assert!(offset < 10, "u32 can only store 10 status");
|
||||
let helper = !0u32;
|
||||
let sub_status = self.0 & (helper << (offset * 3)) & (helper >> (32 - 3 * offset - 3));
|
||||
sub_status < STATUS_MAX_RETRY
|
||||
}
|
||||
|
||||
fn set_result(&mut self, result: &Result<()>, offset: usize) {
|
||||
if result.is_ok() {
|
||||
self.set_ok(offset);
|
||||
} else {
|
||||
self.plus_one(offset);
|
||||
}
|
||||
}
|
||||
|
||||
fn plus_one(&mut self, offset: usize) {
|
||||
self.0 += 1 << (3 * offset);
|
||||
}
|
||||
|
||||
fn set_ok(&mut self, offset: usize) {
|
||||
self.0 |= STATUS_OK << (3 * offset);
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Status> for u32 {
|
||||
fn from(status: Status) -> Self {
|
||||
status.0
|
||||
}
|
||||
}
|
||||
@@ -179,33 +179,16 @@ pub async fn create_video_pages(
|
||||
pub async fn unhandled_videos_pages(
|
||||
favorite_model: &favorite::Model,
|
||||
connection: &DatabaseConnection,
|
||||
) -> Result<Vec<video::Model>> {
|
||||
) -> Result<Vec<(video::Model, Vec<page::Model>)>> {
|
||||
Ok(video::Entity::find()
|
||||
.filter(
|
||||
video::Column::FavoriteId
|
||||
.eq(favorite_model.id)
|
||||
.and(video::Column::Valid.eq(true))
|
||||
.and(video::Column::Handled.eq(false)),
|
||||
.and(video::Column::Handled.eq(false))
|
||||
.and(video::Column::SinglePage.is_not_null()),
|
||||
)
|
||||
.find_with_related(page::Entity)
|
||||
.all(connection)
|
||||
.await?)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use entity::{page, video};
|
||||
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
|
||||
|
||||
#[ignore = "just for manual test"]
|
||||
#[tokio::test]
|
||||
async fn test_join() {
|
||||
let video_with_pages: Vec<(video::Model, Vec<page::Model>)> = video::Entity::find()
|
||||
.filter(video::Column::Handled.eq(false))
|
||||
.filter(video::Column::SinglePage.eq(false))
|
||||
.find_with_related(page::Entity)
|
||||
.all(&crate::database::database_connection().await.unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
dbg!(video_with_pages);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user