Source code for smoder.preprocessing.modality2

import anndata as ad
import scanpy as sc
import numpy as np
import os
import scipy.sparse
from scipy.stats import gmean  # 用于CLR标准化计算几何均值
from sklearn.feature_extraction.text import TfidfTransformer  # TF-IDF所需依赖
import scipy.sparse as sparse  # 明确稀疏矩阵依赖


# ----------------------
# CLR标准化核心函数
# ----------------------
[docs] def clr_normalization(matrix): """执行中心对数比(CLR)标准化:clr(x) = ln(x_i / 几何均值(x))""" mat = matrix.copy() mat = np.maximum(mat, 1e-10) # 替换0和负值为极小值,避免log(0)错误 row_gmeans = gmean(mat, axis=1, keepdims=True) # 计算每行几何均值 return np.log(mat / row_gmeans) # CLR转换
# ---------------------- # 空间蛋白数据处理主函数(保留空间坐标) # ----------------------
[docs] def process_spatial_adt_data(adata, min_genes=20, min_cells=100): """读取数据→过滤低质量位点和蛋白→CLR标准化→保留空间坐标""" # 读取原始数据(包含空间坐标) print(f"原始空间蛋白数据形状: {adata.shape}") # 验证并保留空间坐标信息 if 'spatial' not in adata.obsm: print("警告:原始数据中未找到'spatial'空间坐标信息!") else: print(f"已检测到空间坐标,形状: {adata.obsm['spatial'].shape}(将保留在处理后数据中)") # 处理重复蛋白名 adata.var_names_make_unique() print("空间蛋白名已去重") # 过滤低质量位点(每位点至少表达20种蛋白) sc.pp.filter_cells(adata, min_genes=min_genes) print(f"过滤后(每位点≥20种蛋白)形状: {adata.shape}") # 过滤后空间坐标自动随细胞筛选同步保留 # 过滤低丰度蛋白(每蛋白至少在100个位点表达) sc.pp.filter_genes(adata, min_cells=min_cells) print(f"过滤后(每蛋白≥100个位点)形状: {adata.shape}") adata.layers['counts'] = adata.X.copy() ## save raw data # 转换为稠密矩阵(确保CLR计算兼容性) if scipy.sparse.issparse(adata.X): adata.X = adata.X.toarray() print("空间蛋白数据已转换为稠密矩阵") # 执行CLR标准化(仅处理表达矩阵,不影响空间坐标) adata.X = clr_normalization(adata.X) # sc.pp.clr(adata) ### print("空间蛋白数据已完成CLR标准化") # 再次确认空间坐标保留 if 'spatial' in adata.obsm: print(f"处理后数据中空间坐标保留状态: 已保留(形状: {adata.obsm['spatial'].shape})") return adata
[docs] def process_spatial_adt_data_path(st_adt_path): """通过文件路径读取数据→过滤低质量位点和蛋白→CLR标准化→保留空间坐标""" # 读取原始数据(包含空间坐标) adata = ad.read_h5ad(st_adt_path) print(f"原始空间蛋白数据形状: {adata.shape}") # 验证并保留空间坐标信息 if 'spatial' not in adata.obsm: print("警告:原始数据中未找到'spatial'空间坐标信息!") else: print(f"已检测到空间坐标,形状: {adata.obsm['spatial'].shape}(将保留在处理后数据中)") # 处理重复蛋白名 adata.var_names_make_unique() print("空间蛋白名已去重") # 过滤低质量位点(每位点至少表达20种蛋白) sc.pp.filter_cells(adata, min_genes=20) print(f"过滤后(每位点≥20种蛋白)形状: {adata.shape}") # 过滤后空间坐标自动随细胞筛选同步保留 # 过滤低丰度蛋白(每蛋白至少在100个位点表达) sc.pp.filter_genes(adata, min_cells=100) print(f"过滤后(每蛋白≥100个位点)形状: {adata.shape}") adata.layers['counts'] = adata.X.copy() ## save raw data # 转换为稠密矩阵(确保CLR计算兼容性) if scipy.sparse.issparse(adata.X): adata.X = adata.X.toarray() print("空间蛋白数据已转换为稠密矩阵") # 执行CLR标准化(仅处理表达矩阵,不影响空间坐标) adata.X = clr_normalization(adata.X) # sc.pp.clr(adata) ### print("空间蛋白数据已完成CLR标准化") # 再次确认空间坐标保留 if 'spatial' in adata.obsm: print(f"处理后数据中空间坐标保留状态: 已保留(形状: {adata.obsm['spatial'].shape})") return adata
# ---------------------- # Peak数据处理函数 # ----------------------
[docs] def process_peak_data(adata, n_top_features=20000): """Process peak data with TF-IDF normalization and top-feature selection.""" print("=" * 50) print("开始处理Peak数据...") print(f"原始Peak数据形状: {adata.shape}") # 1. 数据预处理:确保输入为稀疏矩阵(符合TF-IDF要求) if not scipy.sparse.issparse(adata.X): print("将Peak数据转换为CSR稀疏矩阵(符合TF-IDF处理要求)...") adata.X = sparse.csr_matrix(adata.X) else: # 转换为CSR格式,提升后续计算效率 adata.X = adata.X.tocsr() # 2. TF-IDF归一化(等效于 Signac RunTFIDF) print("Running TF-IDF normalization...") # 使用scikit-learn的TfidfTransformer,与Signac默认行为一致(l2范数、开启idf、平滑idf) tfidf = TfidfTransformer(norm='l2', use_idf=True, smooth_idf=True) # 拟合并转换,转置以符合sklearn格式要求(sklearn默认样本在列,基因/peak在行) adata.X = sparse.csr_matrix(tfidf.fit_transform(adata.X.T).T) print("TF-IDF归一化完成,已保留l2范数标准化(每个细胞向量长度为1)") # 3. 选择高变特征/peaks(等效于 Signac FindTopFeatures, min.cutoff='q5') print(f"Selecting top {n_top_features} variable features...") # 计算每个peak的离散度(方差),用于筛选高变特征 if adata.n_vars > n_top_features: # 计算每个peak的均值 mean_val = np.array(adata.X.mean(axis=0)).flatten() # 计算每个peak的方差(E[X²] - (E[X])²) var_val = np.array(adata.X.power(2).mean(axis=0)).flatten() - np.square(mean_val) # 获取方差最大的n_top_features个peak的索引 top_feature_idx = np.argsort(var_val)[-n_top_features:] # 根据索引筛选peaks,生成新的AnnData对象(避免修改原数据) adata = adata[:, top_feature_idx].copy() print(f" 已筛选出 {n_top_features} 个高变Peak,当前数据形状: {adata.shape}") else: # 如果peaks数少于n_top_features,则选择全部 print(f" Peak总数({adata.n_vars})小于目标筛选数({n_top_features}),保留全部Peak") print("Peak数据处理完成!") print("=" * 50) return adata
[docs] def process_peak_data_path(peak_h5ad_path, n_top_features=20000): """Load peak data from file, then apply TF-IDF normalization and top-feature selection.""" # 第一步:从指定路径读取h5ad格式的peak数据 print("=" * 50) print(f"从路径读取Peak数据:{peak_h5ad_path}") try: adata = ad.read_h5ad(peak_h5ad_path) print(f"成功读取Peak数据,原始形状: {adata.shape}") except FileNotFoundError: raise FileNotFoundError(f"错误:未找到指定路径下的文件 → {peak_h5ad_path}") except Exception as e: raise Exception(f"错误:读取Peak数据失败,异常信息 → {str(e)}") # 第二步:调用已实现的process_peak_data完成核心处理(复用逻辑,减少冗余) adata = process_peak_data(adata, n_top_features=n_top_features) # 第三步:返回处理完成的AnnData对象 return adata