refactor(api): 重构数据库访问为SQLAlchemy绑定的session

- 统一移除手动创建的数据库session,统一使用models模块中的db.session
- 修正项目创建接口,增加开始和结束日期的格式验证与处理
- 更新导入项目接口,使用枚举类型校验项目类型并优化异常处理
- 更新统计接口,避免多次查询假期数据,优化日期字符串处理
- 删除回滚前多余的session关闭调用,改为使用db.session.rollback()
- app.py中重构数据库初始化:统一配置SQLAlchemy,动态创建数据库路径和表
- 项目模型新增开始日期和结束日期字段支持
- 添加导入批次历史记录模型支持
- 优化工具函数中日期类型提示,移除无用导入
- 更新requirements.txt依赖版本回退,确保兼容性
- 前端菜单添加导入历史导航入口,实现页面访问路由绑定
This commit is contained in:
2025-09-04 18:12:24 +08:00
parent ef9432f6da
commit 8938ce2708
29 changed files with 3490 additions and 150 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

118
backend/api/data_import.py Normal file
View File

@@ -0,0 +1,118 @@
from flask import Blueprint, request, jsonify
from models.models import db, Project, TimeRecord, Holiday, ImportBatch
from models.utils import calculate_hours, is_holiday, get_week_info
from datetime import datetime
import re
import json
data_import_bp = Blueprint('data_import', __name__)
@data_import_bp.route('/import', methods=['POST'])
def import_records():
"""批量导入工时记录并记录导入历史"""
data = request.json
records_text = data.get('records', '')
lines = records_text.strip().split('\n')
total_records = len([line for line in lines if line.strip()])
success_count = 0
failures = []
projects = {p.project_name: p for p in Project.query.all()}
holidays = Holiday.query.all()
current_year = datetime.now().year
for i, line in enumerate(lines):
line = line.strip()
if not line:
continue
match = re.match(r'^(\d{1,2})月(\d{1,2})日\s+(.+?)\s+(\d{1,2}:\d{2})\s+(\d{1,2}:\d{2})\s+(.*)$', line)
if not match:
failures.append({'line': line, 'reason': '格式不匹配'})
continue
try:
month, day, project_name, start_time_str, end_time_str, activity_num = match.groups()
project_name = project_name.strip()
activity_num = activity_num.strip()
record_date = datetime(current_year, int(month), int(day)).date()
if project_name not in projects:
failures.append({'line': line, 'reason': f'项目 "{project_name}" 不存在'})
continue
project = projects[project_name]
start_time = datetime.strptime(start_time_str, '%H:%M').time()
end_time = datetime.strptime(end_time_str, '%H:%M').time()
holiday_info = is_holiday(record_date, holidays)
hours = calculate_hours(start_time_str, end_time_str, holiday_info['is_holiday'])
week_info = get_week_info(record_date)
record = TimeRecord(
date=record_date,
event_description=f"批量导入 - {project_name}",
project_id=project.id,
start_time=start_time,
end_time=end_time,
activity_num=activity_num,
hours=hours,
is_holiday=holiday_info['is_holiday'],
is_working_on_holiday=holiday_info['is_holiday'] and hours not in ['-', '0:00'],
holiday_type=holiday_info['holiday_type'],
week_info=week_info
)
db.session.add(record)
success_count += 1
except Exception as e:
failures.append({'line': line, 'reason': str(e)})
# 决定导入状态
status = "失败"
if success_count == total_records and total_records > 0:
status = "成功"
elif success_count > 0:
status = "部分成功"
# 创建并保存导入批次记录
batch = ImportBatch(
status=status,
success_count=success_count,
failure_count=len(failures),
total_records=total_records,
source_preview='\n'.join(lines[:5]), # 保存前5行作为预览
failures_log=json.dumps(failures, ensure_ascii=False) if failures else None
)
db.session.add(batch)
try:
db.session.commit()
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'error': f'数据库提交失败: {str(e)}',
'success_count': 0,
'failure_count': total_records,
'failures': [{'line': l, 'reason': '数据库错误'} for l in lines]
}), 500
return jsonify({
'success': success_count > 0,
'success_count': success_count,
'failure_count': len(failures),
'failures': failures
})
@data_import_bp.route('/import/history', methods=['GET'])
def get_import_history():
"""获取导入历史记录"""
try:
history = ImportBatch.query.order_by(ImportBatch.import_date.desc()).all()
return jsonify([h.to_dict() for h in history])
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500

View File

@@ -1,25 +1,17 @@
from flask import Blueprint, request, jsonify
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from backend.models.models import Project, ProjectType
from backend.models.utils import *
from models.models import db, Project, ProjectType
from models.utils import *
import csv
import io
from datetime import datetime
projects_bp = Blueprint('projects', __name__)
def get_db_session():
"""获取数据库会话"""
engine = create_engine('sqlite:///data/timetrack.db')
Session = sessionmaker(bind=engine)
return Session()
@projects_bp.route('/api/projects', methods=['GET'])
def get_projects():
"""获取所有项目列表"""
try:
session = get_db_session()
projects = session.query(Project).filter_by(is_active=True).all()
projects = db.session.query(Project).filter_by(is_active=True).all()
result = []
for project in projects:
@@ -32,10 +24,10 @@ def get_projects():
result.append(project_dict)
session.close()
return jsonify({'success': True, 'data': result})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'error': str(e)}), 500
@projects_bp.route('/api/projects', methods=['POST'])
@@ -43,14 +35,19 @@ def create_project():
"""创建新项目"""
try:
data = request.json
session = get_db_session()
# 验证必填字段
if not data.get('project_name') or not data.get('customer_name') or not data.get('project_type'):
return jsonify({'success': False, 'error': '项目名称、客户名和项目类型为必填项'}), 400
project_type_str = data['project_type']
try:
project_type = ProjectType(project_type_str)
except ValueError:
return jsonify({'success': False, 'error': f'无效的项目类型: {project_type_str}'}), 400
# 根据项目类型设置字段
if data['project_type'] == 'traditional':
if project_type == ProjectType.TRADITIONAL:
if not data.get('project_code'):
return jsonify({'success': False, 'error': '传统项目需要填写项目代码'}), 400
project_code = data['project_code']
@@ -60,47 +57,64 @@ def create_project():
return jsonify({'success': False, 'error': 'PSI项目需要填写合同号'}), 400
project_code = 'PSI-PROJ' # PSI项目统一代码
contract_number = data['contract_number']
# 处理结束日期
end_date = None
if data.get('end_date') and data.get('end_date') != '' :
try:
end_date = datetime.strptime(data['end_date'], '%Y-%m-%d').date()
except ValueError:
return jsonify({'success': False, 'error': '结束日期格式不正确,应为 YYYY-MM-DD'}), 400
# 处理开始日期
start_date = None
if data.get('start_date') and data.get('start_date') != '' :
try:
start_date = datetime.strptime(data['start_date'], '%Y-%m-%d').date()
except ValueError:
return jsonify({'success': False, 'error': '开始日期格式不正确,应为 YYYY-MM-DD'}), 400
# 检查唯一性约束
existing_project = None
if data['project_type'] == 'traditional':
if project_type == ProjectType.TRADITIONAL:
# 传统项目:客户名+项目代码唯一
existing_project = session.query(Project).filter_by(
existing_project = db.session.query(Project).filter_by(
customer_name=data['customer_name'],
project_code=project_code,
project_type=ProjectType.TRADITIONAL
).first()
else:
# PSI项目客户名+合同号唯一
existing_project = session.query(Project).filter_by(
existing_project = db.session.query(Project).filter_by(
customer_name=data['customer_name'],
contract_number=contract_number,
project_type=ProjectType.PSI
).first()
if existing_project:
session.close()
return jsonify({'success': False, 'error': '项目已存在'}), 400
# 创建新项目
project = Project(
project_name=data['project_name'],
project_type=ProjectType(data['project_type']),
project_type=project_type,
project_code=project_code,
customer_name=data['customer_name'],
contract_number=contract_number,
description=data.get('description', '')
description=data.get('description', ''),
start_date=start_date,
end_date=end_date
)
session.add(project)
session.commit()
db.session.add(project)
db.session.commit()
result = project.to_dict()
session.close()
return jsonify({'success': True, 'data': result})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'error': str(e)}), 500
@projects_bp.route('/api/projects/<int:project_id>', methods=['PUT'])
@@ -108,11 +122,9 @@ def update_project(project_id):
"""更新项目信息"""
try:
data = request.json
session = get_db_session()
project = session.query(Project).get(project_id)
project = db.session.query(Project).get(project_id)
if not project:
session.close()
return jsonify({'success': False, 'error': '项目不存在'}), 404
# 更新字段
@@ -121,33 +133,30 @@ def update_project(project_id):
if 'description' in data:
project.description = data['description']
session.commit()
db.session.commit()
result = project.to_dict()
session.close()
return jsonify({'success': True, 'data': result})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'error': str(e)}), 500
@projects_bp.route('/api/projects/<int:project_id>', methods=['DELETE'])
def delete_project(project_id):
"""删除项目(软删除)"""
try:
session = get_db_session()
project = session.query(Project).get(project_id)
project = db.session.query(Project).get(project_id)
if not project:
session.close()
return jsonify({'success': False, 'error': '项目不存在'}), 404
project.is_active = False
session.commit()
session.close()
db.session.commit()
return jsonify({'success': True, 'message': '项目已删除'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'error': str(e)}), 500
@projects_bp.route('/api/projects/import', methods=['POST'])
@@ -161,8 +170,6 @@ def import_projects():
if file.filename == '' or not file.filename.endswith('.csv'):
return jsonify({'success': False, 'error': '请选择有效的CSV文件'}), 400
session = get_db_session()
# 读取CSV文件
stream = io.StringIO(file.stream.read().decode("utf-8"))
csv_reader = csv.DictReader(stream)
@@ -177,13 +184,15 @@ def import_projects():
errors.append(f"{row_num}行:项目名称、客户名和项目类型为必填项")
continue
project_type = row['项目类型'].lower()
if project_type not in ['traditional', 'psi']:
project_type_str = row['项目类型'].lower()
try:
project_type = ProjectType(project_type_str)
except ValueError:
errors.append(f"{row_num}行:项目类型只能是 traditional 或 psi")
continue
# 根据项目类型设置字段
if project_type == 'traditional':
if project_type == ProjectType.TRADITIONAL:
if not row.get('项目代码'):
errors.append(f"{row_num}行:传统项目需要填写项目代码")
continue
@@ -198,14 +207,14 @@ def import_projects():
# 检查重复
existing_project = None
if project_type == 'traditional':
existing_project = session.query(Project).filter_by(
if project_type == ProjectType.TRADITIONAL:
existing_project = db.session.query(Project).filter_by(
customer_name=row['客户名'],
project_code=project_code,
project_type=ProjectType.TRADITIONAL
).first()
else:
existing_project = session.query(Project).filter_by(
existing_project = db.session.query(Project).filter_by(
customer_name=row['客户名'],
contract_number=contract_number,
project_type=ProjectType.PSI
@@ -218,21 +227,20 @@ def import_projects():
# 创建项目
project = Project(
project_name=row['项目名称'],
project_type=ProjectType(project_type),
project_type=project_type,
project_code=project_code,
customer_name=row['客户名'],
contract_number=contract_number,
description=row.get('描述', '')
)
session.add(project)
db.session.add(project)
created_count += 1
except Exception as e:
errors.append(f"{row_num}行:{str(e)}")
session.commit()
session.close()
db.session.commit()
return jsonify({
'success': True,
@@ -242,4 +250,5 @@ def import_projects():
})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'error': str(e)}), 500

View File

@@ -1,35 +1,25 @@
from flask import Blueprint, request, jsonify
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, and_
from backend.models.models import TimeRecord, Project, CutoffPeriod
from backend.models.utils import *
from sqlalchemy import and_
from models.models import db, TimeRecord, Project, CutoffPeriod, Holiday
from models.utils import *
from datetime import datetime, date, timedelta
from collections import defaultdict
statistics_bp = Blueprint('statistics', __name__)
def get_db_session():
"""获取数据库会话"""
engine = create_engine('sqlite:///data/timetrack.db')
Session = sessionmaker(bind=engine)
return Session()
@statistics_bp.route('/api/statistics/weekly', methods=['GET'])
def get_weekly_statistics():
"""获取周统计数据"""
try:
session = get_db_session()
# 获取查询参数
period_id = request.args.get('period_id')
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
start_date_str = request.args.get('start_date')
end_date_str = request.args.get('end_date')
# 如果指定了周期ID使用周期的日期范围
if period_id:
period = session.query(CutoffPeriod).get(int(period_id))
period = db.session.query(CutoffPeriod).get(int(period_id))
if not period:
session.close()
return jsonify({'success': False, 'error': '周期不存在'}), 404
start_date = period.start_date
@@ -38,12 +28,11 @@ def get_weekly_statistics():
period_info = period.to_dict()
else:
# 使用指定的日期范围
if not start_date or not end_date:
session.close()
if not start_date_str or not end_date_str:
return jsonify({'success': False, 'error': '请提供开始日期和结束日期'}), 400
start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date, '%Y-%m-%d').date()
start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date_str, '%Y-%m-%d').date()
target_hours = 40 # 默认目标工时
period_info = {
'period_name': f"{start_date.strftime('%m月%d')}-{end_date.strftime('%m月%d')}",
@@ -53,7 +42,7 @@ def get_weekly_statistics():
}
# 查询该时间范围内的所有工时记录
records = session.query(TimeRecord).filter(
records = db.session.query(TimeRecord).filter(
and_(TimeRecord.date >= start_date, TimeRecord.date <= end_date)
).order_by(TimeRecord.date).all()
@@ -68,6 +57,9 @@ def get_weekly_statistics():
record_dict[record.date] = []
record_dict[record.date].append(record)
# 获取时间范围内的所有假期定义,避免在循环中重复查询
holidays_in_range = db.session.query(Holiday).all()
# 生成每日汇总
while current_date <= end_date:
day_records = record_dict.get(current_date, [])
@@ -104,8 +96,7 @@ def get_weekly_statistics():
}
else:
# 如果没有记录,生成默认记录
holidays = session.query(Holiday).all()
holiday_info = is_holiday(current_date, holidays)
holiday_info = is_holiday(current_date, holidays_in_range)
daily_record = {
'date': current_date.isoformat(),
@@ -183,7 +174,6 @@ def get_weekly_statistics():
'completion_rate': round((workday_total + holiday_total) / target_hours * 100, 1) if target_hours > 0 else 0
}
session.close()
return jsonify({'success': True, 'data': result})
except Exception as e:
@@ -193,14 +183,9 @@ def get_weekly_statistics():
def get_cutoff_periods():
"""获取Cut-Off周期列表"""
try:
session = get_db_session()
periods = session.query(CutoffPeriod).order_by(CutoffPeriod.start_date.desc()).all()
periods = db.session.query(CutoffPeriod).order_by(CutoffPeriod.start_date.desc()).all()
result = [period.to_dict() for period in periods]
session.close()
return jsonify({'success': True, 'data': result})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@@ -209,7 +194,6 @@ def create_cutoff_period():
"""创建Cut-Off周期"""
try:
data = request.json
session = get_db_session()
# 验证必填字段
if not all(key in data for key in ['period_name', 'start_date', 'end_date']):
@@ -235,33 +219,29 @@ def create_cutoff_period():
month=start_date.month
)
session.add(period)
session.commit()
db.session.add(period)
db.session.commit()
result = period.to_dict()
session.close()
return jsonify({'success': True, 'data': result})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'error': str(e)}), 500
@statistics_bp.route('/api/statistics/periods/<int:period_id>', methods=['DELETE'])
def delete_cutoff_period(period_id):
"""删除Cut-Off周期"""
try:
session = get_db_session()
period = session.query(CutoffPeriod).get(period_id)
period = db.session.query(CutoffPeriod).get(period_id)
if not period:
session.close()
return jsonify({'success': False, 'error': '周期不存在'}), 404
session.delete(period)
session.commit()
session.close()
db.session.delete(period)
db.session.commit()
return jsonify({'success': True, 'message': '周期已删除'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'error': str(e)}), 500

View File

@@ -1,31 +1,22 @@
from flask import Blueprint, request, jsonify
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, and_
from backend.models.models import TimeRecord, Project, Holiday
from backend.models.utils import *
from sqlalchemy import and_
from models.models import db, TimeRecord, Project, Holiday
from models.utils import *
from datetime import datetime, date
import json
timerecords_bp = Blueprint('timerecords', __name__)
def get_db_session():
"""获取数据库会话"""
engine = create_engine('sqlite:///data/timetrack.db')
Session = sessionmaker(bind=engine)
return Session()
@timerecords_bp.route('/api/timerecords', methods=['GET'])
def get_timerecords():
"""获取工时记录列表"""
try:
session = get_db_session()
# 获取查询参数
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
project_id = request.args.get('project_id')
query = session.query(TimeRecord)
query = db.session.query(TimeRecord).options(db.joinedload(TimeRecord.project))
# 应用筛选条件
if start_date:
@@ -45,7 +36,6 @@ def get_timerecords():
record_dict['day_of_week'] = get_day_of_week_chinese(record.date)
result.append(record_dict)
session.close()
return jsonify({'success': True, 'data': result})
except Exception as e:
@@ -56,7 +46,6 @@ def create_timerecord():
"""创建工时记录"""
try:
data = request.json
session = get_db_session()
# 验证必填字段
if not data.get('date'):
@@ -65,7 +54,7 @@ def create_timerecord():
record_date = datetime.strptime(data['date'], '%Y-%m-%d').date()
# 检查是否为休息日
holidays = session.query(Holiday).all()
holidays = db.session.query(Holiday).all()
holiday_info = is_holiday(record_date, holidays)
# 计算工时
@@ -93,18 +82,17 @@ def create_timerecord():
week_info=week_info
)
session.add(record)
session.commit()
db.session.add(record)
db.session.commit()
result = record.to_dict()
if record.date:
result['day_of_week'] = get_day_of_week_chinese(record.date)
session.close()
return jsonify({'success': True, 'data': result})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'error': str(e)}), 500
@timerecords_bp.route('/api/timerecords/<int:record_id>', methods=['PUT'])
@@ -112,11 +100,9 @@ def update_timerecord(record_id):
"""更新工时记录"""
try:
data = request.json
session = get_db_session()
record = session.query(TimeRecord).get(record_id)
record = db.session.query(TimeRecord).get(record_id)
if not record:
session.close()
return jsonify({'success': False, 'error': '记录不存在'}), 404
# 更新字段
@@ -144,47 +130,41 @@ def update_timerecord(record_id):
# 更新工作日状态
record.is_working_on_holiday = record.is_holiday and record.hours not in ['-', '0:00']
session.commit()
db.session.commit()
result = record.to_dict()
if record.date:
result['day_of_week'] = get_day_of_week_chinese(record.date)
session.close()
return jsonify({'success': True, 'data': result})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'error': str(e)}), 500
@timerecords_bp.route('/api/timerecords/<int:record_id>', methods=['DELETE'])
def delete_timerecord(record_id):
"""删除工时记录"""
try:
session = get_db_session()
record = session.query(TimeRecord).get(record_id)
record = db.session.query(TimeRecord).get(record_id)
if not record:
session.close()
return jsonify({'success': False, 'error': '记录不存在'}), 404
session.delete(record)
session.commit()
session.close()
db.session.delete(record)
db.session.commit()
return jsonify({'success': True, 'message': '记录已删除'})
except Exception as e:
db.session.rollback()
return jsonify({'success': False, 'error': str(e)}), 500
@timerecords_bp.route('/api/timerecords/check_holiday/<string:date_str>', methods=['GET'])
def check_holiday(date_str):
"""检查指定日期是否为休息日"""
try:
session = get_db_session()
check_date = datetime.strptime(date_str, '%Y-%m-%d').date()
holidays = session.query(Holiday).all()
holidays = db.session.query(Holiday).all()
holiday_info = is_holiday(check_date, holidays)
result = {
@@ -196,7 +176,6 @@ def check_holiday(date_str):
'week_info': get_week_info(check_date)
}
session.close()
return jsonify({'success': True, 'data': result})
except Exception as e:

View File

@@ -1,10 +1,10 @@
from flask import Flask, render_template
from flask_cors import CORS
from sqlalchemy import create_engine
from backend.models.models import Base
from backend.api.projects import projects_bp
from backend.api.timerecords import timerecords_bp
from backend.api.statistics import statistics_bp
from models.models import db
from api.projects import projects_bp
from api.timerecords import timerecords_bp
from api.statistics import statistics_bp
from api.data_import import data_import_bp
import os
def create_app():
@@ -15,17 +15,27 @@ def create_app():
# 启用CORS支持
CORS(app)
# 确保数据目录存在
os.makedirs('data', exist_ok=True)
# 数据库配置
# 获取项目根目录下的data文件夹路径
data_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data')
os.makedirs(data_dir, exist_ok=True)
db_path = os.path.join(data_dir, 'timetrack.db')
# 创建数据库表
engine = create_engine('sqlite:///data/timetrack.db')
Base.metadata.create_all(engine)
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{db_path}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 初始化数据库
db.init_app(app)
# 在应用上下文中创建数据库表
with app.app_context():
db.create_all()
# 注册蓝图
app.register_blueprint(projects_bp)
app.register_blueprint(timerecords_bp)
app.register_blueprint(statistics_bp)
app.register_blueprint(data_import_bp)
# 主页路由
@app.route('/')
@@ -44,6 +54,10 @@ def create_app():
def statistics():
return render_template('statistics.html')
@app.route('/import')
def import_page():
return render_template('import.html')
return app
if __name__ == '__main__':

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,16 +1,16 @@
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, Integer, String, Boolean, DateTime, Text, Date, Time, ForeignKey, Enum
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from datetime import datetime
import enum
Base = declarative_base()
db = SQLAlchemy()
class ProjectType(enum.Enum):
TRADITIONAL = "traditional" # 传统项目
PSI = "psi" # PSI项目
class Project(Base):
class Project(db.Model):
"""项目表模型"""
__tablename__ = 'projects'
@@ -26,6 +26,8 @@ class Project(Base):
contract_number = Column(String(100)) # 合同号PSI项目必填
description = Column(Text)
start_date = Column(Date, nullable=True) # 项目开始时间
end_date = Column(Date, nullable=True) # 项目结束时间
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
@@ -42,12 +44,14 @@ class Project(Base):
'customer_name': self.customer_name,
'contract_number': self.contract_number,
'description': self.description,
'start_date': self.start_date.isoformat() if self.start_date else None,
'end_date': self.end_date.isoformat() if self.end_date else None,
'is_active': self.is_active,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}
class TimeRecord(Base):
class TimeRecord(db.Model):
"""工时记录表模型"""
__tablename__ = 'time_records'
@@ -88,7 +92,7 @@ class TimeRecord(Base):
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}
class Holiday(Base):
class Holiday(db.Model):
"""休息日配置表模型"""
__tablename__ = 'holidays'
@@ -109,7 +113,7 @@ class Holiday(Base):
'created_at': self.created_at.isoformat() if self.created_at else None
}
class CutoffPeriod(Base):
class CutoffPeriod(db.Model):
"""Cut-Off周期表模型"""
__tablename__ = 'cutoff_periods'
@@ -134,4 +138,29 @@ class CutoffPeriod(Base):
'year': self.year,
'month': self.month,
'created_at': self.created_at.isoformat() if self.created_at else None
}
}
class ImportBatch(db.Model):
"""导入批次历史记录模型"""
__tablename__ = 'import_batches'
id = Column(Integer, primary_key=True)
import_date = Column(DateTime, default=datetime.utcnow)
status = Column(String(50), nullable=False)
success_count = Column(Integer, default=0)
failure_count = Column(Integer, default=0)
total_records = Column(Integer, default=0)
source_preview = Column(Text)
failures_log = Column(Text) # 存储失败记录的详细日志
def to_dict(self):
return {
'id': self.id,
'import_date': self.import_date.isoformat(),
'status': self.status,
'success_count': self.success_count,
'failure_count': self.failure_count,
'total_records': self.total_records,
'source_preview': self.source_preview,
'failures_log': self.failures_log
}

View File

@@ -1,12 +1,11 @@
import datetime
from typing import Optional, Dict, Any, List
from backend.models.models import Holiday
def is_weekend(date: datetime.date) -> bool:
"""判断是否为周末"""
return date.weekday() >= 5 # 周六=5, 周日=6
def is_holiday(date: datetime.date, holidays: List[Holiday] = None) -> Dict[str, Any]:
def is_holiday(date: datetime.date, holidays: list = None) -> Dict[str, Any]:
"""检测指定日期是否为休息日"""
day_of_week = date.weekday() # 0=周一, 6=周日
is_weekend_day = day_of_week >= 5 # 周六、周日