SQLAlchemy是Python SQL工具箱和ORM框架,它为应用程序开发人员提供了全面而灵活的SQL功能。它提供了一整套企业级持久化方案,旨在高效,高性能地访问数据库,并符合Pythonic之禅。项目代码量比较大,接近200个文件,7万行代码, 我们一起来挑战一下。由于篇幅原因,分成上下两篇,上篇我们学习了core部分的engine,dialect,connection和pool等部分,下篇主要学习core部分剩余的sql表达式和orm部分,包括如下内容:
上篇中,我们使用的sql都是手工编写的语句,下面这样:
create table x (a integer, b integer)
insert into x (a, b) values (1, 1)
在sqlalchemy中可以通过定义schema的方式进行数据操作,完整的示例如下:
from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import Table
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.sql import select
engine = create_engine('sqlite:///:memory:', echo=True)
metadata = MetaData()
users = Table('users', metadata,
Column('id', Integer, primary_key=True),
Column('name', String),
Column('fullname', String),
)
metadata.create_all(engine)
ins = users.insert().values(name='jack', fullname='Jack Jones')
print(ins)
result = engine.execute(ins)
print(result, result.inserted_primary_key)
s = select([users])
result = conn.execute(s)
for row in result:
print(row)
result = engine.execute("select * from users")
for row in result:
print(row)
示例程序的执行过程:
下面是示例的执行日志,清晰展示了上面过程:
...
2021-04-19 10:02:09,166 INFO sqlalchemy.engine.base.Engine
CREATE TABLE users (
id INTEGER NOT NULL,
name VARCHAR,
fullname VARCHAR,
PRIMARY KEY (id)
)
2021-04-19 10:02:09,166 INFO sqlalchemy.engine.base.Engine ()
2021-04-19 10:02:09,167 INFO sqlalchemy.engine.base.Engine COMMIT
INSERT INTO users (name, fullname) VALUES (:name, :fullname)
2021-04-19 10:02:09,167 INFO sqlalchemy.engine.base.Engine INSERT INTO users (name, fullname) VALUES (?, ?)
2021-04-19 10:02:09,168 INFO sqlalchemy.engine.base.Engine ('jack', 'Jack Jones')
2021-04-19 10:02:09,168 INFO sqlalchemy.engine.base.Engine COMMIT
<sqlalchemy.engine.result.ResultProxy object at 0x7ffca0607070> [1]
2021-04-27 11:38:19,134 INFO sqlalchemy.engine.base.Engine SELECT users.id, users.name, users.fullname
FROM users
2021-04-27 11:38:19,134 INFO sqlalchemy.engine.base.Engine ()
(1, 'jack', 'Jack Jones')
2021-04-19 10:02:09,168 INFO sqlalchemy.engine.base.Engine select * from users
2021-04-19 10:02:09,168 INFO sqlalchemy.engine.base.Engine ()
(1, 'jack', 'Jack Jones')
在开始之前,我们需要简单了解一下SQL语句的分类:
在我们的schema使用示例中,就包括了DDL,DML和DQL三种类型的语句,下面我们按照这3种类型,详细了解一下sqlalchemy的sql表达式部分。sql表达式主要在sql包中,部分文件的功能如下:
模块 | 描述 |
---|---|
base.py | 基础类 |
compiler.py | sql编译 |
crud.py | crud的参数处理 |
ddl.py | DDL语句 |
default_comparator.py | 比较 |
dml.py | DML语句 |
elements.py | 基本类型 |
operators.py | sql操作符 |
schema.py | schema定义 |
selectable.py | DQL |
sqltypes.py&&type_api.py | sql数据类型 |
vistitors.py | 递归算法 |
首先了解一下schema的基础实现visitable:
class VisitableType(type):
def __init__(cls, clsname, bases, clsdict):
if clsname != "Visitable" and hasattr(cls, "__visit_name__"):
_generate_dispatch(cls)
super(VisitableType, cls).__init__(clsname, bases, clsdict)
def _generate_dispatch(cls):
if "__visit_name__" in cls.__dict__:
visit_name = cls.__visit_name__
if isinstance(visit_name, str):
getter = operator.attrgetter("visit_%s" % visit_name)
def _compiler_dispatch(self, visitor, **kw):
try:
meth = getter(visitor)
except AttributeError:
raise exc.UnsupportedCompilationError(visitor, cls)
else:
return meth(self, **kw)
cls._compiler_dispatch = _compiler_dispatch
class Visitable(util.with_metaclass(VisitableType, object)):
pass
Visitable约定子类必须提供 visit_name 的类属性,用来绑定编译方法。参与sql的类都继承自Visitable:
class SchemaItem(SchemaEventTarget, visitors.Visitable):
__visit_name__ = "schema_item"
class MetaData(SchemaItem):
__visit_name__ = "metadata"
class Table(DialectKWArgs, SchemaItem, TableClause):
__visit_name__ = "table"
class Column(DialectKWArgs, SchemaItem, ColumnClause):
__visit_name__ = "column"
class TypeEngine(Visitable):
...
class Integer(_LookupExpressionAdapter, TypeEngine):
__visit_name__ = "integer"
MetaData是schema的集合,记录了所有的Table定义, 通过 _add_table
函数用来添加表:
class MetaData(SchemaItem):
def __init__(
self,
bind=None,
reflect=False,
schema=None,
quote_schema=None,
naming_convention=None,
info=None,
):
# table集合
self.tables = util.immutabledict()
self.schema = quoted_name(schema, quote_schema)
self._schemas = set()
def _add_table(self, name, schema, table):
key = _get_table_key(name, schema)
dict.__setitem__(self.tables, key, table)
if schema:
self._schemas.add(schema)
Table是column的集合,在创建table对象的时候,把自己添加到metadata中:
class Table(DialectKWArgs, SchemaItem, TableClause):
def __new__(cls, *args, **kw):
name, metadata, args = args[0], args[1], args[2:]
schema = metadata.schema
table = object.__new__(cls)
# 添加到metadata
metadata._add_table(name, schema, table)
table._init(name, metadata, *args, **kw)
return table
def _init(self, name, metadata, *args, **kwargs):
super(Table, self).__init__(
quoted_name(name, kwargs.pop("quote", None))
)
self.metadata = metadata
self.schema = metadata.schema
# column集合
self._columns = ColumnCollection()
self._init_items(*args)
def _init_items(self, *args):
# column
for item in args:
if item is not None:
item._set_parent_with_dispatch(self)
Column是通过下面的方法将column添加到table的colummns中:
class Column(DialectKWArgs, SchemaItem, ColumnClause):
def __init__(self, *args, **kwargs):
pass
def _set_parent(self, table):
table._columns.replace(self)
class ColumnCollection(util.OrderedProperties):
def replace(self, column):
...
self._data[column.key] = column
...
现阶段,我们大概厘清了metadata,table和column的数据结构:metadata持有table集合,table持有column集合。接下来我们看看这个数据结构如何转换成sql语句,API是通过 MetaData.create_all
函数实现:
class MetaData(SchemaItem):
def create_all(self, bind=None, tables=None, checkfirst=True):
bind._run_visitor(
ddl.SchemaGenerator, self, checkfirst=checkfirst, tables=tables
)
class Engine(Connectable, log.Identified):
def _run_visitor(
self, visitorcallable, element, connection=None, **kwargs
):
with self._optional_conn_ctx_manager(connection) as conn:
conn._run_visitor(visitorcallable, element, **kwargs)
class Connection(Connectable):
def _run_visitor(self, visitorcallable, element, **kwargs):
visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
create-table的sql编译主要由ddl中的SchemaGenerator实现, 下面是SchemaGenerator的继承关系和核心的traverse_single函数:
class ClauseVisitor(object):
def traverse_single(self, obj, **kw):
# 遍历所有的visit实现
for v in self.visitor_iterator:
meth = getattr(v, "visit_%s" % obj.__visit_name__, None)
if meth:
return meth(obj, **kw)
@property
def visitor_iterator(self):
v = self
while v:
yield v
v = getattr(v, "_next", None)
class SchemaVisitor(ClauseVisitor):
...
class DDLBase(SchemaVisitor):
...
class SchemaGenerator(DDLBase):
...
创建meta,table和columun的过程:
class SchemaGenerator(DDLBase):
def visit_metadata(self, metadata):
tables = list(metadata.tables.values())
collection = sort_tables_and_constraints(
[t for t in tables if self._can_create_table(t)]
)
for table, fkcs in collection:
if table is not None:
# 创建表
self.traverse_single(
table,
create_ok=True,
include_foreign_key_constraints=fkcs,
_is_metadata_operation=True,
)
def visit_table(
self,
table,
create_ok=False,
include_foreign_key_constraints=None,
_is_metadata_operation=False,
):
for column in table.columns:
if column.default is not None:
# 创建column-DDLElement
self.traverse_single(column.default)
self.connection.execute(
# fmt: off
# 创建create-table-DDLElement
CreateTable(
table,
include_foreign_key_constraints= # noqa
include_foreign_key_constraints,
)
# fmt: on
)
CreateTableDDLElement和CreateColumnDDLElement的继承关系:
class _DDLCompiles(ClauseElement):
def _compiler(self, dialect, **kw):
return dialect.ddl_compiler(dialect, self, **kw)
class DDLElement(Executable, _DDLCompiles):
...
class _CreateDropBase(DDLElement):
...
class CreateTable(_CreateDropBase):
__visit_name__ = "create_table"
def __init__(
self, element, on=None, bind=None, include_foreign_key_constraints=None
):
super(CreateTable, self).__init__(element, on=on, bind=bind)
self.columns = [CreateColumn(column) for column in element.columns]
class CreateColumn(_DDLCompiles):
__visit_name__ = "create_column"
def __init__(self, element):
self.element = element
最终这些DDLElement在compiler中被DDLCompiler编译成sql语句, CREATE TABLE
是这样被编译的:
def visit_create_table(self, create):
table = create.element
preparer = self.preparer
text = "\nCREATE "
if table._prefixes:
text += " ".join(table._prefixes) + " "
text += "TABLE " + preparer.format_table(table) + " "
create_table_suffix = self.create_table_suffix(table)
if create_table_suffix:
text += create_table_suffix + " "
text += "("
separator = "\n"
# if only one primary key, specify it along with the column
first_pk = False
for create_column in create.columns:
column = create_column.element
try:
processed = self.process(
create_column, first_pk=column.primary_key and not first_pk
)
if processed is not None:
text += separator
separator = ", \n"
text += "\t" + processed
if column.primary_key:
first_pk = True
except exc.CompileError as ce:
...
const = self.create_table_constraints(
table,
_include_foreign_key_constraints=create.include_foreign_key_constraints, # noqa
)
if const:
text += separator + "\t" + const
text += "\n)%s\n\n" % self.post_create_table(table)
return text
def visit_create_column(self, create, first_pk=False):
column = create.element
text = self.get_column_specification(column, first_pk=first_pk)
const = " ".join(
self.process(constraint) for constraint in column.constraints
)
if const:
text += " " + const
return text
在前面column介绍中,我们略过了数据类型。大家都知道sql的数据类型和python数据类型有差异, 下面是一些常见的SQL数据类型:
class TypeEngine(Visitable):
...
class Integer(_LookupExpressionAdapter, TypeEngine):
__visit_name__ = "integer"
...
class String(Concatenable, TypeEngine):
__visit_name__ = "string"
...
class CHAR(String):
__visit_name__ = "CHAR"
...
class VARCHAR(String):
__visit_name__ = "VARCHAR"
...
数据类型由GenericTypeCompiler进行编译:
class TypeCompiler(util.with_metaclass(util.EnsureKWArgType, object)):
def process(self, type_, **kw):
return type_._compiler_dispatch(self, **kw)
class GenericTypeCompiler(TypeCompiler):
def visit_INTEGER(self, type_, **kw):
return "INTEGER"
def visit_string(self, type_, **kw):
return self.visit_VARCHAR(type_, **kw)
def visit_VARCHAR(self, type_, **kw):
return self._render_string_type(type_, "VARCHAR")
def _render_string_type(self, type_, name):
text = name
if type_.length:
text += "(%d)" % type_.length
if type_.collation:
text += ' COLLATE "%s"' % type_.collation
return text
数据插入的API由TableClause提供的insert函数:
class TableClause(Immutable, FromClause):
@util.dependencies("sqlalchemy.sql.dml")
def insert(self, dml, values=None, inline=False, **kwargs):
return dml.Insert(self, values=values, inline=inline, **kwargs)
dml中提供了Insert类的实现:
class UpdateBase(
HasCTE, DialectKWArgs, HasPrefixes, Executable, ClauseElement
):
...
class ValuesBase(UpdateBase):
...
class Insert(ValuesBase):
__visit_name__ = "insert"
...
按照ddl的经验,我们查找insert语句的编译方法,在SQLCompiler中:
class SQLCompiler(Compiled):
def visit_insert(self, insert_stmt, asfrom=False, **kw):
crud_params = crud._setup_crud_params(
self, insert_stmt, crud.ISINSERT, **kw
)
if insert_stmt._has_multi_parameters:
crud_params_single = crud_params[0]
else:
crud_params_single = crud_params
preparer = self.preparer
supports_default_values = self.dialect.supports_default_values
text = "INSERT "
text += "INTO "
table_text = preparer.format_table(insert_stmt.table)
if crud_params_single or not supports_default_values:
text += " (%s)" % ", ".join(
[preparer.format_column(c[0]) for c in crud_params_single]
)
...
if insert_stmt.select is not None:
select_text = self.process(self._insert_from_select, **kw)
if self.ctes and toplevel and self.dialect.cte_follows_insert:
text += " %s%s" % (self._render_cte_clause(), select_text)
else:
text += " %s" % select_text
elif not crud_params and supports_default_values:
text += " DEFAULT VALUES"
elif insert_stmt._has_multi_parameters:
text += " VALUES %s" % (
", ".join(
"(%s)" % (", ".join(c[1] for c in crud_param_set))
for crud_param_set in crud_params
)
)
else:
text += " VALUES (%s)" % ", ".join([c[1] for c in crud_params])
return text
可以看到insert语句就是对Insert对象,通过字符串模版拼接而来。
数据查询select语句也都有特定的数据结构Select,继承关系如下:
class SelectBase(HasCTE, Executable, FromClause):
...
class GenerativeSelect(SelectBase):
...
class Select(HasPrefixes, HasSuffixes, GenerativeSelect):
__visit_name__ = "select"
def __init__(
self,
columns=None,
whereclause=None,
from_obj=None,
distinct=False,
having=None,
correlate=True,
prefixes=None,
suffixes=None,
**kwargs
):
GenerativeSelect.__init__(self, **kwargs)
...
select的编译语句也在SQLCompiler中:
class SQLCompiler(Compiled):
def visit_select(
self,
select,
asfrom=False,
parens=True,
fromhints=None,
compound_index=0,
nested_join_translation=False,
select_wraps_for=None,
lateral=False,
**kwargs
):
...
froms = self._setup_select_stack(select, entry, asfrom, lateral)
column_clause_args = kwargs.copy()
column_clause_args.update(
{"within_label_clause": False, "within_columns_clause": False}
)
text = "SELECT " # we're off to a good start !
text += self.get_select_precolumns(select, **kwargs)
# the actual list of columns to print in the SELECT column list.
inner_columns = [
c
for c in [
self._label_select_column(
select,
column,
populate_result_map,
asfrom,
column_clause_args,
name=name,
)
for name, column in select._columns_plus_names
]
if c is not None
]
...
text = self._compose_select_body(
text, select, inner_columns, froms, byfrom, kwargs
)
if select._statement_hints:
per_dialect = [
ht
for (dialect_name, ht) in select._statement_hints
if dialect_name in ("*", self.dialect.name)
]
if per_dialect:
text += " " + self.get_statement_hint_text(per_dialect)
if self.ctes and toplevel:
text = self._render_cte_clause() + text
if select._suffixes:
text += " " + self._generate_prefixes(
select, select._suffixes, **kwargs
)
self.stack.pop(-1)
if (asfrom or lateral) and parens:
return "(" + text + ")"
else:
return text
select语句一样是采用字符串拼接得到。
orm的使用和schema使用方式略有不同, 下面是orm的示例:
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
engine = create_engine('sqlite:///:memory:', echo=True)
Model = declarative_base()
class User(Model):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
fullname = Column(String)
nickname = Column(String)
def __repr__(self):
return "<User(name='%s', fullname='%s', nickname='%s')>" % (
self.name, self.fullname, self.nickname)
Model.metadata.create_all(engine)
print("="*10)
Session = sessionmaker(bind=engine)
session = Session()
ed_user = User(name='ed', fullname='Ed Jones', nickname='edsnickname')
session.add(ed_user)
session.commit()
print(ed_user.id)
result = engine.execute("select * from users")
for row in result:
print(row)
对比schema和orm的差异,可以得到下表:
schema方式|orm方式 创建engine,用于数据库连接|- 创建metadata,用于管理schema|创建Model 创建users表的Table|创建User模型 将metadata提交到engine(创建表)|- -|创建session 使用users插入数据|使用session插入数据
总结一下主要就2点差异:
Model类使用declarative_base动态创建:
class DeclarativeMeta(type):
def __init__(cls, classname, bases, dict_):
if "_decl_class_registry" not in cls.__dict__:
_as_declarative(cls, classname, cls.__dict__)
type.__init__(cls, classname, bases, dict_)
def __setattr__(cls, key, value):
_add_attribute(cls, key, value)
def __delattr__(cls, key):
_del_attribute(cls, key)
def declarative_base(
bind=None,
metadata=None,
mapper=None,
cls=object,
name="Base",
constructor=_declarative_constructor,
class_registry=None,
metaclass=DeclarativeMeta,
):
# 创建metadata
lcl_metadata = metadata or MetaData()
if class_registry is None:
class_registry = weakref.WeakValueDictionary()
bases = not isinstance(cls, tuple) and (cls,) or cls
class_dict = dict(
_decl_class_registry=class_registry, metadata=lcl_metadata
)
# 构造函数
if constructor:
class_dict["__init__"] = constructor
if mapper:
class_dict["__mapper_cls__"] = mapper
# class-meta
return metaclass(name, bases, class_dict)
关于如何动态创建类,在小技巧中进行介绍。declarative_base主要定义了Model类的几个特性:
__init__
使用_declarative_constructor先从构造函数_declarative_constructor开始:
def _declarative_constructor(self, **kwargs):
cls_ = type(self)
for k in kwargs:
if not hasattr(cls_, k):
raise TypeError(
"%r is an invalid keyword argument for %s" % (k, cls_.__name__)
)
setattr(self, k, kwargs[k])
_declarative_constructor.__name__ = "__init__"
看起来非常简单,但是这里做了一个类和对象实例之间的校验转换。我们先看一段演示代码:
class DummyModel(object):
name = ["dummy_model"] # 引用类型
a = DummyModel()
b = DummyModel()
assert id(a.name) == id(b.name) == id(DummyModel.name)
a.name.append("a")
assert id(a.name) == id(b.name) == id(DummyModel.name)
DummyModel的类属性name和a对象的name属性都是同一个引用。如果使用Model类:
Model = declarative_base()
class UserModel(Model):
__tablename__ = 'user' # 必须字段
id = Column(Integer, primary_key=True) # 必须字段
name = Column(String)
c = UserModel()
c.name = "c"
d = UserModel()
d.name = "d"
# 注意并不是Column
assert isinstance(UserModel.name, InstrumentedAttribute)
assert isinstance(c.name, str)
assert d.name == "d"
assert id(c.name) != id(d.name) != id(UserModel.name)
可以发现UserModel的类属性name和d对象的name属性完全不一样,类定义的是Cloumn(InstrumentedAttribute),对象变成了str。这个就是orm模型的特性之一,Model是定义格式模版,对象实例化后转化为普通数据。
Model的另外一个功能是隐式创建Table对象,在_as_declarative函数中通过_MapperConfig实现
class _MapperConfig(object):
def setup_mapping(cls, cls_, classname, dict_):
cfg_cls = _MapperConfig
cfg_cls(cls_, classname, dict_)
def __init__(self, cls_, classname, dict_):
...
self._setup_table()
...
def _setup_table(self):
...
table_cls = Table
args, table_kw = (), {}
if table_args:
if isinstance(table_args, dict):
table_kw = table_args
elif isinstance(table_args, tuple):
if isinstance(table_args[-1], dict):
args, table_kw = table_args[0:-1], table_args[-1]
else:
args = table_args
autoload = dict_.get("__autoload__")
if autoload:
table_kw["autoload"] = True
cls.__table__ = table = table_cls(
tablename,
cls.metadata,
*(tuple(declared_columns) + tuple(args)),
**table_kw
)
...
而Column是通过下面的函数实现:
def _add_attribute(cls, key, value):
if "__mapper__" in cls.__dict__:
if isinstance(value, Column):
_undefer_column_name(key, value)
cls.__table__.append_column(value)
cls.__mapper__.add_property(key, value)
...
else:
type.__setattr__(cls, key, value)
Model通过上面的方式,隐式创建了Schema(Table),实际使用过程中只需要使用Model类,不用关注Schema的定义。
session的源码由于篇幅和时间有限,留待以后再行分析
sqlalchemy可以在低层次上提供了sql语句的方式使用;在次层次上提供定义schema方式使用;在高层次上提供orm的实现,让应用可以根据项目的特点自主选择不同层级的API。
使用schema时候,主要使用Metadata,Table和Column等定义Schema数据结构,使用编译器自动将schema转换成合法的sql语句。
使用orm的时候,则是创建特定的数据模型,模型对象会隐式创建schema,通过session方式进行数据访问。
最后再回顾一下sqlalchemy的架构图:
architecture
sqlalchemy中提供了一个动态创建类的方式,主要在declarative_base和DeclarativeMeta中实现。我参考这个实现方式做了一个类工厂:
class DeclarativeMeta(type):
def __init__(cls, klass_name, bases, dict_):
print("class_init", klass_name, bases, dict_)
type.__init__(cls, klass_name, bases, dict_)
def get_attr(self, key):
print("getattr", self, key)
return self.__dict__[key]
def constructor(self, *args, **kwargs):
print("constructor", self, args, kwargs)
for k, v in kwargs.items():
setattr(self, k, v)
def dynamic_class(name):
class_dict = {
"__init__": constructor,
"__getattr__": get_attr
}
return DeclarativeMeta(name, (object,), class_dict)
DummyModel = dynamic_class("Dummy")
dummy = DummyModel(1, name="hello", age=18)
print(dummy, type(dummy), dummy.name, dummy.age)
# class_init Dummy (<class 'object'>,) {'__init__': <function test_dynamic_class.<locals>.constructor at 0x7f898827ef70>, '__getattr__': <function test_dynamic_class.<locals>.get_attr at 0x7f89882105e0>}
# constructor <sample.Dummy object at 0x7f89882a5820> (1,) {'name': 'hello', 'age': 18}
# <sample.Dummy object at 0x7f89882a5820> <class 'sample.Dummy'> hello 18
示例中我动态创建了一个DummyModel类,type(dummy)
可以看到,这个类名是 Dummy。这个类可以的构造函数可以接受name和age两个属性。这种创建方式和collections.namedtuple有点类似。
sqlalchemy的源码非常复杂,前前后后一共准备了一个月,形成的2篇文档仅仅涉及核心流程和用法,细节部分缺失较多,以后有机会还需要继续阅读。在这一个月中,克服了工作较忙,没有时间写稿的烦躁;克服了阅读进入困境,一度想放弃的心理障碍;克服了deadline临近,文稿还只是一个雏形,使用存稿顶替的羞愧;克服了笔记软件故障,写完的文稿丢失,完全重写的懊恼。战胜这些困难,最终还是得以完成,心理上有大满足。当然最大的收获还是对ORM中间件有了初步的了解,也希望梳理的ORM流程对大家有一定的帮助,如果获得大家的支持会更加满意♥️。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8