// marsschedmem.cu
/**
 * This is the source code for Mars, a MapReduce framework on graphics
 * processors.
 * Authors: Wenbin Fang (HKUST), Bingsheng He (HKUST)
 * Mentors: Naga K. Govindaraju (Microsoft Corp.), Qiong Luo (HKUST), Tuyong
 * Wang (Sina.com).
 * If you have any questions about the code, please contact us at {saven,
 * wenbin, luo}@cse.ust.hk.
 * The copyright is held by HKUST. Mars is provided "as is" without any
 * guarantees of any kind.
 */
//---------------------------------------------------------
// Scheduling for GPU-only or CPU-only processing
//---------------------------------------------------------
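// SingleMapMem: runs the Map stage on a single processor. Copies the whole
// input chunk into the GPU or CPU schedule (depending on spec->mode) and
// launches the corresponding map routine; mixed GPU+CPU modes are rejected.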
static void SingleMapMem(Spec_t *spec)
{
    BEN_ASSERT(spec != NULL);
    if ((spec->mode & GPU) && !(spec->mode & CPU))
    {
        BenMemset(spec->gpuSched, 0, sizeof(Schedule_t));
        BenMemcpy(&(spec->gpuSched->inputSmallChunk), spec->inputChunk, sizeof(ChunkInfo_t));
        spec->gpuSched->gpuMapGridDim = spec->gpuMapGridDim;
        spec->gpuSched->gpuMapBlockDim = spec->gpuMapBlockDim;
        spec->gpuSched->gpuMapSharedMemSize = spec->gpuMapSharedMemSize;
        StartGPUMap(spec->gpuSched, spec->mode);
    }
    else if (!(spec->mode & GPU) && (spec->mode & CPU))
    {
        BenMemset(spec->cpuSched, 0, sizeof(Schedule_t));
        BenMemcpy(&(spec->cpuSched->inputSmallChunk), spec->inputChunk, sizeof(ChunkInfo_t));
        spec->cpuSched->cpuMapThreadNum = spec->cpuMapThreadNum;
        StartCPUMap(spec->cpuSched, spec->mode);
    }
    else
    {
        BenLog("Error: please use a single processor!\n");
        return;
    }
}
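
// SingleSortMem: sorts the intermediate records produced by the single-processor
// Map stage, using the GPU sort (or its CPU fallback when spec->cpuSort is set)
// or the CPU sort, according to spec->mode.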
static void SingleSortMem(Spec_t *spec)
{
    BEN_ASSERT(spec != NULL);
    // if (spec->mode & MAP_SORT ||
    //     spec->mode & MAP_SORT_REDUCE)
    {
        if (spec->mode & GPU)
        {
            if (!spec->cpuSort)
                StartGPUSort(spec->gpuSched, spec->mode);
            else
                StartGPUSort_cpu(spec->gpuSched, spec->mode);
        }
        else if (spec->mode & CPU)
        {
            StartCPUSort(spec->cpuSched, spec->mode);
        }
    }
}
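
// SingleMergeInterMem: publishes the single-processor result of the Map/Sort
// stages. For MAP_ONLY and MAP_SORT modes the schedule's output chunk becomes
// the final output; otherwise (MAP_SORT_REDUCE) it becomes the intermediate
// chunk that feeds the Reduce stage.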
static void SingleMergeInterMem(Spec_t *spec)
{
    BEN_ASSERT(spec != NULL);
    Schedule_t *sched = NULL;
    if ((spec->mode & GPU) && !(spec->mode & CPU))
    {
        sched = spec->gpuSched;
    }
    else if (!(spec->mode & GPU) && (spec->mode & CPU))
    {
        sched = spec->cpuSched;
    }
    else
    {
        BenLog("Error: please use a single processor!\n");
        return;
    }
    if (spec->mode & MAP_ONLY ||
        spec->mode & MAP_SORT)
    {
        spec->totalOutputRecCount = sched->outputSmallChunk.recCount;
        spec->totalDiffKeyCount = sched->outputSmallChunk.diffKeyCount;
        BenMemcpy(spec->outputChunk, &(sched->outputSmallChunk), sizeof(ChunkInfo_t));
    }
    else
    {
        BenMemcpy(spec->interChunk, &(sched->outputSmallChunk), sizeof(ChunkInfo_t));
    }
}
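
// SingleReduceMem: runs the Reduce stage on a single processor. Feeds the
// intermediate chunk into the GPU or CPU schedule and launches the matching
// reduce routine; mixed GPU+CPU modes are rejected.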
static void SingleReduceMem(Spec_t *spec)
{
    BEN_ASSERT(spec != NULL);
    if ((spec->mode & GPU) && !(spec->mode & CPU))
    {
        BenMemset(spec->gpuSched, 0, sizeof(Schedule_t));
        BenMemcpy(&(spec->gpuSched->inputSmallChunk), spec->interChunk, sizeof(ChunkInfo_t));
        spec->gpuSched->gpuReduceGridDim = spec->gpuReduceGridDim;
        spec->gpuSched->gpuReduceBlockDim = spec->gpuReduceBlockDim;
        spec->gpuSched->gpuReduceSharedMemSize = spec->gpuReduceSharedMemSize;
        StartGPUReduce(spec->gpuSched, spec->mode);
    }
    else if (!(spec->mode & GPU) && (spec->mode & CPU))
    {
        BenMemset(spec->cpuSched, 0, sizeof(Schedule_t));
        BenMemcpy(&(spec->cpuSched->inputSmallChunk), spec->interChunk, sizeof(ChunkInfo_t));
        spec->cpuSched->cpuReduceThreadNum = spec->cpuReduceThreadNum;
        StartCPUReduce(spec->cpuSched, spec->mode);
    }
    else
    {
        BenLog("Error: please use a single processor!\n");
        return;
    }
}
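
// SingleMergeOutputMem: copies the single-processor Reduce output into the
// user-visible output chunk and records the total output record count.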
static void SingleMergeOutputMem(Spec_t *spec)
{
    BEN_ASSERT(spec != NULL);
    Schedule_t *sched = NULL;
    if ((spec->mode & GPU) && !(spec->mode & CPU))
    {
        sched = spec->gpuSched;
    }
    else if (!(spec->mode & GPU) && (spec->mode & CPU))
    {
        sched = spec->cpuSched;
    }
    else
    {
        BenLog("Error: please use a single processor!\n");
        return;
    }
    spec->totalOutputRecCount = sched->outputSmallChunk.recCount;
    BenMemcpy(spec->outputChunk, &(sched->outputSmallChunk), sizeof(ChunkInfo_t));
}
//-------------------------------------------------------
// Scheduling for GPU/CPU co-processing
//-------------------------------------------------------
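// gpuMapWorkerMem / cpuMapWorkerMem: thread entry points used during
// co-processing; each runs the Map and Sort stages on its own processor
// against that processor's schedule.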
static void *gpuMapWorkerMem(void *args)
{
    Spec_t *spec = (Spec_t*)args;
    BEN_ASSERT(spec != NULL);
    StartGPUMap(spec->gpuSched, spec->mode);
    StartGPUSort(spec->gpuSched, spec->mode);
    return 0;
}

static void *cpuMapWorkerMem(void *args)
{
    Spec_t *spec = (Spec_t*)args;
    BEN_ASSERT(spec != NULL);
    StartCPUMap(spec->cpuSched, spec->mode);
    StartCPUSort(spec->cpuSched, spec->mode);
    return 0;
}
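
// CoprocessMapMem: splits the input chunk between the GPU and the CPU by
// record count, with spec->gpuInputRatio deciding the GPU's share, then runs
// the Map/Sort workers on both processors concurrently and accumulates the
// total intermediate record count.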
static void CoprocessMapMem(Spec_t *spec)
{
    BEN_ASSERT(spec != NULL);
    // set up the GPU schedule configuration
    BenMemset(spec->gpuSched, 0, sizeof(Schedule_t));
    spec->gpuSched->gpuMapGridDim = spec->gpuMapGridDim;
    spec->gpuSched->gpuMapBlockDim = spec->gpuMapBlockDim;
    spec->gpuSched->gpuMapSharedMemSize = spec->gpuMapSharedMemSize;
    BenMemcpy(&(spec->gpuSched->inputSmallChunk), spec->inputChunk, sizeof(ChunkInfo_t));
    spec->gpuSched->inputSmallChunk.recCount *= spec->gpuInputRatio;
    // set up the CPU schedule configuration with the remaining records
    BenMemset(spec->cpuSched, 0, sizeof(Schedule_t));
    spec->cpuSched->cpuMapThreadNum = spec->cpuMapThreadNum;
    BenMemcpy(&(spec->cpuSched->inputSmallChunk), spec->inputChunk, sizeof(ChunkInfo_t));
    spec->cpuSched->inputSmallChunk.index += spec->gpuSched->inputSmallChunk.recCount;
    spec->cpuSched->inputSmallChunk.recCount -= spec->gpuSched->inputSmallChunk.recCount;
    // launch GPU and CPU processing in parallel and wait for both
    BenThread_t tp[2];
    tp[0] = BenNewThread(gpuMapWorkerMem, (void*)spec);
    tp[1] = BenNewThread(cpuMapWorkerMem, (void*)spec);
    BenWaitForMul(tp, 2);
    spec->totalInterRecCount += spec->gpuSched->outputSmallChunk.recCount;
    spec->totalInterRecCount += spec->cpuSched->outputSmallChunk.recCount;
}
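
// MergeUnsortMem: concatenates two chunks into a freshly allocated destination
// chunk without sorting (a no-op when both sources are empty). src2's index
// entries are rebased past src1's key/value regions, and both source chunks
// are freed afterwards.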
static void MergeUnsortMem(ChunkInfo_t *dest, ChunkInfo_t *src1, ChunkInfo_t *src2)
{
    BEN_ASSERT(dest != NULL);
    BEN_ASSERT(src1 != NULL);
    BEN_ASSERT(src2 != NULL);
    dest->keySize = src1->keySize + src2->keySize;
    dest->valSize = src1->valSize + src2->valSize;
    dest->indexSize = src1->indexSize + src2->indexSize;
    dest->recCount = src1->recCount + src2->recCount;
    if (dest->recCount <= 0) return;
    dest->keys = (char*)BenMalloc(dest->keySize);
    dest->vals = (char*)BenMalloc(dest->valSize);
    dest->index = (int4*)BenMalloc(dest->indexSize);
    BenMemcpy(dest->keys, src1->keys, src1->keySize);
    BenMemcpy(dest->vals, src1->vals, src1->valSize);
    BenMemcpy(dest->index, src1->index, src1->indexSize);
    // adjust src2's key/val offsets past src1's data
    for (int i = 0; i < src2->recCount; i++)
    {
        src2->index[i].x += src1->keySize;
        src2->index[i].z += src1->valSize;
    }
    BenMemcpy(dest->keys+src1->keySize, src2->keys, src2->keySize);
    BenMemcpy(dest->vals+src1->valSize, src2->vals, src2->valSize);
    BenMemcpy(dest->index+src1->recCount, src2->index, src2->indexSize);
    BenFree((char**)&src1->keys, src1->keySize);
    BenFree((char**)&src1->vals, src1->valSize);
    BenFree((char**)&src1->index, src1->indexSize);
    BenFree((char**)&src2->keys, src2->keySize);
    BenFree((char**)&src2->vals, src2->valSize);
    BenFree((char**)&src2->index, src2->indexSize);
}
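
// MergeSortMem: merges two sorted chunks into one. src2's key/value offsets
// are rebased past src1's data; the commented-out block below is a list-wise
// merge driven by keyListRange, while the active path simply concatenates the
// index arrays, re-sorts them with QuickSortMem, and regroups keys with
// GroupByMem. Both source chunks are freed afterwards.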
static void MergeSortMem(ChunkInfo_t *dest, ChunkInfo_t *src1, ChunkInfo_t *src2)
{
    BEN_ASSERT(dest != NULL);
    BEN_ASSERT(src1 != NULL);
    BEN_ASSERT(src2 != NULL);
    dest->keySize = src1->keySize + src2->keySize;
    dest->valSize = src1->valSize + src2->valSize;
    dest->indexSize = src1->indexSize + src2->indexSize;
    dest->recCount = src1->recCount + src2->recCount;
    dest->keys = (char*)BenMalloc(dest->keySize);
    dest->vals = (char*)BenMalloc(dest->valSize);
    dest->index = (int4*)BenMalloc(dest->indexSize);
    BenMemcpy(dest->keys, src1->keys, src1->keySize);
    BenMemcpy(dest->vals, src1->vals, src1->valSize);
    BenMemcpy(dest->keys+src1->keySize, src2->keys, src2->keySize);
    BenMemcpy(dest->vals+src1->valSize, src2->vals, src2->valSize);
    // adjust src2's key/val offsets past src1's data
    for (int i = 0; i < src2->recCount; i++)
    {
        src2->index[i].x += src1->keySize;
        src2->index[i].z += src1->valSize;
    }
    /* InitGroupMem(dest);
    int j, k, l;
    int2 *keyListRange1 = src1->keyListRange;
    int2 *keyListRange2 = src2->keyListRange;
    for (j = 0, k = 0, l = 0;
         j < src1->recCount || k < src2->recCount;
        )
    {
        if (j < src1->recCount &&
            k < src2->recCount)
        {
            if (cmp_wrap(src1->index+j, src2->index+k) <= 0)
            {
                size_t count = keyListRange1->y-keyListRange1->x;
                BenMemcpy(dest->index+l, src1->index+j, sizeof(int4)*count);
                j+=count;
                l+=count;
                keyListRange1++;
            }
            else
            {
                size_t count = keyListRange2->y-keyListRange2->x;
                BenMemcpy(dest->index+l, src2->index+k, sizeof(int4)*count);
                k+=count;
                l+=count;
                keyListRange2++;
            }
        }
        if (j >= src1->recCount &&
            k < src2->recCount)
        {
            size_t count = src2->recCount - k;
            BenMemcpy(dest->index+l, src2->index+k, sizeof(int4)*count);
            break;
        }
        if (j < src1->recCount &&
            k >= src2->recCount)
        {
            size_t count = src1->recCount - j;
            BenMemcpy(dest->index+l, src1->index+j, sizeof(int4)*count);
            break;
        }
    }*/
    //!!! fall back to concatenating the index arrays and re-sorting
    BenMemcpy(dest->index, src1->index, src1->indexSize);
    BenMemcpy(dest->index+src1->recCount, src2->index, src2->indexSize);
    QuickSortMem(dest);
    //!!! regroup records that share the same key
    GroupByMem(dest);
    BenFree((char**)&src1->keys, src1->keySize);
    BenFree((char**)&src1->vals, src1->valSize);
    BenFree((char**)&src1->index, src1->indexSize);
    BenFree((char**)&src1->keyListRange, src1->diffKeyCount*sizeof(int2));
    BenFree((char**)&src2->keys, src2->keySize);
    BenFree((char**)&src2->vals, src2->valSize);
    BenFree((char**)&src2->index, src2->indexSize);
    BenFree((char**)&src2->keyListRange, src2->diffKeyCount*sizeof(int2));
}
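
// CoprocessMergeInterMem: combines the GPU and CPU Map/Sort outputs. MAP_ONLY
// concatenates them into the output chunk; MAP_SORT merges them sorted into
// the output chunk; MAP_SORT_REDUCE merges them sorted into the intermediate
// chunk that feeds the Reduce stage.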
static void CoprocessMergeInterMem(Spec_t *spec)
{
    BEN_ASSERT(spec != NULL);
    if (spec->mode & MAP_ONLY)
    {
        MergeUnsortMem(spec->outputChunk, &spec->gpuSched->outputSmallChunk,
                       &spec->cpuSched->outputSmallChunk);
        spec->totalOutputRecCount = spec->cpuSched->outputSmallChunk.recCount +
                                    spec->gpuSched->outputSmallChunk.recCount;
    }
    else if (spec->mode & MAP_SORT)
    {
        MergeSortMem(spec->outputChunk, &spec->gpuSched->outputSmallChunk,
                     &spec->cpuSched->outputSmallChunk);
        spec->totalOutputRecCount = spec->totalInterRecCount;
        spec->totalDiffKeyCount = spec->outputChunk->diffKeyCount;
    }
    else if (spec->mode & MAP_SORT_REDUCE)
    {
        MergeSortMem(spec->interChunk, &spec->gpuSched->outputSmallChunk,
                     &spec->cpuSched->outputSmallChunk);
        spec->totalOutputRecCount = spec->cpuSched->outputSmallChunk.recCount +
                                    spec->gpuSched->outputSmallChunk.recCount;
    }
}
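
// gpuReduceWorkerMem / cpuReduceWorkerMem: thread entry points used during
// co-processing; each runs the Reduce stage on its own processor against that
// processor's schedule.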
static void *gpuReduceWorkerMem(void *args)
{
    Spec_t *spec = (Spec_t*)args;
    BEN_ASSERT(spec != NULL);
    StartGPUReduce(spec->gpuSched, spec->mode);
    return 0;
}

static void *cpuReduceWorkerMem(void *args)
{
    Spec_t *spec = (Spec_t*)args;
    BEN_ASSERT(spec != NULL);
    StartCPUReduce(spec->cpuSched, spec->mode);
    return 0;
}
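
// CoprocessReduceMem: splits the intermediate chunk between the GPU and the
// CPU by distinct-key count, with spec->gpuInputRatio deciding the GPU's
// share, then runs the Reduce workers on both processors concurrently.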
static void CoprocessReduceMem(Spec_t *spec)
{
    BEN_ASSERT(spec != NULL);
    // set up the GPU schedule configuration
    BenMemset(spec->gpuSched, 0, sizeof(Schedule_t));
    spec->gpuSched->gpuReduceGridDim = spec->gpuReduceGridDim;
    spec->gpuSched->gpuReduceBlockDim = spec->gpuReduceBlockDim;
    spec->gpuSched->gpuReduceSharedMemSize = spec->gpuReduceSharedMemSize;
    BenMemcpy(&(spec->gpuSched->inputSmallChunk), spec->interChunk, sizeof(ChunkInfo_t));
    spec->gpuSched->inputSmallChunk.diffKeyCount *= spec->gpuInputRatio;
    // set up the CPU schedule configuration with the remaining keys
    BenMemset(spec->cpuSched, 0, sizeof(Schedule_t));
    spec->cpuSched->cpuReduceThreadNum = spec->cpuReduceThreadNum;
    BenMemcpy(&(spec->cpuSched->inputSmallChunk), spec->interChunk, sizeof(ChunkInfo_t));
    spec->cpuSched->inputSmallChunk.keyListRange += spec->gpuSched->inputSmallChunk.diffKeyCount;
    spec->cpuSched->inputSmallChunk.diffKeyCount -= spec->gpuSched->inputSmallChunk.diffKeyCount;
    // launch GPU and CPU reduce workers in parallel and wait for both
    BenThread_t tp[2];
    tp[0] = BenNewThread(gpuReduceWorkerMem, (void*)spec);
    tp[1] = BenNewThread(cpuReduceWorkerMem, (void*)spec);
    BenWaitForMul(tp, 2);
    //gpuReduceWorkerMem(spec);
    //cpuReduceWorkerMem(spec);
}
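
// CoprocessMergeOutputMem: concatenates the GPU and CPU Reduce outputs into
// the final output chunk and records the total output record count.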
static void CoprocessMergeOutputMem(Spec_t *spec)
{
    BEN_ASSERT(spec != NULL);
    MergeUnsortMem(spec->outputChunk, &spec->gpuSched->outputSmallChunk,
                   &spec->cpuSched->outputSmallChunk);
    spec->totalOutputRecCount = spec->outputChunk->recCount;
}
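
/*
 * Illustrative sketch (not part of the original file): one way the scheduler
 * helpers above might be driven for a full MapReduce pass, depending on
 * whether the Spec_t requests a single processor or GPU/CPU co-processing.
 * The driver name RunSchedulerMem and the exact stage gating are assumptions
 * for illustration only; the real entry point in Mars may differ.
 */
static void RunSchedulerMem(Spec_t *spec)
{
    if ((spec->mode & GPU) && (spec->mode & CPU))
    {
        // co-processing: split work across GPU and CPU, merge partial results
        CoprocessMapMem(spec);
        CoprocessMergeInterMem(spec);
        if (spec->mode & MAP_SORT_REDUCE)
        {
            CoprocessReduceMem(spec);
            CoprocessMergeOutputMem(spec);
        }
    }
    else
    {
        // single processor: GPU-only or CPU-only
        SingleMapMem(spec);
        if (spec->mode & MAP_SORT || spec->mode & MAP_SORT_REDUCE)
            SingleSortMem(spec);
        SingleMergeInterMem(spec);
        if (spec->mode & MAP_SORT_REDUCE)
        {
            SingleReduceMem(spec);
            SingleMergeOutputMem(spec);
        }
    }
}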