#!/usr/bin/env python3 """ Compute A/B chromatin compartments from a Hi-C .mcool file. Algorithm --------- 1. Load ICE-balanced contact matrix for the target chromosome. 2. Distance-normalise to O/E (divide each diagonal by its mean contact frequency). 3. Compute Pearson correlation matrix of the O/E rows. 4. PCA of the correlation matrix; PC1 distinguishes A from B compartments. 5. Orient the PC1 sign using --bigwig_orient (e.g. CTCF): positive PC1 → high signal in that track. With CTCF: positive PC1 = CTCF-enriched = A compartment (active). With H3K27me3: pass --flip_orient so positive PC1 = B compartment (repressive). Output ------ CSV with columns: chrom, start, end, pc1, compartment (A / B / N for masked bins). """ import argparse import os import sys import cooler import numpy as np import pandas as pd from sklearn.decomposition import PCA def _bin_bigwig(bw_path: str, chrom: str, bins) -> np.ndarray: """Average bigWig signal over a list of (start, end) genomic bins.""" import pyBigWig bw = pyBigWig.open(bw_path) chrom_len = bw.chroms().get(chrom, 0) vals = [] for s, e in bins: s, e = max(0, int(s)), min(chrom_len, int(e)) if s >= e: vals.append(0.0) continue v = bw.stats(chrom, s, e, type="mean")[0] vals.append(0.0 if v is None or np.isnan(v) else float(v)) bw.close() return np.array(vals) def _observed_over_expected(matrix: np.ndarray) -> np.ndarray: """Distance-normalise a symmetric contact matrix (O/E transform).""" n = matrix.shape[0] oe = np.zeros((n, n), dtype=float) for d in range(n): idx = np.arange(n - d) diag = matrix[idx, idx + d].astype(float) positive = diag[diag > 0] if positive.size == 0: continue mean_d = positive.mean() norm_diag = np.where((np.isnan(diag)) | (diag == 0), 0.0, diag / mean_d) oe[idx, idx + d] = norm_diag if d > 0: oe[idx + d, idx] = norm_diag return oe def compute_compartments( mcool_path: str, chrom: str, res: int, orient_signal=None, flip_orient: bool = False, ) -> pd.DataFrame: """ Return a DataFrame (chrom, start, end, pc1, compartment). Parameters ---------- orient_signal : array-like, optional Per-bin 1-D signal used to fix the sign of PC1. Pass CTCF signal for positive-PC1 = A convention. flip_orient : bool If True, high orient_signal maps to negative PC1 (use with H3K27me3). """ c = cooler.Cooler(f"{mcool_path}::resolutions/{res}") bins_df = c.bins().fetch(chrom).reset_index(drop=True) matrix = c.matrix(balance=True).fetch(chrom).astype(float) bad_bins = np.isnan(matrix).all(axis=0) | (matrix.sum(axis=0) == 0) np.nan_to_num(matrix, nan=0.0, copy=False) oe = _observed_over_expected(matrix) good = ~bad_bins oe_good = oe[np.ix_(good, good)] # Zero rows produce NaN in corrcoef; add tiny noise to avoid singularity row_norms = np.linalg.norm(oe_good, axis=1) oe_good[row_norms == 0] += 1e-9 corr = np.corrcoef(oe_good) np.nan_to_num(corr, nan=0.0, copy=False) pca = PCA(n_components=3, random_state=42) pcs = pca.fit_transform(corr) pc1_good = pcs[:, 0] pc1 = np.full(len(bins_df), np.nan) pc1[good] = pc1_good if orient_signal is not None: sig = np.asarray(orient_signal, dtype=float) sig_good = sig[good] valid = ~np.isnan(sig_good) & ~np.isnan(pc1_good) if valid.sum() > 10: r = np.corrcoef(pc1_good[valid], sig_good[valid])[0, 1] # By default: positive orient_signal → positive PC1. # flip_orient reverses this (e.g. H3K27me3 → positive PC1 = B). if (r < 0 and not flip_orient) or (r > 0 and flip_orient): pc1 = -pc1 bins_df["pc1"] = pc1 bins_df["compartment"] = np.where( np.isnan(bins_df["pc1"]), "N", np.where(bins_df["pc1"] > 0, "A", "B"), ) return bins_df[["chrom", "start", "end", "pc1", "compartment"]] def main(): p = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter ) p.add_argument("--mcool", required=True, help="Path to .mcool file") p.add_argument("--chrom", required=True, help="Chromosome (e.g. chr21)") p.add_argument("--res", type=int, default=25000, help="Resolution in bp (default: 25000)") p.add_argument("--bigwig_orient", help="bigWig track for PC1 sign orientation (recommended: CTCF)") p.add_argument("--flip_orient", action="store_true", help="Flip orientation: high signal → negative PC1 (use with H3K27me3)") p.add_argument("--out", required=True, help="Output CSV path") args = p.parse_args() orient_signal = None if args.bigwig_orient: c = cooler.Cooler(f"{args.mcool}::resolutions/{args.res}") bins_df = c.bins().fetch(args.chrom).reset_index(drop=True) coords = list(zip(bins_df["start"].values, bins_df["end"].values)) orient_signal = _bin_bigwig(args.bigwig_orient, args.chrom, coords) print(f"Loaded orientation signal: {os.path.basename(args.bigwig_orient)}") df = compute_compartments( args.mcool, args.chrom, args.res, orient_signal=orient_signal, flip_orient=args.flip_orient, ) os.makedirs(os.path.dirname(os.path.abspath(args.out)), exist_ok=True) df.to_csv(args.out, index=False) n_a = (df["compartment"] == "A").sum() n_b = (df["compartment"] == "B").sum() n_nan = (df["compartment"] == "N").sum() print(f"Saved → {args.out}") print(f" A: {n_a} B: {n_b} N/masked: {n_nan} bins") if __name__ == "__main__": main()