?? firebird.py
字號:
# firebird.py
# Copyright (C) 2005, 2006, 2007, 2008 Michael Bayer mike_mp@zzzcomputing.com
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

"""
Firebird backend
================

This module implements the Firebird backend, through the kinterbasdb_
DBAPI module.

Firebird dialects
-----------------

Firebird offers two distinct dialects_ (not to be confused with the
SA ``Dialect`` thing):

dialect 1
  This is the old syntax and behaviour, inherited from Interbase pre-6.0.

dialect 3
  This is the newer and supported syntax, introduced in Interbase 6.0.

From the user point of view, the biggest change is in date/time
handling: under dialect 1, there's a single kind of field, ``DATE``
with a synonym ``DATETIME``, that holds a `timestamp` value, that is a
date with hour, minute, second. Under dialect 3 there are three kinds,
a ``DATE`` that holds a date, a ``TIME`` that holds a *time of the
day* value and a ``TIMESTAMP``, equivalent to the old ``DATE``.

The problem is that the dialect of a Firebird database is a property
of the database itself [#]_ (that is, any single database has been
created with one dialect or the other: there is no way to change it
after creation). SQLAlchemy has a single instance of the class that
controls all the connections to a particular kind of database, so it
cannot easily differentiate between the two modes, and in particular
it **cannot** simultaneously talk with two distinct Firebird databases
with different dialects.

By default this module is biased toward dialect 3, but you can easily
tweak it to handle dialect 1 if needed::

  from sqlalchemy import types as sqltypes
  from sqlalchemy.databases.firebird import FBDate, colspecs, ischema_names

  # Adjust the mapping of the timestamp kind
  ischema_names['TIMESTAMP'] = FBDate
  colspecs[sqltypes.DateTime] = FBDate

Other aspects may be version-specific. You can use the
``server_version_info()`` method on the ``FBDialect`` class to do
whatever is needed::

  from sqlalchemy.databases.firebird import FBCompiler

  if engine.dialect.server_version_info(connection) < (2,0):
      # Change the name of the function ``length`` to use the UDF version
      # instead of ``char_length``
      FBCompiler.LENGTH_FUNCTION_NAME = 'strlen'

Pooling connections
-------------------

The default strategy used by SQLAlchemy to pool the database connections
in particular cases may raise an ``OperationalError`` with a message
`"object XYZ is in use"`. This happens on Firebird when there are two
connections to the database, one is using, or has used, a particular table
and the other tries to drop or alter the same table. To guarantee DDL
operations success Firebird recommends doing them as the single connected
user.

In case your SA application effectively needs to do DDL operations while
other connections are active, the following setting may alleviate the
problem::

  from sqlalchemy import pool
  from sqlalchemy.databases.firebird import dialect

  # Force SA to use a single connection per thread
  dialect.poolclass = pool.SingletonThreadPool

.. [#] Well, that is not the whole story, as the client may still ask
       a different (lower) dialect...

.. _dialects: http://mc-computing.com/Databases/Firebird/SQL_Dialect.html
.. _kinterbasdb: http://sourceforge.net/projects/kinterbasdb
"""


import datetime

from sqlalchemy import exceptions, schema, types as sqltypes, sql, util
from sqlalchemy.engine import base, default


# Module-wide flag: set to True once ``create_connect_args`` has called
# ``dbapi.init()``, so that the call is made at most once per process.
_initialized_kb = False


class FBNumeric(sqltypes.Numeric):
    """Handle ``NUMERIC(precision,length)`` datatype."""

    def get_col_spec(self):
        if self.precision is None:
            return "NUMERIC"
        else:
            return "NUMERIC(%(precision)s, %(length)s)" % {
                'precision': self.precision,
                'length': self.length}

    def bind_processor(self, dialect):
        # No conversion is applied to bound values.
        return None

    def result_processor(self, dialect):
        if self.asdecimal:
            # Values are passed through untouched.
            return None
        else:
            def process(value):
                # The caller asked for floats: downgrade decimal values.
                if isinstance(value, util.decimal_type):
                    return float(value)
                else:
                    return value
            return process


class FBFloat(sqltypes.Float):
    """Handle ``FLOAT(precision)`` datatype."""

    def get_col_spec(self):
        if not self.precision:
            return "FLOAT"
        else:
            return "FLOAT(%(precision)s)" % {'precision': self.precision}


class FBInteger(sqltypes.Integer):
    """Handle ``INTEGER`` datatype."""

    def get_col_spec(self):
        return "INTEGER"


class FBSmallInteger(sqltypes.Smallinteger):
    """Handle ``SMALLINT`` datatype."""

    def get_col_spec(self):
        return "SMALLINT"


class FBDateTime(sqltypes.DateTime):
    """Handle ``TIMESTAMP`` datatype."""

    def get_col_spec(self):
        return "TIMESTAMP"

    def bind_processor(self, dialect):
        def process(value):
            if value is None or isinstance(value, datetime.datetime):
                return value
            else:
                # Anything else is expected to expose year/month/day
                # (e.g. a ``datetime.date``): promote it to a datetime
                # at midnight of that day.
                return datetime.datetime(year=value.year,
                                         month=value.month,
                                         day=value.day)
        return process


class FBDate(sqltypes.DateTime):
    """Handle ``DATE`` datatype."""

    def get_col_spec(self):
        return "DATE"


class FBTime(sqltypes.Time):
    """Handle ``TIME`` datatype."""

    def get_col_spec(self):
        return "TIME"


class FBText(sqltypes.Text):
    """Handle ``BLOB SUB_TYPE 1`` datatype (aka *textual* blob)."""

    def get_col_spec(self):
        return "BLOB SUB_TYPE 1"


class FBString(sqltypes.String):
    """Handle ``VARCHAR(length)`` datatype."""

    def get_col_spec(self):
        return "VARCHAR(%(length)s)" % {'length': self.length}


class FBChar(sqltypes.CHAR):
    """Handle ``CHAR(length)`` datatype."""

    def get_col_spec(self):
        return "CHAR(%(length)s)" % {'length': self.length}


class FBBinary(sqltypes.Binary):
    """Handle ``BLOB SUB_TYPE 0`` datatype (aka *binary* blob)."""

    def get_col_spec(self):
        return "BLOB SUB_TYPE 0"


class FBBoolean(sqltypes.Boolean):
    """Handle boolean values as a ``SMALLINT`` datatype."""

    def get_col_spec(self):
        return "SMALLINT"


# Mapping from generic SQLAlchemy types to their Firebird implementations,
# consumed by ``FBDialect.type_descriptor``.
colspecs = {
    sqltypes.Integer: FBInteger,
    sqltypes.Smallinteger: FBSmallInteger,
    sqltypes.Numeric: FBNumeric,
    sqltypes.Float: FBFloat,
    sqltypes.DateTime: FBDateTime,
    sqltypes.Date: FBDate,
    sqltypes.Time: FBTime,
    sqltypes.String: FBString,
    sqltypes.Binary: FBBinary,
    sqltypes.Boolean: FBBoolean,
    sqltypes.Text: FBText,
    sqltypes.CHAR: FBChar,
}


# Mapping from a type name to a factory building the matching type object.
# Each factory receives a row-like mapping ``r`` and reads the keys it needs
# from it (``flen``, ``fprec``, ``fscale``, ``stype``).
ischema_names = {
    'SHORT': lambda r: FBSmallInteger(),
    'LONG': lambda r: FBInteger(),
    'QUAD': lambda r: FBFloat(),
    'FLOAT': lambda r: FBFloat(),
    'DATE': lambda r: FBDate(),
    'TIME': lambda r: FBTime(),
    'TEXT': lambda r: FBString(r['flen']),
    # This generically handles NUMERIC()
    'INT64': lambda r: FBNumeric(precision=r['fprec'], length=r['fscale'] * -1),
    'DOUBLE': lambda r: FBFloat(),
    'TIMESTAMP': lambda r: FBDateTime(),
    'VARYING': lambda r: FBString(r['flen']),
    'CSTRING': lambda r: FBChar(r['flen']),
    # Sub type 1 is the textual blob, anything else is treated as binary.
    'BLOB': lambda r: r['stype']==1 and FBText() or FBBinary()
}


def descriptor():
    """Return a dictionary describing this backend and its connect arguments."""
    return {'name': 'firebird',
            'description': 'Firebird',
            'arguments': [
                ('host', 'Host Server Name', None),
                ('database', 'Database Name', None),
                ('user', 'Username', None),
                ('password', 'Password', None)
            ]}


class FBExecutionContext(default.DefaultExecutionContext):
    pass


class FBDialect(default.DefaultDialect):
    """Firebird dialect"""

    supports_sane_rowcount = False
    supports_sane_multi_rowcount = False
    max_identifier_length = 31
    preexecute_pk_sequences = True
    supports_pk_autoincrement = False

    def __init__(self, type_conv=200, concurrency_level=1, **kwargs):
        """Remember the options later handed to ``dbapi.init()``.

        ``type_conv`` and ``concurrency_level`` are stored as defaults and
        forwarded by ``create_connect_args``; any other keyword argument is
        passed to ``DefaultDialect.__init__``.
        """
        default.DefaultDialect.__init__(self, **kwargs)

        self.type_conv = type_conv
        self.concurrency_level = concurrency_level

    def dbapi(cls):
        """Import and return the ``kinterbasdb`` DBAPI module."""
        import kinterbasdb
        return kinterbasdb
    dbapi = classmethod(dbapi)

    def create_connect_args(self, url):
        """Build the ``(args, kwargs)`` pair used to open a connection.

        As a side effect, the first call initializes the DBAPI module with
        the ``type_conv`` and ``concurrency_level`` options, taken from the
        URL query string when present there, otherwise from the dialect.
        """
        opts = url.translate_connect_args(username='user')
        if opts.get('port'):
            # The port is folded into the host as "host/port".
            opts['host'] = "%s/%s" % (opts['host'], opts['port'])
            del opts['port']
        opts.update(url.query)

        # Options given in the URL query string arrive as strings, while
        # kinterbasdb.init() expects integers: coerce them here.  Values
        # configured on the dialect itself are left untouched.
        for key in ('type_conv', 'concurrency_level'):
            if key in opts:
                opts[key] = int(opts[key])

        type_conv = opts.pop('type_conv', self.type_conv)
        concurrency_level = opts.pop('concurrency_level', self.concurrency_level)
        global _initialized_kb
        if not _initialized_kb and self.dbapi is not None:
            _initialized_kb = True
            self.dbapi.init(type_conv=type_conv, concurrency_level=concurrency_level)
        return ([], opts)

    def create_execution_context(self, *args, **kwargs):
        return FBExecutionContext(self, *args, **kwargs)

    def type_descriptor(self, typeobj):
        """Adapt the generic type ``typeobj`` using the ``colspecs`` mapping."""
        return sqltypes.adapt_type(typeobj, colspecs)

    def server_version_info(self, connection):
        """Get the version of the Firebird server used by a connection.

        Returns a tuple of (`major`, `minor`, `build`), three integers
        representing the version of the attached server.

        Raises ``exceptions.AssertionError`` when the version string
        reported by the server cannot be parsed.
        """

        # This is the simpler approach (the other uses the services api),
        # that for backward compatibility reasons returns a string like
        #   LI-V6.3.3.12981 Firebird 2.0
        # where the first version is a fake one resembling the old
        # Interbase signature. This is more than enough for our purposes,
        # as this is mainly (only?) used by the testsuite.

        from re import match

        fbconn = connection.connection.connection
        version = fbconn.server_version
        # Raw string: the pattern is full of regex escapes.
        m = match(r'\w+-V(\d+)\.(\d+)\.(\d+)\.(\d+) \w+ (\d+)\.(\d+)', version)
        if not m:
            raise exceptions.AssertionError("Could not determine version from string '%s'" % version)
        # Groups 5 and 6 are the real major/minor version, group 4 is the
        # build number carried by the fake Interbase-style signature.
        return tuple([int(x) for x in m.group(5, 6, 4)])

    def _normalize_name(self, name):
        """Convert the name to lowercase if it is possible"""

        # Remove trailing spaces: FB uses a CHAR() type,
        # that is padded with spaces
        name = name and name.rstrip()
        if name is None:
            return None
        elif name.upper() == name and not self.identifier_preparer._requires_quotes(name.lower()):
            # All-uppercase name whose lowercase form needs no quoting.
            return name.lower()
        else:
            return name

    def _denormalize_name(self, name):
        """Revert a *normalized* name to its uppercase equivalent"""

        if name is None:
            return None
        elif name.lower() == name and not self.identifier_preparer._requires_quotes(name.lower()):
            # All-lowercase name that needs no quoting.
            return name.upper()
        else:
            return name

    def table_names(self, connection, schema):
        """Return a list of *normalized* table names omitting system relations."""

        s = """
        SELECT r.rdb$relation_name
        FROM rdb$relations r
        WHERE r.rdb$system_flag=0
        """
        # NOTE(review): the visible source ends here; the remainder of this
        # method is not shown and has not been reconstructed.
?? 快捷鍵說明
復制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號
Ctrl + =
減小字號
Ctrl + -