?? file.cpp
字號:
}
#ifdef USE_SYSV_SHARED_MEMORY
if (!shmem.open(name, initSize))
{
status = errno;
if (fd >= 0)
{
::close(fd);
}
return status;
}
mmapSize = initSize;
mmapAddr = shmem.get_base();
#else
mmapAddr = (char*)valloc(mmapSize);
if (mmapAddr == NULL)
{
status = errno;
if (fd >= 0)
{
::close(fd);
}
return status;
}
#endif
lseek(fd, 0, SEEK_SET);
if ((size_t)::read(fd, mmapAddr, fileSize) != fileSize)
{
#ifdef USE_SYSV_SHARED_MEMORY
shmem.close();
#else
free(mmapAddr);
#endif
mmapAddr = NULL;
status = errno;
if (fd >= 0)
{
::close(fd);
}
return status;
}
#else // NO_MMAP
mmapAddr = (char*)mmap(NULL, mmapSize,
readonly ? PROT_READ : PROT_READ|PROT_WRITE,
mmap_attr, fd, 0);
if (mmapAddr == (char*)-1)
{
status = errno;
mmapAddr = NULL;
if (fd >= 0)
{
::close(fd);
}
return status;
}
#endif // NO_MMAP
#endif // USE_SYSV_SHARED_MEMORY && DISKLESS_CONIFIGURATION
#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)
pageSize = getpagesize();
pageMapSize = (mmapSize + dbModMapBlockSize*32 - 1) >> (dbModMapBlockBits + 5);
pageMap = new int[pageMapSize];
memset(pageMap, 0, pageMapSize*sizeof(int));
#endif
#if defined(REPLICATION_SUPPORT)
db = NULL;
int nPages = getMaxPages();
currUpdateCount = new int[nPages];
if (replicationSupport)
{
char* cFileName = new char[strlen(name) + 5];
strcat(strcpy(cFileName, name), ".cnt");
#ifndef DISKLESS_CONFIGURATION
cfd = ::open(cFileName, O_RDWR|O_DSYNC|O_CREAT, 0666);
delete[] cFileName;
if (cfd < 0)
{
return errno;
}
if (ftruncate(cfd, nPages*sizeof(int)) != ok)
{
status = errno;
::close(cfd);
return status;
}
#else
int mmap_attr = MAP_SHARED;
#ifndef MAP_ANONYMOUS
cfd = ::open("/dev/zero", O_RDONLY, 0);
#else
cfd = -1;
mmap_attr |= MAP_ANONYMOUS;
#endif
#endif
diskUpdateCount = (int*)mmap(NULL, nPages*sizeof(int),
PROT_READ|PROT_WRITE, mmap_attr, cfd, 0);
if (diskUpdateCount == (int*)-1)
{
int status = errno;
diskUpdateCount = NULL;
if (cfd >= 0)
{
::close(cfd);
}
return status;
}
int maxCount = 0;
rootPage = dbMalloc(pageSize);
for (int i = 0; i < nPages; i++)
{
int count = diskUpdateCount[i];
currUpdateCount[i] = count;
if (count > maxCount)
{
maxCount = count;
}
}
updateCounter = maxCount;
nRecovered = 0;
recoveredEvent.open(true);
syncEvent.open();
startSync();
}
#endif
return ok;
}
#if defined(REPLICATION_SUPPORT)
void dbFile::syncToDisk()
{
syncThread.setPriority(dbThread::THR_PRI_LOW);
dbCriticalSection cs(syncCS);
while (doSync)
{
int i, j, k;
int maxUpdated = 0;
for (i = 0; i < int(mmapSize >> dbModMapBlockBits);)
{
int updateCounters[dbMaxSyncSegmentSize];
for (j=i; j < (int)(mmapSize >> dbModMapBlockBits) && j-i < dbMaxSyncSegmentSize
&& currUpdateCount[j] > diskUpdateCount[j]; j++)
{
updateCounters[j-i] = currUpdateCount[j];
}
if (i != j)
{
size_t pos = (i << dbModMapBlockBits) & ~(pageSize-1);
size_t size = (((j-i) << dbModMapBlockBits) + pageSize - 1) & ~(pageSize-1);
#ifdef NO_MMAP
if (lseek(fd, pos, SEEK_SET) != pos
|| ::write(fd, mmapAddr + pos, size) != size)
{
dbTrace("Failed to save page to the disk, position=%ld, size=%ld, error=%d\n",
(long)pos, (long)size, errno);
}
#else
msync(mmapAddr + pos, size, MS_SYNC);
#endif
for (k = 0; i < j; k++, i++)
{
diskUpdateCount[i] = updateCounters[k];
}
maxUpdated = i;
}
else
{
i += 1;
}
if (!doSync)
{
return;
}
}
if (maxUpdated != 0)
{
msync(diskUpdateCount, maxUpdated*sizeof(int), MS_SYNC);
}
if (closing && maxUpdated == 0)
{
return;
}
else
{
syncEvent.wait(syncCS, dbSyncTimeout);
}
}
}
#endif
int dbFile::create(const char* name, bool)
{
mmapAddr = NULL;
fd = ::open(name, O_RDWR|O_TRUNC|O_CREAT, 0666);
if (fd < 0)
{
return errno;
}
return ok;
}
int dbFile::read(void* buf, size_t& readBytes, size_t size)
{
long rc = ::read(fd, buf, size);
if (rc < 0)
{
readBytes = 0;
return errno;
}
readBytes = rc;
return ok;
}
int dbFile::write(void const* buf, size_t& writtenBytes, size_t size)
{
long rc = ::write(fd, buf, size);
if (rc < 0)
{
writtenBytes = 0;
return errno;
}
writtenBytes = rc;
return ok;
}
int dbFile::setSize(size_t size, char const*, bool)
{
#ifdef REPLICATION_SUPPORT
dbCriticalSection cs1(syncCS);
dbCriticalSection cs2(replCS);
#endif
#if defined(DISKLESS_CONFIGURATION) || defined(USE_SYSV_SHARED_MEMORY)
assert(false);
#else
#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)
int newPageMapSize = (size + dbModMapBlockSize*32 - 1) >> (dbModMapBlockBits + 5);
int* newPageMap = new int[newPageMapSize];
memcpy(newPageMap, pageMap, pageMapSize*sizeof(int));
memset(newPageMap + pageMapSize, 0,
(newPageMapSize-pageMapSize)*sizeof(int));
delete[] pageMap;
pageMapSize = newPageMapSize;
pageMap = newPageMap;
#endif
#ifdef NO_MMAP
char* newBuf = (char*)valloc(size);
if (newBuf == NULL)
{
return errno;
}
memcpy(newBuf, mmapAddr, mmapSize);
free(mmapAddr);
mmapAddr = newBuf;
mmapSize = size;
if (ftruncate(fd, size) != ok)
{
return errno;
}
#else
if (munmap(mmapAddr, mmapSize) != ok ||
ftruncate(fd, size) != ok ||
(mmapAddr = (char*)mmap(NULL, size, PROT_READ|PROT_WRITE,
MAP_SHARED, fd, 0)) == (char*)-1)
{
return errno;
}
#endif
mmapSize = size;
#endif
return ok;
}
int dbFile::flush(bool physical)
{
#if defined(REPLICATION_SUPPORT)
dbCriticalSection cs(replCS);
if (db == NULL)
{
physical = true;
}
if (!physical)
{
updateCounter += 1;
}
#endif
#if defined(REPLICATION_SUPPORT) || (defined(NO_MMAP) && !defined(DISKLESS_CONFIGURATION))
int* map = pageMap;
for (int i = 0, n = pageMapSize; i < n; i++)
{
if (map[i] != 0)
{
size_t pos = (size_t)i << (dbModMapBlockBits + 5);
unsigned mask = map[i];
int count = 0;
do
{
int size = 0;
while ((mask & 1) == 0)
{
pos += dbModMapBlockSize;
mask >>= 1;
count += 1;
}
while (true)
{
do
{
#ifdef REPLICATION_SUPPORT
if (!physical)
{
currUpdateCount[(pos + size) >> dbModMapBlockBits] = updateCounter;
}
#endif
size += dbModMapBlockSize;
mask >>= 1;
count += 1;
}
while ((mask & 1) != 0);
if (i+1 < n && count == 32 && size < dbMaxSyncSegmentSize*dbModMapBlockSize
&& (map[i+1] & 1) != 0)
{
map[i] = 0;
mask = map[++i];
count = 0;
}
else
{
break;
}
}
#if defined(REPLICATION_SUPPORT)
if (db != NULL)
{
if (!physical)
{
for (int j = db->nServers; --j >= 0;)
{
if (db->con[j].status == dbReplicatedDatabase::ST_STANDBY)
{
ReplicationRequest rr;
rr.op = ReplicationRequest::RR_UPDATE_PAGE;
rr.nodeId = db->id;
rr.page.updateCount = updateCounter;
rr.page.offs = pos;
rr.size = size;
db->writeReq(j, rr, mmapAddr + pos, size);
}
}
}
pos += size;
continue;
}
#else
if ((size_t)lseek(fd, pos, SEEK_SET) != pos
|| ::write(fd, mmapAddr + pos, size) != size)
{
return errno;
}
#endif
pos += size;
}
while (mask != 0);
map[i] = 0;
}
}
#endif
#if !defined(NO_MMAP) && !defined(DISKLESS_CONFIGURATION) && !defined(REPLICATION_SUPPORT)
if (msync(mmapAddr, mmapSize, MS_SYNC) != ok)
{
return errno;
}
#endif
return ok;
}
int dbFile::erase()
{
#ifdef USE_SYSV_SHARED_MEMORY
shmem.erase();
#endif
return ok;
}
int dbFile::close()
{
#if defined(REPLICATION_SUPPORT)
if (db != NULL)
{
closing = true;
stopSync();
{
dbCriticalSection cs(replCS);
if (nRecovered != 0)
{
recoveredEvent.wait(replCS);
}
}
syncEvent.close();
recoveredEvent.close();
munmap(diskUpdateCount, getMaxPages()*sizeof(int));
if (cfd >= 0)
{
::close(cfd);
}
}
delete[] currUpdateCount;
currUpdateCount = NULL;
dbFree(rootPage);
rootPage = NULL;
#endif // REPLICATION_SUPPORT
if (mmapAddr != NULL)
{
#ifdef NO_MMAP
int rc = flush();
if (rc != ok)
{
return rc;
}
#endif
#ifdef USE_SYSV_SHARED_MEMORY
shmem.close();
#elif defined(NO_MMAP)
free(mmapAddr);
#else
if (munmap(mmapAddr, mmapSize) != ok)
{
return errno;
}
#endif
mmapAddr = NULL;
#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)
delete[] pageMap;
#endif
}
return fd < 0 && ::close(fd) != ok ? errno : ok;
}
char* dbFile::errorText(int code, char* buf, size_t bufSize)
{
return strncpy(buf, strerror(code), bufSize);
}
#endif
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -