// File: marscpulib.cu
/**
*This is the source code for Mars, a MapReduce framework on graphics
*processors.
*Author: Wenbin Fang (HKUST), Bingsheng He (HKUST)
*Mentor: Naga K. Govindaraju (Microsoft Corp.), Qiong Luo (HKUST), Tuyong
*Wang (Sina.com).
*If you have any questions about the code, please contact us at {saven,
*wenbin, luo}@cse.ust.hk.
*The copyright is held by HKUST. Mars is provided "as is" without any
*guarantees of any kind.
*/
#undef __GPU_MAP__
#undef __GPU_REDUCE__
#undef __COMPARE__
#define __CPU_MAP__
#define __CPU_REDUCE__
#include "MarsInc.h"
#include "MarsInc/MarsConfig.h"
// Shared argument blocks read by the CPU worker threads. They start out NULL;
// StartCPUMap allocates and fills g_mapcount (count pass) and g_map (map pass).
CountArg_t  *g_mapcount    = NULL;
CountArg_t  *g_reducecount = NULL;
WorkerArg_t *g_map         = NULL;
WorkerArg_t *g_reduce      = NULL;
//------------------------------------------------
//get key or value buffer address in a record
//------------------------------------------------
static char *cpuGetRecordFromBuf(char *buf, int4 *offsetSizes,
int index, char type,
size_t keyOffset, size_t valOffset)
{
int offset = ((type == 0)? (offsetSizes[index].x-keyOffset):
(offsetSizes[index].z-valOffset));
return (buf + offset);
}
//------------------------------------------------
// cpuGetVal: return a pointer to the i-th value of a group of values.
//   vals          -- buffer holding the group's values; its first byte is
//                    taken to correspond to offset index[valStartIndex].z
//   index         -- record index array; .z holds each value's offset
//   valStartIndex -- position in index of the group's first record
//   valCount      -- number of values in the group
//   i             -- 0-based value to fetch
// Returns NULL when i >= valCount.
//------------------------------------------------
char *cpuGetVal(void *vals, int4 *index, size_t valStartIndex,
                size_t valCount, size_t i)
{
    if (i >= valCount) return NULL;
    // Compute the offset relative to the group's first value before touching
    // the pointer. Evaluating vals + z_i first could form a pointer outside
    // the buffer, which is undefined behaviour in pointer arithmetic.
    int relOffset = index[valStartIndex + i].z - index[valStartIndex].z;
    return (char*)vals + relOffset;
}
//------------------------------------------------
// cpuGetValSize: return the size (.w) of the i-th value of a group of values.
//   vals          -- unused; kept so the signature mirrors cpuGetVal
//   index         -- record index array; .w holds each value's size
//   valStartIndex -- position in index of the group's first record
//   valCount      -- number of values in the group
//   i             -- 0-based value to query
// Returns 0 when i >= valCount.
//------------------------------------------------
size_t cpuGetValSize(void *vals, int4 *index, size_t valStartIndex,
                     size_t valCount, size_t i)
{
    // Return 0 rather than NULL. NULL is a null-pointer constant; it may not
    // convert to size_t at all (e.g. when defined as nullptr) and at best
    // draws a conversion warning.
    if (i >= valCount) return 0;
    return index[valStartIndex + i].w;
}
//------------------------------------------------
// cpuEmitInterCount: account for one intermediate key/value pair of task
// `index` during the counting pass (called by cpu_map_count).
// Adds keySize and valSize to the task's running byte totals and increments
// its record count. No key/value data is copied.
//------------------------------------------------
void cpuEmitInterCount(size_t keySize,
                       size_t valSize,
                       size_t* interKeysSizePerTask,
                       size_t* interValsSizePerTask,
                       size_t* interCountPerTask,
                       int index)
{
    size_t *keyTotal = interKeysSizePerTask + index;
    size_t *valTotal = interValsSizePerTask + index;
    size_t *recTotal = interCountPerTask + index;

    *keyTotal = *keyTotal + keySize;
    *valTotal = *valTotal + valSize;
    *recTotal = *recTotal + 1;
}
//------------------------------------------------
//called by cpu_map
//------------------------------------------------
void cpuEmitIntermediate(char* key,
char* val,
size_t keySize,
size_t valSize,
size_t* psKeySizes,
size_t* psValSizes,
size_t* psCounts,
int2* keyValOffsets,
char* interKeys,
char* interVals,
int4* interOffsetSizes,
size_t* curIndex,
int index)
{
char *pKeySet = (char*)(interKeys + psKeySizes[index] + keyValOffsets[index].x);
char *pValSet = (char*)(interVals + psValSizes[index] + keyValOffsets[index].y);
BenMemcpy(pKeySet, (char*)key, keySize);
BenMemcpy(pValSet, (char*)val, valSize);
keyValOffsets[index].x += keySize;
keyValOffsets[index].y += valSize;
if (curIndex[index] != 0)
{
interOffsetSizes[psCounts[index] + curIndex[index]].x =
(interOffsetSizes[psCounts[index] + curIndex[index] - 1].x +
interOffsetSizes[psCounts[index] + curIndex[index] - 1].y);
interOffsetSizes[psCounts[index] + curIndex[index]].z =
(interOffsetSizes[psCounts[index] + curIndex[index] - 1].z +
interOffsetSizes[psCounts[index] + curIndex[index] - 1].w);
}
interOffsetSizes[psCounts[index] + curIndex[index]].y = keySize;
interOffsetSizes[psCounts[index] + curIndex[index]].w = valSize;
curIndex[index]++;
}
//-----------------------------------------------------
// cpuMapCount: counting-pass worker run on one CPU thread.
// Param: args -- the worker index, passed by value inside the pointer itself
// Thread `index` visits records index, index + threadNum, index + 2*threadNum, ...
// and calls cpu_map_count on each. It passes the g_mapcount per-thread key-size,
// value-size and count arrays together with its own index.
// Always returns 0.
//-----------------------------------------------------
static void *cpuMapCount(void *args)
{
    // Round-trip through size_t: casting a pointer directly to int is
    // ill-formed in C++ wherever pointers are wider than int (64-bit targets).
    int index = (int)(size_t)args;
    size_t keyOffset = g_mapcount->keyOffset;
    size_t valOffset = g_mapcount->valOffset;
    // Iterate one step past recPerThread so leftover records (recCount not a
    // multiple of threadNum) are covered; the cindex check ends the thread
    // once it passes the last record.
    for (int i = 0; i <= g_mapcount->recPerThread; i++)
    {
        int cindex = i*g_mapcount->threadNum+index;
        if (cindex >= g_mapcount->recCount) return 0;
        char *key = cpuGetRecordFromBuf(g_mapcount->inKeys,
                                        g_mapcount->inIndex, cindex, 0, keyOffset, valOffset);
        char *val = cpuGetRecordFromBuf(g_mapcount->inVals,
                                        g_mapcount->inIndex, cindex, 1, keyOffset, valOffset);
        cpu_map_count(key,
                      val,
                      g_mapcount->inIndex[cindex].y,
                      g_mapcount->inIndex[cindex].w,
                      g_mapcount->interKeySizePerThread,
                      g_mapcount->interValSizePerThread,
                      g_mapcount->interCountPerThread,
                      index);
    }
    return 0;
}
//----------------------------------------------
// cpuMap: map-pass worker run on one CPU thread.
// Param: args -- the worker index, passed by value inside the pointer itself
// Thread `index` visits records index, index + threadNum, ... and calls cpu_map
// on each. It hands over the g_map output buffers and its own index so that
// results land in this thread's region (starting at psKeySizes / psValSizes /
// psCounts[index]).
// Always returns 0.
//----------------------------------------------
static void *cpuMap(void *args)
{
    // Round-trip through size_t: casting a pointer directly to int is
    // ill-formed in C++ wherever pointers are wider than int (64-bit targets).
    int index = (int)(size_t)args;

    // Seed .x/.z of this thread's first output slot. The emit path
    // (presumably cpuEmitIntermediate, via cpu_map) only chains them for later
    // records. Do this only when the counting pass found records for this
    // thread. Otherwise psCounts[index] is either the next thread's slot
    // (a racing write) or, for trailing empty threads, the total record count,
    // i.e. one past the end of outIndex.
    if (g_mapcount->interCountPerThread[index] > 0)
    {
        g_map->outIndex[g_map->psCounts[index]].x = g_map->psKeySizes[index];
        g_map->outIndex[g_map->psCounts[index]].z = g_map->psValSizes[index];
    }

    // Input rebasing offsets come from the counting-pass arguments, which
    // describe the same input chunk.
    size_t keyOffset = g_mapcount->keyOffset;
    size_t valOffset = g_mapcount->valOffset;
    // Iterate one step past recPerThread to cover leftover records; the
    // cindex check ends the thread after the last record.
    for (int i = 0; i <= g_map->recPerThread; i++)
    {
        int cindex = i*g_map->threadNum+index;
        if (cindex >= g_map->recCount) return 0;
        char *key = cpuGetRecordFromBuf(g_map->inKeys,
                                        g_map->inIndex, cindex, 0, keyOffset, valOffset);
        char *val = cpuGetRecordFromBuf(g_map->inVals,
                                        g_map->inIndex, cindex, 1, keyOffset, valOffset);
        cpu_map(key,
                val,
                g_map->inIndex[cindex].y,
                g_map->inIndex[cindex].w,
                g_map->psKeySizes,
                g_map->psValSizes,
                g_map->psCounts,
                g_map->keyValOffsets,
                g_map->outKeys,
                g_map->outVals,
                g_map->outIndex,
                g_map->curIndex,
                index);
    }
    return 0;
}
//-----------------------------------------------
//Start a CPU Map procedure
//-----------------------------------------------
void StartCPUMap(Schedule_t *sched, char mode)
{
//EnterFunc("StartCPUMap");
BEN_ASSERT(sched != NULL);
//-------------------------------------------------------
//get map input data
//-------------------------------------------------------
//need to be small chunk!!!!
size_t inputRecCount = sched->inputSmallChunk.recCount;
// size_t inputKeySize = sched->inputBigChunk.keySize;
// size_t inputValSize = sched->inputBigChunk.valSize;
if (inputRecCount <= 0) return;
char *inputKeys = sched->inputSmallChunk.keys;
char *inputVals = sched->inputSmallChunk.vals;
int4 *inputIndex = sched->inputSmallChunk.index;
//----------------------------------------------
//determine the number of threads to run
//----------------------------------------------
size_t threadNum = sched->cpuMapThreadNum;
size_t recPerThread = inputRecCount / threadNum;
if (0 == recPerThread)
recPerThread = 1;
//----------------------------------------------
//calculate intermediate data keys'buf size
// and values' buf size
//----------------------------------------------
size_t* interKeySizePerThread = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
size_t* interValSizePerThread = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
size_t* interCountPerThread = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
g_mapcount = (CountArg_t*)BenMalloc(sizeof(CountArg_t));
g_mapcount->inKeys = inputKeys;
g_mapcount->inVals = inputVals;
g_mapcount->inIndex = inputIndex;
g_mapcount->interKeySizePerThread = interKeySizePerThread;
g_mapcount->interValSizePerThread = interValSizePerThread;
g_mapcount->interCountPerThread = interCountPerThread;
g_mapcount->recCount = inputRecCount;
g_mapcount->recPerThread = recPerThread;
g_mapcount->threadNum = threadNum;
g_mapcount->keyOffset = sched->inputSmallChunk.keyOffset;
g_mapcount->valOffset = sched->inputSmallChunk.valOffset;
BenThread_t *tp = (BenThread_t*)BenMalloc(sizeof(BenThread_t)*threadNum);
for (int i = 0; i < threadNum; i++)
{
tp[i] = BenNewThread(cpuMapCount, (void*)i);
//cpuMapCount((void*)i);
}
BenWaitForMul(tp, threadNum);
//-----------------------------------------------
//prefix sum
//-----------------------------------------------
size_t allKeySize = interKeySizePerThread[0];
size_t allValSize = interValSizePerThread[0];
size_t allCounts = interCountPerThread[0];
size_t *psKeySizes = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
size_t *psValSizes = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
size_t *psCounts = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
for (int i = 1; i < threadNum; i++)
{
psKeySizes[i] += (interKeySizePerThread[i-1] + psKeySizes[i-1]);
psValSizes[i] += (interValSizePerThread[i-1] + psValSizes[i-1]);
psCounts[i] += (interCountPerThread[i-1] + psCounts[i-1]);
}
allKeySize = (interKeySizePerThread[threadNum-1]+psKeySizes[threadNum-1]);
allValSize = interValSizePerThread[threadNum-1]+psValSizes[threadNum-1];
allCounts = interCountPerThread[threadNum-1]+psCounts[threadNum-1];
//-----------------------------------------------
//allocate intermediate memory
//-----------------------------------------------
char* interKeys = NULL;
char* interVals = NULL;
int4* interIndex = NULL;
int2* keyValOffsets = NULL;
size_t* curIndex = NULL;
if (allCounts <= 0)
goto CPU_MAP_EXIT;
interKeys = (char*)BenMalloc(allKeySize);
interVals = (char*)BenMalloc(allValSize);
interIndex = (int4*)BenMalloc(sizeof(int4)*allCounts);
keyValOffsets = (int2*)BenMalloc(sizeof(int2)*threadNum);
curIndex = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
g_map = (WorkerArg_t*)BenMalloc(sizeof(WorkerArg_t));
g_map->inKeys = inputKeys;
g_map->inVals = inputVals;
g_map->inIndex = inputIndex,
g_map->psKeySizes = psKeySizes;
g_map->psValSizes = psValSizes;
g_map->psCounts = psCounts;
g_map->keyValOffsets = keyValOffsets;
g_map->outKeys = interKeys;
g_map->outVals = interVals;
g_map->outIndex = interIndex;
g_map->curIndex = curIndex;
g_map->recCount = inputRecCount;
g_map->recPerThread = recPerThread;
g_map->threadNum = threadNum;
for (int i = 0; i < threadNum; i++)
{
tp[i] = BenNewThread(cpuMap, (void*)i);
//cpuMap((void*)i);
}
BenWaitForMul(tp, threadNum);
//-----------------------------------------------
//output
//-----------------------------------------------
sched->outputSmallChunk.keys = interKeys;
sched->outputSmallChunk.vals = interVals;
sched->outputSmallChunk.index = interIndex;
sched->outputSmallChunk.keySize = allKeySize;
sched->outputSmallChunk.valSize = allValSize;
sched->outputSmallChunk.indexSize = allCounts*sizeof(int4);
sched->outputSmallChunk.recCount = allCounts;
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -