?? maxdb.py
字號(hào):
# maxdb.py## This module is part of SQLAlchemy and is released under# the MIT License: http://www.opensource.org/licenses/mit-license.php"""Support for the MaxDB database.TODO: More module docs! MaxDB support is currently experimental.Overview--------The ``maxdb`` dialect is **experimental** and has only been tested on 7.6.03.007and 7.6.00.037. Of these, **only 7.6.03.007 will work** with SQLAlchemy's ORM.The earlier version has severe ``LEFT JOIN`` limitations and will returnincorrect results from even very simple ORM queries.Only the native Python DB-API is currently supported. ODBC driver supportis a future enhancement.Connecting----------The username is case-sensitive. If you usually connect to thedatabase with sqlcli and other tools in lower case, you likely need touse upper case for DB-API.Implementation Notes--------------------Also check the DatabaseNotes page on the wiki for detailed information.With the 7.6.00.37 driver and Python 2.5, it seems that all DB-APIgenerated exceptions are broken and can cause Python to crash.For 'somecol.in_([])' to work, the IN operator's generation must be changedto cast 'NULL' to a numeric, i.e. NUM(NULL). The DB-API doesn't accept abind parameter there, so that particular generation must inline the NULL value,which depends on [ticket:807].The DB-API is very picky about where bind params may be used in queries.Bind params for some functions (e.g. MOD) need type information supplied.The dialect does not yet do this automatically.Max will occasionally throw up 'bad sql, compile again' exceptions forperfectly valid SQL. The dialect does not currently handle these, moreresearch is needed.MaxDB 7.5 and Sap DB <= 7.4 reportedly do not support schemas. A veryslightly different version of this dialect would be required to supportthose versions, and can easily be added if there is demand. Some otherrequired components such as an Max-aware 'old oracle style' join compiler(thetas with (+) outer indicators) are already done and available forintegration- email the devel list if you're interested in working onthis."""import datetime, itertools, refrom sqlalchemy import exceptions, schema, sql, utilfrom sqlalchemy.sql import operators as sql_operators, expression as sql_exprfrom sqlalchemy.sql import compiler, visitorsfrom sqlalchemy.engine import base as engine_base, defaultfrom sqlalchemy import types as sqltypes__all__ = [ 'MaxString', 'MaxUnicode', 'MaxChar', 'MaxText', 'MaxInteger', 'MaxSmallInteger', 'MaxNumeric', 'MaxFloat', 'MaxTimestamp', 'MaxDate', 'MaxTime', 'MaxBoolean', 'MaxBlob', ]class _StringType(sqltypes.String): _type = None def __init__(self, length=None, encoding=None, **kw): super(_StringType, self).__init__(length=length, **kw) self.encoding = encoding def get_col_spec(self): if self.length is None: spec = 'LONG' else: spec = '%s(%s)' % (self._type, self.length) if self.encoding is not None: spec = ' '.join([spec, self.encoding.upper()]) return spec def bind_processor(self, dialect): if self.encoding == 'unicode': return None else: def process(value): if isinstance(value, unicode): return value.encode(dialect.encoding) else: return value return process def result_processor(self, dialect): def process(value): while True: if value is None: return None elif isinstance(value, unicode): return value elif isinstance(value, str): if self.convert_unicode or dialect.convert_unicode: return value.decode(dialect.encoding) else: return value elif hasattr(value, 'read'): # some sort of LONG, snarf and retry value = value.read(value.remainingLength()) continue else: # unexpected type, return as-is return value return processclass MaxString(_StringType): _type = 'VARCHAR' def __init__(self, *a, **kw): super(MaxString, self).__init__(*a, **kw)class MaxUnicode(_StringType): _type = 'VARCHAR' def __init__(self, length=None, **kw): super(MaxUnicode, self).__init__(length=length, encoding='unicode')class MaxChar(_StringType): _type = 'CHAR'class MaxText(_StringType): _type = 'LONG' def __init__(self, *a, **kw): super(MaxText, self).__init__(*a, **kw) def get_col_spec(self): spec = 'LONG' if self.encoding is not None: spec = ' '.join((spec, self.encoding)) elif self.convert_unicode: spec = ' '.join((spec, 'UNICODE')) return specclass MaxInteger(sqltypes.Integer): def get_col_spec(self): return 'INTEGER'class MaxSmallInteger(MaxInteger): def get_col_spec(self): return 'SMALLINT'class MaxNumeric(sqltypes.Numeric): """The FIXED (also NUMERIC, DECIMAL) data type.""" def __init__(self, precision=None, length=None, **kw): kw.setdefault('asdecimal', True) super(MaxNumeric, self).__init__(length=length, precision=precision, **kw) def bind_processor(self, dialect): return None def get_col_spec(self): if self.length and self.precision: return 'FIXED(%s, %s)' % (self.precision, self.length) elif self.precision: return 'FIXED(%s)' % self.precision else: return 'INTEGER'class MaxFloat(sqltypes.Float): """The FLOAT data type.""" def get_col_spec(self): if self.precision is None: return 'FLOAT' else: return 'FLOAT(%s)' % (self.precision,)class MaxTimestamp(sqltypes.DateTime): def get_col_spec(self): return 'TIMESTAMP' def bind_processor(self, dialect): def process(value): if value is None: return None elif isinstance(value, basestring): return value elif dialect.datetimeformat == 'internal': ms = getattr(value, 'microsecond', 0) return value.strftime("%Y%m%d%H%M%S" + ("%06u" % ms)) elif dialect.datetimeformat == 'iso': ms = getattr(value, 'microsecond', 0) return value.strftime("%Y-%m-%d %H:%M:%S." + ("%06u" % ms)) else: raise exceptions.InvalidRequestError( "datetimeformat '%s' is not supported." % ( dialect.datetimeformat,)) return process def result_processor(self, dialect): def process(value): if value is None: return None elif dialect.datetimeformat == 'internal': return datetime.datetime( *[int(v) for v in (value[0:4], value[4:6], value[6:8], value[8:10], value[10:12], value[12:14], value[14:])]) elif dialect.datetimeformat == 'iso': return datetime.datetime( *[int(v) for v in (value[0:4], value[5:7], value[8:10], value[11:13], value[14:16], value[17:19], value[20:])]) else: raise exceptions.InvalidRequestError( "datetimeformat '%s' is not supported." % ( dialect.datetimeformat,)) return processclass MaxDate(sqltypes.Date): def get_col_spec(self): return 'DATE' def bind_processor(self, dialect): def process(value): if value is None: return None elif isinstance(value, basestring): return value elif dialect.datetimeformat == 'internal': return value.strftime("%Y%m%d") elif dialect.datetimeformat == 'iso': return value.strftime("%Y-%m-%d") else: raise exceptions.InvalidRequestError( "datetimeformat '%s' is not supported." % ( dialect.datetimeformat,)) return process def result_processor(self, dialect): def process(value): if value is None: return None elif dialect.datetimeformat == 'internal': return datetime.date( *[int(v) for v in (value[0:4], value[4:6], value[6:8])]) elif dialect.datetimeformat == 'iso': return datetime.date( *[int(v) for v in (value[0:4], value[5:7], value[8:10])]) else: raise exceptions.InvalidRequestError( "datetimeformat '%s' is not supported." % ( dialect.datetimeformat,)) return processclass MaxTime(sqltypes.Time): def get_col_spec(self): return 'TIME' def bind_processor(self, dialect): def process(value): if value is None: return None elif isinstance(value, basestring): return value elif dialect.datetimeformat == 'internal': return value.strftime("%H%M%S") elif dialect.datetimeformat == 'iso': return value.strftime("%H-%M-%S") else: raise exceptions.InvalidRequestError( "datetimeformat '%s' is not supported." % ( dialect.datetimeformat,)) return process def result_processor(self, dialect): def process(value): if value is None: return None elif dialect.datetimeformat == 'internal': t = datetime.time( *[int(v) for v in (value[0:4], value[4:6], value[6:8])]) return t elif dialect.datetimeformat == 'iso': return datetime.time( *[int(v) for v in (value[0:4], value[5:7], value[8:10])]) else: raise exceptions.InvalidRequestError( "datetimeformat '%s' is not supported." % ( dialect.datetimeformat,)) return processclass MaxBoolean(sqltypes.Boolean): def get_col_spec(self): return 'BOOLEAN'class MaxBlob(sqltypes.Binary): def get_col_spec(self): return 'LONG BYTE' def bind_processor(self, dialect): def process(value): if value is None: return None else: return str(value) return process def result_processor(self, dialect): def process(value): if value is None: return None else: return value.read(value.remainingLength()) return processcolspecs = { sqltypes.Integer: MaxInteger, sqltypes.Smallinteger: MaxSmallInteger, sqltypes.Numeric: MaxNumeric, sqltypes.Float: MaxFloat, sqltypes.DateTime: MaxTimestamp, sqltypes.Date: MaxDate, sqltypes.Time: MaxTime, sqltypes.String: MaxString, sqltypes.Binary: MaxBlob, sqltypes.Boolean: MaxBoolean, sqltypes.Text: MaxText, sqltypes.CHAR: MaxChar, sqltypes.TIMESTAMP: MaxTimestamp, sqltypes.BLOB: MaxBlob, sqltypes.Unicode: MaxUnicode, }ischema_names = { 'boolean': MaxBoolean, 'char': MaxChar, 'character': MaxChar, 'date': MaxDate, 'fixed': MaxNumeric, 'float': MaxFloat, 'int': MaxInteger,
?? 快捷鍵說明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號(hào)
Ctrl + =
減小字號(hào)
Ctrl + -