?? mssql.py
字號:
# mssql.py"""MSSQL backend, thru either pymssq, adodbapi or pyodbc interfaces.* ``IDENTITY`` columns are supported by using SA ``schema.Sequence()`` objects. In other words:: Table('test', mss_engine, Column('id', Integer, Sequence('blah',100,10), primary_key=True), Column('name', String(20)) ).create() would yield:: CREATE TABLE test ( id INTEGER NOT NULL IDENTITY(100,10) PRIMARY KEY, name VARCHAR(20) ) Note that the start & increment values for sequences are optional and will default to 1,1.* Support for ``SET IDENTITY_INSERT ON`` mode (automagic on / off for ``INSERT`` s)* Support for auto-fetching of ``@@IDENTITY/@@SCOPE_IDENTITY()`` on ``INSERT``* ``select._limit`` implemented as ``SELECT TOP n``* Experimental implemention of LIMIT / OFFSET with row_number()Known issues / TODO:* No support for more than one ``IDENTITY`` column per table* pymssql has problems with binary and unicode data that this module does **not** work around"""import datetime, operator, re, sysfrom sqlalchemy import sql, schema, exceptions, utilfrom sqlalchemy.sql import compiler, expression, operators as sqlops, functions as sql_functionsfrom sqlalchemy.engine import default, basefrom sqlalchemy import types as sqltypesfrom sqlalchemy.util import Decimal as _python_DecimalMSSQL_RESERVED_WORDS = util.Set(['function'])class MSNumeric(sqltypes.Numeric): def result_processor(self, dialect): if self.asdecimal: def process(value): if value is not None: return _python_Decimal(str(value)) else: return value return process else: def process(value): return float(value) return process def bind_processor(self, dialect): def process(value): if value is None: # Not sure that this exception is needed return value else: return str(value) return process def get_col_spec(self): if self.precision is None: return "NUMERIC" else: return "NUMERIC(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length}class MSFloat(sqltypes.Float): def get_col_spec(self): return "FLOAT(%(precision)s)" % {'precision': self.precision} def bind_processor(self, dialect): def process(value): """By converting to string, we can use Decimal types round-trip.""" if not value is None: return str(value) return None return processclass MSInteger(sqltypes.Integer): def get_col_spec(self): return "INTEGER"class MSBigInteger(MSInteger): def get_col_spec(self): return "BIGINT"class MSTinyInteger(MSInteger): def get_col_spec(self): return "TINYINT"class MSSmallInteger(MSInteger): def get_col_spec(self): return "SMALLINT"class MSDateTime(sqltypes.DateTime): def __init__(self, *a, **kw): super(MSDateTime, self).__init__(False) def get_col_spec(self): return "DATETIME"class MSDate(sqltypes.Date): def __init__(self, *a, **kw): super(MSDate, self).__init__(False) def get_col_spec(self): return "SMALLDATETIME"class MSTime(sqltypes.Time): __zero_date = datetime.date(1900, 1, 1) def __init__(self, *a, **kw): super(MSTime, self).__init__(False) def get_col_spec(self): return "DATETIME" def bind_processor(self, dialect): def process(value): if type(value) is datetime.datetime: value = datetime.datetime.combine(self.__zero_date, value.time()) elif type(value) is datetime.time: value = datetime.datetime.combine(self.__zero_date, value) return value return process def result_processor(self, dialect): def process(value): if type(value) is datetime.datetime: return value.time() elif type(value) is datetime.date: return datetime.time(0, 0, 0) return value return processclass MSDateTime_adodbapi(MSDateTime): def result_processor(self, dialect): def process(value): # adodbapi will return datetimes with empty time values as datetime.date() objects. # Promote them back to full datetime.datetime() if type(value) is datetime.date: return datetime.datetime(value.year, value.month, value.day) return value return processclass MSDateTime_pyodbc(MSDateTime): def bind_processor(self, dialect): def process(value): if type(value) is datetime.date: return datetime.datetime(value.year, value.month, value.day) return value return processclass MSDate_pyodbc(MSDate): def bind_processor(self, dialect): def process(value): if type(value) is datetime.date: return datetime.datetime(value.year, value.month, value.day) return value return process def result_processor(self, dialect): def process(value): # pyodbc returns SMALLDATETIME values as datetime.datetime(). truncate it back to datetime.date() if type(value) is datetime.datetime: return value.date() return value return processclass MSDate_pymssql(MSDate): def result_processor(self, dialect): def process(value): # pymssql will return SMALLDATETIME values as datetime.datetime(), truncate it back to datetime.date() if type(value) is datetime.datetime: return value.date() return value return processclass MSText(sqltypes.Text): def get_col_spec(self): if self.dialect.text_as_varchar: return "VARCHAR(max)" else: return "TEXT"class MSString(sqltypes.String): def get_col_spec(self): return "VARCHAR(%(length)s)" % {'length' : self.length}class MSNVarchar(sqltypes.Unicode): def get_col_spec(self): if self.length: return "NVARCHAR(%(length)s)" % {'length' : self.length} elif self.dialect.text_as_varchar: return "NVARCHAR(max)" else: return "NTEXT"class AdoMSNVarchar(MSNVarchar): """overrides bindparam/result processing to not convert any unicode strings""" def bind_processor(self, dialect): return None def result_processor(self, dialect): return Noneclass MSChar(sqltypes.CHAR): def get_col_spec(self): return "CHAR(%(length)s)" % {'length' : self.length}class MSNChar(sqltypes.NCHAR): def get_col_spec(self): return "NCHAR(%(length)s)" % {'length' : self.length}class MSBinary(sqltypes.Binary): def get_col_spec(self): return "IMAGE"class MSBoolean(sqltypes.Boolean): def get_col_spec(self): return "BIT" def result_processor(self, dialect): def process(value): if value is None: return None return value and True or False return process def bind_processor(self, dialect): def process(value): if value is True: return 1 elif value is False: return 0 elif value is None: return None else: return value and True or False return processclass MSTimeStamp(sqltypes.TIMESTAMP): def get_col_spec(self): return "TIMESTAMP"class MSMoney(sqltypes.TypeEngine): def get_col_spec(self): return "MONEY"class MSSmallMoney(MSMoney): def get_col_spec(self): return "SMALLMONEY"class MSUniqueIdentifier(sqltypes.TypeEngine): def get_col_spec(self): return "UNIQUEIDENTIFIER"class MSVariant(sqltypes.TypeEngine): def get_col_spec(self): return "SQL_VARIANT"def descriptor(): return {'name':'mssql', 'description':'MSSQL', 'arguments':[ ('user',"Database Username",None), ('password',"Database Password",None), ('db',"Database Name",None), ('host',"Hostname", None), ]}class MSSQLExecutionContext(default.DefaultExecutionContext): def __init__(self, *args, **kwargs): self.IINSERT = self.HASIDENT = False super(MSSQLExecutionContext, self).__init__(*args, **kwargs) def _has_implicit_sequence(self, column): if column.primary_key and column.autoincrement: if isinstance(column.type, sqltypes.Integer) and not column.foreign_keys: if column.default is None or (isinstance(column.default, schema.Sequence) and \ column.default.optional): return True return False def pre_exec(self): """MS-SQL has a special mode for inserting non-NULL values into IDENTITY columns. Activate it if the feature is turned on and needed. """ if self.compiled.isinsert: tbl = self.compiled.statement.table if not hasattr(tbl, 'has_sequence'): tbl.has_sequence = None for column in tbl.c: if getattr(column, 'sequence', False) or self._has_implicit_sequence(column): tbl.has_sequence = column break self.HASIDENT = bool(tbl.has_sequence) if self.dialect.auto_identity_insert and self.HASIDENT: if isinstance(self.compiled_parameters, list): self.IINSERT = tbl.has_sequence.key in self.compiled_parameters[0] else: self.IINSERT = tbl.has_sequence.key in self.compiled_parameters else: self.IINSERT = False if self.IINSERT: self.cursor.execute("SET IDENTITY_INSERT %s ON" % self.dialect.identifier_preparer.format_table(self.compiled.statement.table)) super(MSSQLExecutionContext, self).pre_exec() def post_exec(self): """Turn off the INDENTITY_INSERT mode if it's been activated, and fetch recently inserted IDENTIFY values (works only for one column). """ if self.compiled.isinsert and self.HASIDENT and not self.IINSERT: if not len(self._last_inserted_ids) or self._last_inserted_ids[0] is None: if self.dialect.use_scope_identity: self.cursor.execute("SELECT scope_identity() AS lastrowid") else: self.cursor.execute("SELECT @@identity AS lastrowid") row = self.cursor.fetchone() self._last_inserted_ids = [int(row[0])] + self._last_inserted_ids[1:] # print "LAST ROW ID", self._last_inserted_ids super(MSSQLExecutionContext, self).post_exec() _ms_is_select = re.compile(r'\s*(?:SELECT|sp_columns|EXEC)', re.I | re.UNICODE) def returns_rows_text(self, statement): return self._ms_is_select.match(statement) is not None
?? 快捷鍵說明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -