?? informix.py
字號:
# informix.py
# Copyright (C) 2005,2006, 2007, 2008 Michael Bayer mike_mp@zzzcomputing.com
#
# coding: gbk
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""Informix dialect: column types, execution context and dialect class."""

import datetime

from sqlalchemy import sql, schema, exceptions, pool, util
from sqlalchemy.sql import compiler
from sqlalchemy.engine import default
from sqlalchemy import types as sqltypes


# Names that informix_cursor must never forward to the wrapped cursor.
# '_informix_cursor__cursor' is the name-mangled form of ``self.__cursor``;
# without it, a lookup made before __init__ has stored the wrapped cursor
# would re-enter __getattr__ forever.
_CURSOR_LOCAL_NAMES = (
    'offset',
    '__cursor',
    '_informix_cursor__cursor',
    'rowcount',
    '__del__',
    'execute',
)


# for offset
class informix_cursor(object):
    """Wrap a DB-API cursor and emulate a result offset on the client side.

    Attributes that are not defined on the wrapper are delegated to the
    wrapped cursor.
    """

    def __init__(self, con):
        """Open a cursor on ``con`` and start with a row count of zero."""
        self.__cursor = con.cursor()
        self.rowcount = 0

    def offset(self, n):
        """Skip the first ``n`` rows and recompute ``rowcount``.

        The skipped rows are fetched and discarded; ``rowcount`` becomes the
        wrapped cursor's row count minus ``n``, never going below zero.
        """
        if n > 0:
            self.fetchmany(n)
            self.rowcount = self.__cursor.rowcount - n
            if self.rowcount < 0:
                self.rowcount = 0
        else:
            self.rowcount = self.__cursor.rowcount

    def execute(self, sql, params):
        """Execute ``sql`` on the wrapped cursor.

        A missing or empty ``params`` is normalised to an empty list.
        """
        if params is None or len(params) == 0:
            params = []
        return self.__cursor.execute(sql, params)

    def __getattr__(self, name):
        """Delegate unknown attributes to the wrapped cursor.

        Raises AttributeError for the wrapper's own names instead of
        silently returning None.
        """
        if name in _CURSOR_LOCAL_NAMES:
            raise AttributeError(name)
        return getattr(self.__cursor, name)


def _strip_microseconds(value):
    """Return ``value`` with microseconds zeroed; ``None`` passes through."""
    if value is not None and value.microsecond:
        value = value.replace(microsecond=0)
    return value


class InfoNumeric(sqltypes.Numeric):
    """NUMERIC column, with precision/length when a precision is set."""

    def get_col_spec(self):
        if not self.precision:
            return 'NUMERIC'
        return "NUMERIC(%(precision)s, %(length)s)" % {
            'precision': self.precision,
            'length': self.length,
        }


class InfoInteger(sqltypes.Integer):
    """INTEGER column."""

    def get_col_spec(self):
        return "INTEGER"


class InfoSmallInteger(sqltypes.Smallinteger):
    """SMALLINT column."""

    def get_col_spec(self):
        return "SMALLINT"


class InfoDate(sqltypes.Date):
    """DATE column."""

    def get_col_spec(self):
        return "DATE"


class InfoDateTime(sqltypes.DateTime):
    """DATETIME YEAR TO SECOND column."""

    def get_col_spec(self):
        return "DATETIME YEAR TO SECOND"

    def bind_processor(self, dialect):
        # The column is declared only down to SECOND, so microseconds are
        # dropped from bound values.
        return _strip_microseconds


class InfoTime(sqltypes.Time):
    """DATETIME HOUR TO SECOND column."""

    def get_col_spec(self):
        return "DATETIME HOUR TO SECOND"

    def bind_processor(self, dialect):
        # The column is declared only down to SECOND, so microseconds are
        # dropped from bound values.
        return _strip_microseconds

    def result_processor(self, dialect):
        def process(value):
            # Reduce a full datetime result to its time-of-day part; any
            # other value is returned unchanged.
            if isinstance(value, datetime.datetime):
                return value.time()
            return value
        return process


class InfoText(sqltypes.String):
    """Text column, rendered as VARCHAR(255)."""

    def get_col_spec(self):
        return "VARCHAR(255)"


class InfoString(sqltypes.String):
    """VARCHAR column of the configured length."""

    def get_col_spec(self):
        return "VARCHAR(%(length)s)" % {'length': self.length}

    def bind_processor(self, dialect):
        def process(value):
            # Empty strings are bound as NULL.
            if value == '':
                return None
            return value
        return process


class InfoChar(sqltypes.CHAR):
    """CHAR column of the configured length."""

    def get_col_spec(self):
        return "CHAR(%(length)s)" % {'length': self.length}


class InfoBinary(sqltypes.Binary):
    """BYTE column."""

    def get_col_spec(self):
        return "BYTE"


class InfoBoolean(sqltypes.Boolean):
    """Boolean stored as SMALLINT (1 / 0 / NULL)."""

    default_type = 'NUM'

    def get_col_spec(self):
        return "SMALLINT"

    def result_processor(self, dialect):
        def process(value):
            if value is None:
                return None
            return bool(value)
        return process

    def bind_processor(self, dialect):
        def process(value):
            if value is None:
                return None
            # Always bind an integer; previously truthy non-bool values were
            # bound as True/False while real booleans were bound as 1/0.
            if value:
                return 1
            return 0
        return process


# Generic SQLAlchemy type -> Informix-specific implementation.
colspecs = {
    sqltypes.Integer: InfoInteger,
    sqltypes.Smallinteger: InfoSmallInteger,
    sqltypes.Numeric: InfoNumeric,
    sqltypes.Float: InfoNumeric,
    sqltypes.DateTime: InfoDateTime,
    sqltypes.Date: InfoDate,
    sqltypes.Time: InfoTime,
    sqltypes.String: InfoString,
    sqltypes.Binary: InfoBinary,
    sqltypes.Boolean: InfoBoolean,
    sqltypes.Text: InfoText,
    sqltypes.CHAR: InfoChar,
}


# Numeric column type code -> Informix-specific type.
ischema_names = {
    0: InfoString,        # CHAR
    1: InfoSmallInteger,  # SMALLINT
    2: InfoInteger,       # INT
    3: InfoNumeric,       # Float
    4: InfoNumeric,       # SmallFloat (was a second, shadowing key 3)
    5: InfoNumeric,       # DECIMAL
    6: InfoInteger,       # Serial
    7: InfoDate,          # DATE
    8: InfoNumeric,       # MONEY
    10: InfoDateTime,     # DATETIME
    11: InfoBinary,       # BYTE
    12: InfoText,         # TEXT
    13: InfoString,       # VARCHAR
    15: InfoString,       # NCHAR
    16: InfoString,       # NVARCHAR
    17: InfoInteger,      # INT8
    18: InfoInteger,      # Serial8
    43: InfoString,       # LVARCHAR
    # NOTE(review): BLOB (InfoBinary) and CLOB (InfoText) were both listed
    # under -1, so the CLOB entry always won. That effective mapping is kept;
    # the real type codes for BLOB/CLOB still need to be confirmed.
    -1: InfoText,         # CLOB (BLOB was shadowed by this entry)
}


def descriptor():
    """Return the dialect's name, description and connection arguments."""
    return {
        'name': 'informix',
        'description': 'Informix',
        'arguments': [
            ('dsn', 'Data Source Name', None),
            ('user', 'Username', None),
            ('password', 'Password', None),
        ],
    }


class InfoExecutionContext(default.DefaultExecutionContext):
    """Execution context that records serial ids and applies offsets."""

    # cursor.sqlerrd
    # 0 - estimated number of rows returned
    # 1 - serial value after insert or ISAM error code
    # 2 - number of rows processed
    # 3 - estimated cost
    # 4 - offset of the error into the SQL statement
    # 5 - rowid after insert
    def post_exec(self):
        """Capture the inserted serial value, or apply the compiled offset."""
        is_insert = getattr(self.compiled, "isinsert", False)
        if is_insert and self.last_inserted_ids() is None:
            self._last_inserted_ids = [self.cursor.sqlerrd[1]]
        elif hasattr(self.compiled, 'offset'):
            self.cursor.offset(self.compiled.offset)
        super(InfoExecutionContext, self).post_exec()

    def create_cursor(self):
        """Return an offset-capable cursor over the raw DB-API connection."""
        return informix_cursor(self.connection.connection)


class InfoDialect(default.DefaultDialect):
    """Dialect for Informix using the ``informixdb`` DB-API module."""

    # for informix 7.31
    max_identifier_length = 18

    def __init__(self, use_ansi=True, **kwargs):
        self.use_ansi = use_ansi
        default.DefaultDialect.__init__(self, **kwargs)
        self.paramstyle = 'qmark'

    def dbapi(cls):
        """Import and return the ``informixdb`` module."""
        import informixdb
        return informixdb
    dbapi = classmethod(dbapi)

    def is_disconnect(self, e):
        """Return True when ``e`` is an OperationalError about a lost connection."""
        if isinstance(e, self.dbapi.OperationalError):
            message = str(e)
            return ('closed the connection' in message
                    or 'connection not open' in message)
        return False

    def do_begin(self, connect):
        """Issue ``SET LOCK MODE TO WAIT`` at the start of a transaction."""
        cu = connect.cursor()
        try:
            cu.execute('SET LOCK MODE TO WAIT')
            #cu.execute( 'SET ISOLATION TO REPEATABLE READ' )
        finally:
            # Release the helper cursor instead of leaving it open.
            cu.close()

    def type_descriptor(self, typeobj):
        """Adapt a generic type to its Informix implementation via ``colspecs``."""
        return sqltypes.adapt_type(typeobj, colspecs)

    def create_connect_args(self, url):
        """Build ``([dsn], options)`` from ``url``.

        The DSN is ``database@host`` when a host is given, otherwise just the
        database name; user/password are passed only when a username is set.
        """
        if url.host:
            dsn = '%s@%s' % (url.database, url.host)
        else:
            dsn = url.database

        if url.username:
            opt = {'user': url.username, 'password': url.password}
        else:
            opt = {}

        return ([dsn], opt)

    def create_execution_context(self, *args, **kwargs):
        return InfoExecutionContext(self, *args, **kwargs)

    def oid_column_name(self, column):
        return "rowid"

    def table_names(self, connection, schema):
        # NOTE(review): the available source is cut off immediately after
        # this signature. The original body, and anything that followed it in
        # this class or module, must be restored from upstream before this
        # dialect is used.
        raise NotImplementedError(
            "InfoDialect.table_names: body missing from the available source")
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -