// server.cpp
// (code-viewer label: "Font size:")
//-< SERVER.CPP >----------------------------------------------------*--------*
// FastDB Version 1.0 (c) 1999 GARRET * ? *
// (Main Memory Database Management System) * /\| *
// * / \ *
// Created: 13-Jan-2000 K.A. Knizhnik * / [] \ *
// Last update: 13-Jan-2000 K.A. Knizhnik * GARRET *
//-------------------------------------------------------------------*--------*
// CLI multithreaded server implementation
//-------------------------------------------------------------------*--------*
#include <ctype.h>
#include "fastdb.h"
#include "compiler.h"
#include "wwwapi.h"
#include "subsql.h"
#include "symtab.h"
#include "hashtab.h"
#include "ttree.h"
#include "cli.h"
#include "cliproto.h"
#include "server.h"
#include "localcli.h"
#if !THREADS_SUPPORTED
#error Server requires multithreading support
#endif
// Unpack the array (or string) payload of this binding into dst + offs.
//
// The payload is read starting 4 bytes past `ptr`. For array CLI types
// (cliType >= cli_array_of_oid) each of the `len` elements is converted with
// the unpackN helper that matches the element size taken from sizeof_type[];
// single-byte elements and strings are copied verbatim.
//
// Returns the value of `len` (element count for arrays, byte count for strings).
int dbColumnBinding::unpackArray(char* dst, size_t offs)
{
    const int count = this->len;
    char* out = dst + offs;
    char* in = ptr + 4;

    if (cliType < cli_array_of_oid) {
        // Not an array type: string payload, plain byte copy.
        memcpy(out, in, count);
        return count;
    }

    switch (sizeof_type[cliType - cli_array_of_oid]) {
      case 1:
        memcpy(out, in, count);
        break;
      case 2:
        for (int k = 0; k < count; k++, out += 2, in += 2) {
            unpack2(out, in);
        }
        break;
      case 4:
        for (int k = 0; k < count; k++, out += 4, in += 4) {
            unpack4(out, in);
        }
        break;
      case 8:
        for (int k = 0; k < count; k++, out += 8, in += 8) {
            unpack8(out, in);
        }
        break;
      default:
        // Unsupported element size
        assert(false);
    }
    return count;
}
void dbColumnBinding::unpackScalar(char* dst)
{
if (cliType == cli_autoincrement)
{
assert(fd->type == dbField::tpInt4);
#ifdef AUTOINCREMENT_SUPPORT
*(int4*)(dst+fd->dbsOffs) = fd->defTable->autoincrementCount;
#else
*(int4*)(dst+fd->dbsOffs) = ((dbTable*)fd->defTable->db->getRow(fd->defTable->tableId))->nRows;
#endif
return;
}
switch (fd->type)
{
case dbField::tpBool:
case dbField::tpInt1:
switch (sizeof_type[cliType])
{
case 1:
*(dst + fd->dbsOffs) = *ptr;
break;
case 2:
*(dst + fd->dbsOffs) = (char)unpack2(ptr);
break;
case 4:
*(dst + fd->dbsOffs) = (char)unpack4(ptr);
break;
case 8:
*(dst + fd->dbsOffs) = (char)unpack8(ptr);
break;
default:
assert(false);
}
break;
case dbField::tpInt2:
switch (sizeof_type[cliType])
{
case 1:
*(int2*)(dst+fd->dbsOffs) = *ptr;
break;
case 2:
unpack2(dst+fd->dbsOffs, ptr);
break;
case 4:
*(int2*)(dst+fd->dbsOffs) = (int2)unpack4(ptr);
break;
case 8:
*(int2*)(dst+fd->dbsOffs) = (int2)unpack8(ptr);
break;
default:
assert(false);
}
break;
case dbField::tpInt4:
switch (sizeof_type[cliType])
{
case 1:
*(int4*)(dst+fd->dbsOffs) = *ptr;
break;
case 2:
*(int4*)(dst+fd->dbsOffs) = unpack2(ptr);
break;
case 4:
unpack4(dst+fd->dbsOffs, ptr);
break;
case 8:
*(int4*)(dst+fd->dbsOffs) = (int4)unpack8(ptr);
break;
default:
assert(false);
}
break;
case dbField::tpInt8:
switch (sizeof_type[cliType])
{
case 1:
*(db_int8*)(dst+fd->dbsOffs) = *ptr;
break;
case 2:
*(db_int8*)(dst+fd->dbsOffs) = unpack2(ptr);
break;
case 4:
*(db_int8*)(dst+fd->dbsOffs) = unpack4(ptr);
break;
case 8:
unpack8(dst+fd->dbsOffs, ptr);
break;
default:
assert(false);
}
break;
case dbField::tpReal4:
switch (cliType)
{
case cli_real4:
unpack4(dst+fd->dbsOffs, ptr);
break;
case cli_real8:
{
real8 temp;
unpack8((char*)&temp, ptr);
*(real4*)(dst + fd->dbsOffs) = (real4)temp;
}
break;
default:
assert(false);
}
break;
case dbField::tpReal8:
switch (cliType)
{
case cli_real4:
{
real4 temp;
unpack4((char*)&temp, ptr);
*(real8*)(dst + fd->dbsOffs) = temp;
}
break;
case cli_real8:
unpack8(dst+fd->dbsOffs, ptr);
break;
default:
assert(false);
}
break;
default:
assert(false);
}
}
void dbStatement::reset()
{
dbColumnBinding *cb, *next;
for (cb = columns; cb != NULL; cb = next)
{
next = cb->next;
delete cb;
}
columns = NULL;
delete[] params;
params = NULL;
delete cursor;
cursor = NULL;
query.reset();
table = NULL;
}
// Scan the next token from the input at `p`.
//
// Leading whitespace is skipped. Returns:
//   tkn_eof     - the terminating '\0' was reached (note: `p` is left one
//                 position past the terminator in this case);
//   tkn_all     - a single '*';
//   tkn_iconst  - an integer constant, value stored in `ival`;
//   tkn_fconst  - a floating point constant, value stored in `fval`;
//   the result of dbSymbolTable::add(ident, tkn_ident) for an identifier
//                 (letters, digits, '$' and '_', not starting with a digit);
//                 `ident` is set to point at `buf`;
//   tkn_error   - invalid character, malformed number, or a token whose text
//                 reaches dbQueryMaxIdLength characters.
//
// For numbers and identifiers `p` is left at the first character that does
// not belong to the token.
//
// Characters are read as unsigned char before being handed to the <ctype.h>
// classifiers: passing a negative value (a byte >= 0x80 where plain char is
// signed) to isspace()/isdigit()/isalpha()/isalnum() is undefined behaviour.
int dbQueryScanner::get()
{
    int i = 0, ch, digits;

    // Skip whitespace; stop at end of input.
    do
    {
        if ((ch = (unsigned char)*p++) == '\0')
        {
            return tkn_eof;
        }
    }
    while (isspace(ch));

    if (ch == '*')
    {
        return tkn_all;
    }
    else if (isdigit(ch) || ch == '+' || ch == '-')
    {
        // Collect every character that may belong to a numeric literal,
        // then let sscanf decide whether it is an integer or a real.
        do
        {
            buf[i++] = (char)ch;
            if (i == dbQueryMaxIdLength)
            {
                // Numeric constant too long
                return tkn_error;
            }
            ch = (unsigned char)*p++;
        }
        while (ch != '\0'
               && (isdigit(ch) || ch == '+' || ch == '-' || ch == 'e' ||
                   ch == 'E' || ch == '.'));
        p -= 1; // un-read the character that ended the literal
        buf[i] = '\0';
        if (sscanf(buf, INT8_FORMAT "%n", &ival, &digits) != 1)
        {
            // Bad integer constant
            return tkn_error;
        }
        if (digits != i)
        {
            // The integer conversion did not consume the whole text:
            // retry as a floating point constant.
            if (sscanf(buf, "%lf%n", &fval, &digits) != 1 || digits != i)
            {
                // Bad float constant
                return tkn_error;
            }
            return tkn_fconst;
        }
        return tkn_iconst;
    }
    else if (isalpha(ch) || ch == '$' || ch == '_')
    {
        do
        {
            buf[i++] = (char)ch;
            if (i == dbQueryMaxIdLength)
            {
                // Identifier too long
                return tkn_error;
            }
            ch = (unsigned char)*p++;
        }
        // The input is a NUL-terminated string, so the terminator (not EOF)
        // is the end-of-input sentinel, as in the numeric branch above.
        while (ch != '\0' && (isalnum(ch) || ch == '$' || ch == '_'));
        p -= 1; // un-read the character that ended the identifier
        buf[i] = '\0';
        ident = buf;
        return dbSymbolTable::add(ident, tkn_ident);
    }
    else
    {
        // Invalid symbol
        return tkn_error;
    }
}
// Head of the singly linked list (through dbServer::next) of server objects.
// Each constructor pushes the new server at the head; find() and cleanup()
// walk the list starting here.
dbServer* dbServer::chain;
// Look up the statement with identifier `stmt_id` in the session's statement
// list. Returns the statement, or NULL when the session has no such statement.
inline dbStatement* dbServer::findStatement(dbSession* session, int stmt_id)
{
    dbStatement* candidate = session->stmts;
    while (candidate != NULL && candidate->id != stmt_id)
    {
        candidate = candidate->next;
    }
    return candidate;
}
// Thread entry point: `arg` is the dbServer whose serveClient() is run
// on this thread.
void thread_proc dbServer::serverThread(void* arg)
{
    dbServer* const self = static_cast<dbServer*>(arg);
    self->serveClient();
}
// Thread entry point: `arg` is the dbServer; runs its acceptConnection()
// on the local accept socket.
void thread_proc dbServer::acceptLocalThread(void* arg)
{
    dbServer* const self = static_cast<dbServer*>(arg);
    self->acceptConnection(self->localAcceptSock);
}
// Thread entry point: `arg` is the dbServer; runs its acceptConnection()
// on the global accept socket.
void thread_proc dbServer::acceptGlobalThread(void* arg)
{
    dbServer* const self = static_cast<dbServer*>(arg);
    self->acceptConnection(self->globalAcceptSock);
}
// Construct a server for database `db` listening at `serverURL`.
//
// The new object is pushed at the head of the static server chain, keeps a
// private copy of the URL, and tries to create both a global and a local
// accept socket with the given connection queue length. A socket that fails
// to initialise is traced, deleted and left as NULL. The session lists start
// out empty. No threads are started here.
dbServer::dbServer(dbDatabase* db,
                   char const* serverURL,
                   int optimalNumberOfThreads,
                   int connectionQueueLen)
{
    char errText[256];

    // Register in the chain of servers.
    next = chain;
    chain = this;

    this->db = db;
    this->optimalNumberOfThreads = optimalNumberOfThreads;

    // No sessions yet.
    freeList = NULL;
    activeList = NULL;
    waitList = NULL;
    waitListLength = 0;

    // Keep our own copy of the URL (terminator included).
    const size_t urlSize = strlen(serverURL) + 1;
    URL = new char[urlSize];
    memcpy(URL, serverURL, urlSize);

    globalAcceptSock = socket_t::create_global(serverURL, connectionQueueLen);
    if (!globalAcceptSock->is_ok())
    {
        globalAcceptSock->get_error_text(errText, sizeof(errText));
        dbTrace("Failed to create global socket: %s\n", errText);
        delete globalAcceptSock;
        globalAcceptSock = NULL;
    }

    localAcceptSock = socket_t::create_local(serverURL, connectionQueueLen);
    if (!localAcceptSock->is_ok())
    {
        localAcceptSock->get_error_text(errText, sizeof(errText));
        dbTrace("Failed to create local socket: %s\n", errText);
        delete localAcceptSock;
        localAcceptSock = NULL;
    }
}
// Search the server chain for a server whose URL string equals `URL`
// (exact strcmp match). Returns the server, or NULL if none matches.
dbServer* dbServer::find(char const* URL)
{
    dbServer* candidate = chain;
    while (candidate != NULL && strcmp(URL, candidate->URL) != 0)
    {
        candidate = candidate->next;
    }
    return candidate;
}
void dbServer::cleanup()
{
dbServer *server, *next;
for (server = chain; server != NULL; server = next)
{
next = server->next;
delete server;
}
}
// Start the server: reset thread counters and cancellation flags, open the
// `go` and `done` synchronisation objects, and launch one accept thread for
// each accept socket that was successfully created.
void dbServer::start()
{
    nIdleThreads = 0;
    nActiveThreads = 0;

    cancelAccept = false;
    cancelSession = false;
    cancelWait = false;

    go.open();
    done.open();

    if (globalAcceptSock != NULL)
    {
        globalAcceptThread.create(acceptGlobalThread, this);
    }
    if (localAcceptSock != NULL)
    {
        localAcceptThread.create(acceptLocalThread, this);
    }
}
// Stop the server. The shutdown proceeds in order:
//   1. stop accepting: cancel the pending accept on each accept socket,
//      join its accept thread and delete the socket;
//   2. under `mutex`, shut down the sockets of active sessions until
//      activeList is empty;
//   3. release idle threads until nIdleThreads drops to zero;
//   4. discard sessions still queued in waitList, recycling their
//      descriptors into freeList;
//   5. close the `done` and `go` synchronisation objects.
void dbServer::stop()
{
// Phase 1: no new connections. The flag is raised before cancel_accept()
// so it is already set when the accept call is interrupted.
cancelAccept = true;
if (globalAcceptSock != NULL)
{
globalAcceptSock->cancel_accept();
globalAcceptThread.join();
}
// delete on NULL is a no-op, so this is safe when the socket was never created
delete globalAcceptSock;
globalAcceptSock = NULL;
if (localAcceptSock != NULL)
{
localAcceptSock->cancel_accept();
localAcceptThread.join();
}
delete localAcceptSock;
localAcceptSock = NULL;
// The rest of the shutdown runs inside a critical section on `mutex`.
dbCriticalSection cs(mutex);
// Phase 2: terminate active sessions.
cancelSession = true;
while (activeList != NULL)
{
// Shut down the socket of the session at the head of the list, then wait on
// `done`. NOTE(review): presumably the serving thread removes its session
// from activeList and signals `done` - that code is not in this block; confirm.
activeList->sock->shutdown();
done.wait(mutex);
}
// Phase 3: release idle threads.
cancelWait = true;
while (nIdleThreads != 0)
{
// NOTE(review): idle threads are presumably blocked on `go` and decrement
// nIdleThreads / signal `done` when they see cancelWait - confirm.
go.signal();
done.wait(mutex);
}
// Phase 4: drop connections that were accepted but never served.
while (waitList != NULL)
{
dbSession* next = waitList->next;
delete waitList->sock;
// move the session descriptor to the free list
waitList->next = freeList;
freeList = waitList;
waitList = next;
}
waitListLength = 0;
assert(nActiveThreads == 0);
// Phase 5: close the synchronisation objects.
done.close();
go.close();
}
// Handle a "freeze" request for statement `stmt_id` of `session`.
//
// If the statement exists and has a cursor, the cursor is frozen and cli_ok
// is reported; otherwise cli_bad_descriptor is reported. The response code is
// packed with pack4() and written to the session socket.
// Returns the result of the socket write.
bool dbServer::freeze(dbSession* session, int stmt_id)
{
    int4 response;
    dbStatement* const stmt = findStatement(session, stmt_id);

    if (stmt != NULL && stmt->cursor != NULL)
    {
        stmt->cursor->freeze();
        response = cli_ok;
    }
    else
    {
        response = cli_bad_descriptor;
    }

    pack4(response);
    return session->sock->write(&response, sizeof response);
}
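// Code-viewer keyboard shortcuts (residue from the web page, not part of the program):
//   Copy code            Ctrl + C
//   Search code          Ctrl + F
//   Full-screen mode     F11
//   Toggle theme         Ctrl + Shift + D
//   Show shortcuts       ?
//   Increase font size   Ctrl + =
//   Decrease font size   Ctrl + -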