Python SQLite3数据库操作类分享

1084次阅读  |  发布于5年以前

接触Python时间也不是很长的,最近有个项目需要分析数据,于是选用Python为编程语言,除了语言特性外主要还是看重Python对于SQLite3数据库良好的支持能力了,因为需要灵活处理大量的中间数据。

刚开始一些模块我还乐此不疲的写SQL语句,后来渐渐厌倦了,回想到以前捣鼓C#的时候利用反射初步构建了个SQL查询构造器,直到发现linq,于是放弃了这个计划,当然微软后来又推出了Entity Framework,这些都是后话了,而且现在我对微软的东西兴趣不是很大的,好了,扯多了,下面继续正文。

对了,再扯一句,优秀的博客程序Drupal也使用了类似的查询构造器进行数据库查询,避免直接写SQL语句,另外这样做的一点点好处就是,可以一定程度的屏蔽平台相关性,对于数据库迁移还是有帮助的。

不过我今天介绍的数据库辅助类查询构造器是个很简单的东东,甚至只限于SQLite数据库,如果有童鞋感兴趣可以完善下,我目前只要操作SQLite顺手就可以了,对于比较大的数据库应用就直接上ORM吧。

先看代码:

复制代码 代码如下:

import sqlite3

***

*

* Description: Python操作SQLite3数据库辅助类(查询构造器)

* Author: wangye

*

***

def _wrap_value(value):
return repr(value)

def _wrap_values(values):
return list(map(_wrap_value, values))

def _wrap_fields(fields):
for key,value in fields.items():
fields[key] = _wrap_value(value)
return fields

def _concat_keys(keys):
return "[" + "],[".join(keys) + "]"

def _concat_values(values):
return ",".join(values)

def _concat_fields(fields, operator = (None, ",")):
if operator:
unit_operator, group_operator = operator

fields = _wrap_fields(fields)

compiled = []  
for key,value in fields.items():  
    compiled.append("[" + key + "]")  
    if unit_operator:  
        compiled.append(unit_operator)  
        compiled.append(value)  
    compiled.append(group_operator)  
compiled.pop() # pop last group_operator  
return " ".join(compiled)  

class DataCondition(object):
"""
本类用于操作SQL构造器辅助类的条件语句部分

    例如:  
    DataCondition(("=", "AND"), id = 26)  
    DataCondition(("=", "AND"), True, id = 26)  
"""  

def __init__(self, operator = ("=", "AND"), ingroup = True, **kwargs):  
    """  
        构造方法  
        参数:  
            operator 操作符,分为(表达式操作符, 条件运算符)  
            ingroup  是否分组,如果分组,将以括号包含  
            kwargs   键值元组,包含数据库表的列名以及值  
                     注意这里的等于号不等于实际生成SQL语句符号  
                     实际符号是由operator[0]控制的  
        例如:  
        DataCondition(("=", "AND"), id = 26)  
        (id=26)  
        DataCondition((">", "OR"), id = 26, age = 35)  
        (id>26 OR age>35)  
        DataCondition(("LIKE", "OR"), False, name = "John", company = "Google")  
        name LIKE 'John' OR company LIKE "Google"  
    """  
    self.ingroup = ingroup  
    self.fields = kwargs  
    self.operator = operator  

def __unicode__(self):  
    self.fields = _wrap_fields(self.fields)  
    result = _concat_fields(self.fields, self.operator)  
    if self.ingroup:  
        return "(" + result + ")"  
    return result  

def __str__(self):  
    return self.__unicode__()  

def toString(self):  
    return self.__unicode__()  

class DataHelper(object):

"""  
    SQLite3 数据查询辅助类  
"""  

def __init__(self, filename):  
    """  
        构造方法  
        参数: filename 为SQLite3 数据库文件名  
    """  
    self.file_name = filename  

def open(self):  
    """  
        打开数据库并设置游标  
    """  
    self.connection = sqlite3.connect(self.file_name)  
    self.cursor = self.connection.cursor()  
    return self  

def close(self):  
    """  
        关闭数据库,注意若不显式调用此方法,  
        在类被回收时也会尝试调用  
    """  
    if hasattr(self, "connection") and self.connection:  
        self.connection.close()  

def __del__(self):  
    """  
        析构方法,做一些清理工作  
    """  
    self.close()  

def commit(self):  
    """  
        提交事务  
        SELECT语句不需要此操作,默认的execute方法的  
        commit_at_once设为True会隐式调用此方法,  
        否则就需要显示调用本方法。  
    """  
    self.connection.commit()  

def execute(self, sql = None, commit_at_once = True):  
    """  
        执行SQL语句  
        参数:  
            sql  要执行的SQL语句,若为None,则调用构造器生成的SQL语句。  
            commit_at_once 是否立即提交事务,如果不立即提交,  
            对于非查询操作,则需要调用commit显式提交。  
    """  
    if not sql:  
        sql = self.sql  
    self.cursor.execute(sql)  
    if commit_at_once:  
        self.commit()  

def fetchone(self, sql = None):  
    """  
        取一条记录  
    """  
    self.execute(sql, False)  
    return self.cursor.fetchone()  

def fetchall(self, sql = None):  
    """  
        取所有记录  
    """  
    self.execute(sql, False)  
    return self.cursor.fetchall()  

def __concat_keys(self, keys):  
    return _concat_keys(keys)  

def __concat_values(self, values):  
    return _concat_values(values)  

def table(self, *args):  
    """  
        设置查询的表,多个表名用逗号分隔  
    """  
    self.tables = args  
    self.tables_snippet = self.__concat_keys(self.tables)  
    return self  

def __wrap_value(self, value):  
    return _wrap_value(value)  

def __wrap_values(self, values):  
    return _wrap_values(values)  

def __wrap_fields(self, fields):  
    return _wrap_fields(fields)  

def __where(self):  
    # self.condition_snippet  
    if hasattr(self, "condition_snippet"):  
        self.where_snippet = " WHERE " + self.condition_snippet  

def __select(self):  
    template = "SELECT %(keys)s FROM %(tables)s"  
    body_snippet_fields = {  
        "tables" : self.tables_snippet,  
        "keys" : self.__concat_keys(self.body_keys),   
    }  
    self.sql = template % body_snippet_fields  

def __insert(self):  
    template = "INSERT INTO %(tables)s (%(keys)s) VALUES (%(values)s)"  
    body_snippet_fields = {  
        "tables" : self.tables_snippet,  
        "keys" : self.__concat_keys(list(self.body_fields.keys())),  
        "values" : self.__concat_values(list(self.body_fields.values()))  
    }  
    self.sql = template % body_snippet_fields  

def __update(self):  
    template = "UPDATE %(tables)s SET %(fields)s"  
    body_snippet_fields = {  
        "tables" : self.tables_snippet,  
        "fields" : _concat_fields(self.body_fields, ("=",","))  
    }  
    self.sql = template % body_snippet_fields  

def __delete(self):  
    template = "DELETE FROM %(tables)s"  
    body_snippet_fields = {  
        "tables" : self.tables_snippet  
    }  
    self.sql = template % body_snippet_fields  

def __build(self):  
    {  
        "SELECT": self.__select,  
        "INSERT": self.__insert,  
        "UPDATE": self.__update,  
        "DELETE": self.__delete  
    }[self.current_token]()  

def __unicode__(self):  
    return self.sql  

def __str__(self):  
    return self.__unicode__()  

def select(self, *args):  
    self.current_token = "SELECT"  
    self.body_keys = args  
    self.__build()  
    return self  

def insert(self, **kwargs):  
    self.current_token = "INSERT"  
    self.body_fields = self.__wrap_fields(kwargs)  
    self.__build()  
    return self  

def update(self, **kwargs):  
    self.current_token = "UPDATE"  
    self.body_fields = self.__wrap_fields(kwargs)  
    self.__build()  
    return self  

def delete(self, *conditions):  
    self.current_token = "DELETE"  
    self.__build()  
    #if *conditions:  
    self.where(*conditions)  
    return self  

def where(self, *conditions):  
    conditions = list(map(str, conditions))  
    self.condition_snippet = " AND ".join(conditions)  
    self.__where()  
    if hasattr(self, "where_snippet"):  
        self.sql += self.where_snippet  
    return self  

下面举几个例子供大家参考吧:

复制代码 代码如下:

db = DataHelper("/home/wangye/sample.db3")
db.open() # 打开数据库
db.execute("""
CREATE TABLE [staffs] (
[staff_id] INTEGER PRIMARY KEY AUTOINCREMENT,
[staff_name] TEXT NOT NULL,
[staff_cardnum] TEXT NOT NULL,
[staff_reserved] INTEGER NOT NULL
)
""") # 直接执行SQL语句,注意这里commit_at_once默认为True

db.table("staffs").insert(staff_name="John", staff_cardnum="1001", staff_reserved=0)

插入一条记录

rs = db.table("staffs").select("staff_id", "staff_name").fetchall()

直接取出所有staff_id和staff_name

rs = db.table("staffs").select("staff_name").where(DataCondition(("=", "AND"), id = 1)).fetchone()

取一条staff_id为1的staff_name

rs = db.table("staffs").select("staff_name").where(DataCondition(("<", "AND"), id = 100), DataCondition(("=", "AND"), staff_reserved = 1)).fetchone()

取一条id小于100并且staff_reserved为1的staff_name记录

db.close() # 关闭数据库

目前还没有让其支持星号(*)操作符,另外在多表同名列操作方面处理得也不是很好,这个只用于日常简单的脚本操作,最好不要用于生产环境,因为可能有未知问题。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8