Source code for oasis_stat.oasis_stat

'''
Main module.
Implementation of OASIS statistical test(https://www.pnas.org/doi/10.1073/pnas.2304671121)
Tavor Z. Baharav, David Tse, and Julia Salzman

OASIS (Optimized Adaptive Statistic for Inferring Structure) utilizes a linear test-statistic, enabling the computation of closed form P-value bounds, exact asymptotic ones, and interpretable rejection of the null. It is implemented and used in SPLASH: https://github.com/refresh-bio/SPLASH.

The key method in this file is OASIS_pvalue, which returns the p-value of the OASIS test on a given contingency table. The method can be used to compute the p-value using either the asymptotic or finite sample p-value (asymptotic flag), and can return the optimizing row and column embeddings (return_f_c flag).

Please reach out on github with any questions, or directly to tavorb@mit.edu.
'''

import numpy as np # type: ignore
import scipy.stats # type: ignore


# 1: utility functions
[docs] def splitCountsColwise(mat, downSampleFrac=.5): """Downsamples a matrix columnwise, to ensure that each column is equally well represented in the train / test downsampled matrices. Args: mat (numpy.ndarray): The input matrix, IxJ. downSampleFrac (float, optional): The fraction of counts to retain per column. Default is 0.5. Returns: out_mat (numpy.ndarray): The downsampled matrix, IxJ. """ ####### Helper function to split counts in a vector def splitCounts(mat, downSampleFrac=.5): """Downsamples a matrix. Very slight modification of https://stackoverflow.com/questions/11818215/subsample-a-matrix-python. This downsamples uniformly at random. Args: mat (numpy.ndarray): The input counts matrix to be downsampled. Must be integer-valued. downSampleFrac (float, optional): The fraction of counts to retain. Default is 0.5. Returns: out_mat (numpy.ndarray): The downsampled matrix. """ keys, counts = zip(*[ ((i, j), mat[i, j]) for i in range(mat.shape[0]) for j in range(mat.shape[1]) if mat[i, j] > 0 ]) # Make the cumulative counts array counts = np.array(counts, dtype=np.int64) sum_counts = np.cumsum(counts) # Decide how many counts to include in the sample frac_select = downSampleFrac count_select = int(sum_counts[-1] * frac_select) # Choose unique counts ind_select = sorted(np.random.choice( range(sum_counts[-1]), count_select, replace=False)) # A vector to hold the new counts out_counts = np.zeros(counts.shape, dtype=np.int64) # Perform basically the merge step of merge-sort, finding where # the counts land in the cumulative array i = 0 j = 0 while i < len(sum_counts) and j < len(ind_select): if ind_select[j] < sum_counts[i]: j += 1 out_counts[i] += 1 else: i += 1 # Rebuild the matrix using the `keys` list from before out_mat = np.zeros(mat.shape, dtype=np.int64) for i in range(len(out_counts)): out_mat[keys[i]] = out_counts[i] return out_mat out_mat = np.zeros_like(mat) for j in range(mat.shape[1]): if mat[:, j].sum() > 0: out_mat[:, j] = splitCounts(np.reshape( mat[:, j], (mat.shape[0], -1)), downSampleFrac).flatten() return out_mat
# 2: c,f generation (row and column embeddings for contingency table)
[docs] def generate_cf_finite_optimized(X, randSeed=0, numRandInits=10): """Generates optimized column and row embeddings for the finite sample p-value. Tries multiple random initializations and returns the embeddings that maximize the finite sample p-value on the given X. Args: X (numpy.ndarray): The input count matrix, IxJ. randSeed (int, optional): The random seed for reproducibility. Default is 0. numRandInits (int, optional): The number of random initializations to try. Default is 10. Returns: tuple: A tuple containing the column embedding vector (c) and the row embedding vector (f). - c (numpy.ndarray): The column embedding vector, J-dimensional. - f (numpy.ndarray): The row embedding vector, I-dimensional. """ np.random.seed(randSeed) # random initialization and extension relevantTargs = X.sum(axis=1) > 0 relevantSamples = X.sum(axis=0) > 0 nrows, ncols = X.shape if relevantTargs.sum() < 2 or relevantSamples.sum() < 2: return np.zeros(ncols), np.zeros(nrows) X = X[np.ix_(relevantTargs, relevantSamples)] # starting at c, run alternating maximization def altMaximize(X, c): # if clustering put all in same cluster, perturb if np.all(c == c[0]): c[0] = -1*c[0] nj = X.sum(axis=0) njinvSqrt = 1.0/np.maximum(1, np.sqrt(nj)) # avoid divide by 0 errors njinvSqrt[nj == 0] = 0 Xtild = (X - 1.0/X.sum()*np.outer(X @ np.ones(X.shape[1]), X.T@np.ones(X.shape[0]))) @ np.diag(njinvSqrt) Sold = 0 i = 0 while True: # find optimal f for fixed c f = np.sign(Xtild @ c) f1 = (f+1)/2 # to rescale f to be [0,1] valued f2 = (1-f)/2 f = f1 if np.abs(f2@Xtild@c) > np.abs(f1@Xtild@c): f = f2 # find optimal c for fixed f c = Xtild.T @ f if np.linalg.norm(c) > 0: c /= np.linalg.norm(c) # compute objective value, if fixed, stop S = f @ Xtild @ c if S == Sold: # will terminate once fOpt is fixed over 2 iterations break Sold = S i += 1 if i > 50: c = np.zeros_like(c) f = np.zeros_like(f) S = 0 break return c, f, np.abs(S) Sbase = 0 fMax = 0 cMax = 0 for _ in range(numRandInits): c = np.random.choice([-1, 1], size=X.shape[1]) c, f, S = altMaximize(X, c) if S > Sbase: fMax = f cMax = c Sbase = S # extend to targets and samples that didn't occur previously fElong = np.random.choice([0, 1], size=nrows) fElong[relevantTargs] = fMax fOpt = fElong cElong = np.zeros(ncols) cElong[np.arange(ncols)[relevantSamples]] = cMax # fancy indexing cOpt = cElong return (cOpt, fOpt)
[docs] def generate_cf_asymp_optimized(X): """Generates the optimal column and row embeddings for the asymptotic p-value. Args: X (numpy.ndarray): The input count matrix, IxJ. Returns: tuple: A tuple containing the column embedding vector (c) and the row embedding vector (f). - c (numpy.ndarray): The length J column embedding vector. - f (numpy.ndarray): The length I row embedding vector. """ c = np.ones(X.shape[1]) zeroIdxs = X.sum(axis=1) == 0 X = X[~zeroIdxs] zeroCols = X.sum(axis=0) == 0 X = X[:, ~zeroCols] # empirical probability dist p over rows of X p = X.sum(axis=1)/X.sum() Xtild = (X-np.outer(X.sum(axis=1), X.sum(axis=0)) / X.sum())@np.diag(1.0/np.sqrt(X.sum(axis=0))) A = np.diag(1.0/p)@Xtild @ Xtild.T # set f to be principal eigenvector of A eigvals, eigvecs = np.linalg.eig(A) f = eigvecs[:, np.argmax(eigvals)] # retain only real part of f f = np.real(f) c = Xtild.T@f c /= np.linalg.norm(c) fOld = f.copy() # map the entries of fOld to f, with the zeroIdxs zeroed out f = np.zeros(len(zeroIdxs)) f[~zeroIdxs] = fOld cOld = c.copy() c = np.zeros(len(zeroCols)) c[~zeroCols] = cOld return (c, f)
# 3. p-value computation
[docs] def compute_test_stat(X, c, f, asymptotic=False): """Computes the OASIS test statistic. Args: X (numpy.ndarray): The input count matrix, IxJ. c (numpy.ndarray): The column embedding vector, J-dimensional. f (numpy.ndarray): The row embedding vector, I-dimensional. asymptotic (bool, optional): Whether to return the test statistic for the asymptotic p-value. Default is False (finite sample p-value). Returns: float: The OASIS test statistic. """ c = np.nan_to_num(c, 0) f = np.nan_to_num(f, 0) nj = X.sum(axis=0) njinvSqrt = 1.0/np.maximum(1, np.sqrt(nj)) njinvSqrt[nj == 0] = 0 S = f @ (X-X@np.outer(np.ones(X.shape[1]), nj)/X.sum())@(c*njinvSqrt) S = np.abs(S) M = X.sum() if asymptotic: muhat = (f@X).sum()/M varF = (f-muhat)**2 @ X.sum(axis=1)/X.sum() totalVar = varF * (np.linalg.norm(c)**2 - (c@np.sqrt(nj))**2/M) if totalVar <= 0: return 0 # numerical error / issue normalizedTestStat = S/np.sqrt(totalVar) else: denom = (np.linalg.norm(c)**2 - (c@np.sqrt(nj))**2/M) normalizedTestStat = 2*S / np.sqrt(denom) return normalizedTestStat
[docs] def compute_pvalue(X, cOpt, fOpt, asymptotic=False): """Computes the OASIS p-value for a given contingency table, row embedding, and column embedding. Note that in order for the p-value to be valid, the row and column embeddings cannot depend on the input count matrix X. Args: X (numpy.ndarray): The input count matrix, IxJ. cOpt (numpy.ndarray): The column embedding vector, J-dimensional. fOpt (numpy.ndarray): The row embedding vector, I-dimensional. asymptotic (bool, optional): Whether to compute the asymptotic p-value. Default is False (finite sample p-value). Returns: float: The OASIS p-value. """ cOpt = np.nan_to_num(cOpt, 0) fOpt = np.nan_to_num(fOpt, 0).astype(float) if np.all(cOpt == cOpt[0]) or np.all(fOpt == fOpt[0]): return 1 # if fOpt.max()-fOpt.min() > 1: fOpt /= (fOpt.max()-fOpt.min()) cOpt /= np.linalg.norm(cOpt) normalizedTestStat = compute_test_stat(X, cOpt, fOpt, asymptotic) if np.isnan(normalizedTestStat): return 1 if asymptotic: pval = 2*scipy.stats.norm.cdf(-np.abs(normalizedTestStat)) else: pval = 2*np.exp(-normalizedTestStat**2/2) final_pv = min(np.nan_to_num(pval, 1), 1) return final_pv
[docs] def effectSize_bin(X, c, f): """Computes the effect size measure from the OASIS paper. Binarizes samples into positive and negative groups based on the sign of the column embedding. Args: X (numpy.ndarray): The input count matrix, IxJ. c (numpy.ndarray): The column embedding vector, J-dimensional. f (numpy.ndarray): The row embedding vector, I-dimensional. Returns: float: The two-group effect size measure. """ if (c > 0).sum() == 0 or (c < 0).sum() == 0: return 0 e_size = np.abs(f@X@(c > 0) / (X@(c > 0)).sum() - f@X@(c < 0) / (X@(c < 0)).sum()) return e_size
[docs] def OASIS_pvalue(X, numSplits=5, trainFrac=.25, asymptotic=False, return_f_c=False, return_test_stat=False, return_effect_size=False, random_seed=0): """ Computes the p-value using the OASIS method. Args: X (numpy.ndarray): The input count matrix, IxJ. numSplits (int, optional): The number of train/test splits for computing optimized f,c on. Default is 5 trainFrac (float, optional): The fraction of data to be used for training. Default is 0.25. asymptotic (bool, optional): Whether to use the asymptotic p-value. Default is False (uses finite sample p-value). return_f_c (bool, optional): Whether to return the row and column embeddings. Default is False. return_test_stat (bool, optional): Whether to return the test statistic. Default is False. return_effect_size (bool, optional): Whether to return the effect size. Default is False. random_seed (int, optional): The random seed for reproducibility. Default is 0. Returns: tuple: A tuple containing the following elements (just the float p-value returned if none of the additional flags are true):: - pv (float): The minimum p-value computed across all splits, multiplied by the number of splits (Bonferroni correction). The returned value is capped at 1. - f (numpy.ndarray, optional): Length I row embedding vector, f. Returned only if `return_f_c` is True. - c (numpy.ndarray, optional): Length J column embedding vector, c. Returned only if `return_f_c` is True. - test_stat (float, optional): The OASIS test statistic. Returned only if `return_test_stat` is True. - effect_size (float, optional): The effect size measure. Returned only if `return_effect_size` is True. """ I, J = X.shape min_pval = 1 cOpt, fOpt = (np.zeros(J), np.zeros(I)) test_stat_opt = 0 effect_size_opt = 0 f_c_gen_method = generate_cf_asymp_optimized if asymptotic else generate_cf_finite_optimized # pval_test_method = testPval_asymp if asymptotic else testPval_finite for i in range(numSplits): np.random.seed(i+random_seed) Xtrain = splitCountsColwise(X, trainFrac) Xtest = X-Xtrain c, f = f_c_gen_method(Xtrain) pval = compute_pvalue(Xtest, c, f, asymptotic) if pval < min_pval: min_pval = pval cOpt = c fOpt = f test_stat_opt = compute_test_stat(Xtest, c, f, asymptotic) effect_size_opt = effectSize_bin(Xtest, c, f) pval = min(1, numSplits*min_pval) return_arr = [pval] if return_f_c: return_arr.extend([fOpt, cOpt]) if return_test_stat: return_arr.append(test_stat_opt) if return_effect_size: return_arr.append(effect_size_opt) if len(return_arr) == 1: return return_arr[0] return tuple(return_arr)