Source code for davae.utils.tools

import numpy as np
import csv
import scanpy as sc
from scipy.sparse import csr_matrix
from sklearn import metrics
import anndata
from scipy.spatial.distance import pdist, squareform
from sklearn.neighbors import NearestNeighbors
import pandas as pd
from sklearn.preprocessing import LabelEncoder


def read_sc_data(input_file, fmt='h5ad', backed=None, transpose=False, sparse=False, delimiter=" "):
    """Load a dataset with the scanpy reader selected by ``fmt``.

    Args:
        input_file: Path handed to the scanpy reader.
        fmt: One of ``'10x_h5'``, ``'10x_mtx'``, ``'mtx'``, ``'h5ad'``,
            ``'csv'``, ``'txt'`` or ``'tsv'``.
        backed: Forwarded to ``sc.read_h5ad``; only used when ``fmt='h5ad'``.
        transpose: If true, the loaded object is transposed.
        sparse: If true, ``adata.X`` is replaced by a float32 ``csr_matrix``.
        delimiter: Field separator; only used when ``fmt='txt'``.

    Returns:
        The loaded object after ``var_names_make_unique()`` and
        ``obs_names_make_unique()`` have been called on it.

    Raises:
        ValueError: If ``fmt`` is not one of the supported formats.
    """
    # Lambdas keep the scanpy lookups lazy: only the selected reader runs.
    readers = {
        '10x_h5': lambda: sc.read_10x_h5(input_file),
        '10x_mtx': lambda: sc.read_10x_mtx(input_file),
        'mtx': lambda: sc.read_mtx(input_file),
        'h5ad': lambda: sc.read_h5ad(input_file, backed=backed),
        'csv': lambda: sc.read_csv(input_file),
        'txt': lambda: sc.read_text(input_file, delimiter=delimiter),
        'tsv': lambda: sc.read_text(input_file, delimiter="\t"),
    }
    if fmt not in readers:
        # The previous message listed only two of the seven accepted values.
        raise ValueError(
            '`fmt` needs to be one of {}, got {!r}'.format(
                ', '.join(repr(name) for name in readers), fmt))
    adata = readers[fmt]()

    if transpose:
        adata = adata.transpose()
    if sparse:
        adata.X = csr_matrix(adata.X, dtype='float32')
    adata.var_names_make_unique()
    adata.obs_names_make_unique()
    return adata
def cacu_size_factor(adata):
    """Return the sum of ``adata.X`` along axis 1 (one total per row)."""
    matrix = adata.X
    row_totals = matrix.sum(axis=1)
    return row_totals
def get_label_by_count(label_path):
    """Expand a CSV row of per-group counts into a flat list of group indices.

    The last non-empty row of the file is used. For a row ``c0,c1,...`` the
    result holds ``c0`` zeros, then ``c1`` ones, and so on.

    Args:
        label_path: Path of the CSV file holding the counts.

    Returns:
        list[int]: Group index repeated once per counted sample. Empty when
        the file contains no non-empty row.
    """
    counts = []
    # newline='' is what the csv module expects for files it parses.
    with open(label_path, newline='') as f:
        for row in csv.reader(f):
            # Blank lines come back as [] and must not overwrite the counts.
            if row:
                counts = row

    label = []
    for group_index, group_size in enumerate(counts):
        label.extend([group_index] * int(group_size))
    return label
def merge(files, type='h5ad'):
    """Read several files with ``read_sc_data`` and concatenate them in order.

    Args:
        files: Sequence of input paths; at least one entry is required.
        type: Format string passed to ``read_sc_data`` for every file.

    Returns:
        tuple: ``(adata_all, sample_count)`` where ``sample_count[i]`` is
        ``shape[0]`` of the object read from ``files[i]``.
    """
    adata_all = read_sc_data(files[0], type)
    sample_count = [adata_all.shape[0]]
    for path in files[1:]:
        current = read_sc_data(path, type)
        sample_count.append(current.shape[0])
        adata_all = adata_all.concatenate(current)
    return adata_all, sample_count
def write_count(count, out_path):
    """Write ``count`` as a single CSV row to ``out_path``.

    Args:
        count: Iterable of values forming the row.
        out_path: Destination file; it is overwritten.
    """
    # newline='' stops the platform from translating the writer's own line
    # terminator, which otherwise yields blank rows on Windows.
    with open(out_path, "w", newline='') as f:
        csv.writer(f).writerow(count)
def filt(adata, min_c, min_g):
    """Apply scanpy's gene filter, then its cell filter, and return ``adata``.

    Args:
        adata: Object passed to ``sc.pp.filter_genes`` / ``sc.pp.filter_cells``.
        min_c: Passed as ``min_cells`` to the gene filter.
        min_g: Passed as ``min_genes`` to the cell filter.

    Returns:
        The same ``adata`` object that was passed in.
    """
    preprocessing = sc.pp
    # Genes are filtered first, so the cell filter sees the reduced gene set.
    preprocessing.filter_genes(adata, min_cells=min_c)
    preprocessing.filter_cells(adata, min_genes=min_g)
    return adata
def write_merge_label(count, labels, path):
    """Write ``labels[i]`` to a CSV file ``count[i]`` times, one row per copy.

    Args:
        count: Sequence of repeat counts, one per label.
        labels: Sequence of labels. A list, tuple or ndarray label is written
            as a multi-field row; any other value is written as one field.
        path: Destination CSV file; it is overwritten.
    """
    res = []
    for i, repeats in enumerate(count):
        res.extend([labels[i]] * repeats)

    # Kept from the original as a progress print, but guarded so that fewer
    # than two rows no longer raises IndexError.
    if len(res) > 1:
        print(res[1])

    # csv.writer.writerows iterates every row, so a bare string would be
    # split into one field per character; wrap scalar labels in a list.
    rows = [
        label if isinstance(label, (list, tuple, np.ndarray)) else [label]
        for label in res
    ]
    with open(path, "w", newline='') as f:
        csv.writer(f).writerows(rows)
def find_gene_pos(genes, gene):
    """Return the row position(s) of ``gene`` in the index of ``genes``.

    Args:
        genes: DataFrame whose index holds the gene labels.
        gene: A single index label or a list of labels.

    Returns:
        The zero-based row position for a unique label, or a Series of
        positions (named ``'i'``) for a list of labels or a duplicated label.

    Raises:
        KeyError: If ``gene`` is not present in ``genes.index``.
    """
    # Look positions up in a stand-alone Series instead of writing an 'i'
    # column into the caller's DataFrame.
    positions = pd.Series(range(genes.shape[0]), index=genes.index, name='i')
    return positions.loc[gene]
def cacu_color(X, i):
    """Return column ``i`` of the two-dimensional container ``X``."""
    column_selector = (slice(None), i)
    return X[column_selector]
def get_label_by_txt(txtpath):
    """Read a text file and return its lines, without line endings, as a list.

    Args:
        txtpath: Path of the text file, one label per line.

    Returns:
        list[str]: The lines of the file in order.
    """
    with open(txtpath) as label_f:
        content = label_f.read()
    return list(content.splitlines())
def cacu_clustering_metrics(x_list, label, metrics_list):
    """Score every embedding in ``x_list`` with each requested metric.

    Args:
        x_list: Iterable of feature matrices to score against ``label``.
        label: Cluster labels passed to the scikit-learn scorer.
        metrics_list: Metric codes; ``'sh'`` selects ``silhouette_score`` and
            ``'ch'`` selects ``calinski_harabasz_score``. Any other code
            contributes an empty list.

    Returns:
        list[list]: One inner list per entry of ``metrics_list``, holding one
        score per entry of ``x_list``.
    """
    result = []
    for code in metrics_list:
        if code == 'sh':
            scores = [metrics.silhouette_score(x, label) for x in x_list]
        elif code == 'ch':
            scores = [metrics.calinski_harabasz_score(x, label) for x in x_list]
        else:
            scores = []
        result.append(scores)
    return result
def txt_to_adata(txt_path):
    """Build an ``AnnData`` object from a whitespace-delimited text matrix.

    The first line of the file is treated as a header and skipped. On every
    following non-blank line the first field (a row name) is dropped and the
    remaining fields become one row of ``X``. The matrix is not transposed.

    Args:
        txt_path: Path of the text file.

    Returns:
        anndata.AnnData: Object whose ``X`` is the parsed float32 matrix.
    """
    # The context manager closes the file even if reading fails.
    with open(txt_path) as f:
        lines = f.readlines()

    # split() without arguments tolerates repeated spaces and tabs; blank
    # lines are skipped so they cannot produce ragged rows.
    rows = [line.split()[1:] for line in lines[1:] if line.strip()]

    # Convert explicitly: the parsed fields are strings, and a string matrix
    # is not a usable expression matrix.
    data = np.array(rows, dtype='float32')
    return anndata.AnnData(X=data)
def seurat_preprocession(adata, min_cells, min_genes):
    """Run the scanpy filter / normalise / log1p sequence on ``adata``.

    Steps, in order: ``filter_genes(min_cells)``, ``filter_cells(min_genes)``,
    ``normalize_total(target_sum=1e4)``, ``log1p``.

    Args:
        adata: Object handed to each scanpy preprocessing call.
        min_cells: Passed as ``min_cells`` to ``sc.pp.filter_genes``.
        min_genes: Passed as ``min_genes`` to ``sc.pp.filter_cells``.

    Returns:
        The same ``adata`` object that was passed in.
    """
    preprocessing = sc.pp
    preprocessing.filter_genes(adata, min_cells=min_cells)
    preprocessing.filter_cells(adata, min_genes=min_genes)
    preprocessing.normalize_total(adata, target_sum=1e4)
    preprocessing.log1p(adata)
    return adata
def joint_cluster_feature(dataset, k):
    """For every sample, collect the row maxima of its ``k`` nearest neighbours.

    Args:
        dataset: 2-D array of samples (rows) by features (columns).
        k: Number of neighbours queried per sample.

    Returns:
        numpy.ndarray: Float array of shape ``(n_samples, k)`` where entry
        ``[i, j]`` is the maximum feature value of the j-th neighbour of
        sample ``i``.
    """
    x_nbrs = NearestNeighbors(n_neighbors=k, algorithm='ball_tree')
    x_nbrs.fit(dataset)
    print(x_nbrs)
    # kneighbors() without arguments queries the fitted samples themselves;
    # only the neighbour indices are needed here.
    _, ind = x_nbrs.kneighbors()

    # A 1-D vector of row maxima avoids assigning size-1 arrays to scalar
    # slots, which NumPy >= 1.25 deprecates.
    row_max = np.asarray(dataset).max(axis=1).astype(np.float64)

    # Fancy indexing with the (n_samples, k) index matrix replaces the
    # element-by-element double loop.
    return row_max[ind]
def cacu_Manhattan_dist(dataset):
    """Return the square matrix of pairwise city-block distances between rows.

    Args:
        dataset: 2-D array with one observation per row.

    Returns:
        numpy.ndarray: Symmetric ``(n, n)`` distance matrix. Its shape is
        printed before it is returned.
    """
    distance_matrix = squareform(pdist(dataset, 'cityblock'))
    print(distance_matrix.shape)
    return distance_matrix
def get_panc8(tech, type='', panc8_path='/Users/zhongyuanke/data/seurat_data/panc_8/'):
    """Load one panc8 expression CSV together with its cell-type table.

    Reads ``<tech>_data<type>.csv`` with scanpy and ``<tech>_cell_type.csv``
    with pandas from ``panc8_path``.

    Args:
        tech: File-name prefix selecting the dataset.
        type: Optional suffix inserted before ``.csv`` in the data file name.
        panc8_path: Directory holding the CSV files. The default is the
            location the original code had hard-coded.

    Returns:
        tuple: ``(adata, cell_type)`` where ``cell_type`` is the ``.values``
        array of the cell-type table (first column used as index).
    """
    import os

    data_path = os.path.join(panc8_path, tech + '_data' + type + '.csv')
    celltype_path = os.path.join(panc8_path, tech + '_cell_type.csv')

    cell_type = pd.read_csv(celltype_path, index_col=0).values
    adata = sc.read_csv(data_path)
    print(adata.shape)
    return adata, cell_type
def get_intersection_panc8(tech, norm=True, panc8_path='/Users/zhongyuanke/data/dann_vae/panc8/'):
    """Load the ``inter_<tech>.h5ad`` file from ``panc8_path``.

    Args:
        tech: Name inserted into the file name ``inter_<tech>.h5ad``.
        norm: Accepted for interface compatibility; it is not used.
        panc8_path: Directory holding the ``.h5ad`` files. The default is the
            location the original code had hard-coded.

    Returns:
        The object returned by ``sc.read_h5ad`` for that file.
    """
    import os

    data_path = os.path.join(panc8_path, 'inter_' + tech + '.h5ad')
    return sc.read_h5ad(data_path)
def get_ifnb(tech='stim', type='', ifnb_path='/Users/zhongyuanke/data/seurat_data/ifnb/'):
    """Load one ifnb expression CSV together with its cell-type table.

    Reads ``<tech><type>.csv`` with scanpy and ``<tech>_cell_type.csv`` with
    pandas from ``ifnb_path``.

    Args:
        tech: File-name prefix selecting the dataset.
        type: Optional suffix inserted before ``.csv`` in the data file name.
        ifnb_path: Directory holding the CSV files. The default is the
            location the original code had hard-coded.

    Returns:
        tuple: ``(adata, cell_type)`` where ``cell_type`` is the ``.values``
        array of the cell-type table (first column used as index).
    """
    import os

    data_path = os.path.join(ifnb_path, tech + type + '.csv')
    celltype_path = os.path.join(ifnb_path, tech + '_cell_type.csv')

    cell_type = pd.read_csv(celltype_path, index_col=0).values
    adata = sc.read_csv(data_path)
    return adata, cell_type
def text_label_to_number(txt_label):
    """Encode labels as integers with a freshly fitted ``LabelEncoder``.

    Args:
        txt_label: Sequence of labels to encode.

    Returns:
        The result of ``LabelEncoder().fit_transform(txt_label)``.
    """
    return LabelEncoder().fit_transform(txt_label)
def filt_genes(x, min_cells=0):
    """Keep the columns of ``x`` that are positive in at least ``min_cells`` rows.

    Args:
        x: 2-D array-like; columns are the units being filtered.
        min_cells: Minimum number of rows with a value ``> 0`` required for a
            column to be kept.

    Returns:
        numpy.ndarray: Copy of ``x`` restricted to the kept columns, in their
        original order. The row dimension is preserved even when no column
        survives (shape ``(n_rows, 0)``).
    """
    x = np.asarray(x)
    # Count positive entries per column in one vectorised pass instead of a
    # Python loop over every element.
    positive_per_column = (x > 0).sum(axis=0)
    return x[:, positive_per_column >= min_cells]
def up_sampling(x, y, batch_size):
    """Cyclically repeat rows so ``x`` and ``y`` share a batch-aligned length.

    The target length is the larger row count rounded up to a multiple of
    ``batch_size``. Each array is extended by repeating its rows from the
    start until the target length is reached.

    Args:
        x: 2-D array.
        y: 2-D array.
        batch_size: Positive integer the output length must be divisible by.

    Returns:
        tuple: ``(x, y)`` with equal numbers of rows. As before, the strictly
        longer input keeps its dtype and the other one is returned as float64
        (``x`` is the float64 one when the lengths are equal).

    Raises:
        ValueError: If either input has no rows.
    """
    x = np.asarray(x)
    y = np.asarray(y)
    orig_len_x = x.shape[0]
    orig_len_y = y.shape[0]
    if orig_len_x == 0 or orig_len_y == 0:
        raise ValueError('x and y must each contain at least one row')

    target_len = max(orig_len_x, orig_len_y)
    remainder = target_len % batch_size
    if remainder != 0:
        target_len += batch_size - remainder

    # Row indices modulo the original length tile each array for any target
    # length. The old code derived the repeat count from the unpadded length,
    # so the tail slice could be longer than the array it was copied from.
    x_rows = np.arange(target_len) % orig_len_x
    y_rows = np.arange(target_len) % orig_len_y

    if orig_len_x > orig_len_y:
        x = x[x_rows]
        y = y[y_rows].astype(np.float64)
    else:
        x = x[x_rows].astype(np.float64)
        y = y[y_rows]
    return x, y
def multi_up_sampling(x, batch_size):
    """Repeat the rows of every dataset up to a common, batch-aligned length.

    The common length is the largest row count among the datasets, rounded up
    to a multiple of ``batch_size``. The unrounded maximum is printed.

    Args:
        x: Sequence of 2-D arrays; the column count of ``x[0]`` is used for
            every output buffer.
        batch_size: Integer the common length must be divisible by.

    Returns:
        tuple: ``(up_datasets, max_index)`` where ``up_datasets`` is a list of
        float arrays of the common length and ``max_index`` is the position of
        the first dataset with the most rows.
    """
    row_counts = [dataset.shape[0] for dataset in x]
    max_len = max(row_counts)
    max_index = row_counts.index(max_len)
    print(max_len)

    leftover = max_len % batch_size
    if leftover != 0:
        max_len = max_len + (batch_size - leftover)

    up_datasets = []
    for dataset in x:
        rows = dataset.shape[0]
        full_copies, tail = divmod(max_len, rows)
        padded = np.zeros([max_len, x[0].shape[1]])
        # Whole copies first, then the leading `tail` rows to fill the rest.
        for start in range(0, full_copies * rows, rows):
            padded[start:start + rows, ] = dataset
        padded[full_copies * rows:max_len, ] = dataset[0:tail, ]
        up_datasets.append(padded)
    return up_datasets, max_index
def locate_sample(barcodes, barcode):
    """Return the first index at which ``barcode`` occurs in ``barcodes``.

    Args:
        barcodes: Indexable sequence to search.
        barcode: Value to look for.

    Returns:
        int: Position of the first match, or ``-1`` when there is none.
    """
    matches = (pos for pos in range(len(barcodes)) if barcode == barcodes[pos])
    return next(matches, -1)
def intersection_genes(data1, data2, genes1, genes2):
    """Restrict two matrices to the genes they have in common.

    Columns are matched by name. The shared genes are returned in the order
    they appear in ``genes1``, and both outputs use that column order. When a
    name occurs more than once in ``genes2`` its first occurrence is used.

    Args:
        data1: 2-D array whose columns correspond to ``genes1``.
        data2: 2-D array whose columns correspond to ``genes2``.
        genes1: Sequence of gene names for ``data1``.
        genes2: Sequence of gene names for ``data2``.

    Returns:
        tuple: ``(filt_data1, filt_data2, inter_genes)``.
    """
    data1 = data1.transpose()
    data2 = data2.transpose()

    # One dictionary pass replaces a linear `in` + `.index` scan per gene;
    # setdefault keeps the first occurrence, matching list.index.
    first_pos_in_genes2 = {}
    for pos, name in enumerate(genes2):
        first_pos_in_genes2.setdefault(name, pos)

    inter_genes = []
    filt_data1 = []
    filt_data2 = []
    for i, name in enumerate(genes1):
        index2 = first_pos_in_genes2.get(name)
        if index2 is None:
            continue
        inter_genes.append(name)
        filt_data1.append(data1[i, ])
        filt_data2.append(data2[index2, ])

    filt_data1 = np.array(filt_data1).transpose()
    filt_data2 = np.array(filt_data2).transpose()
    return filt_data1, filt_data2, inter_genes
def multi_intersection_genes(data_list, gene_list):
    """Restrict several matrices to the genes present in all of them.

    Each output keeps the shared genes in the column order of its own input,
    so columns are aligned across datasets only if the inputs already list
    the shared genes in the same order.

    Args:
        data_list: Sequence of 2-D arrays; columns of ``data_list[i]``
            correspond to ``gene_list[i]``. It is not modified.
        gene_list: Sequence of gene-name sequences, one per dataset.

    Returns:
        tuple: ``(filt_data_list, ref_genes)`` where ``ref_genes`` lists the
        shared genes in the order they first appear in ``gene_list[0]``.
    """
    # Intersect every gene list; this also covers a single dataset, where
    # the old code failed on gene_list[1].
    common = set(gene_list[0])
    for names in gene_list[1:]:
        common &= set(names)

    # Deterministic order instead of arbitrary set order.
    ref_genes = [name for name in dict.fromkeys(gene_list[0]) if name in common]

    filt_data_list = []
    for data, names in zip(data_list, gene_list):
        # Transpose a local view only, so the caller's list is left untouched.
        by_gene = data.transpose()
        kept_rows = [by_gene[j] for j, name in enumerate(names) if name in common]
        filt_data = np.array(kept_rows).transpose()
        print(filt_data.shape)
        filt_data_list.append(filt_data)
    return filt_data_list, ref_genes
def sample_by_reference(ref_barcodes, barcodes, data):
    """Pick the rows of ``data`` whose barcode appears in ``ref_barcodes``.

    Rows come out in the order of ``ref_barcodes``; reference barcodes absent
    from ``barcodes`` are skipped. When a barcode occurs more than once in
    ``barcodes`` its first occurrence is used.

    Args:
        ref_barcodes: Sequence of barcodes defining which rows to pick.
        barcodes: Sequence of barcodes, one per row of ``data``.
        data: 2-D array indexed by row position.

    Returns:
        numpy.ndarray: The selected rows. Its shape is printed before it is
        returned.
    """
    # A dictionary lookup replaces a linear `in` + `.index` scan per barcode;
    # setdefault keeps the first occurrence, matching list.index.
    first_pos = {}
    for pos, code in enumerate(barcodes):
        first_pos.setdefault(code, pos)

    sample_data = np.array(
        [data[first_pos[code], ] for code in ref_barcodes if code in first_pos])
    print(sample_data.shape)
    # Previously the constant 0 was returned and the sampled rows were lost.
    return sample_data