使用Python的web.py框架实现类似Django的ORM查询的教程

581次阅读  |  发布于5年以前

Django中的对象查询

Django框架自带了ORM,实现了一些比较强大而且方便的查询功能,这些功能和表无关。比如下面这个例子:


    class Question(models.Model):
      question_text = models.CharField(max_length=200)
      pub_date = models.DateTimeField('date published')


    >>> Question.objects.all()
    >>> Question.objects.get(pk=1)

从例子可以看出,objects.all和objects.get这些功能都不是在class Question中定义的,可能在其父类models.Model中定义,也可能不是。那么我们在web.py中如何实现这样的功能呢?(如果你选择使用SQLAlchemy就不需要自己实现了)。
实现
思路

我们注意到Question.objects.all()这样的调用是直接访问了类属性objects,并调用了objects属性的方法all()。这里objects可能是一个实例,也可能是一个类。我个人认为(我没看过Django的实现)这应该是一个实例,因为实例化的过程可以传递一些表的信息,使得类似all()这样的函数可以工作。经过分析之后,我们可以列出我们需要解决的问题:

代码

都说不给代码就是耍流氓,我还是给吧。说明下:使用的数据库操作都是web.py的db库中的接口。


      # -*- coding: utf-8 -*-

      import web

      import config # 自定义的配置类,可以忽略


      def _connect_to_db():
        return web.database(dbn="sqlite", db=config.dbname)


      def init_db():
        db = _connect_to_db()
        for statement in config.sql_statements:
          db.query(statement)


      class ModelError(Exception):
        """Exception raised by all models.

        Attributes:
          msg: Error message.
        """

        def __init__(self, msg=""):
          self.msg = msg

        def __str__(self):
          return "ModelError: %s" % self.msg


      class ModelDefaultManager(object):
        """ModelManager implements query functions against a model.

        Attributes:
          cls: The class to be managed.
        """

        def __init__(self, cls):
          self.cls = cls
          self._table_name = cls.__name__.lower()

        def all(self):
          db = _connect_to_db()
          results = db.select(self._table_name)
          return [self.cls(x) for x in results]

        def get(self, query_vars, where):
          results = self.filter(query_vars, where, limit=1)
          if len(results) > 0:
            return results[0]
          else:
            return None

        def filter(self, query_vars, where, limit=None):
          db = _connect_to_db()
          try:
            results = db.select(self._table_name, vars=query_vars, where=where,
                      limit=limit)
          except (Exception) as e:
            raise ModelError(str(e))

          return [self.cls(x) for x in results]


      class ModelMetaClass(type):

        def __new__(cls, classname, bases, attrs):
          new_class = super(ModelMetaClass, cls).__new__(cls, classname,
                                  bases, attrs)
          objects = ModelDefaultManager(new_class)
          setattr(new_class, "objects", objects)

          return new_class


      class Model(object):
        """Parent class of all models.
        """

        __metaclass__ = ModelMetaClass

        def __init__(self):
          pass

        def _table_name(self):
          return self.__class__.__name__.lower()

        def insert(self, **kargs):
          db = _connect_to_db()
          try:
            with db.transaction():
              db.insert(self._table_name(), **kargs)
          except (Exception) as e:
            raise ModelError(str(e))

        def delete(self, where, using=None, vars=None):
          db = _connect_to_db()
          try:
            with db.transaction():
              db.delete(self._table_name(), where, vars=vars)
          except (Exception) as e:
            raise ModelError(str(e))

        def save(self, where, vars=None, **kargs):
          db = _connect_to_db()
          try:
            with db.transaction():
              db.update(self._table_name(), where, vars, **kargs)
          except (Exception) as e:
            raise ModelError(str(e))

使用

首先定义表对应的类:


    class Users(Model):
      ...

使用就和Django的方式一样:


    >>> user_list = Users.objects.all()

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8