?? mssql.py
字號:
def do_rollback(self, connection): # pymssql throws an error on repeated rollbacks. Ignore it. # TODO: this is normal behavior for most DBs. are we sure we want to ignore it ? try: connection.rollback() except: pass def create_connect_args(self, url): r = super(MSSQLDialect_pymssql, self).create_connect_args(url) if hasattr(self, 'query_timeout'): self.dbapi._mssql.set_query_timeout(self.query_timeout) return r def make_connect_string(self, keys): if keys.get('port'): # pymssql expects port as host:port, not a separate arg keys['host'] = ''.join([keys.get('host', ''), ':', str(keys['port'])]) del keys['port'] return [[], keys] def is_disconnect(self, e): return isinstance(e, self.dbapi.DatabaseError) and "Error 10054" in str(e)## This code is leftover from the initial implementation, for reference## def do_begin(self, connection):## """implementations might want to put logic here for turning autocommit on/off, etc."""## pass## def do_rollback(self, connection):## """implementations might want to put logic here for turning autocommit on/off, etc."""## try:## # connection.rollback() for pymmsql failed sometimes--the begin tran doesn't show up## # this is a workaround that seems to be handle it.## r = self.raw_connection(connection)## r.query("if @@trancount > 0 rollback tran")## r.fetch_array()## r.query("begin tran")## r.fetch_array()## except:## pass## def do_commit(self, connection):## """implementations might want to put logic here for turning autocommit on/off, etc.## do_commit is set for pymmsql connections--ADO seems to handle transactions without any issue## """## # ADO Uses Implicit Transactions.## # This is very pymssql specific. We use this instead of its commit, because it hangs on failed rollbacks.## # By using the "if" we don't assume an open transaction--much better.## r = self.raw_connection(connection)## r.query("if @@trancount > 0 commit tran")## r.fetch_array()## r.query("begin tran")## r.fetch_array()class MSSQLDialect_pyodbc(MSSQLDialect): supports_sane_rowcount = False supports_sane_multi_rowcount = False # PyODBC unicode is broken on UCS-4 builds supports_unicode = sys.maxunicode == 65535 supports_unicode_statements = supports_unicode def __init__(self, **params): super(MSSQLDialect_pyodbc, self).__init__(**params) # whether use_scope_identity will work depends on the version of pyodbc try: import pyodbc self.use_scope_identity = hasattr(pyodbc.Cursor, 'nextset') except: pass def import_dbapi(cls): import pyodbc as module return module import_dbapi = classmethod(import_dbapi) colspecs = MSSQLDialect.colspecs.copy() if supports_unicode: colspecs[sqltypes.Unicode] = AdoMSNVarchar colspecs[sqltypes.Date] = MSDate_pyodbc colspecs[sqltypes.DateTime] = MSDateTime_pyodbc ischema_names = MSSQLDialect.ischema_names.copy() if supports_unicode: ischema_names['nvarchar'] = AdoMSNVarchar ischema_names['smalldatetime'] = MSDate_pyodbc ischema_names['datetime'] = MSDateTime_pyodbc def make_connect_string(self, keys): if 'dsn' in keys: connectors = ['dsn=%s' % keys['dsn']] else: connectors = ["Driver={SQL Server}"] if 'port' in keys: connectors.append('Server=%s,%d' % (keys.get('host'), keys.get('port'))) else: connectors.append('Server=%s' % keys.get('host')) connectors.append("Database=%s" % keys.get("database")) user = keys.get("user") if user: connectors.append("UID=%s" % user) connectors.append("PWD=%s" % keys.get("password", "")) else: connectors.append ("TrustedConnection=Yes") return [[";".join (connectors)], {}] def is_disconnect(self, e): return isinstance(e, self.dbapi.Error) and '[08S01]' in str(e) def create_execution_context(self, *args, **kwargs): return MSSQLExecutionContext_pyodbc(self, *args, **kwargs) def do_execute(self, cursor, statement, parameters, context=None, **kwargs): super(MSSQLDialect_pyodbc, self).do_execute(cursor, statement, parameters, context=context, **kwargs) if context and context.HASIDENT and (not context.IINSERT) and context.dialect.use_scope_identity: import pyodbc # Fetch the last inserted id from the manipulated statement # We may have to skip over a number of result sets with no data (due to triggers, etc.) while True: try: row = cursor.fetchone() break except pyodbc.Error, e: cursor.nextset() context._last_inserted_ids = [int(row[0])]class MSSQLDialect_adodbapi(MSSQLDialect): supports_sane_rowcount = True supports_sane_multi_rowcount = True supports_unicode = sys.maxunicode == 65535 supports_unicode_statements = True def import_dbapi(cls): import adodbapi as module return module import_dbapi = classmethod(import_dbapi) colspecs = MSSQLDialect.colspecs.copy() colspecs[sqltypes.Unicode] = AdoMSNVarchar colspecs[sqltypes.DateTime] = MSDateTime_adodbapi ischema_names = MSSQLDialect.ischema_names.copy() ischema_names['nvarchar'] = AdoMSNVarchar ischema_names['datetime'] = MSDateTime_adodbapi def make_connect_string(self, keys): connectors = ["Provider=SQLOLEDB"] if 'port' in keys: connectors.append ("Data Source=%s, %s" % (keys.get("host"), keys.get("port"))) else: connectors.append ("Data Source=%s" % keys.get("host")) connectors.append ("Initial Catalog=%s" % keys.get("database")) user = keys.get("user") if user: connectors.append("User Id=%s" % user) connectors.append("Password=%s" % keys.get("password", "")) else: connectors.append("Integrated Security=SSPI") return [[";".join (connectors)], {}] def is_disconnect(self, e): return isinstance(e, self.dbapi.adodbapi.DatabaseError) and "'connection failure'" in str(e)dialect_mapping = { 'pymssql': MSSQLDialect_pymssql, 'pyodbc': MSSQLDialect_pyodbc, 'adodbapi': MSSQLDialect_adodbapi }class MSSQLCompiler(compiler.DefaultCompiler): operators = compiler.OPERATORS.copy() operators[sqlops.concat_op] = '+' functions = compiler.DefaultCompiler.functions.copy() functions.update ( { sql_functions.now: 'CURRENT_TIMESTAMP' } ) def __init__(self, *args, **kwargs): super(MSSQLCompiler, self).__init__(*args, **kwargs) self.tablealiases = {} def get_select_precolumns(self, select): """ MS-SQL puts TOP, it's version of LIMIT here """ if not self.dialect.has_window_funcs: s = select._distinct and "DISTINCT " or "" if select._limit: s += "TOP %s " % (select._limit,) if select._offset: raise exceptions.InvalidRequestError('MSSQL does not support LIMIT with an offset') return s return compiler.DefaultCompiler.get_select_precolumns(self, select) def limit_clause(self, select): # Limit in mssql is after the select keyword return "" def visit_select(self, select, **kwargs): """Look for ``LIMIT`` and OFFSET in a select statement, and if so tries to wrap it in a subquery with ``row_number()`` criterion. """ if self.dialect.has_window_funcs and (not getattr(select, '_mssql_visit', None)) and (select._limit is not None or select._offset is not None): # to use ROW_NUMBER(), an ORDER BY is required. orderby = self.process(select._order_by_clause) if not orderby: orderby = list(select.oid_column.proxies)[0] orderby = self.process(orderby) _offset = select._offset _limit = select._limit select._mssql_visit = True select = select.column(sql.literal_column("ROW_NUMBER() OVER (ORDER BY %s)" % orderby).label("mssql_rn")).order_by(None).alias() limitselect = sql.select([c for c in select.c if c.key!='mssql_rn']) if _offset is not None: limitselect.append_whereclause("mssql_rn>=%d" % _offset) if _limit is not None: limitselect.append_whereclause("mssql_rn<=%d" % (_limit + _offset)) else: limitselect.append_whereclause("mssql_rn<=%d" % _limit) return self.process(limitselect, iswrapper=True, **kwargs) else: return compiler.DefaultCompiler.visit_select(self, select, **kwargs) def _schema_aliased_table(self, table): if getattr(table, 'schema', None) is not None: if table not in self.tablealiases: self.tablealiases[table] = table.alias() return self.tablealiases[table] else: return None def visit_table(self, table, mssql_aliased=False, **kwargs): if mssql_aliased: return super(MSSQLCompiler, self).visit_table(table, **kwargs) # alias schema-qualified tables alias = self._schema_aliased_table(table) if alias is not None: return self.process(alias, mssql_aliased=True, **kwargs) else: return super(MSSQLCompiler, self).visit_table(table, **kwargs) def visit_alias(self, alias, **kwargs): # translate for schema-qualified table aliases self.tablealiases[alias.original] = alias kwargs['mssql_aliased'] = True return super(MSSQLCompiler, self).visit_alias(alias, **kwargs) def visit_column(self, column, **kwargs): if column.table is not None and not self.isupdate and not self.isdelete: # translate for schema-qualified table aliases t = self._schema_aliased_table(column.table) if t is not None: return self.process(expression._corresponding_column_or_error(t, column)) else: kwargs['use_schema'] = True return super(MSSQLCompiler, self).visit_column(column, **kwargs) def visit_binary(self, binary, **kwargs): """Move bind parameters to the right-hand side of an operator, where possible.""" if isinstance(binary.left, expression._BindParamClause) and binary.operator == operator.eq: return self.process(expression._BinaryExpression(binary.right, binary.left, binary.operator), **kwargs) else: return super(MSSQLCompiler, self).visit_binary(binary, **kwargs) def label_select_column(self, select, column, asfrom): if isinstance(column, expression._Function): return column.label(None) else: return super(MSSQLCompiler, self).label_select_column(select, column, asfrom) function_rewrites = {'current_date': 'getdate', 'length': 'len', } def visit_function(self, func, **kwargs): func.name = self.function_rewrites.get(func.name, func.name) return super(MSSQLCompiler, self).visit_function(func, **kwargs) def for_update_clause(self, select): # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use return '' def order_by_clause(self, select): order_by = self.process(select._order_by_clause) # MSSQL only allows ORDER BY in subqueries if there is a LIMIT if order_by and (not self.is_subquery(select) or select._limit): return " ORDER BY " + order_by else: return ""class MSSQLSchemaGenerator(compiler.SchemaGenerator): def get_column_specification(self, column, **kwargs): colspec = self.preparer.format_column(column) + " " + column.type.dialect_impl(self.dialect, _for_ddl=column).get_col_spec() # install a IDENTITY Sequence if we have an implicit IDENTITY column if (not getattr(column.table, 'has_sequence', False)) and column.primary_key and \ column.autoincrement and isinstance(column.type, sqltypes.Integer) and not column.foreign_keys: if column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional): column.sequence = schema.Sequence(column.name + '_seq') if not column.nullable: colspec += " NOT NULL" if hasattr(column, 'sequence'): column.table.has_sequence = column colspec += " IDENTITY(%s,%s)" % (column.sequence.start or 1, column.sequence.increment or 1) else: default = self.get_column_default_string(column) if default is not None: colspec += " DEFAULT " + default return colspecclass MSSQLSchemaDropper(compiler.SchemaDropper): def visit_index(self, index): self.append("\nDROP INDEX %s.%s" % ( self.preparer.quote_identifier(index.table.name), self.preparer.quote_identifier(index.name) )) self.execute()class MSSQLDefaultRunner(base.DefaultRunner): # TODO: does ms-sql have standalone sequences ? passclass MSSQLIdentifierPreparer(compiler.IdentifierPreparer): reserved_words = compiler.IdentifierPreparer.reserved_words.union(MSSQL_RESERVED_WORDS) def __init__(self, dialect): super(MSSQLIdentifierPreparer, self).__init__(dialect, initial_quote='[', final_quote=']') def _escape_identifier(self, value): #TODO: determin MSSQL's escapeing rules return valuedialect = MSSQLDialectdialect.statement_compiler = MSSQLCompilerdialect.schemagenerator = MSSQLSchemaGeneratordialect.schemadropper = MSSQLSchemaDropperdialect.preparer = MSSQLIdentifierPreparerdialect.defaultrunner = MSSQLDefaultRunner
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -