Skip to content

Latest commit

 

History

History
886 lines (665 loc) · 22.9 KB

File metadata and controls

886 lines (665 loc) · 22.9 KB

docs/batch.md - PowerBarcoder 批次上傳模組技術說明

1. 模組概述

1.1 模組功能

PowerBarcoder 批次上傳模組提供一個完整的多配置文件管理系統,允許使用者同時上傳多個 YAML 配置檔案並以隊列模式執行分析流程。本模組採用前後端分離架構,後端使用 SQLite 資料庫進行持久化存儲,前端使用 Vue.js 提供互動式使用者介面,支援拖放上傳、優先級設定、即時狀態監控等功能。

1.2 系統特色

  • 多檔案批次上傳:支援同時上傳多個 YAML 配置檔案
  • 拖放式介面:直觀的檔案拖放上傳體驗
  • 優先級管理:可為每個批次設定執行優先級
  • 即時狀態監控:提供即時的隊列狀態與執行進度
  • 持久化存儲:使用 SQLite 確保資料可靠性
  • 自動化執行:支援自動按順序執行批次任務
  • 錯誤處理:完整的錯誤追蹤與恢復機制

1.3 使用場景

  • 大規模樣本處理:同時分析多個實驗條件或樣本群組
  • 參數比較研究:使用不同參數配置進行比較分析
  • 自動化流水線:建立無人值守的自動化分析流程
  • 批次實驗管理:統一管理多個相關的分析任務

2. 系統架構設計

2.1 整體架構流程

flowchart TD
    A[使用者前端介面] --> B[配置檔案上傳]
    B --> C{檔案驗證}
    C -- 通過 --> D[解析 YAML 配置]
    C -- 失敗 --> E[錯誤回饋]
    
    D --> F[BatchStatusManager]
    F --> G[SQLite 資料庫]
    G --> H[隊列項目存儲]
    
    H --> I[開始處理]
    I --> J[取得下一個待處理項目]
    J --> K{是否有待處理項目?}
    K -- 有 --> L[執行 PowerBarcoder 流程]
    K -- 無 --> M[等待新項目]
    
    L --> N[更新執行狀態]
    N --> O{執行成功?}
    O -- 成功 --> P[標記為完成]
    O -- 失敗 --> Q[標記為失敗]
    
    P --> R[自動處理下一項目]
    Q --> R
    R --> J
    
    subgraph "前端監控"
        S[狀態輪詢]
        T[即時更新顯示]
        U[使用者操作]
    end
    
    G -.-> S
    S --> T
    T --> U
    U --> A
Loading

2.2 核心組件架構

2.2.1 前端組件 (ConfigUploaderModal.js)

  • 檔案管理:拖放上傳、檔案選擇、預覽功能
  • 優先級設定:可調整每個配置的執行優先級
  • 狀態監控:即時顯示隊列狀態與執行進度
  • 操作控制:開始處理、刪除項目、重新載入等功能

2.2.2 後端 API 層 (app.py)

  • 配置上傳處理 (/upload-yaml-config)
  • 隊列狀態查詢 (/batch-queue-status)
  • 項目管理 (/delete-batch-item, /start-processing)
  • 狀態重載 (/reload-queue-state)
  • 子程序輸出串流 (SSE) (/stream/<batch_id>)

2.2.3 批次狀態管理器 (BatchStatusManager)

  • SQLite 資料庫管理:持久化存儲隊列資料
  • 項目生命週期管理:從待處理到完成的狀態轉換
  • 自動化執行引擎:按優先級順序自動執行批次

3. 核心資料結構

3.1 BatchQueueItem 資料類

@dataclass
class BatchQueueItem:
    queue_id: str                    # 隊列項目ID
    batch_id: str                    # 批次ID
    config_data: Dict                # 配置數據
    original_filename: Optional[str] # 原始文件名
    priority: int = 0                # 優先級(數字越大越優先)
    created_time: Optional[datetime] # 建立時間
    start_time: Optional[datetime]   # 開始執行時間
    end_time: Optional[datetime]     # 結束時間
    status: QueueStatus             # 執行狀態
    error_message: Optional[str]     # 錯誤訊息

3.2 狀態枚舉

class QueueStatus(Enum):
    PENDING = "pending"      # 等待執行
    RUNNING = "running"      # 正在執行
    COMPLETED = "completed"  # 已完成
    FAILED = "failed"        # 執行失敗

class BatchStatus(Enum):
    RUNNING = "running"      # 正在運行
    COMPLETED = "completed"  # 已正常完成
    FAILED = "failed"        # 執行失敗

3.3 SQLite 資料庫結構

batch_queue 表

CREATE TABLE IF NOT EXISTS batch_queue (
    queue_id TEXT PRIMARY KEY,
    batch_id TEXT NOT NULL,
    config_data TEXT NOT NULL,        -- JSON 格式的配置數據
    original_filename TEXT,
    priority INTEGER DEFAULT 0,
    created_time TEXT NOT NULL,
    start_time TEXT,
    end_time TEXT,
    status TEXT NOT NULL,
    error_message TEXT
);

system_state 表

CREATE TABLE IF NOT EXISTS system_state (
    key TEXT PRIMARY KEY,
    value TEXT
);

Note: in the implementation system_state stores is_processing as the string values 'true' / 'false' in the database; when returned by the API the code converts this to a boolean for convenience.

4. 主要操作流程

4.1 配置上傳流程

sequenceDiagram
    participant U as 使用者
    participant F as 前端
    participant A as API層
    participant B as BatchStatusManager
    participant D as SQLite DB

    U->>F: 選擇/拖放 YAML 檔案
    F->>F: 驗證檔案格式
    F->>F: 設定優先級
    U->>F: 點擊上傳
    F->>A: POST /upload-yaml-config
    A->>A: 解析 YAML 內容
    A->>B: add_configs_to_queue()
    B->>B: 生成 queue_id 和 batch_id
    B->>D: 插入隊列記錄
    D-->>B: 確認寫入
    B-->>A: 返回 queue_ids
    A-->>F: 返回成功回應
    F->>F: 顯示成功訊息
    F->>F: 開始狀態監控
Loading

4.2 批次執行流程

sequenceDiagram
    participant F as 前端
    participant A as API層
    participant B as BatchStatusManager
    participant D as SQLite DB
    participant P as PowerBarcoder

    F->>A: POST /start-processing
    A->>B: start_processing()
    B->>D: 查詢待處理項目
    D-->>B: 返回最高優先級項目
    B->>D: 更新狀態為 RUNNING
    B->>B: _process_batch_item_from_db()
    B->>B: _save_batch_config()
    B->>P: 執行 PowerBarcoder 流程
    P-->>B: 執行結果
    B->>D: 更新最終狀態
    B->>B: _auto_process_next_item()
    B->>D: 查詢下一個項目
    alt 有下一個項目
        B->>B: 繼續處理
    else 無項目
        B->>B: 等待新項目
    end
Loading

4.3 狀態監控流程

sequenceDiagram
    participant F as 前端
    participant A as API層
    participant B as BatchStatusManager
    participant D as SQLite DB

    loop 每 15 秒
        F->>A: GET /batch-queue-status
        A->>B: get_queue_status()
        B->>D: 查詢所有項目狀態
        D-->>B: 返回狀態數據
        B->>B: 統計各狀態數量
        B-->>A: 返回隊列狀態
        A-->>F: JSON 格式狀態
        F->>F: 更新顯示
    end
Loading

5. API 介面規格

5.1 配置上傳 API

POST /upload-yaml-config

請求格式

{
    "mode": "batch",
    "configs": [
        {
            "filename": "config1.yml",
            "content": "batch_name: test1\n...",
            "priority": 1
        },
        {
            "filename": "config2.yml", 
            "content": "batch_name: test2\n...",
            "priority": 0
        }
    ],
    "batch_interval": 0,
    "execution_order": "sequential"
}

回應格式

{
    "status": "success",
    "message": "成功添加 2 個批次到執行隊列",
    "batch_ids": ["batch_20250903141500_001_123", "batch_20250903141500_002_456"],
    "queue_id": "202509031415"
}

5.2 隊列狀態查詢 API

GET /batch-queue-status

查詢回應格式

{
    "status": "success",
    "queue_status": {
        "is_processing": false,
        "current_batch": "",
        "pending": 2,
        "running": 0,
        "completed": 1,
        "failed": 0,
        "total": 3,
        "queue_items": [
            {
                "queue_id": "queue_20250903141500_001",
                "batch_id": "batch_20250903141500_001_123",
                "original_filename": "config1.yml",
                "priority": 1,
                "status": "pending",
                "created_time": "2025-09-03T14:15:00.123456",
                "config_data": {...}
            }
        ],
        "completed_items": [...]
    }
}

5.3 開始處理 API

POST /start-processing

處理回應格式

{
    "status": "success",
    "message": "開始處理批次: batch_20250903141500_001_123",
    "batch_id": "batch_20250903141500_001_123"
}

5.4 刪除項目 API

POST /delete-batch-item

刪除請求格式

{
    "queue_id": "queue_20250903141500_001"
}

刪除回應格式

{
    "status": "success",
    "message": "批次項目已成功刪除",
    "queue_status": {...}
}

5.5 啟動單一批次 / run-procedure

POST /run-procedure

此端點用於啟動 PowerBarcoder 的執行流程。請求體為 JSON,允許選填 batch_id(若提供則使用該 ID;否則系統會以時間戳生成一個新的 batch_id)。可選參數 override_config(布林)表示強制覆蓋既有 config.yml。

請求示意:

{
    "batch_id": "batch_20250903141500_001_123",  
    "override_config": false,
    "other_config": { ... }
}

回應示意:

{
    "status": "success",
    "message": "處理已啟動,批次ID: batch_20250903141500_001_123",
    "batch_id": "batch_20250903141500_001_123"
}

Implementation: see app.py::run_procedure_http (程式會在伺服器端建立子程序並以 SSE/輸出隊列提供日誌串流)。

Note: when the server generates a batch_id (if one is not supplied), it uses a timestamp and may append a short UUID segment if a collision is detected; additionally when add_configs_to_queue inserts configs it rewrites each config's resultDataPath to include the generated batch_id so results are stored per-batch.

5.6 取得批次狀態 / batch-status

GET /batch-status/<batch_id>

回傳該 batch 的狀態資訊(由 BatchStatusManager 提供的資料結構),額外會回傳 process_running(若該 batch 仍在 active_processes 中且子程序尚未結束,則為 true)。

回應範例(簡化):

{
    "status": "success",
    "batch_status": {
        "batch_id": "batch_20250903141500_001_123",
        "status": "running",
        "process_running": true,
        "created_time": "2025-09-03T14:15:00.123456",
        "details": { ... }
    }
}

Implementation: see app.py::get_batch_status_http

5.7 即時序列分析 SSE / analyze-sequence-stream

GET /analyze-sequence-stream?batch_id=<>&locus=<>&identifier_type=<>&identifier_value=<>

這是一個使用 Server-Sent Events (SSE) 的串流 API,針對單一樣本(可用 barcode_id 或 sample_id 指定)執行 LLM-based 的分析並逐步回傳狀態與內容。必填參數:batch_idlocusidentifier_type(例如 barcode_idsample_id)、identifier_value

串流中會以 JSON 字串做為 SSE 的 data 部分,常見型別包括:

  • status:進度或狀態訊息(包含 progress 欄位)
  • prompt:發送給 LLM 的 prompt 內容(提示文字)
  • llm_response:LLM 的分段回應
  • error / warning:錯誤或警告訊息
  • complete:解析後的分析結果摘要
  • final_complete:包含最終結構化結果與報告檔名

範例(片段):

data: {"type":"status","message":"Initializing analysis...","progress":10}

data: {"type":"prompt","content":"<prompt text here>"}

data: {"type":"llm_response","content":"...partial text..."}

data: {"type":"final_complete","result":{...}}

Implementation: see app.py::analyze_sequence_stream。此端點會在伺服器端使用 src.llm 中的客戶端、分析器與報表產生器來執行 streaming 分析;當 LLM 分析功能未啟用或參數缺失時會回傳 error 事件。

注意:此端點為 SSE 串流,需於客戶端使用 EventSource 或等價的 SSE 客戶端接收。

5.8 取得可用樣本清單 / get-available-samples

GET /get-available-samples/<batch_id>/<locus>

回傳該批次與基因座下的 barcode_id ↔ sample_id 對應清單,供前端在分析/選樣時使用。

回應範例:

{
    "status": "success",
    "data": {
        "samples": [
            {"barcode_id": "A01", "sample_id": "sample_001"},
            {"barcode_id": "A02", "sample_id": "sample_002"}
        ],
        "count": 2
    }
}

Implementation: see app.py::get_available_samples(內部使用 src.qc.qc_data_manager.QCDataManager 讀取 locus 資料庫)。

5.9 下載結果壓縮檔 / download

GET /download/<room_name>

會將指定 room(通常對應批次結果資料夾)下的每個 locus 的 *_qcReport.csvextracted_sequences/llm_analysis/*_recommended_sequences.fas 打包成 ZIP 提供下載。

Implementation: see app.py::download_result

5.10 重新載入隊列狀態 / reload-queue-state

POST /reload-queue-state

重新從 SQLite 讀取隊列狀態並回傳最新狀態(等同於後端強制刷新)。

回應示例:

{
    "status": "success",
    "message": "隊列狀態已重新載入",
    "queue_status": { ... }
}

Implementation: see app.py::reload_queue_state_api

5.11 下載分析報告 / download-analysis-report

POST /download-analysis-report

統一的報告下載 API,從持久目錄讀取 majority_vote_summary.json 並生成報告。優先提供已存在的 Markdown 報告,其次回傳 JSON 摘要內容。

下載報告請求格式

{
    "batch_id": "202508091806",
    "locus": "trnLF",
    "sample_id": "Christella_jaculosa_Wade3964_KTHU2097"
}

下載報告回應格式

  • 成功:回傳 Markdown 檔案或 JSON 檔案下載。
  • 失敗:回傳 JSON 格式錯誤訊息。

Implementation: see app.py::download_analysis_report

5.12 建立 ML 樹 / build-ml-trees

GET /build-ml-trees/<batch_id>

建立 Maximum Likelihood 系統發生樹。

建樹參數

  • batch_id (Path): 批次 ID
  • locus (Query, Optional): 指定單一基因座,預設處理所有基因座

建樹回應範例

{
    "status": "success",
    "batch_id": "batch_20250903141500_001_123",
    "total_loci": 2,
    "success_count": 2,
    "failed_count": 0,
    "success_loci": ["rbcL", "trnLF"],
    "failed_loci": [],
    "tree_files": {
        "rbcL": ["rbcL_tree.png"],
        "trnLF": ["trnLF_tree.png"]
    },
    "message": "成功為 2/2 個基因座建立 ML 樹"
}

Implementation: see app.py::build_ml_trees_api

5.13 其他簡單端點

  • GET /config-uploader:回傳 Config Uploader 的靜態頁面 (see app.py::config_uploader)。
  • GET /health:健康檢查回應(回傳 'OK'),(see app.py::health)。

6. 前端使用者介面

6.1 ConfigUploaderModal 組件功能

6.1.1 檔案上傳區域

  • 拖放支援:支援將 YAML 檔案拖放到指定區域
  • 多檔案選擇:可同時選擇多個配置檔案
  • 檔案驗證:自動驗證檔案格式(.yml, .yaml)
  • 預覽功能:可預覽配置檔案內容

6.1.2 配置管理

  • 優先級設定:為每個配置設定執行優先級(數字越大越優先)
  • 檔案移除:可移除已選擇的檔案
  • 檔案資訊:顯示檔案大小、名稱等資訊

6.1.3 隊列監控面板

  • 狀態統計:顯示 pending、running、completed、failed 數量
  • 當前執行:顯示目前正在執行的批次
  • 項目列表:顯示隊列中的所有項目及其狀態
  • 操作按鈕
    • Start Processing:開始處理下一個隊列項目
    • Reload State:重新載入隊列狀態
    • Delete:刪除指定的隊列項目

6.2 狀態顯示設計

6.2.1 狀態指示器

<span class="badge"
      :class="{
          'bg-secondary': status === 'pending',
          'bg-primary': status === 'running', 
          'bg-success': status === 'completed',
          'bg-danger': status === 'failed'
      }">
    {{ status }}
</span>

6.2.2 進度指示

  • 視覺化進度條:顯示整體隊列完成進度
  • 即時更新:每 15 秒自動刷新狀態
  • 動畫效果:processing 狀態使用旋轉動畫

7. 技術實作細節

7.1 唯一性保證機制

7.1.1 ID 生成策略

# 隊列項目ID
queue_id = f"queue_{datetime.now().strftime('%Y%m%d%H%M%S')}_{i:03d}"

# 批次ID(加入隨機後綴確保唯一性)
random_suffix = random.randint(100, 999)
batch_id = f"batch_{datetime.now().strftime('%Y%m%d%H%M%S')}_{i:03d}_{random_suffix}"

7.1.2 資料庫約束

  • 使用 queue_id 作為主鍵確保唯一性
  • 使用事務確保資料一致性

7.2 錯誤處理策略

7.2.1 前端錯誤處理

try {
    const response = await fetch(url, options);
    const data = await response.json();
    
    if (data.status === 'success') {
        // 成功處理
    } else {
        throw new Error(data.message);
    }
} catch (error) {
    this.$emit('show-error', 'Operation failed: ' + error.message);
}

7.2.2 後端錯誤處理

try:
    # 批次處理邏輯
    success = self._execute_powerbarcoder(item.batch_id)
    
    if success:
        # 更新為成功狀態
    else:
        # 更新為失敗狀態
        
except Exception as e:
    # 記錄錯誤並更新資料庫
    print(f"[ERROR] 處理批次時發生錯誤: {e}")
    # 更新為失敗狀態並記錄錯誤訊息

7.3 自動化執行引擎

7.3.1 優先級排序

SELECT * FROM batch_queue 
WHERE status IN ('pending')
ORDER BY priority DESC, created_time ASC
LIMIT 1

7.3.2 自動處理邏輯

def _auto_process_next_item(self):
    """自動處理下一個待處理的隊列項目"""
    try:
        next_item = self._get_next_pending_item_from_db()
        if next_item:
            processing_thread = threading.Thread(
                target=self._process_batch_item_from_db, 
                args=(next_item,), 
                daemon=True
            )
            processing_thread.start()
    except Exception as e:
        print(f"[ERROR] 自動處理下一項目失敗: {e}")

8. 配置與部署

8.1 環境變數設定

# 資料目錄路徑
POWERBARCODER_DATA_DIR=/PowerBarcoder_data

# SQLite 資料庫位置
# 自動設定為: ${POWERBARCODER_DATA_DIR}/batch_queue.db

8.2 目錄結構

PowerBarcoder_data/
├── batch_queue.db           # SQLite 資料庫
├── result/                  # 批次結果目錄
│   ├── batch_20250903141500_001_123/
│   │   ├── config.yml      # 批次配置檔案
│   │   ├── batch_20250903141500_001_123.json  # 批次狀態檔案
│   │   └── ...             # 分析結果
│   └── batch_20250903141500_002_456/
└── ...

8.3 Docker 配置

# 確保資料目錄掛載
VOLUME ["/PowerBarcoder_data"]

# 暴露 Flask 應用端口
EXPOSE 5000
# 運行容器
docker run -d -p 5000:5000 \
  -v "${PWD}:/PowerBarcoder" \
  -v "/path/to/data:/PowerBarcoder_data" \
  --name powerbarcoder \
  powerbarcoder

9. 使用者操作指引

9.1 基本操作流程

  1. 開啟配置上傳器

    • 訪問 http://localhost:5000/config-uploader
    • 或在主介面中點擊相應按鈕
  2. 上傳配置檔案

    • 拖放 YAML 檔案到上傳區域
    • 或點擊選擇檔案
    • 設定每個檔案的優先級
  3. 提交批次任務

    • 檢查已選擇的檔案列表
    • 點擊 "Add to Queue" 按鈕
    • 確認上傳成功訊息
  4. 監控執行狀態

    • 查看隊列統計資訊
    • 觀察項目執行進度
    • 必要時手動開始處理
  5. 管理隊列項目

    • 刪除不需要的項目
    • 重新載入狀態
    • 檢查錯誤訊息

9.2 進階操作

9.2.1 優先級管理

  • 數字越大優先級越高
  • 相同優先級按建立時間排序
  • 可隨時調整待處理項目的優先級

9.2.2 錯誤排除

  • 檢查 error_message 欄位了解失敗原因
  • 查看批次結果目錄中的詳細日誌
  • 重新提交修正後的配置

9.2.3 批次管理

  • 使用有意義的檔案名稱以便識別
  • 合理設定優先級避免重要任務延遲
  • 定期清理已完成的項目釋放資源

10. 疑難排解

10.1 常見問題

10.1.1 上傳失敗

  • 問題:YAML 解析錯誤
  • 解決:檢查 YAML 格式,確保縮排正確

10.1.2 隊列不執行

  • 問題:系統顯示 is_processing: false 但有待處理項目
  • 解決:手動點擊 "Start Processing" 按鈕

10.1.3 資料庫問題

  • 問題:SQLite 資料庫損毀或無法訪問
  • 解決:檢查資料目錄權限,必要時重新初始化資料庫

10.2 效能優化

10.2.1 大批次處理

  • 避免同時上傳過多配置檔案
  • 合理設定優先級分散系統負載
  • 監控系統資源使用情況

10.2.2 資料庫維護

  • 定期清理已完成的歷史記錄
  • 備份重要的配置和結果資料
  • 監控資料庫檔案大小

11. 開發者指引

11.1 擴展功能

11.1.1 新增狀態類型

class QueueStatus(Enum):
    PENDING = "pending"
    RUNNING = "running" 
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"      # 新增:暫停狀態
    CANCELLED = "cancelled" # 新增:取消狀態

11.1.2 新增 API 端點

@app.route('/pause-batch-item', methods=['POST'])
def pause_batch_item():
    """暫停指定的批次項目"""
    # 實作暫停邏輯
    pass

11.2 整合其他模組

11.2.1 與日誌系統整合

def _execute_powerbarcoder(self, batch_id: str) -> bool:
    """執行PowerBarcoder流程並整合日誌"""
    try:
        # 設定日誌輸出
        log_manager.setup_batch_logging(batch_id)
        
        # 執行流程
        result = subprocess.run(...)
        
        return result.returncode == 0
    except Exception as e:
        log_manager.error(f"批次執行失敗: {e}")
        return False

11.2.2 與通知系統整合

def complete_batch(self, batch_id, success=True):
    """完成批次並發送通知"""
    super().complete_batch(batch_id, success)
    
    # 發送通知
    if success:
        notification_manager.send_success_notification(batch_id)
    else:
        notification_manager.send_failure_notification(batch_id)

11.3 測試策略

11.3.1 單元測試

def test_add_configs_to_queue():
    """測試配置添加到隊列"""
    manager = BatchStatusManager()
    configs = [{"batch_name": "test"}]
    
    queue_ids = manager.add_configs_to_queue(configs)
    
    assert len(queue_ids) == 1
    assert queue_ids[0].startswith("queue_")

11.3.2 整合測試

def test_full_batch_workflow():
    """測試完整批次工作流程"""
    # 上傳配置
    # 開始處理
    # 檢查狀態變化
    # 驗證結果
    pass

本文件提供了 PowerBarcoder 批次上傳模組的完整技術說明,涵蓋系統架構、API 規格、使用指引及開發者資源。開發者可依據此文件進行功能擴展與維護,使用者可參考操作指引高效使用批次處理功能。