import sys
import os
import parallel_sort
import pandas as pd
import numpy as np
import pyarrow as pa
from pyarrow import csv
from sklearn.decomposition import PCA
from kneed import KneeLocator
from tqdm import tqdm
from datetime import datetime
from corradjust.metrics import *
from corradjust.plots import *
from corradjust.utils import *
class CorrAdjust:
"""
The main CorrAdjust class.
Parameters
----------
df_feature_ann : pandas.DataFrame
A data frame with index and 2 columns
providing annotation for features.
- Index: unique feature ids that are also used in the data table.
For example, ENSEMBL gene ids or small RNA license plates.
- Column ``feature_name``: user-friendly names of the features,
which are also used in the reference ``.gmt`` files.
For example, gene symbols or ``"hsa-miR-xx-5p"`` notation for miRNAs.
- Column ``feature_type``: discrete set of feature types.
E.g., if you are analyzing only mRNA-seq data, put ``"mRNA"`` for all features;
if you are integrating miRNA, mRNA, or any other data type,
you could use more than one type (e.g., ``"miRNA"`` and ``"mRNA"``).
          Feature types shouldn't contain the ``"-"`` character.
ref_feature_colls : dict
A dictionary with information about reference feature set collections.
        Each collection is an item of `ref_feature_colls`, with the key being
        the collection name (string) and the value being another dict with the
        following structure.
| {
| ``"path"`` : `str`
| Relative or absolute path to the ``.gmt`` file with reference feature sets.
| ``"sign"`` : ``{"positive", "negative", "absolute"}``
| Determines how to rank feature-feature correlations.
| ``"absolute"``: absolute values of correlations
(highest absolute correlations go first).
| ``"positive"``: descending sorting of correlations
(most positive correlations go first).
| ``"negative"``: ascending sorting of correlations
(most negative correlations go first).
| ``"feature_pair_types"`` : `list`
| List of strings showing types of correlations you are interested in.
For example, ``["mRNA-mRNA"]`` or ``["miRNA-mRNA"]``.
| ``"high_corr_frac"`` : float
| Fraction of all feature pairs with most highly ranked correlations
that will be used for scoring (alpha in the paper's notation).
| }.
out_dir : str
Path to the directory where the output files will be stored.
The directory will be created if it doesn't exist.
winsorize : float or None, optional, default=0.05
        If `float`, specifies the fraction of the lowest and highest
        data values that will be winsorized.
If ``None``, no winsorizing will be applied.
Winsorizing is train/test-set friendly, i.e.,
test data has no influence on training data.
shuffle_feature_names : bool or list, optional, default=False
If ``True``, ``feature_name`` column of `df_feature_ann` will be randomly permuted.
If list of feature types, e.g., ``["mRNA"]``, permutation will be conducted
only within the corresponding feature types.
If ``False``, no permutations will be conducted.
metric : {"enrichment-based", "BP@K"}, optional, default="enrichment-based"
Metric for evaluating feature correlations (see CorrAdjust paper for details).
        Please use the default option unless you are absolutely sure
        why you need balanced precision at K (BP@K).
min_pairs_to_score : int, optional, default=1000
Minimum number of total pairs containing a feature
(N_j in the paper notation) to include the feature
        in the score computations. The inequality must hold
        in both training and validation pair sets.
random_seed : int, optional, default=17
Random seed used by PCA and other non-deterministic procedures.
title : str or None, optional, default=None
        If a string, it will be shown in the top-left corner of all generated plots.
verbose : bool, optional, default=True
Whether to print progress messages to stderr.
Attributes
----------
confounder_PCs : list
The list of confounder PC indices (0-based).
Populated by calling `fit` method.
PCA_model : sklearn.decomposition.PCA
PCA instance fit on training data.
Populated by calling `fit` and `fit_PCA` methods.
winsorizer : Winsorizer
Winsorizer instance fit on training data.
Populated by calling `fit` and `fit_PCA` methods.
mean_centerizer : MeanCenterizer
MeanCenterizer instance fit on training data.
Populated by calling `fit` and `fit_PCA` methods.
corr_scorer : CorrScorer
Instance of CorrScorer.
Populated by constructor.
out_dir : str
Output directory.
Populated by constructor.
title : str
        Title added to the top-left corner of the plots.
Populated by constructor.
metric : {"enrichment-based", "BP@K"}
Metric for evaluating feature correlations.
Populated by constructor.
random_seed : int
Random seed.
Populated by constructor.
verbose : bool
Whether to print progress messages.
Populated by constructor.
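    Examples
    --------
    A minimal construction sketch. The feature ids, ``.gmt`` path, and
    output directory below are hypothetical placeholders.

    >>> import pandas as pd
    >>> df_feature_ann = pd.DataFrame(
    ...     {"feature_name": ["TP53", "EGFR"], "feature_type": ["mRNA", "mRNA"]},
    ...     index=["ENSG00000141510", "ENSG00000146648"]
    ... )
    >>> ref_feature_colls = {
    ...     "PPI": {
    ...         "path": "refs/ppi.gmt",
    ...         "sign": "absolute",
    ...         "feature_pair_types": ["mRNA-mRNA"],
    ...         "high_corr_frac": 0.01,
    ...     }
    ... }
    >>> model = CorrAdjust(df_feature_ann, ref_feature_colls, out_dir="corradjust_out")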
"""
def __init__(
self,
df_feature_ann,
ref_feature_colls,
out_dir,
winsorize=0.05,
shuffle_feature_names=False,
metric="enrichment-based",
min_pairs_to_score=1000,
random_seed=17,
title=None,
verbose=True
):
self.out_dir = out_dir
os.makedirs(self.out_dir, exist_ok=True)
if winsorize is not None:
assert (
isinstance(winsorize, float) and 0 < winsorize < 1
), "winsorize must be None or float between 0 and 1."
self.winsorizer = Winsorizer(alpha=winsorize)
else:
self.winsorizer = None
assert title is None or isinstance(title, str), "title must be None or str."
self.title = title
# Load reference data in a data structure
self.corr_scorer = CorrScorer(
df_feature_ann,
ref_feature_colls,
shuffle_feature_names,
metric,
min_pairs_to_score,
random_seed,
verbose
)
self.metric = metric
self.random_seed = random_seed
self.verbose = verbose
def fit(
self,
df_data,
df_samp_ann=None,
method="greedy",
n_PCs=None,
n_iters=None
):
"""
Find optimal set of PCs to regress out.
Calling this method sets `confounder_PCs` attribute.
It also generates several files in `out_dir`:
- ``fit.tsv`` : file with the scores across the optimization trajectory.
- ``fit.{sample_group}.png`` : visualization of ``fit.tsv`` for each sample group.
- ``fit.log`` : detailed log file updated at each iteration.
Parameters
----------
df_data : pandas.DataFrame
Training data. Index = samples, columns = feature ids.
Data should be normalized so that feature values
are comparable between samples.
Feature ids should perfectly match the ones used in
`df_feature_ann` while initializing the object.
df_samp_ann : pandas.DataFrame or None, optional, default=None
A data frame with index and one column
providing group annotation of samples.
- Index: sample names (should match index of `df_data`).
- Column ``group``: discrete set of group names (e.g., ``"normal"`` and ``"tumor"``).
Group name cannot be ``"mean"``.
If `df_samp_ann` is ``None``, the method automatically generates it
with one sample group called ``all_samp``.
method : {"greedy", "first-n"}, optional, default="greedy"
Optimization method.
- ``"greedy"`` (recommended default): searches for the best PC
to remove on each iteration (quadratic complexity).
- ``"first-n"``: iteratively removes PCs based on
              % variance they explain (linear complexity). This is
              the approach used by the `sva_network` function of the `sva` package.
n_PCs : int or None, optional, default=None
Number of PCs to choose from during the optimization.
            By default (``None``), the number of PCs is estimated as the knee of
            the cumulative % variance plot.
n_iters : int or None, optional, default=None
How many optimization iterations to perform.
If ``None``, `n_iters` is set to `n_PCs`.
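        Examples
        --------
        A minimal sketch; ``model`` is a constructed `CorrAdjust` instance
        and ``df_train`` is a hypothetical samples-by-features data frame
        whose columns match the feature annotation index.

        >>> model.fit(df_train, method="greedy")
        >>> confounder_pcs = model.confounder_PCs  # 0-based PC indices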
"""
self._check_df_data(df_data)
self._check_df_samp_ann(df_samp_ann, df_data)
assert (
method in {"greedy", "first-n"}
), f"Method should be 'greedy' or 'first-n' but '{method}' found."
assert n_PCs is None or isinstance(n_PCs, int), "n_PCs must be None or int."
assert n_iters is None or isinstance(n_iters, int), "n_iters must be None or int."
# If there are no sample groups, make a dummy one
if df_samp_ann is None:
df_samp_ann = make_dummy_samp_ann(df_data)
        # Mean-centering and winsorization occur in .fit_PCA,
        # that's why it returns df_data
PCs, PC_coefs, n_PCs_knee, df_data = self.fit_PCA(df_data, df_samp_ann)
# If not explicitly passed, we use knee estimate for number of PCs
if not n_PCs:
n_PCs = n_PCs_knee
else:
# If user set more PCs than possible, we auto-correct that
if n_PCs > PCs.shape[1]:
print(
f"WARNING | You set {n_PCs} PCs, but the data has only "
f"{PCs.shape[1]} PCs. The run will continue with "
f"{PCs.shape[1]} PCs.",
file=sys.stderr
)
n_PCs = PCs.shape[1]
PCs = PCs[:, :n_PCs]
PC_coefs = PC_coefs[:n_PCs, :]
if n_iters is None:
n_iters = n_PCs
self._search_confounder_PCs(
df_data, df_samp_ann,
PCs, PC_coefs,
method, n_iters
)
def fit_PCA(self, df_data, df_samp_ann=None, plot_var=False):
"""
Compute principal components of training data.
The data are centered and optionally winsorized before
computing PCA in train/test-friendly way.
Calling this method sets `PCA_model` attribute.
Parameters
----------
df_data : pandas.DataFrame
Training data. Index = samples, columns = feature ids.
See `fit` method.
df_samp_ann : pandas.DataFrame, optional, default=None
A data frame providing group annotation of samples.
See `fit` method.
plot_var : bool, optional, default=False
Whether to make plots with % variance explained.
The plots will be saved as ``"PCA_var.png"`` in `out_dir`.
Returns
-------
PCs : numpy.ndarray
Array of principal components with shape ``(n_samples, n_components)``.
PC_coefs : numpy.ndarray
Array of PC coefficients with shape ``(n_components, n_features)``.
n_PCs_knee : int
Number of PCs estimated using knee (a.k.a. elbow) method.
df_data : pandas.DataFrame
Centered and (optionally) winsorized training data.
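        Examples
        --------
        A standalone sketch; ``model`` and ``df_train`` are hypothetical
        (see `fit`).

        >>> PCs, PC_coefs, n_PCs_knee, df_centered = model.fit_PCA(
        ...     df_train, plot_var=True
        ... )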
"""
self._check_df_data(df_data)
self._check_df_samp_ann(df_samp_ann, df_data)
        assert plot_var in {True, False}, "plot_var must be True or False."
if self.verbose:
print(f"{datetime.now()} | Computing PCA...", file=sys.stderr)
# If there are no sample groups, make a dummy one
if df_samp_ann is None:
df_samp_ann = make_dummy_samp_ann(df_data)
# fit_PCA is always called from fit, but might also be used standalone
# That's why we decided to center and winsorize data in this method.
if self.winsorizer is not None:
self.winsorizer.fit(df_data)
df_data = self.winsorizer.transform(df_data)
self.mean_centerizer = MeanCenterizer()
self.mean_centerizer.fit(df_data, df_samp_ann)
df_data = self.mean_centerizer.transform(df_data, df_samp_ann)
# Initialize and fit PCA
self.PCA_model = PCA(random_state=self.random_seed)
self.PCA_model.fit(df_data)
PCs = self.PCA_model.transform(df_data)
PC_coefs = self.PCA_model.components_
# Find number of PCs with knee method
kneedle = KneeLocator(
np.arange(1, len(self.PCA_model.explained_variance_ratio_) + 1),
np.cumsum(self.PCA_model.explained_variance_ratio_),
S=1.0, curve="concave", direction="increasing"
)
n_PCs_knee = kneedle.knee
if plot_var:
plotter = PCAVariancePlotter()
plotter.plot(self.PCA_model, n_PCs_knee)
plotter.save_plot(f"{self.out_dir}/PCA_var.png", title=self.title)
return PCs, PC_coefs, n_PCs_knee, df_data
def compute_confounders(self, df_data, df_samp_ann=None):
"""
Compute the data frame with confounders from input data.
This method should be called after `fit`, or after manually
calling `fit_PCA` and setting `confounder_PCs` attribute.
Parameters
----------
df_data : pandas.DataFrame
Input data. Index = samples, columns = feature ids.
Feature ids should perfectly match the ones used in
`df_feature_ann` while initializing the object.
df_samp_ann : pandas.DataFrame or None, optional, default=None
A data frame providing group annotation of samples.
This argument is mandatory if it was used during `fit` call.
Returns
-------
pandas.DataFrame
Data frame with values of confounder PCs.
Note that PC indices are 0-based.
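        Examples
        --------
        A sketch assuming ``model`` has already been fit and ``df_test``
        (hypothetical) has the same feature columns as the training data.

        >>> df_confounders = model.compute_confounders(df_test)  # columns like "PC_0"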
"""
assert (
hasattr(self, "confounder_PCs")
), (
"CorrAdjust instance must have confounder_PCs attribute when calling compute_confounders. "
"Call fit method or manually set the attribute."
)
assert (
hasattr(self, "PCA_model") and hasattr(self, "mean_centerizer")
), "fit_PCA should be called prior to compute_confounders."
self._check_df_data(df_data)
self._check_df_samp_ann(df_samp_ann, df_data, after_training=True)
if self.winsorizer is not None:
df_data = self.winsorizer.transform(df_data)
df_data = self.mean_centerizer.transform(df_data, df_samp_ann)
X_raw = df_data.to_numpy()
X_PCA = self.PCA_model.transform(df_data)
df_confounders = pd.DataFrame(
X_PCA[:, self.confounder_PCs],
index=df_data.index,
columns=[f"PC_{i}" for i in self.confounder_PCs]
)
return df_confounders
def compute_feature_scores(
self,
df_data,
df_samp_ann=None,
samp_group=None,
pairs_subset="all"
):
"""
Compute enrichment scores and p-values
for feature-feature correlations in given data.
Parameters
----------
df_data : pandas.DataFrame
Input data. Index = samples, columns = feature ids.
Feature ids should perfectly match the ones used in
`df_feature_ann` while initializing the object.
df_samp_ann : pandas.DataFrame or None, optional, default=None
A data frame providing group annotation of samples.
This argument is mandatory if it was used during `fit` call.
samp_group : str or None, default=None
If `df_samp_ann` is passed, this argument is mandatory
and should refer to one of the sample groups in `df_samp_ann`.
pairs_subset : {"all", "training", "validation"}, optional, default="all"
Which set of feature pairs to use for computing scores.
Returns
-------
dict
Dict with data frames containing feature-wise
enrichment scores and p-values.
It has the following structure:
| {
| ``"Clean"`` : {
| ``ref_feature_coll`` : pandas.DataFrame
| },
| ``"Raw"``: {...}
| }.
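        Examples
        --------
        A sketch of accessing the per-feature table for one reference
        collection; ``model``, ``df_test``, and the ``"PPI"`` collection
        name are hypothetical.

        >>> feature_scores = model.compute_feature_scores(df_test)
        >>> df_clean_scores = feature_scores["Clean"]["PPI"]  # indexed by feature_id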
"""
assert (
hasattr(self, "PCA_model") and hasattr(self, "mean_centerizer")
), "fit_PCA should be called prior to compute_feature_scores."
self._check_df_data(df_data)
if df_samp_ann is None:
df_samp_ann = make_dummy_samp_ann(df_data)
samp_group = df_samp_ann.iloc[0]["group"]
self._check_df_samp_ann(df_samp_ann, df_data, after_training=True)
assert samp_group in set(df_samp_ann["group"]), f"{samp_group} is not present in sample annotation."
# Subset is training/validation/all
subset_idxs = {"training": 0, "validation": 1, "all": 2}
assert (
pairs_subset in subset_idxs
), f"pairs_subset must be one of 'all', 'training', 'validation', but '{pairs_subset}' was found."
feature_scores = {}
corrs_raw, corrs_clean = self._compute_raw_and_clean_corrs(df_data, df_samp_ann, samp_group)
corr_scores_raw = self.corr_scorer.compute_corr_scores(corrs_raw, full_output=True)
if hasattr(self, "confounder_PCs"):
corr_scores_clean = self.corr_scorer.compute_corr_scores(corrs_clean, full_output=True)
else:
corr_scores_clean = None
for state, corr_scores in [("Raw", corr_scores_raw), ("Clean", corr_scores_clean)]:
if corr_scores is None:
feature_scores[state] = None
continue
else:
feature_scores[state] = {}
for ref_feature_coll in corr_scores:
subset_idx = subset_idxs[pairs_subset]
total_pos = self.corr_scorer.data[ref_feature_coll]["features_total_pos"][subset_idx]
total_neg = self.corr_scorer.data[ref_feature_coll]["features_total_neg"][subset_idx]
df = pd.DataFrame({
"feature_id": self.corr_scorer.feature_ids,
"feature_name": self.corr_scorer.feature_names,
"ref_pairs@K": corr_scores[ref_feature_coll][f"TPs_at_K {pairs_subset}"],
"K": corr_scores[ref_feature_coll][f"num_pairs {pairs_subset}"],
"ref_pairs@total": total_pos,
"total": total_pos + total_neg,
"enrichment": corr_scores[ref_feature_coll][f"enrichments {pairs_subset}"],
"balanced_precision": corr_scores[ref_feature_coll][f"BPs_at_K {pairs_subset}"],
"pvalue": corr_scores[ref_feature_coll][f"pvalues {pairs_subset}"],
"padj": corr_scores[ref_feature_coll][f"pvalues_adj {pairs_subset}"],
})
df = df.set_index("feature_id")
df["ref_pairs@K"] = df["ref_pairs@K"].astype("string") + "/" + df["K"].astype("string")
df["ref_pairs@total"] = df["ref_pairs@total"].astype("string") + "/" + df["total"].astype("string")
df = df.drop(columns=["K", "total"])
df = df.sort_values(["padj", "pvalue", "feature_id"])
feature_scores[state][ref_feature_coll] = df
return feature_scores
def make_volcano_plot(self,
feature_scores,
plot_filename,
**plot_kwargs
):
"""
Make volcano plot.
Parameters
----------
feature_scores : dict
Dict produced by `compute_feature_scores` method.
plot_filename : str
Path to the figure (with extension, e.g., ``.png``).
**plot_kwargs
Other keyword arguments controlling plot
aesthetics passed to `VolcanoPlotter`.
Returns
-------
VolcanoPlotter
Instance of `VolcanoPlotter`. At this point,
the plot is already saved in the file, so you
can change it through fig/ax and re-save.
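        Examples
        --------
        A sketch continuing from `compute_feature_scores`; all names
        are hypothetical.

        >>> feature_scores = model.compute_feature_scores(df_test)
        >>> plotter = model.make_volcano_plot(feature_scores, "volcano.png")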
"""
plotter = VolcanoPlotter(self.corr_scorer, **plot_kwargs)
plotter.plot(feature_scores)
plotter.save_plot(f"{self.out_dir}/{plot_filename}", title=self.title)
return plotter
def make_corr_distr_plot(
self,
df_data,
plot_filename,
df_samp_ann=None,
samp_group=None,
pairs_subset="all",
**plot_kwargs
):
"""
Visualize distribution of feature-feature correlations
before and after PC correction.
Parameters
----------
df_data : pandas.DataFrame
Input data. Index = samples, columns = feature ids.
Feature ids should perfectly match the ones used in
`df_feature_ann` while initializing the object.
plot_filename : str
Path to the figure (with extension, e.g., ``.png``).
df_samp_ann : pandas.DataFrame or None, optional, default=None
A data frame providing group annotation of samples.
This argument is mandatory if it was used during `fit` call.
samp_group : str or None, default=None
If `df_samp_ann` is passed, this argument is mandatory
and should refer to one of the sample groups in `df_samp_ann`.
pairs_subset : {"all", "training", "validation"}, optional, default="all"
Which set of feature pairs to use for computing scores.
**plot_kwargs
Other keyword arguments controlling plot
aesthetics passed to `CorrDistrPlotter`.
Returns
-------
CorrDistrPlotter
Instance of `CorrDistrPlotter`. At this point,
the plot is already saved in the file, so you
can change it through fig/ax and re-save.
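        Examples
        --------
        A sketch assuming a fit ``model`` and hypothetical input data
        ``df_test``; the figure is saved inside `out_dir`.

        >>> plotter = model.make_corr_distr_plot(df_test, "corr_distr.png")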
"""
assert (
hasattr(self, "PCA_model") and hasattr(self, "mean_centerizer")
), "fit_PCA should be called prior to make_corr_distr_plot."
self._check_df_data(df_data)
if df_samp_ann is None:
df_samp_ann = make_dummy_samp_ann(df_data)
samp_group = df_samp_ann.iloc[0]["group"]
self._check_df_samp_ann(df_samp_ann, df_data, after_training=True)
assert samp_group in set(df_samp_ann["group"]), f"{samp_group} is not present in sample annotation."
# Subset is training/validation/all
subset_idxs = {"training": 0, "validation": 1, "all": 2}
assert (
pairs_subset in subset_idxs
), f"pairs_subset must be one of 'all', 'training', 'validation', but '{pairs_subset}' was found."
corrs_raw, corrs_clean = self._compute_raw_and_clean_corrs(df_data, df_samp_ann, samp_group)
corr_scores_raw = self.corr_scorer.compute_corr_scores(corrs_raw, full_output=True)
if hasattr(self, "confounder_PCs"):
corr_scores_clean = self.corr_scorer.compute_corr_scores(corrs_clean, full_output=True)
else:
corr_scores_clean = None
plotter = CorrDistrPlotter(self.corr_scorer, pairs_subset=pairs_subset, **plot_kwargs)
plotter.add_plots(corr_scores_raw, state="Raw")
if hasattr(self, "confounder_PCs") and len(self.confounder_PCs):
plotter.add_plots(corr_scores_clean, state="Clean")
plotter.save_plot(f"{self.out_dir}/{plot_filename}", title=self.title)
return plotter
def export_corrs(
self,
df_data,
filename,
df_samp_ann=None,
samp_group=None,
chunk_size=1000000
):
"""
Export feature-feature correlations to file.
Parameters
----------
df_data : pandas.DataFrame
Input data. Index = samples, columns = feature ids.
Feature ids should perfectly match the ones used in
`df_feature_ann` while initializing the object.
filename : str
Path to the export table with ``.tsv`` extension.
df_samp_ann : pandas.DataFrame or None, optional, default=None
A data frame providing group annotation of samples.
This argument is mandatory if it was used during `fit` call.
samp_group : str or None, default=None
If `df_samp_ann` is passed, this argument is mandatory
and should refer to one of the sample groups in `df_samp_ann`.
chunk_size : int, optional, default=1000000
How many rows are exported in a single chunk.
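        Examples
        --------
        A sketch assuming a fit ``model`` and hypothetical input data
        ``df_test``; the table is written inside `out_dir`.

        >>> model.export_corrs(df_test, "corrs.tsv", chunk_size=500_000)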
"""
assert (
hasattr(self, "PCA_model") and hasattr(self, "mean_centerizer")
), "fit_PCA should be called prior to export_corrs."
self._check_df_data(df_data)
if df_samp_ann is None:
df_samp_ann = make_dummy_samp_ann(df_data)
samp_group = df_samp_ann.iloc[0]["group"]
self._check_df_samp_ann(df_samp_ann, df_data, after_training=True)
assert samp_group in set(df_samp_ann["group"]), f"{samp_group} is not present in sample annotation."
assert isinstance(chunk_size, int), "chunk_size must be int."
corrs_raw, corrs_clean = self._compute_raw_and_clean_corrs(df_data, df_samp_ann, samp_group)
if hasattr(self, "confounder_PCs"):
corrs_sorted_args = parallel_sort.argsort(corrs_clean)
else:
corrs_sorted_args = parallel_sort.argsort(corrs_raw)
n_samples, n_features = df_data.shape
triu_rows, triu_cols = np.triu_indices(n_features, k=1)
if self.verbose:
print(f"{datetime.now()} | Starting export to file...", file=sys.stderr)
        # We explicitly write the header to the csv file ourselves,
        # because the pyarrow writer below is used with include_header=False.
out_file = open(f"{self.out_dir}/{filename}", "w")
header = [
"feature_id1", "feature_id2",
"feature_name1", "feature_name2",
"feature_type1", "feature_type2",
"corr_clean", "pvalue_clean",
"corr_raw", "pvalue_raw",
]
ref_feature_coll_header = [
f"{ref_feature_coll}_{col_type}"
for ref_feature_coll in self.corr_scorer.data.keys()
for col_type in ["flag", "trainval"]
]
header += ref_feature_coll_header
header = "\t".join(header)
out_file.write(f"{header}\n")
out_file.close()
n_chunks = corrs_sorted_args.shape[0] // chunk_size
n_chunks += 1 if corrs_sorted_args.shape[0] % chunk_size else 0
if self.verbose:
chunk_iterator = tqdm(range(n_chunks))
else:
chunk_iterator = range(n_chunks)
for chunk_idx in chunk_iterator:
start = chunk_idx * chunk_size
end = start + chunk_size
idxs = corrs_sorted_args[start:end]
pvalues_raw = compute_corr_pvalues(corrs_raw[idxs], n_samples)
if hasattr(self, "confounder_PCs"):
if len(self.confounder_PCs):
pvalues_clean = compute_corr_pvalues(corrs_clean[idxs], n_samples - len(self.confounder_PCs))
else:
pvalues_clean = pvalues_raw
else:
pvalues_clean = np.zeros(pvalues_raw.shape)
pvalues_clean[:] = np.nan
regular_columns = {
"feature_id1": self.corr_scorer.feature_ids[triu_rows[idxs]],
"feature_id2": self.corr_scorer.feature_ids[triu_cols[idxs]],
"feature_name1": self.corr_scorer.feature_names[triu_rows[idxs]],
"feature_name2": self.corr_scorer.feature_names[triu_cols[idxs]],
"feature_type1": self.corr_scorer.feature_types[triu_rows[idxs]],
"feature_type2": self.corr_scorer.feature_types[triu_cols[idxs]],
"corr_clean": corrs_clean[idxs],
"pvalue_clean": pvalues_clean,
"corr_raw": corrs_raw[idxs],
"pvalue_raw": pvalues_raw
}
ref_feature_columns = {}
for ref_feature_coll in self.corr_scorer.data:
ref_feature_columns[f"{ref_feature_coll}_flag"] = (
self.corr_scorer.data[ref_feature_coll]["mask"][idxs]
)
ref_feature_columns[f"{ref_feature_coll}_trainval"] = (
self.corr_scorer.data[ref_feature_coll]["train_val_mask"][idxs]
)
tab = pa.table({
**regular_columns,
**ref_feature_columns
})
if chunk_idx == 0:
# This cannot be done before the loop because we need to know tab.schema
write_options = csv.WriteOptions(include_header=False, delimiter="\t", quoting_style="none")
out_file = open(f"{self.out_dir}/{filename}", "ab")
writer = csv.CSVWriter(out_file, tab.schema, write_options=write_options)
writer.write(tab)
writer.close()
out_file.close()
def _search_confounder_PCs(
self,
df_data,
df_samp_ann,
PCs,
PC_coefs,
method,
n_iters
):
"""
Iteratively residualize PCs from training data
to maximize number of reference pairs among
highly correlated feature pairs.
        The parent `fit` method describes the files generated by this method.
Parameters
----------
df_data : pandas.DataFrame
See `fit` method.
df_samp_ann : pandas.DataFrame or None
See `fit` method.
PCs : numpy.ndarray
2D array of PCs (returned by `fit_PCA`).
PC_coefs : numpy.ndarray
2D array of PC coefficients (returned by `fit_PCA`).
method : str
See `fit` method.
        n_iters : int
See `fit` method.
"""
n_PCs = PCs.shape[1]
X = df_data.to_numpy()
used_PCs = []
unused_PCs = list(range(n_PCs))
if self.verbose:
print(
f"{datetime.now()} | Starting PC optimization...",
file=sys.stderr
)
# +1 stands for the iteration before cleaning
if method == "greedy":
total_iterations = sum([n_PCs - i for i in range(n_iters)]) + 1
else:
total_iterations = n_iters + 1
prog_bar = tqdm(total=total_iterations)
else:
prog_bar = None
log_file = open(f"{self.out_dir}/fit.log", "w")
# Compute scores with raw data
print(
f"{datetime.now()} | Starting iteration #0 (raw correlations)...\n",
file=log_file, flush=True
)
iter_scores, main_score = self._compute_corrs_and_scores(
X, df_samp_ann, log_file, prog_bar, "Scores before correction"
)
        # This list contains the best scores across
        # algorithm iterations. The first element holds the scores
        # before regressing out any PCs
best_iter_scores = [iter_scores]
for _ in range(n_iters):
print(
f"\n{datetime.now()} | Starting iteration #{len(used_PCs) + 1}...\n",
file=log_file, flush=True
)
main_scores = []
full_scores = []
for PC in unused_PCs:
# Regress out PC
X_resid = X - PCs[:, [PC]] @ PC_coefs[[PC], :]
log_message = f"Trying PC {PC}" if method == "greedy" else None
iter_scores, main_score = self._compute_corrs_and_scores(
X_resid, df_samp_ann, log_file, prog_bar, log_message
)
main_scores.append(main_score)
full_scores.append(iter_scores)
                # If we break the loop here, then
                # the chosen PC will be the one with the lowest index
if method == "first-n":
break
PC_max_idx = np.argmax(main_scores)
PC_max = unused_PCs[PC_max_idx]
main_score_max = main_scores[PC_max_idx]
full_score_max = full_scores[PC_max_idx]
used_PCs.append(PC_max)
unused_PCs.remove(PC_max)
best_iter_scores.append(full_score_max)
print(
f"\n{datetime.now()} | PC {PC_max} selected on iteration #{len(used_PCs)}; "
f"Main score: {main_score_max}; "
f"All scores: {full_score_max}",
file=log_file, flush=True
)
# Update data matrix with PC_max
X = X - PCs[:, [PC_max]] @ PC_coefs[[PC_max], :]
log_file.close()
if self.verbose:
prog_bar.close()
# Convert scores into data frame
df_best_iter_scores = self._best_iter_scores_to_df(best_iter_scores, used_PCs)
df_best_iter_scores.to_csv(f"{self.out_dir}/fit.tsv", sep="\t")
# Find the peak for number of iterations using early stopping
# Mean of all sample groups, mean of reference collections
main_score_col = "mean;mean;validation"
peak_iter = find_peak(df_best_iter_scores[main_score_col].to_numpy())
self.confounder_PCs = df_best_iter_scores["PC"][1:1 + peak_iter].to_list()
# Make line plot for each sample group
self._make_best_iter_scores_plot(df_best_iter_scores, peak_iter, df_samp_ann)
def _compute_corrs_and_scores(
self,
X,
df_samp_ann,
log_file,
prog_bar,
log_message
):
"""
Conduct one iteration of PC optimization
by computing and scoring correlations.
        Scores are computed separately for each sample group;
        mean scores across groups are also returned.
Parameters
----------
X : numpy.ndarray
Training data, rows = samples, columns = features.
df_samp_ann : pandas.DataFrame
See `fit` method.
log_file : file
Writing-open log file.
prog_bar : tqdm.std.tqdm or None
Progress bar (will be updated) or ``None``.
log_message : str or None
Title of the log message. If ``None``, nothing
will be printed to log.
Returns
-------
iter_scores : dict
A dict of the following structure:
| {
| ``sample_group_name`` : {
| ``ref_feature_coll`` : {
| ``"score training"`` : float,
| ``"score validation"`` : float,
| ``"score all"`` : float,
| }
| }
| }.
main_score : float
Mean of training scores across all sample groups
and reference collections, i.e.,
``iter_scores["mean"]["mean"]["score training"]``.
"""
metric_subsets = ["training", "validation", "all"]
iter_scores = {}
# Compute correlations and scores separately for each sample group
unique_samp_groups = df_samp_ann["group"].unique()
for samp_group in unique_samp_groups:
samp_group_mask = np.where(df_samp_ann["group"] == samp_group)[0]
# Compute correlations and scores
corrs = compute_pearson_columns(X[samp_group_mask])
corr_scores = self.corr_scorer.compute_corr_scores(corrs)
# Re-arrange scores in a dict
iter_scores[samp_group] = {
ref_feature_coll: {
f"score {subset}": metrics[f"score {subset}"]
for subset in metric_subsets
}
for ref_feature_coll, metrics in corr_scores.items()
}
# Compute mean score across reference collections (within sample group)
iter_scores[samp_group]["mean"] = {
f"score {subset}": np.mean([
metrics[f"score {subset}"]
for ref_feature_coll, metrics in iter_scores[samp_group].items()
])
for subset in metric_subsets
}
# We also compute mean across sample groups
iter_scores["mean"] = {
ref_feature_coll: {
f"score {subset}": np.mean([
iter_scores[samp_group][ref_feature_coll][f"score {subset}"]
for samp_group in unique_samp_groups
])
for subset in metric_subsets
}
for ref_feature_coll in iter_scores[unique_samp_groups[0]].keys()
}
main_score = iter_scores["mean"]["mean"]["score training"]
if prog_bar is not None:
prog_bar.update()
if log_message is not None:
print(
f"{datetime.now()} | {log_message}; "
f"Main score: {main_score}; "
f"All scores: {iter_scores}",
file=log_file, flush=True
)
return iter_scores, main_score
def _best_iter_scores_to_df(self, best_iter_scores, used_PCs):
"""
Convert dict of scores into data frame.
Parameters
----------
best_iter_scores : list
List of dicts with scores corresponding
to the iterations of optimization. Each element
of list should have a format specified in the return
dict of the `_compute_corrs_and_scores` method.
used_PCs : list
List of PC indices corresponding to `best_iter_scores`.
Returns
-------
pandas.DataFrame
Pandas data frame with scores in columns and iterations on rows.
- Index: iteration number. Starts with 0 (raw dataset before correction).
- Column ``PC``: index of PC being regressed out on each iteration.
              The first row always has NaN (no correction).
- Score columns: these columns have the following format:
``"{sample_group_name};{ref_feature_coll};{training/validation/all}"``.
"""
best_iter_scores = [
{
f"{samp_group};{ref_feature_coll};{metric.replace('score ', '')}": value
for samp_group in iter_scores_dict
for ref_feature_coll, metrics in iter_scores_dict[samp_group].items()
for metric, value in metrics.items()
}
for iter_scores_dict in best_iter_scores
]
df_best_iter_scores = pd.DataFrame.from_dict(best_iter_scores)
df_best_iter_scores.index.name = "Iteration"
df_best_iter_scores.insert(0, "PC", [np.nan] + used_PCs)
df_best_iter_scores["PC"] = df_best_iter_scores["PC"].astype("Int64")
return df_best_iter_scores
def _make_best_iter_scores_plot(self, df_best_iter_scores, peak_iter, df_samp_ann):
"""
Make optimization score line plots.
        One plot per sample group will be made:
        ``"fit.{group_name}.png"``. If there are multiple
        groups, an additional plot is produced for the
        ``"mean"`` group (mean scores across groups).
Parameters
----------
df_best_iter_scores : pandas.DataFrame
Data frame generated using `_best_iter_scores_to_df` method.
peak_iter : int
This number is used to draw vertical dashed red
line at the selected early stopping iteration.
df_samp_ann : pandas.DataFrame
See `fit` method.
"""
unique_samp_groups = df_samp_ann["group"].unique()
# If there is only one sample group, we don't plot "mean"
if len(unique_samp_groups) > 1:
unique_samp_groups = unique_samp_groups.tolist() + ["mean"]
for samp_group in unique_samp_groups:
# Leave only necessary columns
df_best_iter_scores_group = df_best_iter_scores[[
col for col in df_best_iter_scores.columns if col.startswith(samp_group)
]]
plotter = GreedyOptimizationPlotter(
samp_group_name=samp_group,
metric=self.metric
)
plotter.plot(df_best_iter_scores_group, peak_iter)
plotter.save_plot(f"{self.out_dir}/fit.{samp_group}.png", title=self.title)
def _compute_raw_and_clean_corrs(self, df_data, df_samp_ann, samp_group):
"""
Compute correlations before and after residualizing
PC confounders for a given group of samples.
Parameters
----------
df_data : pandas.DataFrame
Input data. Index = samples, columns = feature ids.
See `transform` for details.
df_samp_ann : pandas.DataFrame
A data frame providing group annotation of samples.
See `transform` for details.
samp_group : str
One of the sample groups in `df_samp_ann`.
Returns
-------
        corrs_raw : numpy.ndarray
            1D array of correlations (diagonal 1s are not included).
        corrs_clean : numpy.ndarray
            Same shape, but with cleaned (PC-corrected) correlations.
"""
# Preprocess the data
df_data_orig = df_data.copy()
if self.winsorizer is not None:
df_data = self.winsorizer.transform(df_data)
df_data = self.mean_centerizer.transform(df_data, df_samp_ann)
# All samples are cleaned together
if hasattr(self, "confounder_PCs"):
if len(self.confounder_PCs):
# We feed original data, so it's not centered/winsorized twice
df_data_clean, df_rsquareds = self.transform(df_data_orig, df_samp_ann)
else:
df_data_clean = df_data
# Only samp_group samples are used to compute correlation
if self.verbose:
print(f"{datetime.now()} | Computing raw correlations for {samp_group}...", file=sys.stderr)
X_raw = df_data.loc[df_samp_ann["group"] == samp_group].to_numpy()
corrs_raw = compute_pearson_columns(X_raw)
if hasattr(self, "confounder_PCs"):
if len(self.confounder_PCs):
if self.verbose:
print(f"{datetime.now()} | Computing corrected correlations for {samp_group}...", file=sys.stderr)
X_clean = df_data_clean.loc[df_samp_ann["group"] == samp_group].to_numpy()
corrs_clean = compute_pearson_columns(X_clean)
else:
# If there are no PCs to clean, then we don't need to re-compute anything
corrs_clean = corrs_raw
else:
corrs_clean = np.zeros(corrs_raw.shape)
corrs_clean[:] = np.nan
return corrs_raw, corrs_clean
def _check_df_data(self, df_data):
"""
Check validity of input data.
Parameters
----------
df_data : pandas.DataFrame
Input data. Index = samples, columns = feature ids.
See `fit` method.
Raises
------
AssertionError
Several possible reasons:
            - Feature ids do not match the ones in feature annotation.
            - There is a feature with constant expression.
"""
assert (
list(df_data.columns) == list(self.corr_scorer.feature_ids)
), "Columns of input data should match index of feature annotation."
assert (
np.all(df_data.std(axis=0) > 0)
), "Input data contains features with constant expression."
def _check_df_samp_ann(self, df_samp_ann, df_data, after_training=False):
"""
Check validity of sample annotation.
Parameters
----------
df_samp_ann : pandas.DataFrame or None
Sample annotation data.
See `fit` method.
df_data : pandas.DataFrame
Input data. Index = samples, columns = feature ids.
See `fit` method.
after_training : bool
Whether this method is called after training.
Raises
------
AssertionError
Several possible reasons:
- Sample names in `df_samp_ann` and `df_data` do not match.
- `df_samp_ann` lacks ``group`` column.
- If ``after_training == True``, method will also check
that group names of `df_samp_ann` match those in `mean_centerizer`.
"""
if df_samp_ann is None:
return
assert (
list(df_samp_ann.index) == list(df_data.index)
), "Index of input data should match index of sample annotation."
assert (
"group" in set(df_samp_ann.columns)
), "Column 'group' is not found in sample annotation."
assert (
"mean" not in set(df_samp_ann["group"])
), "Sample group cannot be named 'mean'."
if after_training:
assert (
set(df_samp_ann["group"]) <= set(self.mean_centerizer.means.keys())
), "Sample annotation has some groups that were absent in training samples."