?? file.cpp
字號:
//-< FILE.CPP >------------------------------------------------------*--------*
// FastDB Version 1.0 (c) 1999 GARRET * ? *
// (Main Memory Database Management System) * /\| *
// * / \ *
// Created: 20-Nov-98 K.A. Knizhnik * / [] \ *
// Last update: 10-Dec-98 K.A. Knizhnik * GARRET *
//-------------------------------------------------------------------*--------*
// System dependent implementation of mapped on memory file
//-------------------------------------------------------------------*--------*
#define INSIDE_FASTDB
#include "stdtp.h"
#include "file.h"
dbFile::dbFile()
{
sharedName = NULL;
mmapAddr = NULL;
mmapSize = 0;
#ifdef REPLICATION_SUPPORT
currUpdateCount = NULL;
diskUpdateCount = NULL;
rootPage = NULL;
db = NULL;
#endif
}
#if defined(REPLICATION_SUPPORT) || defined(NO_MMAP)
const int dbMaxSyncSegmentSize = 128*1024 / dbModMapBlockSize;
#endif
#ifdef REPLICATION_SUPPORT
#include "database.h"
int dbFile::dbSyncTimeout = 1000; // one second
bool dbFile::updatePages(socket_t* s, size_t pos, int pageUpdateCounter, int size)
{
if (pos + size > mmapSize)
{
size_t newSize = pos + size > mmapSize*2 ? pos + size : mmapSize*2;
setSize(newSize, sharedName);
}
if (s->read(mmapAddr + pos, size))
{
int pageNo = pos >> dbModMapBlockBits;
if (updateCounter < pageUpdateCounter)
{
updateCounter = pageUpdateCounter;
}
while (size > 0)
{
currUpdateCount[pageNo++] = pageUpdateCounter;
size -= dbModMapBlockSize;
}
return true;
}
return false;
}
bool dbFile::concurrentUpdatePages(socket_t* s, size_t pos, int pageUpdateCounter, int size)
{
if (pos + size > mmapSize)
{
size_t newSize = pos + size > mmapSize*2 ? pos + size : mmapSize*2;
db->beginTransaction(dbDatabase::dbCommitLock);
setSize(newSize, sharedName);
((dbHeader*)mmapAddr)->size = newSize;
db->version = db->monitor->version += 1;
}
if (pos == 0 && size <= pageSize)
{
if (!s->read(rootPage, size))
{
return false;
}
if (((dbHeader*)rootPage)->curr != ((dbHeader*)mmapAddr)->curr)
{
db->beginTransaction(dbDatabase::dbCommitLock);
memcpy(mmapAddr, rootPage, size);
// now readers will see updated data
db->monitor->curr ^= 1;
db->endTransaction();
}
else
{
memcpy(mmapAddr, rootPage, size);
}
}
else
{
if (!s->read(mmapAddr + pos, size))
{
return false;
}
}
int pageNo = pos >> dbModMapBlockBits;
if (updateCounter < pageUpdateCounter)
{
updateCounter = pageUpdateCounter;
}
while (size > 0)
{
currUpdateCount[pageNo++] = pageUpdateCounter;
size -= dbModMapBlockSize;
}
return true;
}
int dbFile::getUpdateCountTableSize()
{
int nPages = mmapSize >> dbModMapBlockBits;
while (--nPages >= 0 && diskUpdateCount[nPages] == 0)
;
return nPages + 1;
}
int dbFile::getMaxPages()
{
return 1 << (dbDatabaseOffsetBits - dbModMapBlockBits);
}
void dbFile::startSync()
{
#ifndef DISKLESS_CONFIGURATION
doSync = true;
closing = false;
syncEvent.reset();
syncThread.create(startSyncToDisk, this);
#endif
}
void dbFile::stopSync()
{
#ifndef DISKLESS_CONFIGURATION
doSync = false;
syncEvent.signal();
syncThread.join();
#endif
}
void thread_proc dbFile::startSyncToDisk(void* arg)
{
((dbFile*)arg)->syncToDisk();
}
void thread_proc dbFile::startRecovery(void* arg)
{
RecoveryRequest* rr = (RecoveryRequest*)arg;
rr->file->doRecovery(rr->nodeId, rr->updateCounters, rr->nPages);
{
dbCriticalSection cs(rr->file->replCS);
if (--rr->file->nRecovered == 0)
{
rr->file->recoveredEvent.signal();
}
}
delete rr;
}
void dbFile::recovery(int nodeId, int* updateCounters, int nPages)
{
RecoveryRequest* rr = new RecoveryRequest;
rr->nodeId = nodeId;
rr->updateCounters = updateCounters;
rr->nPages = nPages;
rr->file = this;
{
dbCriticalSection cs(replCS);
if (nRecovered++ == 0)
{
recoveredEvent.reset();
}
}
dbThread recoveryThread;
recoveryThread.create(startRecovery, rr);
recoveryThread.setPriority(dbThread::THR_PRI_HIGH);
}
void dbFile::doRecovery(int nodeId, int* updateCounters, int nPages)
{
ReplicationRequest rr;
memset(updateCounters+nPages, 0, (getMaxPages() - nPages)*sizeof(int));
int i, j, n;
if (db->con[nodeId].reqSock == NULL)
{
char buf[256];
socket_t* s = socket_t::connect(db->serverURL[nodeId],
socket_t::sock_global_domain,
dbReplicatedDatabase::dbRecoveryConnectionAttempts);
if (!s->is_ok())
{
s->get_error_text(buf, sizeof buf);
dbTrace("Failed to establish connection with node %d: %s\n",
nodeId, buf);
delete s;
return;
}
rr.op = ReplicationRequest::RR_GET_STATUS;
rr.nodeId = db->id;
if (!s->write(&rr, sizeof rr) || !s->read(&rr, sizeof rr))
{
s->get_error_text(buf, sizeof buf);
dbTrace("Connection with node %d is broken: %s\n",
nodeId, buf);
delete s;
return;
}
if (rr.op != ReplicationRequest::RR_STATUS && rr.status != dbReplicatedDatabase::ST_STANDBY)
{
dbTrace("Unexpected response from standby node %d: code %d status %d\n",
nodeId, rr.op, rr.status);
delete s;
return;
}
else
{
db->addConnection(nodeId, s);
}
}
while (true)
{
int maxUpdateCount = 0;
{
dbCriticalSection cs(syncCS);
for (i = 0, j = 0, n = mmapSize >> dbModMapBlockBits; i < n; i++)
{
if (updateCounters[i] > currUpdateCount[i])
{
updateCounters[i] = 0;
}
else
{
if (updateCounters[i] > maxUpdateCount)
{
maxUpdateCount = updateCounters[i];
}
}
if (i > j && (currUpdateCount[i] <= updateCounters[i] || i-j >= dbMaxSyncSegmentSize
|| currUpdateCount[i] != currUpdateCount[j]))
{
rr.op = ReplicationRequest::RR_UPDATE_PAGE;
rr.nodeId = nodeId;
rr.size = (i-j)*dbModMapBlockSize;
rr.page.offs = (size_t)j << dbModMapBlockBits;
rr.page.updateCount = currUpdateCount[j];
if (!db->writeReq(nodeId, rr, mmapAddr + rr.page.offs, rr.size))
{
delete[] updateCounters;
return;
}
j = i;
}
if (currUpdateCount[i] > updateCounters[i])
{
if (currUpdateCount[i] > maxUpdateCount)
{
maxUpdateCount = currUpdateCount[i];
}
updateCounters[i] = currUpdateCount[i];
}
else
{
j = i + 1;
}
}
if (i != j)
{
rr.op = ReplicationRequest::RR_UPDATE_PAGE;
rr.nodeId = nodeId;
rr.size = (i-j)*dbModMapBlockSize;
rr.page.offs = (size_t)j << dbModMapBlockBits;
rr.page.updateCount = currUpdateCount[j];
if (!db->writeReq(nodeId, rr, mmapAddr + rr.page.offs, rr.size))
{
delete[] updateCounters;
return;
}
}
}
{
dbCriticalSection cs(replCS);
if (maxUpdateCount == updateCounter)
{
dbTrace("Complete recovery of node %d\n", nodeId);
delete[] updateCounters;
rr.op = ReplicationRequest::RR_STATUS;
rr.nodeId = nodeId;
db->con[nodeId].status = rr.status = dbReplicatedDatabase::ST_STANDBY;
for (i = 0, n = db->nServers; i < n; i++)
{
if (db->con[i].status != dbReplicatedDatabase::ST_OFFLINE && i != db->id)
{
db->writeReq(i, rr);
}
}
return;
}
}
}
}
#endif
bool dbFile::write(void const* buf, size_t size)
{
size_t writtenBytes;
bool result = write(buf, writtenBytes, size) == ok && writtenBytes == size;
assert(result);
return result;
}
#ifdef _WIN32
class OS_info : public OSVERSIONINFO
{
public:
OS_info()
{
dwOSVersionInfoSize = sizeof(OSVERSIONINFO);
GetVersionEx(this);
}
};
static OS_info osinfo;
#define BAD_POS 0xFFFFFFFF // returned by SetFilePointer and GetFileSize
int dbFile::erase()
{
return ok;
}
int dbFile::open(char const* fileName, char const* sharedName, bool readonly,
size_t initSize, bool replicationSupport)
{
int status;
size_t fileSize;
#ifndef DISKLESS_CONFIGURATION
fh = CreateFile(fileName, readonly ? GENERIC_READ : (GENERIC_READ|GENERIC_WRITE),
FILE_SHARE_READ | FILE_SHARE_WRITE, FASTDB_SECURITY_ATTRIBUTES,
readonly ? OPEN_EXISTING : OPEN_ALWAYS,
FILE_FLAG_RANDOM_ACCESS
#ifdef NO_MMAP
|FILE_FLAG_NO_BUFFERING
#endif
#if 0 // not needed as we do explicit flush ???
|FILE_FLAG_WRITE_THROUGH
#endif
, NULL);
if (fh == INVALID_HANDLE_VALUE)
{
return GetLastError();
}
DWORD highSize;
fileSize = GetFileSize(fh, &highSize);
if (fileSize == BAD_POS && (status = GetLastError()) != ok)
{
CloseHandle(fh);
return status;
}
assert(highSize == 0);
mmapSize = fileSize;
this->sharedName = new char[strlen(sharedName) + 1];
strcpy(this->sharedName, sharedName);
if (!readonly && fileSize == 0)
{
mmapSize = initSize;
}
#else
fh = INVALID_HANDLE_VALUE;
this->sharedName = NULL;
mmapSize = fileSize = initSize;
#endif
#if defined(NO_MMAP)
if (fileSize < mmapSize && !readonly)
{
if (SetFilePointer(fh, mmapSize, NULL, FILE_BEGIN) != mmapSize || !SetEndOfFile(fh))
{
status = GetLastError();
CloseHandle(fh);
return status;
}
}
mmapAddr = (char*)VirtualAlloc(NULL, mmapSize, MEM_COMMIT|MEM_RESERVE,
PAGE_READWRITE);
#ifdef DISKLESS_CONFIGURATION
if (mmapAddr == NULL)
#else
DWORD readBytes;
if (mmapAddr == NULL
|| !ReadFile(fh, mmapAddr, fileSize, &readBytes, NULL) || readBytes != fileSize)
#endif
{
status = GetLastError();
if (fh != INVALID_HANDLE_VALUE)
{
CloseHandle(fh);
}
return status;
}
memset(mmapAddr+fileSize, 0, mmapSize - fileSize);
mh = NULL;
#else
mh = CreateFileMapping(fh, FASTDB_SECURITY_ATTRIBUTES, readonly ? PAGE_READONLY : PAGE_READWRITE,
0, mmapSize, sharedName);
status = GetLastError();
if (mh == NULL)
{
if (fh != INVALID_HANDLE_VALUE)
{
CloseHandle(fh);
}
return status;
}
mmapAddr = (char*)MapViewOfFile(mh, readonly
? FILE_MAP_READ : FILE_MAP_ALL_ACCESS,
0, 0, 0);
if (mmapAddr == NULL)
{
status = GetLastError();
CloseHandle(mh);
if (fh != INVALID_HANDLE_VALUE)
{
CloseHandle(fh);
}
return status;
}
if (status != ERROR_ALREADY_EXISTS && mmapSize > fileSize)
// && osinfo.dwPlatformId != VER_PLATFORM_WIN32_NT)
{
// Windows 95 doesn't initialize pages
memset(mmapAddr+fileSize, 0, mmapSize - fileSize);
}
#endif
#if defined(NO_MMAP) || defined(REPLICATION_SUPPORT)
SYSTEM_INFO systemInfo;
GetSystemInfo(&systemInfo);
pageSize = systemInfo.dwPageSize;
pageMapSize = (mmapSize + dbModMapBlockSize*32 - 1) >> (dbModMapBlockBits + 5);
pageMap = new int[pageMapSize];
memset(pageMap, 0, pageMapSize*sizeof(int));
#endif
#if defined(REPLICATION_SUPPORT)
db = NULL;
int nPages = getMaxPages();
currUpdateCount = new int[nPages];
if (replicationSupport)
{
char* cFileName = new char[strlen(fileName) + 5];
strcat(strcpy(cFileName, fileName), ".cnt");
#ifdef DISKLESS_CONFIGURATION
cfh = INVALID_HANDLE_VALUE;
#else
cfh = CreateFile(cFileName, GENERIC_READ|GENERIC_WRITE,
0, NULL, OPEN_ALWAYS,
FILE_FLAG_RANDOM_ACCESS|FILE_FLAG_WRITE_THROUGH,
NULL);
delete[] cFileName;
if (cfh == INVALID_HANDLE_VALUE)
{
status = errno;
return status;
}
#endif
cmh = CreateFileMapping(cfh, NULL, PAGE_READWRITE, 0,
nPages*sizeof(int), NULL);
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -