?? concurrentkmeans.java
字號:
package kmeans;
import java.util.*;
import java.util.concurrent.*;
/**
* The version of K-means clustering adapted for true concurrency
* or simultaneous multithreading (SMT). The subtasks of
* computing distances and making assignments are delegated to
* a subtask manager which oversees a thread pool.
*/
public class ConcurrentKMeans implements KMeans {
// Working clusters used while the algorithm runs; at the end of run() they
// are converted (via generateFinalClusters()) into an array of the simpler
// class Cluster.
private ProtoCluster[] mProtoClusters;
// Cache of coordinate-to-cluster distances. Number of entries =
// number of clusters x number of coordinates.
// NOTE(review): presumably filled by computeDistances(); index order is not
// visible in this section -- confirm before relying on it.
private double[][] mDistanceCache;
// Current cluster of each coordinate: mClusterAssignments[n] is the cluster
// index of coordinate n, where the N coordinates are numbered 0 ... (N-1).
// initCenters() fills it with -1 ("not assigned yet") when first allocated.
// makeAssignments() uses it to count how many coordinates moved during an
// iteration.
private int[] mClusterAssignments;
// The coordinates to be clustered, one row per coordinate. This is the array
// passed to the constructor; it is not copied.
private double[][] mCoordinates;
// The desired number of clusters (capped at the number of coordinates by the
// constructor) and the maximum number of main-loop iterations in run().
private int mK, mMaxIterations;
// Seed for the Random that initCenters() uses to select the coordinates
// that become the initial cluster centers.
private long mRandomSeed;
// Number of threads given to the SubtaskManager; run() reports values > 1 as
// concurrent processing mode.
private int mThreadCount;
// Created at the start of run(); handles the thread pool to which the
// time-consuming subtasks are delegated.
private SubtaskManager mSubtaskManager;
// An array of Cluster objects: the output of k-means, assigned near the end
// of a successful run() and returned by getClusters().
private Cluster[] mClusters;
// Listeners to be notified of progress, completion and errors. The add/remove
// methods and the notification loops synchronize on this list itself.
private List<KMeansListener> mListeners = new ArrayList<KMeansListener>(1);
/**
* Constructor
*
* @param coordinates two-dimensional array containing the coordinates to be clustered.
* @param k the number of desired clusters.
* @param maxIterations the maximum number of clustering iterations.
* @param randomSeed seed used with the random number generator.
* @param threadCount the number of threads to be used for computing time-consuming steps.
*/
public ConcurrentKMeans(double[][] coordinates, int k, int maxIterations,
long randomSeed, int threadCount) {
mCoordinates = coordinates;
// Can't have more clusters than coordinates.
mK = Math.min(k, mCoordinates.length);
mMaxIterations = maxIterations;
mRandomSeed = randomSeed;
mThreadCount = threadCount;
}
/**
 * Creates a clusterer that uses one worker thread per processor reported by
 * {@code Runtime.getRuntime().availableProcessors()} for the time-consuming
 * steps.
 *
 * @param coordinates two-dimensional array holding the coordinates to cluster.
 * @param k the requested number of clusters.
 * @param maxIterations upper bound on the number of clustering iterations.
 * @param randomSeed seed for the random choice of initial centers.
 */
public ConcurrentKMeans(double[][] coordinates, int k, int maxIterations,
        long randomSeed) {
    this(coordinates,
         k,
         maxIterations,
         randomSeed,
         Runtime.getRuntime().availableProcessors());
}
/**
 * Registers a listener for progress, completion and error notifications.
 * A listener that is already registered (according to {@code equals}) is
 * not added a second time.
 *
 * @param l the listener to register.
 */
public void addKMeansListener(KMeansListener l) {
    synchronized (mListeners) {
        if (mListeners.contains(l)) {
            // Already registered: keep a single entry per listener.
            return;
        }
        mListeners.add(l);
    }
}
/**
 * Unregisters a listener. Does nothing if the listener is not registered.
 *
 * @param l the listener to unregister.
 */
public void removeKMeansListener(KMeansListener l) {
    final List<KMeansListener> listeners = mListeners;
    synchronized (listeners) {
        listeners.remove(l);
    }
}
/**
 * Posts a progress message to every registered KMeansListener.
 *
 * <p>The listener list is copied while its lock is held, and the callbacks
 * are invoked after the lock is released. This lets a listener register or
 * unregister listeners (including itself) from inside its callback without
 * corrupting the iteration. It also keeps a slow listener from blocking
 * registration calls made on other threads. A listener removed during this
 * pass still receives the current message.
 *
 * @param message the text passed to each listener's kmeansMessage method.
 */
private void postKMeansMessage(String message) {
    KMeansListener[] listeners;
    synchronized (mListeners) {
        if (mListeners.isEmpty()) {
            return;
        }
        listeners = mListeners.toArray(new KMeansListener[mListeners.size()]);
    }
    for (KMeansListener listener : listeners) {
        listener.kmeansMessage(message);
    }
}
/**
 * Notifies every registered listener that k-means has completed.
 *
 * <p>The listener list is snapshotted under its lock and the callbacks run
 * outside it. A listener that unregisters itself in kmeansComplete therefore
 * cannot shrink the list mid-loop. Previously that caused an
 * IndexOutOfBoundsException, which run() then reported as a clustering error.
 *
 * @param clusters the output of clustering, passed to each listener.
 * @param executionTime the number of milliseconds taken to cluster.
 */
private void postKMeansComplete(Cluster[] clusters, long executionTime) {
    KMeansListener[] listeners;
    synchronized (mListeners) {
        if (mListeners.isEmpty()) {
            return;
        }
        listeners = mListeners.toArray(new KMeansListener[mListeners.size()]);
    }
    for (KMeansListener listener : listeners) {
        listener.kmeansComplete(clusters, executionTime);
    }
}
/**
 * Notifies registered listeners that k-means has failed because of a
 * Throwable caught in the run method.
 *
 * <p>As with the other notification methods, the listener list is copied
 * under its lock and the callbacks run outside it. Listeners may therefore
 * (un)register listeners from inside kmeansError safely.
 *
 * <p>This method is called from run()'s catch block. Anything a listener
 * throws here propagates out of run() (cleanup() still runs in its finally
 * block).
 *
 * @param err the Throwable that aborted clustering.
 */
private void postKMeansError(Throwable err) {
    KMeansListener[] listeners;
    synchronized (mListeners) {
        if (mListeners.isEmpty()) {
            return;
        }
        listeners = mListeners.toArray(new KMeansListener[mListeners.size()]);
    }
    for (KMeansListener listener : listeners) {
        listener.kmeansError(err);
    }
}
/**
 * Returns the clusters produced by run().
 *
 * <p>The array returned is this object's own array, not a copy. Call this
 * only after clustering has completed successfully. The array is assigned
 * near the end of run() and is presumably {@code null} before that
 * (NOTE(review): confirm that cleanup() leaves it untouched).
 *
 * @return the final Cluster objects.
 */
public Cluster[] getClusters() {
    final Cluster[] result = this.mClusters;
    return result;
}
/**
 * Executes the clustering.
 *
 * <p>The steps are:
 * <ol>
 * <li>Select the initial centers.</li>
 * <li>Repeatedly recompute centers, refresh distances and reassign
 * coordinates. Stop when an iteration moves no coordinate or when
 * mMaxIterations iterations have run.</li>
 * <li>Convert the protoclusters into the Cluster array returned by
 * getClusters().</li>
 * </ol>
 *
 * <p>Progress is reported through postKMeansMessage(). Success is reported
 * through postKMeansComplete() together with the elapsed milliseconds. Any
 * Throwable is reported through postKMeansError(). cleanup() runs in every
 * case.
 */
public void run() {
    try {
        final long started = System.currentTimeMillis();
        postKMeansMessage("K-Means clustering started");

        // Pick random coordinates as the initial centers (builds mProtoClusters).
        initCenters();
        postKMeansMessage("... centers initialized");

        mSubtaskManager = new SubtaskManager(mThreadCount);
        postKMeansMessage(mThreadCount > 1
                ? "... concurrent processing mode with " + mThreadCount + " subtask threads"
                : "... non-concurrent processing mode");

        // Seed the main loop with an initial set of distances and assignments.
        computeDistances();
        makeAssignments();

        // Main loop. The body always runs at least once, even when
        // mMaxIterations <= 0. It stops as soon as an iteration makes no
        // moves or the iteration budget is exhausted.
        int iteration = 0;
        while (true) {
            computeCenters();
            computeDistances();
            int moveCount = makeAssignments();
            iteration++;
            postKMeansMessage("... iteration " + iteration + " moves = " + moveCount);
            if (moveCount <= 0 || iteration >= mMaxIterations) {
                break;
            }
        }

        mClusters = generateFinalClusters();
        postKMeansComplete(mClusters, System.currentTimeMillis() - started);
    } catch (Throwable t) {
        postKMeansError(t);
    } finally {
        // Release the temporary structures whatever the outcome.
        cleanup();
    }
}
/**
 * Randomly selects mK distinct coordinates to be the initial cluster centers.
 *
 * <p>This builds mProtoClusters and records each chosen coordinate's cluster
 * in mClusterAssignments. Selection uses a Fisher-Yates shuffle seeded with
 * mRandomSeed, so the same seed always yields the same centers.
 */
private void initCenters() {
    Random random = new Random(mRandomSeed);
    int coordCount = mCoordinates.length;
    // mClusterAssignments[n] holds the cluster index of coordinate n; the
    // method makeAssignments() uses it to count the number of moves. It is
    // allocated only when still null, with -1 meaning "not assigned yet".
    if (mClusterAssignments == null) {
        mClusterAssignments = new int[coordCount];
        Arrays.fill(mClusterAssignments, -1);
    }
    int[] indices = new int[coordCount];
    for (int i = 0; i < coordCount; i++) {
        indices[i] = i;
    }
    // Partial Fisher-Yates shuffle. Position i is final once iteration i is
    // done, because later iterations only swap positions > i. Only the first
    // mK positions are used, so the shuffle stops there. It makes the same
    // random draws for those positions as a full shuffle would, and therefore
    // selects exactly the same centers. mK <= coordCount (see the
    // constructor), so the bound passed to nextInt is always >= 1.
    for (int i = 0; i < mK; i++) {
        int j = i + random.nextInt(coordCount - i);
        int tmp = indices[i];
        indices[i] = indices[j];
        indices[j] = tmp;
    }
    mProtoClusters = new ProtoCluster[mK];
    for (int i = 0; i < mK; i++) {
        int coordIndex = indices[i];
        mProtoClusters[i] = new ProtoCluster(mCoordinates[coordIndex], coordIndex);
        mClusterAssignments[coordIndex] = i;
    }
}
/**
 * Recomputes the centers of the protoclusters that are still in contention
 * and whose update flag gets set.
 *
 * <p>A protocluster that is still considered for assignment but has no
 * members is taken out of contention instead. This is why k-means may
 * return fewer than k clusters.
 */
private void computeCenters() {
    for (ProtoCluster cluster : mProtoClusters) {
        if (!cluster.getConsiderForAssignment()) {
            // Already out of contention: nothing to recompute.
            continue;
        }
        if (cluster.isEmpty()) {
            // Lost all of its members: drop it from contention.
            cluster.setConsiderForAssignment(false);
            continue;
        }
        // According to the original notes, setUpdateFlag() raises the flag only
        // if membership changed in the last makeAssignments() call.
        cluster.setUpdateFlag();
        if (cluster.needsUpdate()) {
            cluster.updateCenter(mCoordinates);
        }
    }
}
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -