// marsschedfile.cu
// (code-viewer page residue: "Font size:")
/**
*This is the source code for Mars, a MapReduce framework on graphics
*processors.
*Author: Wenbin Fang (HKUST), Bingsheng He (HKUST)
*Mentor: Naga K. Govindaraju (Microsoft Corp.), Qiong Luo (HKUST), Tuyong
*Wang (Sina.com).
*If you have any question on the code, please contact us at {saven,
*wenbin, luo}@cse.ust.hk.
*The copyright is held by HKUST. Mars is provided "as is" without any
*guarantees of any kind.
*/
//-------------------------------------------------------
//for gpu only or cpu only
//-------------------------------------------------------
/**
 * Record one chunk in sortInfo->chunks.
 *
 * A chunk whose recCount or diffKeyCount is not positive is ignored.
 * Otherwise the entry at index sortInfo->realChunkCount is filled in:
 *   - keySize / valSize / indexSize / rangeSize / recCount / diffKeyCount
 *     are copied from `chunk`;
 *   - keyOffset / valOffset / indexOffset / rangeOffset are the previous
 *     entry's offset plus the previous entry's size (0 for the first entry);
 *   - cursor is the previous entry's cursor plus the previous entry's
 *     diffKeyCount (0 for the first entry), i.e. the running total of
 *     diffKeyCount over the entries recorded before this one.
 *
 * The cursor is derived from the table itself rather than from a
 * function-local static counter, so a second SortInfo_t (or a second job in
 * the same process) starts again from 0 instead of inheriting a stale total.
 *
 * After the entry is stored, realChunkCount is incremented; if it has
 * reached fullChunkCount the chunks array is doubled with BenRealloc, so
 * there is room for the entry written by the next call.
 *
 * @param sortInfo  table to extend; asserted non-NULL
 * @param chunk     chunk whose sizes and counts are recorded; asserted non-NULL
 */
static void FillSortInfo(SortInfo_t *sortInfo, ChunkInfo_t *chunk)
{
    BEN_ASSERT(sortInfo != NULL);
    BEN_ASSERT(chunk != NULL);

    if (chunk->recCount <= 0 || chunk->diffKeyCount <= 0) return;

    size_t cur = sortInfo->realChunkCount;
    SortChunk_t *slot = &sortInfo->chunks[cur];

    if (cur != 0)
    {
        /* Every offset continues where the previous entry ended. */
        const SortChunk_t *prev = &sortInfo->chunks[cur-1];
        slot->keyOffset   = prev->keyOffset   + prev->keySize;
        slot->valOffset   = prev->valOffset   + prev->valSize;
        slot->indexOffset = prev->indexOffset + prev->indexSize;
        slot->rangeOffset = prev->rangeOffset + prev->rangeSize;
        slot->cursor      = prev->cursor      + prev->diffKeyCount;
    }
    else
    {
        slot->keyOffset   = 0;
        slot->valOffset   = 0;
        slot->indexOffset = 0;
        slot->rangeOffset = 0;
        slot->cursor      = 0;
    }

    slot->keySize      = chunk->keySize;
    slot->valSize      = chunk->valSize;
    slot->indexSize    = chunk->indexSize;
    slot->recCount     = chunk->recCount;
    slot->rangeSize    = chunk->rangeSize;
    slot->diffKeyCount = chunk->diffKeyCount;

    sortInfo->realChunkCount++;

    /* Grow after the insert so the next call always finds a free slot.
       `slot` must not be used past this point: the array may move. */
    if (sortInfo->realChunkCount >= sortInfo->fullChunkCount)
    {
        sortInfo->chunks = (SortChunk_t*)BenRealloc(sortInfo->chunks,
            sortInfo->fullChunkCount*sizeof(SortChunk_t),
            (sortInfo->fullChunkCount*2)*sizeof(SortChunk_t));
        sortInfo->fullChunkCount *= 2;
    }
}
/**
 * Run the map stage on exactly one processor, one file chunk at a time.
 *
 * spec->mode must have exactly one of the GPU / CPU bits set; otherwise an
 * error is logged and the function returns without doing any work. The
 * chosen schedule (spec->gpuSched or spec->cpuSched) is zeroed and given the
 * matching map launch parameters from spec.
 *
 * While spec->inputChunk->fileCursor is below spec->totalInputRecCount:
 *   1. a chunk is read into spec->inputChunk and copied (by value) into the
 *      schedule's inputSmallChunk;
 *   2. StartGPUMap or StartCPUMap is called;
 *   3. the input cursor advances by spec->flushThreshhold and the key/val
 *      offsets advance by the chunk's key/val sizes;
 *   4. with MAP_ONLY the schedule's output chunk is written to
 *      spec->outputFile; otherwise it goes through RearrangeKeyVal and
 *      FillSortInfo and is written to spec->interFile;
 *   5. the schedule's input chunk is released with FreeChunk and the output
 *      chunk's recCount is reset to 0.
 *
 * @param spec  job description; asserted non-NULL
 */
static void SingleMapFile(Spec_t *spec)
{
    BEN_ASSERT(spec != NULL);

    const int wantGpu = (spec->mode & GPU) != 0;
    const int wantCpu = (spec->mode & CPU) != 0;

    /* Both bits set, or neither: this path handles a single processor only. */
    if (wantGpu == wantCpu)
    {
        BenLog("Error: please use single processor!\n");
        return ;
    }

    Schedule_t *sched = NULL;
    if (wantGpu)
    {
        sched = spec->gpuSched;
        BenMemset(sched, 0, sizeof(Schedule_t));
        sched->gpuMapGridDim = spec->gpuMapGridDim;
        sched->gpuMapBlockDim = spec->gpuMapBlockDim;
        sched->gpuMapSharedMemSize = spec->gpuMapSharedMemSize;
    }
    else
    {
        sched = spec->cpuSched;
        BenMemset(sched, 0, sizeof(Schedule_t));
        sched->cpuMapThreadNum = spec->cpuMapThreadNum;
    }

    ChunkInfo_t *mapped = &(sched->outputSmallChunk);

    while (spec->inputChunk->fileCursor < spec->totalInputRecCount)
    {
        ReadChunkFromFile(spec->inputChunk, &spec->inputFile,
            spec->totalInputRecCount,
            spec->inputChunk->fileCursor, spec->flushThreshhold);
        BenMemcpy(&(sched->inputSmallChunk), spec->inputChunk, sizeof(ChunkInfo_t));

        if (wantGpu)
            StartGPUMap(sched, spec->mode);
        else
            StartCPUMap(sched, spec->mode);

        /* The cursor moves by the threshold, not by the chunk's recCount. */
        spec->inputChunk->fileCursor += spec->flushThreshhold;
        spec->inputChunk->keyOffset += spec->inputChunk->keySize;
        spec->inputChunk->valOffset += spec->inputChunk->valSize;

        if (spec->mode & MAP_ONLY)
        {
            WriteChunkToFile(mapped,
                &spec->outputFile, &spec->totalOutputRecCount, spec->mode);
        }
        else
        {
            RearrangeKeyVal(mapped);
            FillSortInfo(spec->sortInfo, mapped);
            WriteChunkToFile(mapped,
                &spec->interFile, &spec->totalInterRecCount, spec->mode);
        }

        FreeChunk(&(sched->inputSmallChunk));
        mapped->recCount = 0;
    }
}
/**
 * Exchange the keyFile, valFile, indexFile and rangeFile members of two
 * FileName_t records. Only the pointer values are exchanged; nothing is
 * copied or freed.
 *
 * @param file1  first record
 * @param file2  second record
 */
static void SwapFileName(FileName_t *file1, FileName_t *file2)
{
    /* Remember everything file1 holds before it is overwritten. */
    char *const savedKey   = file1->keyFile;
    char *const savedVal   = file1->valFile;
    char *const savedIndex = file1->indexFile;
    char *const savedRange = file1->rangeFile;

    file1->keyFile   = file2->keyFile;
    file1->valFile   = file2->valFile;
    file1->indexFile = file2->indexFile;
    file1->rangeFile = file2->rangeFile;

    file2->keyFile   = savedKey;
    file2->valFile   = savedVal;
    file2->indexFile = savedIndex;
    file2->rangeFile = savedRange;
}
/**
 * Run MergeSortFile for the single-processor path, depending on spec->mode.
 *
 * MAP_SORT: the interFile and outputFile names are exchanged and
 * totalOutputRecCount takes the value of totalInterRecCount; MergeSortFile
 * is then called on outputFile with tmpFile, and afterwards the outputFile
 * and tmpFile names are exchanged.
 *
 * MAP_SORT_REDUCE (checked only when MAP_SORT is not set): MergeSortFile is
 * called on interFile with tmpFile, and afterwards the interFile and tmpFile
 * names are exchanged.
 *
 * Any other mode: nothing happens.
 *
 * @param spec  job description; asserted non-NULL
 */
static void SingleMergeInterFile(Spec_t *spec)
{
    BEN_ASSERT(spec != NULL);

    if (spec->mode & MAP_SORT)
    {
        SwapFileName(&spec->interFile, &spec->outputFile);
        spec->totalOutputRecCount = spec->totalInterRecCount;

        MergeSortFile(&spec->outputFile, &spec->tmpFile, spec->sortInfo,
            spec->flushThreshhold,
            &spec->totalOutputRecCount, &spec->totalDiffKeyCount);
        SwapFileName(&spec->outputFile, &spec->tmpFile);
        /* TODO (from the original author): delete tmpFile */
        return;
    }

    if (!(spec->mode & MAP_SORT_REDUCE))
        return;

    MergeSortFile(&spec->interFile, &spec->tmpFile, spec->sortInfo,
        spec->flushThreshhold,
        &spec->totalInterRecCount, &spec->totalDiffKeyCount);
    SwapFileName(&spec->interFile, &spec->tmpFile);
    /* TODO (from the original author): delete tmpFile */
}
/**
 * Run the reduce stage on exactly one processor, one group chunk at a time.
 *
 * spec->mode must have exactly one of the GPU / CPU bits set; otherwise an
 * error is logged and the function returns without doing any work. The
 * chosen schedule (spec->gpuSched or spec->cpuSched) is zeroed and given the
 * matching reduce launch parameters from spec.
 *
 * While spec->interChunk->fileCursor is below spec->totalDiffKeyCount:
 *   1. a group chunk is read into spec->interChunk and copied (by value)
 *      into the schedule's inputSmallChunk;
 *   2. StartGPUReduce or StartCPUReduce is called;
 *   3. the cursor advances by spec->flushThreshhold and the key/val offsets
 *      advance by the chunk's key/val sizes;
 *   4. unless MAP_ONLY is set, the schedule's output chunk goes through
 *      RearrangeKeyVal; it is then written to spec->outputFile;
 *   5. spec->interChunk is released with FreeChunk and the output chunk's
 *      recCount is reset to 0.
 *
 * @param spec  job description; asserted non-NULL
 */
static void SingleReduceFile(Spec_t *spec)
{
    BEN_ASSERT(spec);

    const int wantGpu = (spec->mode & GPU) != 0;
    const int wantCpu = (spec->mode & CPU) != 0;

    /* Both bits set, or neither: this path handles a single processor only. */
    if (wantGpu == wantCpu)
    {
        BenLog("Error: please use single processor!\n");
        return ;
    }

    Schedule_t *sched = NULL;
    if (wantGpu)
    {
        sched = spec->gpuSched;
        BenMemset(sched, 0, sizeof(Schedule_t));
        sched->gpuReduceGridDim = spec->gpuReduceGridDim;
        sched->gpuReduceBlockDim = spec->gpuReduceBlockDim;
        sched->gpuReduceSharedMemSize = spec->gpuReduceSharedMemSize;
    }
    else
    {
        sched = spec->cpuSched;
        BenMemset(sched, 0, sizeof(Schedule_t));
        sched->cpuReduceThreadNum = spec->cpuReduceThreadNum;
    }

    ChunkInfo_t *reduced = &(sched->outputSmallChunk);

    while (spec->interChunk->fileCursor < spec->totalDiffKeyCount)
    {
        ReadGroupChunkFromFile(spec->interChunk, &spec->interFile,
            spec->totalDiffKeyCount,
            spec->interChunk->fileCursor, spec->flushThreshhold);
        BenMemcpy(&(sched->inputSmallChunk), spec->interChunk, sizeof(ChunkInfo_t));

        if (wantGpu)
            StartGPUReduce(sched, spec->mode);
        else
            StartCPUReduce(sched, spec->mode);

        /* The cursor moves by the threshold, not by a per-chunk count. */
        spec->interChunk->fileCursor += spec->flushThreshhold;
        spec->interChunk->keyOffset += spec->interChunk->keySize;
        spec->interChunk->valOffset += spec->interChunk->valSize;

        if (!(spec->mode & MAP_ONLY))
        {
            /* FillSortInfo is deliberately not called here (it was
               commented out in the original). */
            RearrangeKeyVal(reduced);
        }

        WriteChunkToFile(reduced,
            &spec->outputFile, &spec->totalOutputRecCount, spec->mode);

        FreeChunk(spec->interChunk);
        reduced->recCount = 0;
    }
}
// Stub: apart from asserting that spec is non-NULL this function does
// nothing. NOTE(review): going by the name it is presumably reserved for a
// final merge of output files in single-processor mode -- confirm.
static void SingleMergeOutputFile(Spec_t *spec)
{
BEN_ASSERT(spec);
}
//-------------------------------------------------------
//for gpu and cpu co-processing
//-------------------------------------------------------
// Map stage with GPU and CPU working on the same chunk side by side.
//
// For every chunk read from spec->inputFile, the chunk descriptor is copied
// into both schedules. The GPU copy keeps recCount * spec->gpuInputRatio
// records; the CPU copy has its `index` advanced by that many and its
// recCount reduced by that many. Two threads (gpuMapWorkerMem and
// cpuMapWorkerMem, both given `spec`) are started and waited for, then the
// two output chunks are merged into spec->interChunk and written out
// according to spec->mode.
//
// spec: job description; asserted non-NULL.
static void CoprocessMapFile(Spec_t *spec)
{
BEN_ASSERT(spec != NULL);
while(spec->inputChunk->fileCursor < spec->totalInputRecCount)
{
//read from file
ReadChunkFromFile(spec->inputChunk, &spec->inputFile, spec->totalInputRecCount,
spec->inputChunk->fileCursor, spec->flushThreshhold);
//set up gpu schedule configuration
BenMemset(spec->gpuSched, 0, sizeof(Schedule_t));
spec->gpuSched->gpuMapGridDim = spec->gpuMapGridDim;
spec->gpuSched->gpuMapBlockDim = spec->gpuMapBlockDim;
spec->gpuSched->gpuMapSharedMemSize = spec->gpuMapSharedMemSize;
BenMemcpy(&(spec->gpuSched->inputSmallChunk), spec->inputChunk, sizeof(ChunkInfo_t));
//the gpu gets the leading gpuInputRatio share of the chunk's records
spec->gpuSched->inputSmallChunk.recCount *= spec->gpuInputRatio;
//set up cpu schedule configuration
BenMemset(spec->cpuSched, 0, sizeof(Schedule_t));
spec->cpuSched->cpuMapThreadNum = spec->cpuMapThreadNum;
BenMemcpy(&(spec->cpuSched->inputSmallChunk), spec->inputChunk, sizeof(ChunkInfo_t));
//the cpu starts right after the gpu's records and takes the remainder
spec->cpuSched->inputSmallChunk.index += spec->gpuSched->inputSmallChunk.recCount;
spec->cpuSched->inputSmallChunk.recCount -= spec->gpuSched->inputSmallChunk.recCount;
//launch gpu and cpu processing
BenThread_t tp[2];
tp[0] = BenNewThread(gpuMapWorkerMem, (void*)spec);
tp[1] = BenNewThread(cpuMapWorkerMem, (void*)spec);
BenWaitForMul(tp, 2);
//adjust cursors (the file cursor moves by the threshold, not by recCount)
spec->inputChunk->fileCursor += spec->flushThreshhold;
spec->inputChunk->keyOffset += spec->inputChunk->keySize;
spec->inputChunk->valOffset += spec->inputChunk->valSize;
//NOTE(review): these PrintRecords calls look like debug output left in the
//processing path; the meaning of the trailing arguments is not visible here
PrintRecords(&(spec->gpuSched->outputSmallChunk), NULL,
(spec->gpuSched->outputSmallChunk.recCount), spec, INT, INT, 100);
PrintRecords(&(spec->cpuSched->outputSmallChunk), NULL,
(spec->cpuSched->outputSmallChunk.recCount), spec, INT, INT, 100);
//in memory merge to interChunk
//unsorted
if (spec->mode & MAP_ONLY)
{
MergeUnsortMem(spec->interChunk,
&spec->gpuSched->outputSmallChunk,
&spec->cpuSched->outputSmallChunk);
//write interChunk to output file
WriteChunkToFile(spec->interChunk,
&spec->outputFile, &spec->totalOutputRecCount, spec->mode);
}
//sorted
else if (spec->mode & MAP_SORT ||
spec->mode & MAP_SORT_REDUCE)
{
MergeSortMem(spec->interChunk,
&spec->gpuSched->outputSmallChunk,
&spec->cpuSched->outputSmallChunk);
//one int2 entry per distinct key
spec->interChunk->rangeSize = spec->interChunk->diffKeyCount*sizeof(int2);
RearrangeKeyVal(spec->interChunk);
FillSortInfo(spec->sortInfo, spec->interChunk);
//PrintRecords is called with keyOffset/valOffset temporarily set to 0;
//the saved values are restored right after the call
size_t bk = spec->interChunk->keyOffset;
size_t bv = spec->interChunk->valOffset;
spec->interChunk->keyOffset = 0;
spec->interChunk->valOffset = 0;
PrintRecords(spec->interChunk, NULL,
spec->interChunk->recCount, spec, INT, INT, 100);
spec->interChunk->keyOffset = bk;
spec->interChunk->valOffset = bv;
//MAP_SORT_REDUCE goes to the intermediate file, plain MAP_SORT to the output file
if (spec->mode & MAP_SORT_REDUCE)
{
WriteChunkToFile(spec->interChunk,
&spec->interFile, &spec->totalInterRecCount, spec->mode);
}
else if (spec->mode & MAP_SORT)
{
WriteChunkToFile(spec->interChunk,
&spec->outputFile, &spec->totalOutputRecCount, spec->mode);
}
}
//release the input chunk and reset the per-iteration record counts
FreeChunk(spec->inputChunk);
spec->inputChunk->recCount = 0;
spec->gpuSched->outputSmallChunk.recCount = 0;
spec->cpuSched->outputSmallChunk.recCount = 0;
}//while
}
/**
 * Run MergeSortFile for the co-processing path, depending on spec->mode.
 *
 * MAP_SORT: MergeSortFile is called on outputFile with tmpFile, then the
 * outputFile and tmpFile names are exchanged.
 *
 * MAP_SORT_REDUCE (checked only when MAP_SORT is not set): MergeSortFile is
 * called on interFile with tmpFile, then the interFile and tmpFile names are
 * exchanged.
 *
 * Any other mode: nothing happens.
 *
 * @param spec  job description; asserted non-NULL
 */
static void CoprocessMergeInterFile(Spec_t *spec)
{
    BEN_ASSERT(spec != NULL);

    if (spec->mode & MAP_SORT)
    {
        MergeSortFile(&spec->outputFile, &spec->tmpFile, spec->sortInfo,
            spec->flushThreshhold,
            &spec->totalOutputRecCount, &spec->totalDiffKeyCount);
        SwapFileName(&spec->outputFile, &spec->tmpFile);
        return;
    }

    if (!(spec->mode & MAP_SORT_REDUCE))
        return;

    MergeSortFile(&spec->interFile, &spec->tmpFile, spec->sortInfo,
        spec->flushThreshhold,
        &spec->totalInterRecCount, &spec->totalDiffKeyCount);
    SwapFileName(&spec->interFile, &spec->tmpFile);
}
// Reduce stage with GPU and CPU working on the same group chunk side by side.
//
// For every group chunk read from spec->interFile, the chunk descriptor is
// copied into both schedules. The GPU copy keeps
// diffKeyCount * spec->gpuInputRatio keys; the CPU copy has its keyListRange
// advanced by that many and its diffKeyCount reduced by that many. Two
// threads (gpuReduceWorkerMem and cpuReduceWorkerMem, both given `spec`) are
// started and waited for, then the two output chunks are merged into
// spec->outputChunk with MergeUnsortMem and written to spec->outputFile.
//
// spec: job description; asserted non-NULL.
static void CoprocessReduceFile(Spec_t *spec)
{
BEN_ASSERT(spec != NULL);
while(spec->interChunk->fileCursor < spec->totalDiffKeyCount)
{
//read the next group chunk from the intermediate file
ReadGroupChunkFromFile(spec->interChunk, &spec->interFile, spec->totalDiffKeyCount,
spec->interChunk->fileCursor, spec->flushThreshhold);
//set up gpu schedule configuration
BenMemset(spec->gpuSched, 0, sizeof(Schedule_t));
spec->gpuSched->gpuReduceGridDim = spec->gpuReduceGridDim;
spec->gpuSched->gpuReduceBlockDim = spec->gpuReduceBlockDim;
spec->gpuSched->gpuReduceSharedMemSize = spec->gpuReduceSharedMemSize;
BenMemcpy(&(spec->gpuSched->inputSmallChunk), spec->interChunk, sizeof(ChunkInfo_t));
//the gpu gets the leading gpuInputRatio share of the chunk's distinct keys
spec->gpuSched->inputSmallChunk.diffKeyCount *= spec->gpuInputRatio;
//set up cpu schedule configuration
BenMemset(spec->cpuSched, 0, sizeof(Schedule_t));
spec->cpuSched->cpuReduceThreadNum = spec->cpuReduceThreadNum;
BenMemcpy(&(spec->cpuSched->inputSmallChunk), spec->interChunk, sizeof(ChunkInfo_t));
//the cpu starts right after the gpu's keys and takes the remainder
spec->cpuSched->inputSmallChunk.keyListRange += spec->gpuSched->inputSmallChunk.diffKeyCount;
spec->cpuSched->inputSmallChunk.diffKeyCount -= spec->gpuSched->inputSmallChunk.diffKeyCount;
//launch gpu and cpu processing
BenThread_t tp[2];
tp[0] = BenNewThread(gpuReduceWorkerMem, (void*)spec);
tp[1] = BenNewThread(cpuReduceWorkerMem, (void*)spec);
BenWaitForMul(tp, 2);
//(disabled alternative kept from the original: call the two workers directly)
// gpuReduceWorkerMem(spec);
// cpuReduceWorkerMem(spec);
//adjust cursors (the file cursor moves by the threshold)
spec->interChunk->fileCursor += spec->flushThreshhold;
spec->interChunk->keyOffset += spec->interChunk->keySize;
spec->interChunk->valOffset += spec->interChunk->valSize;
//merge both partial outputs and append them to the output file
MergeUnsortMem(spec->outputChunk,
&spec->gpuSched->outputSmallChunk,
&spec->cpuSched->outputSmallChunk);
WriteChunkToFile(spec->outputChunk,
&spec->outputFile, &spec->totalOutputRecCount, spec->mode);
FreeChunk(spec->interChunk);
}//while
}
// Stub: the body is empty and spec is not used.
// NOTE(review): going by the name it is presumably reserved for a final
// merge of output files in co-processing mode -- confirm.
static void CoprocessMergeOutputFile(Spec_t *spec)
{
}
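/*
 * Code-viewer page residue (not part of the program):
 * Keyboard shortcut help
 *   Copy code            Ctrl + C
 *   Search code          Ctrl + F
 *   Full-screen mode     F11
 *   Toggle theme         Ctrl + Shift + D
 *   Show shortcuts       ?
 *   Increase font size   Ctrl + =
 *   Decrease font size   Ctrl + -
 */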