Source code for sequana_pipetools.snaketools.pipeline_manager

#
#  This file is part of Sequana software
#
#  Copyright (c) 2016-2021 - Sequana Dev Team (https://sequana.readthedocs.io)
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  Website:       https://github.com/sequana/sequana
#  Documentation: http://sequana.readthedocs.io
#  Contributors:  https://github.com/sequana/sequana/graphs/contributors
##############################################################################
import importlib
import os
import shutil

import colorlog

from sequana_pipetools import get_package_version
from sequana_pipetools.misc import PipetoolsException
from sequana_pipetools.snaketools.errors import PipeError
from sequana_pipetools.snaketools.slurm import SlurmStats

from .file_factory import FastQFactory, FileFactory
from .module import Pipeline
from .pipeline_utils import OnSuccessCleaner
from .sequana_config import SequanaConfig

CITATION_MESSAGE = (
    "📚 Found Sequana useful? Please cite us:\n"
    "\tCokelaer et al. 'Sequana': a Set of Snakemake NGS pipelines,\n"
    "\tJournal of Open Source Software, 2(16), 352 (2017)\n"
    "\tDOI: https://doi.org/10.21105/joss.00352\n"
    "â„šī¸  More info: https://sequana.readthedocs.io"
)

logger = colorlog.getLogger(__name__)


[docs] def get_shell(tool_path: str, version: str) -> str: """Return a shell command string from the sequana_wrappers shell library. :param tool_path: slash-separated tool/command path, e.g. ``"bwa/align"`` :param version: shell command version, e.g. ``"v1"``, or ``"dev"`` for the development (unreleased) version. Example usage in a pipeline rules file:: shell: manager.get_shell("bwa/align", "v1") """ parts = tool_path.split("/") module_path = ".".join(["sequana_wrappers", "shells"] + parts + [version, "cmd"]) try: return importlib.import_module(module_path).CMD except ModuleNotFoundError: raise ModuleNotFoundError( f"Shell command version '{version}' not found: '{module_path}'.\n" f"Available versions are the subdirectories under " f"sequana_wrappers/shells/{'/'.join(parts)}/.\n" "Use version='dev' for the development version." )
[docs] def get_run(tool_path: str, version: str): """Return a Python callable from the sequana_wrappers snippets library. The returned callable has the signature ``execute(snakemake)`` and is intended for use inside Snakemake ``run:`` blocks. :param tool_path: slash-separated tool/command path, e.g. ``"rulegraph/run"`` :param version: snippet version, e.g. ``"v1"``, or ``"dev"`` for the development (unreleased) version. Example usage in a pipeline rules file:: run: manager.get_run("rulegraph/run", "v1")(snakemake) """ parts = tool_path.split("/") module_path = ".".join(["sequana_wrappers", "snippets"] + parts + [version, "code"]) try: return importlib.import_module(module_path).execute except ModuleNotFoundError: raise ModuleNotFoundError( f"Snippet version '{version}' not found: '{module_path}'.\n" f"Available versions are the subdirectories under " f"sequana_wrappers/snippets/{'/'.join(parts)}/.\n" "Use version='dev' for the development version." )
[docs] class PipelineManagerBase: """ For all files except FastQ, please use this class instead of PipelineManager. """ def __init__(self, name, config, schema=None, matplotlib_backend="Agg"): # Make sure the config is valid self.name = name cfg = SequanaConfig(config) if schema: if not cfg.check_config_with_schema(schema): raise PipetoolsException("Please check the config file, some mandatory values are missing.") # Populate the config with additional information self._sequana_config = cfg self.config = cfg.config self.config.pipeline_name = name self.matplotlib_backend = matplotlib_backend # some common choices self.sample = "{sample}" self.basename = "{sample}/%s/{sample}" self.setup()
[docs] def getmetadata(self): data = {} data["name"] = self.name data["rulegraph"] = ".sequana/rulegraph.svg" data["sequana_wrappers"] = self.config.get("sequana_wrappers", "latest") pipeline = importlib.import_module(f"sequana_pipelines.{self.name}") data["pipeline_version"] = pipeline.version return data
[docs] def get_shell(self, tool_path: str, version: str) -> str: """Return a shell command string from the sequana_wrappers shell library. Delegates to :func:`sequana_wrappers.get_shell`. Using the manager method avoids a separate import in pipeline rules files — ``manager`` is already available everywhere. :param tool_path: slash-separated tool/command path, e.g. ``"bwa/align"`` :param version: shell command version, e.g. ``"v1"`` Example:: shell: manager.get_shell("bwa/align", "v1") """ return get_shell(tool_path, version)
[docs] def get_run(self, tool_path: str, version: str): """Return a Python callable from the sequana_wrappers snippets library. Delegates to :func:`sequana_wrappers.get_run`. The returned callable has the signature ``execute(snakemake)`` and is intended for use inside Snakemake ``run:`` blocks. :param tool_path: slash-separated tool/command path, e.g. ``"rulegraph/run"`` :param version: snippet version, e.g. ``"v1"`` Example:: run: manager.get_run("rulegraph/run", "v1")(snakemake) """ return get_run(tool_path, version)
[docs] def error(self, msg): msg = ( f"{msg}\nPlease check the content of your config file. " "It should have those 3 key/value pairs in config.yaml (adapt to your needs):" "\n" "- input_directory: path_to_find_input_files\n" '- input_readtag: "_R[12]_"\n' '- input_pattern: "*.fastq.gz"\n' "\n" "You must set an input_directory key and an input_pattern key so that files can be found. \n" "You may omit input_directory but input_pattern must then correspond to a valid file pattern.\n" "You must also set input_readtag to a valid read tag for Illumina data (typically _R[12]_ or _[12]. ; not the trailing dot).\n" "If you are not analysing Illumina paired data (e.g., nanopore), let the input_readtag field empty." ) raise PipetoolsException(msg)
[docs] def getrawdata(self): """Return list of raw data A list of files is returned (one or two files) otherwise, a function compatible with snakemake is returned. This function contains a wildcard to each of the samples found by the manager. """ if not self.samples: # pragma: no cover raise ValueError( "Define the samples attribute as a dictionary with" " sample names as keys and the corresponding location as values." ) return lambda wildcards: self.samples[wildcards.sample]
[docs] def setup(self): """ 90% of the errors come from the fact that users did not set a matplotlib backend properly. In the setup() function, we set the backend to Agg on purpose. One can set this parameter to None to avoid this behaviour """ # The rulegraph wrapper expected manager.snakefile, which is defined here # Note also that the snakefile path when called from the rulegraph directory # has an extra 'rulegraph' in the path that is removed here. self._snakefile = os.path.abspath(self.name + ".rules").replace("rulegraph/", "") try: # check requirements if possible. This is the standalone application # requirements, not the files possibly provided in the config file Pipeline(self.name).check("warning") except ValueError: pass if self.matplotlib_backend: import matplotlib as mpl mpl.use(self.matplotlib_backend)
@property def snakefile(self): return self._snakefile
[docs] def clean_multiqc(self, filename): with open(filename, "r") as fin: with open(filename + "_tmp_", "w") as fout: line = fin.readline() while line: if """<a href="http://multiqc.info" target="_blank">""" in line: line = fin.readline() # read the image line = fin.readline() # read the ending </a> tag else: fout.write(line) line = fin.readline() # read the next line shutil.move(filename + "_tmp_", filename)
[docs] def onerror(self): """Try to report error from slurm""" try: p = PipeError(self.name) p.status() print( f"\nIf you encountered an error using sequana_{self.name}, please copy paste the above message and create a New Issue on https://github.com/sequana/{self.name}/issues" ) except Exception: # pragma: no cover pass try: from pathlib import Path from rich.console import Console from rich.panel import Panel from sequana_pipetools.diagnose import _sequana_tips console = Console() tips = _sequana_tips("", Path(".")) # strip the leading "---" separator used when appending to LLM output tips = tips.lstrip("\n-").strip() console.print(Panel(tips, title="💡 Sequana tips", border_style="bold green", padding=(1, 2))) except Exception: # pragma: no cover pass
[docs] def onsuccess(self): """Print a success message, pointing to summary.html if present.""" try: from pathlib import Path from rich.console import Console from rich.panel import Panel console = Console() summary = Path(".") / "summary.html" if summary.exists(): console.print( Panel( f"Open the summary report: [cyan]{summary.resolve()}[/cyan]", title="✅ Pipeline completed", border_style="bold green", padding=(1, 2), ) ) except Exception: # pragma: no cover pass
[docs] def teardown(self, extra_dirs_to_remove=None, extra_files_to_remove=None, outdir="."): # add a Makefile cleaner = OnSuccessCleaner(self.name, outdir=outdir) cleaner.directories_to_remove.extend(extra_dirs_to_remove or []) cleaner.files_to_remove.extend(extra_files_to_remove or []) cleaner.add_makefile() # create the version file given the requirements if os.path.exists(f"{outdir}/.sequana/tools.txt"): with open(f"{outdir}/.sequana/tools.txt", "r") as fin: deps = fin.readlines() with open(f"{outdir}/.sequana/versions.txt", "w") as fout: from versionix.parser import get_version for dep in deps: dep = dep.strip() try: version = get_version(dep, verbose=False) except (KeyError, SystemExit): version = "unknown" fout.write(f"{dep}\t{version}\n") from rich.console import Console from rich.panel import Panel console = Console() if os.path.exists("summary.html"): console.print("\n✅ Another successful analysis. Open summary.html in your browser. Have fun.") else: console.print("\n✅ Another successful analysis. Have fun.") console.print(Panel(CITATION_MESSAGE, title="Citation", border_style="bold cyan", padding=(1, 2))) # for HPC with slurm only try: slurm_stats = SlurmStats(outdir) slurm_stats.to_csv(f"{outdir}/.sequana/slurm_stats.txt") except Exception: pass
[docs] def get_html_summary(self, float="left", width=30): import pandas as pd vers = get_package_version(f"sequana_{self.name}") data = {"samples": len(self.samples), f"sequana_{self.name}_version": vers} try: data["paired"] = self.paired except AttributeError: # pragma: no cover pass df_general = pd.DataFrame( data, index=["summary"], ) htmltable = df_general.T.to_html() contents = """<div style="float:{}; width:{}%">{}</div>""".format(float, width, htmltable) return contents
[docs] class PipelineManagerDirectory(PipelineManagerBase): """Most generic pipeline manager Only checks for valid config file and its schema For all files except FastQ, please use this class instead of PipelineManager. """ def __init__(self, name, config, schema=None): super().__init__(name, config, schema) self.wrappers = self.config.get("sequana_wrappers", "main")
[docs] class PipelineManager(PipelineManagerBase): """Utility to manage easily the snakemake pipeline including input files Inside a snakefile, use it as follows:: from sequana_pipetools import PipelineManager manager = PipelineManager("pipeline_name", "config.yaml") This will expect some specific fields in the config file:: - input_directory: path_to_find_input_files - input_readtag: "_R[12]_" - input_pattern: "*.fastq.gz" and optional option: - exclude_pattern: You may omit the input_readtag, which is not required for non-paired data. For instance for pacbio and nanopore files, there are not paired and the read tag is not required. Instead, if you are dealing with Illumina/MGI data sets, you must provide this field IF AND ONLY IF you want your data to be processed as paired data (or single end). See later for more details. You may omit the input_directory but then the input_pattern must match files to be found locally. Behind the scene, the :class:`~sequana_pipetools.snaketools.file_factory.FileFactory` or :class:`~sequana_pipetools.snaketools.file_factory.FastQFactory` will provide the sample names and their tags in :attr:`samples` where tag are extracted from the sample names where read tags are removed (if required). The manager tells you if the samples are paired or not assuming all samples are homogeneous (either all paired or all single-ended) and a user read_tag that can discrimate the sample name unambigously. In Sequencing data, the sequences are stored in one file (single-ended) data or in two files (paired-data). In both cases, most common sequencers will append a so-called read-tag to identify the first and second file. Traditionnally, e.g., with illumina sequencers the read tag are _R1_ and _R2_ or a trailing _1 and _2 Note that samples names have sometimes this tag included. Consider e.g. `sample_replicate_1_R1_.fastq.gz` or `sample_replicate_1_1.fastq.gz` then you can imagine that it is tricky to handle. The sample names are extracted by cutting filenames on the first dot that is encoutered (before extension presumably). For instance the sample name for the file:: A.fastq.gz will be **A**. sometimes, you may have ambiguous names. For instance, they may start with a common prefix. Considere these two files:: demultiplex.A.fastq.gz demultiplex.B.fastq.gz They would both have the same sample name. So, we remove all trailing prefixes that are common to all files. Therefore, our sample names are as expected:: A B If extra prefixes not common to all samples are present and you want to remove them, it is still possible using a field in the config file called extra_prefixes_to_strip. For instance with :: demultiplex.A.fastq.gz demultiplex.mess.B.fastq.gz your sample names will be A and mess. You can set this pair key:value in the config file:: extra_prefixes_to_strip: ["mess"] and get *A* and *B* as sample names. If you have specific wishes to create sample names from the filenames, you may provide a function with the :attr:`sample_func` parameter. If so, you must provide the input_directory and input_pattern to identify the files to process. For instance, in the sequana_fastqc pipeline, you set the input_directory and input_pattern and use this function to extract the sample names :: from sequana_pipetools import SequanaManager def func(filename): return filename.split("/")[-1].split('.', 1)[0] pm = SequanaManager("fastqc", "config.yaml", sample_func=func) The manager can then easily access to the data with a :class:`FastQFactory` instance:: manager.ff.filenames Finally, you can also define a sample pattern using a simple syntax such as **demultiplex.{sample}.fastq.gz** and you can define this in your config file using:: sample_pattern: "demultiplex.{sample}.fastq.gz" in which case, no prefixes are removed. """ def __init__( self, name: str, config: str, schema=None, sample_func=None, extra_prefixes_to_strip=None, sample_pattern=None, exclude_pattern=None, **kwargs, ): """.. rubric:: Constructor :param name: name of the pipeline :param config: name of a configuration file :param str schema: YAML file to validate the config file :param sample_func: a user-defined function that extract sample names from filenames :param extra_prefixes_to_strip: we automatically remove common prefixes. However, you may have extra prefixes not common to all samples that needs to be removed. Provide a list with extra_prefixes_to_strip including trailing dot or not. :param sample_pattern: if a sample pattern is provided, prefix are not removed automatically. The sample_pattern must include the string {sample} to define the expected sample name. For instance given a filename A_sorted.fastq.gz where sorted appears in all sample buy is not wished, use sample_pattern='{sample}_sorted.fastq.gz' and your sample will be only 'A'. """ super().__init__(name, config, schema) cfg = self._sequana_config cfg.config.pipeline_name = self.name # can be provided in the config file or arguments self.sample_pattern = cfg.config.get("sample_pattern", sample_pattern) self.extra_prefixes_to_strip = cfg.config.get("extra_prefixes_to_strip", extra_prefixes_to_strip or []) self.exclude_pattern = cfg.config.get("exclude_pattern", exclude_pattern) # if input_directory is not filled, the input_pattern, if valid, will be used instead and must # be provided anyway. if "input_pattern" not in cfg.config: self.error("The PipelineManager expect the field 'input_pattern' to be in your config file") # uses the provided sequana_wrappers version if any, otherwise use the main branch of the wrappers if "sequana_wrappers" in cfg.config: logger.warning( "The sequana_wrappers parameter in the config file is deprecated and will be removed in a future version" ) self.wrappers = cfg.config.get("sequana_wrappers", "main") readtag = cfg.config.get("input_readtag", None) use_fastq_factory = True if readtag else False # input_pattern is required. input_directory may be optional # do we have an input_directory ? If so, input_pattern is required if "input_directory" in cfg.config and cfg.config.input_directory: directory = cfg.config.input_directory.strip() if not os.path.isdir(directory): self.error(f"The ({directory}) directory does not exist.") if cfg.config.input_pattern: glob_dir = directory + os.sep + cfg.config.input_pattern # otherwise, the input_pattern must be provided (checked above) and valid elif cfg.config.input_pattern: glob_dir = cfg.config.input_pattern # if user set the sample func, no need for fileFactory # The config uses the input_directory and input_pattern (compulsary). if sample_func: logger.info("Using sample_func function to get sample names as provided by the pipeline/user") path = cfg.config.input_directory pattern = cfg.config.input_pattern if path.strip(): pattern = path + os.sep + pattern self._get_any_files(pattern) # Here, we overwrite the sample name definition from sequana_pipetools to use the # function provided by the user. self.samples = {sample_func(filename): filename for filename in self.ff.realpaths} logger.info(f"Found {len(self.samples)} samples") elif use_fastq_factory: logger.info(f"Using FastQFactory (readtag {readtag})") self._get_fastq_files(glob_dir, cfg.config.input_readtag) if self.paired: logger.info("Paired data found") else: logger.info("Using FileFactory (no readtag)") self._get_any_files(glob_dir) logger.info(f"Found {len(self.samples)} samples") if not self.ff.filenames: self.error(f"No files were found with pattern {glob_dir} and read tag {readtag}.") # finally, keep track of the config file self.config = cfg.config def _get_paired(self): try: return self.ff.paired except AttributeError: return False paired = property(_get_paired) def _get_fastq_files(self, glob_dir, read_tag): """ """ self.ff = FastQFactory( glob_dir, read_tag=read_tag, extra_prefixes_to_strip=self.extra_prefixes_to_strip, sample_pattern=self.sample_pattern, exclude_pattern=self.exclude_pattern, ) # check whether it is paired or not. This is just to raise an error when # there is inconsistent mix of R1 and R2 _ = self.paired # samples contains a correspondance between the sample name and the # real filename location. self.samples = { tag: [self.ff.get_file1(tag), self.ff.get_file2(tag)] if self.ff.get_file2(tag) else [self.ff.get_file1(tag)] for tag in self.ff.tags } if len(self.ff.tags) == 0: raise ValueError( "Could not find Fastq files files with valid format " "(NAME_R1_<SUFFIX>.fastq.gz where <SUFFIX> is " "optional" ) def _get_any_files(self, pattern): self.ff = FileFactory( pattern, extra_prefixes_to_strip=self.extra_prefixes_to_strip, sample_pattern=self.sample_pattern, exclude_pattern=self.exclude_pattern, ) # samples contains a correspondance between the sample name and the # real filename location. self.samples = {tag: fl for tag, fl in zip(self.ff.filenames, self.ff.realpaths)}
[docs] def getrawdata(self, name="sample"): """Return list of raw data This function contains a wildcard to each of the samples found by the manager. """ return lambda wildcards: self.samples[getattr(wildcards, name)]