您当前的位置:首页 > 计算机 > 编程开发 > Python

PyQt作品 – PingTester – 多点Ping测试工具

时间:11-14来源:作者:点击数:

每次在一个临时测试点此测试一大片服务器的延迟和丢包, 一个个去跑太蛋疼, 于是用PyQt做了这么个小工具来测试各种线路的延迟丢包等信息.

截图:


(Archlinux / KDE4 环境下)

(Windows XP)

这个工具我已经初步实现了跨平台(在以上截图环境下运行正常), 在编写过程中, 我有如下的收获:

  • 不可以在子线程中直接操作UI, 以免引起资源冲突导致Segmentation Fault
  • 使用Queue类可以初步实现子线程与UI线程更新界面的通信. Signal方面, 我实例了一个QTimer, 每隔一定时间处理一次消息队列.
  • QTableView比QTableWidget效率高得多, 尤其是在Win32平台下. 因此应尽量采用 QTableView + QStandardItemModel 的搭配来做Table

注: 代码中已经内置了一份测试IP列表, 可以根据需要添加/删除. 第一次运行会生成 ips.conf 文件, 以后需要修改IP列表, 只需要编辑此文件.

再注: 这个程序写的确实很丑, 欢迎各种拍砖 = =

下面贴上全部的代码:

# -*- coding: utf-8 -*-

import sys
import time
import subprocess
from threading import Thread
import re
from PyQt4.QtGui import QMainWindow, QApplication, QStandardItemModel, QStandardItem, QWidget, QVBoxLayout, QTableView
from PyQt4.QtCore import pyqtSignature, Qt, QTimer, SIGNAL, QString, QMetaObject
from Queue import Queue

#from Ui_main import Ui_MainWindow

try:
    _fromUtf8 = QString.fromUtf8
except AttributeError:
    _fromUtf8 = lambda s: s

class Ui_MainWindow(object):
    def setupUi(self, MainWindow):
        MainWindow.setObjectName(_fromUtf8("MainWindow"))
        MainWindow.resize(500, 435)
        self.centralWidget = QWidget(MainWindow)
        self.centralWidget.setObjectName(_fromUtf8("centralWidget"))
        self.verticalLayout = QVBoxLayout(self.centralWidget)
        self.verticalLayout.setObjectName(_fromUtf8("verticalLayout"))
        self.tableView = QTableView(self.centralWidget)
        self.tableView.setObjectName(_fromUtf8("tableView"))
        self.verticalLayout.addWidget(self.tableView)
        MainWindow.setCentralWidget(self.centralWidget)

        self.retranslateUi(MainWindow)
        QMetaObject.connectSlotsByName(MainWindow)

    def retranslateUi(self, MainWindow):
        MainWindow.setWindowTitle(QApplication.translate("MainWindow", "Ping Tester", None, QApplication.UnicodeUTF8))

if sys.platform.startswith('linux'):
    getdata = re.compile(r"icmp_req=(\d+) ttl=(\d+) time=([\d\.]+)\sms")
    pingstr = ["ping", "-n", "-i 0.2"]
    filtered = "Packet filtered"
    delaytime = 200
else:
    getdata = re.compile(r"=([\d\.]+)ms TTL=(\d+)")
    pingstr = ["ping.exe", "-t"]
    timeout = "Request timed out."
    delaytime = 500

try:
    with open("ips.conf", "r") as f:
        t_node = f.read().decode('utf-8')
        if not t_node:
            raise IOError
except IOError:
    with open("ips.conf", "w") as f:
        t_node = u"""
        210.51.176.182-北京联通BGP
        202.97.224.68-黑龙江网通
        202.99.96.68-天津网通
        202.99.160.68-河北网通
        202.96.69.38-大连网通
        221.208.172.1-哈尔滨网通
        202.99.192.68-山西网通
        202.99.160.68-河北网通
        210.52.149.2-湖北网通
        218.30.64.121-湖南电信
        202.100.4.15-陕西电信 
        202.96.199.133-上海电信 
        219.150.150.150-河南电信 
        219.146.0.130-黑龙江电信
        202.98.96.68-四川电信
        59.66.4.50-北京教育网
        202.112.26.246-上海教育网
        202.114.0.254-武汉教育网
        211.161.159.9-武汉长城宽带
        143.90.12.170-日本东京odn
        202.160.123.5-新加坡Host1Plus
        60.199.17.138-台湾固网
        220.128.152.1-台湾HINET
        168.95.1.1-台湾中华电信
        202.65.207.1-香港DYX
        202.181.171.1-香港HKCIX
        203.169.186.1-香港NTT.HKNET
        59.148.193.38-香港HKCTI
        220.232.214.1-香港Supernet
        59.152.208.1-香港WTT
        210.56.48.1-香港HKSUN
        202.76.56.1-香港CPCNET
        218.213.250.130-香港SNL
        69.162.132.1-美国芝加哥Level3
        205.185.112.131-美国佛里蒙特HE
        204.152.221.1-美国洛杉矶nLayer
        178.63.62.208-德国Nuremburg
        94.23.202.83-法国OVH
        69.163.43.73-美国波特兰HE
        216.189.1.14-美国RockHill
        69.172.231.1-美国洛杉矶Peer1
        184.22.112.34-美国迈阿密HE
        199.48.146.37-美国圣琼斯
        216.245.208.1-美国达拉斯
        109.74.207.9-英国伦敦
        """
        f.write(t_node.encode('utf-8'))

node = []
for line in t_node.split('\n'):
    try:
        ip, desc = line.strip().split("-")
        node.append((ip, desc))
    except ValueError:
        pass
nodecount = len(node)

class MainWindow(QMainWindow, Ui_MainWindow):
    """
    Class documentation goes here.
    """
    def __init__(self, parent = None):
        """
        Constructor
        """
        QMainWindow.__init__(self, parent)
        self.setupUi(self)
        self.model = QStandardItemModel()
        self.model.setColumnCount(6)
        self.model.setRowCount(nodecount)
        self.model.setHorizontalHeaderLabels(["IP", "Description", "Loss%", "CurPing", "AvgPing", "TTL"])
        for i, (ip, desc) in enumerate(node):
            self.setitem(i, 0, ip)
            self.setitem(i, 1, desc)
            self.setitem(i, 2, "")
            self.setitem(i, 3, "")
            self.setitem(i, 4, "")
            self.setitem(i, 5, "")
        self.tableView.setModel(self.model)
        for i in range(len(node)):
            self.tableView.setRowHeight(i, 18)
        self.resizetable()
        self.timer = QTimer(self)
        self.connect(self.timer,
                     SIGNAL("timeout()"),
                     self.checkitems)
        self.timer.start(delaytime)
    
    def checkitems(self):
        while not q.empty():
            item = q.get()
            self.chgtxt(*item)
            q.task_done()
        self.resizetable()
    
    def resizetable(self):
        self.tableView.resizeColumnsToContents()
        
    def chgtxt(self, x, y, value):
        self.model.item(x, y).setText(value)
    
    def setitem(self, x, y, value):
        self.model.setItem(x, y, QStandardItem(value))
        
app = QApplication(sys.argv)
ui = MainWindow()
ui.show()
q = Queue()

def pinger(i, ip, desc):
    s = ""
    avgping = 0
    count = 0
    timeoutcount = 0
    ret = subprocess.Popen(pingstr + [ip],
                            stdout=subprocess.PIPE)
    while True:
        try:
            s += ret.stdout.read(1)
            
            tryfind = getdata.findall(s)
            if sys.platform.startswith('linux'):
                if len(tryfind) > 0:
                    req, ttl, crtping = tryfind[-1]
                    avgping += float(crtping)
                    count += 1
                    q.put((i, 3, crtping + "ms"))
                    q.put((i, 4, "%.2f" % (avgping * 1.0 / count) + "ms"))
                    q.put((i, 5, ttl))
                    q.put((i, 2, "%.2f" % ((int(req) - count) * 100.0 / int(req))))
                    s = ""
                elif filtered in s:
                    q.put((i, 2, "Failed"))
                    q.put((i, 3, "Failed"))
                    q.put((i, 4, "Failed"))
                    q.put((i, 5, "Failed"))
                    ret.kill()
                    s = ""
            else:
                if len(tryfind) > 0:
                    crtping, ttl = tryfind[-1]
                    avgping += float(crtping)
                    count += 1
                    q.put((i, 3, crtping + "ms"))
                    q.put((i, 4, "%.2f" % (avgping * 1.0 / count) + "ms"))
                    q.put((i, 5, ttl))
                    q.put((i, 2, "%.2f" % (timeoutcount * 100.0 / (count + timeoutcount))))
                elif timeout in s:
                    timeoutcount += 1
                    q.put((i, 2, "-"))
                    q.put((i, 3, "-"))
                    if count:
                        q.put((i, 5, "%.2f" % (timeoutcount * 100.0 / (count + timeoutcount))))
                    else:
                        q.put((i, 5, "-"))
                    s = ""
        except IOError:
            print s
            break
            
def startworkers():
    for i, (ip, desc) in enumerate(node):
        worker = Thread(target=pinger, args=(i, ip, desc))
        worker.setDaemon(True)
        worker.start()
        time.sleep(delaytime / 10000.0)

startthread = Thread(target=startworkers)
startthread.setDaemon(True)
startthread.start()

sys.exit(app.exec_())
方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门