Python多进程分块读取超大文件的方法

1344次阅读  |  发布于5年以前

本文实例讲述了Python多进程分块读取超大文件的方法。分享给大家供大家参考,具体如下:

读取超大的文本文件,使用多进程分块读取,将每一块单独输出成文件


    # -*- coding: GBK -*-
    import urlparse
    import datetime
    import os
    from multiprocessing import Process,Queue,Array,RLock
    """
    多进程分块读取文件
    """
    WORKERS = 4
    BLOCKSIZE = 100000000
    FILE_SIZE = 0
    def getFilesize(file):
      """
        获取要读取文件的大小
      """
      global FILE_SIZE
      fstream = open(file,'r')
      fstream.seek(0,os.SEEK_END)
      FILE_SIZE = fstream.tell()
      fstream.close()
    def process_found(pid,array,file,rlock):
      global FILE_SIZE
      global JOB
      global PREFIX
      """
        进程处理
        Args:
          pid:进程编号
          array:进程间共享队列,用于标记各进程所读的文件块结束位置
          file:所读文件名称
        各个进程先从array中获取当前最大的值为起始位置startpossition
        结束的位置endpossition (startpossition+BLOCKSIZE) if (startpossition+BLOCKSIZE)<FILE_SIZE else FILE_SIZE
        if startpossition==FILE_SIZE则进程结束
        if startpossition==0则从0开始读取
        if startpossition!=0为防止行被block截断的情况,先读一行不处理,从下一行开始正式处理
        if 当前位置 <=endpossition 就readline
        否则越过边界,就从新查找array中的最大值
      """
      fstream = open(file,'r')
      while True:
        rlock.acquire()
        print 'pid%s'%pid,','.join([str(v) for v in array])
        startpossition = max(array)      
        endpossition = array[pid] = (startpossition+BLOCKSIZE) if (startpossition+BLOCKSIZE)<FILE_SIZE else FILE_SIZE
        rlock.release()
        if startpossition == FILE_SIZE:#end of the file
          print 'pid%s end'%(pid)
          break
        elif startpossition !=0:
          fstream.seek(startpossition)
          fstream.readline()
        pos = ss = fstream.tell()
        ostream = open('/data/download/tmp_pid'+str(pid)+'_jobs'+str(endpossition),'w')
        while pos<endpossition:
          #处理line
          line = fstream.readline()
          ostream.write(line)
          pos = fstream.tell()
        print 'pid:%s,startposition:%s,endposition:%s,pos:%s'%(pid,ss,pos,pos)
        ostream.flush()
        ostream.close()
        ee = fstream.tell()
      fstream.close()
    def main():
      global FILE_SIZE
      print datetime.datetime.now().strftime("%Y/%d/%m %H:%M:%S") 
      file = "/data/pds/download/scmcc_log/tmp_format_2011004.log"
      getFilesize(file)
      print FILE_SIZE
      rlock = RLock()
      array = Array('l',WORKERS,lock=rlock)
      threads=[]
      for i in range(WORKERS):
        p=Process(target=process_found, args=[i,array,file,rlock])
        threads.append(p)
      for i in range(WORKERS):
        threads[i].start()
      for i in range(WORKERS):
        threads[i].join()
      print datetime.datetime.now().strftime("%Y/%d/%m %H:%M:%S") 
    if __name__ == '__main__':
      main()

更多关于Python相关内容感兴趣的读者可查看本站专题:《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总

希望本文所述对大家Python程序设计有所帮助。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8