Python实现的Google IP 可用性检测脚本

1042次阅读  |  发布于5年以前

需要 Python 3.4+,一个参数用来选择测试搜索服务还是 GAE 服务。测试 GAE 服务的话需要先修改开头的两个变量。从标准输入读取 IP 地址或者 IP 段(形如 192.168.0.0/16)列表,每行一个。可用 IP 输出到标准输出。实时测试结果输出到标准错误。50 线程并发。

checkgoogleip


    #!/usr/bin/env python3

    import sys
    from ipaddress import IPv4Network
    import http.client as client
    from concurrent.futures import ThreadPoolExecutor
    import argparse
    import ssl
    import socket

    # 先按自己的情况修改以下几行
    APP_ID = 'your_id_here'
    APP_PATH = '/fetch.py'

    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations('/etc/ssl/certs/ca-certificates.crt')

    class HTTPSConnection(client.HTTPSConnection):
     def __init__(self, *args, hostname=None, **kwargs):
      self._hostname = hostname
      super().__init__(*args, **kwargs)

     def connect(self):
      super(client.HTTPSConnection, self).connect()

      if self._tunnel_host:
       server_hostname = self._tunnel_host
      else:
       server_hostname = self._hostname or self.host
       sni_hostname = server_hostname if ssl.HAS_SNI else None

      self.sock = self._context.wrap_socket(self.sock,
                         server_hostname=sni_hostname)
      if not self._context.check_hostname and self._check_hostname:
       try:
        ssl.match_hostname(self.sock.getpeercert(), server_hostname)
       except Exception:
        self.sock.shutdown(socket.SHUT_RDWR)
        self.sock.close()
        raise

    def check_ip_p(ip, func):
     if func(ip):
      print(ip, flush=True)

    def check_for_gae(ip):
     return _check(APP_ID + '.appspot.com', APP_PATH, ip)

    def check_for_search(ip):
     return _check('www.google.com', '/', ip)

    def _check(host, path, ip):
     for chance in range(1,-1,-1):
      try:
       conn = HTTPSConnection(
        ip, timeout = 5,
        context = context,
        hostname = host,
       )
       conn.request('GET', path, headers = {
        'Host': host,
       })
       response = conn.getresponse()
       if response.status < 400:
        print('GOOD:', ip, file=sys.stderr)
       else:
        raise Exception('HTTP Error %s %s' % (
         response.status, response.reason))
       return True
      except KeyboardInterrupt:
       raise
      except Exception as e:
       if isinstance(e, ssl.CertificateError):
        print('WARN: %s is not Google\'s!' % ip, file=sys.stderr)
        chance = 0
       if chance == 0:
        print('BAD :', ip, e, file=sys.stderr)
        return False
       else:
        print('RE :', ip, e, file=sys.stderr)

    def main():
     parser = argparse.ArgumentParser(description='Check Google IPs')
     parser.add_argument('service', choices=['search', 'gae'],
               help='service to check')
     args = parser.parse_args()
     func = globals()['check_for_' + args.service]

     count = 0
     with ThreadPoolExecutor(max_workers=50) as executor:
      for l in sys.stdin:
       l = l.strip()
       if '/' in l:
        for ip in IPv4Network(l).hosts():
         executor.submit(check_ip_p, str(ip), func)
         count += 1
       else:
        executor.submit(check_ip_p, l, func)
        count += 1
     print('%d IP checked.' % count)

    if __name__ == '__main__':
     main()

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8