oracle.py
字號:
# oracle.py
# Copyright (C) 2005, 2006, 2007, 2008 Michael Bayer mike_mp@zzzcomputing.com
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

import datetime, random, re

from sqlalchemy import util, sql, schema, exceptions, logging
from sqlalchemy.engine import default, base
from sqlalchemy.sql import compiler, visitors
from sqlalchemy.sql import operators as sql_operators, functions as sql_functions
from sqlalchemy import types as sqltypes


class OracleNumeric(sqltypes.Numeric):
    """Numeric type rendered as NUMERIC, with precision/length when set."""

    def get_col_spec(self):
        """Return the DDL column spec; bare "NUMERIC" when precision is None."""
        if self.precision is None:
            return "NUMERIC"
        else:
            return "NUMERIC(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length}


class OracleInteger(sqltypes.Integer):
    """Integer type rendered as INTEGER."""

    def get_col_spec(self):
        return "INTEGER"


class OracleSmallInteger(sqltypes.Smallinteger):
    """Small integer type rendered as SMALLINT."""

    def get_col_spec(self):
        return "SMALLINT"


class OracleDate(sqltypes.Date):
    """Date type rendered as DATE; datetime results are cut down to dates."""

    def get_col_spec(self):
        return "DATE"

    def bind_processor(self, dialect):
        """Return None: bound values are passed through unprocessed."""
        return None

    def result_processor(self, dialect):
        """Return a callable converting datetime.datetime results to dates.

        Any value that is not a datetime.datetime (including None) is
        returned unchanged.
        """
        def process(value):
            if not isinstance(value, datetime.datetime):
                return value
            else:
                return value.date()
        return process


class OracleDateTime(sqltypes.DateTime):
    """DateTime type rendered as DATE."""

    def get_col_spec(self):
        return "DATE"

    def result_processor(self, dialect):
        """Return a callable normalizing results to datetime.datetime.

        None and datetime.datetime values pass through; anything else is
        rebuilt from its year/month/day/hour/minute/second attributes.
        """
        def process(value):
            if value is None or isinstance(value, datetime.datetime):
                return value
            else:
                # convert cx_oracle datetime object returned pre-python 2.4
                return datetime.datetime(value.year, value.month,
                    value.day, value.hour, value.minute, value.second)
        return process


# Note:
# Oracle DATE == DATETIME
# Oracle does not allow milliseconds in DATE
# Oracle does not support TIME columns

# only if cx_oracle contains TIMESTAMP
class OracleTimestamp(sqltypes.TIMESTAMP):
    """TIMESTAMP type rendered as TIMESTAMP."""

    def get_col_spec(self):
        return "TIMESTAMP"

    def get_dbapi_type(self, dialect):
        """Return the TIMESTAMP attribute of the object passed in."""
        return dialect.TIMESTAMP

    def result_processor(self, dialect):
        """Return a callable normalizing results to datetime.datetime.

        None and datetime.datetime values pass through; anything else is
        rebuilt from its year/month/day/hour/minute/second attributes.
        """
        def process(value):
            if value is None or isinstance(value, datetime.datetime):
                return value
            else:
                # convert cx_oracle datetime object returned pre-python 2.4
                return datetime.datetime(value.year, value.month,
                    value.day, value.hour, value.minute, value.second)
        return process


class OracleString(sqltypes.String):
    """String type rendered as VARCHAR(length)."""

    def get_col_spec(self):
        return "VARCHAR(%(length)s)" % {'length' : self.length}


class OracleText(sqltypes.Text):
    """Text type rendered as CLOB; LOB results are read into values."""

    def get_dbapi_type(self, dbapi):
        return dbapi.CLOB

    def get_col_spec(self):
        return "CLOB"

    def result_processor(self, dialect):
        """Return a callable that reads LOB objects and applies the parent
        class's result processor, when it supplies one.
        """
        super_process = super(OracleText, self).result_processor(dialect)
        lob = dialect.dbapi.LOB
        def process(value):
            if isinstance(value, lob):
                if super_process:
                    return super_process(value.read())
                else:
                    return value.read()
            else:
                if super_process:
                    return super_process(value)
                else:
                    return value
        return process


class OracleRaw(sqltypes.Binary):
    """Binary type rendered as RAW(length)."""

    def get_col_spec(self):
        return "RAW(%(length)s)" % {'length' : self.length}


class OracleChar(sqltypes.CHAR):
    """CHAR type rendered as CHAR(length)."""

    def get_col_spec(self):
        return "CHAR(%(length)s)" % {'length' : self.length}


class OracleBinary(sqltypes.Binary):
    """Binary type rendered as BLOB; LOB results are read into values."""

    def get_dbapi_type(self, dbapi):
        return dbapi.BLOB

    def get_col_spec(self):
        return "BLOB"

    def bind_processor(self, dialect):
        """Return None: bound values are passed through unprocessed."""
        return None

    def result_processor(self, dialect):
        """Return a callable that reads LOB objects and passes anything
        else through unchanged.
        """
        lob = dialect.dbapi.LOB
        def process(value):
            if isinstance(value, lob):
                return value.read()
            else:
                return value
        return process


class OracleBoolean(sqltypes.Boolean):
    """Boolean type stored in a SMALLINT column."""

    def get_col_spec(self):
        return "SMALLINT"

    def result_processor(self, dialect):
        """Return a callable mapping results to True/False, keeping None."""
        def process(value):
            if value is None:
                return None
            return bool(value)
        return process

    def bind_processor(self, dialect):
        """Return a callable mapping True/False to 1/0, keeping None.

        Any other value is reduced to its truth value.
        """
        def process(value):
            if value is True:
                return 1
            elif value is False:
                return 0
            elif value is None:
                return None
            else:
                return bool(value)
        return process


# generic type -> Oracle implementation, used by OracleDialect.type_descriptor
colspecs = {
    sqltypes.Integer : OracleInteger,
    sqltypes.Smallinteger : OracleSmallInteger,
    sqltypes.Numeric : OracleNumeric,
    sqltypes.Float : OracleNumeric,
    sqltypes.DateTime : OracleDateTime,
    sqltypes.Date : OracleDate,
    sqltypes.String : OracleString,
    sqltypes.Binary : OracleBinary,
    sqltypes.Boolean : OracleBoolean,
    sqltypes.Text : OracleText,
    sqltypes.TIMESTAMP : OracleTimestamp,
    sqltypes.CHAR: OracleChar,
}

# Oracle type name -> type class
ischema_names = {
    'VARCHAR2' : OracleString,
    'DATE' : OracleDateTime,
    'DATETIME' : OracleDateTime,
    'NUMBER' : OracleNumeric,
    'BLOB' : OracleBinary,
    'CLOB' : OracleText,
    'TIMESTAMP' : OracleTimestamp,
    'RAW' : OracleRaw,
    'FLOAT' : OracleNumeric,
    'DOUBLE PRECISION' : OracleNumeric,
    'LONG' : OracleText,
}


def descriptor():
    """Return a dict naming this dialect and its connection arguments."""
    return {'name':'oracle',
    'description':'Oracle',
    'arguments':[
        ('dsn', 'Data Source Name', None),
        ('user', 'Username', None),
        ('password', 'Password', None)
    ]}


class OracleExecutionContext(default.DefaultExecutionContext):
    """Execution context adding OUT parameter and LOB result handling."""

    def pre_exec(self):
        """Prepare the cursor parameters before execution.

        Calls set_input_sizes() when the dialect's auto_setinputsizes flag
        is on.  Then, when exactly one parameter set is being executed,
        every bind parameter flagged isoutparam is replaced by a cursor
        variable from cursor.var(), which is also kept in
        self.out_parameters under the bind name.
        """
        super(OracleExecutionContext, self).pre_exec()
        if self.dialect.auto_setinputsizes:
            self.set_input_sizes()
        if self.compiled_parameters is not None and len(self.compiled_parameters) == 1:
            for key in self.compiled.binds:
                bindparam = self.compiled.binds[key]
                name = self.compiled.bind_names[bindparam]
                # not used below; the lookup is kept exactly as in the original
                value = self.compiled_parameters[0][name]
                if bindparam.isoutparam:
                    dbtype = bindparam.type.dialect_impl(self.dialect).get_dbapi_type(self.dialect.dbapi)
                    if not hasattr(self, 'out_parameters'):
                        self.out_parameters = {}
                    self.out_parameters[name] = self.cursor.var(dbtype)
                    self.parameters[0][name] = self.out_parameters[name]

    def get_result_proxy(self):
        """Resolve OUT parameter values and return a result proxy.

        Returns a base.BufferedColumnResultProxy when any column in
        cursor.description has a type code listed in the dialect's
        ORACLE_BINARY_TYPES, otherwise a base.ResultProxy.
        """
        if hasattr(self, 'out_parameters'):
            if self.compiled_parameters is not None and len(self.compiled_parameters) == 1:
                for bind, name in self.compiled.bind_names.iteritems():
                    if name in self.out_parameters:
                        bind_type = bind.type
                        self.out_parameters[name] = bind_type.dialect_impl(self.dialect).result_processor(self.dialect)(self.out_parameters[name].getvalue())
            else:
                # NOTE(review): indentation was lost in the source text; this
                # else is paired with the single-parameter-set test above,
                # since pairing it with the inner "if" would call getvalue()
                # repeatedly on already-converted values -- confirm.
                for k in self.out_parameters:
                    self.out_parameters[k] = self.out_parameters[k].getvalue()

        if self.cursor.description is not None:
            for column in self.cursor.description:
                type_code = column[1]
                if type_code in self.dialect.ORACLE_BINARY_TYPES:
                    return base.BufferedColumnResultProxy(self)

        return base.ResultProxy(self)


class OracleDialect(default.DefaultDialect):
    """Dialect implementation using cx_Oracle as the DBAPI."""

    supports_alter = True
    supports_unicode_statements = False
    max_identifier_length = 30
    supports_sane_rowcount = True
    supports_sane_multi_rowcount = False
    preexecute_pk_sequences = True
    supports_pk_autoincrement = False

    def __init__(self, use_ansi=True, auto_setinputsizes=True, auto_convert_lobs=True, threaded=True, allow_twophase=True, **kwargs):
        """Store the dialect flags and build the LOB type maps.

        dbapi_type_map and ORACLE_BINARY_TYPES are left empty when there is
        no dbapi, when auto_convert_lobs is off, or when the dbapi module
        has no CLOB attribute.
        """
        default.DefaultDialect.__init__(self, default_paramstyle='named', **kwargs)
        self.use_ansi = use_ansi
        self.threaded = threaded
        self.allow_twophase = allow_twophase
        self.supports_timestamp = self.dbapi is None or hasattr(self.dbapi, 'TIMESTAMP' )
        self.auto_setinputsizes = auto_setinputsizes
        self.auto_convert_lobs = auto_convert_lobs
        if self.dbapi is None or not self.auto_convert_lobs or 'CLOB' not in self.dbapi.__dict__:
            self.dbapi_type_map = {}
            self.ORACLE_BINARY_TYPES = []
        else:
            # only use this for LOB objects.  using it for strings, dates
            # etc. leads to a little too much magic, reflection doesn't know if it should
            # expect encoded strings or unicodes, etc.
            self.dbapi_type_map = {
                self.dbapi.CLOB: OracleText(),
                self.dbapi.BLOB: OracleBinary(),
                self.dbapi.BINARY: OracleRaw(),
            }
            self.ORACLE_BINARY_TYPES = [getattr(self.dbapi, k) for k in ["BFILE", "CLOB", "NCLOB", "BLOB"] if hasattr(self.dbapi, k)]

    def dbapi(cls):
        """Import and return the cx_Oracle module."""
        import cx_Oracle
        return cx_Oracle
    dbapi = classmethod(dbapi)

    def create_connect_args(self, url):
        """Translate a URL into ([], opts) for the DBAPI connect call.

        Dialect flags present in the URL query are coerced to bool and set
        on this dialect.  With a database in the URL, a DSN is built with
        makedsn() (port defaults to 1521); otherwise the host is used as
        the DSN.  A 'mode' query argument of SYSDBA/SYSOPER (any case) maps
        to the matching dbapi constant; any other string is coerced to int.
        """
        dialect_opts = dict(url.query)
        for opt in ('use_ansi', 'auto_setinputsizes', 'auto_convert_lobs',
                    'threaded', 'allow_twophase'):
            if opt in dialect_opts:
                util.coerce_kw_type(dialect_opts, opt, bool)
                setattr(self, opt, dialect_opts[opt])

        if url.database:
            # if we have a database, then we have a remote host
            port = url.port
            if port:
                port = int(port)
            else:
                port = 1521
            dsn = self.dbapi.makedsn(url.host, port, url.database)
        else:
            # we have a local tnsname
            dsn = url.host

        opts = dict(
            user=url.username,
            password=url.password,
            dsn=dsn,
            threaded=self.threaded,
            twophase=self.allow_twophase,
            )
        if 'mode' in url.query:
            opts['mode'] = url.query['mode']
            if isinstance(opts['mode'], basestring):
                mode = opts['mode'].upper()
                if mode == 'SYSDBA':
                    opts['mode'] = self.dbapi.SYSDBA
                elif mode == 'SYSOPER':
                    opts['mode'] = self.dbapi.SYSOPER
                else:
                    util.coerce_kw_type(opts, 'mode', int)
        # Can't set 'handle' or 'pool' via URL query args, use connect_args

        return ([], opts)

    def is_disconnect(self, e):
        """Return True when the error text indicates a lost connection.

        InterfaceError instances are matched on "not connected"; any other
        error is matched on ORA-03114 or ORA-03113.
        """
        if isinstance(e, self.dbapi.InterfaceError):
            return "not connected" in str(e)
        else:
            return "ORA-03114" in str(e) or "ORA-03113" in str(e)

    def type_descriptor(self, typeobj):
        """Adapt a generic type to its Oracle implementation via colspecs."""
        return sqltypes.adapt_type(typeobj, colspecs)

    def oid_column_name(self, column):
        """Return "rowid" for columns of a TableClause or Select, else None."""
        if not isinstance(column.table, (sql.TableClause, sql.Select)):
            return None
        else:
            return "rowid"

    def create_xid(self):
        """create a two-phase transaction ID.

        this id will be passed to do_begin_twophase(), do_rollback_twophase(),
        do_commit_twophase().  its format is unspecified."""

        # randint() includes both endpoints; stop at 2**128 - 1 so the value
        # always fits the 32 hex digits of the format below.
        branch_id = random.randint(0, 2**128 - 1)
        return (0x1234, "%032x" % 9, "%032x" % branch_id)

    def do_release_savepoint(self, connection, name):
        # Oracle does not support RELEASE SAVEPOINT
        pass

    def do_begin_twophase(self, connection, xid):
        """Begin a transaction on the underlying connection with the xid parts."""
        connection.connection.begin(*xid)

    def do_prepare_twophase(self, connection, xid):
        """Call prepare() on the underlying connection; xid is not used."""
        connection.connection.prepare()

    def do_rollback_twophase(self, connection, xid, is_prepared=True, recover=False):
        """Roll back via do_rollback(); xid and the flags are not used."""
        self.do_rollback(connection.connection)

    def do_commit_twophase(self, connection, xid, is_prepared=True, recover=False):
        """Commit via do_commit(); xid and the flags are not used."""
        self.do_commit(connection.connection)

    def do_recover_twophase(self, connection):
        # no-op in this dialect
        pass

    def create_execution_context(self, *args, **kwargs):
        """Return an OracleExecutionContext bound to this dialect."""
        return OracleExecutionContext(self, *args, **kwargs)

    def has_table(self, connection, table_name, schema=None):
        """Return True when all_tables has a row for the denormalized name.

        NOTE(review): the schema argument is accepted but not used in the
        query.
        """
        cursor = connection.execute("""select table_name from all_tables where table_name=:name""", {'name':self._denormalize_name(table_name)})
        return cursor.fetchone() is not None

    def has_sequence(self, connection, sequence_name):
        """Return True when all_sequences has a row for the denormalized name."""
        cursor = connection.execute("""select sequence_name from all_sequences where sequence_name=:name""", {'name':self._denormalize_name(sequence_name)})
        return cursor.fetchone() is not None

    def _locate_owner_row(self, owner, name, rows, raiseerr=False):
        """return the row in the given list of rows which references the given table name and owner name."""
        if not rows:
            if raiseerr:
                raise exceptions.NoSuchTableError(name)
        # NOTE(review): the source text ends here; the rest of this method
        # is not visible and has not been reconstructed.
快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -