Python通过DOM和SAX方式解析XML的应用实例分享

524次阅读  |  发布于5年以前

XML.DOM

需求
有一个表,里面数据量比较大,每天一更新,其字段可以通过xml配置文件进行配置,即,可能每次建表的字段不一样。

上游跑时会根据配置从源文件中提取,到入库这一步需要根据配置进行建表。

解决
写了一个简单的xml,配置需要字段及类型

上游读取到对应的数据

入库这一步,先把原表删除,根据配置建新表

XML文件


    <?xml version="1.0" encoding="UTF-8"?>
    <!-- 表名 ,数据库名  可灵活配置插入哪个库哪个表 -->
    <table name="top_query" db_name="evaluting_sys">
    <!-- 非业务主键,自增长,可配名,其他 INTEGER UNSIGNED AUTO_INCREMENT -->
    <primary_key>
    <name>id</name>
    </primary_key>
    <!-- 字段开始 -->
    <field>
    <name>query</name>
    <type>varchar(200)</type>
    <is_index>false</is_index>
    <description>query</description>
    </field>
    <field>
    <name>pv</name>
    <type>integer</type>
    <is_index>false</is_index>
    <description>pv</description>
    </field>
    <field>
    <name>avg_money</name>
    <type>integer</type>
    <is_index>false</is_index>
    <description></description>
    </field>
    <!-- 字段配置结束 -->
    </table>

处理脚本


    #!/usr/bin/python
    # -*- coding:utf-8 -*-
    #author: wklken
    #desc: use to read db xml config.
    #-----------------------
    #2012-02-18 created
    #----------------------

    import sys,os
    from xml.dom import minidom, Node

    def read_dbconfig_xml(xml_file_path):
      content = {}

      root = minidom.parse(xml_file_path)
      table = root.getElementsByTagName("table")[0]

      #read dbname and table name.
      table_name = table.getAttribute("name")
      db_name = table.getAttribute("db_name")

      if len(table_name) > 0 and len(db_name) > 0:
        db_sql = "create database if not exists `" + db_name +"`; use " + db_name + ";"
        table_drop_sql = "drop " + table_name + " if exists " + table_name + ";"
        content.update({"db_sql" : db_sql})
        content.update({"table_sql" : table_drop_sql })
      else:
        print "Error:attribute is not define well! db_name=" + db_name + " ;table_name=" + table_name
        sys.exit(1)
      #print table_name, db_name

      table_create_sql = "create table " + table_name +"("

      #read primary cell
      primary_key = table.getElementsByTagName("primary_key")[0]
      primary_key_name = primary_key.getElementsByTagName("name")[0].childNodes[0].nodeValue

      table_create_sql += primary_key_name + " INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,"

      #print primary_key.toxml()
      #read ordernary field
      fields = table.getElementsByTagName("field")
      f_index = 0
      for field in fields:
        f_index += 1
        name = field.getElementsByTagName("name")[0].childNodes[0].nodeValue
        type = field.getElementsByTagName("type")[0].childNodes[0].nodeValue
        table_create_sql += name + " " + type
        if f_index != len(fields):
        table_create_sql += ","
        is_index = field.getElementsByTagName("is_index")[0].childNodes[0].nodeValue

      table_create_sql += ");"
      content.update({"table_create_sql" : table_create_sql})
      #character set latin1 collate latin1_danish_ci;
      print content


    if __name__ == "__main__":
    read_dbconfig_xml(sys.argv[1])

涉及方法
root = minidom.parse(xml_file_path) 获取dom对象

root.getElementsByTagName("table") 根据tag获取节点列表

table.getAttribute("name") 获取属性

primary_key.getElementsByTagName("name")[0].childNodes[0].nodeValue 获取子节点的值(id 得到id)

SAX
需求
读取xml数据文件,文件较大,需要实时处理插入到数据库

xml文档


    <PERSONS>
    <person>
      <id>100000</id>
      <sex>男</sex>
      <address>北京,海淀区</address>
      <fansNum>437</fansNum>
      <summary>1989</summary>
      <wbNum>333</wbNum>
      <gzNum>242</gzNum>
      <blog>null</blog>
      <edu>大学</edu>
      <work></work>
      <renZh>1</renZh>
      <brithday>2月14日</brithday>
    </person>
    </PERSONS>

处理
sax处理时并不会像dom一样可以以类似节点的维度进行读取,它只有 开始标签 内容 结束标签 之分

处理思想是:通过一个handler,对开始标签,内容,结束标签各有一个处理函数

代码及注解
person 处理类


    from xml.sax import handler,parseString
    class PersonHandler(handler.ContentHandler):
     def __init__(self, db_ops):
      #db op obj
      self.db_ops = db_ops
      #存储一个person的map
      self.person = {}
      #当前的tag
      self.current_tag = ""
      #是否是tag之间的内容 ,目的拿到tag间内容,不受空白的干扰
      self.in_quote = 0
     #开始,清空map
     def startElement(self, name, attr):
      #以person,清空map
      if name == "person":
       self.person = {}
      #记录 状态
      self.current_tag = name
      self.in_quote = 1
     #结束,插入数据库
     def endElement(self, name):
      #以person结尾 代表读取一个person的信息结束
      if name == "person":
       #do something
       in_fields = tuple([ ('"' + self.person.get(i,"") + '"') for i in fields ])
       print in_sql % in_fields
       db_ops.insert( in_sql%(in_fields))
      #处理
      self.in_quote = 0
     def characters(self, content):
      #若是在tag之间的内容,更新到map中
      if self.in_quote:
       self.person.update({self.current_tag: content})

加上入库的完整代码


    #!/usr/bin/python
    # -*- coding:utf-8 -*-
    #parse_person.py
    #version : 0.1
    #author : wukunliang@163.com
    #desc : parse person.xml and out sql


    import sys,os
    import MySQLdb

    reload(sys)
    sys.setdefaultencoding('utf-8')

    in_sql = "insert into person(id,sex,address,fansNum,summary,wbNum,gzNum,blog,edu,work,renZh,brithday) values(%s, %s, %s, %s, %s, %s,
         %s, %s, %s, %s, %s, %s)"

    fields = ("id","sex","address","fansNum","summary","wbNum","gzNum","blog","edu","work","renZh","brithday")

    #数据库方法
    class Db_Connect:
      def __init__(self, db_host, user, pwd, db_name, charset="utf8", use_unicode = True):
        print "init begin"
        print db_host, user, pwd, db_name, charset , use_unicode
        self.conn = MySQLdb.Connection(db_host, user, pwd, db_name, charset=charset , use_unicode=use_unicode)
        print "init end"

      def insert(self, sql):
        try:
          n = self.conn.cursor().execute(sql)
          return n
        except MySQLdb.Warning, e:
          print "Error: execute sql '",sql,"' failed"

      def close(self):
        self.conn.close()

    #person 处理类
    from xml.sax import handler,parseString
    class PersonHandler(handler.ContentHandler):
      def __init__(self, db_ops):
        #db op obj
        self.db_ops = db_ops
        #存储一个person的map
        self.person = {}
        #当前的tag
        self.current_tag = ""
        #是否是tag之间的内容
        self.in_quote = 0
      #开始,清空map
      def startElement(self, name, attr):
        #以person,清空map
        if name == "person":
         self.person = {}
        #记录 状态
        self.current_tag = name
        self.in_quote = 1
      #结束,插入数据库
      def endElement(self, name):
        #以person结尾 代表读取一个person的信息结束
        if name == "person":
          #do something
          in_fields = tuple([ ('"' + self.person.get(i,"") + '"') for i in fields ])
          print in_sql % in_fields
          db_ops.insert( in_sql%(in_fields))
        #处理
        self.in_quote = 0
      def characters(self, content):
        #若是在tag之间的内容,更新到map中
        if self.in_quote:
          self.person.update({self.current_tag: content})

    if __name__ == "__main__":
      f = open("./person.xml")
      #如果源文件gbk 转码   若是utf-8,去掉decode.encode
      db_ops = Db_Connect("127.0.0.1", "root", "root", "test")
      parseString(f.read().decode("gbk").encode("utf-8"), PersonHandler(db_ops))
      f.close()
      db_ops.close()

平时拿python来分析数据,工具脚本还有hadoop streamming,但是用的面和深度实在欠缺 只能说道行还浅,需要多多实践

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8