diff --git a/run.py b/run.py index e0409d5c5..10a1bba51 100644 --- a/run.py +++ b/run.py @@ -1,11 +1,15 @@ import argparse +import asyncio import copy as cp import datetime import json import os import os.path as osp import subprocess +import sys from functools import partial +from pathlib import Path +from typing import List import pandas as pd from tabulate import tabulate @@ -46,16 +50,21 @@ def get_gpu_list(): ) +from vlmeval.api import LMDeployAPI from vlmeval.config import supported_VLM from vlmeval.dataset import build_dataset from vlmeval.dataset.video_dataset_config import supported_video_datasets from vlmeval.inference import infer_data_job from vlmeval.inference_mt import infer_data_job_mt from vlmeval.inference_video import infer_data_job_video -from vlmeval.smp import (MMBenchOfficialServer, get_pred_file_format, githash, listinstr, load, - load_env, ls, prepare_reuse_files, proxy_set, setup_logger, timestr) +from vlmeval.smp import (MMBenchOfficialServer, build_eval_id, get_eval_file_format, get_logger, + get_pred_file_format, get_pred_file_path, githash, is_prediction_complete, + listinstr, load, load_env, prepare_reuse_files, proxy_set, setup_logger, + timestr, update_dataset_status, write_run_status) from vlmeval.utils.result_transfer import MMMU_result_transfer, MMTBench_result_transfer +logger = get_logger(__name__) + # Make WORLD_SIZE invisible when build models def build_model_from_config(cfg, model_name, use_vllm=False): @@ -104,6 +113,159 @@ def build_dataset_from_config(cfg, dataset_name): raise ValueError(f'Class {cls_name} is not supported in `vlmeval.dataset`') +def build_model_from_base_url(args): + """Build LMDeployAPI model kwargs from command-line arguments. + + Used by both local and API modes when --base-url is specified. + Returns a dict suitable for LMDeployAPI(**kwargs) or partial(LMDeployAPI, **kwargs). + """ + model_args = dict( + model=args.model[0] if isinstance(args.model, list) else args.model, + api_base=f"{args.base_url.rstrip('/')}/chat/completions", + key=args.key, + custom_prompt=args.custom_prompt, + max_tokens=args.max_tokens, + retry=args.retry, + timeout=args.timeout, + temperature=args.temperature, + top_k=args.top_k, + top_p=args.top_p, + repetition_penalty=args.repetition_penalty, + verbose=args.verbose, + ) + model_args = {k: v for k, v in model_args.items() if v is not None} + if args.thinker: + logger.warning('[Deprecated] Use `--max-tokens` and `--timeout` directly.') + model_args.update(dict(timeout=args.timeout * 2, max_tokens=args.max_tokens * 2)) + if args.extra_body: + try: + extra = json.loads(args.extra_body) + except Exception as e: + raise ValueError(f'Unable to parse the --extra-body value `{args.extra_body}`') from e + assert isinstance(extra, dict), '--extra-body must be a valid Python dict' + model_args.update(extra) + return model_args + + +def get_judge_kwargs(dataset_name, dataset_type, args): + """Determine judge kwargs based on dataset name and type. + + Uses run.py's logic as the canonical source for dataset-specific judge model + assignments, with additional entries from run_api.py (Video-MME). + Supports both local and API modes with mode-specific fallbacks. 
+ """ + # Determine nproc with mode-specific fallback + if args.judge_api_nproc is not None: + nproc = args.judge_api_nproc + else: + nproc = args.api_nproc # local mode fallback + + # Determine retry with mode-specific fallback + if args.judge_retry is not None: + retry = args.judge_retry + else: + retry = args.retry + + judge_kwargs = { + 'nproc': nproc, + 'verbose': args.verbose, + 'retry': retry, + 'timeout': args.judge_timeout, + **(json.loads(args.judge_args) if args.judge_args else {}), + } + + if args.judge_base_url: + judge_kwargs['api_base'] = f"{args.judge_base_url.rstrip('/')}/chat/completions" + if args.judge_key: + judge_kwargs['key'] = args.judge_key + + if args.judge is not None: + judge_kwargs['model'] = args.judge + else: + if dataset_type in ['MCQ', 'Y/N', 'MCQ_MMMU_Pro'] or listinstr( + ['moviechat1k', 'mme-reasoning'], dataset_name.lower() + ): + if listinstr(['WeMath', 'MME-Reasoning'], dataset_name): + judge_kwargs['model'] = 'gpt-4o-mini' + elif listinstr(['VisualPuzzles'], dataset_name): + judge_kwargs['model'] = 'exact_matching' + elif listinstr(['PuzzleVQA'], dataset_name): + judge_kwargs['model'] = 'exact_matching' + elif listinstr(['VisuLogic'], dataset_name): + judge_kwargs['model'] = 'exact_matching' + else: + judge_kwargs['model'] = 'chatgpt-0125' + elif listinstr(['MMVet', 'LLaVABench', 'MMBench_Video'], dataset_name): + if listinstr(['LLaVABench_KO'], dataset_name): + judge_kwargs['model'] = 'gpt-4o-0806' + else: + judge_kwargs['model'] = 'gpt-4-turbo' + elif listinstr(['VGRPBench'], dataset_name): + judge_kwargs['model'] = 'gpt-4o' + elif listinstr( + ['MathVista', 'MathVerse', 'MathVision', 'LENS', 'DynaMath', 'VL-RewardBench', + 'LogicVista', 'MOAT', 'OCR_Reasoning', 'VTCBench', 'Asclepius', + 'MMSafetyBench', 'MSSBench', 'SIUO', 'SIUO_GEN', 'XSTest', 'Flames'], dataset_name + ): + judge_kwargs['model'] = 'gpt-4o-mini' + elif listinstr(['OlympiadBench'], dataset_name): + use_api_judger = judge_kwargs.get("olympiad_use_api_judger", False) + if use_api_judger: + judge_kwargs['model'] = 'gpt-4o-mini' + elif listinstr( + ['MMLongBench', 'MMDU', 'DUDE', 'SLIDEVQA', 'MIA-Bench', + 'WildVision', 'MMAlignBench', 'MM-IFEval'], dataset_name + ): + judge_kwargs['model'] = 'gpt-4o' + elif listinstr(['ChartMimic'], dataset_name): + judge_kwargs['model'] = 'gpt-4o' + elif listinstr(['VDC'], dataset_name): + judge_kwargs['model'] = 'llama31-8b' + elif listinstr(['Video_MMLU_QA', 'Video_MMLU_CAP'], dataset_name): + judge_kwargs['model'] = 'qwen-72b' + elif listinstr(['MMVMBench'], dataset_name): + judge_kwargs['model'] = 'gpt-4o' + elif listinstr(['CVQA_EN', 'CVQA_LOC'], dataset_name): + judge_kwargs['model'] = 'gpt-4.1' + elif listinstr(['M4Bench'], dataset_name): + judge_kwargs['model'] = 'gpt-4o' + elif listinstr(['AyaVisionBench'], dataset_name): + judge_kwargs['model'] = 'gpt-4.1' + elif listinstr(['MathCanvas'], dataset_name): + judge_kwargs['model'] = 'gpt-4.1-2025-04-14' + elif listinstr(['MMReason'], dataset_name): + judge_kwargs['model'] = 'gpt-4.1' + elif listinstr(['CoreCognition'], dataset_name): + judge_kwargs['model'] = 'gpt-4.1' + elif listinstr(['WorldVQA'], dataset_name): + judge_kwargs['model'] = 'gpt-4o-1120' + elif listinstr(['Video-MME'], dataset_name): + judge_kwargs['model'] = 'chatgpt-0125' + + if args.use_verifier: + judge_kwargs['use_verifier'] = True + if args.use_vllm: + judge_kwargs['use_vllm'] = True + + return judge_kwargs + + +def parse_reuse_aux_arg(reuse_aux): + if isinstance(reuse_aux, bool): + return 'all' if reuse_aux else 'none' + 
if isinstance(reuse_aux, int):
+        return 'all' if reuse_aux else 'none'
+    if isinstance(reuse_aux, str):
+        value = reuse_aux.strip().lower()
+        if value in ['all', 'infer', 'none']:
+            return value
+        if value in ['1', 'true', 'yes']:
+            return 'all'
+        if value in ['0', 'false', 'no']:
+            return 'none'
+    raise argparse.ArgumentTypeError('reuse_aux must be one of: all, infer, none')
+
+
 def parse_args():
     help_msg = """\
 You can launch the evaluation by setting either --data and --model or --config.
@@ -176,41 +338,94 @@ def parse_args():
     The keys in the `model` and `data` fields will be used for naming the prediction files and evaluation results.
     When launching with `--config`, args for API VLMs, such as `--retry`, `--verbose`, will be ignored.
+
+--api-mode:
+    Switch to the async API pipeline mode (originally run_api.py). This mode uses an optimized pipeline
+    for API-based models with a cross-dataset unified inference queue, parallel inference and evaluation,
+    and better remote model utilization.
 """
     parser = argparse.ArgumentParser(description=help_msg, formatter_class=argparse.RawTextHelpFormatter)
+
     # Essential Args, Setting the Names of Datasets and Models
     parser.add_argument('--data', type=str, nargs='+', help='Names of Datasets')
     parser.add_argument('--model', type=str, nargs='+', help='Names of Models')
     parser.add_argument('--config', type=str, help='Path to the Config Json File')
-    # Work Dir
+
+    # Work Dir & Mode
     parser.add_argument('--work-dir', type=str, default='./outputs', help='select the output directory')
-    # Infer + Eval or Infer Only
     parser.add_argument('--mode', type=str, default='all', choices=['all', 'infer', 'eval'])
+
     # API Kwargs, Apply to API VLMs and Judge API LLMs
-    parser.add_argument('--api-nproc', type=int, default=4, help='Parallel API calling')
-    parser.add_argument('--retry', type=int, default=None, help='retry numbers for API VLMs')
-    parser.add_argument('--judge-args', type=str, default=None, help='Judge arguments in JSON format')
-    # Explicitly Set the Judge Model
-    parser.add_argument('--judge', type=str, default=None)
-    # Logging Utils
+    parser.add_argument('--api-nproc', type=int, default=32, help='Parallel API calling')
+    parser.add_argument('--retry', type=int, default=6, help='Number of retries for API VLMs')
     parser.add_argument('--verbose', action='store_true')
-    # Configuration for Resume
-    # Ignore: will not rerun failed VLM inference
-    parser.add_argument('--ignore', action='store_true', help='Ignore failed indices. ')
-    # Reuse: will reuse the existing prediction files
+    parser.add_argument('--keep-failed', action='store_true',
+                        help='Keep failed predictions as-is instead of retrying them.')
+    parser.add_argument(
+        '--ignore',
+        action='store_true',
+        help='[Deprecated] Ignore failed indices; this is the default behavior now. '
+             'Use `--keep-failed` to disable it.')
     parser.add_argument('--reuse', action='store_true')
-    # Reuse-aux: if set, when reuse is True, will also reuse the auxiliary evaluation files
-    parser.add_argument('--reuse-aux', type=int, default=True, help='reuse auxiliary evaluation files')
+    parser.add_argument(
+        '--reuse-aux',
+        type=parse_reuse_aux_arg,
+        default='all',
+        help='Reuse auxiliary files: `all` for infer+eval aux, `infer` for inference-only aux, `none` for no aux.'
+    )
     parser.add_argument(
         '--use-vllm', action='store_true', help='use vllm to generate, the flag is only supported in Llama4 for now')
     parser.add_argument('--use-verifier', action='store_true', help='use verifier to evaluate')
+    # Judge Args
+    parser.add_argument('--judge', type=str, default=None)
+    parser.add_argument('--judge-args', type=str, default=None, help='Judge arguments in JSON format')
+    parser.add_argument('--judge-base-url', type=str, default=None, help='Base URL of judge API')
+    parser.add_argument('--judge-key', type=str, default=None, help='API key for judge model')
+    parser.add_argument('--judge-api-nproc', type=int, default=None,
+                        help='Parallel API calling for judger (defaults to follow --api-nproc)')
+    parser.add_argument('--judge-retry', type=int, default=None,
+                        help='Retry times for failed judgement (defaults to follow --retry)')
+    parser.add_argument('--judge-timeout', type=int, default=600,
+                        help='Max time in seconds for judgement.')
+
+    # Inference Model Args (when --base-url is specified)
+    parser.add_argument('--base-url', type=str, default=None,
+                        help='Base URL of OpenAI-compatible API (e.g. http://localhost:8080/v1). '
+                             'If set, LMDeployAPI is used for inference without modifying config.py.')
+    parser.add_argument('--key', type=str, default='sk-admin', help='API key for inference model')
+    parser.add_argument('--thinker', action='store_true',
+                        help='[Deprecated] Enable thinking mode: doubles timeout and max_tokens.')
+    parser.add_argument('--max-tokens', type=int, default=2 ** 15,
+                        help='Max tokens for model generation.')
+    parser.add_argument('--temperature', type=float, default=None)
+    parser.add_argument('--top-k', type=int, default=None)
+    parser.add_argument('--top-p', type=float, default=None)
+    parser.add_argument('--repetition-penalty', type=float, default=None)
+    parser.add_argument('--timeout', type=int, default=1800,
+                        help='Max time in seconds for a single inference request.')
+    parser.add_argument('--custom-prompt', type=str, default=None,
+                        help='Manually select a model adapter by name.')
+    parser.add_argument('--extra-body', type=str, default=None,
+                        help='Extra inference parameters as a JSON dict string')
+
+    # API Pipeline Args (only for --api-mode)
+    parser.add_argument('--api-mode', action='store_true',
+                        help='Switch to async API pipeline mode')
+    parser.add_argument('--monitor-interval', type=int, default=30,
+                        help='Status monitoring interval in seconds')
+    parser.add_argument('--debug', action='store_true',
+                        help='Debug mode: run evaluation in main process')
+
     args = parser.parse_args()
+    if args.ignore:
+        logger.warning('[Deprecated] `--ignore` now has no effect since it is the default behavior; '
                       'use `--keep-failed` to disable it.')
     return args
 
 
-def main():
-    args = parse_args()
+def run_local_mode(args):
+    """Original evaluation mode with GPU/distributed support."""
     use_config, cfg = False, None
     if args.config is not None:
         assert args.data is None and args.model is None, '--data and --model should not be set when using --config'
@@ -223,15 +438,19 @@ def main():
     if 'MMEVAL_ROOT' in os.environ:
         args.work_dir = os.environ['MMEVAL_ROOT']
 
-    date, commit_id = timestr('day'), githash(digits=8)
-    eval_id = f"T{date}_G{commit_id}"
-    logger = setup_logger(log_file=os.path.join(args.work_dir, 'logs', f'{eval_id}_{timestr()}.log'))
+    commit_id = githash(digits=8)
+    eval_id = build_eval_id()
+    setup_logger(log_file=os.path.join(args.work_dir, 'logs', f'{eval_id}_{timestr()}.log'))
+
+    if args.mode == 'eval':
+        args.reuse = True
+        
logger.info('Force to use `reuse=True` for eval mode.') if RANK == 0: if not args.reuse: - logger.warning('--reuse is not set, will not reuse previous (before one day) temporary files') + logger.warning('--reuse is not set, this run will start from a fresh output directory') else: - logger.warning('--reuse is set, will reuse the latest prediction & temporary pickle files') + logger.info(f'--reuse is set, reuse-aux={args.reuse_aux}') if not use_config: for k, v in supported_VLM.items(): @@ -262,22 +481,35 @@ def main(): for _, model_name in enumerate(args.model): logger.info(f'=========== {model_name} ===========') model = None - date, commit_id = timestr('day'), githash(digits=8) - eval_id = f"T{date}_G{commit_id}" pred_root = osp.join(args.work_dir, model_name, eval_id) pred_root_meta = osp.join(args.work_dir, model_name) os.makedirs(pred_root_meta, exist_ok=True) - - prev_pred_roots = ls(osp.join(args.work_dir, model_name), mode='dir') - if len(prev_pred_roots) and args.reuse: - prev_pred_roots.sort() - - if not osp.exists(pred_root): - os.makedirs(pred_root, exist_ok=True) + os.makedirs(pred_root, exist_ok=True) + + if RANK == 0: + write_run_status( + pred_root, + dict( + eval_id=eval_id, + created_at=datetime.datetime.now().astimezone().isoformat(), + commit=commit_id, + argv=sys.argv, + api_mode=False, + world_size=WORLD_SIZE, + pred_format=get_pred_file_format(), + eval_format=get_eval_file_format(), + mode=args.mode, + reuse=bool(args.reuse), + reuse_aux=args.reuse_aux, + )) if use_config: model = build_model_from_config(cfg['model'], model_name, args.use_vllm) + elif args.base_url: + model_args = build_model_from_base_url(args) + model_args['model'] = model_name + model = LMDeployAPI(**model_args) for _, dataset_name in enumerate(args.data): logger.info(f'----------- {dataset_name} -----------') @@ -285,8 +517,8 @@ def main(): dist.barrier() try: - pred_format = get_pred_file_format() - result_file_base = f'{model_name}_{dataset_name}.{pred_format}' + result_file = get_pred_file_path( + pred_root, model_name, dataset_name, use_env_format=True) if use_config: if WORLD_SIZE > 1: @@ -313,18 +545,47 @@ def main(): logger.error(f'Dataset {dataset_name} is not valid, will be skipped. 
') continue - # Handling Multi-Turn Dataset - result_file = osp.join(pred_root, result_file_base) - # Reuse the previous prediction file if exists - if RANK == 0 and len(prev_pred_roots): - prepare_reuse_files( - pred_root_meta=pred_root_meta, eval_id=eval_id, model_name=model_name, - dataset_name=dataset_name, reuse=args.reuse, reuse_aux=args.reuse_aux + judge_kwargs = get_judge_kwargs(dataset_name, dataset.TYPE, args) + judge_signature = judge_kwargs.get('model', '') + + if RANK == 0: + reuse_ctx = prepare_reuse_files( + pred_root_meta=pred_root_meta, + eval_id=eval_id, + model_name=model_name, + dataset=dataset, + result_file=result_file, + reuse=args.reuse, + reuse_aux=args.reuse_aux, + retry_failed=not args.keep_failed, + judge_signature=judge_signature if args.mode != 'infer' else None, + world_size=WORLD_SIZE, + ) + update_dataset_status( + pred_root, + dataset_name, + source_run=reuse_ctx['source_eval_id'], + judge_signature=judge_signature, + reuse_aux=args.reuse_aux, ) + logger.info(judge_kwargs) if WORLD_SIZE > 1: dist.barrier() + prediction_complete = is_prediction_complete( + result_file, + dataset_indices=list(dataset.data['index']), + retry_failed=not args.keep_failed, + ) + if args.mode == 'eval' and not prediction_complete: + if RANK == 0: + logger.error( + f'No reusable completed prediction found for {model_name} x {dataset_name}, ' + 'skipping this combination in eval mode.' + ) + continue + if model is None: model = model_name # which is only a name @@ -336,10 +597,10 @@ def main(): work_dir=pred_root, model_name=model_name, dataset=dataset, - result_file_name=result_file_base, verbose=args.verbose, api_nproc=args.api_nproc, - use_vllm=args.use_vllm) + use_vllm=args.use_vllm, + retry_failed=not args.keep_failed) elif dataset.TYPE == 'MT': model = infer_data_job_mt( model, @@ -348,7 +609,7 @@ def main(): dataset=dataset, verbose=args.verbose, api_nproc=args.api_nproc, - ignore_failed=args.ignore, + retry_failed=not args.keep_failed, use_vllm=args.use_vllm) else: model = infer_data_job( @@ -358,83 +619,9 @@ def main(): dataset=dataset, verbose=args.verbose, api_nproc=args.api_nproc, - ignore_failed=args.ignore, + retry_failed=not args.keep_failed, use_vllm=args.use_vllm) - # Set the judge kwargs first before evaluation or dumping - - judge_kwargs = { - 'nproc': args.api_nproc, - 'verbose': args.verbose, - 'retry': args.retry if args.retry is not None else 3, - **(json.loads(args.judge_args) if args.judge_args else {}), - } - - if args.retry is not None: - judge_kwargs['retry'] = args.retry - if args.judge is not None: - judge_kwargs['model'] = args.judge - else: - print(dataset_name) - if dataset.TYPE in ['MCQ', 'Y/N', 'MCQ_MMMU_Pro'] or listinstr( - ['moviechat1k', 'mme-reasoning'], dataset_name.lower() - ): - if listinstr(['WeMath', 'MME-Reasoning'], dataset_name): - judge_kwargs['model'] = 'gpt-4o-mini' - elif listinstr(['VisualPuzzles'], dataset_name): - judge_kwargs['model'] = 'exact_matching' - elif listinstr(['PuzzleVQA'], dataset_name): - judge_kwargs['model'] = 'exact_matching' - elif listinstr(['VisuLogic'], dataset_name): - judge_kwargs['model'] = 'exact_matching' - else: - judge_kwargs['model'] = 'chatgpt-0125' - elif listinstr(['MMVet', 'LLaVABench', 'MMBench_Video'], dataset_name): - if listinstr(['LLaVABench_KO'], dataset_name): - judge_kwargs['model'] = 'gpt-4o-0806' - else: - judge_kwargs['model'] = 'gpt-4-turbo' - elif listinstr(['VGRPBench'], dataset_name): - judge_kwargs['model'] = 'gpt-4o' - elif listinstr(['MathVista', 'MathVerse', 'MathVision', 
'LENS', 'DynaMath', 'VL-RewardBench', 'LogicVista', 'MOAT', 'OCR_Reasoning', 'VTCBench', 'Asclepius', 'MMSafetyBench', 'MSSBench', 'SIUO', 'SIUO_GEN', 'XSTest', 'Flames'], dataset_name): # noqa: E501 - judge_kwargs['model'] = 'gpt-4o-mini' - elif listinstr(['OlympiadBench'], dataset_name): - use_api_judger = judge_kwargs.get("olympiad_use_api_judger", False) - if use_api_judger: - judge_kwargs['model'] = 'gpt-4o-mini' - elif listinstr(['MMLongBench', 'MMDU', 'DUDE', 'SLIDEVQA', 'MIA-Bench', 'WildVision', 'MMAlignBench', 'MM-IFEval'], dataset_name): # noqa: E501 - judge_kwargs['model'] = 'gpt-4o' - elif listinstr(['ChartMimic'], dataset_name): - judge_kwargs['model'] = 'gpt-4o' - elif listinstr(['VDC'], dataset_name): - judge_kwargs['model'] = 'llama31-8b' - elif listinstr(['Video_MMLU_QA', 'Video_MMLU_CAP'], dataset_name): - judge_kwargs['model'] = 'qwen-72b' - elif listinstr(['MMVMBench'], dataset_name): - judge_kwargs['model'] = 'gpt-4o' - elif listinstr(['CVQA_EN', 'CVQA_LOC'], dataset_name): - judge_kwargs['model'] = 'gpt-4.1' - elif listinstr(['M4Bench'], dataset_name): - judge_kwargs['model'] = 'gpt-4o' - elif listinstr(['AyaVisionBench'], dataset_name): - judge_kwargs['model'] = 'gpt-4.1' - elif listinstr(['MathCanvas'], dataset_name): - judge_kwargs['model'] = 'gpt-4.1-2025-04-14' - elif listinstr(['MMReason'], dataset_name): - judge_kwargs['model'] = 'gpt-4.1', - elif listinstr(['CoreCognition'], dataset_name): - judge_kwargs['model'] = 'gpt-4.1' - elif listinstr(['WorldVQA'], dataset_name): - judge_kwargs['model'] = 'gpt-4o-1120' - - if args.use_verifier: - judge_kwargs['use_verifier'] = True - if args.use_vllm: - judge_kwargs['use_vllm'] = True - - if RANK == 0: - logger.info(judge_kwargs) - if WORLD_SIZE > 1: dist.barrier() @@ -504,13 +691,15 @@ def main(): # Create the symbolic links for the prediction files files = os.listdir(pred_root) files = [x for x in files if (f'{model_name}_{dataset_name}' in x or "status.json" in x)] + # Exclude temporary intermediate files + files = [x for x in files + if not x.endswith(('_checkpoint.pkl', '_PREV.pkl', '_structs.pkl'))] for f in files: - cwd = os.getcwd() - file_addr = osp.join(cwd, pred_root, f) - link_addr = osp.join(cwd, pred_root_meta, f) + file_addr = osp.join(pred_root, f) + link_addr = osp.join(pred_root_meta, f) if osp.exists(link_addr) or osp.islink(link_addr): os.remove(link_addr) - os.symlink(file_addr, link_addr) + os.symlink(osp.relpath(file_addr, pred_root_meta), link_addr) except Exception as e: logger.exception(f'Model {model_name} x Dataset {dataset_name} combination failed: {e}, ' @@ -521,6 +710,202 @@ def main(): dist.destroy_process_group() +def run_api_mode(args): + """Async API pipeline mode for API-based models. + + Uses an optimized pipeline with cross-dataset unified inference queue, + parallel inference and evaluation, and better remote model utilization. + """ + from vlmeval.api.adapters import get_adapter_registry + from vlmeval.inference_api import APIEvalPipeline, DatasetConfig + + # Validate model: API mode only supports a single model + if isinstance(args.model, list): + if len(args.model) > 1: + raise ValueError('API mode only supports a single model. Got: ' + str(args.model)) + args.model = args.model[0] + + # Validate custom_prompt at runtime + if args.custom_prompt: + registry = get_adapter_registry() + assert args.custom_prompt in registry, \ + f'Unknown adapter: {args.custom_prompt}. 
Available: {list(registry.keys())}' + + assert args.data, '--data must be set in API mode' + + # Prepare work dir and logging + commit_id = githash(digits=8) + eval_id = build_eval_id() + model_name = args.model.replace('/', '--') + + work_dir = Path(args.work_dir) / model_name + work_dir.mkdir(parents=True, exist_ok=True) + + pred_root = Path(args.work_dir) / model_name / eval_id + pred_root.mkdir(exist_ok=True) + + log_file = Path(work_dir) / 'logs' / f'{eval_id}_{datetime.datetime.now().strftime("%H%M%S")}.log' + setup_logger(log_file=str(log_file)) + logger.info(f'Log file: {log_file}') + + if args.mode == 'eval': + args.reuse = True + logger.info('Force to use `reuse=True` for eval mode.') + + if not args.reuse: + logger.warning('--reuse is not set, this run will start from a fresh output directory') + else: + logger.info(f'--reuse is set, reuse-aux={args.reuse_aux}') + + WORLD_SIZE_LOCAL = int(os.environ.get('WORLD_SIZE', 1)) + if WORLD_SIZE_LOCAL > 1: + logger.error("API pipeline does not support multi-process mode (WORLD_SIZE > 1).") + return + + # Build model args (shared across all datasets) + if args.base_url is not None: + model_args = build_model_from_base_url(args) + model_builder = partial(LMDeployAPI, **model_args) + else: + assert model_name in supported_VLM, \ + f'Model "{model_name}" not found in supported_VLM. Consider using --base-url to specify an API endpoint.' + model_builder = supported_VLM[model_name] + + write_run_status( + pred_root, + dict( + eval_id=eval_id, + created_at=datetime.datetime.now().astimezone().isoformat(), + commit=commit_id, + argv=sys.argv, + api_mode=True, + world_size=1, + pred_format=get_pred_file_format(), + eval_format=get_eval_file_format(), + mode=args.mode, + reuse=bool(args.reuse), + reuse_aux=args.reuse_aux, + )) + + # Prepare all datasets + dataset_configs: List[DatasetConfig] = [] + + for ds_name in args.data: + logger.info(f'-------------------- {ds_name} --------------------') + + try: + dataset_kwargs = {} + if ds_name in [ + 'MMLongBench_DOC', 'DUDE', 'DUDE_MINI', + 'SLIDEVQA', 'SLIDEVQA_MINI', + ]: + dataset_kwargs['model'] = model_name + dataset = build_dataset(ds_name, **dataset_kwargs) + + if dataset is None: + logger.error(f'Dataset {ds_name} is not valid, will be skipped.') + continue + + # Prepare the result file. + result_file = get_pred_file_path( + pred_root, model_name, ds_name, use_env_format=True) + + # Skip special datasets. + if ds_name in ['MMMU_TEST']: + logger.info(f'{ds_name} requires special handling, skipped in pipeline.') + continue + if 'MMT-Bench_ALL' in ds_name: + logger.info(f'{ds_name} requires special handling, skipped in pipeline.') + continue + + judge_kwargs = get_judge_kwargs(ds_name, dataset.TYPE, args) + judge_signature = judge_kwargs.get('model', '') + logger.info(f'Judge kwargs: {judge_kwargs}') + + reuse_ctx = prepare_reuse_files( + pred_root_meta=str(work_dir), + eval_id=eval_id, + model_name=model_name, + dataset=dataset, + result_file=result_file, + reuse=args.reuse, + reuse_aux=args.reuse_aux, + retry_failed=not args.keep_failed, + judge_signature=judge_signature if args.mode != 'infer' else None, + world_size=1, + ) + update_dataset_status( + str(pred_root), + ds_name, + source_run=reuse_ctx['source_eval_id'], + judge_signature=judge_signature, + reuse_aux=args.reuse_aux, + ) + if args.mode == 'eval' and not reuse_ctx['prediction_complete']: + logger.error( + f'No reusable completed prediction found for {model_name} x {ds_name}, ' + 'skipping this dataset in eval mode.' 
+ ) + continue + + # Complete the dataset config + if dataset.MODALITY == 'VIDEO': + dataset_type = 'video' + elif dataset.TYPE == 'MT': + dataset_type = 'mt' + else: + dataset_type = 'image' + dataset_config = DatasetConfig( + dataset_name=ds_name, + dataset_obj=dataset, + dataset_type=dataset_type, + model_obj=model_builder(), + model_name=model_name, + work_dir=str(pred_root), + result_file=result_file, + judge_kwargs=judge_kwargs, + verbose=args.verbose + ) + dataset_configs.append(dataset_config) + + except Exception as e: + logger.exception(f'Failed to prepare dataset {ds_name}: {e}') + continue + + # Create and run pipeline + if len(dataset_configs) == 0: + logger.warning('No valid datasets to evaluate.') + return + + logger.info(f"Starting API Pipeline for model: {model_name}") + logger.info(f"Total datasets: {len(dataset_configs)}") + + pipeline = APIEvalPipeline( + dataset_configs=dataset_configs, + concurrency=args.api_nproc, + monitor_interval=args.monitor_interval, + run_infer=args.mode in {'infer', 'all'}, + run_eval=args.mode in {'eval', 'all'}, + debug=args.debug, + retry_failed=not args.keep_failed + ) + + try: + asyncio.run(pipeline.run()) + except KeyboardInterrupt: + logger.warning("Pipeline interrupted by user.") + except Exception as e: + logger.exception(f"Pipeline failed with error: {e}") + + +def main(): + args = parse_args() + if args.api_mode: + run_api_mode(args) + else: + run_local_mode(args) + + if __name__ == '__main__': load_env() main() diff --git a/run_api.py b/run_api.py deleted file mode 100644 index 478021aac..000000000 --- a/run_api.py +++ /dev/null @@ -1,397 +0,0 @@ -import argparse -import asyncio -import datetime -import json -import os -from functools import partial -from pathlib import Path -from typing import List - -from vlmeval.api import LMDeployAPI -from vlmeval.api.adapters import get_adapter_registry -from vlmeval.config import supported_VLM -from vlmeval.dataset import build_dataset -from vlmeval.inference_api import APIEvalPipeline, DatasetConfig -from vlmeval.smp import (get_pred_file_format, githash, listinstr, load_env, prepare_reuse_files, - setup_logger, timestr) - -group_dic = { - 'general-mini': ['MMMU_Pro_10c'], - 'math-reasoning-mini': ['MathVista_MINI', 'OlympiadBench', 'IPhO_2025', 'Physics'], - 'sci-reasoning-mini': ['SFE', 'MaCBench', 'MicroVQA', 'XLRS-Bench-lite', 'MSEarthMCQ'], - 'language-mini': ['MM-IFEval'], - 'coding-mini': ['ChartMimic_v2_direct'], - 'svg-mini': ['SArena_MINI'], - 'agent-mini': ['ScreenSpot_v2_Mobile', 'ScreenSpot_v2_Desktop', 'ScreenSpot_v2_Web'], - 'video-mini': ['Video-MME_64frame', 'VideoMMMU_48frame'], - 'sensing-mini': ['RefCOCO', 'OCRBench_v2_MINI', 'CCOCR', 'ChartQAPro', 'BLINK'], -} - - -def get_judge_kwargs(dataset_name: str, args) -> dict: - """Determine the default judge kwargs by dataset name.""" - judge_kwargs = { - 'nproc': args.judge_api_nproc, - 'verbose': args.verbose, - 'retry': args.judge_retry, - 'timeout': args.judge_timeout, - **(json.loads(args.judge_args) if args.judge_args else {}), - } - - if args.judge_base_url: - judge_kwargs['api_base'] = f"{args.judge_base_url.rstrip('/')}/chat/completions" - if args.judge_key: - judge_kwargs['key'] = args.judge_key - - if args.judge is not None: - judge_kwargs['model'] = args.judge - else: - judge_kwargs['model'] = 'gpt-4o-mini' # default - - if listinstr(['WeMath', 'MME-Reasoning'], dataset_name): - judge_kwargs['model'] = 'gpt-4o-mini' - elif listinstr(['VisuLogic'], dataset_name): - judge_kwargs['model'] = 'exact_matching' - elif 
listinstr(['MMVet', 'LLaVABench', 'MMBench_Video'], dataset_name): - if listinstr(['LLaVABench_KO'], dataset_name): - judge_kwargs['model'] = 'gpt-4o-0806' - else: - judge_kwargs['model'] = 'gpt-4-turbo' - elif listinstr(['VGRPBench'], dataset_name): - judge_kwargs['model'] = 'gpt-4o' - elif listinstr(['MathVista', 'MathVerse', 'MathVision', 'DynaMath', - 'VL-RewardBench', 'LogicVista', 'MOAT', 'OCR_Reasoning'], dataset_name): - judge_kwargs['model'] = 'gpt-4o-mini' - elif listinstr(['OlympiadBench'], dataset_name): - use_api_judger = judge_kwargs.get("olympiad_use_api_judger", False) - if use_api_judger: - judge_kwargs['model'] = 'gpt-4o-mini' - elif listinstr(['MMLongBench', 'MMDU', 'DUDE', 'SLIDEVQA', 'MIA-Bench', - 'WildVision', 'MMAlignBench', 'MM-IFEval'], dataset_name): - judge_kwargs['model'] = 'gpt-4o' - elif listinstr(['ChartMimic'], dataset_name): - judge_kwargs['model'] = 'gpt-4o' - elif listinstr(['VDC'], dataset_name): - judge_kwargs['model'] = 'llama31-8b' - elif listinstr(['Video_MMLU_QA', 'Video_MMLU_CAP'], dataset_name): - judge_kwargs['model'] = 'qwen-72b' - elif listinstr(['MMVMBench'], dataset_name): - judge_kwargs['model'] = 'gpt-4o' - elif listinstr(['CVQA_EN', 'CVQA_LOC'], dataset_name): - judge_kwargs['model'] = 'gpt-4.1' - elif listinstr(['M4Bench'], dataset_name): - judge_kwargs['model'] = 'gpt-4o' - elif listinstr(['AyaVisionBench'], dataset_name): - judge_kwargs['model'] = 'gpt-4.1' - elif listinstr(['MathCanvas'], dataset_name): - judge_kwargs['model'] = 'gpt-4.1-2025-04-14' - elif listinstr(['MMReason'], dataset_name): - judge_kwargs['model'] = 'gpt-4.1' - elif listinstr(['Video-MME'], dataset_name): - judge_kwargs['model'] = 'chatgpt-0125' - - if args.use_verifier: - judge_kwargs['use_verifier'] = True - if args.use_vllm: - judge_kwargs['use_vllm'] = True - - return judge_kwargs - - -def parse_args(): - help_msg = """\ -VLMEvalKit API Pipeline Runner - -This script uses an optimized pipeline for API-based models with the following improvements: -- Cross-dataset unified inference queue -- Parallel inference and evaluation -- Better remote model utilization - -You can launch the evaluation by setting either --data and --model. - ---data and --model: - Specify dataset names and model configuration for API-based inference. - -For more details, see the documentation in run.py. -""" - parser = argparse.ArgumentParser( - description=help_msg, - formatter_class=argparse.RawTextHelpFormatter - ) - - parser.add_argument('--data', type=str, nargs='+', help='Names of Datasets') - parser.add_argument('--group', type=str, nargs='+', default=None, - help='Benchmark groups to evaluate (see group_dic). Use "all" to run all groups.') - - # ================ 推理模型参数 ============== - parser.add_argument('--model', type=str, required=True) - parser.add_argument('--base-url', type=str, default=None, - help='Base URL of OpenAI-compatible API (e.g. http://localhost:8080/v1). 
' - 'If set, LMDeployAPI is used for inference without modifying config.py.') - parser.add_argument('--key', type=str, default='sk-admin', help='API key for inference model') - parser.add_argument('--thinker', action='store_true', - help='Enable thinking mode: doubles timeout and max_tokens.') - parser.add_argument('--use-enable-thinking', action='store_true', - help='Pass enable_thinking flag to the model.') - parser.add_argument('--enable-thinking', action='store_true', - help='Value of enable_thinking passed to model (requires --use-enable-thinking).') - parser.add_argument('--max-tokens', type=int, default=2 ** 15, - help='Max tokens for model generation.') - parser.add_argument('--temperature', type=float, default=None) - parser.add_argument('--top-k', type=int, default=None) - parser.add_argument('--top-p', type=float, default=None) - parser.add_argument('--repetition-penalty', type=float, default=None) - parser.add_argument('--presence-penalty', type=float, default=None) - parser.add_argument('--api-nproc', type=int, default=32, - help='Parallel API calling (inference concurrency)') - parser.add_argument('--timeout', type=int, default=1800, - help='Max time in seconds for a single inference request.') - parser.add_argument('--retry', type=int, default=6, - help='Retry times for failed inference.') - parser.add_argument('--custom-prompt', type=str, - choices=list(get_adapter_registry().keys()), default=None, - help='Manually select a model adapter by name.') - - # ================ judge 模型参数 ============== - parser.add_argument('--judge', type=str, default=None) - parser.add_argument('--judge-base-url', type=str, default=None, - help='Base URL of judge API') - parser.add_argument('--judge-key', type=str, default=None, - help='API key for judge model') - parser.add_argument('--judge-api-nproc', type=int, default=32, - help='Parallel API calling for judger') - parser.add_argument('--judge-retry', type=int, default=6, - help='Retry times for failed judgement.') - parser.add_argument('--judge-timeout', type=int, default=600, - help='Max time in seconds for judgement.') - # legacy judger parameters - parser.add_argument('--judge-args', type=str, default=None, - help='Judge arguments in JSON format') - - parser.add_argument('--work-dir', type=str, default='./outputs', - help='Select the output directory') - parser.add_argument('--mode', type=str, default='all', - choices=['all', 'infer', 'eval'], - help='Mode: all (infer+eval), infer (only), eval (only)') - parser.add_argument('--verbose', action='store_true') - parser.add_argument('--ignore', action='store_true', - help='Ignore failed indices') - parser.add_argument('--reuse', action='store_true', - help='Reuse existing prediction files') - parser.add_argument('--reuse-aux', type=int, default=True, - help='Reuse auxiliary evaluation files') - parser.add_argument('--use-vllm', action='store_true', - help='Use vllm to generate') - parser.add_argument('--use-verifier', action='store_true', - help='Use verifier to evaluate') - parser.add_argument('--monitor-interval', type=int, default=30, - help='Status monitoring interval (seconds)') - parser.add_argument('--debug', action='store_true', - help='Debug mode: run evaluation in main process') - - args = parser.parse_args() - return args - - -def main(): - args = parse_args() - - # ============================================== - # Resolve --group into dataset list - # ============================================== - if args.group is not None and len(args.group) > 0: - if 'all' in args.group: - 
groups = list(group_dic.keys()) - else: - groups = args.group - assert args.data is None, '--data and --group should not be set at the same time' - args.data = [] - for g in groups: - assert g in group_dic, f'Unknown group: {g}. Available: {list(group_dic.keys())}' - args.data.extend(group_dic[g]) - - assert args.data, '--data or --group must be set' - - # ============================================== - # Prepare work dir and logging - # ============================================== - date, commit_id = timestr('day'), githash(digits=8) - eval_id = f"T{date}_G{commit_id}" - model_name = args.model.replace('/', '--') - - # Work dir for the specified model - work_dir = Path(args.work_dir) / model_name - work_dir.mkdir(parents=True, exist_ok=True) - - # Work dir for the current run - pred_root = Path(args.work_dir) / model_name / eval_id - # List previous run - prev_pred_roots = sorted(d for d in work_dir.iterdir() if d.is_dir()) - pred_root.mkdir(exist_ok=True) - - log_file = Path(work_dir) / 'logs' / f'{eval_id}_{datetime.datetime.now().strftime("%H%M%S")}.log' - logger = setup_logger(log_file=str(log_file)) - logger.info(f'Log file: {log_file}') - - if args.mode == 'eval': - args.reuse = True - logger.info('Force to use `reuse=True` for eval mode.') - - if not args.reuse: - logger.warning('--reuse is not set, will not reuse previous temporary files') - else: - logger.info('--reuse is set, will reuse the latest prediction & temporary files') - - WORLD_SIZE = int(os.environ.get('WORLD_SIZE', 1)) - if WORLD_SIZE > 1: - logger.error("API pipeline does not support multi-process mode (WORLD_SIZE > 1).") - return - - # ============================================== - # Build model args (shared across all datasets) - # ============================================== - use_think_args = args.thinker - if args.base_url is not None: - model_args = dict( - model=args.model, - api_base=f"{args.base_url.rstrip('/')}/chat/completions", - key=args.key, - custom_prompt=args.custom_prompt, - max_tokens=args.max_tokens, - retry=args.retry, - timeout=args.timeout, - temperature=args.temperature, - top_k=args.top_k, - top_p=args.top_p, - repetition_penalty=args.repetition_penalty, - presence_penalty=args.presence_penalty, - verbose=args.verbose, - ) - model_args = {k: v for k, v in model_args.items() if v is not None} - if args.use_enable_thinking: - model_args['enable_thinking'] = args.enable_thinking - if use_think_args: - model_args.update(dict(timeout=args.timeout * 2, max_tokens=args.max_tokens * 2)) - model_builder = partial(LMDeployAPI, **model_args) - else: - assert model_name in supported_VLM, \ - f'Model "{model_name}" not found in supported_VLM. Consider using --base-url to specify an API endpoint.' - model_builder = supported_VLM[model_name] - - # ============================================== - # Prepare all datasets - # ============================================== - dataset_configs: List[DatasetConfig] = [] - - for ds_name in args.data: - logger.info(f'-------------------- {ds_name} --------------------') - - # Construct the dataset. - try: - dataset_kwargs = {} - if ds_name in [ - 'MMLongBench_DOC', 'DUDE', 'DUDE_MINI', - 'SLIDEVQA', 'SLIDEVQA_MINI', - ]: - dataset_kwargs['model'] = model_name - dataset = build_dataset(ds_name, **dataset_kwargs) - - if dataset is None: - logger.error(f'Dataset {ds_name} is not valid, will be skipped.') - continue - - # Prepare the result file. 
- pred_format = get_pred_file_format() - result_file_base = f'{model_name}_{ds_name}.{pred_format}' - result_file = str(pred_root / result_file_base) - - # Prepare the reuse file - if args.reuse and len(prev_pred_roots): - prepare_reuse_files( - pred_root_meta=str(work_dir), - eval_id=eval_id, - model_name=model_name, - dataset_name=ds_name, - reuse=args.reuse, - reuse_aux=args.reuse_aux - ) - - # Skip special datasets. - if ds_name in ['MMMU_TEST']: - logger.info(f'{ds_name} requires special handling, skipped in pipeline.') - continue - if 'MMT-Bench_ALL' in ds_name: - logger.info(f'{ds_name} requires special handling, skipped in pipeline.') - continue - - judge_kwargs = get_judge_kwargs(ds_name, args) - logger.info(f'Judge kwargs: {judge_kwargs}') - - # Complete the dataset config - if dataset.MODALITY == 'VIDEO': - dataset_type = 'video' - elif dataset.TYPE == 'MT': - dataset_type = 'mt' - else: - dataset_type = 'image' - dataset_config = DatasetConfig( - dataset_name=ds_name, - dataset_obj=dataset, - dataset_type=dataset_type, - model_obj=model_builder(), - model_name=model_name, - work_dir=str(pred_root), - result_file=result_file, - judge_kwargs=judge_kwargs, - verbose=args.verbose - ) - dataset_configs.append(dataset_config) - - except Exception as e: - logger.exception(f'Failed to prepare dataset {ds_name}: {e}') - continue - - # ============================================== - # Create and run pipeline - # ============================================== - if len(dataset_configs) == 0: - logger.warning('No valid datasets to evaluate.') - return - - logger.info(f"Starting API Pipeline for model: {model_name}") - logger.info(f"Total datasets: {len(dataset_configs)}") - - pipeline = APIEvalPipeline( - dataset_configs=dataset_configs, - concurrency=args.api_nproc, - monitor_interval=args.monitor_interval, - run_infer=args.mode in {'infer', 'all'}, - run_eval=args.mode in {'eval', 'all'}, - debug=args.debug - ) - - try: - asyncio.run(pipeline.run()) - except KeyboardInterrupt: - logger.warning("Pipeline interrupted by user.") - except Exception as e: - logger.exception(f"Pipeline failed with error: {e}") - - # Create symbolic links. - try: - files = list(pred_root.iterdir()) - for ds_name in args.data: - files_to_link = [f for f in files if f.is_file() and f'{model_name}_{ds_name}' in f.name] - for f in files_to_link: - file_addr = pred_root.absolute() / f.name - link_addr = work_dir.absolute() / f.name - if link_addr.exists() or link_addr.is_symlink(): - link_addr.unlink() - link_addr.symlink_to(file_addr.relative_to(link_addr.parent)) - except Exception as e: - logger.warning(f"Failed to create symbolic links: {e}") - - -if __name__ == '__main__': - load_env() - main() diff --git a/vlmeval/__init__.py b/vlmeval/__init__.py index aefba9a00..64eddfe09 100644 --- a/vlmeval/__init__.py +++ b/vlmeval/__init__.py @@ -1,4 +1,8 @@ import ssl +import warnings + +# Ignore pkg_resources warning due to jieba depends on it. +warnings.filterwarnings("ignore", category=UserWarning, message="pkg_resources is deprecated") # Temporarily bypass SSL certificate verification to download files from oss. 
ssl._create_default_https_context = ssl._create_unverified_context diff --git a/vlmeval/config.py b/vlmeval/config.py index 05af2ff76..290aeac4b 100644 --- a/vlmeval/config.py +++ b/vlmeval/config.py @@ -1066,7 +1066,7 @@ interns1_mini = { "Intern-S1-mini": partial( - vlm.InternS1Chat, model_path="/mnt/shared-storage-user/mllm/lijinsong/models/Intern-S1-mini/" + vlm.InternS1Chat, model_path="internlm/Intern-S1-mini" ), } diff --git a/vlmeval/inference.py b/vlmeval/inference.py index bad267115..c29c002c8 100644 --- a/vlmeval/inference.py +++ b/vlmeval/inference.py @@ -8,10 +8,11 @@ from tqdm import tqdm from vlmeval.config import supported_VLM -from vlmeval.smp import (dump, get_pred_file_format, get_pred_file_path, get_rank_and_world_size, - load) +from vlmeval.smp import (dump, get_logger, get_pred_file_format, get_pred_file_path, + get_rank_and_world_size, load) from vlmeval.utils import track_progress_rich +logger = get_logger(__name__) FAIL_MSG = 'Failed to obtain answer via API.' @@ -26,7 +27,7 @@ def parse_args(): # Only API model is accepted -def infer_data_api(model, work_dir, model_name, dataset, index_set=None, api_nproc=4, ignore_failed=False): +def infer_data_api(model, work_dir, model_name, dataset, index_set=None, api_nproc=4, retry_failed=True): rank, world_size = get_rank_and_world_size() assert rank == 0 and world_size == 1 dataset_name = dataset.dataset_name @@ -40,6 +41,8 @@ def infer_data_api(model, work_dir, model_name, dataset, index_set=None, api_npr model.set_dump_image(dataset.dump_image) lt, indices = len(data), list(data['index']) + # Build str→orig mapping for checkpoint key conversion + index_str_to_orig = {str(i): i for i in indices} structs = [] for i in range(lt): @@ -53,7 +56,7 @@ def infer_data_api(model, work_dir, model_name, dataset, index_set=None, api_npr struct = dataset.build_prompt(item) structs.append(struct) - out_file = f'{work_dir}/{model_name}_{dataset_name}_supp.pkl' + out_file = f'{work_dir}/{model_name}_{dataset_name}_checkpoint.pkl' # To reuse records in MMBench_V11 if dataset_name in ['MMBench', 'MMBench_CN']: @@ -62,8 +65,8 @@ def infer_data_api(model, work_dir, model_name, dataset, index_set=None, api_npr if osp.exists(v11_pred): try: reuse_inds = load('https://opencompass.openxlab.space/utils/mmb_reuse.pkl') - data = load(v11_pred) - ans_map = {x: y for x, y in zip(data['index'], data['prediction']) if x in reuse_inds} + data_v11 = load(v11_pred) + ans_map = {str(x): y for x, y in zip(data_v11['index'], data_v11['prediction']) if x in reuse_inds} dump(ans_map, out_file) except Exception as err: print(type(err), err) @@ -71,26 +74,32 @@ def infer_data_api(model, work_dir, model_name, dataset, index_set=None, api_npr res = {} if osp.exists(out_file): res = load(out_file) - if ignore_failed: + if retry_failed: res = {k: v for k, v in res.items() if FAIL_MSG not in v} + logger.info(f'Reuse {len(res)} inference results from previous run.') - structs = [s for i, s in zip(indices, structs) if i not in res] - indices = [i for i in indices if i not in res] + structs = [s for i, s in zip(indices, structs) if str(i) not in res] + indices = [i for i in indices if str(i) not in res] gen_func = model.generate structs = [dict(message=struct, dataset=dataset_name) for struct in structs] if len(structs): - track_progress_rich(gen_func, structs, nproc=api_nproc, chunksize=api_nproc, save=out_file, keys=indices) + str_indices = [str(i) for i in indices] + track_progress_rich(gen_func, structs, nproc=api_nproc, chunksize=api_nproc, save=out_file, 
keys=str_indices) - res = load(out_file) + # Load the full accumulated results (str keys) + if osp.exists(out_file): + res = load(out_file) + # Convert str keys back to original types for caller compatibility + result = {index_str_to_orig[k]: v for k, v in res.items() if k in index_str_to_orig} if index_set is not None: - res = {k: v for k, v in res.items() if k in index_set} - os.remove(out_file) - return res + result = {k: v for k, v in result.items() if k in index_set} + return result -def infer_data(model, model_name, work_dir, dataset, out_file, verbose=False, api_nproc=4, use_vllm=False): +def infer_data(model, model_name, work_dir, dataset, out_file, verbose=False, api_nproc=4, use_vllm=False, + retry_failed=True): dataset_name = dataset.dataset_name prev_file = f'{work_dir}/{model_name}_{dataset_name}_PREV.pkl' res = load(prev_file) if osp.exists(prev_file) else {} @@ -144,7 +153,8 @@ def infer_data(model, model_name, work_dir, dataset, out_file, verbose=False, ap model_name=model_name, dataset=dataset, index_set=set(indices), - api_nproc=api_nproc) + api_nproc=api_nproc, + retry_failed=retry_failed) for idx in indices: assert idx in supp res.update(supp) @@ -198,7 +208,7 @@ def _is_structured_record(v): # A wrapper for infer_data, do the pre & post processing def infer_data_job( - model, work_dir, model_name, dataset, verbose=False, api_nproc=4, ignore_failed=False, use_vllm=False + model, work_dir, model_name, dataset, verbose=False, api_nproc=4, retry_failed=True, use_vllm=False ): rank, world_size = get_rank_and_world_size() dataset_name = dataset.dataset_name @@ -209,9 +219,8 @@ def infer_data_job( if osp.exists(result_file): if rank == 0: data = load(result_file) - # breakpoint() results = {k: v for k, v in zip(data['index'], data['prediction'])} - if not ignore_failed: + if retry_failed: results = {k: v for k, v in results.items() if FAIL_MSG not in str(v)} dump(results, prev_file) if world_size > 1: @@ -222,7 +231,8 @@ def infer_data_job( model = infer_data( model=model, work_dir=work_dir, model_name=model_name, dataset=dataset, - out_file=out_file, verbose=verbose, api_nproc=api_nproc, use_vllm=use_vllm) + out_file=out_file, verbose=verbose, api_nproc=api_nproc, use_vllm=use_vllm, + retry_failed=retry_failed) if world_size > 1: dist.barrier() @@ -275,6 +285,13 @@ def split_thinking(s): dump(data, result_file) for i in range(world_size): os.remove(tmpl.format(i)) + # Clean up API checkpoint file + checkpoint_file = f'{work_dir}/{model_name}_{dataset_name}_checkpoint.pkl' + if osp.exists(checkpoint_file): + os.remove(checkpoint_file) + # Clean up PREV file + if osp.exists(prev_file): + os.remove(prev_file) if world_size > 1: dist.barrier() return model diff --git a/vlmeval/inference_api.py b/vlmeval/inference_api.py index 4fb0b63ee..e5e2ac62e 100644 --- a/vlmeval/inference_api.py +++ b/vlmeval/inference_api.py @@ -150,6 +150,7 @@ def __init__( run_infer: bool = True, run_eval: bool = True, debug: bool = False, + retry_failed: bool = True, ): """ Args: @@ -159,6 +160,7 @@ def __init__( run_infer: Whether to inference. run_eval: Whether to eval. debug: Debug mode (Evaluate in the main process). + retry_failed: Whether to retry previously failed samples. 
""" self.dataset_configs = dataset_configs self.concurrency = concurrency @@ -166,6 +168,7 @@ def __init__( self.run_infer = run_infer self.run_eval = run_eval self.debug = debug + self.retry_failed = retry_failed self.all_infer_done = False self.infer_executor = ThreadPoolExecutor(max_workers=concurrency) @@ -206,22 +209,43 @@ def _release_dataset_memory(self, cfg: DatasetConfig): logger.warning(f" [{dataset_name}] Failed to release dataset memory: {e}") def _shutdown_executors(self): - """Shutdown all Executor.""" + """Shutdown all executors and terminate child processes.""" try: - self.infer_executor.shutdown(wait=False) + self.infer_executor.shutdown(wait=False, cancel_futures=True) logger.debug("Shutdown infer_executor") - self.eval_executor.shutdown(wait=False) - logger.debug("Shutdown eval_executor") + for name, executor in [ + ("eval_executor", self.eval_executor), + ("producer_executor", self.producer_executor), + ]: + # 必须在 shutdown() 前获取进程引用。shutdown() 会唤醒管理线程 + # 执行清理,可能将 _processes 置为 None,之后就无法访问了。 + processes = getattr(executor, '_processes', None) or {} + alive = [p for p in processes.values() if p.is_alive()] + executor.shutdown(wait=False, cancel_futures=True) + self._terminate_workers(name, alive) - self.producer_executor.shutdown(wait=False) - logger.debug("Shutdown producer_executor") - - logger.info("🧹 All executors shutdown") + logger.info("All executors shutdown") except Exception as e: logger.warning(f"Failed to shutdown executors: {e}") + @staticmethod + def _terminate_workers(name, alive, timeout=5): + """Terminate worker processes. + + Sends SIGTERM first, waits up to *timeout* seconds, then SIGKILL + for any process that is still alive. + """ + for p in alive: + logger.debug(f"Terminating {name} worker (pid={p.pid})") + p.terminate() + for p in alive: + p.join(timeout=timeout) + if p.is_alive(): + logger.debug(f"Force killing {name} worker (pid={p.pid})") + p.kill() + def _get_checkpoint_file(self, dataset_name: str) -> Path: cfg = self.states[dataset_name] return Path(cfg.work_dir) / f"{cfg.model_name}_{dataset_name}_checkpoint.pkl" @@ -236,6 +260,8 @@ def _load_checkpoint(self, dataset_name: str) -> Dict[str, Any]: if checkpoint_file.exists(): try: results = load(str(checkpoint_file)) + if self.retry_failed: + results = {k: v for k, v in results.items() if FAIL_MSG not in str(v)} logger.info(f" [{dataset_name}] Loaded {len(results)} results from checkpoint") except Exception as e: logger.warning(f" [{dataset_name}] Failed to load checkpoint: {e}") @@ -246,11 +272,17 @@ def _load_checkpoint(self, dataset_name: str) -> Dict[str, Any]: try: data = load(str(result_path)) if isinstance(data, pd.DataFrame): - existing_results = { - str(idx): pred - for idx, pred in zip(data['index'], data['prediction']) - if FAIL_MSG not in str(pred) - } + if self.retry_failed: + existing_results = { + str(idx): pred + for idx, pred in zip(data['index'], data['prediction']) + if FAIL_MSG not in str(pred) + } + else: + existing_results = { + str(idx): pred + for idx, pred in zip(data['index'], data['prediction']) + } results.update(existing_results) logger.info(f" [{dataset_name}] Loaded {len(existing_results)} " "results from result file") @@ -309,8 +341,37 @@ def _save_final_result(self, dataset_name: str) -> bool: return True + def _create_symlinks(self, dataset_name: str): + """Create symbolic links for dataset results in the model base directory. + + Links are created as relative paths so that moving the output root + directory does not break them. 
+ """ + cfg = self.states[dataset_name] + pred_root = Path(cfg.work_dir) + model_base_dir = pred_root.parent + + try: + if not pred_root.exists(): + return + for f in pred_root.iterdir(): + if not f.is_file(): + continue + if f'{cfg.model_name}_{dataset_name}' not in f.name: + continue + # Skip temporary intermediate files + if f.name.endswith(('_checkpoint.pkl', '_PREV.pkl', '_structs.pkl')): + continue + link_addr = model_base_dir / f.name + rel_target = f.relative_to(model_base_dir) + if link_addr.exists() or link_addr.is_symlink(): + link_addr.unlink() + link_addr.symlink_to(rel_target) + except Exception as e: + logger.warning(f" [{dataset_name}] Failed to create symlinks: {e}") + async def _producer(self): - """Genearte all sampels to inference.""" + """Generate all samples to inference.""" logger.info("📦 Initializing tasks and checking checkpoints...") for cfg in self.dataset_configs: @@ -339,6 +400,7 @@ async def _producer(self): # Save result file if not exists. if not Path(cfg.result_file).exists(): self._save_final_result(dataset_name) + self._create_symlinks(dataset_name) # Trigger evaluation. asyncio.create_task(self._trigger_eval(dataset_name)) continue @@ -566,9 +628,11 @@ def inference_call(): f"[{task.dataset_name}] Sample {task.sample_index}: " f"{output_preview} (took {inference_time:.2f}s)") - # Trigger evaluation if all tasks of the dataset is done. - if cfg.processed == cfg.total_samples and cfg.eval_status == EvalStatus.Pending: - if self._save_final_result(task.dataset_name): + # Save final result and create symlinks when all samples are done. + if cfg.processed == cfg.total_samples: + self._save_final_result(task.dataset_name) + self._create_symlinks(task.dataset_name) + if cfg.eval_status == EvalStatus.Pending: asyncio.create_task(self._trigger_eval(task.dataset_name)) except Exception as e: @@ -677,6 +741,8 @@ def _handle_eval_result(self, f" Check log file: {eval_log_path}" ) + # Update symlinks to capture evaluation output files. + self._create_symlinks(dataset_name) # Release dataset data after evaluation. 
self._release_dataset_memory(cfg) diff --git a/vlmeval/inference_mt.py b/vlmeval/inference_mt.py index dc564a3eb..0cefeb764 100644 --- a/vlmeval/inference_mt.py +++ b/vlmeval/inference_mt.py @@ -43,7 +43,7 @@ def chat_mt(model, messages, dataset_name): # Only API model is accepted -def infer_data_api(model, work_dir, model_name, dataset, index_set=None, api_nproc=4, ignore_failed=False): +def infer_data_api(model, work_dir, model_name, dataset, index_set=None, api_nproc=4, retry_failed=True): rank, world_size = get_rank_and_world_size() assert rank == 0 and world_size == 1 dataset_name = dataset.dataset_name @@ -56,33 +56,41 @@ def infer_data_api(model, work_dir, model_name, dataset, index_set=None, api_npr assert hasattr(model, 'chat_inner') lt, indices = len(data), list(data['index']) + # Build str→orig mapping for checkpoint key conversion + index_str_to_orig = {str(i): i for i in indices} structs = [dataset.build_prompt(data.iloc[i]) for i in range(lt)] - out_file = f'{work_dir}/{model_name}_{dataset_name}_supp.pkl' + out_file = f'{work_dir}/{model_name}_{dataset_name}_checkpoint.pkl' res = {} if osp.exists(out_file): res = load(out_file) - if ignore_failed: + if retry_failed: res = {k: v for k, v in res.items() if FAIL_MSG not in v} - structs = [s for i, s in zip(indices, structs) if i not in res] - indices = [i for i in indices if i not in res] + structs = [s for i, s in zip(indices, structs) if str(i) not in res] + indices = [i for i in indices if str(i) not in res] structs = [dict(model=model, messages=struct, dataset_name=dataset_name) for struct in structs] if len(structs): - track_progress_rich(chat_mt, structs, nproc=api_nproc, chunksize=api_nproc, save=out_file, keys=indices) + str_indices = [str(i) for i in indices] + track_progress_rich(chat_mt, structs, nproc=api_nproc, chunksize=api_nproc, save=out_file, keys=str_indices) - res = load(out_file) + # Load the full accumulated results (str keys) + if osp.exists(out_file): + res = load(out_file) + # Convert str keys back to original types for caller compatibility + result = {index_str_to_orig[k]: v for k, v in res.items() if k in index_str_to_orig} if index_set is not None: - res = {k: v for k, v in res.items() if k in index_set} - os.remove(out_file) - return res + result = {k: v for k, v in result.items() if k in index_set} + return result -def infer_data(model, model_name, work_dir, dataset, out_file, verbose=False, api_nproc=4, use_vllm=False): +def infer_data(model, model_name, work_dir, dataset, out_file, verbose=False, api_nproc=4, use_vllm=False, + retry_failed=True): dataset_name = dataset.dataset_name - res = {} + prev_file = f'{work_dir}/{model_name}_{dataset_name}_PREV.pkl' + res = load(prev_file) if osp.exists(prev_file) else {} if osp.exists(out_file): res.update(load(out_file)) @@ -134,7 +142,8 @@ def infer_data(model, model_name, work_dir, dataset, out_file, verbose=False, ap model_name=model_name, dataset=dataset, index_set=set(indices), - api_nproc=api_nproc) + api_nproc=api_nproc, + retry_failed=retry_failed) for idx in indices: assert idx in supp res.update(supp) @@ -171,18 +180,30 @@ def infer_data(model, model_name, work_dir, dataset, out_file, verbose=False, ap # A wrapper for infer_data, do the pre & post processing def infer_data_job_mt( - model, work_dir, model_name, dataset, verbose=False, api_nproc=4, ignore_failed=False, use_vllm=False + model, work_dir, model_name, dataset, verbose=False, api_nproc=4, retry_failed=True, use_vllm=False ): rank, world_size = get_rank_and_world_size() dataset_name = 
dataset.dataset_name result_file = get_pred_file_path(work_dir, model_name, dataset_name, use_env_format=True) + prev_file = f'{work_dir}/{model_name}_{dataset_name}_PREV.pkl' + if osp.exists(result_file): + if rank == 0: + data = load(result_file) + results = {k: v for k, v in zip(data['index'], data['prediction'])} + if retry_failed: + results = {k: v for k, v in results.items() if FAIL_MSG not in str(v)} + dump(results, prev_file) + if world_size > 1: + dist.barrier() + tmpl = osp.join(work_dir, '{}' + f'{world_size}_{dataset_name}.pkl') out_file = tmpl.format(rank) model = infer_data( model=model, work_dir=work_dir, model_name=model_name, dataset=dataset, - out_file=out_file, verbose=verbose, api_nproc=api_nproc, use_vllm=use_vllm) + out_file=out_file, verbose=verbose, api_nproc=api_nproc, use_vllm=use_vllm, + retry_failed=retry_failed) if world_size > 1: dist.barrier() @@ -202,4 +223,11 @@ def infer_data_job_mt( dump(data, result_file) for i in range(world_size): os.remove(tmpl.format(i)) + # Clean up API checkpoint file + checkpoint_file = f'{work_dir}/{model_name}_{dataset_name}_checkpoint.pkl' + if osp.exists(checkpoint_file): + os.remove(checkpoint_file) + # Clean up PREV file + if osp.exists(prev_file): + os.remove(prev_file) return model diff --git a/vlmeval/inference_video.py b/vlmeval/inference_video.py index ee1ea06f8..54f9090e2 100644 --- a/vlmeval/inference_video.py +++ b/vlmeval/inference_video.py @@ -2,6 +2,7 @@ import os import os.path as osp import warnings +from pathlib import Path import numpy as np import torch @@ -9,7 +10,7 @@ from tqdm import tqdm from vlmeval.config import supported_VLM -from vlmeval.smp import dump, get_rank_and_world_size, load +from vlmeval.smp import dump, get_pred_file_path, get_rank_and_world_size, load from vlmeval.utils import track_progress_rich FAIL_MSG = 'Failed to obtain answer via API.' 
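Note on the checkpoint changes above: the resumable `_checkpoint.pkl` file is keyed by stringified indices, and the inference helpers convert those keys back to the dataset's original index types before returning, while `retry_failed` decides whether previously failed answers are kept or re-queried. The following is a minimal standalone sketch of that round-trip, not the library code itself; the function name `resume_indices` and the demo file name are hypothetical.

import os
import pickle

FAIL_MSG = 'Failed to obtain answer via API.'

def resume_indices(checkpoint_path, indices, retry_failed=True):
    """Return (remaining_indices, finished), restoring a str-keyed checkpoint.

    `finished` maps the original index objects back to their predictions,
    mirroring the str -> orig conversion done in infer_data_api above.
    """
    index_str_to_orig = {str(i): i for i in indices}
    res = {}
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path, 'rb') as fin:
            res = pickle.load(fin)
    if retry_failed:
        # Drop failed answers so they are asked again on this run
        res = {k: v for k, v in res.items() if FAIL_MSG not in str(v)}
    remaining = [i for i in indices if str(i) not in res]
    finished = {index_str_to_orig[k]: v for k, v in res.items() if k in index_str_to_orig}
    return remaining, finished

# With no checkpoint on disk this yields ([0, 1, 2], {}); after a partial run
# only the unanswered (or failed, when retry_failed=True) indices remain.
remaining, finished = resume_indices('demo_checkpoint.pkl', [0, 1, 2])
print(remaining, finished)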
@@ -26,7 +27,7 @@ def parse_args(): # Only API model is accepted -def infer_data_api(model, work_dir, model_name, dataset, samples_dict={}, api_nproc=4): +def infer_data_api(model, work_dir, model_name, dataset, samples_dict={}, api_nproc=4, retry_failed=True): rank, world_size = get_rank_and_world_size() assert rank == 0 and world_size == 1 dataset_name = dataset.dataset_name @@ -34,6 +35,8 @@ def infer_data_api(model, work_dir, model_name, dataset, samples_dict={}, api_np assert getattr(model, 'is_api', False) indices = list(samples_dict.keys()) + # Build str→orig mapping for checkpoint key conversion + index_str_to_orig = {str(i): i for i in indices} if getattr(model, 'backend', None) == 'genai': if dataset.nframe > 0: print( @@ -66,29 +69,40 @@ def infer_data_api(model, work_dir, model_name, dataset, samples_dict={}, api_np ) if dataset.nframe > 0: - out_file = f'{work_dir}/{model_name}_{dataset_name}_{dataset.nframe}frame_{packstr}_supp.pkl' + out_file = f'{work_dir}/{model_name}_{dataset_name}_{dataset.nframe}frame_{packstr}_checkpoint.pkl' else: - out_file = f'{work_dir}/{model_name}_{dataset_name}_{dataset.fps}fps_{packstr}_supp.pkl' + out_file = f'{work_dir}/{model_name}_{dataset_name}_{dataset.fps}fps_{packstr}_checkpoint.pkl' res = load(out_file) if osp.exists(out_file) else {} + if retry_failed: + res = {k: v for k, v in res.items() if FAIL_MSG not in str(v)} - structs = [s for i, s in zip(indices, structs) if i not in res or res[i] == FAIL_MSG] + structs = [s for i, s in zip(indices, structs) if str(i) not in res] structs = [struct for struct in structs if struct is not None] - indices = [i for i in indices if i not in res or res[i] == FAIL_MSG] + indices = [i for i in indices if str(i) not in res] gen_func = model.generate structs = [dict(message=struct, dataset=dataset_name) for struct in structs] if len(structs): - track_progress_rich(gen_func, structs, nproc=api_nproc, chunksize=api_nproc, save=out_file, keys=indices) + str_indices = [str(i) for i in indices] + track_progress_rich(gen_func, structs, nproc=api_nproc, chunksize=api_nproc, save=out_file, keys=str_indices) - res = load(out_file) - return res + # Load the full accumulated results (str keys) + if osp.exists(out_file): + res = load(out_file) + # Convert str keys back to original types for caller compatibility + result = {index_str_to_orig[k]: v for k, v in res.items() if k in index_str_to_orig} + return result -def infer_data(model, model_name, work_dir, dataset, out_file, verbose=False, api_nproc=4, use_vllm=False): - res = load(out_file) if osp.exists(out_file) else {} - rank, world_size = get_rank_and_world_size() +def infer_data(model, model_name, work_dir, dataset, out_file, verbose=False, api_nproc=4, use_vllm=False, + retry_failed=True): dataset_name = dataset.dataset_name + prev_file = f'{work_dir}/{model_name}_{dataset_name}_PREV.pkl' + res = load(prev_file) if osp.exists(prev_file) else {} + if osp.exists(out_file): + res.update(load(out_file)) + rank, world_size = get_rank_and_world_size() sample_indices = list(dataset.videos) if getattr(dataset, 'pack', False) else list(dataset.data['index']) samples = list(dataset.videos) if getattr(dataset, 'pack', False) else list(range(len(dataset.data))) @@ -126,7 +140,8 @@ def infer_data(model, model_name, work_dir, dataset, out_file, verbose=False, ap model_name=model_name, dataset=dataset, samples_dict={k: sample_map[k] for k in sample_indices_subrem}, - api_nproc=api_nproc) + api_nproc=api_nproc, + retry_failed=retry_failed) for k in sample_indices_subrem: 
assert k in supp res.update(supp) @@ -217,19 +232,31 @@ def infer_data_job_video( work_dir, model_name, dataset, - result_file_name, verbose=False, api_nproc=4, - use_vllm=False): + use_vllm=False, + retry_failed=True): dataset_name = dataset.dataset_name rank, world_size = get_rank_and_world_size() - result_file = osp.join(work_dir, result_file_name) - # Dump Predictions to Prev File if result file exists + result_file = get_pred_file_path(work_dir, model_name, dataset_name, use_env_format=True) + + prev_file = f'{work_dir}/{model_name}_{dataset_name}_PREV.pkl' if osp.exists(result_file): - return model + if retry_failed: + if rank == 0: + data = load(result_file) + results = {k: v for k, v in zip(data['index'], data['prediction'])} + results = {k: v for k, v in results.items() if FAIL_MSG not in str(v)} + if len(results) == len(data): + return model + dump(results, prev_file) + if world_size > 1: + dist.barrier() + else: + return model - tmpl = osp.join(work_dir, '{}' + f'{world_size}_{osp.splitext(result_file_name)[0]}.pkl') + tmpl = osp.join(work_dir, '{}' + f'{world_size}_{Path(result_file).stem}.pkl') out_file = tmpl.format(rank) model = infer_data( @@ -240,7 +267,8 @@ def infer_data_job_video( out_file=out_file, verbose=verbose, api_nproc=api_nproc, - use_vllm=use_vllm) + use_vllm=use_vllm, + retry_failed=retry_failed) if world_size > 1: dist.barrier() @@ -264,4 +292,19 @@ def infer_data_job_video( dump(meta, result_file) for i in range(world_size): os.remove(tmpl.format(i)) + # Clean up API checkpoint files + packstr = 'pack' if getattr(dataset, 'pack', False) else 'nopack' + if dataset.nframe > 0: + checkpoint_file = ( + f'{work_dir}/{model_name}_{dataset_name}_{dataset.nframe}frame_{packstr}_checkpoint.pkl' + ) + else: + checkpoint_file = ( + f'{work_dir}/{model_name}_{dataset_name}_{dataset.fps}fps_{packstr}_checkpoint.pkl' + ) + if osp.exists(checkpoint_file): + os.remove(checkpoint_file) + # Clean up PREV file + if osp.exists(prev_file): + os.remove(prev_file) return model diff --git a/vlmeval/smp/file.py b/vlmeval/smp/file.py index 87afb7d05..6f4c9bd69 100644 --- a/vlmeval/smp/file.py +++ b/vlmeval/smp/file.py @@ -1,4 +1,5 @@ import csv +import datetime import hashlib import json import mimetypes @@ -6,16 +7,25 @@ import os import os.path as osp import pickle +import re +import shutil import time -import warnings +from pathlib import Path import numpy as np import pandas as pd import validators +from .log import get_logger from .misc import toliststr from .vlm import decode_base64_to_image_file +NEW_EVAL_ID_PATTERN = re.compile(r'^T\d{8}-\d{6}$') +LEGACY_EVAL_ID_PATTERN = re.compile(r'^T\d{8}_G([0-9a-fA-F]+)$') +RUN_STATUS_NAME = 'status.json' +INFER_FAIL_MSG = 'Failed to obtain answer' +logger = get_logger(__name__) + def decode_img_omni(tup): root, im, p = tup @@ -168,7 +178,7 @@ def dump_tsv(data, f, quoting=csv.QUOTE_ALL): except Exception: # if dump failed, fallback to pkl format pkl_file = f.rsplit('.', 1)[0] + '.pkl' - warnings.warn(f'Failed to dump to {suffix} format, falling back to pkl: {pkl_file}') + logger.warning(f'Failed to dump to {suffix} format, falling back to pkl: {pkl_file}') return dump_pkl(data, pkl_file, **kwargs) @@ -435,27 +445,254 @@ def parquet_to_tsv(file_path): data.to_csv(osp.join(pth, f'{data_name}.tsv'), sep='\t', index=False) +def build_eval_id(): + return 'T' + datetime.datetime.now().strftime('%Y%m%d-%H%M%S') + + +def is_eval_run_id(eval_id): + return bool(NEW_EVAL_ID_PATTERN.match(eval_id) or LEGACY_EVAL_ID_PATTERN.match(eval_id)) + + +def 
get_run_status_path(run_dir): + return osp.join(run_dir, RUN_STATUS_NAME) + + +def load_run_status(run_dir): + status_file = Path(run_dir) / RUN_STATUS_NAME + if not status_file.exists(): + return {} + try: + data = load(str(status_file)) + except Exception as err: + logger.warning(f'Failed to load run status from {status_file}: {err}') + return {} + return data if isinstance(data, dict) else {} + + +def write_run_status(run_dir, status): + run_dir = Path(run_dir) + run_dir.mkdir(exist_ok=True) + status_file = run_dir / RUN_STATUS_NAME + tmp_file = status_file.with_suffix('.tmp') + with open(tmp_file, 'w', encoding='utf-8') as fout: + json.dump(status, fout, indent=2, ensure_ascii=False, cls=NumpyEncoder) + os.replace(tmp_file, status_file) + + +def update_dataset_status(run_dir, dataset_name, **kwargs): + status = load_run_status(run_dir) + datasets = status.setdefault('datasets', {}) + dataset_status = datasets.setdefault(dataset_name, {}) + dataset_status.update(kwargs) + write_run_status(run_dir, status) + return status + + +def _dataset_shadow_names(dataset_name): + from vlmeval.dataset import SUPPORTED_DATASETS + + return [d for d in SUPPORTED_DATASETS if d.startswith(dataset_name) and d != dataset_name] + + +def _filter_shadow_dataset_files(files, model_name, dataset_name): + shadow_names = _dataset_shadow_names(dataset_name) + if not shadow_names: + return files + + filtered = [] + for file_path in files: + base = osp.basename(file_path) + skip = False + for shadow_name in shadow_names: + if f'{model_name}_{shadow_name}' in base or f'_{shadow_name}.' in base or f'_{shadow_name}_' in base: + skip = True + break + if not skip: + filtered.append(file_path) + return filtered + + +def list_eval_run_dirs(root_dir, current_run_dir=None): + if not osp.isdir(root_dir): + return [] + + run_dirs = [] + current_run_dir = osp.abspath(current_run_dir) if current_run_dir is not None else None + for entry in os.listdir(root_dir): + path = osp.join(root_dir, entry) + if not osp.isdir(path) or not is_eval_run_id(entry): + continue + if current_run_dir is not None and osp.abspath(path) == current_run_dir: + continue + run_dirs.append(path) + run_dirs.sort(key=osp.getmtime, reverse=True) + return run_dirs + + +def find_prediction_files(run_dir, model_name, dataset_name): + if not osp.isdir(run_dir): + return [] + files = ls(run_dir, match=f'{model_name}_{dataset_name}.', mode='file') + files = _filter_shadow_dataset_files(files, model_name, dataset_name) + return sorted(files) + + +def _status_commit(run_dir, status=None): + status = status or {} + commit = status.get('commit') + if commit: + return str(commit) + match = LEGACY_EVAL_ID_PATTERN.match(osp.basename(run_dir)) + if match: + return match.group(1) + return None + + +def _prediction_table(pred_file): + try: + data = load(pred_file) + except Exception as err: + logger.warning(f'Failed to load prediction file {pred_file}: {err}') + return None + + if isinstance(data, pd.DataFrame): + frame = data + elif isinstance(data, list): + frame = pd.DataFrame(data) + elif isinstance(data, dict): + if not _should_convert_to_dataframe(data): + return None + if 'columns' in data and 'data' in data: + frame = pd.DataFrame(data['data'], columns=data['columns']) + else: + frame = pd.DataFrame(data) + else: + return None + + if 'index' not in frame or 'prediction' not in frame: + return None + return frame + + +def copy_prediction_file(src_file, dst_file): + os.makedirs(osp.dirname(dst_file), exist_ok=True) + src_suffix = osp.splitext(src_file)[1].lower() + 
dst_suffix = osp.splitext(dst_file)[1].lower() + if src_suffix == dst_suffix: + shutil.copy2(src_file, dst_file) + return True + + frame = _prediction_table(src_file) + if frame is None: + logger.warning( + f'Can not convert prediction file {src_file} to target format {dst_suffix}, reuse skipped.' + ) + return False + dump(frame, dst_file) + if osp.exists(dst_file): + return True + logger.warning( + f'Failed to materialize converted prediction file at {dst_file}, reuse skipped.' + ) + return False + + +def is_prediction_complete(pred_file, dataset_indices, retry_failed=True): + if not osp.exists(pred_file): + return False + frame = _prediction_table(pred_file) + if frame is None: + return False + + finished = { + str(idx): pred + for idx, pred in zip(frame['index'], frame['prediction']) + } + if retry_failed: + finished = {k: v for k, v in finished.items() if INFER_FAIL_MSG not in str(v)} + return all(str(idx) in finished for idx in dataset_indices) + + +def get_infer_aux_file_names(model_name, dataset, result_file_base, world_size=1) -> set: + dataset_name = dataset.dataset_name + infer_aux = {f'{model_name}_{dataset_name}_checkpoint.pkl'} + + if dataset.MODALITY == 'VIDEO': + packstr = 'pack' if getattr(dataset, 'pack', False) else 'nopack' + if getattr(dataset, 'nframe', 0) > 0: + video_prefix = f'{model_name}_{dataset_name}_{dataset.nframe}frame_{packstr}' + else: + video_prefix = f'{model_name}_{dataset_name}_{dataset.fps}fps_{packstr}' + infer_aux.add(f'{video_prefix}_checkpoint.pkl') + infer_aux.add(f'{video_prefix}_structs.pkl') + + result_stem = osp.splitext(result_file_base)[0] + for rank in range(world_size): + infer_aux.add(f'{rank}{world_size}_{result_stem}.pkl') + else: + for rank in range(world_size): + infer_aux.add(f'{rank}{world_size}_{dataset_name}.pkl') + return infer_aux + + +def select_reuse_run( + pred_root_meta, + eval_id, + model_name, + dataset_name, + result_file, + infer_aux_file_names, +): + current_run_dir = osp.join(pred_root_meta, eval_id) + run_dirs = list_eval_run_dirs(pred_root_meta, current_run_dir=current_run_dir) + + candidates = [] + for run_dir in run_dirs: + status = load_run_status(run_dir) + prediction_files = find_prediction_files(run_dir, model_name, dataset_name) + prediction_file = None + if prediction_files: + exact_name = osp.basename(result_file) + exact_match = [x for x in prediction_files if osp.basename(x) == exact_name] + prediction_file = exact_match[0] if exact_match else prediction_files[0] + if len(prediction_files) > 1: + logger.warning( + f'Multiple prediction candidates in {run_dir}: ' + f'{prediction_files}. 
Will use {prediction_file}' + ) + + infer_aux_files = [ + osp.join(run_dir, file_name) + for file_name in infer_aux_file_names + if osp.exists(osp.join(run_dir, file_name)) + ] + if prediction_file is None and not infer_aux_files: + continue + + item = dict( + run_dir=run_dir, + status=status, + prediction_file=prediction_file, + infer_aux_files=infer_aux_files, + ) + candidates.append(item) + + return candidates[0] if len(candidates) else None + + def fetch_aux_files(eval_file): file_root = osp.dirname(eval_file) file_name = osp.basename(eval_file) eval_id = osp.basename(file_root) - if eval_id[:3] == 'T20' and eval_id[9:11] == '_G': + if is_eval_run_id(eval_id): model_name = osp.basename(osp.dirname(file_root)) else: model_name = eval_id dataset_name = osp.splitext(file_name)[0][len(model_name) + 1:] - from vlmeval.dataset import SUPPORTED_DATASETS - to_handle = [] - for d in SUPPORTED_DATASETS: - if d.startswith(dataset_name) and d != dataset_name: - to_handle.append(d) - fs = ls(file_root, match=f'{model_name}_{dataset_name}') - if len(to_handle): - for d in to_handle: - fs = [x for x in fs if d not in x] - return fs + fs = ls(file_root, match=f'{model_name}_{dataset_name}', mode='file') + return _filter_shadow_dataset_files(fs, model_name, dataset_name) def get_file_extension(file_path): @@ -484,57 +721,83 @@ def ends_with_list(s, lst): return eval_file.replace(f'.{original_ext}', f'{suffix}.{target_format}') -def prepare_reuse_files(pred_root_meta, eval_id, model_name, dataset_name, reuse, reuse_aux): - import shutil - - from .misc import timestr +def prepare_reuse_files( + pred_root_meta, + eval_id, + model_name, + dataset, + result_file, + reuse, + reuse_aux, + retry_failed=True, + judge_signature=None, + world_size=1, +): work_dir = osp.join(pred_root_meta, eval_id) os.makedirs(work_dir, exist_ok=True) + dataset_name = dataset.dataset_name + context = dict( + source_eval_id=None, + prediction_complete=False, + ) if not reuse: - files = ls(work_dir, match=f'{model_name}_{dataset_name}') - if len(files): - t_str = timestr('second') - bak_dir = osp.join(work_dir, f'bak_{t_str}_{dataset_name}') - os.makedirs(bak_dir, exist_ok=True) - for f in files: - shutil.move(f, bak_dir) - warnings.warn( - f'--reuse flag not set but history records detected in {work_dir}. ' - f'Those files are moved to {bak_dir} for backup. ' - ) - return - # reuse flag is set - prev_pred_roots = ls(pred_root_meta, mode='dir') - prev_pred_roots.sort() - prev_pred_roots.remove(work_dir) - - files = ls(work_dir, match=f'{model_name}_{dataset_name}.') - prev_file = None - prev_aux_files = None - if len(files): - pass - else: - for root in prev_pred_roots[::-1]: - fs = ls(root, match=f'{model_name}_{dataset_name}.') - if len(fs): - if len(fs) > 1: - warnings.warn(f'Multiple candidates in {root}: {fs}. Will use {fs[0]}') - prev_file = fs[0] - prev_aux_files = fetch_aux_files(prev_file) - break - if prev_file is not None: - warnings.warn(f'--reuse is set, will reuse prediction file {prev_file}') - os.system(f'cp {prev_file} {work_dir}') - - if not reuse_aux: - warnings.warn(f'--reuse-aux is not set, all auxiliary files in {work_dir} are removed. 
') - os.system(f'rm -rf {osp.join(work_dir, f"{model_name}_{dataset_name}_*openai*")}') - os.system(f'rm -rf {osp.join(work_dir, f"{model_name}_{dataset_name}_*csv")}') - os.system(f'rm -rf {osp.join(work_dir, f"{model_name}_{dataset_name}_*json")}') - os.system(f'rm -rf {osp.join(work_dir, f"{model_name}_{dataset_name}_*pkl")}') - os.system(f'rm -rf {osp.join(work_dir, f"{model_name}_{dataset_name}_*gpt*")}') - elif prev_aux_files is not None: - for f in prev_aux_files: - os.system(f'cp {f} {work_dir}') - warnings.warn(f'--reuse-aux is set, will reuse auxiliary file {f}') - return + return context + + infer_aux_file_names = get_infer_aux_file_names( + model_name=model_name, + dataset=dataset, + result_file_base=osp.basename(result_file), + world_size=world_size, + ) + source = select_reuse_run( + pred_root_meta=pred_root_meta, + eval_id=eval_id, + model_name=model_name, + dataset_name=dataset_name, + result_file=result_file, + infer_aux_file_names=infer_aux_file_names, + ) + if source is None: + return context + + context['source_eval_id'] = osp.basename(source['run_dir']) + + src_prediction_file = source['prediction_file'] + if src_prediction_file is not None and copy_prediction_file(src_prediction_file, result_file): + logger.info(f'--reuse is set, will reuse prediction file {src_prediction_file}') + + if reuse_aux in ['infer', 'all']: + for src_file in source['infer_aux_files']: + dst_file = osp.join(work_dir, osp.basename(src_file)) + shutil.copy2(src_file, dst_file) + logger.info(f'--reuse-aux={reuse_aux}, will reuse inference auxiliary file {src_file}') + + context['prediction_complete'] = is_prediction_complete( + result_file, + dataset_indices=list(dataset.data['index']), + retry_failed=retry_failed, + ) + + source_dataset_status = source['status'].get('datasets', {}).get(dataset_name, {}) + source_judge_signature = source_dataset_status.get('judge_signature') + if ( + reuse_aux == 'all' + and judge_signature is not None + and context['prediction_complete'] + and src_prediction_file is not None + and source_judge_signature == judge_signature + ): + infer_model_aux = { + file_name + for file_name in infer_aux_file_names + if file_name.startswith(f'{model_name}_{dataset_name}_') + } + all_aux_files = fetch_aux_files(src_prediction_file) + for src_file in all_aux_files: + base = osp.basename(src_file) + if base == osp.basename(src_prediction_file) or base in infer_model_aux: + continue + dst_file = osp.join(work_dir, base) + shutil.copy2(src_file, dst_file) + logger.info(f'--reuse-aux=all, will reuse evaluation auxiliary file {src_file}') + return context
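The new run-status helpers in vlmeval/smp/file.py (build_eval_id, write_run_status, update_dataset_status) are imported by run.py, but their call sites are not shown in this patch. The sketch below is a hedged usage illustration only; the output directory layout and the status fields ('stage', 'prediction_complete') are assumptions for demonstration, not fields mandated by the patch.

import os
import os.path as osp
from vlmeval.smp import build_eval_id, write_run_status, update_dataset_status

eval_id = build_eval_id()                 # e.g. 'T20240131-120000'
run_dir = osp.join('outputs', eval_id)    # hypothetical work directory
os.makedirs(run_dir, exist_ok=True)

write_run_status(run_dir, {'eval_id': eval_id, 'datasets': {}})
update_dataset_status(run_dir, 'MMBench_DEV_EN',
                      stage='inference', prediction_complete=False)
# write_run_status() first dumps to status.tmp and then os.replace()s it over
# status.json, so an interrupted write never leaves a truncated status file.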
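For reference, the completeness rule that gates reuse (is_prediction_complete, and the prediction_complete flag returned by prepare_reuse_files) reduces to: every dataset index must have a prediction, and, when retry_failed is set, none of those predictions may contain the failure marker. The snippet below is a simplified standalone rendition that operates on an in-memory DataFrame instead of a prediction file; `predictions_complete` is an illustrative name, not part of the library.

import pandas as pd

INFER_FAIL_MSG = 'Failed to obtain answer'

def predictions_complete(frame, dataset_indices, retry_failed=True):
    finished = {str(i): p for i, p in zip(frame['index'], frame['prediction'])}
    if retry_failed:
        # A failed answer does not count as finished when retries are enabled
        finished = {k: v for k, v in finished.items() if INFER_FAIL_MSG not in str(v)}
    return all(str(i) in finished for i in dataset_indices)

preds = pd.DataFrame({'index': [0, 1],
                      'prediction': ['A', 'Failed to obtain answer via API.']})
print(predictions_complete(preds, [0, 1]))                      # False: index 1 failed
print(predictions_complete(preds, [0, 1], retry_failed=False))  # True: failures tolerated

Keeping the check index-based (rather than comparing row counts) is what lets a reused prediction file from an older run be accepted even if its rows are ordered differently or stored in another format.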