利用Python实现网络测试的脚本分享

1317次阅读  |  发布于5年以前

前言

最近同学让我帮忙写一个测试网络的工具。由于工作上的事情,断断续续地拖了很久才给出一个相对完整的版本。其实,我Python用的比较少,所以基本都是边查资料边写程序。

程序的主要逻辑如下:

读取一个excel文件中的ip列表,然后使用多线程调用ping统计每个ip的网络参数,最后把结果输出到excel文件中。

代码如下所示:


    #! /usr/bin/env python
    # -*- coding: UTF-8 -*-
    # File: pingtest_test.py
    # Date: 2008-09-28
    # Author: Michael Field
    # Modified By:intheworld
    # Date: 2017-4-17
    import sys
    import os
    import getopt
    import commands
    import subprocess
    import re
    import time
    import threading
    import xlrd
    import xlwt

    TEST = [
      '220.181.57.217',
      '166.111.8.28',
      '202.114.0.242',
      '202.117.0.20',
      '202.112.26.34',
      '202.203.128.33',
      '202.115.64.33',
      '202.201.48.2',
      '202.114.0.242',
      '202.116.160.33',
      '202.202.128.33',
    ]
    RESULT={}
    def usage():
     print "USEAGE:"
     print "\t%s -n TEST|excel name [-t times of ping] [-c concurrent number(thread nums)]" %sys.argv[0]
     print "\t TEST为简单测试的IP列表"
     print "\t-t times 测试次数;默认为1000;"
     print "\t-c concurrent number 并行线程数目:默认为10"
     print "\t-h|-?, 帮助信息"
     print "\t 输出为当前目录文件ping_result.txt 和 ping_result.xls"
     print "for example:"
     print "\t./ping_test.py -n TEST -t 1 -c 10"

    def div_list(ls,n):
     if not isinstance(ls,list) or not isinstance(n,int):
      return []
     ls_len = len(ls)
     print 'ls length = %s' %ls_len
     if n<=0 or 0==ls_len:
      return []
     if n > ls_len:
      return []
     elif n == ls_len:
      return [[i] for i in ls]
     else:
      j = ls_len/n
      k = ls_len%n
      ### j,j,j,...(前面有n-1个j),j+k
      #步长j,次数n-1
      ls_return = []
      for i in xrange(0,(n-1)*j,j):
       ls_return.append(ls[i:i+j])
      #算上末尾的j+k
      ls_return.append(ls[(n-1)*j:])
      return ls_return

    def pin(IP):
     try:
      xpin=subprocess.check_output("ping -n 1 -w 100 %s" %IP, shell=True)
     except Exception:
      xpin = 'empty'
     ms = '=[0-9]+ms'.decode("utf8")
     print "%s" %ms
     print "%s" %xpin
     mstime=re.search(ms,xpin)
     if not mstime:
      MS='timeout'
      return MS
     else:
      MS=mstime.group().split('=')[1]
      return MS.strip('ms')
    def count(total_count,I):
     global RESULT
     nowsecond = int(time.time())
     nums = 0
     oknums = 0
     timeout = 0
     lostpacket = 0.0
     total_ms = 0.0
     avgms = 0.0
     maxms = -1
     while nums < total_count:
      nums += 1
      MS = pin(I)
      print 'pin output = %s' %MS
      if MS == 'timeout':
       timeout += 1
       lostpacket = timeout*100.0 / nums
      else:
       oknums += 1
       total_ms = total_ms + float(MS)
       if oknums == 0:
        oknums = 1
        maxms = float(MS)
        avgms = total_ms / oknums
       else:
        avgms = total_ms / oknums
        maxms = max(maxms, float(MS))
      RESULT[I] = (I, avgms, maxms, lostpacket)

    def thread_func(t, ipList):
     if not isinstance(ipList,list):
      return
     else:
      for ip in ipList:
       count(t, ip)

    def readIpsInFile(excelName):
     data = xlrd.open_workbook(excelName)
     table = data.sheets()[0]
     nrows = table.nrows
     print 'nrows %s' %nrows
     ips = []
     for i in range(nrows):
      ips.append(table.cell_value(i, 0))
      print table.cell_value(i, 0)
     return ips


    if __name__ == '__main__':
     file = 'ping_result.txt'
     times = 10
     network = ''
     thread_num = 10
     args = sys.argv[1:]
     try:
      (opts, getopts) = getopt.getopt(args, 'n:t:c:h?')
     except:
      print "\nInvalid command line option detected."
      usage()
      sys.exit(1)
     for opt, arg in opts:
      if opt in ('-n'):
       network = arg
      if opt in ('-h', '-?'):
       usage()
       sys.exit(0)
      if opt in ('-t'):
       times = int(arg)
      if opt in ('-c'):
       thread_num = int(arg)
     f = open(file, 'w')
     workbook = xlwt.Workbook()
     sheet1 = workbook.add_sheet("sheet1", cell_overwrite_ok=True)
     if not isinstance(times,int):
      usage()
      sys.exit(0)
     if network not in ['TEST'] and not os.path.exists(os.path.join(os.path.dirname(__file__), network)):
      print "The network is wrong or excel file does not exist. please check it."
      usage()
      sys.exit(0)
     else:
      if network == 'TEST':
       ips = TEST
      else:
       ips = readIpsInFile(network)
      print 'Starting...'
      threads = []
      nest_list = div_list(ips, thread_num)
      loops = range(len(nest_list))
      print 'Total %s Threads is working...' %len(nest_list)
      for ipList in nest_list:
       t = threading.Thread(target=thread_func,args=(times,ipList))
       threads.append(t)
      for i in loops:
       threads[i].start()
      for i in loops:
       threads[i].join()
      it = 0
      for line in RESULT:
       value = RESULT[line]
       sheet1.write(it, 0, line)
       sheet1.write(it, 1, str('%.2f'%value[1]))
       sheet1.write(it, 2, str('%.2f'%value[2]))
       sheet1.write(it, 3, str('%.2f'%value[3]))
       it+=1
       f.write(line + '\t'+ str('%.2f'%value[1]) + '\t'+ str('%.2f'%value[2]) + '\t'+ str('%.2f'%value[3]) + '\n')
      f.close()
      workbook.save('ping_result.xls')
      print 'Work Done. please check result %s and ping_result.xls.'%file

这段代码参照了别人的实现,虽然不是特别复杂,这里还是简单解释一下。

到目前为止,我对Python里面字符集的理解还不到位,所以正则表达式匹配的代码并不够强壮,不过目前勉强可以工作,以后有必要再改咯!

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对脚本之家的支持。

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8