#
# This file is part of Sequana software
#
# Copyright (c) 2016-2021 - Sequana Dev Team (https://sequana.readthedocs.io)
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# Website: https://github.com/sequana/sequana
# Documentation: http://sequana.readthedocs.io
# Contributors: https://github.com/sequana/sequana/graphs/contributors
##############################################################################
import glob
import os
import re
import sys
import colorlog
from sequana_pipetools.misc import PipetoolsException
logger = colorlog.getLogger(__name__)
[docs]
class FileFactory:
"""Factory to handle a set of files
::
from sequana.snaketools import FileFactory
ff = FileFactory("H*.gz")
ff.filenames
A set of useful methods are available based on this convention::
>>> fullpath = /home/user/test/A.fastq.gz
>>> dirname(fullpath)
'/home/user/test'
>>> basename(fullpath)
'A.fastq.gz'
>>> realpath(fullpath) # is .., expanded to /home/user/test
>>> all_extensions
"fastq.gz"
>>> extensions
".gz"
A **basename** is the name of a directory in a Unix pathname that occurs
after the last slash.
dirname, returns everything but the final basename in a pathname. Both
.. versionchanged:: 0.8.7 attributes were recomputed at each accession. For
small projects, this is transparent, but on novogene or large set of samples,
this is taking too much time. This was done in case FileFactorry
attributes such as input directorty or pattern are changed. In practice this
does not happen, so we can write the basenames and other attributes in variables
once for all
Some files may be prefixed with a common name separated by a dot. For example,
with pacbio data you may have::
demultiplex.A.fastq.gz
demultiplex.B.fastq.gz
We remove the common prefixes automatically so that you end up with A and B as sample names.
"""
def __init__(self, pattern, extra_prefixes_to_strip=[], sample_pattern=None, exclude_pattern=None, **kwargs):
""".. rubric:: Constructor
:param pattern: can be a filename, list of filenames, or a global
pattern (a unix regular expression with wildcards). For instance,
``*/*fastq.gz``
:param extra_prefixes_to_strip: we automatically remove common prefixes.
However, you may have extra prefixes not common to all samples
that needs to be removed. Provide a list with extra_prefixes_to_strip
including trailing dot or not.
:param sample_pattern: if a sample pattern is provided, prefix are
not removed automatically. The sample_pattern must include the string
{sample} to define the expected sample name. For instance given
a filename A_sorted.fastq.gz where sorted appears in all sample
buy is not wished, use sample_pattern='{sample}_sorted.fastq.gz'
and your sample will be only 'A'.
"""
self.pattern = pattern
self.extra_prefixes_to_strip = extra_prefixes_to_strip
self.sample_pattern = sample_pattern
self.exclude_pattern = exclude_pattern
try:
if os.path.exists(pattern):
self._glob = [pattern]
else:
self._glob = glob.glob(pattern, recursive=True)
except TypeError:
# Error if pattern is a list of file
for filename in pattern:
if not os.path.exists(filename): # pragma: no cover
raise FileNotFoundError(f"This file {filename} does not exist")
self._glob = pattern[:]
# remove directories if they exist
self._glob = [x for x in self._glob if not os.path.isdir(x)]
# remove candidates that have the exclude pattern
if self.exclude_pattern: # pragma: no cover
self._glob = [x for x in self._glob if not self.exclude_pattern in x]
def _get_realpaths(self):
return [os.path.realpath(filename) for filename in self._glob]
realpaths = property(_get_realpaths, doc="the full path (e.g., /home/user/readme.txt)")
def _get_basenames(self):
return [os.path.split(filename)[1] for filename in self._glob]
basenames = property(_get_basenames, doc="the basename without the path (e.g. readme.txt)")
def _get_filenames(self):
# make sure there is a '.' at the end
prefixes_to_strip = []
# if only one sample, we do not alter the sample name.
# we just take the first word.
if len(self._glob) > 1 and not self.sample_pattern:
splitted = [x.split(".") for x in self.basenames]
min_split = min([len(x) for x in splitted])
for i in range(min_split):
S = set([x[i] for x in splitted])
if len(S) == 1:
# a redundant prefix to remove
prefixes_to_strip.append(S.pop() + ".")
elif len(S) == len(self._glob):
# we found different sample name; we can stop here
break
else: # pragma: no cover
# if we have a mix of names with duplicated names, this
# will be problem but users may provide a solution by
# providing other prefixes to remove so not exception here.
# Will be raised later if needed
pass
# Even though we now have unique samples names, some prefixes
# from the users can still be stripped from the left hand side
# we remove the dot (if it exists) and add one to be sure it is
# there.
prefixes_to_strip += [x.strip(".") + "." for x in self.extra_prefixes_to_strip]
def func(filename):
# if sample pattern is provided, use it in place of the prefixes
if self.sample_pattern:
prefix, suffix = self.sample_pattern.split("{sample}")
res = filename[:]
if filename.startswith(prefix) and filename.endswith(suffix):
res = res[len(prefix) : len(res) - len(suffix)]
else: # pragma: no cover
raise PipetoolsException(f"Your sample pattern does not match the filename {filename}")
else:
res = filename[:]
for prefix in prefixes_to_strip:
res = res[res.startswith(prefix) and len(prefix) :]
return res.split(".")[0]
filenames = [func(basename) for basename in self.basenames]
if len(set(filenames)) != len(self._glob):
raise PipetoolsException(
f"Your sample names do not seem to be unique. After removing some common prefixes ({prefixes_to_strip}), we end up with {filenames}"
)
return filenames
filenames = property(_get_filenames, doc="get basename without extension (e.g., readme)")
def _get_extensions(self):
return [os.path.splitext(filename)[1] for filename in self._glob]
extensions = property(_get_extensions, doc="get final extension ")
def _get_all_extensions(self):
return [basename.split(".", 1)[1] if "." in basename else "" for basename in self.basenames]
all_extensions = property(_get_all_extensions, doc=" get all trailing extensions")
def __len__(self):
return len(self.filenames)
def __str__(self):
return "Found %s file(s)" % len(self)
[docs]
class FastQFactory(FileFactory):
"""FastQ Factory tool
In NGS experiments, reads are stored in a so-called FastQ file. The file is
named::
PREFIX_R1_SUFFIX.fastq.gz
where _R1_ tag is always to be found. This is a single-ended case. In
paired case, a second file is to be found::
PREFIX_R2_SUFFIX.fastq.gz
The PREFIX indicates the sample name. The SUFFIX does not convey any
information per se. The default read tag ("_R[12]_") handle this case.
The following commands expect to find data with a readtag _R1_ or _R2_::
FastQFactory("*fastq.gz")
This behaviour can be changed if data have another read tags. (e.g. "[12].fastq.gz")::
FastQFactory("*fastq.gz", read_tag="_[12].")
Sometimes, e.g. in long reads experiments (for instance), naming convention is
different and may not be single/paired end convention. If so, set the
readtag to None.::
FastQFactory("*ccs.fastq.gz", read_tag=None)
In such case, the :attr:`paired` is set to False.
In a directory (recursively or not), there could be lots of samples. This
class can be used to get all the sample prefix in the :attr:`tags`
attribute.
Given a tag, one can get the corresponding file(s)::
ff = FastQFactory("*fastq.gz")
ff.tags
ff.get_file1(ff.tags[0])
len(ff)
"""
def __init__(
self,
pattern,
extension=["fq.gz", "fastq.gz"],
read_tag="_R[12]_",
extra_prefixes_to_strip=[],
sample_pattern=None,
exclude_pattern=None,
**kwargs,
):
r""".. rubric:: Constructor
:param str pattern: a global pattern (e.g., ``H*fastq.gz``)
:param list extension: not used
:param str read_tag: regex tag used to join paired end files. Some
characters need to be escaped with a \ character to be interpreted as
character. (e.g. '_R[12]_\.fastq\.gz')
:param extra_prefixes_to_strip: we automatically remove common prefixes.
However, you may have extra prefixes not common to all samples
that needs to be removed. Provide a list with extra_prefixes_to_strip
including trailing dot or not.
:param sample_pattern: if a sample pattern is provided, prefix are
not removed automatically. The sample_pattern must include the string
{sample} to define the expected sample name. For instance given
a filename A_sorted.fastq.gz where sorted appears in all sample
buy is not wished, use sample_pattern='{sample}_sorted.fastq.gz'
and your sample will be only 'A'.
"""
super(FastQFactory, self).__init__(
pattern,
extra_prefixes_to_strip=extra_prefixes_to_strip,
sample_pattern=sample_pattern,
exclude_pattern=exclude_pattern,
)
self.read_tag = read_tag
# Filter out reads that do not have the read_tag
# https://github.com/sequana/sequana/issues/480
if self.read_tag is None or self.read_tag.strip() == "":
self.read_tag = ""
if self.read_tag:
remaining = [filename for filename in self._glob if re.search(self.read_tag, os.path.basename(filename))]
if len(remaining) < len(self._glob):
diff = len(self._glob) - len(remaining)
logger.warning(f"Filtered out {diff} files that do not contain the read_tag ({self.read_tag})")
self._glob = remaining
# check if tag is informative
if self.read_tag != "":
if "[12]" not in self.read_tag:
msg = "the read_tag parameter must contain '[12]' to differentiate read 1 and 2."
logger.error(msg)
raise ValueError(msg)
elif self.read_tag == "[12]":
msg = "The read_tag parameter must be more informative than just have [12]"
logger.error(msg)
raise ValueError(msg)
if len(self.filenames) == 0:
msg = "No files found with the requested pattern ({}) and readtag ({}). If your data is not paired, set the readtag to empty string"
msg = msg.format(pattern, self.read_tag)
logger.error(msg)
raise ValueError(msg)
# save pattern and read tag
self.pattern = pattern
# If a user uses a . it should be taken into account hence the regex
# that follows
if self.read_tag:
re_read_tag = re.compile(self.read_tag.replace(r".", r"\."))
self.tags = list({re_read_tag.split(f)[0] for f in self.basenames})
else:
self.tags = list({f.split(".")[0] for f in self.basenames})
if len(self.tags) == 0:
msg = "No sample found. Tag '{0}' is not relevant".format(self.read_tag)
logger.error(msg)
raise ValueError(msg)
logger.info("Found %s projects/samples " % len(self.tags))
def _get_file(self, tag, r):
if tag is None:
if len(self.tags) == 1:
tag = self.tags[0]
elif len(self.tags) > 1:
raise ValueError("Ambiguous tag. You must provide one " "(sequana.FastQFactory)")
else:
assert tag in self.tags, "invalid tag"
# retrieve file of tag
if self.read_tag:
read_tag = self.read_tag.replace("[12]", r)
# changed in v0.8.7 tricky hanhling of sample names
# https://github.com/sequana/sequana/issues/576
candidates = [
realpath
for basename, realpath in zip(self.basenames, self.realpaths)
if read_tag in basename and tag == basename.split(read_tag)[0]
]
else:
read_tag = ""
candidates = [
realpath for basename, realpath in zip(self.basenames, self.realpaths) if basename.startswith(tag)
]
if len(candidates) == 0 and r == "2":
# assuming there is no R2
return None
elif len(candidates) == 1:
return candidates[0]
elif len(candidates) == 0:
msg = "Found no valid matches for {}. "
msg += "Files must have the tag ({}) and an underscore somewhere"
msg = msg.format(tag, read_tag)
logger.critical(msg)
raise Exception
else:
logger.critical("Found too many candidates: %s " % candidates)
msg = "Found too many candidates or identical names: %s " % candidates
msg += "Files must have the tag %s" % read_tag
raise ValueError(msg)
[docs]
def get_file1(self, tag=None):
return self._get_file(tag, "1")
[docs]
def get_file2(self, tag=None):
return self._get_file(tag, "2")
def _get_paired(self):
# If there is no read_tag, this means data is unpaired
if self.read_tag == "":
return False
# change [12] regex
rt1 = self.read_tag.replace("[12]", "1")
rt2 = self.read_tag.replace("[12]", "2")
# count number of occurences
# Note1: the filenames do not have the extension and therefore the final "."
# Note2: the tag _1 and _2 may be used at the end of the filename.
# and are not robust enough. Indeed, sample names may contain the
# _1 or _2 strings. Yet, usually, if used, the _1 and _2 are at
# the end of the filename. Thereofre the read "1." and "_2." can
# be used robustly.
# Given Note1 and Note2, we decided to count the R1 and R2 tag by
# appending the "." in the variable 'this' so that if used in the
# read_tag it can be match
R1 = [1 for this in self.filenames if rt1 in this + "."]
R2 = [1 for this in self.filenames if rt2 in this + "."]
if len(R2) == 0:
return False
elif R1 == R2:
return True
else: # pragma: no cover
logger.error("Mix of paired and single-end data sets {}".format(self.pattern))
sys.exit(1)
paired = property(_get_paired, doc="guess whether data is paired or not")
def __len__(self):
return len(self.tags)