cirro.helpers.preprocess_dataset

import json
import logging
import os
import warnings
from io import StringIO
from pathlib import Path
from typing import TYPE_CHECKING, Union, Optional

import boto3
from botocore import UNSIGNED
from botocore.config import Config
from botocore.exceptions import NoCredentialsError

if TYPE_CHECKING:
    from pandas import DataFrame

from cirro.models.s3_path import S3Path

logger = logging.getLogger(__name__)


def _fix_s3_path(path: str) -> str:
    """
    Fix the S3 path to ensure it starts with 's3://'.
    """
    normalized_path = path.replace(os.sep, '/').strip()
    if normalized_path.startswith("s3:/") and not normalized_path.startswith("s3://"):
        return normalized_path.replace("s3:/", "s3://", 1)
    return path


def write_json(dat, local_path: str, indent=4):
    """Write a JSON object to a local file."""
    with Path(local_path).open(mode="wt") as handle:
        return json.dump(dat, handle, indent=indent)


def read_csv(path: str, required_columns=None) -> 'DataFrame':
    """Read a CSV from the dataset and check for any required columns."""
    if required_columns is None:
        required_columns = []
    path = _fix_s3_path(path)
    import pandas as pd
    df = pd.read_csv(path)
    for col in required_columns:
        assert col in df.columns.values, f"Did not find expected column {col} in {path}"
    return df


def read_json(path: str):
    """Read a JSON object from a local file or S3 path."""
    path = _fix_s3_path(path)
    s3_path = S3Path(path)

    if s3_path.valid:
        try:
            s3 = boto3.client('s3')
            retr = s3.get_object(Bucket=s3_path.bucket, Key=s3_path.key)
        except NoCredentialsError:
            s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
            retr = s3.get_object(Bucket=s3_path.bucket, Key=s3_path.key)
        text = retr['Body'].read().decode()
    else:
        with Path(path).open() as handle:
            text = handle.read()

    # Parse JSON
    return json.loads(text)


class PreprocessDataset:
    """
    Helper functions for performing preparatory tasks immediately before launching
    the analysis workflow for a dataset.

    For use in the `preprocess.py` script.
    More info: https://docs.cirro.bio/pipelines/preprocess-script/
    """
    samplesheet: 'DataFrame'
    """
    A pandas DataFrame containing all of the metadata assigned to the samples present
    in the input datasets (at the time of analysis).

    More info: https://docs.cirro.bio/pipelines/preprocess-script/#dssamplesheet
    """
    files: 'DataFrame'
    """
    A DataFrame containing information on the files contained in the input datasets,
    and the sample that each file is assigned to.

    More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsfiles
    """
    params: dict
    """
    A dictionary with all of the parameter values populated by user input
    using the process-form.json and process-input.json configurations.

    This is read-only; use `add_param` to add new parameters or `remove_param` to remove them.

    More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsparams
    """
    metadata: dict
    """
    Detailed information about the dataset at the time of analysis,
    including the project, process, and input datasets.

    More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsmetadata
    """
    dataset_root: str
    """
    Base path to the dataset
    """

    _PARAMS_FILE = "params.json"
    _REFERENCES_BASE = "s3://pubweb-references"

    def __init__(self,
                 samplesheet: Union['DataFrame', str, Path],
                 files: Union['DataFrame', str, Path],
                 params: dict = None,
                 metadata: dict = None,
                 dataset_root: str = None):
        import pandas as pd
        # Convert string or Path inputs into DataFrames if necessary
        if isinstance(samplesheet, str):
            samplesheet = pd.read_csv(StringIO(samplesheet))
        if isinstance(samplesheet, Path):
            samplesheet = read_csv(str(samplesheet))
        if isinstance(files, str):
            files = pd.read_csv(StringIO(files))
        if isinstance(files, Path):
            files = read_csv(str(files))
        if params is None:
            params = {}
        if metadata is None:
            metadata = {}

        self.samplesheet = samplesheet
        self.files = files
        self.params = params
        self.metadata = metadata
        self.dataset_root = dataset_root
        self.logger = logger

    @classmethod
    def from_path(cls, dataset_root: str, config_directory='config'):
        """
        Creates an instance from a path
        (useful for testing or when running the script outside Cirro)
        """
        config_directory = Path(dataset_root, config_directory)

        files = read_csv(
            str(Path(config_directory, "files.csv")),
            required_columns=["sample", "file"]
        )

        samplesheet = read_csv(
            str(Path(config_directory, "samplesheet.csv")),
            required_columns=["sample"]
        )

        params = read_json(
            str(Path(config_directory, "params.json")),
        )

        metadata = read_json(
            str(Path(config_directory, "metadata.json")),
        )

        return cls(files=files,
                   samplesheet=samplesheet,
                   params=params,
                   metadata=metadata,
                   dataset_root=dataset_root)

    @classmethod
    def from_running(cls):
        """
        Creates an instance from the currently running dataset
        (expected to be called from inside a Cirro analysis process)
        """
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s %(levelname)-8s [PreprocessDataset] %(message)s')
        dataset_path = os.getenv("PW_S3_DATASET")
        return cls.from_path(dataset_path)

    @property
    def references_base(self):
        """
        Returns the base URL for references.
        This is used to access public references in the Cirro system.
        """
        return self._REFERENCES_BASE

    def log(self):
        """Print logging messages about the dataset."""
        logger.info(f"Storage location for dataset: {self.dataset_root}")
        logger.info(f"Number of files in dataset: {self.files.shape[0]:,}")
        logger.info(f"Number of samples in dataset: {self.samplesheet.shape[0]:,}")

    def add_param(self, name: str, value, overwrite=False, log=True):
        """Add a parameter to the dataset."""

        assert overwrite or name not in self.params, \
            f"Cannot add parameter {name}, already exists (and overwrite=False)"

        if log:
            logger.info(f"Adding parameter {name} = {value}")
        self.params[name] = value

        if log:
            logger.info("Saving parameters")
        write_json(self.params, self._PARAMS_FILE)

    def remove_param(self, name: str, force=False):
        """Remove a parameter from the dataset."""

        assert force or name in self.params, \
            f"Cannot remove parameter {name}, does not exist (and force=False)"

        logger.info(f"Removing parameter {name}")
        if name in self.params:
            del self.params[name]

        logger.info("Saving parameters")
        write_json(self.params, self._PARAMS_FILE)

    def keep_params(self, params_to_keep: list[str]):
        """Keep only the specified parameters in the dataset."""
        logger.info(f"Keeping parameters: {params_to_keep}")
        self.params = {
            k: v for k, v in self.params.items()
            if k in params_to_keep
        }
        write_json(self.params, self._PARAMS_FILE)

    def update_compute(self, from_str: str, to_str: str, fp="nextflow-override.config"):
        """Replace all instances of a text string in the compute config file."""

        assert os.path.exists(fp), f"File does not exist: {fp}"
        with open(fp, 'r') as handle:
            compute = handle.read()
        n = len(compute.split(from_str)) - 1
        logger.info(f"Replacing {n:,} instances of {from_str} with {to_str} in {fp}")
        compute = compute.replace(from_str, to_str)
        with open(fp, 'wt') as handle:
            handle.write(compute)

    def pivot_samplesheet(
            self,
            index=None,
            pivot_columns: Union[Optional[str], list[str]] = 'read',
            metadata_columns: list[str] = None,
            column_prefix: str = "fastq_",
            file_filter_predicate: str = None
    ):
        """
        Combines data from both the samples and files table into a wide format with
        each sample on a row and each file in a column.
        The file column indexes are created by default from the `read` column, but can be customized.

        For example, if the `files` table has columns `sample`, `read`, and `file`,
        and the `samplesheet` has columns `sample`, `status`, and `group`, the output
        will have columns `sample`, `fastq_1`, `fastq_2`, `status`, and `group`.

        Args:
            index: List[str], used to make the frame's new index, defaults to
             `["sampleIndex", "sample", "lane"]`
            pivot_columns: str or List[str], columns to pivot on and create the new column,
             defaults to 'read'. This effectively makes the column `<column_prefix><read>`.
             If the column is not defined or not present, the pivot column will be generated
             from the file number index.
            metadata_columns: List[str], metadata columns to include in the output,
             defaults to all columns that are available from the sample metadata.
             If your pipeline doesn't like extra columns, make sure to specify the allowed columns here.
            column_prefix: str, optional, prefix for the new columns, defaults to `fastq_`.
            file_filter_predicate: str, optional, a pandas query string to filter the files table.
             A common use case would be to filter out indexed reads, e.g. `readType == "R"`.

        Returns:
            DataFrame: A wide-format sample sheet with the specified columns pivoted.
        """
        import pandas as pd

        pivoted_files = self.pivot_files(index=index,
                                         pivot_columns=pivot_columns,
                                         column_prefix=column_prefix,
                                         file_filter_predicate=file_filter_predicate)
        combined = pd.merge(pivoted_files, self.samplesheet, on='sample', how="inner", validate="many_to_many")

        # Default to keeping all columns
        if metadata_columns is None:
            metadata_columns = self.samplesheet.columns.tolist() + pivoted_files.columns.tolist()

        # Keep only the specified metadata columns
        all_columns = combined.axes[1]
        for column in all_columns:
            if (column not in metadata_columns
                    # These columns are required, never drop them
                    and column_prefix not in column
                    and 'sample' != column):
                combined = combined.drop(columns=[column])

        return combined

    def pivot_files(
            self,
            index: list[str] = None,
            pivot_columns: Union[str, list[str]] = 'read',
            column_prefix: str = "fastq_",
            file_filter_predicate: str = None
    ):
        """
        Format the files table into a wide format with each sample on a row
        and each file in a column. The column indexes are created by default
        from the `read` column, but can be customized. This is useful for
        paired-end sequencing data where you want to have the columns
        `sample`, `fastq_1`, and `fastq_2` as the output.

        Args:
            index: List[str], used to make the frame's new index, defaults to
             `["sampleIndex", "sample", "lane"]`
            pivot_columns: str or List[str], columns to pivot on and create the new column,
             defaults to 'read'. This effectively makes the column `<column_prefix><read>`
            column_prefix: str, optional, prefix for the new columns, defaults to `fastq_`.
            file_filter_predicate: str, optional, a pandas query string to filter the files table.

        Returns:
            DataFrame: A wide-format sample sheet with the specified columns pivoted.
        """
        if index is None:
            index = ["sampleIndex", "sample", "lane"]
        logger.info("Formatting a wide files table")
        logger.info("File table (long)")
        logger.info(self.files.head().to_csv(index=False))

        files = self.files

        if file_filter_predicate is not None:
            # Filter the files table based on the predicate
            files = files.query(file_filter_predicate)

        # If we don't have access to the column defined, just use the file number
        # By default this is 'read' but the data might not be paired
        pivot_columns_defined = pivot_columns is not None and len(pivot_columns) > 0
        if not pivot_columns_defined or pivot_columns not in files.columns.values:
            logger.warning("Pivot column not found, grouping by sample instead.")
            files['file_num'] = files.groupby('sample').cumcount() + 1
            pivot_columns = 'file_num'

        if isinstance(pivot_columns, str):
            pivot_columns = [pivot_columns]

        assert pivot_columns in files.columns.values, f"Column '{pivot_columns}' not found in file table"
        assert 'file' in files.columns.values, "Column 'file' must be present in the file table"
        assert isinstance(index, list), f"index must be a list (not {type(index)})"

        # Get the list of columns from the inputs
        input_columns = files.columns.values

        # Format as a wide dataset
        # Note that all the columns in `index` will be added if they are not already present
        wide_df = files.reindex(
            columns=index + pivot_columns + ['file']
        )
        wide_df = wide_df.pivot(
            index=index,
            columns=pivot_columns,
            values='file'
        )
        # Rename the columns to have a prefix, e.g. 'fastq_'
        wide_df = wide_df.rename(
            columns=lambda i: f"{column_prefix}{int(i)}"
        )
        wide_df = wide_df.reset_index()

        # Remove any columns from the output which were added from `index`
        for cname in index:
            if cname not in input_columns:
                wide_df = wide_df.drop(columns=[cname])
        # Remove any extra unnecessary columns
        wide_df = wide_df.drop(columns=pivot_columns, errors='ignore')
        return wide_df

    def wide_samplesheet(
            self,
            index=None,
            columns='read',
            values="file",  # noqa
            column_prefix="fastq_"
    ):
        """
        Format the samplesheet into a wide format with each sample on a row

        This is a legacy method, please use `pivot_samplesheet` instead.
        """
        warnings.warn("`wide_samplesheet` is deprecated, use `pivot_samplesheet` instead.",
                      DeprecationWarning, stacklevel=2)
        if values != "file":
            raise ValueError("The only supported value for `values` is 'file'")
        return self.pivot_files(index=index, pivot_columns=[columns], column_prefix=column_prefix)

logger = <Logger cirro.helpers.preprocess_dataset (WARNING)>
def write_json(dat, local_path: str, indent=4):

Write a JSON object to a local file.
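
Example — a minimal sketch, assuming any JSON-serializable object; the file name and values below are illustrative only.

    from cirro.helpers.preprocess_dataset import write_json

    # Hypothetical parameters; written to ./params.json with 2-space indentation
    write_json({"genome": "GRCh38", "min_reads": 1000}, "params.json", indent=2)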

def read_csv(path: str, required_columns=None) -> pandas.core.frame.DataFrame:

Read a CSV from the dataset and check for any required columns.
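
Example — a short sketch of loading a samplesheet and asserting a required column; the path and column name are placeholders. An s3:// URI also works when pandas has S3 support available (e.g. s3fs installed).

    from cirro.helpers.preprocess_dataset import read_csv

    # Hypothetical local path; an s3:// URI is also accepted
    samplesheet = read_csv("config/samplesheet.csv", required_columns=["sample"])
    print(samplesheet.head())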

def read_json(path: str):

Read a JSON object from a local file or S3 path.
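
Example — a sketch showing that the same call reads either a local file or an S3 object (falling back to unsigned requests when no AWS credentials are configured); both paths below are placeholders.

    from cirro.helpers.preprocess_dataset import read_json

    params = read_json("config/params.json")  # local file
    # metadata = read_json("s3://example-bucket/config/metadata.json")  # hypothetical S3 object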

class PreprocessDataset:

Helper functions for performing preparatory tasks immediately before launching the analysis workflow for a dataset.

For use in the preprocess.py script. More info: https://docs.cirro.bio/pipelines/preprocess-script/
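
Example — a minimal preprocess.py sketch, assuming it runs inside a Cirro analysis process; the parameter added at the end is purely illustrative.

    from cirro.helpers.preprocess_dataset import PreprocessDataset

    ds = PreprocessDataset.from_running()
    ds.log()

    # Inspect the inputs provided by Cirro
    print(ds.samplesheet.head())
    print(ds.files.head())

    # Derive a workflow parameter from the inputs (name and value are hypothetical)
    ds.add_param("sample_count", int(ds.samplesheet.shape[0]))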

PreprocessDataset( samplesheet: Union[pandas.core.frame.DataFrame, str, pathlib.Path], files: Union[pandas.core.frame.DataFrame, str, pathlib.Path], params: dict = None, metadata: dict = None, dataset_root: str = None)
samplesheet: pandas.core.frame.DataFrame

A pandas DataFrame containing all of the metadata assigned to the samples present in the input datasets (at the time of analysis).

More info: https://docs.cirro.bio/pipelines/preprocess-script/#dssamplesheet

files: pandas.core.frame.DataFrame

A DataFrame containing information on the files contained in the input datasets, and the sample that each file is assigned to.

More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsfiles

params: dict

A dictionary with all of the parameter values populated by user input using the process-form.json and process-input.json configurations.

This is read-only; use add_param to add new parameters or remove_param to remove them.

More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsparams

metadata: dict

Detailed information about the dataset at the time of analysis, including the project, process, and input datasets.

More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsmetadata

dataset_root: str

Base path to the dataset

logger
@classmethod
def from_path(cls, dataset_root: str, config_directory='config'):

Creates an instance from a path (useful for testing or when running the script outside Cirro)
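
Example — a sketch for local testing, assuming a directory that contains a config/ folder with files.csv, samplesheet.csv, params.json, and metadata.json (the layout this method reads); the path is a placeholder.

    from cirro.helpers.preprocess_dataset import PreprocessDataset

    ds = PreprocessDataset.from_path("/tmp/example-dataset")  # hypothetical local copy
    print(ds.params)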

@classmethod
def from_running(cls):

Creates an instance from the currently running dataset (expected to be called from inside a Cirro analysis process)

references_base

Returns the base URL for references. This is used to access public references in the Cirro system.
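
Example — a sketch of pointing a parameter at a public reference; the key appended to the base URL is invented for illustration, not a documented layout.

    from cirro.helpers.preprocess_dataset import PreprocessDataset

    ds = PreprocessDataset.from_running()
    # "genomes/GRCh38/fasta" is hypothetical; consult the Cirro reference catalog for real keys
    ds.add_param("reference_fasta", f"{ds.references_base}/genomes/GRCh38/fasta")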

def log(self):

Print logging messages about the dataset.

def add_param(self, name: str, value, overwrite=False, log=True):

Add a parameter to the dataset.
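
Example — a sketch of the overwrite behavior: adding a new name succeeds, replacing an existing one requires overwrite=True, and each call re-writes params.json in the working directory. The parameter names are illustrative.

    from cirro.helpers.preprocess_dataset import PreprocessDataset

    ds = PreprocessDataset.from_running()
    ds.add_param("aligner", "bwa")                       # new parameter
    ds.add_param("aligner", "bowtie2", overwrite=True)   # replacing requires overwrite=True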

def remove_param(self, name: str, force=False):

Remove a parameter from the dataset.
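
Example — a sketch of the force flag, continuing the add_param example above; removing a missing parameter raises an AssertionError unless force=True.

    ds.remove_param("aligner")              # fails if "aligner" is not present
    ds.remove_param("aligner", force=True)  # no-op when it is already gone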

def keep_params(self, params_to_keep: list[str]):

Keep only the specified parameters in the dataset.
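
Example — a sketch of trimming the parameters down to the ones the workflow actually consumes before launch; the names listed are hypothetical and ds is assumed to be constructed as in the examples above.

    ds.keep_params(["genome", "aligner", "sample_count"])  # everything else is dropped from params.json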

def update_compute(self, from_str: str, to_str: str, fp='nextflow-override.config'):

Replace all instances of a text string in the compute config file.
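
Example — a sketch of patching the Nextflow override config before launch; both strings are invented for illustration, and the file must already exist in the working directory.

    # Hypothetical substitution in nextflow-override.config
    ds.update_compute("standard-queue", "highmem-queue")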

def pivot_samplesheet( self, index=None, pivot_columns: Union[str, NoneType, list[str]] = 'read', metadata_columns: list[str] = None, column_prefix: str = 'fastq_', file_filter_predicate: str = None):

Combines data from both the samples and files table into a wide format with each sample on a row and each file in a column. The file column indexes are created by default from the read column, but can be customized.

For example, if the files table has columns sample, read, and file, and the samplesheet has columns sample, status, and group, the output will have columns sample, fastq_1, fastq_2, status, and group.

Arguments:
  • index: List[str], used to make the frame's new index, defaults to ["sampleIndex", "sample", "lane"]
  • pivot_columns: str or List[str], columns to pivot on and create the new column, defaults to 'read'. This effectively makes the column `<column_prefix><read>`. If the column is not defined or not present, the pivot column will be generated from the file number index.
  • metadata_columns: List[str], metadata columns to include in the output, defaults to all columns that are available from the sample metadata. If your pipeline doesn't like extra columns, make sure to specify the allowed columns here.
  • column_prefix: str, optional, prefix for the new columns, defaults to fastq_.
  • file_filter_predicate: str, optional, a pandas query string to filter the files table. A common use case would be to filter out indexed reads, e.g. readType == "R".
Returns:

DataFrame: A wide-format sample sheet with the specified columns pivoted.
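
Example — a self-contained sketch mirroring the description above; the sample names, file names, and metadata columns are invented. Passing metadata_columns keeps only the listed metadata (plus sample and the fastq_ columns).

    import pandas as pd
    from cirro.helpers.preprocess_dataset import PreprocessDataset

    files = pd.DataFrame({
        "sampleIndex": [1, 1, 2, 2],
        "sample": ["A", "A", "B", "B"],
        "lane": [1, 1, 1, 1],
        "read": [1, 2, 1, 2],
        "file": ["A_R1.fastq.gz", "A_R2.fastq.gz", "B_R1.fastq.gz", "B_R2.fastq.gz"],
    })
    samplesheet = pd.DataFrame({"sample": ["A", "B"], "status": ["case", "control"], "group": ["g1", "g2"]})

    ds = PreprocessDataset(samplesheet=samplesheet, files=files)
    wide = ds.pivot_samplesheet(metadata_columns=["status", "group"])
    # Expected columns: sample, fastq_1, fastq_2, status, group
    print(wide)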

def pivot_files( self, index: list[str] = None, pivot_columns: Union[str, list[str]] = 'read', column_prefix: str = 'fastq_', file_filter_predicate: str = None):

Format the files table into a wide format with each sample on a row and each file in a column. The column indexes are created by default from the read column, but can be customized. This is useful for paired-end sequencing data where you want to have the columns sample, fastq_1, and fastq_2 as the output.

Arguments:
  • index: List[str], used to make the frame's new index, defaults to ["sampleIndex", "sample", "lane"]
  • pivot_columns: str or List[str], columns to pivot on and create the new column, defaults to 'read'. This effectively makes the column <column_prefix><read>
  • column_prefix: str, optional, prefix for the new columns, defaults to fastq_.
  • file_filter_predicate: str, optional, a pandas query string to filter the files table.
Returns:

DataFrame: A wide-format sample sheet with the specified columns pivoted.
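
Example — a sketch using file_filter_predicate to drop index reads before pivoting; the readType values and file names are assumptions for illustration.

    import pandas as pd
    from cirro.helpers.preprocess_dataset import PreprocessDataset

    files = pd.DataFrame({
        "sampleIndex": [1, 1, 1, 2, 2, 2],
        "sample": ["A", "A", "A", "B", "B", "B"],
        "lane": [1, 1, 1, 1, 1, 1],
        "readType": ["R", "R", "I", "R", "R", "I"],
        "read": [1, 2, 1, 1, 2, 1],
        "file": ["A_R1.fq.gz", "A_R2.fq.gz", "A_I1.fq.gz", "B_R1.fq.gz", "B_R2.fq.gz", "B_I1.fq.gz"],
    })
    samplesheet = pd.DataFrame({"sample": ["A", "B"]})

    ds = PreprocessDataset(samplesheet=samplesheet, files=files)
    wide = ds.pivot_files(file_filter_predicate='readType == "R"')
    # One row per (sampleIndex, sample, lane) with fastq_1 / fastq_2 columns
    print(wide)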

def wide_samplesheet( self, index=None, columns='read', values='file', column_prefix='fastq_'):

Format the samplesheet into a wide format with each sample on a row

This is a legacy method, please use pivot_samplesheet instead.
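
Example — a migration sketch: the deprecated call delegates to pivot_files, so pivot_files is the drop-in replacement, while pivot_samplesheet additionally merges in the sample metadata. ds is assumed to be a PreprocessDataset as in the examples above.

    legacy = ds.wide_samplesheet(columns="read", column_prefix="fastq_")  # emits DeprecationWarning
    wide = ds.pivot_files(pivot_columns="read", column_prefix="fastq_")   # equivalent output
    full = ds.pivot_samplesheet(pivot_columns="read")                     # also merges sample metadata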