?? sqlite.py
字號(hào):
# sqlite.py# Copyright (C) 2005, 2006, 2007, 2008 Michael Bayer mike_mp@zzzcomputing.com## This module is part of SQLAlchemy and is released under# the MIT License: http://www.opensource.org/licenses/mit-license.phpimport datetime, re, timefrom sqlalchemy import schema, exceptions, pool, PassiveDefaultfrom sqlalchemy.engine import defaultimport sqlalchemy.types as sqltypesimport sqlalchemy.util as utilfrom sqlalchemy.sql import compiler, functions as sql_functionsSELECT_REGEXP = re.compile(r'\s*(?:SELECT|PRAGMA)', re.I | re.UNICODE)class SLNumeric(sqltypes.Numeric): def bind_processor(self, dialect): type_ = self.asdecimal and str or float def process(value): if value is not None: return type_(value) else: return value return process def get_col_spec(self): if self.precision is None: return "NUMERIC" else: return "NUMERIC(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length}class SLInteger(sqltypes.Integer): def get_col_spec(self): return "INTEGER"class SLSmallInteger(sqltypes.Smallinteger): def get_col_spec(self): return "SMALLINT"class DateTimeMixin(object): __format__ = "%Y-%m-%d %H:%M:%S" def bind_processor(self, dialect): def process(value): if isinstance(value, basestring): # pass string values thru return value elif value is not None: if self.__microsecond__ and getattr(value, 'microsecond', None) is not None: return value.strftime(self.__format__ + "." + str(value.microsecond)) else: return value.strftime(self.__format__) else: return None return process def _cvt(self, value, dialect): if value is None: return None try: (value, microsecond) = value.split('.') microsecond = int(microsecond) except ValueError: microsecond = 0 return time.strptime(value, self.__format__)[0:6] + (microsecond,)class SLDateTime(DateTimeMixin,sqltypes.DateTime): __format__ = "%Y-%m-%d %H:%M:%S" __microsecond__ = True def get_col_spec(self): return "TIMESTAMP" def result_processor(self, dialect): def process(value): tup = self._cvt(value, dialect) return tup and datetime.datetime(*tup) return processclass SLDate(DateTimeMixin, sqltypes.Date): __format__ = "%Y-%m-%d" __microsecond__ = False def get_col_spec(self): return "DATE" def result_processor(self, dialect): def process(value): tup = self._cvt(value, dialect) return tup and datetime.date(*tup[0:3]) return processclass SLTime(DateTimeMixin, sqltypes.Time): __format__ = "%H:%M:%S" __microsecond__ = True def get_col_spec(self): return "TIME" def result_processor(self, dialect): def process(value): tup = self._cvt(value, dialect) return tup and datetime.time(*tup[3:7]) return processclass SLText(sqltypes.Text): def get_col_spec(self): return "TEXT"class SLString(sqltypes.String): def get_col_spec(self): return "VARCHAR(%(length)s)" % {'length' : self.length}class SLChar(sqltypes.CHAR): def get_col_spec(self): return "CHAR(%(length)s)" % {'length' : self.length}class SLBinary(sqltypes.Binary): def get_col_spec(self): return "BLOB"class SLBoolean(sqltypes.Boolean): def get_col_spec(self): return "BOOLEAN" def bind_processor(self, dialect): def process(value): if value is None: return None return value and 1 or 0 return process def result_processor(self, dialect): def process(value): if value is None: return None return value and True or False return processcolspecs = { sqltypes.Binary: SLBinary, sqltypes.Boolean: SLBoolean, sqltypes.CHAR: SLChar, sqltypes.Date: SLDate, sqltypes.DateTime: SLDateTime, sqltypes.Float: SLNumeric, sqltypes.Integer: SLInteger, sqltypes.NCHAR: SLChar, sqltypes.Numeric: SLNumeric, sqltypes.Smallinteger: SLSmallInteger, sqltypes.String: SLString, sqltypes.Text: SLText, sqltypes.Time: SLTime,}ischema_names = { 'BLOB': SLBinary, 'BOOL': SLBoolean, 'BOOLEAN': SLBoolean, 'CHAR': SLChar, 'DATE': SLDate, 'DATETIME': SLDateTime, 'DECIMAL': SLNumeric, 'FLOAT': SLNumeric, 'INT': SLInteger, 'INTEGER': SLInteger, 'NUMERIC': SLNumeric, 'REAL': SLNumeric, 'SMALLINT': SLSmallInteger, 'TEXT': SLText, 'TIME': SLTime, 'TIMESTAMP': SLDateTime, 'VARCHAR': SLString,}def descriptor(): return {'name':'sqlite', 'description':'SQLite', 'arguments':[ ('database', "Database Filename",None) ]}class SQLiteExecutionContext(default.DefaultExecutionContext): def post_exec(self): if self.compiled.isinsert and not self.executemany: if not len(self._last_inserted_ids) or self._last_inserted_ids[0] is None: self._last_inserted_ids = [self.cursor.lastrowid] + self._last_inserted_ids[1:] def returns_rows_text(self, statement): return SELECT_REGEXP.match(statement)class SQLiteDialect(default.DefaultDialect): supports_alter = False supports_unicode_statements = True def __init__(self, **kwargs): default.DefaultDialect.__init__(self, default_paramstyle='qmark', **kwargs) def vers(num): return tuple([int(x) for x in num.split('.')]) if self.dbapi is not None: sqlite_ver = self.dbapi.version_info if sqlite_ver < (2,1,'3'): util.warn( ("The installed version of pysqlite2 (%s) is out-dated " "and will cause errors in some cases. Version 2.1.3 " "or greater is recommended.") % '.'.join([str(subver) for subver in sqlite_ver])) self.supports_cast = (self.dbapi is None or vers(self.dbapi.sqlite_version) >= vers("3.2.3")) def dbapi(cls): try: from pysqlite2 import dbapi2 as sqlite except ImportError, e: try: from sqlite3 import dbapi2 as sqlite #try the 2.5+ stdlib name. except ImportError: raise e return sqlite dbapi = classmethod(dbapi) def server_version_info(self, connection): return self.dbapi.sqlite_version_info def create_connect_args(self, url): filename = url.database or ':memory:' opts = url.query.copy() util.coerce_kw_type(opts, 'timeout', float) util.coerce_kw_type(opts, 'isolation_level', str) util.coerce_kw_type(opts, 'detect_types', int) util.coerce_kw_type(opts, 'check_same_thread', bool) util.coerce_kw_type(opts, 'cached_statements', int) return ([filename], opts) def type_descriptor(self, typeobj): return sqltypes.adapt_type(typeobj, colspecs) def create_execution_context(self, connection, **kwargs): return SQLiteExecutionContext(self, connection, **kwargs)
?? 快捷鍵說(shuō)明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號(hào)
Ctrl + =
減小字號(hào)
Ctrl + -