cirro.helpers.preprocess_dataset
import json
import logging
import os
import warnings
from io import StringIO
from pathlib import Path
from typing import TYPE_CHECKING, Union, Optional

import boto3

if TYPE_CHECKING:
    from pandas import DataFrame

from cirro.models.s3_path import S3Path

logger = logging.getLogger(__name__)


def _fix_s3_path(path: str) -> str:
    """
    Fix the S3 path to ensure it starts with 's3://'.
    """
    normalized_path = path.replace(os.sep, '/').strip()
    if normalized_path.startswith("s3:/") and not normalized_path.startswith("s3://"):
        return normalized_path.replace("s3:/", "s3://", 1)
    return path
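A doctest-style sketch of the normalization above, importing the private helper purely for illustration:

# Illustration only: _fix_s3_path is internal to the module.
from cirro.helpers.preprocess_dataset import _fix_s3_path

assert _fix_s3_path("s3:/my-bucket/config/params.json") == "s3://my-bucket/config/params.json"
assert _fix_s3_path("s3://my-bucket/config/params.json") == "s3://my-bucket/config/params.json"
assert _fix_s3_path("data/params.json") == "data/params.json"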
def write_json(dat, local_path: str, indent=4):
    """Write a JSON object to a local file."""
    with Path(local_path).open(mode="wt") as handle:
        return json.dump(dat, handle, indent=indent)
Write a JSON object to a local file.
def read_csv(path: str, required_columns=None) -> 'DataFrame':
    """Read a CSV from the dataset and check for any required columns."""
    if required_columns is None:
        required_columns = []
    path = _fix_s3_path(path)
    import pandas as pd
    df = pd.read_csv(path)
    for col in required_columns:
        assert col in df.columns.values, f"Did not find expected columns {col} in {path}"
    return df
Read a CSV from the dataset and check for any required columns.
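For example, a preprocess script might load an additional sheet and enforce its columns; the file name and column names here are hypothetical:

from cirro.helpers.preprocess_dataset import read_csv

# Hypothetical input: any local path or s3:// URI that pandas can read works here.
barcodes = read_csv("config/barcodes.csv", required_columns=["sample", "barcode"])
print(barcodes.head())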
def read_json(path: str):
    """Read a JSON object from a local file or S3 path."""
    path = _fix_s3_path(path)
    s3_path = S3Path(path)

    if s3_path.valid:
        s3 = boto3.client('s3')
        retr = s3.get_object(Bucket=s3_path.bucket, Key=s3_path.key)
        text = retr['Body'].read().decode()
    else:
        with Path(path).open() as handle:
            text = handle.read()

    # Parse JSON
    return json.loads(text)
Read a JSON object from a local file or S3 path.
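Local files and S3 objects are handled transparently; a short sketch with hypothetical paths:

from cirro.helpers.preprocess_dataset import read_json

params = read_json("config/params.json")
metadata = read_json("s3://example-bucket/datasets/abc123/config/metadata.json")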
class PreprocessDataset:
    samplesheet: 'DataFrame'
    files: 'DataFrame'
    params: dict
    metadata: dict
    dataset_root: str

    _PARAMS_FILE = "params.json"
    _REFERENCES_BASE = "s3://pubweb-references"
Helper functions for performing preparatory tasks immediately before launching the analysis workflow for a dataset.
For use in the preprocess.py script.
More info: https://docs.cirro.bio/pipelines/preprocess-script/
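A minimal preprocess.py along these lines (the parameter name "input" is hypothetical, and from_running only works inside a running Cirro analysis):

from cirro.helpers.preprocess_dataset import PreprocessDataset

ds = PreprocessDataset.from_running()
ds.log()

# Write the sample metadata where the workflow can read it,
# then point a (hypothetical) workflow parameter at it.
ds.samplesheet.to_csv("samplesheet.csv", index=False)
ds.add_param("input", "samplesheet.csv")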
    def __init__(self,
                 samplesheet: Union['DataFrame', str, Path],
                 files: Union['DataFrame', str, Path],
                 params: dict = None,
                 metadata: dict = None,
                 dataset_root: str = None):
        import pandas as pd
        # Convert a CSV string or path into a DataFrame if necessary
        if isinstance(samplesheet, str):
            samplesheet = pd.read_csv(StringIO(samplesheet))
        if isinstance(samplesheet, Path):
            samplesheet = read_csv(str(samplesheet))
        if isinstance(files, str):
            files = pd.read_csv(StringIO(files))
        if isinstance(files, Path):
            files = read_csv(str(files))
        if params is None:
            params = {}
        if metadata is None:
            metadata = {}

        self.samplesheet = samplesheet
        self.files = files
        self.params = params
        self.metadata = metadata
        self.dataset_root = dataset_root
        self.logger = logger
A pandas DataFrame containing all of the metadata assigned to the samples present in the input datasets (at the time of analysis).
More info: https://docs.cirro.bio/pipelines/preprocess-script/#dssamplesheet
A DataFrame containing information on the files contained in the input datasets, and the sample that each file is assigned to.
More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsfiles
A dictionary with all of the parameter values populated by user input using the process-form.json and process-input.json configurations.
This is read-only; use add_param to add new parameters or remove_param to remove them.
More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsparams
Detailed information about the dataset at the time of analysis, including the project, process, and input datasets.
More info: https://docs.cirro.bio/pipelines/preprocess-script/#dsmetadata
    @classmethod
    def from_path(cls, dataset_root: str, config_directory='config'):
        """
        Creates an instance from a path
        (useful for testing or when running the script outside Cirro)
        """
        config_directory = Path(dataset_root, config_directory)

        files = read_csv(
            str(Path(config_directory, "files.csv")),
            required_columns=["sample", "file"]
        )

        samplesheet = read_csv(
            str(Path(config_directory, "samplesheet.csv")),
            required_columns=["sample"]
        )

        params = read_json(
            str(Path(config_directory, "params.json")),
        )

        metadata = read_json(
            str(Path(config_directory, "metadata.json")),
        )

        return cls(files=files,
                   samplesheet=samplesheet,
                   params=params,
                   metadata=metadata,
                   dataset_root=dataset_root)
Creates an instance from a path (useful for testing or when running the script outside Cirro)
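For local testing, point from_path at a directory that mirrors the dataset layout (the directory name here is hypothetical); it expects files.csv, samplesheet.csv, params.json, and metadata.json inside the config/ subdirectory:

from cirro.helpers.preprocess_dataset import PreprocessDataset

ds = PreprocessDataset.from_path("/tmp/example-dataset")
print(ds.params)
print(ds.files.head())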
    @classmethod
    def from_running(cls):
        """
        Creates an instance from the currently running dataset
        (expected to be called from inside a Cirro analysis process)
        """
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s %(levelname)-8s [PreprocessDataset] %(message)s')
        dataset_path = os.getenv("PW_S3_DATASET")
        return cls.from_path(dataset_path)
Creates an instance from the currently running dataset (expected to be called from inside a Cirro analysis process)
    @property
    def references_base(self):
        """
        Returns the base URL for references.
        This is used to access public references in the Cirro system.
        """
        return self._REFERENCES_BASE
Returns the base URL for references. This is used to access public references in the Cirro system.
    def log(self):
        """Print logging messages about the dataset."""
        logger.info(f"Storage location for dataset: {self.dataset_root}")
        logger.info(f"Number of files in dataset: {self.files.shape[0]:,}")
        logger.info(f"Number of samples in dataset: {self.samplesheet.shape[0]:,}")
Print logging messages about the dataset.
    def add_param(self, name: str, value, overwrite=False, log=True):
        """Add a parameter to the dataset."""

        assert overwrite or name not in self.params, \
            f"Cannot add parameter {name}, already exists (and overwrite=False)"

        if log:
            logger.info(f"Adding parameter {name} = {value}")
        self.params[name] = value

        if log:
            logger.info("Saving parameters")
        write_json(self.params, self._PARAMS_FILE)
Add a parameter to the dataset.
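A short sketch, assuming ds is a PreprocessDataset and "genome" is a hypothetical parameter name; each call rewrites params.json in the working directory:

ds.add_param("genome", "GRCh38")
# Adding the same name again raises an AssertionError unless overwrite=True is passed:
ds.add_param("genome", "GRCm39", overwrite=True)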
    def remove_param(self, name: str, force=False):
        """Remove a parameter from the dataset."""

        assert force or name in self.params, \
            f"Cannot remove parameter {name}, does not exist (and force=False)"

        logger.info(f"Removing parameter {name}")
        if name in self.params:
            del self.params[name]

        logger.info("Saving parameters")
        write_json(self.params, self._PARAMS_FILE)
Remove a parameter from the dataset.
    def keep_params(self, params_to_keep: list[str]):
        """Keep only the specified parameters in the dataset."""
        logger.info(f"Keeping parameters: {params_to_keep}")
        self.params = {
            k: v for k, v in self.params.items()
            if k in params_to_keep
        }
        write_json(self.params, self._PARAMS_FILE)
Keep only the specified parameters in the dataset.
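For example, to strip everything except the parameters a workflow actually accepts (parameter names hypothetical):

ds.keep_params(["input", "outdir", "genome"])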
    def update_compute(self, from_str: str, to_str: str, fp="nextflow-override.config"):
        """Replace all instances of a text string in the compute config file."""

        assert os.path.exists(fp), f"File does not exist: {fp}"
        with open(fp, 'r') as handle:
            compute = handle.read()
        n = len(compute.split(from_str)) - 1
        logger.info(f"Replacing {n:,} instances of {from_str} with {to_str} in {fp}")
        compute = compute.replace(from_str, to_str)
        with open(fp, 'wt') as handle:
            handle.write(compute)
Replace all instances of a text string in the compute config file.
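For example, to swap a placeholder queue name in nextflow-override.config (both strings are hypothetical):

ds.update_compute("QUEUE_PLACEHOLDER", "spot-queue-large")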
    def pivot_samplesheet(
            self,
            index=None,
            pivot_columns: Union[Optional[str], list[str]] = 'read',
            metadata_columns: list[str] = None,
            column_prefix: str = "fastq_",
            file_filter_predicate: str = None
    ):
        """
        Combines data from both the samples and files table into a wide format with
        each sample on a row and each file in a column.
        The file column indexes are created by default from the `read` column, but can be customized.

        For example, if the `files` table has columns `sample`, `read`, and `file`,
        and the `samplesheet` has columns `sample`, `status`, and `group`, the output
        will have columns `sample`, `fastq_1`, `fastq_2`, `status`, and `group`.

        Args:
            index: List[str], used to make the frames new index, defaults to
                `["sampleIndex", "sample", "lane"]`
            pivot_columns: str or List[str], columns to pivot on and create the new column,
                defaults to 'read'. This effectively makes the column `<column_prefix><read>`.
                If the column is not defined or not present, the pivot column will be generated
                from the file number index.
            metadata_columns: List[str], metadata columns to include in the output,
                defaults to all columns that are available from the sample metadata.
                If your pipeline doesn't like extra columns, make sure to specify the allowed columns here.
            column_prefix: str, optional, prefix for the new columns, defaults to `fastq_`.
            file_filter_predicate: str, optional, a pandas query string to filter the files table.
                A common use case would be to filter out indexed reads, e.g. `readType == "R"`.

        Returns:
            DataFrame: A wide-format sample sheet with the specified columns pivoted.
        """
        import pandas as pd

        pivoted_files = self.pivot_files(index=index,
                                         pivot_columns=pivot_columns,
                                         column_prefix=column_prefix,
                                         file_filter_predicate=file_filter_predicate)
        combined = pd.merge(pivoted_files, self.samplesheet, on='sample', how="inner", validate="many_to_many")

        # Default to keeping all columns
        if metadata_columns is None:
            metadata_columns = self.samplesheet.columns.tolist() + pivoted_files.columns.tolist()

        # Keep only the specified metadata columns
        all_columns = combined.axes[1]
        for column in all_columns:
            if (column not in metadata_columns
                    # These columns are required, never drop them
                    and column_prefix not in column
                    and 'sample' != column):
                combined = combined.drop(columns=[column])

        return combined
Combines data from both the samples and files table into a wide format with each sample on a row and each file in a column. The file column indexes are created by default from the read column, but can be customized.

For example, if the files table has columns sample, read, and file, and the samplesheet has columns sample, status, and group, the output will have columns sample, fastq_1, fastq_2, status, and group.

Arguments:
- index: List[str], used to make the frame's new index, defaults to ["sampleIndex", "sample", "lane"]
- pivot_columns: str or List[str], columns to pivot on to create the new columns, defaults to 'read'. This effectively makes the column <column_prefix><read>. If the column is not defined or not present, the pivot column will be generated from the file number index.
- metadata_columns: List[str], metadata columns to include in the output, defaults to all columns available from the sample metadata. If your pipeline does not accept extra columns, specify the allowed columns here.
- column_prefix: str, optional, prefix for the new columns, defaults to fastq_.
- file_filter_predicate: str, optional, a pandas query string used to filter the files table. A common use case is filtering out indexed reads, e.g. readType == "R".

Returns:
DataFrame: A wide-format sample sheet with the specified columns pivoted.
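A worked sketch with a small, hypothetical paired-end dataset, constructing the object directly rather than via from_running:

import pandas as pd
from cirro.helpers.preprocess_dataset import PreprocessDataset

# Hypothetical input: two FASTQ files per sample plus one metadata column.
files = pd.DataFrame({
    "sample": ["A", "A", "B", "B"],
    "read": [1, 2, 1, 2],
    "file": ["A_R1.fastq.gz", "A_R2.fastq.gz", "B_R1.fastq.gz", "B_R2.fastq.gz"],
})
samplesheet = pd.DataFrame({"sample": ["A", "B"], "status": ["case", "control"]})

ds = PreprocessDataset(samplesheet=samplesheet, files=files)
wide = ds.pivot_samplesheet()
# Expected columns: sample, fastq_1, fastq_2, status
print(wide)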
    def pivot_files(
            self,
            index: list[str] = None,
            pivot_columns: Union[str, list[str]] = 'read',
            column_prefix: str = "fastq_",
            file_filter_predicate: str = None
    ):
        """
        Format the files table into a wide format with each sample on a row
        and each file in a column. The column indexes are created by default
        from the `read` column, but can be customized. This is useful for
        paired-end sequencing data where you want to have the columns
        `sample`, `fastq_1`, and `fastq_2` as the output.

        Args:
            index: List[str], used to make the frames new index, defaults to
                `["sampleIndex", "sample", "lane"]`
            pivot_columns: str or List[str], columns to pivot on and create the new column,
                defaults to 'read'. This effectively makes the column `<column_prefix><read>`
            column_prefix: str, optional, prefix for the new columns, defaults to `fastq_`.
            file_filter_predicate: str, optional, a pandas query string to filter the files table.

        Returns:
            DataFrame: A wide-format sample sheet with the specified columns pivoted.
        """
        if index is None:
            index = ["sampleIndex", "sample", "lane"]
        logger.info("Formatting a wide files table")
        logger.info("File table (long)")
        logger.info(self.files.head().to_csv(index=False))

        files = self.files

        if file_filter_predicate is not None:
            # Filter the files table based on the predicate
            files = files.query(file_filter_predicate)

        # If we don't have access to the column defined, just use the file number
        # By default this is 'read' but the data might not be paired
        pivot_columns_defined = pivot_columns is not None and len(pivot_columns) > 0
        if not pivot_columns_defined or pivot_columns not in files.columns.values:
            logger.warning("Pivot column not found, grouping by sample instead.")
            files['file_num'] = files.groupby('sample').cumcount() + 1
            pivot_columns = 'file_num'

        if isinstance(pivot_columns, str):
            pivot_columns = [pivot_columns]

        assert pivot_columns in files.columns.values, f"Column '{pivot_columns}' not found in file table"
        assert 'file' in files.columns.values, "Column 'file' must be present in the file table"
        assert isinstance(index, list), f"index must be a list (not {type(index)})"

        # Get the list of columns from the inputs
        input_columns = files.columns.values

        # Format as a wide dataset
        # Note that all the columns in `index` will be added if they are not already present
        wide_df = files.reindex(
            columns=index + pivot_columns + ['file']
        )
        wide_df = wide_df.pivot(
            index=index,
            columns=pivot_columns,
            values='file'
        )
        # Rename the columns to have a prefix, e.g. 'fastq_'
        wide_df = wide_df.rename(
            columns=lambda i: f"{column_prefix}{int(i)}"
        )
        wide_df = wide_df.reset_index()

        # Remove any columns from the output which were added from `index`
        for cname in index:
            if cname not in input_columns:
                wide_df = wide_df.drop(columns=[cname])
        # Remove any extra unnecessary columns
        wide_df = wide_df.drop(columns=pivot_columns, errors='ignore')
        return wide_df
Format the files table into a wide format with each sample on a row and each file in a column. The column indexes are created by default from the read column, but can be customized. This is useful for paired-end sequencing data where you want the columns sample, fastq_1, and fastq_2 as the output.

Arguments:
- index: List[str], used to make the frame's new index, defaults to ["sampleIndex", "sample", "lane"]
- pivot_columns: str or List[str], columns to pivot on to create the new columns, defaults to 'read'. This effectively makes the column <column_prefix><read>.
- column_prefix: str, optional, prefix for the new columns, defaults to fastq_.
- file_filter_predicate: str, optional, a pandas query string used to filter the files table.

Returns:
DataFrame: A wide-format sample sheet with the specified columns pivoted.
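When the files table has no read column (for example, one file per sample), the pivot column falls back to a per-sample file number, as the code above shows; a hypothetical sketch:

import pandas as pd
from cirro.helpers.preprocess_dataset import PreprocessDataset

# Hypothetical single-end input: no 'read' column present.
files = pd.DataFrame({
    "sample": ["A", "B"],
    "file": ["A.fastq.gz", "B.fastq.gz"],
})
samplesheet = pd.DataFrame({"sample": ["A", "B"]})

ds = PreprocessDataset(samplesheet=samplesheet, files=files)
wide = ds.pivot_files()
# Expected columns: sample, fastq_1
print(wide)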
    def wide_samplesheet(
            self,
            index=None,
            columns='read',
            values="file",  # noqa
            column_prefix="fastq_"
    ):
        """
        Format the samplesheet into a wide format with each sample on a row

        This is a legacy method, please use `pivot_samplesheet` instead.
        """
        warnings.warn("`wide_samplesheet` is deprecated, use `pivot_samplesheet` instead.",
                      DeprecationWarning, stacklevel=2)
        if values != "file":
            raise ValueError("The only supported value for `values` is 'file'")
        return self.pivot_files(index=index, pivot_columns=[columns], column_prefix=column_prefix)
Format the samplesheet into a wide format with each sample on a row. This is a legacy method; please use pivot_samplesheet instead.
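Since the legacy method delegates to pivot_files, a drop-in replacement looks like this (ds as in the sketches above); pivot_samplesheet additionally merges in the sample metadata columns:

# Before (deprecated):
wide = ds.wide_samplesheet(columns="read", column_prefix="fastq_")
# After, same output shape:
wide = ds.pivot_files(pivot_columns="read", column_prefix="fastq_")
# Or, to also include sample metadata columns:
wide = ds.pivot_samplesheet(pivot_columns="read")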