Files
time-tracking-system/backend/api/statistics.py
bf1942 ef9432f6da feat(time-tracking): 添加个人工时记录系统设计文档
- 完成系统架构和数据模型设计,包括项目、工时记录、休息日和周期表模型
- 设计项目管理模块,支持传统项目与PSI项目管理及批量导入功能
- 规划工时记录模块,含日期、事件描述、项目选择及工时计算规则
- 定义休息日分类,支持周末、国定节假日、个人假期及调休工时管理
- 制定统计分析模块设计,支持按Cut-Off周期的周统计与项目工时分布
- 设计周期管理模块,提供周期设置及预设模板功能
- 制定用户界面布局及各页面表单、样式设计方案
- 规划RESTful API端点,涵盖项目、工时记录、休息日、周期及统计数据操作
- 设计数据流示意,阐明操作流程及前后端交互逻辑
- 制定数据存储方案,包括SQLite数据库配置及备份导出机制
2025-09-04 15:19:35 +08:00

267 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, request, jsonify
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine, and_
from backend.models.models import TimeRecord, Project, CutoffPeriod
from backend.models.utils import *
from datetime import datetime, date, timedelta
from collections import defaultdict
statistics_bp = Blueprint('statistics', __name__)
def get_db_session():
"""获取数据库会话"""
engine = create_engine('sqlite:///data/timetrack.db')
Session = sessionmaker(bind=engine)
return Session()
@statistics_bp.route('/api/statistics/weekly', methods=['GET'])
def get_weekly_statistics():
"""获取周统计数据"""
try:
session = get_db_session()
# 获取查询参数
period_id = request.args.get('period_id')
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
# 如果指定了周期ID使用周期的日期范围
if period_id:
period = session.query(CutoffPeriod).get(int(period_id))
if not period:
session.close()
return jsonify({'success': False, 'error': '周期不存在'}), 404
start_date = period.start_date
end_date = period.end_date
target_hours = period.target_hours
period_info = period.to_dict()
else:
# 使用指定的日期范围
if not start_date or not end_date:
session.close()
return jsonify({'success': False, 'error': '请提供开始日期和结束日期'}), 400
start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date, '%Y-%m-%d').date()
target_hours = 40 # 默认目标工时
period_info = {
'period_name': f"{start_date.strftime('%m月%d')}-{end_date.strftime('%m月%d')}",
'start_date': start_date.isoformat(),
'end_date': end_date.isoformat(),
'target_hours': target_hours
}
# 查询该时间范围内的所有工时记录
records = session.query(TimeRecord).filter(
and_(TimeRecord.date >= start_date, TimeRecord.date <= end_date)
).order_by(TimeRecord.date).all()
# 生成完整的日期范围
current_date = start_date
daily_records = []
record_dict = {}
# 将记录按日期分组
for record in records:
if record.date not in record_dict:
record_dict[record.date] = []
record_dict[record.date].append(record)
# 生成每日汇总
while current_date <= end_date:
day_records = record_dict.get(current_date, [])
if day_records:
# 如果有记录,汇总该日的数据
total_hours = sum([format_hours_to_decimal(r.hours) for r in day_records if r.hours not in ['-', '0:00']])
event_descriptions = [r.event_description for r in day_records if r.event_description and r.event_description != '-']
activity_nums = [r.activity_num for r in day_records if r.activity_num and r.activity_num != '-']
project_names = []
for r in day_records:
if r.project and r.project.project_name:
if r.project.project_type.value == 'traditional':
project_names.append(f"{r.project.customer_name} {r.project.project_code}")
else:
project_names.append(f"{r.project.customer_name} {r.project.contract_number}")
# 获取第一条记录的时间信息
first_record = day_records[0]
daily_record = {
'date': current_date.isoformat(),
'day_of_week': get_day_of_week_chinese(current_date),
'event': '; '.join(event_descriptions) if event_descriptions else '-',
'project': '; '.join(set(project_names)) if project_names else '-',
'start_time': first_record.start_time.strftime('%H:%M') if first_record.start_time else '-',
'end_time': first_record.end_time.strftime('%H:%M') if first_record.end_time else '-',
'activity_num': '; '.join(activity_nums) if activity_nums else '-',
'hours': format_decimal_to_hours(total_hours) if total_hours > 0 else ('0:00' if first_record.is_holiday else '-'),
'is_holiday': first_record.is_holiday,
'holiday_type': first_record.holiday_type,
'is_working_on_holiday': any(r.is_working_on_holiday for r in day_records)
}
else:
# 如果没有记录,生成默认记录
holidays = session.query(Holiday).all()
holiday_info = is_holiday(current_date, holidays)
daily_record = {
'date': current_date.isoformat(),
'day_of_week': get_day_of_week_chinese(current_date),
'event': '-',
'project': '-',
'start_time': '-',
'end_time': '-',
'activity_num': '-',
'hours': '0:00' if holiday_info['is_holiday'] else '-',
'is_holiday': holiday_info['is_holiday'],
'holiday_type': holiday_info['holiday_type'],
'is_working_on_holiday': False
}
daily_records.append(daily_record)
current_date += timedelta(days=1)
# 计算统计数据
workday_total = 0
holiday_total = 0
working_days = 0
holiday_work_days = 0
rest_days = 0
for daily in daily_records:
hours_decimal = format_hours_to_decimal(daily['hours'])
if daily['is_holiday']:
if daily['is_working_on_holiday']:
holiday_total += hours_decimal
holiday_work_days += 1
else:
rest_days += 1
else:
if hours_decimal > 0:
workday_total += hours_decimal
working_days += 1
else:
rest_days += 1
# 项目工时统计
project_hours = defaultdict(float)
for record in records:
if record.project and record.hours not in ['-', '0:00']:
hours_decimal = format_hours_to_decimal(record.hours)
if record.project.project_type.value == 'traditional':
project_key = f"{record.project.customer_name} {record.project.project_code}"
else:
project_key = f"{record.project.customer_name} {record.project.contract_number}"
project_hours[project_key] += hours_decimal
total_project_hours = sum(project_hours.values())
project_stats = []
for project_name, hours in project_hours.items():
percentage = (hours / total_project_hours * 100) if total_project_hours > 0 else 0
project_stats.append({
'project': project_name,
'hours': format_decimal_to_hours(hours),
'percentage': round(percentage, 1)
})
project_stats.sort(key=lambda x: x['percentage'], reverse=True)
result = {
'period': period_info,
'daily_records': daily_records,
'project_hours': project_stats,
'workday_total': format_decimal_to_hours(workday_total),
'holiday_total': format_decimal_to_hours(holiday_total),
'weekly_total': format_decimal_to_hours(workday_total + holiday_total),
'working_days': working_days,
'holiday_work_days': holiday_work_days,
'rest_days': rest_days,
'target_hours': target_hours,
'completion_rate': round((workday_total + holiday_total) / target_hours * 100, 1) if target_hours > 0 else 0
}
session.close()
return jsonify({'success': True, 'data': result})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@statistics_bp.route('/api/statistics/periods', methods=['GET'])
def get_cutoff_periods():
"""获取Cut-Off周期列表"""
try:
session = get_db_session()
periods = session.query(CutoffPeriod).order_by(CutoffPeriod.start_date.desc()).all()
result = [period.to_dict() for period in periods]
session.close()
return jsonify({'success': True, 'data': result})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@statistics_bp.route('/api/statistics/periods', methods=['POST'])
def create_cutoff_period():
"""创建Cut-Off周期"""
try:
data = request.json
session = get_db_session()
# 验证必填字段
if not all(key in data for key in ['period_name', 'start_date', 'end_date']):
return jsonify({'success': False, 'error': '周期名称、开始日期和结束日期为必填项'}), 400
start_date = datetime.strptime(data['start_date'], '%Y-%m-%d').date()
end_date = datetime.strptime(data['end_date'], '%Y-%m-%d').date()
if start_date >= end_date:
return jsonify({'success': False, 'error': '开始日期必须早于结束日期'}), 400
# 计算周数
days_diff = (end_date - start_date).days + 1
weeks = round(days_diff / 7)
period = CutoffPeriod(
period_name=data['period_name'],
start_date=start_date,
end_date=end_date,
target_hours=data.get('target_hours', weeks * 40),
weeks=weeks,
year=start_date.year,
month=start_date.month
)
session.add(period)
session.commit()
result = period.to_dict()
session.close()
return jsonify({'success': True, 'data': result})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500
@statistics_bp.route('/api/statistics/periods/<int:period_id>', methods=['DELETE'])
def delete_cutoff_period(period_id):
"""删除Cut-Off周期"""
try:
session = get_db_session()
period = session.query(CutoffPeriod).get(period_id)
if not period:
session.close()
return jsonify({'success': False, 'error': '周期不存在'}), 404
session.delete(period)
session.commit()
session.close()
return jsonify({'success': True, 'message': '周期已删除'})
except Exception as e:
return jsonify({'success': False, 'error': str(e)}), 500