?? mysql.py
字號(hào):
resultset = connection.execute("XA RECOVER") return [row['data'][0:row['gtrid_length']] for row in resultset] def do_ping(self, connection): connection.ping() def is_disconnect(self, e): if isinstance(e, self.dbapi.OperationalError): return e.args[0] in (2006, 2013, 2014, 2045, 2055) elif isinstance(e, self.dbapi.InterfaceError): # if underlying connection is closed, this is the error you get return "(0, '')" in str(e) else: return False def get_default_schema_name(self, connection): try: return self._default_schema_name except AttributeError: name = self._default_schema_name = \ connection.execute('SELECT DATABASE()').scalar() return name def table_names(self, connection, schema): """Return a Unicode SHOW TABLES from a given schema.""" charset = self._detect_charset(connection) self._autoset_identifier_style(connection) rp = connection.execute("SHOW TABLES FROM %s" % self.identifier_preparer.quote_identifier(schema)) return [row[0] for row in _compat_fetchall(rp, charset=charset)] def has_table(self, connection, table_name, schema=None): # SHOW TABLE STATUS LIKE and SHOW TABLES LIKE do not function properly # on macosx (and maybe win?) with multibyte table names. # # TODO: if this is not a problem on win, make the strategy swappable # based on platform. DESCRIBE is slower. # [ticket:726] # full_name = self.identifier_preparer.format_table(table, # use_schema=True) self._autoset_identifier_style(connection) full_name = '.'.join(self.identifier_preparer._quote_free_identifiers( schema, table_name)) st = "DESCRIBE %s" % full_name rs = None try: try: rs = connection.execute(st) have = rs.rowcount > 0 rs.close() return have except exceptions.SQLError, e: if e.orig.args[0] == 1146: return False raise finally: if rs: rs.close() def server_version_info(self, connection): """A tuple of the database server version. Formats the remote server version as a tuple of version values, e.g. ``(5, 0, 44)``. If there are strings in the version number they will be in the tuple too, so don't count on these all being ``int`` values. This is a fast check that does not require a round trip. It is also cached per-Connection. """ try: return connection.info['_mysql_server_version_info'] except KeyError: version = connection.info['_mysql_server_version_info'] = \ self._server_version_info(connection.connection.connection) return version def _server_version_info(self, dbapi_con): """Convert a MySQL-python server_info string into a tuple.""" version = [] r = re.compile('[.\-]') for n in r.split(dbapi_con.get_server_info()): try: version.append(int(n)) except ValueError: version.append(n) return tuple(version) # @deprecated def get_version_info(self, connectable): """A tuple of the database server version. Deprecated, use ``server_version_info()``. """ if isinstance(connectable, engine_base.Engine): connectable = connectable.contextual_connect() return self.server_version_info(connectable) get_version_info = util.deprecated(get_version_info) def reflecttable(self, connection, table, include_columns): """Load column definitions from the server.""" charset = self._detect_charset(connection) self._autoset_identifier_style(connection) try: reflector = self.reflector except AttributeError: preparer = self.identifier_preparer if (self.server_version_info(connection) < (4, 1) and self.use_ansiquotes): # ANSI_QUOTES doesn't affect SHOW CREATE TABLE on < 4.1 preparer = MySQLIdentifierPreparer(self) self.reflector = reflector = MySQLSchemaReflector(preparer) sql = self._show_create_table(connection, table, charset) if sql.startswith('CREATE ALGORITHM'): # Adapt views to something table-like. columns = self._describe_table(connection, table, charset) sql = reflector._describe_to_create(table, columns) self._adjust_casing(connection, table, charset) return reflector.reflect(connection, table, sql, charset, only=include_columns) def _adjust_casing(self, connection, table, charset=None): """Adjust Table name to the server case sensitivity, if needed.""" if charset is None: charset = self._detect_charset(connection) casing = self._detect_casing(connection, charset) # For winxx database hosts. TODO: is this really needed? if casing == 1 and table.name != table.name.lower(): table.name = table.name.lower() lc_alias = schema._get_table_key(table.name, table.schema) table.metadata.tables[lc_alias] = table def _detect_charset(self, connection): """Sniff out the character set in use for connection results.""" # Allow user override, won't sniff if force_charset is set. if 'force_charset' in connection.info: return connection.info['force_charset'] # Note: MySQL-python 1.2.1c7 seems to ignore changes made # on a connection via set_character_set() if self.server_version_info(connection) < (4, 1, 0): try: return connection.connection.character_set_name() except AttributeError: # < 1.2.1 final MySQL-python drivers have no charset support. # a query is needed. pass # Prefer 'character_set_results' for the current connection over the # value in the driver. SET NAMES or individual variable SETs will # change the charset without updating the driver's view of the world. # # If it's decided that issuing that sort of SQL leaves you SOL, then # this can prefer the driver value. rs = connection.execute("SHOW VARIABLES LIKE 'character_set%%'") opts = dict([(row[0], row[1]) for row in _compat_fetchall(rs)]) if 'character_set_results' in opts: return opts['character_set_results'] try: return connection.connection.character_set_name() except AttributeError: # Still no charset on < 1.2.1 final... if 'character_set' in opts: return opts['character_set'] else: util.warn( "Could not detect the connection character set with this " "combination of MySQL server and MySQL-python. " "MySQL-python >= 1.2.2 is recommended. Assuming latin1.") return 'latin1' def _detect_casing(self, connection, charset=None): """Sniff out identifier case sensitivity. Cached per-connection. This value can not change without a server restart. """ # http://dev.mysql.com/doc/refman/5.0/en/name-case-sensitivity.html try: return connection.info['lower_case_table_names'] except KeyError: row = _compat_fetchone(connection.execute( "SHOW VARIABLES LIKE 'lower_case_table_names'"), charset=charset) if not row: cs = 0 else: # 4.0.15 returns OFF or ON according to [ticket:489] # 3.23 doesn't, 4.0.27 doesn't.. if row[1] == 'OFF': cs = 0 elif row[1] == 'ON': cs = 1 else: cs = int(row[1]) row.close() connection.info['lower_case_table_names'] = cs return cs def _detect_collations(self, connection, charset=None): """Pull the active COLLATIONS list from the server. Cached per-connection. """ try: return connection.info['collations'] except KeyError: collations = {} if self.server_version_info(connection) < (4, 1, 0): pass else: rs = connection.execute('SHOW COLLATION') for row in _compat_fetchall(rs, charset): collations[row[0]] = row[1] connection.info['collations'] = collations return collations def use_ansiquotes(self, useansi): self._use_ansiquotes = useansi if useansi: self.preparer = MySQLANSIIdentifierPreparer else: self.preparer = MySQLIdentifierPreparer # icky if hasattr(self, 'identifier_preparer'): self.identifier_preparer = self.preparer(self) if hasattr(self, 'reflector'): del self.reflector use_ansiquotes = property(lambda s: s._use_ansiquotes, use_ansiquotes, doc="True if ANSI_QUOTES is in effect.") def _autoset_identifier_style(self, connection, charset=None): """Detect and adjust for the ANSI_QUOTES sql mode. If the dialect's use_ansiquotes is unset, query the server's sql mode and reset the identifier style. Note that this currently *only* runs during reflection. Ideally this would run the first time a connection pool connects to the database, but the infrastructure for that is not yet in place. """ if self.use_ansiquotes is not None: return row = _compat_fetchone( connection.execute("SHOW VARIABLES LIKE 'sql_mode'", charset=charset)) if not row: mode = '' else: mode = row[1] or '' # 4.0 if mode.isdigit(): mode_no = int(mode) mode = (mode_no | 4 == mode_no) and 'ANSI_QUOTES' or '' self.use_ansiquotes = 'ANSI_QUOTES' in mode def _show_create_table(self, connection, table, charset=None, full_name=None): """Run SHOW CREATE TABLE for a ``Table``.""" if full_name is None: full_name = self.identifier_preparer.format_table(table) st = "SHOW CREATE TABLE %s" % full_name rp = None try: try: rp = connection.execute(st) except exceptions.SQLError, e: if e.orig.args[0] == 1146: raise exceptions.NoSuchTableError(full_name) else: raise row = _compat_fetchone(rp, charset=charset) if not row: raise exceptions.NoSuchTableError(full_name) return row[1].strip() finally: if rp: rp.close() return sql def _describe_table(self, connection, table, charset=None, full_name=None): """Run DESCRIBE for a ``Table`` and return processed rows.""" if full_name is None: full_name = self.identifier_preparer.format_table(table) st = "DESCRIBE %s" % full_name rp, rows = None, None try: try: rp = connection.execute(st) except exceptions.SQLError, e: if e.orig.args[0] == 1146: raise exceptions.NoSuchTableError(full_name) else: raise rows = _compat_fetchall(rp, charset=charset) finally: if rp: rp.close() return rowsclass _MySQLPythonRowProxy(object): """Return consistent column values for all versions of MySQL-python. Smooth over data type issues (esp. with alpha driver versions) and normalize strings as Unicode regardless of user-configured driver encoding settings. """ # Some MySQL-python versions can return some columns as # sets.Set(['value']) (seriously) but thankfully that doesn't # seem to come up in DDL queries. def __init__(self, rowproxy, charset): self.rowproxy = rowproxy self.charset = charset def __getitem__(self, index): item = self.rowproxy[index] if isinstance(item, _array): item = item.tostring() if self.charset and isinstance(item, str): return item.decode(self.charset) else: return item def __getattr__(self, attr): item = getattr(self.rowproxy, attr) if isinstance(item, _array): item = item.tostring() if self.charset and isinstance(item, str): return item.decode(self.charset) else: return itemclass MySQLCompiler(compiler.DefaultCompiler): operators = compiler.DefaultCompiler.operators.copy() operators.update({ sql_operators.concat_op: lambda x, y: "concat(%s, %s)" % (x, y), sql_operators.mod: '%%' }) def visit_typeclause(self, typeclause): type_ = typeclause.type.dialect_impl(self.dialect) if isinstance(type_, MSInteger): if getattr(type_, 'unsigned', False): return 'UNSIGNED INTEGER' el
?? 快捷鍵說明
復(fù)制代碼
Ctrl + C
搜索代碼
Ctrl + F
全屏模式
F11
切換主題
Ctrl + Shift + D
顯示快捷鍵
?
增大字號(hào)
Ctrl + =
減小字號(hào)
Ctrl + -