"""SQLAlchemy models: projects, time records, holidays, cut-off periods, import batches."""

import enum
from datetime import datetime

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import (
    Boolean,
    Column,
    Date,
    DateTime,
    Enum,
    ForeignKey,
    Integer,
    String,
    Text,
    Time,
)
from sqlalchemy.orm import relationship

db = SQLAlchemy()


def _iso(value):
    """Return ``value.isoformat()``, or ``None`` when *value* is unset/falsy."""
    return value.isoformat() if value else None


class ProjectType(enum.Enum):
    """Closed set of project kinds referenced by ``Project.project_type``."""

    TRADITIONAL = "traditional"  # Traditional project
    PSI = "psi"  # PSI project


class Project(db.Model):
    """Project table model."""

    __tablename__ = 'projects'

    id = Column(Integer, primary_key=True, autoincrement=True)
    project_name = Column(String(200), nullable=False)
    # NOTE(review): SQLAlchemy's Enum type persists the member *name* by
    # default, while to_dict() emits the member *value* — confirm callers
    # expect that difference.
    project_type = Column(Enum(ProjectType), nullable=False)

    # Common fields
    project_code = Column(String(50), nullable=False)
    customer_name = Column(String(200), nullable=False)

    # PSI-specific fields
    # Contract number; required for PSI projects (not enforced by the schema,
    # the column itself is nullable).
    contract_number = Column(String(100))

    description = Column(Text)
    start_date = Column(Date, nullable=True)  # Project start date
    end_date = Column(Date, nullable=True)  # Project end date
    is_active = Column(Boolean, default=True)
    # Timestamps are naive datetimes produced by datetime.utcnow.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    time_records = relationship("TimeRecord", back_populates="project")

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict of this project's columns.

        Dates and datetimes are rendered as ISO-8601 strings; unset values
        (including an unset ``project_type``) are rendered as ``None``.
        """
        return {
            'id': self.id,
            'project_name': self.project_name,
            'project_type': self.project_type.value if self.project_type else None,
            'project_code': self.project_code,
            'customer_name': self.customer_name,
            'contract_number': self.contract_number,
            'description': self.description,
            'start_date': _iso(self.start_date),
            'end_date': _iso(self.end_date),
            'is_active': self.is_active,
            'created_at': _iso(self.created_at),
            'updated_at': _iso(self.updated_at),
        }


class TimeRecord(db.Model):
    """Time (work-hour) record table model."""

    __tablename__ = 'time_records'

    id = Column(Integer, primary_key=True, autoincrement=True)
    date = Column(Date, nullable=False)
    event_description = Column(String(500))  # Event description
    project_id = Column(Integer, ForeignKey('projects.id'))
    # May be empty; the source data may use a "-" placeholder
    # (presumably mapped to NULL by the importer — confirm).
    start_time = Column(Time)
    end_time = Column(Time)
    activity_num = Column(String(100))  # Activity Num
    # Worked hours kept as text; formats such as "2:42" or "8:00:00".
    hours = Column(String(20))
    is_holiday = Column(Boolean, default=False)  # Whether the day is a rest day
    is_working_on_holiday = Column(Boolean, default=False)  # Worked on a rest day
    holiday_type = Column(String(50))  # Rest-day type
    week_info = Column(String(50))  # Week information
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    project = relationship("Project", back_populates="time_records")

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict of this record.

        ``start_time``/``end_time`` are rendered as ``HH:MM``; the related
        project (if any) is embedded under ``'project'`` via its own
        ``to_dict()``.
        """
        return {
            'id': self.id,
            'date': _iso(self.date),
            'event_description': self.event_description,
            'project_id': self.project_id,
            'start_time': self.start_time.strftime('%H:%M') if self.start_time else None,
            'end_time': self.end_time.strftime('%H:%M') if self.end_time else None,
            'activity_num': self.activity_num,
            'hours': self.hours,
            'is_holiday': self.is_holiday,
            'is_working_on_holiday': self.is_working_on_holiday,
            'holiday_type': self.holiday_type,
            'week_info': self.week_info,
            'project': self.project.to_dict() if self.project else None,
            'created_at': _iso(self.created_at),
            'updated_at': _iso(self.updated_at),
        }


class Holiday(db.Model):
    """Rest-day configuration table model."""

    __tablename__ = 'holidays'

    id = Column(Integer, primary_key=True, autoincrement=True)
    date = Column(Date, nullable=False, unique=True)
    holiday_name = Column(String(100))  # Holiday name
    # weekend/national_holiday/personal_leave/makeup_day
    holiday_type = Column(String(50), nullable=False)
    is_working_day = Column(Boolean, default=False)  # Make-up working day flag
    created_at = Column(DateTime, default=datetime.utcnow)

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict of this holiday entry."""
        return {
            'id': self.id,
            'date': _iso(self.date),
            'holiday_name': self.holiday_name,
            'holiday_type': self.holiday_type,
            'is_working_day': self.is_working_day,
            'created_at': _iso(self.created_at),
        }


class CutoffPeriod(db.Model):
    """Cut-off period table model."""

    __tablename__ = 'cutoff_periods'

    id = Column(Integer, primary_key=True, autoincrement=True)
    period_name = Column(String(50), nullable=False)
    start_date = Column(Date, nullable=False)
    end_date = Column(Date, nullable=False)
    target_hours = Column(Integer, default=160)
    weeks = Column(Integer, default=4)
    year = Column(Integer, nullable=False)
    month = Column(Integer, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict of this cut-off period."""
        return {
            'id': self.id,
            'period_name': self.period_name,
            'start_date': _iso(self.start_date),
            'end_date': _iso(self.end_date),
            'target_hours': self.target_hours,
            'weeks': self.weeks,
            'year': self.year,
            'month': self.month,
            'created_at': _iso(self.created_at),
        }


class ImportBatch(db.Model):
    """Import batch history record model."""

    __tablename__ = 'import_batches'

    id = Column(Integer, primary_key=True)
    import_date = Column(DateTime, default=datetime.utcnow)
    status = Column(String(50), nullable=False)
    success_count = Column(Integer, default=0)
    failure_count = Column(Integer, default=0)
    total_records = Column(Integer, default=0)
    source_preview = Column(Text)
    failures_log = Column(Text)  # Detailed log of the records that failed

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict of this import batch.

        ``import_date`` is rendered as ``None`` when unset: the column
        default is only applied at INSERT time, so an instance that has not
        been flushed yet has no value to format.
        """
        return {
            'id': self.id,
            'import_date': _iso(self.import_date),
            'status': self.status,
            'success_count': self.success_count,
            'failure_count': self.failure_count,
            'total_records': self.total_records,
            'source_preview': self.source_preview,
            'failures_log': self.failures_log,
        }