cirro.helpers.preprocess_dataset

  1import json
  2import logging
  3import os
  4import warnings
  5from io import StringIO
  6from pathlib import Path
  7from typing import TYPE_CHECKING, Union, Optional
  8
  9import boto3
 10
 11if TYPE_CHECKING:
 12    from pandas import DataFrame
 13
 14from cirro.models.s3_path import S3Path
 15
 16logger = logging.getLogger(__name__)
 17
 18
 19def _fix_s3_path(path: str) -> str:
 20    """
 21    Fix the S3 path to ensure it starts with 's3://'.
 22    """
 23    normalized_path = path.replace(os.sep, '/').strip()
 24    if normalized_path.startswith("s3:/") and not normalized_path.startswith("s3://"):
 25        return normalized_path.replace("s3:/", "s3://", 1)
 26    return path
 27
 28
 29def write_json(dat, local_path: str, indent=4):
 30    """Write a JSON object to a local file."""
 31    with Path(local_path).open(mode="wt") as handle:
 32        return json.dump(dat, handle, indent=indent)
 33
 34
 35def read_csv(path: str, required_columns=None) -> 'DataFrame':
 36    """Read a CSV from the dataset and check for any required columns."""
 37    if required_columns is None:
 38        required_columns = []
 39    path = _fix_s3_path(path)
 40    import pandas as pd
 41    df = pd.read_csv(path)
 42    for col in required_columns:
 43        assert col in df.columns.values, f"Did not find expected column {col} in {path}"
 44    return df
 45
 46
 47def read_json(path: str):
 48    """Read a JSON object from a local file or S3 path."""
 49    path = _fix_s3_path(path)
 50    s3_path = S3Path(path)
 51
 52    if s3_path.valid:
 53        s3 = boto3.client('s3')
 54        retr = s3.get_object(Bucket=s3_path.bucket, Key=s3_path.key)
 55        text = retr['Body'].read().decode()
 56    else:
 57        with Path(path).open() as handle:
 58            text = handle.read()
 59
 60    # Parse JSON
 61    return json.loads(text)
 62
 63
 64class PreprocessDataset:
 65    """
 66    Helper functions for performing preparatory tasks immediately before launching
 67    the analysis workflow for a dataset.
 68
 69    For use in the `preprocess.py` script.
 70    More info: https://docs.cirro.bio/pipelines/preprocess-script/
 71    """
 72    samplesheet: 'DataFrame'
 73    """
 74    A pandas DataFrame containing all of the metadata assigned to the samples present
 75    in the input datasets (at the time of analysis).
 76
 77    More info: https://docs.cirro.bio/pipelines/preprocess-script/#dssamplesheet
 78    """
 79    files: 'DataFrame'
 80    """
 81    A DataFrame containing information on the files contained in the input datasets,
 82    and the sample that each file is assigned to.
 83
 84    More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsfiles
 85    """
 86    params: dict
 87    """
 88    A dictionary with all of the parameter values populated by user input
 89    using the process-form.json and process-input.json configurations.
 90
 91    This is read-only, use `add_param` to add new parameters or `remove_param` to remove them.
 92
 93    More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsparams
 94    """
 95    metadata: dict
 96    """
 97    Detailed information about the dataset at the time of analysis,
 98    including the project, process, and input datasets.
 99
100    More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsmetadata
101    """
102    dataset_root: str
103    """
104    Base path to the dataset
105    """
106
107    _PARAMS_FILE = "params.json"
108    _REFERENCES_BASE = "s3://pubweb-references"
109
110    def __init__(self,
111                 samplesheet: Union['DataFrame', str, Path],
112                 files: Union['DataFrame', str, Path],
113                 params: dict = None,
114                 metadata: dict = None,
115                 dataset_root: str = None):
116        import pandas as pd
117        # Convert string or Path inputs to a DataFrame if necessary
118        if isinstance(samplesheet, str):
119            samplesheet = pd.read_csv(StringIO(samplesheet))
120        if isinstance(samplesheet, Path):
121            samplesheet = read_csv(str(samplesheet))
122        if isinstance(files, str):
123            files = pd.read_csv(StringIO(files))
124        if isinstance(files, Path):
125            files = read_csv(str(files))
126        if params is None:
127            params = {}
128        if metadata is None:
129            metadata = {}
130
131        self.samplesheet = samplesheet
132        self.files = files
133        self.params = params
134        self.metadata = metadata
135        self.dataset_root = dataset_root
136        self.logger = logger
137
138    @classmethod
139    def from_path(cls, dataset_root: str, config_directory='config'):
140        """
141        Creates an instance from a path
142        (useful for testing or when running the script outside Cirro)
143        """
144        config_directory = Path(dataset_root, config_directory)
145
146        files = read_csv(
147            str(Path(config_directory, "files.csv")),
148            required_columns=["sample", "file"]
149        )
150
151        samplesheet = read_csv(
152            str(Path(config_directory, "samplesheet.csv")),
153            required_columns=["sample"]
154        )
155
156        params = read_json(
157            str(Path(config_directory, "params.json")),
158        )
159
160        metadata = read_json(
161            str(Path(config_directory, "metadata.json")),
162        )
163
164        return cls(files=files,
165                   samplesheet=samplesheet,
166                   params=params,
167                   metadata=metadata,
168                   dataset_root=dataset_root)
169
170    @classmethod
171    def from_running(cls):
172        """
173        Creates an instance from the currently running dataset
174        (expected to be called from inside a Cirro analysis process)
175        """
176        logging.basicConfig(level=logging.INFO,
177                            format='%(asctime)s %(levelname)-8s [PreprocessDataset] %(message)s')
178        dataset_path = os.getenv("PW_S3_DATASET")
179        return cls.from_path(dataset_path)
180
181    @property
182    def references_base(self):
183        """
184        Returns the base URL for references.
185        This is used to access public references in the Cirro system.
186        """
187        return self._REFERENCES_BASE
188
189    def log(self):
190        """Print logging messages about the dataset."""
191        logger.info(f"Storage location for dataset: {self.dataset_root}")
192        logger.info(f"Number of files in dataset: {self.files.shape[0]:,}")
193        logger.info(f"Number of samples in dataset: {self.samplesheet.shape[0]:,}")
194
195    def add_param(self, name: str, value, overwrite=False, log=True):
196        """Add a parameter to the dataset."""
197
198        assert overwrite or name not in self.params, \
199            f"Cannot add parameter {name}, already exists (and overwrite=False)"
200
201        if log:
202            logger.info(f"Adding parameter {name} = {value}")
203        self.params[name] = value
204
205        if log:
206            logger.info("Saving parameters")
207        write_json(self.params, self._PARAMS_FILE)
208
209    def remove_param(self, name: str, force=False):
210        """Remove a parameter from the dataset."""
211
212        assert force or name in self.params, \
213            f"Cannot remove parameter {name}, does not exist (and force=False)"
214
215        logger.info(f"Removing parameter {name}")
216        if name in self.params:
217            del self.params[name]
218
219        logger.info("Saving parameters")
220        write_json(self.params, self._PARAMS_FILE)
221
222    def keep_params(self, params_to_keep: list[str]):
223        """Keep only the specified parameters in the dataset."""
224        logger.info(f"Keeping parameters: {params_to_keep}")
225        self.params = {
226            k: v for k, v in self.params.items()
227            if k in params_to_keep
228        }
229        write_json(self.params, self._PARAMS_FILE)
230
231    def update_compute(self, from_str: str, to_str: str, fp="nextflow-override.config"):
232        """Replace all instances of a text string in the compute config file."""
233
234        assert os.path.exists(fp), f"File does not exist: {fp}"
235        with open(fp, 'r') as handle:
236            compute = handle.read()
237        n = len(compute.split(from_str)) - 1
238        logger.info(f"Replacing {n:,} instances of {from_str} with {to_str} in {fp}")
239        compute = compute.replace(from_str, to_str)
240        with open(fp, 'wt') as handle:
241            handle.write(compute)
242
243    def pivot_samplesheet(
244            self,
245            index=None,
246            pivot_columns: Union[Optional[str], list[str]] = 'read',
247            metadata_columns: list[str] = None,
248            column_prefix: str = "fastq_",
249            file_filter_predicate: str = None
250    ):
251        """
252        Combines data from both the samples and files table into a wide format with
253        each sample on a row and each file in a column.
254        The file column indexes are created by default from the `read` column, but can be customized.
255
256        For example, if the `files` table has columns `sample`, `read`, and `file`,
257        and the `samplesheet` has columns `sample`, `status`, and `group`, the output
258        will have columns `sample`, `fastq_1`, `fastq_2`, `status`, and `group`.
259
260        Args:
261            index: List[str], used to make the frame's new index, defaults to
262             `["sampleIndex", "sample", "lane"]`
263            pivot_columns: str or List[str], columns to pivot on and create the new column,
264             defaults to 'read'. This effectively makes the column `<column_prefix><read>`.
265             If the column is not defined or not present, the pivot column will be generated
266             from the file number index.
267            metadata_columns: List[str], metadata columns to include in the output,
268             defaults to all columns that are available from the sample metadata.
269             If your pipeline doesn't like extra columns, make sure to specify the allowed columns here.
270            column_prefix: str, optional, prefix for the new columns, defaults to `fastq_`.
271            file_filter_predicate: str, optional, a pandas query string to filter the files table.
272             A common use case would be to filter out indexed reads, e.g. `readType == "R"`.
273
274        Returns:
275            DataFrame: A wide-format sample sheet with the specified columns pivoted.
276        """
277        import pandas as pd
278
279        pivoted_files = self.pivot_files(index=index,
280                                         pivot_columns=pivot_columns,
281                                         column_prefix=column_prefix,
282                                         file_filter_predicate=file_filter_predicate)
283        combined = pd.merge(pivoted_files, self.samplesheet, on='sample', how="inner", validate="many_to_many")
284
285        # Default to keeping all columns
286        if metadata_columns is None:
287            metadata_columns = self.samplesheet.columns.tolist() + pivoted_files.columns.tolist()
288
289        # Keep only the specified metadata columns
290        all_columns = combined.axes[1]
291        for column in all_columns:
292            if (column not in metadata_columns
293                    # These columns are required, never drop them
294                    and column_prefix not in column
295                    and 'sample' != column):
296                combined = combined.drop(columns=[column])
297
298        return combined
299
300    def pivot_files(
301            self,
302            index: list[str] = None,
303            pivot_columns: Union[str, list[str]] = 'read',
304            column_prefix: str = "fastq_",
305            file_filter_predicate: str = None
306    ):
307        """
308        Format the files table into a wide format with each sample on a row
309        and each file in a column. The column indexes are created by default
310        from the `read` column, but can be customized. This is useful for
311        paired-end sequencing data where you want to have the columns
312        `sample`, `fastq_1`, and `fastq_2` as the output.
313
314        Args:
315            index: List[str], used to make the frame's new index, defaults to `["sampleIndex", "sample", "lane"]`
316            pivot_columns: str or List[str], columns to pivot on and create the new column,
317             defaults to 'read'. This effectively makes the column `<column_prefix><read>`
318            column_prefix: str, optional, prefix for the new columns, defaults to `fastq_`.
319            file_filter_predicate: str, optional, a pandas query string to filter the files table.
320
321        Returns:
322            DataFrame: A wide-format sample sheet with the specified columns pivoted.
323        """
324        if index is None:
325            index = ["sampleIndex", "sample", "lane"]
326        logger.info("Formatting a wide files table")
327        logger.info("File table (long)")
328        logger.info(self.files.head().to_csv(index=False))
329
330        files = self.files
331
332        if file_filter_predicate is not None:
333            # Filter the files table based on the predicate
334            files = files.query(file_filter_predicate)
335
336        # If we don't have access to the column defined, just use the file number
337        # By default this is 'read' but the data might not be paired
338        pivot_columns_defined = pivot_columns is not None and len(pivot_columns) > 0
339        if not pivot_columns_defined or pivot_columns not in files.columns.values:
340            logger.warning("Pivot column not found, grouping by sample instead.")
341            files['file_num'] = files.groupby('sample').cumcount() + 1
342            pivot_columns = 'file_num'
343
344        if isinstance(pivot_columns, str):
345            pivot_columns = [pivot_columns]
346
347        assert pivot_columns in files.columns.values, f"Column '{pivot_columns}' not found in file table"
348        assert 'file' in files.columns.values, "Column 'file' must be present in the file table"
349        assert isinstance(index, list), f"index must be a list (not {type(index)})"
350
351        # Get the list of columns from the inputs
352        input_columns = files.columns.values
353
354        # Format as a wide dataset
355        # Note that all the columns in `index` will be added if they are not already present
356        wide_df = files.reindex(
357            columns=index + pivot_columns + ['file']
358        )
359        wide_df = wide_df.pivot(
360            index=index,
361            columns=pivot_columns,
362            values='file'
363        )
364        # Rename the columns to have a prefix, e.g. 'fastq_'
365        wide_df = wide_df.rename(
366            columns=lambda i: f"{column_prefix}{int(i)}"
367        )
368        wide_df = wide_df.reset_index()
369
370        # Remove any columns from the output which were added from `index`
371        for cname in index:
372            if cname not in input_columns:
373                wide_df = wide_df.drop(columns=[cname])
374        # Remove any extra unnecessary columns
375        wide_df = wide_df.drop(columns=pivot_columns, errors='ignore')
376        return wide_df
377
378    def wide_samplesheet(
379            self,
380            index=None,
381            columns='read',
382            values="file",  # noqa
383            column_prefix="fastq_"
384    ):
385        """
386        Format the samplesheet into a wide format with each sample on a row
387
388        This is a legacy method, please use `pivot_samplesheet` instead.
389        """
390        warnings.warn("`wide_samplesheet` is deprecated, use `pivot_samplesheet` instead.",
391                      DeprecationWarning, stacklevel=2)
392        if values != "file":
393            raise ValueError("The only supported value for `values` is 'file'")
394        return self.pivot_files(index=index, pivot_columns=[columns], column_prefix=column_prefix)
logger = logging.getLogger(__name__)
def write_json(dat, local_path: str, indent=4):

Write a JSON object to a local file.
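
A minimal usage sketch; the dictionary contents and output path below are illustrative:

from cirro.helpers.preprocess_dataset import write_json

# Writes the dict as pretty-printed JSON (indent=4 by default); path is illustrative
write_json({"genome": "GRCh38", "threads": 4}, "params.json")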

def read_csv(path: str, required_columns=None) -> pandas.core.frame.DataFrame:

Read a CSV from the dataset and check for any required columns.
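
A short usage sketch; the path and column list are illustrative, and a missing required column raises an AssertionError:

from cirro.helpers.preprocess_dataset import read_csv

# Path and required columns are illustrative; pandas must be able to read the location
samplesheet = read_csv("config/samplesheet.csv", required_columns=["sample"])
print(samplesheet.head())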

def read_json(path: str):

Read a JSON object from a local file or S3 path.
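
A short usage sketch; both paths are illustrative, and reading from S3 assumes boto3 credentials with access to the bucket:

from cirro.helpers.preprocess_dataset import read_json

params = read_json("config/params.json")                          # local file
metadata = read_json("s3://example-bucket/config/metadata.json")  # S3 object via boto3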

class PreprocessDataset:

Helper functions for performing preparatory tasks immediately before launching the analysis workflow for a dataset.

For use in the preprocess.py script. More info: https://docs.cirro.bio/pipelines/preprocess-script/
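
A minimal preprocess.py sketch, assuming it runs inside a Cirro analysis process; the added parameter name is illustrative, not a required convention:

from cirro.helpers.preprocess_dataset import PreprocessDataset

# Load the dataset currently being analyzed
ds = PreprocessDataset.from_running()

# Inspect the inputs
ds.log()
print(ds.samplesheet.head())
print(ds.files.head())

# Derive an extra workflow parameter (name is illustrative); this rewrites params.json
ds.add_param("sample_count", int(ds.samplesheet.shape[0]))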

PreprocessDataset( samplesheet: Union[pandas.core.frame.DataFrame, str, pathlib.Path], files: Union[pandas.core.frame.DataFrame, str, pathlib.Path], params: dict = None, metadata: dict = None, dataset_root: str = None)
samplesheet: pandas.core.frame.DataFrame

A pandas DataFrame containing all of the metadata assigned to the samples present in the input datasets (at the time of analysis).

More info: https://docs.cirro.bio/pipelines/preprocess-script/#dssamplesheet

files: pandas.core.frame.DataFrame

A DataFrame containing information on the files contained in the input datasets, and the sample that each file is assigned to.

More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsfiles

params: dict

A dictionary with all of the parameter values populated by user input using the process-form.json and process-input.json configurations.

This is read-only, use add_param to add new parameters or remove_param to remove them.

More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsparams

metadata: dict

Detailed information about the dataset at the time of analysis, including the project, process, and input datasets.

More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsmetadata

dataset_root: str

Base path to the dataset

logger
@classmethod
def from_path(cls, dataset_root: str, config_directory='config'):

Creates an instance from a path (useful for testing or when running the script outside Cirro)
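
A hedged local-testing sketch; the directory below is a placeholder and must contain a config/ folder with files.csv, samplesheet.csv, params.json, and metadata.json:

from cirro.helpers.preprocess_dataset import PreprocessDataset

# Placeholder path: point this at a downloaded dataset or a test fixture
ds = PreprocessDataset.from_path("/tmp/example-dataset")
ds.log()
print(ds.params)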

@classmethod
def from_running(cls):

Creates an instance from the currently running dataset (expected to be called from inside a Cirro analysis process)

references_base

Returns the base URL for references. This is used to access public references in the Cirro system.

def log(self):

Print logging messages about the dataset.

def add_param(self, name: str, value, overwrite=False, log=True):

Add a parameter to the dataset.
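
For example, continuing from a PreprocessDataset instance ds (parameter names and values are illustrative); every call rewrites params.json:

ds.add_param("genome", "GRCh38")                  # fails if "genome" is already set
ds.add_param("genome", "GRCm39", overwrite=True)  # replace an existing value
ds.add_param("threads", 4, log=False)             # add without emitting log messages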

def remove_param(self, name: str, force=False):

Remove a parameter from the dataset.

def keep_params(self, params_to_keep: list[str]):

Keep only the specified parameters in the dataset.
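
For example, with a PreprocessDataset instance ds (parameter names are illustrative), drop everything except the parameters the workflow accepts:

ds.keep_params(["genome", "threads"])  # all other entries are removed and params.json is rewritten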

def update_compute(self, from_str: str, to_str: str, fp='nextflow-override.config'):

Replace all instances of a text string in the compute config file.
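
A small sketch, with ds a PreprocessDataset instance and assuming a nextflow-override.config file exists in the working directory; the placeholder token and replacement value are illustrative:

# Swap a placeholder token for a concrete value in the Nextflow override config
ds.update_compute("__QUEUE_NAME__", "spot-queue")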

def pivot_samplesheet( self, index=None, pivot_columns: Union[str, NoneType, list[str]] = 'read', metadata_columns: list[str] = None, column_prefix: str = 'fastq_', file_filter_predicate: str = None):

Combines data from both the samples and files table into a wide format with each sample on a row and each file in a column. The file column indexes are created by default from the read column, but can be customized.

For example, if the files table has columns sample, read, and file, and the samplesheet has columns sample, status, and group, the output will have columns sample, fastq_1, fastq_2, status, and group.

Arguments:
  • index: List[str], used to make the frame's new index, defaults to ["sampleIndex", "sample", "lane"]
  • pivot_columns: str or List[str], columns to pivot on and create the new column, defaults to 'read'. This effectively makes the column `<column_prefix><read>`. If the column is not defined or not present, the pivot column will be generated from the file number index.
  • metadata_columns: List[str], metadata columns to include in the output, defaults to all columns that are available from the sample metadata. If your pipeline doesn't like extra columns, make sure to specify the allowed columns here.
  • column_prefix: str, optional, prefix for the new columns, defaults to fastq_.
  • file_filter_predicate: str, optional, a pandas query string to filter the files table. A common use case would be to filter out indexed reads, e.g. readType == "R".
Returns:

DataFrame: A wide-format sample sheet with the specified columns pivoted.
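
A hedged example matching the scenario in the description (files columns sample/read/file, samplesheet columns sample/status/group), with ds a PreprocessDataset instance; the filter predicate assumes a readType column exists, and the output filename is an illustrative choice:

wide = ds.pivot_samplesheet(
    metadata_columns=["sample", "status", "group"],  # keep only these extra columns
    file_filter_predicate='readType == "R"',         # drop index reads, if readType is present
)
# Expected columns: sample, fastq_1, fastq_2, status, group
wide.to_csv("samplesheet.csv", index=False)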

def pivot_files( self, index: list[str] = None, pivot_columns: Union[str, list[str]] = 'read', column_prefix: str = 'fastq_', file_filter_predicate: str = None):

Format the files table into a wide format with each sample on a row and each file in a column. The column indexes are created by default from the read column, but can be customized. This is useful for paired-end sequencing data where you want to have the columns sample, fastq_1, and fastq_2 as the output.

Arguments:
  • index: List[str], used to make the frame's new index, defaults to ["sampleIndex", "sample", "lane"]
  • pivot_columns: str or List[str], columns to pivot on and create the new column, defaults to 'read'. This effectively makes the column `<column_prefix><read>`
  • column_prefix: str, optional, prefix for the new columns, defaults to fastq_.
  • file_filter_predicate: str, optional, a pandas query string to filter the files table.
Returns:

DataFrame: A wide-format sample sheet with the specified columns pivoted.
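
For example, with ds a PreprocessDataset instance and a paired-end files table whose read column holds 1 and 2, the result has one row per sample with fastq_1 / fastq_2 columns:

paired = ds.pivot_files(pivot_columns="read", column_prefix="fastq_")
print(paired.columns.tolist())  # e.g. ['sample', 'fastq_1', 'fastq_2']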

def wide_samplesheet( self, index=None, columns='read', values='file', column_prefix='fastq_'):

Format the samplesheet into a wide format with each sample on a row

This is a legacy method, please use pivot_samplesheet instead.
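
A migration sketch, with ds a PreprocessDataset instance: wide_samplesheet forwards to pivot_files, so an existing call can be rewritten as shown (the arguments used are the legacy defaults):

# Legacy call (emits a DeprecationWarning)
wide = ds.wide_samplesheet(columns="read", column_prefix="fastq_")

# Equivalent call with the current API
wide = ds.pivot_files(pivot_columns=["read"], column_prefix="fastq_")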