Python source code examples: sklearn.metrics.normalized_mutual_info_score()
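Normalized Mutual Information (NMI) scores the agreement between two label assignments while ignoring permutations of the cluster IDs: the score is symmetric in its two arguments and ranges from 0 (independent labelings) to 1 (identical partitions). A minimal standalone sketch of the call, on toy labels not taken from any of the examples below:

from sklearn.metrics import normalized_mutual_info_score

labels_true = [0, 0, 1, 1, 2, 2]
labels_pred = [1, 1, 0, 0, 2, 2]  # same grouping, different cluster IDs
print(normalized_mutual_info_score(labels_true, labels_pred))  # 1.0
print(normalized_mutual_info_score(labels_pred, labels_true))  # symmetric: also 1.0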
Example 1
def clustering_scores(self, prediction_algorithm: str = "knn") -> Tuple:
    if self.gene_dataset.n_labels > 1:
        latent, _, labels = self.get_latent()
        if prediction_algorithm == "knn":
            labels_pred = KMeans(
                self.gene_dataset.n_labels, n_init=200
            ).fit_predict(
                latent
            )  # n_jobs>1 ?
        elif prediction_algorithm == "gmm":
            gmm = GMM(self.gene_dataset.n_labels)
            gmm.fit(latent)
            labels_pred = gmm.predict(latent)

        asw_score = silhouette_score(latent, labels)
        nmi_score = NMI(labels, labels_pred)
        ari_score = ARI(labels, labels_pred)
        uca_score = unsupervised_clustering_accuracy(labels, labels_pred)[0]
        logger.debug(
            "Clustering Scores:\nSilhouette: %.4f\nNMI: %.4f\nARI: %.4f\nUCA: %.4f"
            % (asw_score, nmi_score, ari_score, uca_score)
        )
        return asw_score, nmi_score, ari_score, uca_score
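Example 1 bundles four standard clustering metrics. For reference, the same scores can be computed outside scVI; below is a minimal self-contained sketch on toy data, where cluster_accuracy is a hypothetical stand-in for scVI's unsupervised_clustering_accuracy (both match predicted to true cluster IDs with the Hungarian algorithm):

import numpy as np
from scipy.optimize import linear_sum_assignment
from sklearn.cluster import KMeans
from sklearn.metrics import (adjusted_rand_score, normalized_mutual_info_score,
                             silhouette_score)

def cluster_accuracy(labels, labels_pred):
    # Contingency table, then best one-to-one matching of cluster IDs.
    n = max(labels.max(), labels_pred.max()) + 1
    cost = np.zeros((n, n), dtype=np.int64)
    for t, p in zip(labels, labels_pred):
        cost[t, p] += 1
    row, col = linear_sum_assignment(-cost)  # maximize matched counts
    return cost[row, col].sum() / labels.size

rng = np.random.RandomState(0)
latent = rng.randn(300, 10)            # toy "latent space"
labels = np.repeat([0, 1, 2], 100)     # toy ground-truth labels
labels_pred = KMeans(n_clusters=3, n_init=10).fit_predict(latent)
print(silhouette_score(latent, labels),
      normalized_mutual_info_score(labels, labels_pred),
      adjusted_rand_score(labels, labels_pred),
      cluster_accuracy(labels, labels_pred))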
Example 2
def test_pipeline_spectral_clustering(seed=36):
    # Test using pipeline to do spectral clustering
    random_state = np.random.RandomState(seed)
    se_rbf = SpectralEmbedding(n_components=n_clusters,
                               affinity="rbf",
                               random_state=random_state)
    se_knn = SpectralEmbedding(n_components=n_clusters,
                               affinity="nearest_neighbors",
                               n_neighbors=5,
                               random_state=random_state)
    for se in [se_rbf, se_knn]:
        km = KMeans(n_clusters=n_clusters, random_state=random_state)
        km.fit(se.fit_transform(S))
        assert_array_almost_equal(
            normalized_mutual_info_score(
                km.labels_,
                true_labels), 1.0, 2)
Example 3
def load_amazon():
    """Amazon product co-purchasing network and ground-truth communities.
    The network was collected by crawling the Amazon website and is based on its
    "Customers Who Bought This Item Also Bought" feature: if a product i is
    frequently co-purchased with product j, the graph contains an undirected
    edge from i to j. Each product category provided by Amazon defines a
    ground-truth community.
    """
    dataset_path = _load('amazon')
    X = _load_csv(dataset_path, 'data')
    y = X.pop('label').values
    graph = nx.Graph(nx.read_gml(os.path.join(dataset_path, 'graph.gml')))
    return Dataset(load_amazon.__doc__, X, y, normalized_mutual_info_score, graph=graph)
Example 4
def check_forward(self, x_data, c_data, gamma, T, y_star, y_pam):
    num_examples = len(x_data)
    x = chainer.Variable(x_data)
    c = chainer.Variable(c_data)
    loss = clustering_loss(x, c, gamma, T)

    sq_distances_ij = []
    for i, j in zip(range(num_examples), y_pam):
        sqd_ij = np.sum((x_data[i] - x_data[j]) ** 2)
        sq_distances_ij.append(sqd_ij)
    f = -sum(sq_distances_ij)

    sq_distances_ij = []
    for i, j in zip(range(num_examples), y_star):
        sqd_ij = np.sum((x_data[i] - x_data[j]) ** 2)
        sq_distances_ij.append(sqd_ij)
    f_tilde = -sum(sq_distances_ij)

    delta = 1.0 - normalized_mutual_info_score(cuda.to_cpu(c_data), y_pam)
    loss_expected = f + gamma * delta - f_tilde
    testing.assert_allclose(loss.data, loss_expected)
Example 5
def load_amazon():
    """Amazon dataset.
    Amazon product co-purchasing network and ground-truth communities.
    The network was collected by crawling the Amazon website and is based on its
    "Customers Who Bought This Item Also Bought" feature: if a product i is
    frequently co-purchased with product j, the graph contains an undirected
    edge from i to j. Each product category provided by Amazon defines a
    ground-truth community.
    """
    dataset_path = _load('amazon')
    X = _load_csv(dataset_path, 'data')
    y = X.pop('label').values
    graph = nx.Graph(nx.read_gml(os.path.join(dataset_path, 'graph.gml')))
    return Dataset(load_amazon.__doc__, X, y, normalized_mutual_info_score, 'graph',
                   'community_detection', graph=graph)
Example 6
def benchmarking(gtlabels, labels):
    # TODO: the AMI definition used in the paper differs from the one in the
    # sklearn package; modify accordingly if you need the paper's definition.
    numeval = len(gtlabels)
    ari = metrics.adjusted_rand_score(gtlabels[:numeval], labels[:numeval])
    ami = metrics.adjusted_mutual_info_score(gtlabels[:numeval], labels[:numeval])
    nmi = metrics.normalized_mutual_info_score(gtlabels[:numeval], labels[:numeval])
    acc = clustering_accuracy(gtlabels[:numeval], labels[:numeval])
    return ari, ami, nmi, acc
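One source of definition mismatches like the TODO above is the entropy normalization: recent sklearn releases expose an average_method parameter on both scores, so the normalization can be matched to a paper's convention explicitly. A small sketch on toy labels (assumes sklearn >= 0.20, where the keyword exists):

from sklearn import metrics

gt = [0, 0, 1, 1, 2, 2]
pred = [0, 0, 1, 2, 2, 2]
# 'arithmetic' is the modern default; 'geometric', 'min' and 'max' are also accepted
for method in ('arithmetic', 'geometric', 'min', 'max'):
    print(method,
          metrics.normalized_mutual_info_score(gt, pred, average_method=method),
          metrics.adjusted_mutual_info_score(gt, pred, average_method=method))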
Example 7
def test_spectral_embedding_two_components(seed=36):
    # Test spectral embedding with two components
    random_state = np.random.RandomState(seed)
    n_sample = 100
    affinity = np.zeros(shape=[n_sample * 2, n_sample * 2])
    # first component
    affinity[0:n_sample,
             0:n_sample] = np.abs(random_state.randn(n_sample, n_sample)) + 2
    # second component
    affinity[n_sample::,
             n_sample::] = np.abs(random_state.randn(n_sample, n_sample)) + 2
    # Test of internal _graph_connected_component before connection
    component = _graph_connected_component(affinity, 0)
    assert component[:n_sample].all()
    assert not component[n_sample:].any()
    component = _graph_connected_component(affinity, -1)
    assert not component[:n_sample].any()
    assert component[n_sample:].all()
    # connection
    affinity[0, n_sample + 1] = 1
    affinity[n_sample + 1, 0] = 1
    affinity.flat[::2 * n_sample + 1] = 0
    affinity = 0.5 * (affinity + affinity.T)
    true_label = np.zeros(shape=2 * n_sample)
    true_label[0:n_sample] = 1
    se_precomp = SpectralEmbedding(n_components=1, affinity="precomputed",
                                   random_state=np.random.RandomState(seed))
    embedded_coordinate = se_precomp.fit_transform(affinity)
    # Some numpy versions are touchy with types
    embedded_coordinate = \
        se_precomp.fit_transform(affinity.astype(np.float32))
    # thresholding on the first component using 0.
    label_ = np.array(embedded_coordinate.ravel() < 0, dtype="float")
    assert_equal(normalized_mutual_info_score(true_label, label_), 1.0)
Example 8
def _augmented_update_medoid_ics_in_place(self, pdists, y_gt, cluster_ics,
                                          medoid_ics, loss_mult):
    for cluster_idx in range(self.n_clusters):
        # y_pred = self._get_cluster_ics(D, medoid_ics)
        # Don't prematurely do the assignment step.
        # Do this after we've updated all cluster medoids.
        y_pred = cluster_ics

        if sum(y_pred == cluster_idx) == 0:
            # Cluster is empty.
            continue

        curr_score = (
            -1.0 * np.sum(
                pdists[medoid_ics[cluster_idx], y_pred == cluster_idx]) +
            loss_mult * (1.0 - metrics.normalized_mutual_info_score(
                y_gt, y_pred)))

        pdist_in = pdists[y_pred == cluster_idx, :]
        pdist_in = pdist_in[:, y_pred == cluster_idx]
        all_scores_fac = np.sum(-1.0 * pdist_in, axis=1)
        all_scores_loss = []
        for i in range(y_pred.size):
            if y_pred[i] != cluster_idx:
                continue
            # remove this cluster's current centroid
            medoid_ics_i = medoid_ics[:cluster_idx] + medoid_ics[cluster_idx + 1:]
            # add this new candidate to the centroid list
            medoid_ics_i += [i]
            y_pred_i = self._get_cluster_ics(pdists, medoid_ics_i)
            all_scores_loss.append(loss_mult * (
                1.0 - metrics.normalized_mutual_info_score(y_gt, y_pred_i)))

        all_scores = all_scores_fac + all_scores_loss
        max_score_idx = np.argmax(all_scores)
        max_score = all_scores[max_score_idx]

        if max_score > curr_score:
            medoid_ics[cluster_idx] = np.where(
                y_pred == cluster_idx)[0][max_score_idx]
Example 9
def pam_augmented_fit(self, feat, y, loss_mult):
    pam_max_iter = 5
    self._check_init_args()
    feat = self._check_array(feat)
    pdists = pairwise_distance_np(feat)
    self.loss_augmented_fit(feat, y, loss_mult)
    print('PAM -1 (before PAM): score: %f, score_aug: %f' % (
        self.score_, self.score_aug_))
    # Initialize from loss augmented facility location
    subset = self.center_ics_
    for iter_ in range(pam_max_iter):
        # update the cluster assignment
        cluster_ics = self._get_cluster_ics(pdists, subset)
        # update the medoid for each cluster
        self._augmented_update_medoid_ics_in_place(pdists, y, cluster_ics, subset,
                                                   loss_mult)
        self.score_ = np.float32(-1.0) * self._get_facility_distance(
            pdists, subset)
        self.score_aug_ = self.score_ + loss_mult * (
            1.0 - metrics.normalized_mutual_info_score(
                y, self._get_cluster_ics(pdists, subset)))
        self.score_aug_ = self.score_aug_.astype(np.float32)
        print('PAM iter: %d, score: %f, score_aug: %f' % (iter_, self.score_,
                                                          self.score_aug_))
    self.center_ics_ = subset
    self.labels_ = cluster_ics
    return self
Example 10
def _compute_nmi_score(labels, predictions):
    return math_ops.to_float(
        script_ops.py_func(
            metrics.normalized_mutual_info_score, [labels, predictions],
            [dtypes.float64],
            name='nmi'))
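math_ops.to_float and script_ops.py_func here are TensorFlow-internal, TF1-era modules. A rough public-API equivalent for TF2, sketched under the assumption that labels and predictions are 1-D integer tensors, wraps the sklearn call with tf.py_function:

import tensorflow as tf
from sklearn import metrics

def compute_nmi_score(labels, predictions):
    # Run the sklearn metric as a Python callback inside the TF graph.
    score = tf.py_function(
        func=lambda a, b: metrics.normalized_mutual_info_score(a.numpy(), b.numpy()),
        inp=[labels, predictions],
        Tout=tf.float64,
        name='nmi')
    return tf.cast(score, tf.float32)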
Example 11
def evaluate_cluster(self, embedding_list):
    X = []
    Y = []
    for p in self.label:
        X.append(embedding_list[p])
        Y.append(self.label[p])

    Y_pred = KMeans(self.n_label, random_state=self.seed).fit(np.array(X)).predict(X)
    nmi = normalized_mutual_info_score(np.array(Y), Y_pred)
    ari = adjusted_rand_score(np.array(Y), Y_pred)
    return nmi, ari
Example 12
def evaluate_author_cluster(self, embedding_matrix):
    embedding_list = embedding_matrix.tolist()
    X = []
    Y = []
    for author in self.author_label:
        X.append(embedding_list[author])
        Y.append(self.author_label[author])

    pred_Y = KMeans(4).fit(np.array(X)).predict(X)
    score = normalized_mutual_info_score(np.array(Y), pred_Y)
    return score
Example 13
def evaluate_paper_cluster(self, embedding_matrix):
    embedding_list = embedding_matrix.tolist()
    X = []
    Y = []
    for paper in self.paper_label:
        X.append(embedding_list[paper])
        Y.append(self.paper_label[paper])

    pred_Y = KMeans(3).fit(np.array(X)).predict(X)
    score = normalized_mutual_info_score(np.array(Y), pred_Y)
    return score
Example 14
def my_Kmeans(x, y, k=4, time=10, return_NMI=False):
    x = np.array(x)
    x = np.squeeze(x)
    y = np.array(y)

    if len(y.shape) > 1:
        y = np.argmax(y, axis=1)

    estimator = KMeans(n_clusters=k)
    ARI_list = []
    NMI_list = []
    if time:
        # run the KMeans experiment `time` times and average the scores
        for i in range(time):
            estimator.fit(x, y)
            y_pred = estimator.predict(x)
            score = normalized_mutual_info_score(y, y_pred)
            NMI_list.append(score)
            s2 = adjusted_rand_score(y, y_pred)
            ARI_list.append(s2)
        # print('NMI_list: {}'.format(NMI_list))
        score = sum(NMI_list) / len(NMI_list)
        s2 = sum(ARI_list) / len(ARI_list)
        print('NMI ({} avg): {:.4f} , ARI ({} avg): {:.4f}'.format(time, score, time, s2))
    else:
        estimator.fit(x, y)
        y_pred = estimator.predict(x)
        score = normalized_mutual_info_score(y, y_pred)
        s2 = adjusted_rand_score(y, y_pred)  # fix: otherwise s2 is undefined on this branch
        print("NMI on all label data: {:.5f}".format(score))
    if return_NMI:
        return score, s2
Example 15
def evaluate_clustering(y_gt, y_assignment):
    return normalized_mutual_info_score(y_gt, y_assignment)
Example 16
def test_spectral_embedding_two_components(seed=36):
    """Test spectral embedding with two components"""
    random_state = np.random.RandomState(seed)
    n_sample = 100
    affinity = np.zeros(shape=[n_sample * 2,
                               n_sample * 2])
    # first component
    affinity[0:n_sample,
             0:n_sample] = np.abs(random_state.randn(n_sample, n_sample)) + 2
    # second component
    affinity[n_sample::,
             n_sample::] = np.abs(random_state.randn(n_sample, n_sample)) + 2
    # connection
    affinity[0, n_sample + 1] = 1
    affinity[n_sample + 1, 0] = 1
    affinity.flat[::2 * n_sample + 1] = 0
    affinity = 0.5 * (affinity + affinity.T)
    true_label = np.zeros(shape=2 * n_sample)
    true_label[0:n_sample] = 1
    se_precomp = SpectralEmbedding(n_components=1,
                                   random_state=np.random.RandomState(seed),
                                   eigen_solver='arpack')
    embedded_coordinate = se_precomp.fit_transform(affinity,
                                                   input_type='affinity')
    # thresholding on the first component using 0.
    label_ = np.array(embedded_coordinate.ravel() < 0, dtype="float")
    assert_equal(normalized_mutual_info_score(true_label, label_), 1.0)
Example 17
def test_diffusion_embedding_two_components_no_diffusion_time(seed=36):
    """Test diffusion embedding with two components and no diffusion time"""
    random_state = np.random.RandomState(seed)
    n_sample = 100
    affinity = np.zeros(shape=[n_sample * 2,
                               n_sample * 2])
    # first component
    affinity[0:n_sample,
             0:n_sample] = np.abs(random_state.randn(n_sample, n_sample)) + 2
    # second component
    affinity[n_sample::,
             n_sample::] = np.abs(random_state.randn(n_sample, n_sample)) + 2
    # connection
    affinity[0, n_sample + 1] = 1
    affinity[n_sample + 1, 0] = 1
    affinity.flat[::2 * n_sample + 1] = 0
    affinity = 0.5 * (affinity + affinity.T)
    true_label = np.zeros(shape=2 * n_sample)
    true_label[0:n_sample] = 1
    geom_params = {'laplacian_method': 'geometric'}
    se_precomp = SpectralEmbedding(n_components=1,
                                   random_state=np.random.RandomState(seed),
                                   eigen_solver='arpack',
                                   diffusion_maps=True,
                                   geom=geom_params)
    embedded_coordinate = se_precomp.fit_transform(affinity,
                                                   input_type='affinity')
    # thresholding on the first component using 0.
    label_ = np.array(embedded_coordinate.ravel() < 0, dtype="float")
    assert_equal(normalized_mutual_info_score(true_label, label_), 1.0)
Example 18
def test_diffusion_embedding_two_components_diffusion_time_one(seed=36):
    """Test diffusion embedding with two components and diffusion time one"""
    random_state = np.random.RandomState(seed)
    n_sample = 100
    affinity = np.zeros(shape=[n_sample * 2,
                               n_sample * 2])
    # first component
    affinity[0:n_sample,
             0:n_sample] = np.abs(random_state.randn(n_sample, n_sample)) + 2
    # second component
    affinity[n_sample::,
             n_sample::] = np.abs(random_state.randn(n_sample, n_sample)) + 2
    # connection
    affinity[0, n_sample + 1] = 1
    affinity[n_sample + 1, 0] = 1
    affinity.flat[::2 * n_sample + 1] = 0
    affinity = 0.5 * (affinity + affinity.T)
    true_label = np.zeros(shape=2 * n_sample)
    true_label[0:n_sample] = 1
    geom_params = {'laplacian_method': 'geometric'}
    se_precomp = SpectralEmbedding(n_components=1,
                                   random_state=np.random.RandomState(seed),
                                   eigen_solver='arpack',
                                   diffusion_maps=True,
                                   diffusion_time=1.0,
                                   geom=geom_params)
    embedded_coordinate = se_precomp.fit_transform(affinity,
                                                   input_type='affinity')
    # thresholding on the first component using 0.
    label_ = np.array(embedded_coordinate.ravel() < 0, dtype="float")
    assert_equal(normalized_mutual_info_score(true_label, label_), 1.0)
Example 19
def test_normalized_mutual_info_score(self):
    result = self.df.metrics.normalized_mutual_info_score()
    expected = metrics.normalized_mutual_info_score(self.target, self.pred)
    self.assertEqual(result, expected)
Example 20
def normalized_mutual_information(first_partition, second_partition):
    """
    Normalized Mutual Information between two clusterings.
    Normalized Mutual Information (NMI) is a normalization of the Mutual
    Information (MI) score that scales the result between 0 (no mutual
    information) and 1 (perfect correlation). In this function, mutual
    information is normalized by ``sqrt(H(labels_true) * H(labels_pred))``.
    :param first_partition: NodeClustering object
    :param second_partition: NodeClustering object
    :return: MatchingResult object
    :Example:
    >>> from cdlib import evaluation, algorithms
    >>> import networkx as nx
    >>> g = nx.karate_club_graph()
    >>> louvain_communities = algorithms.louvain(g)
    >>> leiden_communities = algorithms.leiden(g)
    >>> evaluation.normalized_mutual_information(louvain_communities,leiden_communities)
    """
    __check_partition_coverage(first_partition, second_partition)
    __check_partition_overlap(first_partition, second_partition)

    first_partition_c = [x[1]
                         for x in sorted([(node, nid)
                                          for nid, cluster in enumerate(first_partition.communities)
                                          for node in cluster], key=lambda x: x[0])]

    second_partition_c = [x[1]
                          for x in sorted([(node, nid)
                                           for nid, cluster in enumerate(second_partition.communities)
                                           for node in cluster], key=lambda x: x[0])]

    from sklearn.metrics import normalized_mutual_info_score
    return MatchingResult(score=normalized_mutual_info_score(first_partition_c, second_partition_c))
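Note that the ``sqrt(H(labels_true) * H(labels_pred))`` normalization described in this docstring is the geometric mean of the two entropies, while recent sklearn releases default to the arithmetic mean; the docstring therefore only matches sklearn's output when the averaging method is pinned explicitly. A small sketch (assumes sklearn >= 0.20, where the average_method keyword exists):

from sklearn.metrics import normalized_mutual_info_score

a = [0, 0, 1, 1, 1, 2]
b = [0, 1, 1, 1, 2, 2]
print(normalized_mutual_info_score(a, b, average_method='geometric'))  # sqrt(H*H) normalization
print(normalized_mutual_info_score(a, b))  # default is 'arithmetic' since sklearn 0.22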
Example 21
def _nmi(preds, targets):
    return metrics.normalized_mutual_info_score(targets, preds)
Example 22
def calculate_NMI(self, query_labels, cluster_labels, **kwargs):
    return normalized_mutual_info_score(query_labels, cluster_labels)
Example 23
def test_spectral_embedding_two_components(seed=36):
    # Test spectral embedding with two components
    random_state = np.random.RandomState(seed)
    n_sample = 100
    affinity = np.zeros(shape=[n_sample * 2, n_sample * 2])
    # first component
    affinity[0:n_sample,
             0:n_sample] = np.abs(random_state.randn(n_sample, n_sample)) + 2
    # second component
    affinity[n_sample::,
             n_sample::] = np.abs(random_state.randn(n_sample, n_sample)) + 2
    # Test of internal _graph_connected_component before connection
    component = _graph_connected_component(affinity, 0)
    assert_true(component[:n_sample].all())
    assert_true(not component[n_sample:].any())
    component = _graph_connected_component(affinity, -1)
    assert_true(not component[:n_sample].any())
    assert_true(component[n_sample:].all())
    # connection
    affinity[0, n_sample + 1] = 1
    affinity[n_sample + 1, 0] = 1
    affinity.flat[::2 * n_sample + 1] = 0
    affinity = 0.5 * (affinity + affinity.T)
    true_label = np.zeros(shape=2 * n_sample)
    true_label[0:n_sample] = 1
    se_precomp = SpectralEmbedding(n_components=1, affinity="precomputed",
                                   random_state=np.random.RandomState(seed))
    embedded_coordinate = se_precomp.fit_transform(affinity)
    # Some numpy versions are touchy with types
    embedded_coordinate = \
        se_precomp.fit_transform(affinity.astype(np.float32))
    # thresholding on the first component using 0.
    label_ = np.array(embedded_coordinate.ravel() < 0, dtype="float")
    assert_equal(normalized_mutual_info_score(true_label, label_), 1.0)
Example 24
def loss_augmented_fit(self, feat, y, loss_mult):
    """Fit K-Medoids to the provided data."""
    self._check_init_args()
    # Check that the array is good and attempt to convert it to
    # Numpy array if possible.
    feat = self._check_array(feat)
    # Apply distance metric to get the distance matrix.
    pdists = pairwise_distance_np(feat)

    num_data = feat.shape[0]
    candidate_ids = list(range(num_data))
    candidate_scores = np.zeros(num_data,)
    subset = []

    k = 0
    while k < self.n_clusters:
        candidate_scores = []
        for i in candidate_ids:
            # push i to subset.
            subset.append(i)
            marginal_cost = -1.0 * np.sum(np.min(pdists[:, subset], axis=1))
            loss = 1.0 - metrics.normalized_mutual_info_score(
                y, self._get_cluster_ics(pdists, subset))
            candidate_scores.append(marginal_cost + loss_mult * loss)
            # remove i from subset.
            subset.pop()

        # push i_star to subset.
        i_star = candidate_ids[np.argmax(candidate_scores)]
        subset.append(i_star)
        # remove i_star from candidate indices.
        candidate_ids.remove(i_star)
        k += 1

    # Expose labels_ which are the assignments of
    # the training data to clusters.
    self.labels_ = self._get_cluster_ics(pdists, subset)
    # Expose cluster centers, i.e. medoids.
    self.cluster_centers_ = feat.take(subset, axis=0)
    # Expose indices of chosen cluster centers.
    self.center_ics_ = subset
    # Expose the score = -\sum_{i \in V} min_{j \in S} || x_i - x_j ||
    self.score_ = np.float32(-1.0) * self._get_facility_distance(pdists, subset)
    self.score_aug_ = self.score_ + loss_mult * (
        1.0 - metrics.normalized_mutual_info_score(
            y, self._get_cluster_ics(pdists, subset)))
    self.score_aug_ = self.score_aug_.astype(np.float32)
    # Expose the chosen cluster indices.
    self.subset_ = subset
    return self
Example 25
def plot_metrics(path, dataset, ref, fraction):
    ARI = []
    NMI = []
    F1 = []
    methods = ['scABC', 'SC3', 'scVI', 'SCALE']
    for frac in fraction:
        outdir = os.path.join(path, dataset, frac)
        scABC_pred, _ = read_labels(os.path.join(outdir, 'scABC_predict.txt'))
        if os.path.isfile(os.path.join(outdir, 'SC3_predict.txt')):
            SC3_pred, _ = read_labels(os.path.join(outdir, 'SC3_predict.txt'))
        else:
            SC3_pred = None
        scVI_pred, _ = read_labels(os.path.join(outdir, 'scVI_predict.txt'))
        scale_pred, pred_classes = read_labels(os.path.join(outdir, 'cluster_assignments.txt'))

        ari = []
        nmi = []
        f1 = []
        for pred, method in zip([scABC_pred, SC3_pred, scVI_pred, scale_pred], methods):
            if pred is None:
                ari.append(0)
                nmi.append(0)
                f1.append(0)
            else:
                pred = reassign_cluster_with_ref(pred, ref)
                ari.append(adjusted_rand_score(ref, pred))
                nmi.append(normalized_mutual_info_score(ref, pred))
                f1.append(f1_score(ref, pred, average='micro'))
        ARI.append(ari)
        NMI.append(nmi)
        F1.append(f1)

    fraction = [frac.replace('corrupt_', '') for frac in fraction]
    ARI = pd.Series(np.concatenate(ARI, axis=0))
    NMI = pd.Series(np.concatenate(NMI, axis=0))
    F1 = pd.Series(np.concatenate(F1, axis=0))
    M = pd.Series(methods * len(fraction))
    F = pd.Series(np.concatenate([[i] * len(methods) for i in fraction]))
    metrics = pd.concat([ARI, NMI, F1, M, F], axis=1)
    metrics.columns = ['ARI', 'NMI', 'F1', 'method', 'fraction']
    lineplot(metrics, 'ARI', dataset, False)
    lineplot(metrics, 'NMI', dataset, False)
    lineplot(metrics, 'F1', dataset, True)
Example 26
def loss_augmented_fit(self, X, y, loss_mult):
    """Fit K-Medoids to the provided data.
    Parameters
    ----------
    X : array-like or sparse matrix, shape=(n_samples, n_features)
    Returns
    -------
    self
    """
    self._check_init_args()
    # Check that the array is good and attempt to convert it to
    # Numpy array if possible
    X = self._check_array(X)
    # Apply distance metric to get the distance matrix
    D = self.distance_func(X)
    num_data = X.shape[0]
    candidate_ids = list(range(num_data))  # must be a list so entries can be deleted
    candidate_scores = np.zeros(num_data,)
    subset = []

    k = 0
    while k < self.n_clusters:
        candidate_scores = []
        for i in candidate_ids:
            # push i to subset
            subset.append(i)
            marginal_cost = np.sum(np.min(D[:, subset], axis=1))
            loss = normalized_mutual_info_score(y, self._get_cluster_ics(D, subset))
            candidate_scores.append(marginal_cost - loss_mult * loss)
            # remove i from subset
            subset.pop()

        # push i_star to subset
        i_star = candidate_ids[np.argmin(candidate_scores)]
        bisect.insort(subset, i_star)
        # remove i_star from candidate indices
        del candidate_ids[bisect.bisect_left(candidate_ids, i_star)]
        k = k + 1
        # print('|S|: %d, F(S): %f' % (k, np.min(candidate_scores)))

    # Expose labels_ which are the assignments of
    # the training data to clusters
    self.labels_ = self._get_cluster_ics(D, subset)
    # Expose cluster centers, i.e. medoids
    self.cluster_centers_ = X.take(subset, axis=0)
    # Expose indices of chosen cluster centers
    self.center_ics_ = subset
    return self