?? mssql.py
字號:
class MSSQLExecutionContext_pyodbc (MSSQLExecutionContext): def pre_exec(self): """where appropriate, issue "select scope_identity()" in the same statement""" super(MSSQLExecutionContext_pyodbc, self).pre_exec() if self.compiled.isinsert and self.HASIDENT and (not self.IINSERT) \ and len(self.parameters) == 1 and self.dialect.use_scope_identity: self.statement += "; select scope_identity()" def post_exec(self): if self.compiled.isinsert and self.HASIDENT and (not self.IINSERT) and self.dialect.use_scope_identity: # do nothing - id was fetched in dialect.do_execute() pass else: super(MSSQLExecutionContext_pyodbc, self).post_exec()class MSSQLDialect(default.DefaultDialect): colspecs = { sqltypes.Unicode : MSNVarchar, sqltypes.Integer : MSInteger, sqltypes.Smallinteger: MSSmallInteger, sqltypes.Numeric : MSNumeric, sqltypes.Float : MSFloat, sqltypes.DateTime : MSDateTime, sqltypes.Date : MSDate, sqltypes.Time : MSTime, sqltypes.String : MSString, sqltypes.Binary : MSBinary, sqltypes.Boolean : MSBoolean, sqltypes.Text : MSText, sqltypes.CHAR: MSChar, sqltypes.NCHAR: MSNChar, sqltypes.TIMESTAMP: MSTimeStamp, } ischema_names = { 'int' : MSInteger, 'bigint': MSBigInteger, 'smallint' : MSSmallInteger, 'tinyint' : MSTinyInteger, 'varchar' : MSString, 'nvarchar' : MSNVarchar, 'char' : MSChar, 'nchar' : MSNChar, 'text' : MSText, 'ntext' : MSText, 'decimal' : MSNumeric, 'numeric' : MSNumeric, 'float' : MSFloat, 'datetime' : MSDateTime, 'smalldatetime' : MSDate, 'binary' : MSBinary, 'varbinary' : MSBinary, 'bit': MSBoolean, 'real' : MSFloat, 'image' : MSBinary, 'timestamp': MSTimeStamp, 'money': MSMoney, 'smallmoney': MSSmallMoney, 'uniqueidentifier': MSUniqueIdentifier, 'sql_variant': MSVariant, } def __new__(cls, dbapi=None, *args, **kwargs): if cls != MSSQLDialect: return super(MSSQLDialect, cls).__new__(cls, *args, **kwargs) if dbapi: dialect = dialect_mapping.get(dbapi.__name__) return dialect(*args, **kwargs) else: return object.__new__(cls, *args, **kwargs) def __init__(self, auto_identity_insert=True, **params): super(MSSQLDialect, self).__init__(**params) self.auto_identity_insert = auto_identity_insert self.text_as_varchar = False self.use_scope_identity = False self.has_window_funcs = False self.set_default_schema_name("dbo") def dbapi(cls, module_name=None): if module_name: try: dialect_cls = dialect_mapping[module_name] return dialect_cls.import_dbapi() except KeyError: raise exceptions.InvalidRequestError("Unsupported MSSQL module '%s' requested (must be adodbpi, pymssql or pyodbc)" % module_name) else: for dialect_cls in [MSSQLDialect_pyodbc, MSSQLDialect_pymssql, MSSQLDialect_adodbapi]: try: return dialect_cls.import_dbapi() except ImportError, e: pass else: raise ImportError('No DBAPI module detected for MSSQL - please install pyodbc, pymssql, or adodbapi') dbapi = classmethod(dbapi) def create_connect_args(self, url): opts = url.translate_connect_args(username='user') opts.update(url.query) if 'auto_identity_insert' in opts: self.auto_identity_insert = bool(int(opts.pop('auto_identity_insert'))) if 'query_timeout' in opts: self.query_timeout = int(opts.pop('query_timeout')) if 'text_as_varchar' in opts: self.text_as_varchar = bool(int(opts.pop('text_as_varchar'))) if 'use_scope_identity' in opts: self.use_scope_identity = bool(int(opts.pop('use_scope_identity'))) if 'has_window_funcs' in opts: self.has_window_funcs = bool(int(opts.pop('has_window_funcs'))) return self.make_connect_string(opts) def create_execution_context(self, *args, **kwargs): return MSSQLExecutionContext(self, *args, **kwargs) def type_descriptor(self, typeobj): newobj = sqltypes.adapt_type(typeobj, self.colspecs) # Some types need to know about the dialect if isinstance(newobj, (MSText, MSNVarchar)): newobj.dialect = self return newobj def last_inserted_ids(self): return self.context.last_inserted_ids def get_default_schema_name(self, connection): return self.schema_name def set_default_schema_name(self, schema_name): self.schema_name = schema_name def last_inserted_ids(self): return self.context.last_inserted_ids def do_execute(self, cursor, statement, params, context=None, **kwargs): if params == {}: params = () try: super(MSSQLDialect, self).do_execute(cursor, statement, params, context=context, **kwargs) finally: if context.IINSERT: cursor.execute("SET IDENTITY_INSERT %s OFF" % self.identifier_preparer.format_table(context.compiled.statement.table)) def do_executemany(self, cursor, statement, params, context=None, **kwargs): try: super(MSSQLDialect, self).do_executemany(cursor, statement, params, context=context, **kwargs) finally: if context.IINSERT: cursor.execute("SET IDENTITY_INSERT %s OFF" % self.identifier_preparer.format_table(context.compiled.statement.table)) def _execute(self, c, statement, parameters): try: if parameters == {}: parameters = () c.execute(statement, parameters) self.context.rowcount = c.rowcount c.DBPROP_COMMITPRESERVE = "Y" except Exception, e: raise exceptions.DBAPIError.instance(statement, parameters, e) def table_names(self, connection, schema): from sqlalchemy.databases import information_schema as ischema return ischema.table_names(connection, schema) def raw_connection(self, connection): """Pull the raw pymmsql connection out--sensative to "pool.ConnectionFairy" and pymssql.pymssqlCnx Classes""" try: # TODO: probably want to move this to individual dialect subclasses to # save on the exception throw + simplify return connection.connection.__dict__['_pymssqlCnx__cnx'] except: return connection.connection.adoConn def uppercase_table(self, t): # convert all names to uppercase -- fixes refs to INFORMATION_SCHEMA for case-senstive DBs, and won't matter for case-insensitive t.name = t.name.upper() if t.schema: t.schema = t.schema.upper() for c in t.columns: c.name = c.name.upper() return t def has_table(self, connection, tablename, schema=None): import sqlalchemy.databases.information_schema as ischema current_schema = schema or self.get_default_schema_name(connection) columns = self.uppercase_table(ischema.columns) s = sql.select([columns], current_schema and sql.and_(columns.c.table_name==tablename, columns.c.table_schema==current_schema) or columns.c.table_name==tablename, ) c = connection.execute(s) row = c.fetchone() return row is not None def reflecttable(self, connection, table, include_columns): import sqlalchemy.databases.information_schema as ischema # Get base columns if table.schema is not None: current_schema = table.schema else: current_schema = self.get_default_schema_name(connection) columns = self.uppercase_table(ischema.columns) s = sql.select([columns], current_schema and sql.and_(columns.c.table_name==table.name, columns.c.table_schema==current_schema) or columns.c.table_name==table.name, order_by=[columns.c.ordinal_position]) c = connection.execute(s) found_table = False while True: row = c.fetchone() if row is None: break found_table = True (name, type, nullable, charlen, numericprec, numericscale, default) = ( row[columns.c.column_name], row[columns.c.data_type], row[columns.c.is_nullable] == 'YES', row[columns.c.character_maximum_length], row[columns.c.numeric_precision], row[columns.c.numeric_scale], row[columns.c.column_default] ) if include_columns and name not in include_columns: continue args = [] for a in (charlen, numericprec, numericscale): if a is not None: args.append(a) coltype = self.ischema_names.get(type, None) if coltype == MSString and charlen == -1: coltype = MSText() else: if coltype is None: util.warn("Did not recognize type '%s' of column '%s'" % (type, name)) coltype = sqltypes.NULLTYPE elif coltype in (MSNVarchar, AdoMSNVarchar) and charlen == -1: args[0] = None coltype = coltype(*args) colargs= [] if default is not None: colargs.append(schema.PassiveDefault(sql.text(default))) table.append_column(schema.Column(name, coltype, nullable=nullable, autoincrement=False, *colargs)) if not found_table: raise exceptions.NoSuchTableError(table.name) # We also run an sp_columns to check for identity columns: cursor = connection.execute("sp_columns @table_name = '%s', @table_owner = '%s'" % (table.name, current_schema)) ic = None while True: row = cursor.fetchone() if row is None: break col_name, type_name = row[3], row[5] if type_name.endswith("identity"): ic = table.c[col_name] ic.autoincrement = True # setup a psuedo-sequence to represent the identity attribute - we interpret this at table.create() time as the identity attribute ic.sequence = schema.Sequence(ic.name + '_identity') # MSSQL: only one identity per table allowed cursor.close() break if not ic is None: try: cursor = connection.execute("select ident_seed(?), ident_incr(?)", table.fullname, table.fullname) row = cursor.fetchone() cursor.close() if not row is None: ic.sequence.start=int(row[0]) ic.sequence.increment=int(row[1]) except: # ignoring it, works just like before pass # Add constraints RR = self.uppercase_table(ischema.ref_constraints) #information_schema.referential_constraints TC = self.uppercase_table(ischema.constraints) #information_schema.table_constraints C = self.uppercase_table(ischema.pg_key_constraints).alias('C') #information_schema.constraint_column_usage: the constrained column R = self.uppercase_table(ischema.pg_key_constraints).alias('R') #information_schema.constraint_column_usage: the referenced column # Primary key constraints s = sql.select([C.c.column_name, TC.c.constraint_type], sql.and_(TC.c.constraint_name == C.c.constraint_name, C.c.table_name == table.name)) c = connection.execute(s) for row in c: if 'PRIMARY' in row[TC.c.constraint_type.name]: table.primary_key.add(table.c[row[0]]) # Foreign key constraints s = sql.select([C.c.column_name, R.c.table_schema, R.c.table_name, R.c.column_name, RR.c.constraint_name, RR.c.match_option, RR.c.update_rule, RR.c.delete_rule], sql.and_(C.c.table_name == table.name, C.c.table_schema == current_schema, C.c.constraint_name == RR.c.constraint_name, R.c.constraint_name == RR.c.unique_constraint_name, C.c.ordinal_position == R.c.ordinal_position ), order_by = [RR.c.constraint_name, R.c.ordinal_position]) rows = connection.execute(s).fetchall() # group rows by constraint ID, to handle multi-column FKs fknm, scols, rcols = (None, [], []) for r in rows: scol, rschema, rtbl, rcol, rfknm, fkmatch, fkuprule, fkdelrule = r if rfknm != fknm: if fknm: table.append_constraint(schema.ForeignKeyConstraint(scols, ['%s.%s' % (t,c) for (s,t,c) in rcols], fknm)) fknm, scols, rcols = (rfknm, [], []) if (not scol in scols): scols.append(scol) if (not (rschema, rtbl, rcol) in rcols): rcols.append((rschema, rtbl, rcol)) if fknm and scols: table.append_constraint(schema.ForeignKeyConstraint(scols, ['%s.%s' % (t,c) for (s,t,c) in rcols], fknm))class MSSQLDialect_pymssql(MSSQLDialect): supports_sane_rowcount = False max_identifier_length = 30 def import_dbapi(cls): import pymssql as module # pymmsql doesn't have a Binary method. we use string # TODO: monkeypatching here is less than ideal module.Binary = lambda st: str(st) return module import_dbapi = classmethod(import_dbapi) colspecs = MSSQLDialect.colspecs.copy() colspecs[sqltypes.Date] = MSDate_pymssql ischema_names = MSSQLDialect.ischema_names.copy() ischema_names['smalldatetime'] = MSDate_pymssql def __init__(self, **params): super(MSSQLDialect_pymssql, self).__init__(**params) self.use_scope_identity = True # pymssql understands only ascii if self.convert_unicode: self.encoding = params.get('encoding', 'ascii')
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -