加载中...
返回

PyQt5初探——与scapy的美妙结合

PyQt5初探

自读书摘抄站点构建完成以来,一直没有进行什么有趣的实践,这回由于毕设的需要,首次尝试进行GUI编程;此外同样由于毕设需要,使用了scapy进行流量嗅探,因此就有了这一篇四不像的记录性质的文章。

Qt(音 cute )是一套被广泛使用的GUI组件库,而 PyQt 则是这套框架与Python语言的结合,是Python中最强大的GUI库之一。

本次选用这个框架其实经过了一定的曲折,因为Python的GUI库有太多了🤣

  • TkInter :python标准Tk GUI工具包的接口,我对这个库倒没什么成见,以前的项目使用过(虽然是队友做的),这次想换个别的库。

  • PySimpleGUI :框架主打一个简单易用,也是我最初选用的方案;接触了半个多小时之后放弃,文档属实写得像💩。

  • Kivy :这个库自己还开发了一套设计语言,界面文件和程序文件分离,但只有英文文档且框架本身对中文支持不好,接触了半个多小时后放弃。

  • PyQt :以上所有库放的都是官方链接,唯独这里放的是一篇中文教程,个人觉得这位大佬翻译得不错,文档学起来是比较舒服的,遂采用。

安装PyQt非常简单:

pip install PyQt5
pip install pyqt5-tools

还有简单易用的 Qt Designer ,可以直接通过拖拽的方式来放置控件、设计页面,牛逼格拉斯,但我准备手撸代码,因此没有尝试对它进行配置。

环境安装完成之后,可以照着 教程 实现一些功能。

经过一系列的探索,我的程序初具雏形:

界面有如下几个组件:

  • 在线分析:启动在线流量分析。目前还没有加入“分析”的功能,仅能对流量进行实时捕捉。

  • 结束在线分析:顾名思义。

  • 打开文件:打开一个 pcap 文件,留待后续使用,主要希望将离线分析的功能也整合进来。

  • 大大的文本框:用于输出实时流量捕捉的内容。

运行起来的效果如下:

总体实现的内容比较简单,因此后文仅对我开发过程中遇到的一些疑难杂症进行叙述。

不过,还是需要先对本文的另一个主角 scapy 进行一些介绍。

scapy初探

Scapy 是一套完整的网络报文操作工具,可以实现抓包、解包、构造和发送数据包等功能。

这次将 scapyPyQt5 结合,实际上只需要用到抓包的功能:

def StartSniff(self):
    self.stopSniff = False
    sniff(prn=self.SniffCallBack, count=0, stop_filter=self.isStop)

def SniffCallBack(self, pkt: scapy.packet.Packet) -> None:
    ''' Do Something '''

def isStop(self, pkt):
    return self.stopSniff

可以看到,这个功能只需要调用一个 sniff() 函数,并配置如下几个参数:

  • prn :抓包的回调函数。每抓到一个数据包就会调用一次这个函数,函数有个默认参数 packet ,类型是 scapy.packet.Packet ,支持对该数据包进行一些操作。

  • count :要捕捉的数据包个数,设置为 0 表示无限。

  • stop_filter :用于设置抓包停止的条件,当函数返回 True 时停止抓包。这个参数跟 count=0 简直是不可分割的,但网络上各教程中介绍不多,很迷惑。参数对应的是一个 函数 ,函数也默认传入 packet ,跟 prn 回调函数的用法区别不大。在这里,我将其配置为返回一个成员变量,当我希望停止抓包的时候,把这个成员变量改为 True 就行。

PyQt和Scapy的结合:QThread和pyqtSIgnal

现在又要将思路跳回PyQt上了,记得我开发的窗口有这些按钮:

当点击“在线分析“的时候,scapy需要开始抓包,但抓包的工作不能影响窗口本身的运行,这里就需要使用多线程 QThread

主窗体的代码如下:

class Analyzer(QWidget):
    def __init__(self) -> None:
        super().__init__()
        # 定义用于抓包的成员
        self.sniffer = Sniffer()
        self.initUI()

    def initUI(self) -> None:
        # 窗体 & 标题
        self.setGeometry(300, 300, 500, 400)
        self.setWindowTitle("Traffic Analyzer")
        # 居中定位
        self.alignCenter()
        # 设置提示框字体
        QToolTip.setFont(QFont('HanziPen TC', 10))
        # 关联事件
        self.sniffer.updateSignal.connect(self.SnifferUpdate)
        # 创建按钮
        ### 在线分析
        startBtn = QPushButton('在线分析', self, clicked=self.sniffer.start)
        startBtn.setToolTip('启动在线流量分析')
        startBtn.resize(startBtn.sizeHint())
        ### 结束在线分析
        endBtn = QPushButton('结束在线分析', self, clicked=self.sniffer.pause)
        endBtn.setToolTip('结束在线流量分析')
        endBtn.resize(endBtn.sizeHint())
        ### 打开文件
        fileBtn = QPushButton('打开文件', self)
        fileBtn.setToolTip('打开一个pcap文件')
        fileBtn.clicked.connect(self.openFileDialog)
        fileBtn.resize(fileBtn.sizeHint())
        ### 文本编辑
        self.textEdit = QTextEdit()
        self.textEdit.setReadOnly(True)
        # 设置布局
        grid = QGridLayout()
        self.setLayout(grid)
        grid.addWidget(startBtn, 0, 0)
        grid.addWidget(endBtn, 0, 1)
        grid.addWidget(fileBtn, 0, 2)
        grid.addWidget(self.textEdit, 2, 0, 5, 3)
        # 呈现
        self.show()
    ''' 省略若干不重要的函数 '''
    def SnifferUpdate(self):
        self.textEdit.setText(self.sniffer.text)
        self.textEdit.moveCursor(self.textEdit.textCursor().End)

上述代码的重点在于两个按钮的定义:

startBtn = QPushButton('在线分析', self, clicked=self.sniffer.start)
...
endBtn = QPushButton('结束在线分析', self, clicked=self.sniffer.pause)

self.sniffer 是一个自定义的对象,它应该基于 QThread 实现子线程,基于 scapy 实现抓包,基于 pyqtSignal 实现信号传递。

# 继承 QThread 实现子线程
class Sniffer(QThread):
    # 用于向主窗口传说信号
    updateSignal = pyqtSignal()

    def __init__(self) -> None:
        super().__init__()
        self.pktInfo = []
        self.text = ''
        self.stopSniff = True
    # 重写 run 函数
    def run(self):
        self.StartSniff()
    # 重写 pause 函数以停止抓包
    def pause(self):
        self.EndSniff()
    # 基于 scapy 实现抓包
    def StartSniff(self):
        self.stopSniff = False
        sniff(prn=self.SniffCallBack, count=0, stop_filter=self.isStop)

    def isStop(self, pkt):
        return self.stopSniff

    def EndSniff(self):
        self.stopSniff = True

    def SniffCallBack(self, pkt: scapy.packet.Packet) -> None:
        if pkt.haslayer('TCP'):
            infoStr = pkt.sprintf("TCP -> %IP.src%:%TCP.sport% -> %IP.dst%:%TCP.dport%")
            self.pktInfo.append(infoStr)
            if len(self.pktInfo) > 100:
                for i in self.pktInfo:
                    self.text += i + '\n'
                self.pktInfo.clear()
                # 向主窗口发送一个信号
                self.updateSignal.emit()

在这里,定义 SniffCallBack 的功能为捕捉TCP数据包,每捕捉一个包就产生一条记录信息,每捉满100个包 就向主窗体发送一个信号。

Analyzer 中对子进程信号进行了处理:

self.sniffer.updateSignal.connect(self.SnifferUpdate)
...
def SnifferUpdate(self):
    self.textEdit.setText(self.sniffer.text)
    self.textEdit.moveCursor(self.textEdit.textCursor().End)

收到信号时,主窗体从 Sniffer 当中取出现有的所有数据包的信息,显示到文本框上,并将文本框滚动条滑至底部。

总的代码如下:

from struct import pack
import sys
from telnetlib import NOP
from turtle import left
from PyQt5.QtWidgets import (QApplication, QGridLayout, 
    QToolTip, QPushButton, QMessageBox, QDesktopWidget, 
    QWidget, QTextEdit, QFileDialog)
from PyQt5.QtGui import QFont
from PyQt5.QtCore import Qt, pyqtSignal, QThread
from scapy.all import *


class Sniffer(QThread):
    updateSignal = pyqtSignal()

    def __init__(self) -> None:
        super().__init__()
        self.pktInfo = []
        self.text = ''
        self.stopSniff = True

    def run(self):
        self.StartSniff()

    def pause(self):
        self.EndSniff()

    def StartSniff(self):
        self.stopSniff = False
        sniff(prn=self.SniffCallBack, count=0, stop_filter=self.isStop)

    def isStop(self, pkt):
        return self.stopSniff

    def EndSniff(self):
        self.stopSniff = True

    def SniffCallBack(self, pkt: scapy.packet.Packet) -> None:
        if pkt.haslayer('TCP'):
            infoStr = pkt.sprintf("TCP -> %IP.src%:%TCP.sport% -> %IP.dst%:%TCP.dport%")
            self.pktInfo.append(infoStr)
            # print(infoStr + '\t' + str(len(self.pktInfo)))
            if len(self.pktInfo) > 100:
                for i in self.pktInfo:
                    self.text += i + '\n'
                self.pktInfo.clear()
                self.updateSignal.emit()


class Analyzer(QWidget):
    def __init__(self) -> None:
        super().__init__()
        self.sniffer = Sniffer()
        self.initUI()

    def initUI(self) -> None:
        # 窗体 & 标题
        self.setGeometry(300, 300, 500, 400)
        self.setWindowTitle("Traffic Analyzer")
        # 状态栏(父类 QMainWindow 与栅格布局冲突)
        # self.statusBar().showMessage("就绪")
        # 居中定位
        self.alignCenter()
        # 设置提示框字体
        QToolTip.setFont(QFont('HanziPen TC', 10))
        # 关联事件
        self.sniffer.updateSignal.connect(self.SnifferUpdate)
        # 创建按钮
        ### 在线分析
        startBtn = QPushButton('在线分析', self, clicked=self.sniffer.start)
        startBtn.setToolTip('启动在线流量分析')
        startBtn.resize(startBtn.sizeHint())
        ### 结束在线分析
        endBtn = QPushButton('结束在线分析', self, clicked=self.sniffer.pause)
        endBtn.setToolTip('结束在线流量分析')
        endBtn.resize(endBtn.sizeHint())
        ### 打开文件
        fileBtn = QPushButton('打开文件', self)
        fileBtn.setToolTip('打开一个pcap文件')
        fileBtn.clicked.connect(self.openFileDialog)
        fileBtn.resize(fileBtn.sizeHint())
        ### 文本编辑
        self.textEdit = QTextEdit()
        self.textEdit.setReadOnly(True)
        # 设置布局
        grid = QGridLayout()
        self.setLayout(grid)
        grid.addWidget(startBtn, 0, 0)
        grid.addWidget(endBtn, 0, 1)
        grid.addWidget(fileBtn, 0, 2)
        grid.addWidget(self.textEdit, 2, 0, 5, 3)
        # 呈现
        self.show()

    def closeEvent(self, event) -> None:
        reply = QMessageBox.question(self, '关闭程序', '确定关闭程序?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply == QMessageBox.Yes:
            event.accept()
        else:
            event.ignore()

    def alignCenter(self) -> None:
        qr = self.frameGeometry()
        cp = QDesktopWidget().availableGeometry().center()
        qr.moveCenter(cp)
        self.move(qr.topLeft())

    def keyPressEvent(self, e) -> None:
        if e.key() == Qt.Key_Escape:
            self.close()

    def openFileDialog(self):
        fname = QFileDialog.getOpenFileName(self, '打开文件', '~/', filter='*.py')
        if fname[0]:
            f = open(fname[0], 'r')
            with f:
                data = f.read()
                self.textEdit.setText(data)

    def SnifferUpdate(self):
        self.textEdit.setText(self.sniffer.text)
        self.textEdit.moveCursor(self.textEdit.textCursor().End)

if __name__ == '__main__':
    app = QApplication(sys.argv)
    analyzer = Analyzer()
    sys.exit(app.exec_())

后记

初探PyQt和Scapy,自觉本文写得杂乱,仅做自娱。代码也许还有一些参考价值,请读者自行斟酌罢。

有朋自远方来,不亦说乎?