?? mpi.cpp
字號:
//=================================================================
// Name : 基于分段的平行排序
// Author : 袁冬(2107020046)
// Copyright : 中國海洋大學
// LastUpdate : 2007.10.03
// Develop Evrionment : Windows Server 2003 SP2
// + Eclipse 3.3.1 + CDT 4.0.1
// + MinGW 3.81 + GDB 6.6
// + mpich2-1.0.6-win32-ia32
//=================================================================
/*
* 算法描述: 分段 -> 段內排序 -> 歸約結果。
* 1,根進程讀取輸入,將元素個數廣播給各個進程。
* 2,然后各進程計算段長度和段偏移。
* 3,然后根進程選擇第一個段,標記站位符。
* 4,跟進程將剩余元素發送給下一進程,下一進程選擇段的同時,根進程排序。
* 5,下一進程繼續此過程,直到最后一個進程,所有元素都進行排序。
* 6,進程將排序好的元素,按照段偏移歸約給根進程。
* 7,根進程輸入結果。
*
* 時間復雜度計算:
* 設共有進程p,共有元素n
* 則段數為p,每段長度為m = n / p
* 最長時間為從根進程分段開始,至末進程規約為止,又注意到,分段時串行進行的,故
*
* t = 分段時間 * 段數 + 最后一段的排序時間
*
* 用大歐米伽表示法表示
*
* O(分段時間) = m * n = n * n / p
* O(最后一段的排序時間) = m * m
*
* 所以
*
* O(t) = n * n / p * p + m * m
* = n * n + m * m;
* = n * n
*
* 所以,此算法排序時間復雜度為n的平方,和起泡排序的時間復雜度相同。
*
* 分析與優化:
* 從時間復雜度的分析可以知道,算法的瓶頸在于分段算法,因為該算法從本質上講,是串行進行的。
* 因個人水平有限,分段沒有找到并行算法,導致整個算法為偽并行。
* 但是有一個辦法可以將分算法的時間減半,
* 即從最大和最小兩邊開始,分別進行分組,可以使時間復雜度減低一半,但總體時間復雜度的O認為n*n。
*
* 另外,在“取得當前段”的算法中,如果每次循環i時,段索引沒有改變,再下一輪時,可以不再遍歷。
* 相當于加入緩存,估計此緩存命中幾率比較高,可以較大幅度的改善時間復雜度。
*
*/
//#define DEBUG 1 //調試符號
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <time.h>
#include "mpi.h"
using namespace std;
#define TRUE 1
#define FALSE 0
#define Bool int
#define MAX_LENGTH 5000 //允許的最大元素個數
#define MAX_LENGTH_WITH_MARK 10000 //允許的最大元素數及其對應的標記的長度,此值應為MAX_LENGTH的兩倍
int pID, pSize; //pID:當前進程ID,pSize:總的進程數
int input[MAX_LENGTH][2], length, packageSize; //input:輸入,length:輸入的元素個數,packageSize:段大小
int result[MAX_LENGTH], finalResult[MAX_LENGTH], packageIndex[MAX_LENGTH]; //result:當前結果,finalResult:最終結果,packageIndex:段值索引
int resultOffset; //resultOffset:短偏移
//讀取輸入
void read() {
int i;
//讀取元素個數
printf("Input the length of the array(100-5000):");
fflush(stdout);
scanf("%d", &length);
//讀取元素
printf("Input the element of the array total %d:", length);
fflush(stdout);
//randomize();
for (i = 0; i < length; i++) {
//讀取元素,經對應站位標記標記為0
input[i][0] = rand();
input[i][1] = 0;
}
}
//輸出
void output() {
int i;
printf("\n");
for (i = 0; i < length; i++)
printf("%d\t", finalResult[i]);
fflush(stdout);
}
//準備:計算進程常量
int prepare() {
packageSize = length / pSize; //段大小
cout << endl << length << endl << pSize << endl;
resultOffset = packageSize * pID; //結果起始偏移值,表示該段位于整體的哪個部分
//對于最后一個進程,需要修正段大小,包含所有余下的元素
if (pID == pSize - 1)
packageSize += length % pSize;
#ifdef DEBUG
//調試信息:段大小
printf("resultOffset: %d, From %d.\n", resultOffset, pID);
#endif
return 0;
}
//分段,取得當前進程負責的段
void findCurrentRange() {
int i, j = 0, maxIndex, beginIndex = 0;
//填充默認的值
for (i = 0; i < packageSize; i++) {
while (input[beginIndex][1])
beginIndex++;
packageIndex[i] = beginIndex;
beginIndex++;
}
#ifdef DEBUG
//調試信息:默認值
for (i = 0; i < packageSize; i++)
printf("%d", packageIndex[i]);
printf(" From %d\n", pID);
#endif
//查找所有元素,找到最小的packageSize個元素,取得其索引值
for (i = beginIndex; i < length; i++) {
//忽略被其他進程占用的元素
if (input[i][1])
continue;
//查找比當前元素索引中最大的元素的索引
maxIndex = 0;
for (j = 1; j < packageSize; j++)
if (input[packageIndex[j]][0] > input[packageIndex[maxIndex]][0])
maxIndex = j;
//如果元素索引中的最大的小于當前元素,則替換
if (input[packageIndex[maxIndex]][0] > input[i][0])
packageIndex[maxIndex] = i;
}
#ifdef DEBUG
//調試信息:當前段索引,用于判斷是否取得了正確的段索引
for (i = 0; i < packageSize; i++)
printf("%d", packageIndex[i]);
printf(" From %d\n", pID);
#endif
//將索引轉化為值,存放在result中,并標記輸入信息,表明已占用
for (j = 0; j < packageSize; j++) {
result[resultOffset + j] = input[packageIndex[j]][0];
input[packageIndex[j]][1] = 1;
}
#ifdef DEBUG
//調試信息:排序前的當前段,用于判斷是否取得了正確的段
for (i = 0; i < length; i++)
printf("%d", result[i]);
printf(" From %d\n", pID);
#endif
}
//排序一個段
void sort() {
//段內起泡
int i, j, temp;
for (i = 0; i < packageSize; i++)
for (j = 0; j < packageSize; j++) {
if (result[resultOffset + i] < result[resultOffset + j]) {
temp = result[resultOffset + i];
result[resultOffset + i] = result[resultOffset + j];
result[resultOffset + j] = temp;
}
}
#ifdef DEBUG
//調試信息:排序后的當前段
for (i = 0; i < length; i++)
printf("%d", result[i]);
printf(" From %d\n", pID);
#endif
}
//主處理進程
void process() {
//取得該進程負責的段
findCurrentRange();
//如果此進程不是最后一個進程,則將剩余部分傳遞給下一個進程
if (pID != pSize - 1)
MPI_Send(input, MAX_LENGTH_WITH_MARK, MPI_INT, pID + 1, 0, MPI_COMM_WORLD);
//排序當前進程負責的段
sort();
//歸約結果,因為最終結果初始皆為零,故采用求和的形式歸約當前結果到最終結果
MPI_Reduce(result, finalResult, MAX_LENGTH_WITH_MARK, MPI_INT, MPI_SUM, 0,
MPI_COMM_WORLD);
}
//入口,主函數
int main(int argc, char* argv[]) {
clock_t totalStart,totalEnd,sortStart,sortEnd;
MPI_Status status; //無用
MPI_Init(&argc, &argv); //初始化
MPI_Comm_rank(MPI_COMM_WORLD, &pID); //取得當前進程的ID
MPI_Comm_size(MPI_COMM_WORLD, &pSize); //取得總進程數
//根進程負責輸入
if (!pID)
{
read();
totalStart = clock();
}
//廣播輸入數數組的長度
MPI_Bcast(&length, 1, MPI_INT, 0, MPI_COMM_WORLD);
//準備:計算各進程常量
prepare();
sortStart = clock();
//從根進程啟動,處理第一段
if (!pID)
process();
else {
//其余進程等待上一進程傳遞的數據
MPI_Recv(input, MAX_LENGTH_WITH_MARK, MPI_INT, pID - 1, 0, MPI_COMM_WORLD,
&status);
//收到數據后進行處理
process();
}
//根進程負責輸出
if (!pID)
{
sortEnd = clock();
output();
totalEnd = clock();
printf("\n------------------------------\n");
printf("Total time is %f seconds\n",(double)(totalEnd - totalStart) / CLOCKS_PER_SEC);
printf("Sort time is %f seconds\n",(double)(sortEnd - sortStart) / CLOCKS_PER_SEC);
}
//結束
MPI_Finalize();
return 0;
}
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -