// marsutils.cu
/**
*This is the source code for Mars, a MapReduce framework on graphics
*processors.
*Author: Wenbin Fang (HKUST), Bingsheng He (HKUST)
*Mentor: Naga K. Govindaraju (Microsoft Corp.), Qiong Luo (HKUST), Tuyong
*Wang (Sina.com).
*If you have any question on the code, please contact us at {saven,
*wenbin, luo}@cse.ust.hk.
*The copyright is held by HKUST. Mars is provided "as is" without any
*guarantees of any kind.
*/
//--------------------------------------------------
//runtime functions which can be called by users
//1, Required
//2, Iterators
//3, Debug tools
//--------------------------------------------------
#include "MarsInc.h"
//========================================================
//Required
//========================================================
//--------------------------------------------------------
//factory function, get a default runtime configuration
//--------------------------------------------------------
Spec_t *GetDefaultSpec()
{
    // Builds a Spec_t on the heap and fills in the default runtime settings.
    // Returns the new spec; nothing in this function releases it.
    Spec_t *defaults = (Spec_t*)BenMalloc(sizeof(Spec_t));

    // Clear the three file-name records, then point the input one at the
    // default key/value/index file names.
    BenMemset(&defaults->inputFile, 0, sizeof(FileName_t));
    BenMemset(&defaults->interFile, 0, sizeof(FileName_t));
    BenMemset(&defaults->outputFile, 0, sizeof(FileName_t));
    defaults->inputFile.keyFile = BenStrDup(DEFAULT_INPUT_KEY_FILE);
    defaults->inputFile.valFile = BenStrDup(DEFAULT_INPUT_VAL_FILE);
    defaults->inputFile.indexFile = BenStrDup(DEFAULT_INPUT_INDEX_FILE);

    // One chunk descriptor per stage: input, intermediate, output.
    defaults->inputChunk = (ChunkInfo_t*)BenMalloc(sizeof(ChunkInfo_t));
    defaults->interChunk = (ChunkInfo_t*)BenMalloc(sizeof(ChunkInfo_t));
    defaults->outputChunk = (ChunkInfo_t*)BenMalloc(sizeof(ChunkInfo_t));

    // Scheduling records; the GPU one starts with no shared memory requested
    // for either the map or the reduce stage.
    defaults->cpuSched = (Schedule_t*)BenMalloc(sizeof(Schedule_t));
    Schedule_t *gpu = (Schedule_t*)BenMalloc(sizeof(Schedule_t));
    defaults->gpuSched = gpu;
    gpu->gpuMapSharedMemSize = 0;
    gpu->gpuReduceSharedMemSize = 0;

    // Sort bookkeeping: one SortChunk_t slot per full sort chunk.
    SortInfo_t *sorting = (SortInfo_t*)BenMalloc(sizeof(SortInfo_t));
    defaults->sortInfo = sorting;
    sorting->fullChunkCount = DEFAULL_FULL_SORT_CHUNK_COUNT;
    sorting->chunks = (SortChunk_t*)BenMalloc(sizeof(SortChunk_t)*sorting->fullChunkCount);

    defaults->flushThreshhold = DEFAULT_THRESHHOLD;
    return defaults;
}
//------------------------------------------------
//Add input records
//------------------------------------------------
// Bytes currently allocated for the in-memory input key, value and index
// buffers. The AddMapInputRecord* helpers add 16 MB to these each time they
// allocate or grow the matching buffer, and the values are handed to BenFree
// as the buffer sizes when the input is released.
size_t d_keyChunkSize = 0;
size_t d_valChunkSize = 0;
size_t d_indexChunkSize = 0;
// Current write offsets into the input buffers: x for keys, y for values.
static int2 curOffset;
// Number of 16 MB chunks allocated so far: x for keys, y for values,
// z for the index array.
static int3 curChunkNum;
static void AddMapInputRecordSingle(Spec_t* spec,
void* key,
void* val,
size_t keySize,
size_t valSize)
{
//EnterFunc("AddMapInputRecord");
BEN_ASSERT(spec != NULL);
int index = spec->inputChunk->recCount;
const size_t dataChunkSize = 1024*1024*16;
if (spec->totalInputRecCount > 0)
{
size_t xAllSize = dataChunkSize*curChunkNum.x;
if ( xAllSize < (curOffset.x + keySize))
{
spec->inputChunk->keys = (char*)BenRealloc(spec->inputChunk->keys,
xAllSize, (++curChunkNum.x)*dataChunkSize);
d_keyChunkSize += dataChunkSize;
}
BenMemcpy(spec->inputChunk->keys+curOffset.x, key, keySize);
size_t yAllSize = dataChunkSize*curChunkNum.y;
if (yAllSize < (curOffset.y + valSize))
{
spec->inputChunk->vals = (char*)BenRealloc(spec->inputChunk->vals,
yAllSize, (++curChunkNum.y)*dataChunkSize);
d_valChunkSize += dataChunkSize;
}
BenMemcpy(spec->inputChunk->vals+curOffset.y, val, valSize);
size_t zAllSize = dataChunkSize*curChunkNum.z;
if (zAllSize < (spec->inputChunk->recCount+1)*sizeof(int4))
{
spec->inputChunk->index = (int4*)BenRealloc((void*)spec->inputChunk->index,
zAllSize, (++curChunkNum.z)*dataChunkSize);
d_indexChunkSize += dataChunkSize;
}
}
else
{
spec->inputChunk->keys = (char*)BenMalloc(dataChunkSize);
if (NULL == spec->inputChunk->keys) exit(-1);
BenMemcpy(spec->inputChunk->keys, key, keySize);
spec->inputChunk->vals = (char*)BenMalloc(dataChunkSize);
if (NULL == spec->inputChunk->vals) exit(-1);
BenMemcpy(spec->inputChunk->vals, val, valSize);
spec->inputChunk->index = (int4*)BenMalloc(dataChunkSize);
curChunkNum.x++;
curChunkNum.y++;
curChunkNum.z++;
d_keyChunkSize += dataChunkSize;
d_valChunkSize += dataChunkSize;
d_indexChunkSize += dataChunkSize;
}
spec->inputChunk->index[index].x = curOffset.x;
spec->inputChunk->index[index].y = keySize;
spec->inputChunk->index[index].z = curOffset.y;
spec->inputChunk->index[index].w = valSize;
curOffset.x += keySize;
curOffset.y += valSize;
spec->inputChunk->keySize += keySize;
spec->inputChunk->valSize += valSize;
spec->inputChunk->indexSize += sizeof(int4);
spec->inputChunk->recCount++;
spec->totalInputRecCount++;
//flush in memory buffer to disk
//LeaveFunc("AddMapInputRecord");
}
static int2 localIndex;
static void AddMapInputRecordMem(Spec_t* spec,
void* key,
void* val,
size_t keySize,
size_t valSize)
{
//EnterFunc("AddMapInputRecord");
BEN_ASSERT(spec != NULL);
int index = spec->inputChunk->recCount;
const size_t dataChunkSize = 1024*1024*16;
if (spec->totalInputRecCount > 0)
{
size_t xAllSize = dataChunkSize*curChunkNum.x;
if ( xAllSize < (curOffset.x + keySize))
{
spec->inputChunk->keys = (char*)BenRealloc(spec->inputChunk->keys,
xAllSize, (++curChunkNum.x)*dataChunkSize);
d_keyChunkSize += dataChunkSize;
}
BenMemcpy(spec->inputChunk->keys+curOffset.x, key, keySize);
size_t yAllSize = dataChunkSize*curChunkNum.y;
if (yAllSize < (curOffset.y + valSize))
{
spec->inputChunk->vals = (char*)BenRealloc(spec->inputChunk->vals,
yAllSize, (++curChunkNum.y)*dataChunkSize);
d_valChunkSize += dataChunkSize;
}
BenMemcpy(spec->inputChunk->vals+curOffset.y, val, valSize);
size_t zAllSize = dataChunkSize*curChunkNum.z;
if (zAllSize < (spec->inputChunk->recCount+1)*sizeof(int4))
{
spec->inputChunk->index = (int4*)BenRealloc((void*)spec->inputChunk->index,
zAllSize, (++curChunkNum.z)*dataChunkSize);
d_indexChunkSize += dataChunkSize;
}
}
else
{
spec->inputChunk->keys = (char*)BenMalloc(dataChunkSize);
if (NULL == spec->inputChunk->keys) exit(-1);
BenMemcpy(spec->inputChunk->keys, key, keySize);
spec->inputChunk->vals = (char*)BenMalloc(dataChunkSize);
if (NULL == spec->inputChunk->vals) exit(-1);
BenMemcpy(spec->inputChunk->vals, val, valSize);
spec->inputChunk->index = (int4*)BenMalloc(dataChunkSize);
curChunkNum.x++;
curChunkNum.y++;
curChunkNum.z++;
d_keyChunkSize += dataChunkSize;
d_valChunkSize += dataChunkSize;
d_indexChunkSize += dataChunkSize;
}
spec->inputChunk->index[index].x = localIndex.x;
spec->inputChunk->index[index].y = keySize;
spec->inputChunk->index[index].z = localIndex.x;
spec->inputChunk->index[index].w = valSize;
curOffset.x += keySize;
curOffset.y += valSize;
localIndex.x += keySize;
localIndex.y += valSize;
spec->inputChunk->keySize += keySize;
spec->inputChunk->valSize += valSize;
spec->inputChunk->indexSize += sizeof(int4);
spec->inputChunk->recCount++;
spec->totalInputRecCount++;
//flush in memory buffer to disk
//LeaveFunc("AddMapInputRecord");
}
// File-mode record insertion: once the staging chunk holds flushThreshhold
// records it is written out to the input key/value/index files and emptied,
// then the new record is staged in memory via AddMapInputRecordMem.
static void AddMapInputRecordFile(Spec_t* spec,
                                  void* key,
                                  void* val,
                                  size_t keySize,
                                  size_t valSize)
{
    ChunkInfo_t *staged = spec->inputChunk;

    if (staged->recCount >= spec->flushThreshhold)
    {
        // Once more records than one threshold's worth have been added in
        // total, the files are appended to; otherwise they are written with
        // BenWriteFile.
        const bool appendToFiles = (spec->totalInputRecCount > spec->flushThreshhold);

        if (!appendToFiles)
        {
            BenWriteFile(spec->inputFile.keyFile, staged->keys, staged->keySize);
            BenWriteFile(spec->inputFile.valFile, staged->vals, staged->valSize);
            BenWriteFile(spec->inputFile.indexFile, staged->index, staged->indexSize);
        }
        else
        {
            BenAppendFile(spec->inputFile.keyFile, staged->keys, staged->keySize);
            BenAppendFile(spec->inputFile.valFile, staged->vals, staged->valSize);
            BenAppendFile(spec->inputFile.indexFile, staged->index, staged->indexSize);
        }

        // Advance the chunk's key/value offsets by what was just written.
        staged->keyOffset += staged->keySize;
        staged->valOffset += staged->valSize;

        // Empty the staging chunk and rewind the buffer write positions so
        // the same buffers are reused for the next batch.
        staged->indexSize = 0;
        staged->valSize = 0;
        staged->keySize = 0;
        staged->recCount = 0;
        curOffset.y = 0;
        curOffset.x = 0;
    }

    // Stage the new record in memory.
    AddMapInputRecordMem(spec, key, val, keySize, valSize);
}
// Public entry point for adding one input record. Dispatches on spec->mode:
// USE_MEM is checked first, then USE_FILE; with neither flag set the record
// is ignored.
void AddMapInputRecord(Spec_t* spec,
                       void* key,
                       void* val,
                       size_t keySize,
                       size_t valSize)
{
    if (spec->mode & USE_MEM)
    {
        AddMapInputRecordSingle(spec, key, val, keySize, valSize);
        return;
    }

    if (spec->mode & USE_FILE)
    {
        AddMapInputRecordFile(spec, key, val, keySize, valSize);
    }
}
// Releases the in-memory input buffers and clears all input bookkeeping.
// Does nothing unless spec->mode has USE_MEM set.
void ResetInput(Spec_t *spec)
{
    if (!(spec->mode & USE_MEM))
        return;

    ChunkInfo_t *input = spec->inputChunk;

    // Hand the buffers back together with the byte totals recorded for them.
    BenFree((char**)&input->keys, d_keyChunkSize);
    BenFree((char**)&input->vals, d_valChunkSize);
    BenFree((char**)&input->index, d_indexChunkSize);

    // Zero the file-scope counters that describe those buffers.
    d_keyChunkSize = d_valChunkSize = d_indexChunkSize = 0;
    curChunkNum.x = curChunkNum.y = curChunkNum.z = 0;
    curOffset.x = curOffset.y = 0;
    localIndex.x = localIndex.y = 0;

    // Wipe the chunk descriptor and the total record count.
    BenMemset(input, 0, sizeof(ChunkInfo_t));
    spec->totalInputRecCount = 0;
}
// Releases the intermediate-chunk buffers and clears the related counters.
// Does nothing unless spec->mode has USE_MEM set.
void ResetInter(Spec_t *spec)
{
    if (!(spec->mode & USE_MEM))
        return;

    ChunkInfo_t *inter = spec->interChunk;

    // Sizes are read from the descriptor, so free before wiping it.
    BenFree((char**)&inter->keys, inter->keySize);
    BenFree((char**)&inter->vals, inter->valSize);
    BenFree((char**)&inter->index, inter->indexSize);
    BenFree((char**)&inter->keyListRange, sizeof(int2)*inter->diffKeyCount);

    BenMemset(inter, 0, sizeof(ChunkInfo_t));
    spec->totalDiffKeyCount = 0;
    spec->totalInterRecCount = 0;
}
// Releases the output-chunk buffers and zeroes the chunk descriptor.
// Does nothing unless spec->mode has USE_MEM set.
void ResetOutput(Spec_t *spec)
{
    if (!(spec->mode & USE_MEM))
        return;

    ChunkInfo_t *out = spec->outputChunk;

    // Sizes are read from the descriptor, so free before wiping it.
    BenFree((char**)&out->keys, out->keySize);
    BenFree((char**)&out->vals, out->valSize);
    BenFree((char**)&out->index, out->indexSize);
    BenFree((char**)&out->keyListRange, sizeof(int2)*out->diffKeyCount);

    BenMemset(out, 0, sizeof(ChunkInfo_t));
}
// Appends the contents of `chunk` to the three files named in `file`:
// the key bytes, then the value bytes, then the index entries.
static void FlushToDisk(ChunkInfo_t *chunk, FileName_t *file)
{
    // keys -> key file
    BenAppendFile(file->keyFile,
                  chunk->keys,
                  chunk->keySize);

    // values -> value file
    BenAppendFile(file->valFile,
                  chunk->vals,
                  chunk->valSize);

    // index entries -> index file
    BenAppendFile(file->indexFile,
                  chunk->index,
                  chunk->indexSize);
}
void FlushInputToDisk(Spec_t *spec)
{
if (spec->inputChunk->recCount > 0 &&
spec->mode & USE_FILE)
{
FlushToDisk(spec->inputChunk, &(spec->inputFile));
BenFree((char**)&(spec->inputChunk->keys), d_keyChunkSize);
BenFree((char**)&(spec->inputChunk->vals), d_valChunkSize);
BenFree((char**)&(spec->inputChunk->index), d_indexChunkSize);
spec->inputChunk->recCount = 0;
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -