Python source code examples: sklearn.metrics.silhouette_score()
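Before the collected examples, here is a minimal, self-contained sketch of the call itself (synthetic data only, not taken from any project below): silhouette_score(X, labels) takes the feature matrix and the predicted cluster labels and returns a single float in [-1, 1], the mean silhouette coefficient over all samples.

from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs
from sklearn.metrics import silhouette_score

# toy data with four well-separated blobs
X, _ = make_blobs(n_samples=500, centers=4, random_state=42)
labels = KMeans(n_clusters=4, n_init=10, random_state=42).fit_predict(X)

# mean silhouette coefficient over all samples; values near 1 indicate well-separated clusters
score = silhouette_score(X, labels)
print("silhouette: %.3f" % score)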
Example 1
def silhouette_score(phate_op, n_clusters, random_state=None, **kwargs):
    """Compute the Silhouette score of KMeans on the PHATE potential

    Parameters
    ----------
    phate_op : phate.PHATE
        Fitted PHATE operator
    n_clusters : int
        Number of clusters.
    random_state : int or None, optional (default: None)
        Random seed for k-means

    Returns
    -------
    score : float
    """
    cluster_labels = kmeans(phate_op, n_clusters=n_clusters, random_state=random_state, **kwargs)
    return metrics.silhouette_score(phate_op.diff_potential, cluster_labels)
Example 2
def calc_scores(cls, model, data, min_clusters, max_clusters, random_state=0):
    silhouettes = []
    davieses = []
    calinskies = []
    if model.__class__.__name__ == 'HierarchicalClustering':
        linkage_matrix = model.fit(data)
    else:
        linkage_matrix = None
    for nc in range(min_clusters, max_clusters + 1):
        model.n_clusters = nc
        model.random_state = random_state
        pred_labels = model.fit_predict(data)
        silhouettes.append(silhouette_score(data, pred_labels, random_state=random_state))
        davieses.append(davies_bouldin_score(data, pred_labels))
        calinskies.append(calinski_harabasz_score(data, pred_labels))
    sil_nc = np.argmax(silhouettes) + min_clusters
    dav_nc = np.argmin(davieses) + min_clusters
    cal_nc = np.argmax(calinskies) + min_clusters
    return silhouettes, sil_nc, davieses, dav_nc, calinskies, cal_nc, linkage_matrix
Example 3
def clustering_scores(self, prediction_algorithm: str = "knn") -> Tuple:
    if self.gene_dataset.n_labels > 1:
        latent, _, labels = self.get_latent()
        if prediction_algorithm == "knn":
            labels_pred = KMeans(
                self.gene_dataset.n_labels, n_init=200
            ).fit_predict(
                latent
            )  # n_jobs>1 ?
        elif prediction_algorithm == "gmm":
            gmm = GMM(self.gene_dataset.n_labels)
            gmm.fit(latent)
            labels_pred = gmm.predict(latent)
        asw_score = silhouette_score(latent, labels)
        nmi_score = NMI(labels, labels_pred)
        ari_score = ARI(labels, labels_pred)
        uca_score = unsupervised_clustering_accuracy(labels, labels_pred)[0]
        logger.debug(
            "Clustering Scores:\nSilhouette: %.4f\nNMI: %.4f\nARI: %.4f\nUCA: %.4f"
            % (asw_score, nmi_score, ari_score, uca_score)
        )
        return asw_score, nmi_score, ari_score, uca_score
Example 4
def _cluster_plot(self, embedding, labels):
    silhouette = silhouette_score(embedding.squeeze(), labels)
    # calinski_harabaz_score is the older scikit-learn spelling of calinski_harabasz_score
    chs = calinski_harabaz_score(embedding.squeeze(), labels)
    dbs = davies_bouldin_score(embedding.squeeze(), labels)
    n_labels = len(set(labels))
    self.writer.add_scalar(f"silhouette {n_labels}", silhouette, self.step_id)
    self.writer.add_scalar(f"chs {n_labels}", chs, self.step_id)
    self.writer.add_scalar(f"dbs {n_labels}", dbs, self.step_id)
    indices = list(range(len(labels)))
    random.shuffle(indices)
    samples_to_plot = indices[:1000]
    sample_labels = [labels[idx] for idx in samples_to_plot]
    sample_embedding = embedding[samples_to_plot]
    pca = PCA(2).fit_transform(sample_embedding.squeeze())
    fig, ax = plt.subplots()
    ax.scatter(pca[:, 0], pca[:, 1], c=sample_labels, cmap="tab20")
    self.writer.add_figure(f"clustering {n_labels}", fig, self.step_id)
Example 5
def test_silhouette():
    # this test checks whether combat can align data from several gaussians
    # it checks this by computing the silhouette coefficient in a pca embedding

    # load in data
    adata = sc.datasets.blobs()

    # apply combat
    sc.pp.combat(adata, 'blobs')

    # compute pca
    sc.tl.pca(adata)
    X_pca = adata.obsm['X_pca']

    # compute silhouette coefficient in pca
    sh = silhouette_score(X_pca[:, :2], adata.obs['blobs'].values)

    assert sh < 0.1
Example 6
def _find_optimal_clustering(self, clusterings):
    max_score = float('-inf')
    max_clustering = None
    for clustering in clusterings:
        labeled_vectors = [(node.vector, cluster_idx)
                           for cluster_idx in range(len(clustering))
                           for node in _get_cluster_nodes(clustering[cluster_idx][1])]
        vectors, labels = [np.array(x) for x in zip(*labeled_vectors)]
        if np.in1d([1], labels)[0]:
            score = silhouette_score(vectors, labels, metric='cosine')
        else:
            continue  # silhouette doesn't work with just one cluster
        if score > max_score:
            max_score = score
            max_clustering = clustering
    return list(zip(*max_clustering))[1] if max_clustering else list(zip(*clusterings[0]))[1]
Example 7
def bench_k_means(estimator, name, data):
    estimator.fit(data)
    # A short explanation for every score:
    # homogeneity: each cluster contains only members of a single class (range 0 - 1)
    # completeness: all members of a given class are assigned to the same cluster (range 0 - 1)
    # v_measure: harmonic mean of homogeneity and completeness
    # adjusted_rand: similarity of the actual values and their predictions,
    #   ignoring permutations and with chance normalization
    #   (range -1 to 1, -1 being bad, 1 being perfect and 0 being random)
    # adjusted_mutual_info: agreement of the actual values and predictions, ignoring permutations
    #   (range 0 - 1, with 0 being random agreement and 1 being perfect agreement)
    # silhouette: uses the mean distance between a sample and all other points in the same class,
    #   as well as the mean distance between a sample and all other points in the nearest cluster,
    #   to calculate a score (range -1 to 1, with -1 indicating incorrect clustering,
    #   1 indicating highly dense clustering, and 0 indicating overlapping clusters)
    print('%-9s \t%i \thomogeneity: %.3f \tcompleteness: %.3f \tv-measure: %.3f \tadjusted-rand: %.3f \t'
          'adjusted-mutual-info: %.3f \tsilhouette: %.3f'
          % (name, estimator.inertia_,
             metrics.homogeneity_score(y, estimator.labels_),
             metrics.completeness_score(y, estimator.labels_),
             metrics.v_measure_score(y, estimator.labels_),
             metrics.adjusted_rand_score(y, estimator.labels_),
             metrics.adjusted_mutual_info_score(y, estimator.labels_),
             metrics.silhouette_score(data, estimator.labels_,
                                      metric='euclidean')))
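The comment block above describes the silhouette coefficient in words. As a brief hedged aside (not part of the collected examples): for a sample i, with a(i) the mean distance to the other points in its own cluster and b(i) the mean distance to the points of the nearest other cluster, the per-sample value is s(i) = (b(i) - a(i)) / max(a(i), b(i)); sklearn.metrics.silhouette_score is the mean of sklearn.metrics.silhouette_samples over all samples when sample_size is None. A minimal check on synthetic data:

import numpy as np
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs
from sklearn.metrics import silhouette_samples, silhouette_score

X, _ = make_blobs(n_samples=300, centers=3, random_state=0)
labels = KMeans(n_clusters=3, n_init=10, random_state=0).fit_predict(X)

# the aggregate score equals the mean of the per-sample silhouette values
per_sample = silhouette_samples(X, labels)
assert np.isclose(silhouette_score(X, labels), per_sample.mean())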
Example 8
def printClustersSummary(data, labels, centroids):
    '''
    Helper method to automate model assessment
    '''
    print('Pseudo_F: ', pseudo_F(data, labels, centroids))
    print('Davies-Bouldin: ',
          davis_bouldin(data, labels, centroids))
    print('Silhouette score: ',
          mt.silhouette_score(data, np.array(labels),
                              metric='euclidean'))
Example 9
def find_best_n_clusters(data, clusterer, max_n_clusters, random_state, **kwargs):
    """
    Finds the best number of clusters for KMeans and Gaussian Mixture.

    Parameters
    ----------
    data: pd.DataFrame
        Dataframe with features for clustering with index as in ``retention_config.index_col``
    clusterer: sklearn clusterer class
        For instance, ``sklearn.cluster.KMeans`` or ``sklearn.mixture.GaussianMixture``.
    max_n_clusters: int
        Maximal number of clusters for searching.
    random_state: int
        Random state for clusterer.

    Returns
    -------
    Optimal keyword arguments for the clustering method.

    Return type
    -----------
    Dict
    """
    args = {i: j for i, j in kwargs.items() if i in clusterer.get_params(clusterer)}
    if 'n_clusters' in clusterer.get_params(clusterer):
        kms = True
    else:
        kms = False
    args.pop('n_clusters' if kms else 'n_components', None)
    args.update({'random_state': random_state})
    score = {}
    for i in range(2, max_n_clusters + 1):
        args.update({'n_clusters' if kms else 'n_components': i})
        km = clusterer(**args)
        score[i] = silhouette_score(data, km.fit_predict(data), metric='cosine')
    best = pd.Series(score).idxmax()
    args.update({'n_clusters' if kms else 'n_components': best})
    print(f'Best number of clusters is {best}')
    return args
Example 10
def calc_all_metrics(data, km):
    """
    Calculates all quality metrics: Cluster Stability Index, Silhouette score, Homogeneity, distances for clustering.

    Parameters
    ----------
    data: pd.DataFrame
        Dataframe with features for clustering indexed as in ``retention_config.index_col``
    km:
        Already fitted clusterer.

    Returns
    -------
    Metrics scores

    Return type
    -----------
    Dict
    """
    res = {}
    cl = km.labels_
    res['mean_pd'] = calc_mean_pd(data, cl)
    if hasattr(km, 'cluster_centers_'):
        res['mean_fc'] = calc_mean_dist_from_center(data, km)
    if len(set(cl)) > 1:
        res['silhouette'] = silhouette_score(data, cl, metric='cosine')
    return res
Example 11
def test():
    parser = argparse.ArgumentParser()
    parser.add_argument("File")
    args = parser.parse_args()

    info = fh.get_function_information(args.File)
    #info = fh.get_arg_funcs(args.File)
    info = trim_funcs(info, args.File)

    vect, func_sparse = funcs_to_sparse(info)
    transformer = Normalizer().fit(func_sparse)
    func_sparse = transformer.transform(func_sparse)

    #svd = TruncatedSVD(random_state=2)
    svd = TruncatedSVD(n_components=5, n_iter=7, random_state=42)
    func_sparse = svd.fit_transform(func_sparse)

    scores = []
    clust_count = []
    for x in range(2, 20):
        result = KMeans(n_clusters=x, random_state=2).fit(func_sparse)
        score = silhouette_score(func_sparse, result.labels_, metric="cosine")
        scores.append(score)
        clust_count.append(x)
        print("Clusters {:<3} | Silhouette Score : {}".format(x, score))

    plt.plot(clust_count, scores)
    plt.xlabel("Cluster Centroid Count")
    plt.ylabel("Silhouette Score")
    plt.grid(True)
    plt.show()
Example 12
def single_cluster(all_functions, centroid_count=2):
    vect, func_sparse = funcs_to_sparse(all_functions)
    transformer = Normalizer().fit(func_sparse)
    func_sparse = transformer.transform(func_sparse)
    # svd = TruncatedSVD(random_state=2)
    # svd = TruncatedSVD(n_components=5, n_iter=7, random_state=42)
    # func_sparse = svd.fit_transform(func_sparse)
    labels = []
    result = KMeans(n_clusters=centroid_count, random_state=2).fit(func_sparse)
    score = silhouette_score(func_sparse,
                             result.labels_,
                             metric="cosine",
                             random_state=2,
                             sample_size=5000)
    labels.append(result.labels_)
    print("Clusters {:<3} | Silhouette Score : {}".format(
        centroid_count, score))
    return result.labels_
Example 13
def get_single_cluster(all_functions, centroid_count=2):
    return_dict = {}
    vect, func_sparse = funcs_to_sparse(all_functions)
    transformer = Normalizer().fit(func_sparse)
    func_sparse = transformer.transform(func_sparse)
    # svd = TruncatedSVD(random_state=2)
    # svd = TruncatedSVD(n_components=5, n_iter=7, random_state=42)
    # func_sparse = svd.fit_transform(func_sparse)
    labels = []
    result = KMeans(n_clusters=centroid_count, random_state=2).fit(func_sparse)
    score = silhouette_score(func_sparse,
                             result.labels_,
                             metric="cosine",
                             random_state=2,
                             sample_size=5000)
    labels.append(result.labels_)
    #print("Clusters {:<3} | Silhouette Score : {}".format(centroid_count, score))
    return_dict['count'] = centroid_count
    return_dict['score'] = score
    return_dict['labels'] = result.labels_
    return return_dict
Example 14
def n_cluster_embeddings(self, features=None, n_clusters=3, method='ac'):
    '''
    clusters the nodes based on embedding features
    features = None (use DGI generated embeddings)
    '''
    if method == 'ac':
        clustering = AgglomerativeClustering(n_clusters=n_clusters, affinity='euclidean',
                                             linkage='ward')
        clustering.fit(self.embeddings if features is None else features)
        self.labels = clustering.labels_
        self.score = silhouette_score(self.embeddings if features is None else features,
                                      self.labels)
    return {'labels': self.labels, 'score': self.score}
Example 15
def evaluate(k):
    km = kmeans[k]
    score = silhouette_score(train_offsets, km.labels_, metric='euclidean', random_state=RANDOM_SEED)
    print('Silhouette score for k=%d is %f.' % (k, score))
    return (k, score)
Example 16
def sk_kmeans(core):  #, kval=3
    solrURL = "http://localhost:8983/solr/" + core
    solrInstance = Solr(solrURL)
    list_of_points = []
    docs = solrInstance.query_iterator(query="*:*", start=0)

    for doc in docs:
        list_of_points.append(Vector(doc['id'], doc))

    list_of_Dicts = (point.features for point in list_of_points)
    df = pd.DataFrame(list_of_Dicts)
    df = df.fillna(0)

    silhouettes = {}
    for k in range(2, 10):
        kmeans = KMeans(n_clusters=k,
                        init='k-means++',
                        max_iter=300,  # k-means convergence
                        n_init=10,     # find global minima
                        n_jobs=-2,     # parallelize
                        )
        labels = kmeans.fit_predict(df)
        silhouettes[k] = silhouette_score(df, labels)

    return str(silhouettes)
Example 17
def evaluate_performance(data, labels, metric='euclidean'):
    score = skmetrics.silhouette_score(data, labels, metric=metric)
    print('Labels:', labels)
    print('Score:', score)
    return score
Example 18
def labeled_val_fun(u_feats, l_feats, l_targets, k):
    if device == 'cuda':
        torch.cuda.empty_cache()
    l_num = len(l_targets)
    kmeans = K_Means(k, pairwise_batch_size=256)
    kmeans.fit_mix(torch.from_numpy(u_feats).to(device), torch.from_numpy(l_feats).to(device), torch.from_numpy(l_targets).to(device))
    cat_pred = kmeans.labels_.cpu().numpy()
    u_pred = cat_pred[l_num:]
    silh_score = silhouette_score(u_feats, u_pred)
    return silh_score, cat_pred
Example 19
def labeled_val_fun(u_feats, l_feats, l_targets, k):
    if device == 'cuda':
        torch.cuda.empty_cache()
    l_num = len(l_targets)
    kmeans = K_Means(k, pairwise_batch_size=256)
    kmeans.fit_mix(torch.from_numpy(u_feats).to(device), torch.from_numpy(l_feats).to(device), torch.from_numpy(l_targets).to(device))
    cat_pred = kmeans.labels_.cpu().numpy()
    u_pred = cat_pred[l_num:]
    silh_score = silhouette_score(u_feats, u_pred)
    return silh_score, cat_pred
Example 20
def labeled_val_fun(u_feats, l_feats, l_targets, k):
    if device == 'cuda':
        torch.cuda.empty_cache()
    l_num = len(l_targets)
    kmeans = K_Means(k, pairwise_batch_size=200)
    kmeans.fit_mix(torch.from_numpy(u_feats).to(device), torch.from_numpy(l_feats).to(device), torch.from_numpy(l_targets).to(device))
    cat_pred = kmeans.labels_.cpu().numpy()
    u_pred = cat_pred[l_num:]
    silh_score = silhouette_score(u_feats, u_pred)
    del kmeans
    return silh_score, cat_pred
Example 21
def test_gmm():
    sil = pyclust.validate.Silhouette()
    sil_score = sil.score(X, ypred, sample_size=None)
    print(sil_score[0])
    print(sil.sample_scores[:10])
    print(silhouette_score(X, ypred, sample_size=None))
    print(silhouette_samples(X, ypred)[:10])
Example 22
def get_clusters(self, features_2d):
    """ Mapping instances to clusters, using silhouette scores to determine the
    number of clusters.

    Returns
    -------
    paths: List[str]
        paths to plots
    """
    # get silhouette scores for k_means with 2 to 12 clusters
    # use number of clusters with highest silhouette score
    best_score, best_n_clusters = -1, -1
    min_clusters, max_clusters = 2, min(features_2d.shape[0], 12)
    clusters = None
    for n_clusters in range(min_clusters, max_clusters):
        km = KMeans(n_clusters=n_clusters)
        y_pred = km.fit_predict(features_2d)
        score = silhouette_score(features_2d, y_pred)
        if score > best_score:
            best_n_clusters = n_clusters
            best_score = score
            clusters = y_pred
    self.logger.debug("%d clusters detected using silhouette scores",
                      best_n_clusters)

    cluster_dict = {n: [] for n in range(best_n_clusters)}
    for i, c in enumerate(clusters):
        cluster_dict[c].append(self.insts[i])
    self.logger.debug("Distribution over clusters: %s",
                      str({k: len(v) for k, v in cluster_dict.items()}))
    return clusters, cluster_dict
Example 23
def bench_k_means(estimator, name, data):
    t0 = time()
    estimator.fit(data)
    print('% 9s %.2fs %i %.3f %.3f %.3f %.3f %.3f %.3f'
          % (name, (time() - t0), estimator.inertia_,
             metrics.homogeneity_score(labels, estimator.labels_),
             metrics.completeness_score(labels, estimator.labels_),
             metrics.v_measure_score(labels, estimator.labels_),
             metrics.adjusted_rand_score(labels, estimator.labels_),
             metrics.adjusted_mutual_info_score(labels, estimator.labels_),
             metrics.silhouette_score(data, estimator.labels_,
                                      metric='euclidean',
                                      sample_size=sample_size)))
Example 24
def test_silhouette_score(self):
    result = self.df.metrics.silhouette_score()
    expected = metrics.silhouette_score(self.data, self.pred)
    self.assertAlmostEqual(result, expected)
Example 25
def test_KMeans_scores(self):
    digits = datasets.load_digits()
    df = pdml.ModelFrame(digits)

    scaled = pp.scale(digits.data)
    df.data = df.data.pp.scale()
    self.assert_numpy_array_almost_equal(df.data.values, scaled)

    clf1 = cluster.KMeans(init='k-means++', n_clusters=10,
                          n_init=10, random_state=self.random_state)
    clf2 = df.cluster.KMeans(init='k-means++', n_clusters=10,
                             n_init=10, random_state=self.random_state)
    clf1.fit(scaled)
    df.fit_predict(clf2)

    expected = m.homogeneity_score(digits.target, clf1.labels_)
    self.assertEqual(df.metrics.homogeneity_score(), expected)

    expected = m.completeness_score(digits.target, clf1.labels_)
    self.assertEqual(df.metrics.completeness_score(), expected)

    expected = m.v_measure_score(digits.target, clf1.labels_)
    self.assertEqual(df.metrics.v_measure_score(), expected)

    expected = m.adjusted_rand_score(digits.target, clf1.labels_)
    self.assertEqual(df.metrics.adjusted_rand_score(), expected)

    expected = m.homogeneity_score(digits.target, clf1.labels_)
    self.assertEqual(df.metrics.homogeneity_score(), expected)

    expected = m.silhouette_score(scaled, clf1.labels_, metric='euclidean',
                                  sample_size=300, random_state=self.random_state)
    result = df.metrics.silhouette_score(metric='euclidean', sample_size=300,
                                         random_state=self.random_state)
    self.assertAlmostEqual(result, expected)
Example 26
def run_silhouette_cv_estimator(estimator, x, n_folds=10):
    """
    CV-style validation intended only for k-means-like estimators: the clustering result in
    labels_ is measured with silhouette_score. The "cross validation" here simply draws random
    subsets of x via np.random.choice and measures the silhouette_score of each subset's
    clustering; no train/test split is involved.
    :param estimator: KMeans, or anything exposing estimator.labels_; only filtered by
                      if not isinstance(estimator, ClusterMixin)
    :param x: feature matrix x
    :param n_folds: int, passed through as the KFold-style split count, default 10
    :return: eg: array([ 0.693 , 0.652 , 0.6845, 0.6696, 0.6732, 0.6874, 0.668 ,
                         0.6743, 0.6748, 0.671 ])
    """
    if not isinstance(estimator, ClusterMixin):
        print('estimator must be ClusterMixin')
        return

    silhouette_list = list()
    # eg: n_folds = 10, len(x) = 150 -> 150 * 0.9 = 135
    choice_cnt = int(len(x) * ((n_folds - 1) / n_folds))
    choice_source = np.arange(0, x.shape[0])

    # clone the estimator so every fit runs on a fresh copy
    estimator = clone(estimator)
    for _ in np.arange(0, n_folds):
        # simply subsample x at random via np.random.choice
        choice_index = np.random.choice(choice_source, choice_cnt)
        x_choice = x[choice_index]
        estimator.fit(x_choice)
        # measure the clustering with silhouette_score
        silhouette_score = metrics.silhouette_score(x_choice, estimator.labels_, metric='euclidean')
        silhouette_list.append(silhouette_score)
    return silhouette_list
Example 27
def fit(self, X, y=None, sample_weight=None):
    silhouette_avgs = []
    for n_clusters in range(1, self.max_n_clusters):
        self.clusterers[n_clusters-1].fit(X, y, sample_weight)
        if n_clusters == 1:
            silhouette_avgs.append(-1.1)  # TODO
        else:
            silhouette_avgs.append(silhouette_score(X, self.clusterers[n_clusters-1].labels_))
    self.best_n_clusters = silhouette_avgs.index(max(silhouette_avgs)) + 1
    self.labels_ = self.clusterers[self.best_n_clusters-1].labels_
    self.cluster_centers_ = self.clusterers[self.best_n_clusters-1].cluster_centers_
Example 28
def bench_k_means(estimator, name, data):
    t0 = time()
    estimator.fit(data)
    print('% 9s %.2fs %i %.3f %.3f %.3f %.3f %.3f %.3f'
          % (name, (time() - t0), estimator.inertia_,
             metrics.homogeneity_score(labels, estimator.labels_),
             metrics.completeness_score(labels, estimator.labels_),
             metrics.v_measure_score(labels, estimator.labels_),
             metrics.adjusted_rand_score(labels, estimator.labels_),
             metrics.adjusted_mutual_info_score(labels, estimator.labels_),
             metrics.silhouette_score(data, estimator.labels_,
                                      metric='euclidean',
                                      sample_size=sample_size)))
Example 29
def kmeans(phate_op, n_clusters='auto', max_clusters=10, random_state=None, k=None, **kwargs):
    """KMeans on the PHATE potential

    Clustering on the PHATE operator as introduced in Moon et al.
    This is similar to spectral clustering.

    Parameters
    ----------
    phate_op : phate.PHATE
        Fitted PHATE operator
    n_clusters : int, optional (default: 'auto')
        Number of clusters.
        If 'auto', uses the Silhouette score to determine the optimal number of clusters
    max_clusters : int, optional (default: 10)
        Maximum number of clusters to test if using the Silhouette score.
    random_state : int or None, optional (default: None)
        Random seed for k-means
    k : deprecated for `n_clusters`
    kwargs : additional arguments for `sklearn.cluster.KMeans`

    Returns
    -------
    clusters : np.ndarray
        Integer array of cluster assignments
    """
    if k is not None:
        warnings.warn(
            "k is deprecated. Please use n_clusters in future.", FutureWarning
        )
        n_clusters = k
    if not isinstance(phate_op, PHATE):
        raise TypeError("Expected phate_op to be of type PHATE. Got {}".format(phate_op))
    if phate_op.graph is not None:
        if n_clusters == 'auto':
            n_clusters = np.arange(2, max_clusters)
            silhouette_scores = [silhouette_score(phate_op, k, random_state=random_state, **kwargs)
                                 for k in n_clusters]
            n_clusters = n_clusters[np.argmax(silhouette_scores)]
        return cluster.KMeans(n_clusters, random_state=random_state, **kwargs).fit_predict(
            phate_op.diff_potential
        )
    else:
        raise exceptions.NotFittedError(
            "This PHATE instance is not fitted yet. Call "
            "'fit' with appropriate arguments before "
            "using this method."
        )
Example 30
def calculate_cluster_scores(x, cluster_labels, output):
    with open("%s_scores.log" % output, "w+") as fh:
        # Filter out singleton "cluster" (labeled as -1)
        filtered_x, filtered_cluster_labels, singletons = ([] for _ in range(3))
        cluster_groups = defaultdict(list)
        for vec, lab in zip(x, cluster_labels):
            if not lab == -1:
                filtered_x.append(vec)
                filtered_cluster_labels.append(lab)
                cluster_groups[lab].append(vec)
            else:
                singletons.append(vec)

        ln = "Number of clustered events: %d/%d (%f%%)\n" % (len(filtered_x), len(filtered_x) + len(singletons),
                                                             (len(filtered_x) / (len(filtered_x) + len(singletons))) * 100)
        print(ln.strip("\n"))
        fh.write(ln)

        for group in cluster_groups:
            n_events = len(cluster_groups[group])
            ln = "Cluster %d contains %d events\n" % (group, n_events)
            print(ln.strip("\n"))
            fh.write(ln)

        rmsstd_scores = []
        for group in cluster_groups:
            rmsstd = calculate_rmsstd(np.array(cluster_groups[group]))
            ln = "The RMSSTD score for cluster %d is %f\n" % (group, rmsstd)
            print(ln.strip("\n"))
            fh.write(ln)
            rmsstd_scores.append(rmsstd)

        try:
            silhouette_avg = silhouette_score(np.array(filtered_x), np.array(filtered_cluster_labels))
            ln = "The average silhouette score is : %f\n" % silhouette_avg
            print(ln.strip("\n"))
            fh.write(ln)
        except:
            silhouette_avg = float("nan")
            ln = "Impossible to calculate silhouette score. Only 1 cluster group identified.\n"
            print(ln.strip("\n"))
            fh.write(ln)
    return silhouette_avg, rmsstd_scores