# sybase.py
字號:
# NOTE(review): the statements from here down to the NoSuchTableError check
# are the tail of a definition whose header (and any enclosing loop) lies
# above this chunk.  They are reproduced token-for-token; the nesting was
# reconstructed from syntax and data flow because the original indentation
# was lost.  Re-indent them under their real header when the file is
# reassembled, and confirm the nesting against the upstream source.
args = []
for a in (charlen, numericprec, numericscale):
    if a is not None:
        args.append(a)
coltype = self.ischema_names.get(type, None)
if coltype == SybaseString and charlen == -1:
    coltype = SybaseText()
else:
    if coltype is None:
        util.warn("Did not recognize type '%s' of column '%s'" % (type, name))
        coltype = sqltypes.NULLTYPE
    coltype = coltype(*args)
colargs= []
if default is not None:
    colargs.append(schema.PassiveDefault(sql.text(default)))

# any sequences ?
col = schema.Column(name, coltype, nullable=nullable, primary_key=primary_key, *colargs)
if int(max_identity) > 0:
    col.sequence = schema.Sequence(name + '_identity')
    col.sequence.start = int(max_identity)
    col.sequence.increment = 1

# append the column
table.append_column(col)

# any foreign key constraint for this table ?
# note: no multi-column foreign keys are considered
# NOTE(review): the table name is interpolated straight into the SQL text;
# consider a bound parameter if table names can come from untrusted input.
s = "select st1.table_name, sc1.column_name, st2.table_name, sc2.column_name from systable as st1 join sysfkcol on st1.table_id=sysfkcol.foreign_table_id join sysforeignkey join systable as st2 on sysforeignkey.primary_table_id = st2.table_id join syscolumn as sc1 on sysfkcol.foreign_column_id=sc1.column_id and sc1.table_id=st1.table_id join syscolumn as sc2 on sysfkcol.primary_column_id=sc2.column_id and sc2.table_id=st2.table_id where st1.table_name='%(table_name)s';" % { 'table_name' : table.name }
c = connection.execute(s)
# Maps each referenced (primary) table name to a pair of parallel lists:
# [local column names, "primary_table.primary_column" targets].
foreignKeys = {}
while True:
    row = c.fetchone()
    if row is None:
        break
    (foreign_table, foreign_column, primary_table, primary_column) = (
        row[0], row[1], row[2], row[3],
    )
    if not primary_table in foreignKeys.keys():
        foreignKeys[primary_table] = [['%s'%(foreign_column)], ['%s.%s'%(primary_table,primary_column)]]
    else:
        foreignKeys[primary_table][0].append('%s'%(foreign_column))
        foreignKeys[primary_table][1].append('%s.%s'%(primary_table,primary_column))
# One constraint is appended per referenced table, built from the lists
# collected above.
for primary_table in foreignKeys.keys():
    #table.append_constraint(schema.ForeignKeyConstraint(['%s.%s'%(foreign_table, foreign_column)], ['%s.%s'%(primary_table,primary_column)]))
    table.append_constraint(schema.ForeignKeyConstraint(foreignKeys[primary_table][0], foreignKeys[primary_table][1]))

if not found_table:
    raise exceptions.NoSuchTableError(table.name)


class SybaseSQLDialect_mxodbc(SybaseSQLDialect):
    """Sybase dialect variant that uses the ``mxODBC`` DB-API module.

    Overrides the inherited ``colspecs`` and ``ischema_names`` tables so that
    time, date and datetime columns use the ``*_mxodbc`` type classes.
    """

    def __init__(self, **params):
        """Initialise the base dialect and map ``getdate`` to the mxODBC date type."""
        super(SybaseSQLDialect_mxodbc, self).__init__(**params)
        self.dbapi_type_map = {'getdate' : SybaseDate_mxodbc()}

    def import_dbapi(cls):
        """Import and return the ``mxODBC`` module."""
        #import mx.ODBC.Windows as module
        import mxODBC as module
        return module
    import_dbapi = classmethod(import_dbapi)

    # Copies of the parent tables, so the overrides below do not leak into
    # SybaseSQLDialect itself.
    colspecs = SybaseSQLDialect.colspecs.copy()
    colspecs[sqltypes.Time] = SybaseTime_mxodbc
    colspecs[sqltypes.Date] = SybaseDate_mxodbc
    colspecs[sqltypes.DateTime] = SybaseDateTime_mxodbc

    ischema_names = SybaseSQLDialect.ischema_names.copy()
    ischema_names['time'] = SybaseTime_mxodbc
    ischema_names['date'] = SybaseDate_mxodbc
    ischema_names['datetime'] = SybaseDateTime_mxodbc
    ischema_names['smalldatetime'] = SybaseDateTime_mxodbc

    def is_disconnect(self, e):
        """Report whether ``e`` indicates a dropped connection.

        Currently always returns ``False``; detection is not implemented.
        """
        # FIXME: optimize
        #return isinstance(e, self.dbapi.Error) and '[08S01]' in str(e)
        #return True
        return False

    def create_execution_context(self, *args, **kwargs):
        """Return a ``SybaseSQLExecutionContext_mxodbc`` bound to this dialect."""
        return SybaseSQLExecutionContext_mxodbc(self, *args, **kwargs)

    def do_execute(self, cursor, statement, parameters, context=None, **kwargs):
        """Delegate statement execution to the parent dialect unchanged."""
        super(SybaseSQLDialect_mxodbc, self).do_execute(cursor, statement, parameters, context=context, **kwargs)

    def create_connect_args(self, url):
        '''Return ``[args, kwargs]`` for the DB-API ``connect`` call.

        ``args`` is ``[dsn]`` and ``kwargs`` holds ``user`` and ``password``.
        All three values are read from the URL options (the URL's username
        is exposed as ``user``) merged with ``url.query``; they are indexed
        directly, so each one must be present.
        '''
        # FIXME: handle mx.odbc.Windows proprietary args
        opts = url.translate_connect_args(username='user')
        opts.update(url.query)
        argsDict = {}
        argsDict['user'] = opts['user']
        argsDict['password'] = opts['password']
        connArgs = [[opts['dsn']], argsDict]
        return connArgs


class SybaseSQLDialect_pyodbc(SybaseSQLDialect):
    """Sybase dialect variant for a pyodbc-style driver.

    Overrides the inherited ``colspecs`` and ``ischema_names`` tables so that
    time, date and datetime columns use the ``*_pyodbc`` type classes.
    """

    def __init__(self, **params):
        """Initialise the base dialect and map ``getdate`` to the pyodbc date type."""
        super(SybaseSQLDialect_pyodbc, self).__init__(**params)
        self.dbapi_type_map = {'getdate' : SybaseDate_pyodbc()}

    def import_dbapi(cls):
        """Import and return the DB-API module.

        NOTE(review): the module imported is ``mypyodbc``, not ``pyodbc`` --
        confirm this is intentional.
        """
        import mypyodbc as module
        return module
    import_dbapi = classmethod(import_dbapi)

    # Copies of the parent tables, so the overrides below do not leak into
    # SybaseSQLDialect itself.
    colspecs = SybaseSQLDialect.colspecs.copy()
    colspecs[sqltypes.Time] = SybaseTime_pyodbc
    colspecs[sqltypes.Date] = SybaseDate_pyodbc
    colspecs[sqltypes.DateTime] = SybaseDateTime_pyodbc

    ischema_names = SybaseSQLDialect.ischema_names.copy()
    ischema_names['time'] = SybaseTime_pyodbc
    ischema_names['date'] = SybaseDate_pyodbc
    ischema_names['datetime'] = SybaseDateTime_pyodbc
    ischema_names['smalldatetime'] = SybaseDateTime_pyodbc

    def is_disconnect(self, e):
        """Report whether ``e`` indicates a dropped connection.

        Currently always returns ``False``; detection is not implemented.
        """
        # FIXME: optimize
        #return isinstance(e, self.dbapi.Error) and '[08S01]' in str(e)
        #return True
        return False

    def create_execution_context(self, *args, **kwargs):
        """Return a ``SybaseSQLExecutionContext_pyodbc`` bound to this dialect."""
        return SybaseSQLExecutionContext_pyodbc(self, *args, **kwargs)

    def do_execute(self, cursor, statement, parameters, context=None, **kwargs):
        """Delegate statement execution to the parent dialect unchanged."""
        super(SybaseSQLDialect_pyodbc, self).do_execute(cursor, statement, parameters, context=context, **kwargs)

    def create_connect_args(self, url):
        '''Return ``[args, kwargs]`` for the DB-API ``connect`` call.

        ``args`` is a one-element list holding an ODBC connection string made
        of ``UID``, ``PWD`` and ``DSN`` pairs joined with ``;``.  ``kwargs``
        is ``{'autocommit': <bool>}``.

        Side effect: sets ``self.autocommit``.  It defaults to ``False`` and
        is overridden by an ``autocommit`` option when one is given (parsed
        with ``int`` and then ``bool``, so ``"0"``/``"1"`` are expected).
        ``user``, ``password`` and ``dsn`` are indexed directly, so each one
        must be present.
        '''
        # FIXME: handle pyodbc proprietary args
        opts = url.translate_connect_args(username='user')
        opts.update(url.query)

        self.autocommit = False
        if 'autocommit' in opts:
            self.autocommit = bool(int(opts.pop('autocommit')))

        argsDict = {}
        argsDict['UID'] = opts['user']
        argsDict['PWD'] = opts['password']
        argsDict['DSN'] = opts['dsn']
        connArgs = [[';'.join(["%s=%s"%(key, argsDict[key]) for key in argsDict])], {'autocommit' : self.autocommit}]
        return connArgs


# Maps a driver module name to the dialect class that handles it.  The
# pyodbc entry is deliberately left disabled, as in the original.
dialect_mapping = {
    'sqlalchemy.databases.mxODBC' : SybaseSQLDialect_mxodbc,
#    'pyodbc' : SybaseSQLDialect_pyodbc,
    }


class SybaseSQLCompiler(compiler.DefaultCompiler):
    """Statement compiler with Sybase-specific rendering rules."""

    # Modulo is rendered as a MOD(x, y) function call.
    operators = compiler.DefaultCompiler.operators.copy()
    operators.update({
        sql_operators.mod: lambda x, y: "MOD(%s, %s)" % (x, y),
    })

    def bindparam_string(self, name):
        """Render a bind parameter, wrapping it in ``STRING(...)`` when its
        name starts with ``literal`` (case-insensitive)."""
        res = super(SybaseSQLCompiler, self).bindparam_string(name)
        if name.lower().startswith('literal'):
            res = 'STRING(%s)'%res
        return res

    def get_select_precolumns(self, select):
        """Build the text placed before the column list of a SELECT.

        Emits ``DISTINCT`` when requested, ``TOP n`` when a limit is set, and
        ``START AT offset+1`` when an offset is set.  When there is an offset
        but no limit, ``TOP 1000000`` is emitted first.
        """
        s = select._distinct and "DISTINCT " or ""
        if select._limit:
            #if select._limit == 1:
                #s += "FIRST "
            #else:
                #s += "TOP %s " % (select._limit,)
            s += "TOP %s " % (select._limit,)
        if select._offset:
            if not select._limit:
                # FIXME: sybase doesn't allow an offset without a limit
                # so use a huge value for TOP here
                s += "TOP 1000000 "
            s += "START AT %s " % (select._offset+1,)
        return s

    def limit_clause(self, select):
        """Return an empty string; the limit is rendered by
        ``get_select_precolumns`` instead."""
        # Limit in sybase is after the select keyword
        return ""

    def visit_binary(self, binary):
        """Move bind parameters to the right-hand side of an operator, where possible."""
        if isinstance(binary.left, expression._BindParamClause) and binary.operator == operator.eq:
            return self.process(expression._BinaryExpression(binary.right, binary.left, binary.operator))
        else:
            return super(SybaseSQLCompiler, self).visit_binary(binary)

    def label_select_column(self, select, column, asfrom):
        """Label function columns with ``column.label(None)``; defer to the
        parent compiler for every other column."""
        if isinstance(column, expression._Function):
            return column.label(None)
        else:
            return super(SybaseSQLCompiler, self).label_select_column(select, column, asfrom)

    # Generic function names and the names they are rewritten to.
    function_rewrites = {'current_date': 'getdate',
                         }

    def visit_function(self, func):
        """Render a function call, applying ``function_rewrites``.

        ``func.name`` is rewritten in place.  A ``getdate`` call is also
        wrapped in ``CAST(... AS <type>)``, using the type clause of a cast
        to ``SybaseDate_mxodbc``.
        """
        func.name = self.function_rewrites.get(func.name, func.name)
        res = super(SybaseSQLCompiler, self).visit_function(func)
        if func.name.lower() == 'getdate':
            # apply CAST operator
            # FIXME: what about _pyodbc ?
            cast = expression._Cast(func, SybaseDate_mxodbc)
            # infinite recursion
            # res = self.visit_cast(cast)
            res = "CAST(%s AS %s)" % (res, self.process(cast.typeclause))
        return res

    def for_update_clause(self, select):
        """Return an empty string; ``FOR UPDATE`` is never rendered."""
        # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use
        return ''

    def order_by_clause(self, select):
        """Render ``ORDER BY``, omitting it inside a subquery that has no limit."""
        order_by = self.process(select._order_by_clause)

        # SybaseSQL only allows ORDER BY in subqueries if there is a LIMIT
        if order_by and (not self.is_subquery(select) or select._limit):
            return " ORDER BY " + order_by
        else:
            return ""


class SybaseSQLSchemaGenerator(compiler.SchemaGenerator):
    """DDL generator producing Sybase column specifications."""

    def get_column_specification(self, column, **kwargs):
        """Return the DDL fragment describing ``column``.

        An auto-incrementing integer primary key with no default (or only an
        optional ``Sequence`` default) is given a ``<name>_seq`` sequence,
        provided its table has no sequence column recorded yet.  A column
        carrying a ``sequence`` attribute is rendered as ``Integer IDENTITY``
        and recorded on the table as ``has_sequence``.  Every other column
        uses its dialect type specification.  ``NOT NULL`` and ``DEFAULT``
        are appended where applicable.
        """
        colspec = self.preparer.format_column(column)

        if (not getattr(column.table, 'has_sequence', False)) and column.primary_key and \
                column.autoincrement and isinstance(column.type, sqltypes.Integer):
            if column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional):
                column.sequence = schema.Sequence(column.name + '_seq')

        if hasattr(column, 'sequence'):
            column.table.has_sequence = column
            #colspec += " numeric(30,0) IDENTITY"
            colspec += " Integer IDENTITY"
        else:
            colspec += " " + column.type.dialect_impl(self.dialect, _for_ddl=column).get_col_spec()

        if not column.nullable:
            colspec += " NOT NULL"

        default = self.get_column_default_string(column)
        if default is not None:
            colspec += " DEFAULT " + default

        return colspec


class SybaseSQLSchemaDropper(compiler.SchemaDropper):
    """DDL dropper with Sybase ``DROP INDEX`` syntax."""

    def visit_index(self, index):
        """Emit and execute ``DROP INDEX <table>.<index>`` with quoted identifiers."""
        self.append("\nDROP INDEX %s.%s" % (
            self.preparer.quote_identifier(index.table.name),
            self.preparer.quote_identifier(index.name)
            ))
        self.execute()


class SybaseSQLDefaultRunner(base.DefaultRunner):
    """Default runner; inherits all behaviour unchanged."""
    pass


class SybaseSQLIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier preparer that uses the module's ``RESERVED_WORDS`` list."""

    reserved_words = RESERVED_WORDS

    def __init__(self, dialect):
        """Initialise the base preparer with ``dialect``."""
        super(SybaseSQLIdentifierPreparer, self).__init__(dialect)

    def _escape_identifier(self, value):
        """Return ``value`` unchanged; no escaping is applied yet."""
        #TODO: determine SybaseSQL's escaping rules
        return value

    def _fold_identifier_case(self, value):
        """Return ``value`` unchanged; no case folding is applied yet."""
        #TODO: determine SybaseSQL's case folding rules
        return value


# Wire the helper classes defined above onto the base dialect, which is
# also exported as the module-level ``dialect``.
dialect = SybaseSQLDialect
dialect.statement_compiler = SybaseSQLCompiler
dialect.schemagenerator = SybaseSQLSchemaGenerator
dialect.schemadropper = SybaseSQLSchemaDropper
dialect.preparer = SybaseSQLIdentifierPreparer
dialect.defaultrunner = SybaseSQLDefaultRunner
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -