TinyDB是一个小型,简单易用,面向文档的数据库;代码仅1800行,纯python编写。TinyDB项目大小刚好,学习它可以了解NOSQL数据库的实现,本文包括下面几部分:
本次阅读采用的版本号是 4.0.0
, 项目结构如下:
文件 | 描述 |
---|---|
database | database的实现 |
middlewares | 中间件实现,包括cache |
operations | 对database的一些操作方法 |
queries | 查询功能实现 |
storages | 存储功能实现 |
table | 文档;表/集合功能实现 |
utils | 工具类 |
version | 版本号 |
项目的类图:
tinydb类图
从类图可以看到,代码主要集中在database,table和query三个部分。
TinyDB的使用示例:
from tinydb import TinyDB
from tinydb import Query
from tinydb import JSONStorage
from tinydb.middlewares import CachingMiddleware
db = TinyDB('cache_db.json', storage=CachingMiddleware(JSONStorage))
db.purge_tables() # 重置数据
db.insert({'int': 1, 'char': 'a'}) # 插入数据
db.insert({'int': 2, 'char': 'b'})
table = db.table('user')
table.insert({'name': "shawn", "age": 18})
table.insert({'name': "shelton", "age": 28})
print(table.all()) # [{'name': 'shawn', 'age': 18}, {'name': 'shelton', 'age': 28}]
User = Query()
table.update({'name': 'shawn', 'age': 19}, User.name == 'shawn') # 修改数据
print(table.search(User.name == 'shawn')) # [{'name': 'shawn', 'age': 19}]
table.remove(User.name == 'shawn') # 删除数据
db.close()
上面的示例演示了数据库的CRUD等基础功能,可见tinydb的api非常简洁直观:
从下面的数据文件cache_db.json
,可以看到每个文档都有一个int型id,标识doc的唯一性。
{
"_default": {
"9": {
"int": 1,
"char": "a"
},
"10": {
"int": 2,
"char": "b"
}
},
"user": {
"2": {
"name": "shelton",
"age": 28
}
}
}
storages 包括下面3个类:
先看简单的 MemoryStorage 实现:
class MemoryStorage(Storage):
def __init__(self):
super().__init__()
self.memory = None
def read(self) -> Optional[Dict[str, Dict[str, Any]]]:
return self.memory
def write(self, data: Dict[str, Dict[str, Any]]):
self.memory = data
storage的实现就是每次更换数据全量,这里的data是整个database的数据。
再看默认的 JSONStorage 实现:
class JSONStorage(Storage):
"""
Store the data in a JSON file.
"""
def __init__(self, path: str, create_dirs=False, encoding=None, **kwargs):
super().__init__()
touch(path, create_dirs=create_dirs) # 创建文件及目录
self.kwargs = kwargs
self._handle = open(path, 'r+', encoding=encoding) # 文件读写
def close(self) -> None:
self._handle.close() # 文件存储,需要合法的关闭
def read(self) -> Optional[Dict[str, Dict[str, Any]]]:
self._handle.seek(0, os.SEEK_END)
size = self._handle.tell() # 判断文档内容大小
if not size:
return None
else:
self._handle.seek(0)
return json.load(self._handle) # 加载数据
def write(self, data: Dict[str, Dict[str, Any]]):
self._handle.seek(0)
serialized = json.dumps(data, **self.kwargs) # json序列化
self._handle.write(serialized)
self._handle.flush()
os.fsync(self._handle.fileno()) # 强制写入磁盘,保存数据
self._handle.truncate() # 截断原始数据
JSONStorage 主要就是文件的操作,然后进行json数据序列化和反序列化。官方的参考文档中还有扩展的 YAMLStorage ,大家可以通过参考链接自行去查看。
document是普通字典+doc_id属性,非常简单
class Document(dict):
def __init__(self, value: Mapping, doc_id: int):
super().__init__(value)
self.doc_id = doc_id
Table的实现代码较多,先看构造方法:
class Table:
document_class = Document # 存储数据类,可以被扩展
document_id_class = int # 数据主键,默认int,可以被扩展
query_cache_class = LRUCache # 查询缓存
default_query_cache_capacity = 10 # 查询缓存容量
def __init__(
self,
storage: Storage,
name: str,
cache_size: int = default_query_cache_capacity
):
self._storage = storage # 存储的引用
self._name = name # 表名
self._query_cache = self.query_cache_class(capacity=cache_size) \
# type: LRUCache[Query, List[Document]]
self._next_id = None # 主键记录
Table主要包括读和写两部分,我们先看写的代表 search 方法
def search(self, cond: Query) -> List[Document]:
if cond in self._query_cache: # 优先从查询缓存获取
docs = self._query_cache.get(cond)
if docs is not None:
return docs[:]
docs = [doc for doc in self if cond(doc)] # 使用Query判断数据是否符合条件
self._query_cache[cond] = docs[:] # 缓存下次使用
return docs
table 对象可以迭代,是因为实现了 iter
方法
def __iter__(self) -> Iterator[Document]:
"""
Iterate over all documents stored in the table.
:returns: an iterator over all documents.
"""
# Iterate all documents and their IDs
for doc_id, doc in self._read_table().items(): # 读取所有数据
# Convert documents to the document class
yield self.document_class(doc, doc_id) # 包装Document对象
重点之一在 read_table 方法实现:
def _read_table(self) -> Dict[int, Mapping]:
tables = self._storage.read() # 从storage读取数据
...
table = tables[self.name] # 获取当前表
...
return {
self.document_id_class(doc_id): doc
for doc_id, doc in table.items()
} # 生成全部数据
查询中还有一个重点在于查询条件的处理,这是由Query实现的,稍后再介绍。继续查看插入数据方法 insert 的实现:
def insert(self, document: Mapping) -> int:
...
doc_id = self._get_next_id() # 获取自增ID
def updater(table: dict):
...
table[doc_id] = dict(document) # 插入数据
self._update_table(updater) # 抽象的更新表方法
return doc_id
主键自增的 get_next_id 方法
def _get_next_id(self):
if self._next_id is not None: # 第一条记录
next_id = self._next_id
self._next_id = next_id + 1
return next_id # 快速返回
table = self._read_table() # 从存储中读取数据
if not table: # 空表
next_id = 1
self._next_id = next_id + 1
return next_id
# 查找已有数据的最大主键
max_id = max(self.document_id_class(i) for i in table.keys())
self._next_id = max_id + 1 # 主键自增
return self._next_id
更新数据最重要的 update_table :
def _update_table(self, updater: Callable[[Dict[int, Mapping]], None]):
tables = self._storage.read() # 载入已有database数据
if tables is None:
# The database is empty
tables = {}
try:
raw_table = tables[self.name] # 读取table数据
except KeyError:
# The table does not exist yet, so it is empty
raw_table = {}
# 转换document对象
table = {
self.document_id_class(doc_id): doc
for doc_id, doc in raw_table.items()
}
updater(table) # 更新数据
tables[self.name] = {
str(doc_id): doc
for doc_id, doc in table.items()
} # 封装document
self._storage.write(tables) # 写入数据
self.clear_cache() # 数据变动,清空查询缓存
可以看到更新数据模版主要步骤是:
query是可以进行布尔运算和算术运算的conditon,由QueryInstance父类和Query子类两级实现。QueryInstance定义了布尔运算的规则,Query定义了算术运算的规则。
class QueryInstance:
def __init__(self, test: Callable[[Mapping], bool], hashval: Tuple):
self._test = test # 计算函数
self._hash = hashval # hash值
def __call__(self, value: Mapping) -> bool: # 执行获取布尔值
return self._test(value)
def __hash__(self):
return hash(self._hash)
def __repr__(self):
return 'QueryImpl{}'.format(self._hash)
def __eq__(self, other: object):
if isinstance(other, QueryInstance): # 类型和hash相同
return self._hash == other._hash
return False
QueryInstance使用hash值确定对象的唯一性,布尔运算 and
, or
和 not
也都是基于对象的hash判断。frozenset 是给对象计算hash值的关键函数,在utils中提供。
def __and__(self, other: 'QueryInstance') -> 'QueryInstance':
return QueryInstance(lambda value: self(value) and other(value),
('and', frozenset([self._hash, other._hash])))
def __or__(self, other: 'QueryInstance') -> 'QueryInstance':
return QueryInstance(lambda value: self(value) or other(value),
('or', frozenset([self._hash, other._hash])))
def __invert__(self) -> 'QueryInstance':
return QueryInstance(lambda value: not self(value),
('not', self._hash))
Query子类中定义了算术运算的函数,包括:
算术运算和布尔运算不同,是基于Query对象的条件进行判断:
def _generate_test(
self,
test: Callable[[Any], bool],
hashval: Tuple,
) -> QueryInstance:
def runner(value):
try:
# Resolve the path
for part in self._path:
value = value[part] # 字典取值
except (KeyError, TypeError):
return False
else:
return test(value)
return QueryInstance(
lambda value: runner(value),
hashval
)
def __eq__(self, rhs: Any):
"""
Test a dict value for equality.
>>> Query().f1 == 42
:param rhs: The value to compare against
"""
return self._generate_test(
lambda value: value == rhs, # 判断逻辑,使用匿名函数
('==', self._path, freeze(rhs))
)
query对象的条件是这样设置的:
# Query().f1 == 42
def __getattr__(self, item: str):
# Generate a new query object with the new query path
# We use type(self) to get the class of the current query in case
# someone uses a subclass of ``Query``
query = type(self)() # 新生成query对象
# Now we add the accessed item to the query path ...
query._path = self._path + (item,)
# ... and update the query hash
query._hash = ('path', query._path)
return query
query还提供了一些api:exists, matches, search, test, any, all 和 one_of 进行集合判断。
database只是维护表的集合和存储,整体实现很简单:
class TinyDB:
table_class = Table
default_table_name = '_default'
default_storage_class = JSONStorage # 默认存储实现,可以扩展
def __init__(self, *args, **kwargs):
storage = kwargs.pop('storage', self.default_storage_class)
self._storage = storage(*args, **kwargs) # 准备存储实现
self._opened = True
self._tables = {} # 支持多表
def __getattr__(self, name):
return getattr(self.table(self.default_table_name), name)
# db.insert({'int': 1, 'char': 'a'}) # insert语法通过getattr透传到default-table
cache是数据库的重要实现,tinydb提供了2种cache。一种是table的query-cache, 比如之前的search查询:
def search(self, cond: Query) -> List[Document]:
if cond in self._query_cache:
docs = self._query_cache.get(cond)
if docs is not None:
return docs[:]
...
查询缓存使用LRU实现,LRU全称Least Recently Used:最近最少使用淘汰算法。同类的还有,LFU全称Least Frequently Used),最不经常使用淘汰算法。LFU是淘汰一段时间内,使用次数最少的数据;LRU是淘汰最长时间没有被使用的数据,更多说明请见参考链接。
class LRUCache(abc.MutableMapping, Generic[K, V]):
def __init__(self, capacity=None):
self.capacity = capacity # 缓存容量
self.cache = OrderedDict() # 有序字典
def get(self, key: K, default: D = None) -> Optional[Union[V, D]]:
value = self.cache.get(key) # 从换成获取
if value is not None:
del self.cache[key]
self.cache[key] = value # 更新缓存顺序
return value
return default
def set(self, key: K, value: V):
if self.cache.get(key):
del self.cache[key]
self.cache[key] = value # 更新缓存顺序及值
else:
self.cache[key] = value
if self.capacity is not None and self.length > self.capacity:
self.cache.popitem(last=False) # 淘汰最古老的数据
虽然LRUCache的实现比较简单,容量无法字典增长,数据每次淘汰一条比较低效,但是也体现了缓存实现的主要特点:
另外一种cache是,数据写入的cache。默认的storage中,每次有数据更新都要写入磁盘,这样效率较低,cachemiddleware中对数据进行缓存,变成N次数据变动后一次写入:
class CachingMiddleware(Middleware):
WRITE_CACHE_SIZE = 1000 # 变动上限
def __init__(self, storage_cls):
super().__init__(storage_cls)
self.cache = None
self._cache_modified_count = 0 # 变动计数
def read(self):
if self.cache is None:
self.cache = self.storage.read() # 初始化
return self.cache
def write(self, data):
self.cache = data # 缓存数据
self._cache_modified_count += 1 # 计数增长
# 判读是否需要写入磁盘
if self._cache_modified_count >= self.WRITE_CACHE_SIZE:
self.flush()
def close(self): # 安全关闭
self.flush()
self.storage.close()
我们可以简单小结一下文档型数据库的实现:
python 抽象类在 ABC
模块中提供,使用 abstractmethod 配合 NotImplementedError 异常定义:
class Storage(ABC):
@abstractmethod
def read(self) -> Optional[Dict[str, Dict[str, Any]]]:
raise NotImplementedError('To be overridden!')
@abstractmethod
def write(self, data: Dict[str, Dict[str, Any]]) -> None:
raise NotImplementedError('To be overridden!')
def close(self) -> None:
pass
python3 支持数据类型的 annotation ,比如read方法约定了返回值是一个字典嵌套字典的参数或者None,所以是 Optional类型;write方法的数据也是字典嵌套,没有返回值。
from typing import Dict, Any, Optional
@abstractmethod
def read(self) -> Optional[Dict[str, Dict[str, Any]]]:
pass
@abstractmethod
def write(self, data: Dict[str, Dict[str, Any]]) -> None:
pass
类型注释还支持自引用:
def __and__(self, other: 'QueryInstance') -> 'QueryInstance': # 自引用的类型注释,当前QueryInstance类还未创建
return QueryInstance(lambda value: self(value) and other(value),
('and', frozenset([self._hash, other._hash])))
自定义对象的hash
class FrozenDict(dict):
def __hash__(self):
# 转换为元祖,利用元祖不可变的特性计算hash值
return hash(tuple(sorted(self.items())))
def _immutable(self, *args, **kws):
raise TypeError('object is immutable')
# Disable write access to the dict
__setitem__ = _immutable
__delitem__ = _immutable
clear = _immutable
setdefault = _immutable
popitem = _immutable
def update(self, e=None, **f):
raise TypeError('object is immutable')
def pop(self, k, d=None):
raise TypeError('object is immutable')
def freeze(obj):
"""
使用递归方式一个对象转换成可以hash的对象
"""
if isinstance(obj, dict):
# Transform dicts into ``FrozenDict``s
return FrozenDict((k, freeze(v)) for k, v in obj.items())
elif isinstance(obj, list):
# Transform lists into tuples
return tuple(freeze(el) for el in obj)
elif isinstance(obj, set):
# Transform sets into ``frozenset``s
return frozenset(obj)
else:
# Don't handle all other objects
return obj
tinydb项目使用 poetry 进行虚拟环境管理,关于poetry的使用请看Python虚拟环境指南2020版 ;使用 pytest 进行测试用例管理。
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8