import anndata as ad
import scanpy as sc
import pandas as pd
import numpy as np
import os
import scipy.sparse
import re
# ----------------------
# 前置核心筛选函数
# ----------------------
[docs]
def filter_out_low_quality_data(adata, data_type="spatial"):
"""Filter low-quality data for spatial or single-cell AnnData objects."""
print(f"开始对{data_type}数据执行最终低质过滤...")
print(f"过滤前数据形状: {adata.shape}")
# 根据数据类型执行最终筛选
if data_type == "spatial":
# 空间数据最终筛选
sc.pp.filter_cells(adata, min_genes=10) # 每个位点至少10个基因
sc.pp.filter_genes(adata, min_cells=10) # 每个基因至少在10个位点表达
print(f"空间数据最终筛选后形状: {adata.shape}")
elif data_type == "single_cell":
# 单细胞数据最终筛选(仅执行低质基因和细胞过滤)
sc.pp.filter_genes(adata, min_cells=100) # 基因至少在100个细胞中表达
sc.pp.filter_cells(adata, min_genes=100) # 细胞至少表达100个基因
print(f"单细胞数据最终过滤低质基因和细胞后形状: {adata.shape}")
else:
raise ValueError("data_type必须为'spatial'或'single_cell'")
return adata
# ----------------------
# 单细胞初始处理函数
# ----------------------
[docs]
def process_single_cell(adata, ref_celltype_col='CellType'):
"""
单细胞数据初始处理(基础清洗,不含最终过滤步骤)
参数: 数据文件adata
返回: 初始处理后的AnnData对象
"""
# adata = adata_ref.copy() # avoid changing the content of adata_ref
print(f"原始单细胞数据形状: {adata.shape}")
# 处理重复基因名
adata.var_names_make_unique()
print("单细胞数据基因名已去重")
# 清洗线粒体基因
mito_genes = adata.var_names.str.startswith(('MT-', 'mt-'))
adata = adata[:, ~mito_genes]
print(f"移除线粒体基因后形状: {adata.shape}")
# 保留样本量>=1的细胞类型
if ref_celltype_col in adata.obs.columns:
cell_type_counts = adata.obs[ref_celltype_col].value_counts()
keep_types = cell_type_counts[cell_type_counts >= 1].index
adata = adata[adata.obs[ref_celltype_col].isin(keep_types)]
print(f"过滤低丰度细胞类型后形状: {adata.shape}")
else:
print("警告:单细胞数据obs中未找到'CellType'列,跳过细胞类型筛选")
# 初步过滤(低阈值,为后续最终过滤做准备)
sc.pp.filter_genes(adata, min_cells=1)
sc.pp.filter_cells(adata, min_genes=1)
print(f"初始过滤空基因和细胞后形状: {adata.shape}")
return adata
[docs]
def process_single_cell_path(sc_path):
"""
单细胞数据初始处理(基础清洗,不含最终过滤步骤)
参数: 数据文件路径
返回: 初始处理后的AnnData对象
"""
adata = ad.read_h5ad(sc_path)
print(f"原始单细胞数据形状: {adata.shape}")
# 处理重复基因名
adata.var_names_make_unique()
print("单细胞数据基因名已去重")
# 清洗线粒体基因
mito_genes = adata.var_names.str.startswith(('MT-', 'mt-'))
adata = adata[:, ~mito_genes]
print(f"移除线粒体基因后形状: {adata.shape}")
# 保留样本量>1的细胞类型
if 'CellType' in adata.obs.columns:
cell_type_counts = adata.obs['CellType'].value_counts()
keep_types = cell_type_counts[cell_type_counts > 1].index
adata = adata[adata.obs['CellType'].isin(keep_types)]
print(f"过滤低丰度细胞类型后形状: {adata.shape}")
else:
print("警告:单细胞数据obs中未找到'CellType'列,跳过细胞类型筛选")
# 初步过滤(低阈值,为后续最终过滤做准备)
sc.pp.filter_genes(adata, min_cells=1)
sc.pp.filter_cells(adata, min_genes=1)
print(f"初始过滤空基因和细胞后形状: {adata.shape}")
return adata
# 辅助函数:清理文件名中的特殊字符
[docs]
def sanitize_filename(name):
"""替换文件名中的特殊字符,避免路径错误"""
return re.sub(r'[\\/*?:"<>|/]', '_', name)
# 辅助函数:归一化表达矩阵
[docs]
def normalize_expression_matrix(matrix, target_sum):
"""对表达矩阵进行归一化处理"""
if isinstance(matrix, pd.DataFrame):
mat = matrix.values.copy()
index = matrix.index
columns = matrix.columns
else:
mat = matrix.copy()
index = None
columns = None
row_sums = mat.sum(axis=1, keepdims=True)
row_sums[row_sums == 0] = 1e-10
normalized_mat = mat * (target_sum / row_sums)
if isinstance(matrix, pd.DataFrame):
return pd.DataFrame(normalized_mat, index=index, columns=columns)
return normalized_mat
# ----------------------
# 单细胞数据处理函数
# ----------------------
[docs]
def normalize_single_cell(adata):
"""对筛选高变基因后的单细胞数据进行归一化"""
adata_norm = adata.copy()
sc.pp.normalize_total(adata_norm, target_sum=1)
print("单细胞数据归一化完成 (target_sum=1)")
return adata_norm
# ----------------------
# 空间转录组数据处理函数
# ----------------------
[docs]
def process_spatial_data(adata, common_genes, hvgs):
"""处理空间转录组数据:筛选基因+归一化+log转换"""
# adata = ad.read_h5ad(st_path)
print(f"原始空间转录组数据形状: {adata.shape}")
adata.var_names_make_unique()
print("空间转录组数据基因名已去重")
# 筛选共同基因
adata = adata[:, common_genes].copy()
print(f"筛选共同基因后形状: {adata.shape}")
# 从共同基因中筛选高变基因
adata = adata[:, hvgs].copy()
print(f"筛选高变基因后形状: {adata.shape}")
adata.layers['counts'] = adata.X.copy() ## 保存count matrix
# 归一化和log转换
sc.pp.normalize_total(adata, target_sum=1e4)
print("空间转录组归一化完成 (target_sum=1e4)")
sc.pp.log1p(adata)
print("空间转录组log转换完成")
# 稀疏矩阵转稠密矩阵
if scipy.sparse.issparse(adata.X):
adata.X = adata.X.toarray()
print("空间转录组数据已转换为稠密矩阵")
return adata
[docs]
def process_spatial_data_path(st_path, common_genes, hvgs):
"""处理空间转录组数据:筛选基因+归一化+log转换"""
adata = ad.read_h5ad(st_path)
print(f"原始空间转录组数据形状: {adata.shape}")
adata.var_names_make_unique()
print("空间转录组数据基因名已去重")
# 筛选共同基因
adata = adata[:, common_genes].copy()
print(f"筛选共同基因后形状: {adata.shape}")
# 从共同基因中筛选高变基因
adata = adata[:, hvgs].copy()
print(f"筛选高变基因后形状: {adata.shape}")
adata.layers['counts'] = adata.X.copy() ## 保存count matrix
# 归一化和log转换
sc.pp.normalize_total(adata, target_sum=1e4)
print("空间转录组归一化完成 (target_sum=1e4)")
sc.pp.log1p(adata)
print("空间转录组log转换完成")
# 稀疏矩阵转稠密矩阵
if scipy.sparse.issparse(adata.X):
adata.X = adata.X.toarray()
print("空间转录组数据已转换为稠密矩阵")
return adata
[docs]
def get_spatial_hvgs_only(st_path, common_genes, hvgs):
"""仅筛选高变基因,不进行其他处理"""
adata = ad.read_h5ad(st_path)
print(f"原始空间转录组数据形状: {adata.shape}")
adata.var_names_make_unique()
# 筛选共同基因
adata = adata[:, common_genes].copy()
print(f"筛选共同基因后形状: {adata.shape}")
# 筛选高变基因
adata = adata[:, hvgs].copy()
print(f"仅筛选高变基因后的空间数据形状: {adata.shape}")
return adata
# ----------------------
# 高变基因基因筛选
# ----------------------
[docs]
def select_hvgs(adata_ref, celltype_col, num_per_group=200):
adata_ref_log = adata_ref.copy()
### Perform log-normalization then do DEG maker identification
sc.pp.log1p(adata_ref_log)
sc.tl.rank_genes_groups(adata_ref_log, groupby=celltype_col, method="t-test", key_added="ttest", use_raw=False)
markers_df = pd.DataFrame(adata_ref_log.uns['ttest']['names']).iloc[0:num_per_group, :]
genes = sorted(list(np.unique(markers_df.melt().value.values)))
return genes
# def select_hvgs(adata_ref, celltype_col, num_per_group=200):
# """筛选高变基因
# Parameters:
# adata_sc: AnnData 单细胞数据对象
# celltype_col: 细胞类型列的列名
# num_per_group: 每组选择的基因数量
# """
# # 创建数据副本
# adata_temp = adata_ref.copy()
# # 对数化处理数据(如果尚未处理)
# if not np.all(adata_temp.X.data >= 0) if hasattr(adata_temp.X, 'data') else not np.all(adata_temp.X >= 0):
# print("数据可能已经对数化或标准化,跳过对数化步骤")
# else:
# sc.pp.log1p(adata_temp)
# # 使用更高效的方法进行差异表达分析
# # 设置use_raw=False以避免使用原始数据
# sc.tl.rank_genes_groups(
# adata_temp,
# groupby=celltype_col,
# method="t-test",
# key_added="ttest",
# use_raw=False,
# n_genes=num_per_group # 限制每个组的基因数量
# )
# # 更高效地提取marker基因
# marker_genes = set()
# for group in adata_temp.uns['ttest']['names'].dtype.names:
# # 直接获取每个组的前num_per_group个基因
# group_genes = adata_temp.uns['ttest']['names'][group][:num_per_group]
# marker_genes.update(group_genes)
# genes = sorted(list(marker_genes))
# print(f"筛选出 {len(genes)} 个高变基因")
# return genes
# ----------------------
# 计算细胞类型平均表达
# ----------------------
[docs]
def calculate_celltype_averages(adata_sc_norm, celltype_col, sample_id_col, target_sum, normalize):
"""计算细胞类型平均表达并可选归一化"""
cell_types = adata_sc_norm.obs[celltype_col].unique()
num_genes = adata_sc_norm.shape[1]
avg_matrix = np.zeros((len(cell_types), num_genes), dtype=np.float32)
sam_ids = adata_sc_norm.obs[sample_id_col].unique()
num_sections = len(sam_ids)
for sid in sam_ids:
mask = adata_sc_norm.obs[sample_id_col] == sid
adata_s = adata_sc_norm[mask]
cell_types_s = adata_s.obs[celltype_col].unique()
for i, cell_type in enumerate(cell_types):
if cell_type in cell_types_s:
mask = adata_s.obs[celltype_col] == cell_type
cell_subset = adata_s[mask] # 等价于adata_sc_norm[mask,:]
gene_sums = cell_subset.X.sum(axis=0)
num_cells = cell_subset.shape[0]
gene_avg = gene_sums / num_cells if num_cells > 0 else np.zeros(num_genes)
# 调试:打印形状信息
if gene_avg.ndim > 1:
gene_avg = gene_avg.flatten()
# print(f"gene_avg shape: {gene_avg.shape}")
# print(f"avg_matrix[i, :] shape: {avg_matrix[i, :].shape}")
avg_matrix[i:(i + 1), :] += gene_avg.flatten()
print(f"计算section {sid} 的平均表达: {adata_s.shape[0]}个细胞, 基因数: {adata_s.shape[1]}")
avg_matrix = avg_matrix / num_sections
avg_df = pd.DataFrame(
avg_matrix,
index=cell_types,
columns=adata_sc_norm.var_names
)
if normalize:
avg_df = normalize_expression_matrix(avg_df, target_sum)
print(f"细胞类型平均表达已归一化 (target_sum={target_sum})")
return avg_df
# ----------------------
[docs]
def calculate_celltype_averages2(adata_sc_norm, celltype_col, target_sum, normalize):
"""计算细胞类型平均表达并可选归一化"""
cell_types = adata_sc_norm.obs[celltype_col].unique()
num_genes = adata_sc_norm.shape[1]
avg_matrix = np.zeros((len(cell_types), num_genes), dtype=np.float32)
for i, cell_type in enumerate(cell_types):
mask = adata_sc_norm.obs[celltype_col] == cell_type
cell_subset = adata_sc_norm[mask]
gene_sums = cell_subset.X.sum(axis=0)
num_cells = cell_subset.shape[0]
gene_avg = gene_sums / num_cells if num_cells > 0 else np.zeros(num_genes)
if gene_avg.ndim > 1:
gene_avg = gene_avg.flatten()
avg_matrix[i, :] = gene_avg
print(f"计算 {cell_type} 的平均表达: {num_cells}个细胞, 基因数: {num_genes}")
avg_df = pd.DataFrame(
avg_matrix,
index=cell_types,
columns=adata_sc_norm.var_names
)
if normalize:
avg_df = normalize_expression_matrix(avg_df, target_sum)
print(f"细胞类型平均表达已归一化 (target_sum={target_sum})")
return avg_df