Skip to content

CLI

cli

__version__ = '0.1.0' module-attribute

logger = logging.getLogger(__name__) module-attribute

LOG_MAPPING = {'DEBUG': logging.DEBUG, 'INFO': logging.INFO, 'WARNING': logging.WARNING, 'ERROR': logging.ERROR, 'CRITICAL': logging.CRITICAL} module-attribute

Pipeline(config)

Main pipeline class for optiMHC, encapsulating the full data processing workflow.

This class orchestrates input parsing, feature generation, rescoring, result saving, and visualization. It supports both single-run and experiment modes (multiple feature/model combinations).

Parameters:

Name Type Description Default
config str, dict, or Config

Path to YAML config, dict, or Config object.

required

Examples:

>>> from optimhc.core import Pipeline
>>> pipeline = Pipeline(config)
>>> pipeline.run()

Initialize the pipeline with a configuration file, dict, or Config object.

Parameters:

Name Type Description Default
config str, dict, or Config

Path to YAML config, dict, or Config object.

required
Source code in optimhc/core/pipeline.py
def __init__(self, config):
    """
    Build a pipeline from a configuration source.

    Parameters
    ----------
    config : str, dict, or Config
        Path to YAML config, dict, or Config object. Anything that is not
        already a ``Config`` instance is handed to the ``Config`` constructor.
    """
    logger.debug(f"config: {config}")
    self.config = config if isinstance(config, Config) else Config(config)
    self.config.validate()

    # Every artefact of this run is written below <outputDir>/<experimentName>.
    self.experiment = self.config.get("experimentName", "optimhc_experiment")
    self.output_dir = os.path.join(self.config["outputDir"], self.experiment)
    os.makedirs(self.output_dir, exist_ok=True)

    # Top-level switches; each one defaults to enabled.
    self.visualization_enabled = self.config.get("visualization", True)
    self.save_models = self.config.get("saveModels", True)
    self.to_flashlfq = self.config.get("toFlashLFQ", True)

    # Rescoring options live in the nested "rescore" section.
    rescore_section = self.config.get("rescore", {})
    self.test_fdr = rescore_section.get("testFDR", 0.01)
    self.train_fdr = rescore_section.get("trainFDR", 0.01)
    self.model_type = rescore_section.get("model", "Percolator")
    self.n_jobs = rescore_section.get("numJobs", 1)

read_input()

Read input PSMs based on configuration.

Returns:

Type Description
PsmContainer

Object containing loaded PSMs.

Raises:

Type Description
ValueError

If input type is unsupported.

Exception

If file reading fails.

Source code in optimhc/core/pipeline.py
def read_input(self):
    """
    Read input PSMs based on configuration.

    The reader is selected by the ``inputType`` configuration value:
    ``"pepxml"`` uses ``read_pepxml`` (with the configured ``decoyPrefix``)
    and ``"pin"`` uses ``read_pin`` (with the configured
    ``retentionTimeColumn``).

    Returns
    -------
    PsmContainer
        Object containing loaded PSMs.

    Raises
    ------
    ValueError
        If input type is unsupported.
    Exception
        If file reading fails. The error is logged and re-raised unchanged.
    """
    input_type = self.config["inputType"]
    input_files = self.config["inputFile"]
    # Config.validate() accepts both lists and tuples for inputFile and leaves
    # tuples as they are, so a tuple must be flattened into a list of paths
    # rather than wrapped as a single element.
    if isinstance(input_files, (list, tuple)):
        input_files = list(input_files)
    else:
        input_files = [input_files]

    try:
        if input_type == "pepxml":
            psms = read_pepxml(input_files, decoy_prefix=self.config["decoyPrefix"])
        elif input_type == "pin":
            psms = read_pin(
                input_files,
                retention_time_column=self.config.get("retentionTimeColumn"),
            )
        else:
            raise ValueError(f"Unsupported input type: {input_type}")
        return psms
    except Exception as e:
        # Log for the user, then let the original exception propagate.
        logger.error(f"Failed to read input files: {e}")
        raise

rescore(psms, model_type=None, n_jobs=None, test_fdr=None, rescoring_features=None)

Perform rescoring on the PSMs using the specified or configured model.

Parameters:

Name Type Description Default
psms PsmContainer

PSM container object.

required
model_type str

Model type ('XGBoost', 'RandomForest', 'Percolator').

None
n_jobs int

Number of parallel jobs.

None
test_fdr float

FDR threshold.

None
rescoring_features list

List of features to use for rescoring.

None

Returns:

Name Type Description
results Results

Rescoring results.

models list

Trained models.

Notes

Rescoring logic is adapted from mokapot (https://mokapot.readthedocs.io/)

Source code in optimhc/core/pipeline.py
def rescore(self, psms, model_type=None, n_jobs=None, test_fdr=None, rescoring_features=None):
    """
    Rescore the PSMs with the requested (or configured) model.

    Any of ``model_type``, ``n_jobs`` and ``test_fdr`` left as ``None`` falls
    back to the value stored on the pipeline instance.

    Parameters
    ----------
    psms : PsmContainer
        PSM container object.
    model_type : str, optional
        Model type ('XGBoost', 'RandomForest', 'Percolator').
    n_jobs : int, optional
        Number of parallel jobs.
    test_fdr : float, optional
        FDR threshold.
    rescoring_features : list, optional
        List of features to use for rescoring. Only forwarded to the
        rescoring call when it is not ``None``.

    Returns
    -------
    results : mokapot.Results
        Rescoring results.
    models : list
        Trained models.

    Notes
    -----
    Rescoring logic is adapted from mokapot (https://mokapot.readthedocs.io/)
    """
    if test_fdr is None:
        test_fdr = self.test_fdr
    if model_type is None:
        model_type = self.model_type
    if n_jobs is None:
        n_jobs = self.n_jobs

    # The training FDR is not a parameter; use the instance value, or 0.01
    # when the attribute has not been set.
    train_fdr = getattr(self, "train_fdr", 0.01)
    estimator_cls = rescore_model_factory.get_model(model_type)
    estimator_config = self._build_model_config(train_fdr=train_fdr, n_jobs=n_jobs)
    estimator = estimator_cls.from_config(estimator_config)

    extra_args = {} if rescoring_features is None else {"rescoring_features": rescoring_features}

    results, models = mokapot.rescore(psms, model=estimator, test_fdr=test_fdr, **extra_args)
    return results, models

save_results(psms, results, models, output_dir=None, file_root='optimhc')

Save rescoring results, PSM data, and trained models to disk.

Parameters:

Name Type Description Default
psms PsmContainer

PSM container object.

required
results Results

Rescoring results.

required
models list

Trained models.

required
output_dir str

Output directory.

None
file_root str

Root name for output files.

'optimhc'
Source code in optimhc/core/pipeline.py
def save_results(self, psms, results, models, output_dir=None, file_root="optimhc"):
    """
    Write rescoring results, the PSM table, and trained models to disk.

    The results are always written via ``results.to_txt`` and the PSMs as
    ``<file_root>.pin``. Models are stored in a ``models`` sub-directory only
    when ``self.save_models`` is set, and a FlashLFQ file is produced only
    when ``self.to_flashlfq`` is set.

    Parameters
    ----------
    psms : PsmContainer
        PSM container object.
    results : mokapot.Results
        Rescoring results.
    models : list
        Trained models.
    output_dir : str, optional
        Output directory. Defaults to the pipeline's own output directory.
    file_root : str, optional
        Root name for output files.
    """
    if output_dir is None:
        output_dir = self.output_dir

    results.to_txt(dest_dir=output_dir, file_root=file_root, decoys=True)
    pin_path = os.path.join(output_dir, f"{file_root}.pin")
    psms.write_pin(pin_path)

    if self.save_models:
        model_dir = os.path.join(output_dir, "models")
        os.makedirs(model_dir, exist_ok=True)
        logger.info(f"Saving models to {model_dir}")
        # One file per trained model, numbered in list order.
        for index, fitted_model in enumerate(models):
            model_path = os.path.join(model_dir, f"{file_root}.model{index}")
            fitted_model.save(model_path)

    if self.to_flashlfq:
        flashlfq_name = f"{file_root}.FlashLFQ.txt"
        mokapot.to_flashLFQ(results, output_dir, file_name=flashlfq_name)

visualize_results(psms, results, models, output_dir=None, sources=None)

Generate and save visualizations for the analysis results.

Parameters:

Name Type Description Default
psms PsmContainer

PSM container object.

required
results Results

Rescoring results.

required
models list

Trained models.

required
output_dir str

Output directory.

None
sources list

Feature sources to include in visualizations.

None
Source code in optimhc/core/pipeline.py
def visualize_results(self, psms, results, models, output_dir=None, sources=None):
    """
    Produce the standard set of figures for a finished analysis.

    Four PNG files are written to a ``figures`` sub-directory: q-values,
    feature importance, feature correlation, and target/decoy histograms.
    Nothing is written when visualization is disabled on the pipeline.

    Parameters
    ----------
    psms : PsmContainer
        PSM container object.
    results : mokapot.Results
        Rescoring results.
    models : list
        Trained models.
    output_dir : str, optional
        Output directory. Defaults to the pipeline's own output directory.
    sources : list, optional
        Feature sources to include in visualizations. When empty or ``None``
        all entries of ``psms.rescoring_features`` are used.
    """
    if not self.visualization_enabled:
        logger.info("Visualization is disabled. Skipping...")
        return

    if output_dir is None:
        output_dir = self.output_dir
    fig_dir = os.path.join(output_dir, "figures")
    os.makedirs(fig_dir, exist_ok=True)

    def _figure_path(file_name):
        # All figures share the same target directory.
        return os.path.join(fig_dir, file_name)

    plot_qvalues(results, save_path=_figure_path("qvalues.png"), threshold=0.05)

    # Restrict the feature-importance plot to the requested sources, if any.
    selected_features = (
        {
            source: features
            for source, features in psms.rescoring_features.items()
            if source in sources
        }
        if sources
        else psms.rescoring_features
    )

    plot_feature_importance(
        models,
        selected_features,
        save_path=_figure_path("feature_importance.png"),
    )
    visualize_feature_correlation(psms, save_path=_figure_path("feature_correlation.png"))
    visualize_target_decoy_features(
        psms,
        num_cols=4,
        save_path=_figure_path("target_decoy_histogram.png"),
    )

run()

Run the complete optiMHC pipeline (single run mode).

This method executes the full workflow: input parsing, feature generation, rescoring, saving, and visualization.

Returns:

Name Type Description
psms PsmContainer

PSM container object.

results Results

Rescoring results.

models list

Trained models.

Source code in optimhc/core/pipeline.py
def run(self):
    """
    Execute the whole optiMHC workflow once (single run mode).

    The steps run in order: read the input, generate features, rescore,
    save the results, and create the visualizations.

    Returns
    -------
    psms : PsmContainer
        PSM container object.
    results : mokapot.Results
        Rescoring results.
    models : list
        Trained models.
    """
    logger.info("Starting analysis pipeline")

    # Load the PSMs and attach the generated features.
    raw_psms = self.read_input()
    psms = self._generate_features(raw_psms)

    # Rescore, then persist and plot the outcome.
    results, models = self.rescore(psms)
    self.save_results(psms, results, models)
    self.visualize_results(psms, results, models)

    logger.info(f"Analysis pipeline completed, results saved to {self.output_dir}")
    return psms, results, models

run_experiments()

Run experiments with different feature/model combinations using multiprocessing.

Each experiment is executed in its own process for complete resource isolation. The experiment configurations must be provided in the config under the 'experiments' key.

Returns:

Type Description
None
Source code in optimhc/core/pipeline.py
def run_experiments(self):
    """
    Run experiments with different feature/model combinations using multiprocessing.

    The input is read and features are generated once. The resulting PSMs are
    written to ``optimhc.<experiment>.pin`` and a feature-correlation figure is
    saved. Then one process is started per entry of the ``experiments``
    configuration list, each running ``_run_single_experiment`` in its own
    output sub-directory. All processes are started before any is joined.

    After joining, every process's exit code is checked: experiments that
    ended with a non-zero exit code are logged as errors instead of being
    silently reported as completed. No exception is raised for failed
    experiments, so the remaining ones still finish.

    Returns
    -------
    None
    """
    logger.info("Starting experiment mode with multiple feature combinations")

    psms = self.read_input()
    psms = self._generate_features(psms)
    pin_path = os.path.join(self.output_dir, f"optimhc.{self.experiment}.pin")
    psms.write_pin(pin_path)
    fig_summary_dir = os.path.join(self.output_dir, "figures")
    os.makedirs(fig_summary_dir, exist_ok=True)
    visualize_feature_correlation(
        psms,
        save_path=os.path.join(fig_summary_dir, "feature_correlation.png"),
    )
    # NOTE(review): the target/decoy histogram was already disabled in the
    # original code; confirm whether it should be restored or removed.
    # visualize_target_decoy_features(
    #     psms,
    #     num_cols=4,
    #     save_path=os.path.join(fig_summary_dir, 'target_decoy_histogram.png'),
    # )

    # An explicit `experiments: null` yields None; treat it like an empty list.
    experiment_configs = self.config.get("experiments", []) or []
    processes = []
    for i, exp_config in enumerate(experiment_configs):
        exp_name = exp_config.get("name", f"Experiment_{i + 1}")
        exp_dir = os.path.join(self.output_dir, exp_name)

        logger.info(f"Starting experiment '{exp_name}' in a separate process")
        p = Process(
            target=self._run_single_experiment,
            args=(psms, exp_config, exp_name, exp_dir),
        )
        p.start()
        # Keep the name next to the process so failures can be attributed.
        processes.append((exp_name, p))

    # Wait for all experiment processes to finish and collect failures.
    failed = []
    for exp_name, p in processes:
        p.join()
        if p.exitcode != 0:
            logger.error(f"Experiment '{exp_name}' exited with code {p.exitcode}")
            failed.append(str(exp_name))

    if failed:
        logger.error(
            f"{len(failed)} of {len(processes)} experiments failed: {', '.join(failed)}"
        )
    else:
        logger.info("All experiments completed")

Config(config_source=None)

Configuration manager for optiMHC pipeline.

This class handles loading, validating, and providing access to configuration parameters from YAML files or dictionaries. It implements a fail-fast validation strategy to ensure configuration correctness before pipeline execution.

Parameters:

Name Type Description Default
config_source str or dict or None

Path to YAML file, dictionary with configuration, or None for default config. If None, uses DEFAULT_CONFIG.

None

Attributes:

Name Type Description
_config dict

The internal configuration dictionary.

Raises:

Type Description
ValueError

If configuration is invalid or required parameters are missing.

FileNotFoundError

If specified YAML file does not exist.

YAMLError

If YAML file is malformed.

Examples:

>>> # Load from YAML file
>>> config = Config("path/to/config.yaml")
>>> # Load from dictionary
>>> config_dict = {
...     "inputType": "pepxml",
...     "inputFile": ["data.pep.xml"],
...     "outputDir": "./results",
...     "rescore": {"testFDR": 0.01, "model": "Percolator"}
... }
>>> config = Config(config_dict)
>>> # Use default configuration
>>> config = Config()
>>> # Access configuration values
>>> input_type = config["inputType"]
>>> output_dir = config.get("outputDir", "./default")
>>> # Save configuration to file
>>> config.save("output_config.yaml")

Initialize Config from a YAML file path or a dictionary.

Parameters:

Name Type Description Default
config_source str or dict or None

Path to YAML file, dictionary with configuration, or None for default config. If None, uses DEFAULT_CONFIG.

None

Raises:

Type Description
ValueError

If config_source is neither a string, dict, nor None.

FileNotFoundError

If specified YAML file does not exist.

YAMLError

If YAML file is malformed.

Source code in optimhc/core/config.py
def __init__(self, config_source=None):
    """
    Initialize Config from a YAML file path or a dictionary.

    User-supplied values are merged over ``DEFAULT_CONFIG`` via
    ``_deep_merge``. An empty YAML file is treated as an empty mapping, so
    the result is the same as passing no configuration values at all.

    Parameters
    ----------
    config_source : str or dict or None, optional
        Path to YAML file, dictionary with configuration, or None for default config.
        If None, uses DEFAULT_CONFIG.

    Raises
    ------
    ValueError
        If config_source is neither a string, dict, nor None, or if the
        YAML file's top-level content is not a mapping.
    FileNotFoundError
        If specified YAML file does not exist.
    yaml.YAMLError
        If YAML file is malformed.
    """
    if config_source is None:
        self._config = deepcopy(DEFAULT_CONFIG)
    elif isinstance(config_source, str):
        # Read as UTF-8 explicitly instead of relying on the platform's
        # locale encoding.
        with open(config_source, "r", encoding="utf-8") as f:
            user_config = yaml.safe_load(f)
        if user_config is None:
            # safe_load returns None for an empty document.
            user_config = {}
        elif not isinstance(user_config, dict):
            raise ValueError(
                f"Configuration file '{config_source}' must contain a mapping "
                f"at the top level, got {type(user_config).__name__}."
            )
        self._config = _deep_merge(DEFAULT_CONFIG, user_config)
    elif isinstance(config_source, dict):
        self._config = _deep_merge(DEFAULT_CONFIG, config_source)
    else:
        raise ValueError("Config source must be a file path, dict, or None.")

validate()

Validate the configuration using a fail-fast strategy.

This method performs comprehensive validation of the configuration, including required fields, data types, file existence, and feature generator configurations.

Raises:

Type Description
ValueError

If any validation check fails. The error message will indicate the specific validation failure.

Notes

The validation includes checks for:

- Required fields (inputType, inputFile, outputDir, rescore)
- Input file existence and type
- Output directory creation
- Rescore configuration validity (TODO)
- Feature generator configuration primitive validity (TODO)
- Optional parameter validity (TODO): the 'allele' parameter should be validated first

Source code in optimhc/core/config.py
def validate(self):
    """
    Validate the configuration using a fail-fast strategy.

    The first failing check logs an error and raises ``ValueError`` with the
    same message. As side effects, a scalar ``inputFile`` is converted to a
    one-element list, a missing or ``None`` ``featureGenerator`` is replaced
    by an empty list, and the output directory is created.

    Raises
    ------
    ValueError
        If any validation check fails. The error message will indicate
        the specific validation failure.

    Notes
    -----
    The validation includes checks for:
    - Required fields (inputType, inputFile, outputDir, rescore)
    - Input file existence and type
    - Output directory creation
    - Feature generator names and experiment sources
    - Rescore configuration validity (TODO)
    - Feature generator configuration primitive validity (TODO)
    - Optional parameter validity (TODO): we should validate 'allele' first !!!
    """

    def _fail(message):
        # Log at ERROR level, then raise ValueError with the same text.
        logger.error(message)
        raise ValueError(message)

    if not isinstance(self._config, dict):
        _fail("Configuration must be a dictionary")

    for field in ("inputType", "inputFile", "outputDir", "rescore"):
        if field not in self._config:
            _fail(f"Missing required configuration: '{field}'")

        value = self._config[field]
        if field == "inputFile" and value == []:
            _fail("inputFile list cannot be empty")
        elif value in (None, "", []):
            _fail(f"Required configuration '{field}' cannot be empty")

    if self._config["inputType"] not in ("pepxml", "pin"):
        _fail("inputType must be 'pepxml' or 'pin'")

    if (
        self._config["inputType"] == "pin"
        and self._config.get("retentionTimeColumn", None) is None
    ):
        _fail("retentionTimeColumn must be specified when inputType is 'pin'")

    input_files = self._config["inputFile"]
    if not isinstance(input_files, (list, tuple)):
        logger.debug(f"inputFile is not a list or tuple: {input_files}. Converting to list.")
        self._config["inputFile"] = [input_files]
        input_files = self._config["inputFile"]
    # An empty tuple passes the required-field check above, so re-check here.
    if not input_files:
        _fail("inputFile list cannot be empty")

    for file_path in input_files:
        if not os.path.exists(file_path):
            _fail(f"Input file does not exist: {file_path}")

    output_dir = self._config["outputDir"]
    if not isinstance(output_dir, str):
        _fail("outputDir must be a string")
    if not output_dir:
        _fail("outputDir is required")
    os.makedirs(output_dir, exist_ok=True)

    # TODO: Validate feature generator configuration
    valid_generators = {
        "Basic",
        "OverlappingPeptide",
        "PWM",
        "MHCflurry",
        "NetMHCpan",
        "NetMHCIIpan",
        "DeepLC",
        "SpectralSimilarity",
    }

    # Use .get() so an absent key is handled like an explicit None instead of
    # escaping as a KeyError.
    feature_generators = self._config.get("featureGenerator")
    if feature_generators is None:
        logger.warning("No feature generators specified.")
        feature_generators = []
        self._config["featureGenerator"] = feature_generators
    elif not isinstance(feature_generators, (list, tuple)):
        _fail("featureGenerator must be a list")
    for fg in feature_generators:
        # Reject malformed entries with the documented ValueError rather than
        # a KeyError/TypeError from the lookup below.
        if not isinstance(fg, dict) or "name" not in fg:
            _fail(f"Each feature generator must be a mapping with a 'name' key, got: {fg!r}")
        if fg["name"] not in valid_generators:
            _fail(f"Invalid feature generator: {fg['name']}")

    valid_sources = valid_generators | {"Original", "ContigFeatures"}
    experiments = self._config.get("experiments", None)
    if experiments is not None:  # experiment mode
        if not isinstance(experiments, list):
            _fail("experiments must be a list")
        for exp in experiments:
            if not isinstance(exp, dict):
                _fail(f"Each experiment must be a mapping, got: {exp!r}")
            # A `source: null` entry is treated like a missing one.
            for source in exp.get("source", []) or []:
                if source not in valid_sources:
                    _fail(f"Invalid source in experiments: {source}")

setup_logging(level='INFO')

Source code in optimhc/cli.py
def setup_logging(level: str = "INFO") -> None:
    """
    Configure root logging and silence the mhctools loggers.

    Parameters
    ----------
    level : str, optional
        Name of the log level; must be a key of ``LOG_MAPPING``.

    Raises
    ------
    ValueError
        If ``level`` is not a key of ``LOG_MAPPING``.
    """
    if level not in LOG_MAPPING:
        raise ValueError(f"Invalid log level: {level}")

    # force=True replaces any handlers already attached to the root logger.
    logging.basicConfig(
        force=True,
        level=LOG_MAPPING[level],
        format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # mhctools attaches its own INFO-level handlers to its loggers
    # https://github.com/openvax/mhctools/blob/master/mhctools/logging.conf
    mhctools_loggers = (
        "mhctools",
        "mhctools.base_commandline_predictor",
        "mhctools.netmhc",
        "mhctools.netmhciipan",
        "mhctools.process_helpers",
        "mhctools.cleanup_context",
    )
    for logger_name in mhctools_loggers:
        noisy_logger = logging.getLogger(logger_name)
        noisy_logger.handlers.clear()
        noisy_logger.disabled = True
        noisy_logger.propagate = False
        noisy_logger.setLevel(logging.CRITICAL)

cli()

OptiMHC - An optimized rescoring pipeline for immunopeptidomics data.

Source code in optimhc/cli.py
@click.group()
@click.version_option(version=__version__, prog_name="optimhc")
def cli():
    """
    OptiMHC - An optimized rescoring pipeline for immunopeptidomics data.
    """
    # Command group only: sub-commands are registered with @cli.command().
    # click shows the docstring above as the group's help text, so keep it
    # short and user-facing.

pipeline(config, inputtype, inputfile, decoyprefix, outputdir, visualization, numprocesses, allele, featuregenerator, loglevel, testfdr, model)

Run the optiMHC pipeline with the specified configuration.

Source code in optimhc/cli.py
@cli.command()
@click.option(
    "--config",
    type=click.Path(exists=True),
    help="Path to YAML configuration file",
)
@click.option(
    "--inputType",
    type=click.Choice(["pepxml", "pin"]),
    help="Type of input file",
)
@click.option(
    "--inputFile",
    type=click.Path(exists=True),
    multiple=True,
    help="Path(s) to input PSM file(s). Can be specified multiple times for multiple files.",
)
@click.option(
    "--decoyPrefix",
    type=str,
    help="Prefix used to identify decoy sequences",
)
@click.option(
    "--outputDir",
    type=click.Path(),
    help="Output directory",
)
@click.option(
    "--visualization/--no-visualization",
    is_flag=True,
    default=None,
    help="Enable/disable visualization",
)
@click.option(
    "--numProcesses",
    type=int,
    help="Number of parallel processes",
)
@click.option(
    "--allele",
    type=str,
    multiple=True,
    help="Allele(s) for which predictions will be computed",
)
@click.option(
    "--featureGenerator",
    type=str,
    multiple=True,
    help="Feature generator configuration in JSON format",
)
@click.option(
    "--logLevel",
    type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR"]),
    help="Logging level",
)
@click.option(
    "--testFDR",
    type=float,
    help="FDR threshold for testing",
)
@click.option(
    "--model",
    type=click.Choice(["Percolator", "XGBoost", "RandomForest"]),
    help="Model to use for rescoring",
)
def pipeline(
    config,
    inputtype,
    inputfile,
    decoyprefix,
    outputdir,
    visualization,
    numprocesses,
    allele,
    featuregenerator,
    loglevel,
    testfdr,
    model,
):
    """Run the optiMHC pipeline with the specified configuration."""
    # NOTE: the docstring above is the command's help text; keep developer
    # documentation in comments.
    #
    # Start from the YAML file when given (otherwise from the defaults), then
    # let every option supplied on the command line override that value.
    pipeline_config = Config(config) if config else Config()

    if inputtype:
        pipeline_config["inputType"] = inputtype
    if inputfile:
        pipeline_config["inputFile"] = list(inputfile)
    if decoyprefix:
        pipeline_config["decoyPrefix"] = decoyprefix
    if outputdir:
        pipeline_config["outputDir"] = outputdir
    # Tri-state flag: None means "not given on the command line".
    if visualization is not None:
        pipeline_config["visualization"] = visualization
    if numprocesses:
        pipeline_config["numProcesses"] = numprocesses
    if allele:
        pipeline_config["allele"] = list(allele)
    if loglevel:
        pipeline_config["logLevel"] = loglevel
    if featuregenerator:
        feature_generators = []
        for fg in featuregenerator:
            try:
                fg_config = json.loads(fg)
            except json.JSONDecodeError as e:
                raise click.BadParameter(
                    f"Invalid JSON format for feature generator: {e}"
                ) from e
            # Valid JSON that is not an object (e.g. a bare string or number)
            # cannot describe a generator; reject it here with a clear message
            # instead of failing later during validation.
            if not isinstance(fg_config, dict):
                raise click.BadParameter(
                    f"Feature generator must be a JSON object, got: {fg}"
                )
            feature_generators.append(fg_config)
        pipeline_config["featureGenerator"] = feature_generators
    # Compare against None so an explicit `--testFDR 0` is not silently
    # replaced by the configured value.
    if testfdr is not None:
        pipeline_config["rescore"]["testFDR"] = testfdr
    if model:
        pipeline_config["rescore"]["model"] = model

    setup_logging(pipeline_config["logLevel"])
    # Validate up front so configuration errors surface before the pipeline
    # object is built.
    pipeline_config.validate()
    Pipeline(pipeline_config).run()

experiment(config)

Run multiple experiments with different feature combinations.

Source code in optimhc/cli.py
@cli.command()
@click.option(
    "--config",
    required=True,
    type=click.Path(exists=True),
    help="Path to YAML configuration file",
)
def experiment(config):
    """Run multiple experiments with different feature combinations."""
    # Experiment mode is driven entirely by the YAML file; there are no
    # per-option overrides here.
    experiment_config = Config(config)
    log_level = experiment_config["logLevel"]
    setup_logging(log_level)

    runner = Pipeline(experiment_config)
    runner.run_experiments()