?? pgserverthread.java
字號:
/*
* Copyright 2004-2008 H2 Group. Licensed under the H2 License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.server.pg;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.io.StringReader;
import java.net.Socket;
import java.sql.Connection;
import java.sql.ParameterMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.HashMap;
import java.util.HashSet;
import org.h2.constant.SysProperties;
import org.h2.engine.ConnectionInfo;
import org.h2.jdbc.JdbcConnection;
import org.h2.util.IOUtils;
import org.h2.util.JdbcUtils;
import org.h2.util.ObjectUtils;
import org.h2.util.ScriptReader;
/**
* One server thread is opened for each client.
*/
public class PgServerThread implements Runnable {
// Alias for java.sql.Types.VARCHAR; not referenced in the visible part of this class.
private static final int TYPE_STRING = Types.VARCHAR;
// The server that accepted the connection; used here for logging and configuration.
private PgServer server;
// The client socket handled by this thread.
private Socket socket;
// The database connection; created when the password message is processed.
private Connection conn;
// When true, the message loop in run() ends.
private boolean stop;
// Raw socket input; message type and length are read from here.
private DataInputStream dataInRaw;
// Wraps the payload of the message currently being processed.
private DataInputStream dataIn;
// The socket output stream; flushed after each processed message.
private OutputStream out;
// NOTE(review): not used in the visible part of this class; presumably the
// type of the outgoing message being built - confirm.
private int messageType;
// NOTE(review): not used in the visible part of this class; presumably the
// buffer for the outgoing message being built - confirm.
private ByteArrayOutputStream outBuffer;
// NOTE(review): not used in the visible part of this class; presumably
// writes into outBuffer - confirm.
private DataOutputStream dataOut;
// NOTE(review): not used in the visible part of this class.
private Thread thread;
// Set once the startup message was processed; from then on every message
// starts with a type byte.
private boolean initDone;
// The value of the "user" startup parameter.
private String userName;
// The value of the "database" startup parameter.
private String databaseName;
// NOTE(review): not used in the visible part of this class.
private int processId;
// Defaults to UTF-8; replaced by the "client_encoding" startup parameter.
private String clientEncoding = "UTF-8";
// Defaults to ISO; replaced by the "DateStyle" startup parameter.
private String dateStyle = "ISO";
// Maps a statement name (String) to its Prepared object; filled by Parse.
private HashMap prepared = new HashMap();
// Maps a portal name (String) to its Portal object; filled by Bind.
private HashMap portals = new HashMap();
// NOTE(review): not used in the visible part of this class; presumably the
// type ids already seen by checkType - confirm.
private HashSet types = new HashSet();
/**
 * Creates the handler for one client connection. Nothing is read from the
 * socket until {@link #run()} is called.
 *
 * @param socket the socket of the connected client
 * @param server the server that accepted the connection
 */
PgServerThread(Socket socket, PgServer server) {
    this.socket = socket;
    this.server = server;
}
/**
 * Serves the client: opens the socket streams, then processes one message
 * at a time (flushing the output after each one) until the stop flag is
 * set or the input ends. The connection is always closed at the end.
 */
public void run() {
    try {
        server.log("Connect");
        InputStream ins = socket.getInputStream();
        out = socket.getOutputStream();
        dataInRaw = new DataInputStream(ins);
        while (!stop) {
            process();
            out.flush();
        }
    } catch (EOFException e) {
        // more or less normal disconnect
    } catch (Exception e) {
        // error() already passes the exception to the server error log,
        // so it must not be logged a second time here
        error("process", e);
    } finally {
        server.log("Disconnect");
        close();
    }
}
/**
 * Reads a zero-terminated string from the current message. Reading stops
 * at the first zero byte or at the end of the message data, whichever
 * comes first; the terminator is not part of the result.
 *
 * @return the bytes read, decoded with the charset name from getEncoding()
 * @throws IOException if reading or decoding fails
 */
private String readString() throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    // 0 is the terminator, a negative value means no more data
    for (int b = dataIn.read(); b > 0; b = dataIn.read()) {
        bytes.write(b);
    }
    byte[] raw = bytes.toByteArray();
    return new String(raw, getEncoding());
}
/**
 * Reads the next four bytes of the current message as an int.
 *
 * @return the value read
 * @throws IOException if the message has fewer than four bytes left
 */
private int readInt() throws IOException {
    int value = dataIn.readInt();
    return value;
}
/**
 * Reads the next two bytes of the current message as a signed short and
 * widens the result to int.
 *
 * @return the value read
 * @throws IOException if the message has fewer than two bytes left
 */
private int readShort() throws IOException {
    short value = dataIn.readShort();
    return value;
}
/**
 * Reads the next byte of the current message.
 *
 * @return the byte read
 * @throws IOException if the message has no bytes left
 */
private byte readByte() throws IOException {
    byte value = dataIn.readByte();
    return value;
}
/**
 * Fills the given array completely with the next bytes of the current
 * message.
 *
 * @param buff the array to fill
 * @throws IOException if the message has fewer bytes left than the array
 *             holds
 */
private void readFully(byte[] buff) throws IOException {
    dataIn.readFully(buff, 0, buff.length);
}
/**
 * Reports a problem to the server log. The message used to be discarded,
 * so calls without an exception (for example for an unsupported message
 * type) left no trace at all; it is now written to the server log as well.
 *
 * @param message a description of the problem, or null
 * @param e the exception that caused the problem, or null
 */
private void error(String message, Exception e) {
    if (message != null) {
        server.log(message);
    }
    if (e != null) {
        server.logError(e);
    }
}
/**
 * Reads one message from the client and handles it.
 * <p>
 * Until the startup message has been processed, messages carry no type
 * byte and are handled as type 0. Afterwards the first byte is the message
 * type. The following int is the message length including the four length
 * bytes themselves; the remaining payload is buffered and made available
 * through the read methods of this class.
 *
 * @throws IOException if reading from or writing to the client fails, or
 *             if the message length is invalid
 */
private void process() throws IOException {
    int x;
    if (initDone) {
        x = dataInRaw.read();
        if (x < 0) {
            // end of stream: the client is gone
            stop = true;
            return;
        }
    } else {
        x = 0;
    }
    int len = dataInRaw.readInt();
    // the length includes the length field itself
    len -= 4;
    if (len < 0) {
        throw new IOException("Invalid message length: " + (len + 4));
    }
    byte[] data = new byte[len];
    dataInRaw.readFully(data, 0, len);
    dataIn = new DataInputStream(new ByteArrayInputStream(data, 0, len));
    switch (x) {
    case 0: {
        server.log("Init");
        int version = readInt();
        if (version == 80877102) {
            server.log("CancelRequest (not supported)");
            server.log(" pid: " + readInt());
            server.log(" key: " + readInt());
            error("CancelRequest", null);
        } else if (version == 80877103) {
            server.log("SSLRequest");
            // 'N' tells the client that SSL is not available
            out.write('N');
        } else {
            server.log("StartupMessage");
            server.log(" version " + version + " (" + (version >> 16) + "." + (version & 0xff) + ")");
            // name / value pairs, terminated by an empty name
            while (true) {
                String param = readString();
                if (param.length() == 0) {
                    break;
                }
                String value = readString();
                if ("user".equals(param)) {
                    this.userName = value;
                } else if ("database".equals(param)) {
                    this.databaseName = value;
                } else if ("client_encoding".equals(param)) {
                    clientEncoding = value;
                } else if ("DateStyle".equals(param)) {
                    dateStyle = value;
                }
                // server.log(" param " + param + "=" + value);
            }
            sendAuthenticationCleartextPassword();
            initDone = true;
        }
        break;
    }
    case 'p': {
        server.log("PasswordMessage");
        String password = readString();
        try {
            ConnectionInfo ci = new ConnectionInfo(databaseName);
            String baseDir = server.getBaseDir();
            if (baseDir == null) {
                baseDir = SysProperties.getBaseDir();
            }
            if (baseDir != null) {
                ci.setBaseDir(baseDir);
            }
            if (server.getIfExists()) {
                ci.setProperty("IFEXISTS", "TRUE");
            }
            ci.setProperty("MODE", "PostgreSQL");
            ci.setOriginalURL("jdbc:h2:" + databaseName + ";MODE=PostgreSQL");
            ci.setUserName(userName);
            ci.setProperty("PASSWORD", password);
            ci.readPasswords();
            conn = new JdbcConnection(ci, false);
            // can not do this because when called inside
            // DriverManager.getConnection, a deadlock occurs
            // conn = DriverManager.getConnection(url, userName, password);
            initDb();
            sendAuthenticationOk();
        } catch (SQLException e) {
            // tell the client why the login failed instead of only
            // dropping the connection, and use the server log rather than
            // printing to the console
            server.logError(e);
            sendErrorResponse(e);
            stop = true;
        }
        break;
    }
    case 'P': {
        server.log("Parse");
        Prepared p = new Prepared();
        p.name = readString();
        p.sql = getSQL(readString());
        int count = readShort();
        p.paramType = new int[count];
        for (int i = 0; i < count; i++) {
            int type = readInt();
            checkType(type);
            p.paramType[i] = type;
        }
        try {
            p.prep = conn.prepareStatement(p.sql);
            prepared.put(p.name, p);
            sendParseComplete();
        } catch (SQLException e) {
            sendErrorResponse(e);
        }
        break;
    }
    case 'B': {
        server.log("Bind");
        Portal portal = new Portal();
        portal.name = readString();
        String prepName = readString();
        Prepared prep = (Prepared) prepared.get(prepName);
        if (prep == null) {
            // it is the prepared statement that is missing, not a portal
            sendErrorResponse("Prepared not found: " + prepName);
            break;
        }
        portal.sql = prep.sql;
        portal.prep = prep.prep;
        portals.put(portal.name, portal);
        int formatCodeCount = readShort();
        int[] formatCodes = new int[formatCodeCount];
        for (int i = 0; i < formatCodeCount; i++) {
            formatCodes[i] = readShort();
        }
        int paramCount = readShort();
        for (int i = 0; i < paramCount; i++) {
            int paramLen = readInt();
            try {
                if (paramLen == -1) {
                    // a length of -1 means NULL and no value bytes follow;
                    // JDBC parameter indexes start at 1
                    portal.prep.setNull(i + 1, Types.NULL);
                } else {
                    byte[] d2 = new byte[paramLen];
                    readFully(d2);
                    setParameter(portal.prep, i, d2, formatCodes);
                }
            } catch (SQLException e) {
                sendErrorResponse(e);
            }
        }
        int resultCodeCount = readShort();
        portal.resultColumnFormat = new int[resultCodeCount];
        for (int i = 0; i < resultCodeCount; i++) {
            portal.resultColumnFormat[i] = readShort();
        }
        sendBindComplete();
        break;
    }
    case 'D': {
        char type = (char) readByte();
        String name = readString();
        server.log("Describe");
        if (type == 'S') {
            Prepared p = (Prepared) prepared.get(name);
            if (p == null) {
                sendErrorResponse("Prepared not found: " + name);
                // stop here, otherwise p is dereferenced below
                break;
            }
            sendParameterDescription(p);
        } else if (type == 'P') {
            Portal p = (Portal) portals.get(name);
            if (p == null) {
                sendErrorResponse("Portal not found: " + name);
                // stop here, otherwise p is dereferenced below
                break;
            }
            try {
                ResultSetMetaData meta = p.prep.getMetaData();
                sendRowDescription(meta);
            } catch (SQLException e) {
                sendErrorResponse(e);
            }
        } else {
            error("expected S or P, got " + type, null);
            sendErrorResponse("expected S or P");
        }
        break;
    }
    case 'E': {
        String name = readString();
        server.log("Execute");
        Portal p = (Portal) portals.get(name);
        if (p == null) {
            sendErrorResponse("Portal not found: " + name);
            break;
        }
        // NOTE(review): the PostgreSQL protocol defines the row limit of
        // Execute as a 32 bit value, but only two bytes are read here.
        // Left unchanged on purpose: confirm how a non-zero limit should
        // be reported to the client before reading the full value.
        int maxRows = readShort();
        PreparedStatement prep = p.prep;
        server.log(p.sql);
        try {
            prep.setMaxRows(maxRows);
            boolean result = prep.execute();
            if (result) {
                try {
                    ResultSet rs = prep.getResultSet();
                    ResultSetMetaData meta = rs.getMetaData();
                    sendRowDescription(meta);
                    while (rs.next()) {
                        sendDataRow(p.resultColumnFormat, rs);
                    }
                    sendCommandComplete(p.sql, 0);
                } catch (SQLException e) {
                    sendErrorResponse(e);
                }
            } else {
                sendCommandComplete(p.sql, prep.getUpdateCount());
            }
        } catch (SQLException e) {
            sendErrorResponse(e);
        }
        break;
    }
    case 'S': {
        server.log("Sync");
        sendReadyForQuery();
        break;
    }
    case 'Q': {
        server.log("Query");
        String query = readString();
        // the query text may contain multiple statements; each one is
        // executed and answered separately
        ScriptReader reader = new ScriptReader(new StringReader(query));
        while (true) {
            Statement stat = null;
            try {
                String s = reader.readStatement();
                if (s == null) {
                    break;
                }
                s = getSQL(s);
                stat = conn.createStatement();
                boolean result = stat.execute(s);
                if (result) {
                    ResultSet rs = stat.getResultSet();
                    ResultSetMetaData meta = rs.getMetaData();
                    sendRowDescription(meta);
                    while (rs.next()) {
                        sendDataRow(null, rs);
                    }
                    sendCommandComplete(s, 0);
                } else {
                    sendCommandComplete(s, stat.getUpdateCount());
                }
            } catch (SQLException e) {
                sendErrorResponse(e);
            } finally {
                JdbcUtils.closeSilently(stat);
            }
        }
        sendReadyForQuery();
        break;
    }
    case 'X': {
        server.log("Terminate");
        close();
        break;
    }
    default:
        error("Unsupported: " + x + " (" + (char) x + ")", null);
        break;
    }
}
private void checkType(int type) {
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -