Files
chromatin-vgae-hic/scripts/compute_compartments.py
aman acadbd780c v1.0.0: VGAE applied to GM12878 vs IMR90 chr21 Hi-C at 25kb
Full reproducible pipeline: .mcool + ChIP-seq bigwigs → latent
  embeddings → A/B compartment calls → cross-cell comparison.

  Key results (chr21, 25 kb, latent dim=32):
  - Test AUC=0.777, AP=0.759 (converged epoch 31/300)
  - GM12878 A/B silhouette (cosine) = 0.775
  - IMR90 zero-shot silhouette = 0.443
  - A-compartment bins stable across cell types (mean cosine Δ=0.042)
  - B-compartment bins shift substantially (mean cosine Δ=0.451)
  - 101 B→A and 70 A→B compartment switches GM12878→IMR90
2026-05-15 01:53:04 +02:00

171 lines
5.8 KiB
Python

#!/usr/bin/env python3
"""
Compute A/B chromatin compartments from a Hi-C .mcool file.
Algorithm
---------
1. Load ICE-balanced contact matrix for the target chromosome.
2. Distance-normalise to O/E (divide each diagonal by its mean contact frequency).
3. Compute Pearson correlation matrix of the O/E rows.
4. PCA of the correlation matrix; PC1 distinguishes A from B compartments.
5. Orient the PC1 sign using --bigwig_orient (e.g. CTCF):
positive PC1 → high signal in that track.
With CTCF: positive PC1 = CTCF-enriched = A compartment (active).
With H3K27me3: pass --flip_orient so positive PC1 = B compartment (repressive).
Output
------
CSV with columns: chrom, start, end, pc1, compartment (A / B / N for masked bins).
"""
import argparse
import os
import sys
import cooler
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
sys.path.insert(0, os.path.dirname(__file__))
def _bin_bigwig(bw_path: str, chrom: str, bins) -> np.ndarray:
"""Average bigWig signal over a list of (start, end) genomic bins."""
import pyBigWig
bw = pyBigWig.open(bw_path)
chrom_len = bw.chroms().get(chrom, 0)
vals = []
for s, e in bins:
s, e = max(0, int(s)), min(chrom_len, int(e))
if s >= e:
vals.append(0.0)
continue
v = bw.stats(chrom, s, e, type="mean")[0]
vals.append(0.0 if v is None or np.isnan(v) else float(v))
bw.close()
return np.array(vals)
def _observed_over_expected(matrix: np.ndarray) -> np.ndarray:
"""Distance-normalise a symmetric contact matrix (O/E transform)."""
n = matrix.shape[0]
oe = np.zeros((n, n), dtype=float)
for d in range(n):
idx = np.arange(n - d)
diag = matrix[idx, idx + d].astype(float)
positive = diag[diag > 0]
if positive.size == 0:
continue
mean_d = positive.mean()
norm_diag = np.where((np.isnan(diag)) | (diag == 0), 0.0, diag / mean_d)
oe[idx, idx + d] = norm_diag
if d > 0:
oe[idx + d, idx] = norm_diag
return oe
def compute_compartments(
mcool_path: str,
chrom: str,
res: int,
orient_signal=None,
flip_orient: bool = False,
) -> pd.DataFrame:
"""
Return a DataFrame (chrom, start, end, pc1, compartment).
Parameters
----------
orient_signal : array-like, optional
Per-bin 1-D signal used to fix the sign of PC1.
Pass CTCF signal for positive-PC1 = A convention.
flip_orient : bool
If True, high orient_signal maps to negative PC1 (use with H3K27me3).
"""
c = cooler.Cooler(f"{mcool_path}::resolutions/{res}")
bins_df = c.bins().fetch(chrom).reset_index(drop=True)
matrix = c.matrix(balance=True).fetch(chrom).astype(float)
bad_bins = np.isnan(matrix).all(axis=0) | (matrix.sum(axis=0) == 0)
np.nan_to_num(matrix, nan=0.0, copy=False)
oe = _observed_over_expected(matrix)
good = ~bad_bins
oe_good = oe[np.ix_(good, good)]
# Zero rows produce NaN in corrcoef; add tiny noise to avoid singularity
row_norms = np.linalg.norm(oe_good, axis=1)
oe_good[row_norms == 0] += 1e-9
corr = np.corrcoef(oe_good)
np.nan_to_num(corr, nan=0.0, copy=False)
pca = PCA(n_components=3, random_state=42)
pcs = pca.fit_transform(corr)
pc1_good = pcs[:, 0]
pc1 = np.full(len(bins_df), np.nan)
pc1[good] = pc1_good
if orient_signal is not None:
sig = np.asarray(orient_signal, dtype=float)
sig_good = sig[good]
valid = ~np.isnan(sig_good) & ~np.isnan(pc1_good)
if valid.sum() > 10:
r = np.corrcoef(pc1_good[valid], sig_good[valid])[0, 1]
# By default: positive orient_signal → positive PC1.
# flip_orient reverses this (e.g. H3K27me3 → positive PC1 = B).
if (r < 0 and not flip_orient) or (r > 0 and flip_orient):
pc1 = -pc1
bins_df["pc1"] = pc1
bins_df["compartment"] = np.where(
np.isnan(bins_df["pc1"]), "N",
np.where(bins_df["pc1"] > 0, "A", "B"),
)
return bins_df[["chrom", "start", "end", "pc1", "compartment"]]
def main():
p = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
)
p.add_argument("--mcool", required=True, help="Path to .mcool file")
p.add_argument("--chrom", required=True, help="Chromosome (e.g. chr21)")
p.add_argument("--res", type=int, default=25000, help="Resolution in bp (default: 25000)")
p.add_argument("--bigwig_orient",
help="bigWig track for PC1 sign orientation (recommended: CTCF)")
p.add_argument("--flip_orient", action="store_true",
help="Flip orientation: high signal → negative PC1 (use with H3K27me3)")
p.add_argument("--out", required=True, help="Output CSV path")
args = p.parse_args()
orient_signal = None
if args.bigwig_orient:
c = cooler.Cooler(f"{args.mcool}::resolutions/{args.res}")
bins_df = c.bins().fetch(args.chrom).reset_index(drop=True)
coords = list(zip(bins_df["start"].values, bins_df["end"].values))
orient_signal = _bin_bigwig(args.bigwig_orient, args.chrom, coords)
print(f"Loaded orientation signal: {os.path.basename(args.bigwig_orient)}")
df = compute_compartments(
args.mcool, args.chrom, args.res,
orient_signal=orient_signal,
flip_orient=args.flip_orient,
)
os.makedirs(os.path.dirname(os.path.abspath(args.out)), exist_ok=True)
df.to_csv(args.out, index=False)
n_a = (df["compartment"] == "A").sum()
n_b = (df["compartment"] == "B").sum()
n_nan = (df["compartment"] == "N").sum()
print(f"Saved → {args.out}")
print(f" A: {n_a} B: {n_b} N/masked: {n_nan} bins")
if __name__ == "__main__":
main()