"""
Goalskillsales_team.xlsx / Goalskillsales_personal.xlsx 에서 데이터 추출(export).
Excel_Parsing_Specification.md 기준. openpyxl 사용.
"""
import io
from typing import List, Dict, Any

import openpyxl

from app.models.sales_sheet_module import insert_sales_sheet_records

# Excel 행 14~17 → period_type
ROW_PERIOD = {
    14: "quarter",
    15: "month",
    16: "week",
    17: "day",
}

# Category 1~9: (target 열, result 열) openpyxl 1-based. D=4,E=5 / G=7,H=8 / ...
COLS_1BASED = [
    (4, 5),   # 1 新規リストアップ行動 D,E
    (7, 8),   # 2 架電数 G,H
    (10, 11), # 3 繋がるための行動 J,K
    (13, 14), # 4 アポ M,N
    (16, 17), # 5 訪問 P,Q
    (19, 20), # 6 見積 S,T
    (22, 23), # 7 見積額 V,W
    (25, 26), # 8 受注 Y,Z
    (28, 29), # 9 受注額 AB,AC
]


def _cell_value(ws, row: int, col: int):
    """시트에서 셀 값 읽기. 1-based. 숫자는 그대로, None이면 None."""
    try:
        v = ws.cell(row=row, column=col).value
        if v is None:
            return None
        if isinstance(v, (int, float)):
            return float(v) if isinstance(v, float) else int(v)
        return v
    except Exception:
        return None


def _is_team_sheet(ws) -> bool:
    """B12 값에 'Team' 포함 여부로 팀 시트 판별."""
    b12 = _cell_value(ws, 12, 2)
    if b12 is None:
        return False
    return "Team" in str(b12) or "team" in str(b12).lower()


def _sheet_type_from_sheet(ws, filename: str) -> str:
    """파일이 팀 시트인지 개인 시트인지 한 번만 판별 → '1s'(개인) or '2s'(팀)."""
    fn_lower = (filename or "").lower()
    if "team" in fn_lower or _is_team_sheet(ws):
        return "2s"
    return "1s"


def parse_sales_sheet_excel(file_content: bytes, filename: str = "") -> List[Dict[str, Any]]:
    """
    엑셀 바이트에서 명세대로 데이터만 추출(export). DB는 건드리지 않음.
    Returns:
        DB insert용 딕셔너리 리스트. session_id는 호출하는 쪽에서 채움(예: 요청 session_id).
    """
    wb = openpyxl.load_workbook(io.BytesIO(file_content), read_only=True, data_only=True)
    ws = wb.active
    sheet_type = _sheet_type_from_sheet(ws, filename)  # 1s or 2s 만 파일에서 판별

    records = []
    for excel_row, period_type in ROW_PERIOD.items():
        for category_one_index, (target_col, result_col) in enumerate(COLS_1BASED):
            category = category_one_index + 1
            target_val = _cell_value(ws, excel_row, target_col)
            result_val = _cell_value(ws, excel_row, result_col)
            if target_val is not None:
                records.append({
                    "session_id": "",  # save_sales_sheet_from_excel 에서 요청 session_id 로 채움
                    "sheet_type": sheet_type,
                    "period_type": period_type,
                    "record_type": "target",
                    "category": category,
                    "value": target_val,
                })
            if result_val is not None:
                records.append({
                    "session_id": "",
                    "sheet_type": sheet_type,
                    "period_type": period_type,
                    "record_type": "result",
                    "category": category,
                    "value": result_val,
                })
    wb.close()
    return records


def save_sales_sheet_from_excel(
    file_content: bytes,
    filename: str,
    session_id: str,
) -> int:
    """
    엑셀 업로드 → 파싱 → session_id 를 요청 세션으로 통일 → DB 저장.
    Returns:
        저장(INSERT/UPDATE)된 행 수.
    """
    records = parse_sales_sheet_excel(file_content, filename)
    if not records:
        return 0
    for r in records:
        r["session_id"] = session_id
    return insert_sales_sheet_records(records)
