Files
time-tracking-system/backend/api/data_import.py
bf1942 8938ce2708 refactor(api): move database access to the SQLAlchemy-bound session
- Remove manually created database sessions and use db.session from the models module throughout
- Fix the project creation endpoint, adding format validation and handling for start and end dates
- Update the project import endpoint to validate the project type with an enum and improve exception handling
- Update the statistics endpoint to avoid repeated holiday queries and streamline date string handling
- Drop the redundant session close call before rollback in favor of db.session.rollback()
- Refactor database initialization in app.py: configure SQLAlchemy in one place and create the database path and tables dynamically
- Add start date and end date fields to the Project model
- Add an import batch history model (ImportBatch); a sketch follows this commit message
- Improve date type hints in the utility functions and remove unused imports
- Roll back dependency versions in requirements.txt to keep compatibility
- Add an import history entry to the frontend menu and wire up its page route
2025-09-04 18:12:24 +08:00
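
The ImportBatch model referenced above is not part of the file shown below, so here is a minimal sketch reconstructed from how data_import.py uses it (status, success_count, failure_count, total_records, source_preview, failures_log, import_date, to_dict). The table name, column types, and the exact to_dict keys are assumptions; the real definition lives in models/models.py and may differ.

# Hypothetical sketch of the ImportBatch model, inferred from its usage in data_import.py.
# In the project, db is the shared SQLAlchemy() instance defined in models/models.py.
import json
from datetime import datetime

from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()  # stand-in for the shared instance from models.models


class ImportBatch(db.Model):
    __tablename__ = 'import_batches'  # assumed table name

    id = db.Column(db.Integer, primary_key=True)
    import_date = db.Column(db.DateTime, default=datetime.now)  # used to order history, newest first
    status = db.Column(db.String(20), nullable=False)           # "成功" / "部分成功" / "失败"
    success_count = db.Column(db.Integer, default=0)
    failure_count = db.Column(db.Integer, default=0)
    total_records = db.Column(db.Integer, default=0)
    source_preview = db.Column(db.Text)                         # first few lines of the pasted input
    failures_log = db.Column(db.Text)                           # JSON-encoded list of {'line', 'reason'}

    def to_dict(self):
        """Serialize the batch for the /import/history endpoint (assumed key set)."""
        return {
            'id': self.id,
            'import_date': self.import_date.isoformat() if self.import_date else None,
            'status': self.status,
            'success_count': self.success_count,
            'failure_count': self.failure_count,
            'total_records': self.total_records,
            'source_preview': self.source_preview,
            'failures': json.loads(self.failures_log) if self.failures_log else [],
        }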

118 lines
4.2 KiB
Python

from flask import Blueprint, request, jsonify
from models.models import db, Project, TimeRecord, Holiday, ImportBatch
from models.utils import calculate_hours, is_holiday, get_week_info
from datetime import datetime
import re
import json

data_import_bp = Blueprint('data_import', __name__)


@data_import_bp.route('/import', methods=['POST'])
def import_records():
    """Bulk-import time records and log the run as an import batch."""
    data = request.json
    records_text = data.get('records', '')
    lines = records_text.strip().split('\n')
    total_records = len([line for line in lines if line.strip()])
    success_count = 0
    failures = []

    # Load projects and holidays once up front so the per-line loop issues no extra queries.
    projects = {p.project_name: p for p in Project.query.all()}
    holidays = Holiday.query.all()
    current_year = datetime.now().year

    for line in lines:
        line = line.strip()
        if not line:
            continue

        # Expected format: "<M>月<D>日 <project name> <HH:MM> <HH:MM> <activity number>"
        match = re.match(r'^(\d{1,2})月(\d{1,2})日\s+(.+?)\s+(\d{1,2}:\d{2})\s+(\d{1,2}:\d{2})\s+(.*)$', line)
        if not match:
            failures.append({'line': line, 'reason': '格式不匹配'})
            continue

        try:
            month, day, project_name, start_time_str, end_time_str, activity_num = match.groups()
            project_name = project_name.strip()
            activity_num = activity_num.strip()
            record_date = datetime(current_year, int(month), int(day)).date()

            if project_name not in projects:
                failures.append({'line': line, 'reason': f'项目 "{project_name}" 不存在'})
                continue
            project = projects[project_name]

            start_time = datetime.strptime(start_time_str, '%H:%M').time()
            end_time = datetime.strptime(end_time_str, '%H:%M').time()
            holiday_info = is_holiday(record_date, holidays)
            hours = calculate_hours(start_time_str, end_time_str, holiday_info['is_holiday'])
            week_info = get_week_info(record_date)

            record = TimeRecord(
                date=record_date,
                event_description=f"批量导入 - {project_name}",
                project_id=project.id,
                start_time=start_time,
                end_time=end_time,
                activity_num=activity_num,
                hours=hours,
                is_holiday=holiday_info['is_holiday'],
                is_working_on_holiday=holiday_info['is_holiday'] and hours not in ['-', '0:00'],
                holiday_type=holiday_info['holiday_type'],
                week_info=week_info
            )
            db.session.add(record)
            success_count += 1
        except Exception as e:
            failures.append({'line': line, 'reason': str(e)})
    # Decide the overall import status: 成功 / 部分成功 / 失败.
    status = "失败"
    if success_count == total_records and total_records > 0:
        status = "成功"
    elif success_count > 0:
        status = "部分成功"

    # Create and persist the import batch record.
    batch = ImportBatch(
        status=status,
        success_count=success_count,
        failure_count=len(failures),
        total_records=total_records,
        source_preview='\n'.join(lines[:5]),  # keep the first 5 lines as a preview
        failures_log=json.dumps(failures, ensure_ascii=False) if failures else None
    )
    db.session.add(batch)
    try:
        db.session.commit()
    except Exception as e:
        # Roll back both the time records and the batch row if the commit fails.
        db.session.rollback()
        return jsonify({
            'success': False,
            'error': f'数据库提交失败: {str(e)}',
            'success_count': 0,
            'failure_count': total_records,
            'failures': [{'line': l, 'reason': '数据库错误'} for l in lines]
        }), 500

    return jsonify({
        'success': success_count > 0,
        'success_count': success_count,
        'failure_count': len(failures),
        'failures': failures
    })


@data_import_bp.route('/import/history', methods=['GET'])
def get_import_history():
    """Return all import batches, newest first."""
    try:
        history = ImportBatch.query.order_by(ImportBatch.import_date.desc()).all()
        return jsonify([h.to_dict() for h in history])
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
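
For reference, a sketch of how a client might call these endpoints. The '/api' URL prefix, the port, and the project names are placeholders: the actual prefix depends on how data_import_bp is registered in app.py (not shown here), and a line only imports successfully when the named project already exists.

# Hypothetical client calls for the bulk-import endpoints; '/api' prefix, port,
# and project names are assumptions, not taken from the repository.
import requests

records = "\n".join([
    "9月1日 内部培训 09:00 12:00 1001",      # matches: <M>月<D>日 <project> <HH:MM> <HH:MM> <activity>
    "9月2日 不存在的项目 13:00 18:00 1002",  # reported under failures if the project does not exist
])

resp = requests.post("http://localhost:5000/api/import", json={"records": records})
print(resp.json())  # {'success': ..., 'success_count': ..., 'failure_count': ..., 'failures': [...]}

history = requests.get("http://localhost:5000/api/import/history")
print(history.json())  # list of ImportBatch.to_dict() entries, newest first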