cirro.helpers.preprocess_dataset
Module source:

import json
import logging
import os
import warnings
from io import StringIO
from pathlib import Path
from typing import TYPE_CHECKING, Union, Optional

import boto3
from botocore import UNSIGNED
from botocore.config import Config
from botocore.exceptions import NoCredentialsError

if TYPE_CHECKING:
    from pandas import DataFrame

from cirro.models.s3_path import S3Path

logger = logging.getLogger(__name__)


def _fix_s3_path(path: str) -> str:
    """
    Fix the S3 path to ensure it starts with 's3://'.
    """
    normalized_path = path.replace(os.sep, '/').strip()
    if normalized_path.startswith("s3:/") and not normalized_path.startswith("s3://"):
        return normalized_path.replace("s3:/", "s3://", 1)
    return path


def write_json(dat, local_path: str, indent=4):
    """Write a JSON object to a local file."""
    with Path(local_path).open(mode="wt") as handle:
        return json.dump(dat, handle, indent=indent)


def read_csv(path: str, required_columns=None) -> 'DataFrame':
    """Read a CSV from the dataset and check for any required columns."""
    if required_columns is None:
        required_columns = []
    path = _fix_s3_path(path)
    import pandas as pd
    df = pd.read_csv(path)
    for col in required_columns:
        assert col in df.columns.values, f"Did not find expected columns {col} in {path}"
    return df


def read_json(path: str):
    """Read a JSON object from a local file or S3 path."""
    path = _fix_s3_path(path)
    s3_path = S3Path(path)

    if s3_path.valid:
        try:
            s3 = boto3.client('s3')
            retr = s3.get_object(Bucket=s3_path.bucket, Key=s3_path.key)
        except NoCredentialsError:
            s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
            retr = s3.get_object(Bucket=s3_path.bucket, Key=s3_path.key)
        text = retr['Body'].read().decode()
    else:
        with Path(path).open() as handle:
            text = handle.read()

    # Parse JSON
    return json.loads(text)


class PreprocessDataset:
    """
    Helper functions for performing preparatory tasks immediately before launching
    the analysis workflow for a dataset.

    For use in the `preprocess.py` script.
    More info: https://docs.cirro.bio/pipelines/preprocess-script/
    """
    samplesheet: 'DataFrame'
    """
    A pandas DataFrame containing all of the metadata assigned to the samples present
    in the input datasets (at the time of analysis).

    More info: https://docs.cirro.bio/pipelines/preprocess-script/#dssamplesheet
    """
    files: 'DataFrame'
    """
    A DataFrame containing information on the files contained in the input datasets,
    and the sample that each file is assigned to.

    More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsfiles
    """
    params: dict
    """
    A dictionary with all of the parameter values populated by user input
    using the process-form.json and process-input.json configurations.

    This is read-only, use `add_param` to add new parameters or `remove_param` to remove them.

    More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsparams
    """
    metadata: dict
    """
    Detailed information about the dataset at the time of analysis,
    including the project, process, and input datasets.

    More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsmetadata
    """
    dataset_root: str
    """
    Base path to the dataset
    """

    _PARAMS_FILE = "params.json"
    _REFERENCES_BASE = "s3://pubweb-references"

    def __init__(self,
                 samplesheet: Union['DataFrame', str, Path],
                 files: Union['DataFrame', str, Path],
                 params: dict = None,
                 metadata: dict = None,
                 dataset_root: str = None):
        import pandas as pd
        # Convert DataFrame to string if necessary
        if isinstance(samplesheet, str):
            samplesheet = pd.read_csv(StringIO(samplesheet))
        if isinstance(samplesheet, Path):
            samplesheet = read_csv(str(samplesheet))
        if isinstance(files, str):
            files = pd.read_csv(StringIO(files))
        if isinstance(files, Path):
            files = read_csv(str(files))
        if params is None:
            params = {}
        if metadata is None:
            metadata = {}

        self.samplesheet = samplesheet
        self.files = files
        self.params = params
        self.metadata = metadata
        self.dataset_root = dataset_root
        self.logger = logger

    @classmethod
    def from_path(cls, dataset_root: str, config_directory='config'):
        """
        Creates an instance from a path
        (useful for testing or when running the script outside Cirro)
        """
        config_directory = Path(dataset_root, config_directory)

        files = read_csv(
            str(Path(config_directory, "files.csv")),
            required_columns=["sample", "file"]
        )

        samplesheet = read_csv(
            str(Path(config_directory, "samplesheet.csv")),
            required_columns=["sample"]
        )

        params = read_json(
            str(Path(config_directory, "params.json")),
        )

        metadata = read_json(
            str(Path(config_directory, "metadata.json")),
        )

        return cls(files=files,
                   samplesheet=samplesheet,
                   params=params,
                   metadata=metadata,
                   dataset_root=dataset_root)

    @classmethod
    def from_running(cls):
        """
        Creates an instance from the currently running dataset
        (expected to be called from inside a Cirro analysis process)
        """
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s %(levelname)-8s [PreprocessDataset] %(message)s')
        dataset_path = os.getenv("PW_S3_DATASET")
        return cls.from_path(dataset_path)

    @property
    def references_base(self):
        """
        Returns the base URL for references.
        This is used to access public references in the Cirro system.
        """
        return self._REFERENCES_BASE

    def log(self):
        """Print logging messages about the dataset."""
        logger.info(f"Storage location for dataset: {self.dataset_root}")
        logger.info(f"Number of files in dataset: {self.files.shape[0]:,}")
        logger.info(f"Number of samples in dataset: {self.samplesheet.shape[0]:,}")

    def add_param(self, name: str, value, overwrite=False, log=True):
        """Add a parameter to the dataset."""

        assert overwrite or name not in self.params, \
            f"Cannot add parameter {name}, already exists (and overwrite=False)"

        if log:
            logger.info(f"Adding parameter {name} = {value}")
        self.params[name] = value

        if log:
            logger.info("Saving parameters")
        write_json(self.params, self._PARAMS_FILE)

    def remove_param(self, name: str, force=False):
        """Remove a parameter from the dataset."""

        assert force or name in self.params, \
            f"Cannot remove parameter {name}, does not exist (and force=False)"

        logger.info(f"Removing parameter {name}")
        if name in self.params:
            del self.params[name]

        logger.info("Saving parameters")
        write_json(self.params, self._PARAMS_FILE)

    def keep_params(self, params_to_keep: list[str]):
        """Keep only the specified parameters in the dataset."""
        logger.info(f"Keeping parameters: {params_to_keep}")
        self.params = {
            k: v for k, v in self.params.items()
            if k in params_to_keep
        }
        write_json(self.params, self._PARAMS_FILE)

    def update_compute(self, from_str: str, to_str: str, fp="nextflow-override.config"):
        """Replace all instances of a text string in the compute config file."""

        assert os.path.exists(fp), f"File does not exist: {fp}"
        with open(fp, 'r') as handle:
            compute = handle.read()
        n = len(compute.split(from_str)) - 1
        logger.info(f"Replacing {n:,} instances of {from_str} with {to_str} in {fp}")
        compute = compute.replace(from_str, to_str)
        with open(fp, 'wt') as handle:
            handle.write(compute)

    def pivot_samplesheet(
            self,
            index=None,
            pivot_columns: Union[Optional[str], list[str]] = 'read',
            metadata_columns: list[str] = None,
            column_prefix: str = "fastq_",
            file_filter_predicate: str = None
    ):
        """
        Combines data from both the samples and files table into a wide format with
        each sample on a row and each file in a column.
        The file column indexes are created by default from the `read` column, but can be customized.

        For example, if the `files` table has columns `sample`, `read`, and `file`,
        and the `samplesheet` has columns `sample`, `status`, and `group`, the output
        will have columns `sample`, `fastq_1`, `fastq_2`, `status`, and `group`.

        Args:
            index: List[str], used to make the frames new index, defaults to
                `["sampleIndex", "sample", "lane"]`
            pivot_columns: str or List[str], columns to pivot on and create the new column,
                defaults to 'read'. This effectively makes the column `<column_prefix><read>`.
                If the column is not defined or not present, the pivot column will be generated
                from the file number index.
            metadata_columns: List[str], metadata columns to include in the output,
                defaults to all columns that are available from the sample metadata.
                If your pipeline doesn't like extra columns, make sure to specify the allowed columns here.
            column_prefix: str, optional, prefix for the new columns, defaults to `fastq_`.
            file_filter_predicate: str, optional, a pandas query string to filter the files table.
                A common use case would be to filter out indexed reads, e.g. `readType == "R"`.

        Returns:
            DataFrame: A wide-format sample sheet with the specified columns pivoted.
        """
        import pandas as pd

        pivoted_files = self.pivot_files(index=index,
                                         pivot_columns=pivot_columns,
                                         column_prefix=column_prefix,
                                         file_filter_predicate=file_filter_predicate)
        combined = pd.merge(pivoted_files, self.samplesheet, on='sample', how="inner", validate="many_to_many")

        # Default to keeping all columns
        if metadata_columns is None:
            metadata_columns = self.samplesheet.columns.tolist() + pivoted_files.columns.tolist()

        # Keep only the specified metadata columns
        all_columns = combined.axes[1]
        for column in all_columns:
            if (column not in metadata_columns
                    # These columns are required, never drop them
                    and column_prefix not in column
                    and 'sample' != column):
                combined = combined.drop(columns=[column])

        return combined

    def pivot_files(
            self,
            index: list[str] = None,
            pivot_columns: Union[str, list[str]] = 'read',
            column_prefix: str = "fastq_",
            file_filter_predicate: str = None
    ):
        """
        Format the files table into a wide format with each sample on a row
        and each file in a column. The column indexes are created by default
        from the `read` column, but can be customized. This is useful for
        paired-end sequencing data where you want to have the columns
        `sample`, `fastq_1`, and `fastq_2` as the output.

        Args:
            index: List[str], used to make the frames new index, defaults to
            pivot_columns: str or List[str], columns to pivot on and create the new column,
                defaults to 'read'. This effectively makes the column `<column_prefix><read>`
            column_prefix: str, optional, prefix for the new columns, defaults to `fastq_`.
            file_filter_predicate: str, optional, a pandas query string to filter the files table.

        Returns:
            DataFrame: A wide-format sample sheet with the specified columns pivoted.
        """
        if index is None:
            index = ["sampleIndex", "sample", "lane"]
        logger.info("Formatting a wide files table")
        logger.info("File table (long)")
        logger.info(self.files.head().to_csv(index=False))

        files = self.files

        if file_filter_predicate is not None:
            # Filter the files table based on the predicate
            files = files.query(file_filter_predicate)

        # If we don't have access to the column defined, just use the file number
        # By default this is 'read' but the data might not be paired
        pivot_columns_defined = pivot_columns is not None and len(pivot_columns) > 0
        if not pivot_columns_defined or pivot_columns not in files.columns.values:
            logger.warning("Pivot column not found, grouping by sample instead.")
            files['file_num'] = files.groupby('sample').cumcount() + 1
            pivot_columns = 'file_num'

        if isinstance(pivot_columns, str):
            pivot_columns = [pivot_columns]

        assert pivot_columns in files.columns.values, f"Column '{pivot_columns}' not found in file table"
        assert 'file' in files.columns.values, "Column 'file' must be present in the file table"
        assert isinstance(index, list), f"index must be a list (not {type(index)})"

        # Get the list of columns from the inputs
        input_columns = files.columns.values

        # Format as a wide dataset
        # Note that all the columns in `index` will be added if they are not already present
        wide_df = files.reindex(
            columns=index + pivot_columns + ['file']
        )
        wide_df = wide_df.pivot(
            index=index,
            columns=pivot_columns,
            values='file'
        )
        # Rename the columns to have a prefix, e.g. 'fastq_'
        wide_df = wide_df.rename(
            columns=lambda i: f"{column_prefix}{int(i)}"
        )
        wide_df = wide_df.reset_index()

        # Remove any columns from the output which were added from `index`
        for cname in index:
            if cname not in input_columns:
                wide_df = wide_df.drop(columns=[cname])
        # Remove any extra unnecessary columns
        wide_df = wide_df.drop(columns=pivot_columns, errors='ignore')
        return wide_df

    def wide_samplesheet(
            self,
            index=None,
            columns='read',
            values="file",  # noqa
            column_prefix="fastq_"
    ):
        """
        Format the samplesheet into a wide format with each sample on a row

        This is a legacy method, please use `pivot_samplesheet` instead.
        """
        warnings.warn("`wide_samplesheet` is deprecated, use `pivot_samplesheet` instead.",
                      DeprecationWarning, stacklevel=2)
        if values != "file":
            raise ValueError("The only supported value for `values` is 'file'")
        return self.pivot_files(index=index, pivot_columns=[columns], column_prefix=column_prefix)
def write_json(dat, local_path: str, indent=4)
Write a JSON object to a local file.
def read_csv(path: str, required_columns=None) -> 'DataFrame'
Read a CSV from the dataset and check for any required columns.
def read_json(path: str)
Read a JSON object from a local file or S3 path.
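Both read_csv and read_json accept either a local path or an `s3://` URI (`read_csv` hands the path straight to pandas, so S3 access additionally relies on pandas' optional s3fs support; `read_json` falls back to unsigned requests when no AWS credentials are available). A minimal sketch of reading a dataset's config files directly; the bucket and dataset paths here are hypothetical:

    from cirro.helpers.preprocess_dataset import read_csv, read_json

    # Hypothetical dataset location -- substitute a real path or s3:// URI
    base = "s3://example-bucket/datasets/abc123/config"

    files = read_csv(f"{base}/files.csv", required_columns=["sample", "file"])
    params = read_json(f"{base}/params.json")

    print(files.head())
    print(params)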
class PreprocessDataset
Helper functions for performing preparatory tasks immediately before launching the analysis workflow for a dataset.
For use in the `preprocess.py` script.
More info: https://docs.cirro.bio/pipelines/preprocess-script/
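As a rough sketch of how the class is typically used, here is a minimal `preprocess.py`; the parameter names and the `group` metadata column are hypothetical and would come from your own form configuration and sample sheet:

    from cirro.helpers.preprocess_dataset import PreprocessDataset

    # Inside a Cirro analysis process the PW_S3_DATASET variable is set,
    # so from_running() can locate the dataset on its own
    ds = PreprocessDataset.from_running()
    ds.log()

    # Derive workflow parameters from the sample metadata
    ds.add_param("n_samples", int(ds.samplesheet.shape[0]))
    if "group" in ds.samplesheet.columns:
        ds.add_param("groups", sorted(ds.samplesheet["group"].dropna().unique().tolist()))

    # Write a wide sample sheet for the workflow to consume
    ds.pivot_samplesheet().to_csv("samplesheet.csv", index=False)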
def __init__(self, samplesheet: Union['DataFrame', str, Path], files: Union['DataFrame', str, Path], params: dict = None, metadata: dict = None, dataset_root: str = None)
samplesheet: DataFrame
A pandas DataFrame containing all of the metadata assigned to the samples present in the input datasets (at the time of analysis).
More info: https://docs.cirro.bio/pipelines/preprocess-script/#dssamplesheet
files: DataFrame
A DataFrame containing information on the files contained in the input datasets, and the sample that each file is assigned to.
More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsfiles
params: dict
A dictionary with all of the parameter values populated by user input using the process-form.json and process-input.json configurations.
This is read-only; use `add_param` to add new parameters or `remove_param` to remove them.
More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsparams
metadata: dict
Detailed information about the dataset at the time of analysis, including the project, process, and input datasets.
More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsmetadata
@classmethod
def from_path(cls, dataset_root: str, config_directory='config')
Creates an instance from a path (useful for testing or when running the script outside Cirro)
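For local development, `from_path` only needs a directory that contains the four config files under a `config/` folder; a sketch with a hypothetical layout:

    from cirro.helpers.preprocess_dataset import PreprocessDataset

    # Hypothetical local copy of a dataset:
    #   /tmp/my-dataset/config/files.csv        (must include 'sample' and 'file' columns)
    #   /tmp/my-dataset/config/samplesheet.csv  (must include a 'sample' column)
    #   /tmp/my-dataset/config/params.json
    #   /tmp/my-dataset/config/metadata.json
    ds = PreprocessDataset.from_path("/tmp/my-dataset")
    ds.log()
    print(ds.params)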
@classmethod
def from_running(cls)
Creates an instance from the currently running dataset (expected to be called from inside a Cirro analysis process)
@property
def references_base(self)
Returns the base URL for references. This is used to access public references in the Cirro system.
def log(self)
Print logging messages about the dataset.
def add_param(self, name: str, value, overwrite=False, log=True)
Add a parameter to the dataset.
def remove_param(self, name: str, force=False)
Remove a parameter from the dataset.
def keep_params(self, params_to_keep: list[str])
Keep only the specified parameters in the dataset.
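All three parameter methods update `ds.params` in memory and immediately rewrite `params.json` in the working directory. A short sketch, given a `PreprocessDataset` instance `ds` (the parameter names are hypothetical):

    # Add or overwrite a value that the workflow expects
    ds.add_param("genome", "GRCh38", overwrite=True)

    # Drop a form-only value; force=True makes this a no-op if it was never set
    ds.remove_param("advanced_options", force=True)

    # Or keep only the parameters the pipeline is known to accept
    ds.keep_params(["genome", "outdir"])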
def update_compute(self, from_str: str, to_str: str, fp="nextflow-override.config")
Replace all instances of a text string in the compute config file.
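The replacement is a plain text substitution on `nextflow-override.config` in the working directory. A sketch; the placeholder token here is hypothetical and must already exist in your config for the call to change anything:

    # Swap a placeholder queue name for the one this dataset should use
    ds.update_compute("__DEFAULT_QUEUE__", "highmem-queue")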
def pivot_samplesheet(self, index=None, pivot_columns: Union[Optional[str], list[str]] = 'read', metadata_columns: list[str] = None, column_prefix: str = "fastq_", file_filter_predicate: str = None)
Combines data from both the samples and files tables into a wide format with each sample on a row and each file in a column. The file column indexes are created by default from the `read` column, but can be customized.

For example, if the `files` table has columns `sample`, `read`, and `file`, and the `samplesheet` has columns `sample`, `status`, and `group`, the output will have columns `sample`, `fastq_1`, `fastq_2`, `status`, and `group`.

Arguments:
- index: List[str], used to make the frame's new index; defaults to `["sampleIndex", "sample", "lane"]`.
- pivot_columns: str or List[str], columns to pivot on to create the new columns; defaults to 'read'. This effectively names each new column `<column_prefix><read>`. If the column is not defined or not present, the pivot column is generated from the file number index instead.
- metadata_columns: List[str], metadata columns to include in the output; defaults to all columns available from the sample metadata. If your pipeline does not accept extra columns, specify the allowed columns here.
- column_prefix: str, optional, prefix for the new columns; defaults to `fastq_`.
- file_filter_predicate: str, optional, a pandas query string used to filter the files table. A common use case is keeping only biological reads, e.g. `readType == "R"`.

Returns:
DataFrame: A wide-format sample sheet with the specified columns pivoted.
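A worked sketch of the example above, using hypothetical paired-end inputs:

    # Hypothetical long-format inputs:
    # ds.files:                                  ds.samplesheet:
    #   sample   read  file                        sample   status  group
    #   sampleA  1     sampleA_R1.fastq.gz         sampleA  case    g1
    #   sampleA  2     sampleA_R2.fastq.gz         sampleB  ctrl    g1
    #   sampleB  1     sampleB_R1.fastq.gz
    #   sampleB  2     sampleB_R2.fastq.gz

    wide = ds.pivot_samplesheet(metadata_columns=["sample", "status"])

    # Expected result: one row per sample; fastq_* columns are always kept,
    # while 'group' is dropped because it is not listed in metadata_columns
    #   sample   fastq_1              fastq_2              status
    #   sampleA  sampleA_R1.fastq.gz  sampleA_R2.fastq.gz  case
    #   sampleB  sampleB_R1.fastq.gz  sampleB_R2.fastq.gz  ctrl
    wide.to_csv("samplesheet.csv", index=False)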
def pivot_files(self, index: list[str] = None, pivot_columns: Union[str, list[str]] = 'read', column_prefix: str = "fastq_", file_filter_predicate: str = None)
Format the files table into a wide format with each sample on a row and each file in a column. The column indexes are created by default from the `read` column, but can be customized. This is useful for paired-end sequencing data where you want the columns `sample`, `fastq_1`, and `fastq_2` in the output.

Arguments:
- index: List[str], used to make the frame's new index; defaults to `["sampleIndex", "sample", "lane"]`.
- pivot_columns: str or List[str], columns to pivot on to create the new columns; defaults to 'read'. This effectively names each new column `<column_prefix><read>`. If the column is not defined or not present, the pivot column is generated from the file number index instead.
- column_prefix: str, optional, prefix for the new columns; defaults to `fastq_`.
- file_filter_predicate: str, optional, a pandas query string used to filter the files table.

Returns:
DataFrame: A wide-format sample sheet with the specified columns pivoted.
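A sketch of `pivot_files` with a file filter; the `readType` column and its `"R"` value follow the example mentioned under `pivot_samplesheet` and are hypothetical:

    # Keep only biological reads (drop index reads) before pivoting
    paired = ds.pivot_files(
        pivot_columns="read",
        column_prefix="fastq_",
        file_filter_predicate='readType == "R"',
    )
    # -> columns: sample, fastq_1, fastq_2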
def wide_samplesheet(self, index=None, columns='read', values="file", column_prefix="fastq_")
Format the samplesheet into a wide format with each sample on a row.

This is a legacy method; please use `pivot_samplesheet` instead.
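Migration is a direct substitution; `pivot_files` reproduces the old behaviour, while `pivot_samplesheet` additionally merges in the sample metadata. A sketch:

    # Before (deprecated)
    wide = ds.wide_samplesheet(index=["sampleIndex", "sample", "lane"], columns="read")

    # After
    wide = ds.pivot_files(index=["sampleIndex", "sample", "lane"], pivot_columns="read")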