?? marscpulib.cu
字號:
//-----------------------------------------------
//clean
//-----------------------------------------------
CPU_MAP_EXIT:
BenFree((char**)&keyValOffsets, sizeof(int2)*threadNum);
BenFree((char**)&curIndex, sizeof(size_t)*threadNum);
BenFree((char**)&g_map, sizeof(WorkerArg_t));
BenFree((char**)&interKeySizePerThread, sizeof(size_t)*threadNum);
BenFree((char**)&interValSizePerThread, sizeof(size_t)*threadNum);
BenFree((char**)&interCountPerThread, sizeof(size_t)*threadNum);
BenFree((char**)&g_mapcount, sizeof(CountArg_t));
BenFree((char**)&tp, sizeof(BenThread_t)*threadNum);
BenFree((char**)&psKeySizes, sizeof(size_t)*threadNum);
BenFree((char**)&psValSizes, sizeof(size_t)*threadNum);
BenFree((char**)&psCounts, sizeof(size_t)*threadNum);
//LeaveFunc("StartCPUFunc");
}
//-----------------------------------------------
//CPU sort stage. When `mode` requests sorting (MAP_SORT or
//MAP_SORT_REDUCE), sorts sched->outputSmallChunk in memory
//(QuickSortMem), groups it by key (GroupByMem) and sets rangeSize to
//diffKeyCount * sizeof(int2). For any other mode it does nothing.
//-----------------------------------------------
void StartCPUSort(Schedule_t *sched, char mode)
{
	if (!((mode & MAP_SORT) || (mode & MAP_SORT_REDUCE)))
		return;

	//only this reset depends on USE_FILE; sorting and grouping below
	//run for every sorting mode
	if (mode & USE_FILE)
	{
		sched->outputSmallChunk.keyOffset = 0;
	}

	QuickSortMem(&sched->outputSmallChunk);
	GroupByMem(&sched->outputSmallChunk);

	//byte size of one int2 range per distinct key (diffKeyCount is
	//presumably refreshed by GroupByMem - confirm)
	sched->outputSmallChunk.rangeSize =
		sched->outputSmallChunk.diffKeyCount * sizeof(int2);
}
//-----------------------------------------------
//Counting helper: instead of copying a record, adds its key size,
//value size and one record to the running totals of worker `index`.
//(Appears to be the count-pass counterpart of cpuEmit.)
//
//  keySize, valSize        byte sizes of the record being counted
//  outputKeysSizePerTask   per-worker key byte totals, slot `index` grows
//  outputValsSizePerTask   per-worker value byte totals, slot `index` grows
//  outputCountPerTask      per-worker record counts, slot `index` grows by 1
//-----------------------------------------------
void cpuEmitCount(size_t keySize,
		size_t valSize,
		size_t* outputKeysSizePerTask,
		size_t* outputValsSizePerTask,
		size_t* outputCountPerTask,
		int index)
{
	size_t *keyTotal = outputKeysSizePerTask + index;
	size_t *valTotal = outputValsSizePerTask + index;
	size_t *recTotal = outputCountPerTask + index;

	*keyTotal = *keyTotal + keySize;
	*valTotal = *valTotal + valSize;
	*recTotal = *recTotal + 1;
}
//-----------------------------------------------
//Output helper: appends one (key, value) record to the region of the
//shared output buffers reserved for worker `index`.
//
//  key, val, keySize, valSize  record bytes to copy
//  psKeySizes, psValSizes      per-worker start offsets (bytes) into
//                              outputKeys / outputVals
//  psCounts                    per-worker first slot in outputOffsetSizes
//  keyValOffsets               per-worker bytes already written
//                              (.x keys, .y values); advanced here
//  outputOffsetSizes           one int4 per record: .x key offset,
//                              .y key size, .z value offset, .w value size
//  curIndex                    per-worker records already written;
//                              advanced here
//
//Only records after a worker's first get .x/.z computed here (previous
//offset + previous size). The first record's offsets must be seeded by
//the caller, as cpuReduce does before it starts emitting.
//-----------------------------------------------
void cpuEmit (char* key,
		char* val,
		size_t keySize,
		size_t valSize,
		size_t* psKeySizes,
		size_t* psValSizes,
		size_t* psCounts,
		int2* keyValOffsets,
		char* outputKeys,
		char* outputVals,
		int4* outputOffsetSizes,
		size_t* curIndex,
		int index)
{
	//copy the payload into this worker's next free key / value bytes
	char *keyDst = outputKeys + psKeySizes[index] + keyValOffsets[index].x;
	char *valDst = outputVals + psValSizes[index] + keyValOffsets[index].y;
	BenMemcpy(keyDst, key, keySize);
	BenMemcpy(valDst, val, valSize);
	keyValOffsets[index].x += keySize;
	keyValOffsets[index].y += valSize;

	//index entry of the record being appended
	int4 *rec = outputOffsetSizes + (psCounts[index] + curIndex[index]);
	if (curIndex[index] > 0)
	{
		//each record starts right where the previous one of this worker ended
		const int4 *prev = rec - 1;
		rec->x = prev->x + prev->y;
		rec->z = prev->z + prev->w;
	}
	rec->y = keySize;
	rec->w = valSize;
	++curIndex[index];
}
//-----------------------------------------------
//Count-pass worker of the CPU reduce stage.
//
//StartCPUReduce starts one thread per worker and packs the worker id
//into the void* argument. Worker `index` visits the grouped keys
//cindex = index, index + threadNum, index + 2*threadNum, ... while
//cindex < recCount. For each key it fetches the key and its value list
//with cpuGetRecordFromBuf and hands them, with the value count, to
//cpu_reduce_count. cpu_reduce_count is defined elsewhere and is assumed
//to add this worker's output sizes into the interKeySizePerThread /
//interValSizePerThread / interCountPerThread slots at `index`.
//All inputs come from the global g_reducecount. Always returns 0.
//-----------------------------------------------
void *cpuReduceCount(void *i)
{
	//The worker id travels inside the pointer. Convert through a
	//pointer-sized integer first: a direct void* -> int cast is rejected
	//by C++ compilers on 64-bit hosts, where a pointer is wider than int.
	int index = (int)(size_t)i;
	size_t keyOffset = g_reducecount->keyOffset;
	size_t valOffset = g_reducecount->valOffset;
	//recPerThread = recCount / threadNum rounds down, so one extra step is
	//taken to cover the remainder; the cindex check ends the loop early
	for (int step = 0; step <= g_reducecount->recPerThread; step++)
	{
		int cindex = step*g_reducecount->threadNum+index;
		if (cindex >= g_reducecount->recCount) return 0;
		//keyListRange[cindex] apparently holds [first, end) positions of
		//this key's values in inIndex
		int valStartIndex = g_reducecount->inKeyListRange[cindex].x;
		int valCount = g_reducecount->inKeyListRange[cindex].y - valStartIndex;
		size_t keySize = g_reducecount->inIndex[valStartIndex].y;
		//selector 0 fetches the key, 1 the value list (assumed from usage)
		char *key = cpuGetRecordFromBuf(g_reducecount->inKeys,
				g_reducecount->inIndex, valStartIndex, 0, keyOffset, valOffset);
		char *vals = cpuGetRecordFromBuf(g_reducecount->inVals,
				g_reducecount->inIndex, valStartIndex, 1, keyOffset, valOffset);
		cpu_reduce_count(key,
				vals,
				keySize,
				valCount,
				g_reducecount->inIndex,
				g_reducecount->interKeySizePerThread,
				g_reducecount->interValSizePerThread,
				g_reducecount->interCountPerThread,
				index,
				valStartIndex);
	}
	return 0;
}
//-----------------------------------------------
//Emit-pass worker of the CPU reduce stage.
//
//StartCPUReduce starts one thread per worker after the count pass and
//prefix sum, packing the worker id into `args`. Worker `index` visits
//the keys index, index + threadNum, ... (< recCount). For each one it
//calls cpu_reduce with the key, its value list and the output buffers and
//per-worker prefix sums taken from the global g_reduce. That way the
//worker's records land in its private output region. cpu_reduce is
//defined elsewhere; presumably it writes through cpuEmit.
//
//Record offsets (keyOffset/valOffset) and the per-worker record counts
//are read from g_reducecount, the count-pass arguments.
//StartCPUReduce keeps them alive until both passes finish.
//Always returns 0.
//-----------------------------------------------
void *cpuReduce(void *args)
{
	//The worker id travels inside the pointer. Convert through a
	//pointer-sized integer: a direct void* -> int cast is rejected by
	//C++ compilers on 64-bit hosts.
	int index = (int)(size_t)args;
	//Seed the offsets of this worker's first output record; cpuEmit only
	//derives .x/.z for later records. Skip this when the count pass
	//reported no output for this worker. In that case psCounts[index]
	//refers to the next worker's first slot, or to one past the end of
	//outIndex for idle trailing workers (e.g. when recCount < threadNum).
	if (g_reducecount->interCountPerThread[index] > 0)
	{
		g_reduce->outIndex[g_reduce->psCounts[index]].x = g_reduce->psKeySizes[index];
		g_reduce->outIndex[g_reduce->psCounts[index]].z = g_reduce->psValSizes[index];
	}
	//same round-robin key assignment as the count pass
	for (int step = 0; step <= g_reduce->recPerThread; step++)
	{
		int cindex = step*g_reduce->threadNum+index;
		if (cindex >= g_reduce->recCount) return 0;
		int valStartIndex = g_reduce->inKeyListRange[cindex].x;
		int valCount = g_reduce->inKeyListRange[cindex].y - valStartIndex;
		size_t keySize = g_reduce->inIndex[valStartIndex].y;
		char *key = cpuGetRecordFromBuf(g_reduce->inKeys,
				g_reduce->inIndex, valStartIndex, 0,
				g_reducecount->keyOffset, g_reducecount->valOffset);
		char *vals = cpuGetRecordFromBuf(g_reduce->inVals,
				g_reduce->inIndex, valStartIndex, 1,
				g_reducecount->keyOffset, g_reducecount->valOffset);
		cpu_reduce(key,
				vals,
				keySize,
				valCount,
				g_reduce->psKeySizes,
				g_reduce->psValSizes,
				g_reduce->psCounts,
				g_reduce->keyValOffsets,
				g_reduce->inIndex,
				g_reduce->outKeys,
				g_reduce->outVals,
				g_reduce->outIndex,
				g_reduce->curIndex,
				index,
				valStartIndex);
	}
	return 0;
}
//-------------------------------------------------
//main cpu reduce procedure
//
//Runs the reduce stage on the CPU over sched->inputSmallChunk. Both
//passes use sched->cpuReduceThreadNum worker threads (at least one):
//  1. count pass (cpuReduceCount): each worker records how many key
//     bytes, value bytes and records it will emit;
//  2. an exclusive prefix sum over those per-worker totals assigns each
//     worker a private region of the shared output buffers;
//  3. emit pass (cpuReduce): each worker writes its records there.
//On success the output buffers are stored in sched->outputSmallChunk and
//are not freed here. When the input has no distinct keys, or the count
//pass reports no output records, sched->outputSmallChunk is left
//untouched. `mode` is currently unused.
//-------------------------------------------------
void StartCPUReduce(Schedule_t *sched, char mode)
{
	BEN_ASSERT(sched != NULL);
	//-------------------------------------------------------
	//get reduce input data
	//-------------------------------------------------------
	size_t interRecCount = sched->inputSmallChunk.diffKeyCount;
	if (interRecCount == 0) return;
	char *interKeys = sched->inputSmallChunk.keys;
	char *interVals = sched->inputSmallChunk.vals;
	int4 *interIndex = sched->inputSmallChunk.index;
	int2 *interKeyListRange = sched->inputSmallChunk.keyListRange;

	//----------------------------------------------
	//determine the number of threads to run
	//----------------------------------------------
	size_t threadNum = sched->cpuReduceThreadNum;
	//a zero thread count would divide by zero below and leave no worker
	if (threadNum == 0) threadNum = 1;
	//Workers take keys round-robin and loop recPerThread + 1 times, so
	//rounding down here still covers every key.
	size_t recPerThread = interRecCount / threadNum;
	if (recPerThread == 0) recPerThread = 1;

	//----------------------------------------------
	//count pass: per-thread key/value byte sizes and record counts
	//----------------------------------------------
	size_t *outputKeySizePerThread = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
	size_t *outputValSizePerThread = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
	size_t *outputCountPerThread = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
	//The count workers add into these slots (presumably via cpuEmitCount,
	//which uses +=). Start from zero explicitly rather than relying on
	//BenMalloc's fill behaviour.
	for (size_t t = 0; t < threadNum; t++)
	{
		outputKeySizePerThread[t] = 0;
		outputValSizePerThread[t] = 0;
		outputCountPerThread[t] = 0;
	}
	g_reducecount = (CountArg_t*)BenMalloc(sizeof(CountArg_t));
	g_reducecount->inKeys = interKeys;
	g_reducecount->inVals = interVals;
	g_reducecount->inIndex = interIndex;
	g_reducecount->inKeyListRange = interKeyListRange;
	g_reducecount->interKeySizePerThread = outputKeySizePerThread;
	g_reducecount->interValSizePerThread = outputValSizePerThread;
	g_reducecount->interCountPerThread = outputCountPerThread;
	g_reducecount->recCount = interRecCount;
	g_reducecount->recPerThread = recPerThread;
	g_reducecount->threadNum = threadNum;
	//NOTE: hard-coded to 0; previously sched->inputSmallChunk.keyOffset / valOffset
	g_reducecount->keyOffset = 0;
	g_reducecount->valOffset = 0;

	BenThread_t *tp = (BenThread_t*)BenMalloc(sizeof(BenThread_t)*threadNum);
	//the worker id is passed through a pointer-sized integer
	for (size_t t = 0; t < threadNum; t++)
		tp[t] = BenNewThread(cpuReduceCount, (void*)t);
	BenWaitForMul(tp, threadNum);

	//-----------------------------------------------
	//exclusive prefix sum: start offsets of every worker's output region
	//-----------------------------------------------
	size_t *psKeySizes = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
	size_t *psValSizes = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
	size_t *psCounts = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
	psKeySizes[0] = 0;
	psValSizes[0] = 0;
	psCounts[0] = 0;
	for (size_t t = 1; t < threadNum; t++)
	{
		psKeySizes[t] = psKeySizes[t-1] + outputKeySizePerThread[t-1];
		psValSizes[t] = psValSizes[t-1] + outputValSizePerThread[t-1];
		psCounts[t] = psCounts[t-1] + outputCountPerThread[t-1];
	}
	size_t allKeySize = psKeySizes[threadNum-1] + outputKeySizePerThread[threadNum-1];
	size_t allValSize = psValSizes[threadNum-1] + outputValSizePerThread[threadNum-1];
	size_t allCounts = psCounts[threadNum-1] + outputCountPerThread[threadNum-1];

	//-----------------------------------------------
	//emit pass: only when the count pass produced output, so that no
	//output buffers are allocated (and leaked) for an empty result
	//-----------------------------------------------
	if (allCounts > 0)
	{
		char *outputKeys = (char*)BenMalloc(allKeySize);
		char *outputVals = (char*)BenMalloc(allValSize);
		int4 *outputIndex = (int4*)BenMalloc(sizeof(int4)*allCounts);
		int2 *keyValOffsets = (int2*)BenMalloc(sizeof(int2)*threadNum);
		size_t *curIndex = (size_t*)BenMalloc(sizeof(size_t)*threadNum);
		//per-worker write cursors; cpuEmit advances them with += / ++
		for (size_t t = 0; t < threadNum; t++)
		{
			keyValOffsets[t].x = 0;
			keyValOffsets[t].y = 0;
			curIndex[t] = 0;
		}

		g_reduce = (WorkerArg_t*)BenMalloc(sizeof(WorkerArg_t));
		g_reduce->inKeys = interKeys;
		g_reduce->inVals = interVals;
		g_reduce->inIndex = interIndex;
		g_reduce->inKeyListRange = interKeyListRange;
		g_reduce->psKeySizes = psKeySizes;
		g_reduce->psValSizes = psValSizes;
		g_reduce->psCounts = psCounts;
		g_reduce->keyValOffsets = keyValOffsets;
		g_reduce->outKeys = outputKeys;
		g_reduce->outVals = outputVals;
		g_reduce->outIndex = outputIndex;
		g_reduce->curIndex = curIndex;
		g_reduce->recCount = interRecCount;
		g_reduce->recPerThread = recPerThread;
		g_reduce->threadNum = threadNum;

		//g_reducecount must stay alive here: cpuReduce reads it too
		for (size_t t = 0; t < threadNum; t++)
			tp[t] = BenNewThread(cpuReduce, (void*)t);
		BenWaitForMul(tp, threadNum);

		//-----------------------------------------------
		//output: sched keeps the three output buffers
		//-----------------------------------------------
		sched->outputSmallChunk.keys = outputKeys;
		sched->outputSmallChunk.vals = outputVals;
		sched->outputSmallChunk.index = outputIndex;
		sched->outputSmallChunk.keySize = allKeySize;
		sched->outputSmallChunk.valSize = allValSize;
		sched->outputSmallChunk.indexSize = allCounts*sizeof(int4);
		sched->outputSmallChunk.recCount = allCounts;

		BenFree((char**)&keyValOffsets, sizeof(int2)*threadNum);
		BenFree((char**)&curIndex, sizeof(size_t)*threadNum);
		BenFree((char**)&g_reduce, sizeof(WorkerArg_t));
	}

	//-----------------------------------------------
	//clean
	//-----------------------------------------------
	BenFree((char**)&outputKeySizePerThread, sizeof(size_t)*threadNum);
	BenFree((char**)&outputValSizePerThread, sizeof(size_t)*threadNum);
	BenFree((char**)&outputCountPerThread, sizeof(size_t)*threadNum);
	BenFree((char**)&g_reducecount, sizeof(CountArg_t));
	BenFree((char**)&tp, sizeof(BenThread_t)*threadNum);
	BenFree((char**)&psKeySizes, sizeof(size_t)*threadNum);
	BenFree((char**)&psValSizes, sizeof(size_t)*threadNum);
	BenFree((char**)&psCounts, sizeof(size_t)*threadNum);
}
?? 快捷鍵說明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -