// marsutils.cu
// (code-viewer page residue: "Font size:")
d_keyChunkSize = 0;
d_valChunkSize = 0;
d_indexChunkSize = 0;
}
}
//---------------------------------------------------------
//main mapreduce procedure
//---------------------------------------------------------
// Per-phase elapsed-time accumulators. MapReduce() adds the value returned by
// BenGetElapsedTime(0) to the matching total after each phase it completes.
// In the visible code these are only ever incremented, never reset.
// NOTE(review): the time unit is whatever BenGetElapsedTime returns -- confirm.
double map_time = 0.0;           // MAP phase
double merge_inter_time = 0.0;   // MERGE_INTER phase
double reduce_time = 0.0;        // REDUCE phase
double merge_output_time = 0.0;  // MERGE_OUTPUT phase
double group_time = 0.0;         // SORT phase (only run on the USE_MEM single-processor path)
void MapReduce(Spec_t *spec)
{
BEN_ASSERT(spec != NULL);
if (spec->mode & CPU)
{
spec->cpuSched->cpuMapThreadNum = spec->cpuMapThreadNum;
spec->cpuSched->cpuReduceThreadNum = spec->cpuReduceThreadNum;
}
if (spec->mode & GPU)
{
spec->gpuSched->gpuMapGridDim = spec->gpuMapGridDim;
spec->gpuSched->gpuMapBlockDim = spec->gpuMapBlockDim;
spec->gpuSched->gpuReduceGridDim = spec->gpuReduceGridDim;
spec->gpuSched->gpuReduceBlockDim = spec->gpuReduceBlockDim;
}
if (spec->mode & USE_MEM)
{
if (((spec->mode & CPU) &&
!(spec->mode & GPU)) ||
(!(spec->mode & CPU) &&
(spec->mode & GPU)))
{
BenSetup(0);
BenStart(0);
if(!ScheduleSingleMem(spec, MAP)) return;
BenStop(0);
map_time += BenGetElapsedTime(0);
BenReset(0);
BenStart(0);
if (!ScheduleSingleMem(spec, SORT)) return;
BenStop(0);
group_time += BenGetElapsedTime(0);
BenReset(0);
BenStart(0);
if(!ScheduleSingleMem(spec, MERGE_INTER)) return;
BenStop(0);
merge_inter_time += BenGetElapsedTime(0);
BenReset(0);
BenStart(0);
if(!ScheduleSingleMem(spec, REDUCE)) return;
BenStop(0);
reduce_time += BenGetElapsedTime(0);
BenReset(0);
BenStart(0);
if(!ScheduleSingleMem(spec, MERGE_OUTPUT)) return;
BenStop(0);
merge_output_time += BenGetElapsedTime(0);
}
else if ((spec->mode & CPU) &&
(spec->mode & GPU))
{
BenSetup(0);
BenStart(0);
if(!ScheduleCoprocessMem(spec, MAP)) return;
BenStop(0);
map_time += BenGetElapsedTime(0);
BenReset(0);
BenStart(0);
if(!ScheduleCoprocessMem(spec, MERGE_INTER)) return;
BenStop(0);
merge_inter_time += BenGetElapsedTime(0);
BenReset(0);
BenStart(0);
if(!ScheduleCoprocessMem(spec, REDUCE)) return;
BenStop(0);
reduce_time += BenGetElapsedTime(0);
BenReset(0);
BenStart(0);
if(!ScheduleCoprocessMem(spec, MERGE_OUTPUT)) return;
BenStop(0);
merge_output_time += BenGetElapsedTime(0);
}
}
else if (spec->mode & USE_FILE)
{
if (((spec->mode & CPU) &&
!(spec->mode & GPU)) ||
(!(spec->mode & CPU) &&
(spec->mode & GPU)))
{
BenSetup(0);
BenStart(0);
if(!ScheduleSingleFile(spec, MAP)) return;
BenStop(0);
map_time += BenGetElapsedTime(0);
BenReset(0);
BenStart(0);
if(!ScheduleSingleFile(spec, MERGE_INTER)) return;
BenStop(0);
merge_inter_time += BenGetElapsedTime(0);
BenReset(0);
BenStart(0);
if(!ScheduleSingleFile(spec, REDUCE)) return;
BenStop(0);
reduce_time += BenGetElapsedTime(0);
BenReset(0);
BenStart(0);
if(!ScheduleSingleFile(spec, MERGE_OUTPUT)) return;
BenStop(0);
merge_output_time += BenGetElapsedTime(0);
}
else if ((spec->mode & CPU) &&
(spec->mode & GPU))
{
BenSetup(0);
BenStart(0);
if(!ScheduleCoprocessFile(spec, MAP)) return;
BenStop(0);
map_time += BenGetElapsedTime(0);
BenReset(0);
BenStart(0);
if(!ScheduleCoprocessFile(spec, MERGE_INTER)) return;
BenStop(0);
merge_inter_time += BenGetElapsedTime(0);
BenReset(0);
BenStart(0);
if(!ScheduleCoprocessFile(spec, REDUCE)) return;
BenStop(0);
reduce_time += BenGetElapsedTime(0);
BenReset(0);
BenStart(0);
if(!ScheduleCoprocessFile(spec, MERGE_OUTPUT)) return;
BenStop(0);
merge_output_time += BenGetElapsedTime(0);
}
}
else
{
BenLog("Error: please specify USE_MEM or USE_FILE\n");
return ;
}
}
//---------------------------------------------------------
//Clear buffer
//Releases, through BenFree, the file-name strings, the chunk
//buffers, the chunk descriptors, the sort info, both schedule
//objects and finally the spec itself, then zeroes the global
//d_*ChunkSize counters. Buffers are freed before the structs
//that hold their pointers.
//@param: spec -- job specification to tear down (must not be NULL)
//---------------------------------------------------------
void FinishMapReduce(Spec_t *spec)
{
    //EnterFunc("FinishMapReduce");
    BEN_ASSERT(spec != NULL);

    ChunkInfo_t *inChunk = spec->inputChunk;
    ChunkInfo_t *midChunk = spec->interChunk;
    ChunkInfo_t *outChunk = spec->outputChunk;

    //file-name strings, each freed with size = string length + 1
    BenFree((char**)&(spec->inputFile.keyFile), BenStrLen(spec->inputFile.keyFile) + 1);
    BenFree((char**)&(spec->inputFile.valFile), BenStrLen(spec->inputFile.valFile) + 1);
    BenFree((char**)&(spec->inputFile.indexFile), BenStrLen(spec->inputFile.indexFile) + 1);

    BenFree((char**)&(spec->interFile.keyFile), BenStrLen(spec->interFile.keyFile) + 1);
    BenFree((char**)&(spec->interFile.valFile), BenStrLen(spec->interFile.valFile) + 1);
    BenFree((char**)&(spec->interFile.indexFile), BenStrLen(spec->interFile.indexFile) + 1);
    BenFree((char**)&(spec->interFile.rangeFile), BenStrLen(spec->interFile.rangeFile) + 1);

    BenFree((char**)&(spec->outputFile.keyFile), BenStrLen(spec->outputFile.keyFile) + 1);
    BenFree((char**)&(spec->outputFile.valFile), BenStrLen(spec->outputFile.valFile) + 1);
    BenFree((char**)&(spec->outputFile.indexFile), BenStrLen(spec->outputFile.indexFile) + 1);
    BenFree((char**)&(spec->outputFile.rangeFile), BenStrLen(spec->outputFile.rangeFile) + 1);

    BenFree((char**)&(spec->tmpFile.keyFile), BenStrLen(spec->tmpFile.keyFile) + 1);
    BenFree((char**)&(spec->tmpFile.valFile), BenStrLen(spec->tmpFile.valFile) + 1);
    BenFree((char**)&(spec->tmpFile.indexFile), BenStrLen(spec->tmpFile.indexFile) + 1);
    BenFree((char**)&(spec->tmpFile.rangeFile), BenStrLen(spec->tmpFile.rangeFile) + 1);

    //input chunk buffers are sized by the global d_*ChunkSize counters
    BenFree((char**)&(inChunk->keys), d_keyChunkSize);
    BenFree((char**)&(inChunk->vals), d_valChunkSize);
    BenFree((char**)&(inChunk->index), d_indexChunkSize);

    //the key-range list is freed from the output chunk for MAP_SORT jobs,
    //otherwise from the intermediate chunk for MAP_SORT_REDUCE jobs
    //(the groupInfo buffers were already left unfreed in the original)
    if (spec->mode & MAP_SORT)
    {
        BenFree((char**)&(outChunk->keyListRange), sizeof(int2) * outChunk->diffKeyCount);
    }
    else if (spec->mode & MAP_SORT_REDUCE)
    {
        BenFree((char**)&(midChunk->keyListRange), sizeof(int2) * midChunk->diffKeyCount);
    }

    //output, then intermediate, key/val/index buffers
    BenFree((char**)&(outChunk->keys), outChunk->keySize);
    BenFree((char**)&(outChunk->vals), outChunk->valSize);
    BenFree((char**)&(outChunk->index), outChunk->indexSize);

    BenFree((char**)&(midChunk->keys), midChunk->keySize);
    BenFree((char**)&(midChunk->vals), midChunk->valSize);
    BenFree((char**)&(midChunk->index), midChunk->indexSize);

    //chunk descriptors themselves
    BenFree((char**)&(spec->inputChunk), sizeof(ChunkInfo_t));
    BenFree((char**)&(spec->interChunk), sizeof(ChunkInfo_t));
    BenFree((char**)&(spec->outputChunk), sizeof(ChunkInfo_t));

    //sort bookkeeping: chunk array first, then its owner
    BenFree((char**)&(spec->sortInfo->chunks), sizeof(SortChunk_t) * spec->sortInfo->fullChunkCount);
    BenFree((char**)&(spec->sortInfo), sizeof(SortInfo_t));

    BenFree((char**)&(spec->cpuSched), sizeof(Schedule_t));
    BenFree((char**)&(spec->gpuSched), sizeof(Schedule_t));

    //spec is a by-value parameter, so the caller's own pointer is not modified here
    BenFree((char**)&spec, sizeof(Spec_t));

    d_keyChunkSize = 0;
    d_valChunkSize = 0;
    d_indexChunkSize = 0;
    //LeaveFunc("FinishMapReduce");
}
//-------------------------------------------------------
//Convenience helper: frees the keys / vals / index /
//keyListRange buffers of a chunk and zeroes the matching
//size fields plus diffKeyCount and recCount.
//The ChunkInfo_t struct itself is not freed.
//@param: chunk -- chunk whose buffers are released (must not be NULL)
//-------------------------------------------------------
void FreeChunk(ChunkInfo_t *chunk)
{
    BEN_ASSERT(chunk != NULL);

    //release each buffer with its recorded size, then clear that size
    BenFree((char**)&chunk->keys, chunk->keySize);
    chunk->keySize = 0;

    BenFree((char**)&chunk->vals, chunk->valSize);
    chunk->valSize = 0;

    BenFree((char**)&chunk->index, chunk->indexSize);
    chunk->indexSize = 0;

    BenFree((char**)&chunk->keyListRange, chunk->rangeSize);
    chunk->rangeSize = 0;

    //the chunk no longer describes any groups or records
    chunk->diffKeyCount = 0;
    chunk->recCount = 0;
}
//========================================================
//Iterators
//========================================================
//--------------------------------------------------------
//input iterator
//--------------------------------------------------------
//Allocates a record iterator with BenMalloc and binds it to a chunk.
//@param: chunk    -- chunk the iterator works on
//@param: file     -- file names; stored only when spec->mode has USE_FILE
//@param: totalRec -- total record count, stored in totalRecCount
//@param: spec     -- supplies the mode flags and flushThreshhold
//@return: the new iterator; any field not assigned below keeps whatever
//         BenMalloc left in it
RecIterator_t *InitIterator(ChunkInfo_t *chunk, FileName_t *file, size_t totalRec, Spec_t *spec)
{
    RecIterator_t *iter = (RecIterator_t*)BenMalloc(sizeof(RecIterator_t));

    //needed whether records come from a file or from main memory
    iter->chunk = chunk;
    iter->totalRecCount = totalRec;

    //file-backed iteration also records the source files and the
    //chunk size taken from the flush threshold
    const bool readFromFile = (spec->mode & USE_FILE) != 0;
    if (readFromFile)
    {
        iter->file = file;
        iter->chunkSize = spec->flushThreshhold;
    }

    return iter;
}
//Allocates a group iterator with BenMalloc and binds it to a chunk.
//@param: chunk      -- chunk the iterator works on
//@param: file       -- file names; stored only when spec->mode has USE_FILE
//@param: totalGroup -- total group count; stored only in file mode
//@param: spec       -- supplies the mode flags and flushThreshhold
//@return: the new iterator; any field not assigned below keeps whatever
//         BenMalloc left in it
GroupIterator_t *InitGroupIterator(ChunkInfo_t *chunk, FileName_t *file, size_t totalGroup, Spec_t *spec)
{
    GroupIterator_t *iter = (GroupIterator_t*)BenMalloc(sizeof(GroupIterator_t));

    //the chunk is attached in both modes
    iter->chunk = chunk;

    //file-backed iteration also records the source files, the chunk
    //size taken from the flush threshold, and the group total
    const bool readFromFile = (spec->mode & USE_FILE) != 0;
    if (readFromFile)
    {
        iter->file = file;
        iter->chunkSize = spec->flushThreshhold;
        iter->totalGroup = totalGroup;
    }

    return iter;
}
//---------------------------------------------------------
//get small chunk from file
//Reads up to chunkSize index entries starting at record `cursor`,
//then reads the key and value byte ranges those entries span, and
//stores buffers, sizes and offsets in `chunk`.
//Each index entry is an int4 used as:
//  x = key offset, y = key size, z = val offset, w = val size.
//@param: chunk -- receives index/keys/vals buffers, their sizes,
//                 keyOffset/valOffset and recCount
//@param: file -- names of the index, key and val files
//@param: totalRec -- total number of records in file
//@param: cursor -- current record cursor in index file
//@param: chunkSize -- the number of records to read
//@return: false when there is nothing to read (cursor is at or past
//         totalRec, or chunkSize is 0); chunk is left untouched then.
//         true once the chunk has been filled.
//NOTE(review): buffers already held by chunk are overwritten, not
//freed -- presumably the caller releases them first; confirm.
//---------------------------------------------------------
bool ReadChunkFromFile(ChunkInfo_t *chunk, FileName_t *file,
                       size_t totalRec, size_t cursor, size_t chunkSize)
{
    BEN_ASSERT(chunk != NULL);
    BEN_ASSERT(file != NULL);
    if (cursor >= totalRec) return false;

    //clamp to the records that remain; comparing against the remainder
    //avoids the size_t wrap-around that cursor + chunkSize could hit
    size_t remaining = totalRec - cursor;
    size_t recCount = (chunkSize < remaining) ? chunkSize : remaining;

    //an empty request would make index[recCount-1] below underflow
    if (recCount == 0) return false;
    chunk->recCount = recCount;

    //read from index file
    size_t indexOffset = cursor * sizeof(int4);
    size_t indexSize = recCount * sizeof(int4);
    chunk->index = (int4*)BenReadFile(file->indexFile, indexOffset, indexSize);
    chunk->indexSize = indexSize;
    //the index is dereferenced right below, so a failed read must not pass silently
    BEN_ASSERT(chunk->index != NULL);

    const int4 &first = chunk->index[0];
    const int4 &last = chunk->index[recCount - 1];

    //keys: from the first record's key offset to the end of the last record's key
    size_t keyOffset = first.x;
    size_t keySize = last.x + last.y - first.x;
    chunk->keys = BenReadFile(file->keyFile, keyOffset, keySize);
    chunk->keySize = keySize;
    chunk->keyOffset = keyOffset;

    //vals: from the first record's val offset to the end of the last record's val
    size_t valOffset = first.z;
    size_t valSize = last.z + last.w - first.z;
    chunk->vals = BenReadFile(file->valFile, valOffset, valSize);
    chunk->valSize = valSize;
    chunk->valOffset = valOffset;

    return true;
}
//---------------------------------------------------------
//get small group chunk from file
//@param: totalRec -- total number of records in file
//@param: cursor -- current record cursor in index file
//@param: chunkSize -- the number of records to read
//---------------------------------------------------------
bool ReadGroupChunkFromFile(ChunkInfo_t *chunk, FileName_t *file,
size_t totalGroup, size_t cursor, size_t chunkSize)
{
BEN_ASSERT(file != NULL);
if (cursor >= totalGroup) return false;
//get indexSize
size_t groupCount = chunkSize;
if (groupCount + cursor >= totalGroup)
groupCount = totalGroup - cursor;
chunk->diffKeyCount = groupCount;
//read from range file
size_t rangeOffset = cursor * sizeof(int2);
size_t rangeSize = groupCount * sizeof(int2);
chunk->keyListRange = (int2*)BenReadFile(file->rangeFile, rangeOffset, rangeSize);
chunk->rangeSize = rangeSize;
//read from index file
size_t recCount = chunk->keyListRange[groupCount-1].y - chunk->keyListRange->x;
size_t indexOffset = chunk->keyListRange->x * sizeof(int4);
size_t indexSize = recCount * sizeof(int4);
chunk->index = (int4*)BenReadFile(file->indexFile, indexOffset, indexSize);
chunk->indexSize = indexSize;
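// (code-viewer page residue) Keyboard shortcuts:
//   Copy code: Ctrl + C
//   Search code: Ctrl + F
//   Full-screen mode: F11
//   Toggle theme: Ctrl + Shift + D
//   Show shortcuts: ?
//   Increase font size: Ctrl + =
//   Decrease font size: Ctrl + -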