“Psutil + Flask开发Linux /Windows系统监控”的版本间的差异

来自CloudWiki
跳转至: 导航搜索
网页模板
内存
第164行: 第164行:
  
 
</nowiki>
 
</nowiki>
 +
 +
效果:
 +
 +
[[文件:python2022052701.png|600px]]
  
  

2022年5月27日 (五) 06:26的版本

简介

psutil 是一个跨平台库(http://pythonhosted.org/psutil)能够获取到系统运行的进程和系统利用率(包括CPU、内存、磁盘、网络等)信息。主要用来做系统监控,性能分析,进程管理。支持 Linux、Mac OS、Windows 系统。

本文以 psutil 模块获取系统信息开发一个监控 Windows 系统的平台。

技术选择

  • 监控的系统是 Windows 系统
  • 监控系统模块选择 psutil 模块
  • Web 框架选择的是 Flask 框架


技术准备

安装 psutil

pip3 install psutil

安装 Flask、pyecharts、Bootstrap

Flask 的教程是在公众号文章:Web 开发 Flask 介绍

获取系统信息

初始化


import psutil
import os, signal
from random import randrange

from flask import Flask, render_template, jsonify, request
import time

app = Flask(__name__, static_folder="templates")

cpu_percent_dict = {}
net_io_dict = {'net_io_time':[], 'net_io_sent': [], 'net_io_recv': [], 'pre_sent': 0, 'pre_recv': 0, 'len': -1}
disk_dict = {'disk_time':[], 'write_bytes': [], 'read_bytes': [], 'pre_write_bytes': 0, 'pre_read_bytes': 0, 'len': -1}

cpu_percent_dict = {}

网页模板

templates/list.html:

<!DOCTYPE html>
<html lang="zh-CN">
  <head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>基于psutil的系统监控工具</title>

    <link href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css" rel="stylesheet">
  </head>


<body>
    <h1>{{title}}</h1>
     <table id="customers" class="table table-striped">
    <tr>
      <th>监测项</th>
      <th>数值</th>
    </tr>

    {% for key, value in my_dict.items() %}
    <tr>
      <td>{{ key }}</td>
      <td>{{ value}}</td>
    </tr>
    {% endfor %}
     </table>
</body>
</html>

CPU信息

通过 psutil 获取 CPU 信息

>>> import psutil
 
 
# 获取当前 CPU 的利用率
>>> psutil.cpu_percent()
53.8
 
 
# 获取当前 CPU 的用户/系统/空闲时间
>>> psutil.cpu_times()
scputimes(user=197483.49, nice=0.0, system=114213.01, idle=1942295.68)
 
 
# 1/5/15 分钟之内的 CPU 负载
>>> psutil.getloadavg()
(7.865234375, 5.1826171875, 4.37353515625)
 
 
# CPU 逻辑个数
>>> psutil.cpu_count()
4
 
 
# CPU 物理个数
>>> psutil.cpu_count(logical=False)
2

在监控平台上每 2 秒请求 url 获取 CPU 负载,并动态显示图表:

cpu_percent_dict = {}
def cpu():
    # 当前时间
    now = time.strftime('%H:%M:%S', time.localtime(time.time()))
    #  CPU 负载
    cpu_percent = psutil.cpu_percent()
    cpu_percent_dict[now] = cpu_percent
 
 
    # 保持在图表中 10 个数据
    if len(cpu_percent_dict.keys()) == 11:
        cpu_percent_dict.pop(list(cpu_percent_dict.keys())[0])
 
 
def cpu_line() -> Line:
    cpu()
    # 全量更新 pyecharts 图表
    c = (
        Line()
            .add_xaxis(list(cpu_percent_dict.keys()))
            .add_yaxis('', list(cpu_percent_dict.values()), areastyle_opts=opts.AreaStyleOpts(opacity=0.5))
            .set_global_opts(title_opts=opts.TitleOpts(title = now + "CPU负载",pos_left = "center"),
                             yaxis_opts=opts.AxisOpts(min_=0,max_=100,split_number=10,type_="value", name='%'))
    )
    return c
 
 
@app.route("/cpu")
def get_cpu_chart():
    c = cpu_line()
    return c.dump_options_with_quotes()

内存

@app.route("/memory")
def memory():
    memory = psutil.virtual_memory()
    swap = psutil.swap_memory()
    sys={}
    sys['mem_total']= memory.total
    sys['mem_used']= memory.used
    sys['mem_free']= memory.free
    sys['swap_total']=swap.total
    sys['swap_used']=swap.used
    sys['swap_free']=swap.free
    sys['memory_percent']= memory.percent

    return render_template('list.html', title="内存使用情况",my_dict=sys)

if __name__ == '__main__':#程序入口
    #app.run()#让应用运行在本地服务器上。
    app.run(debug=True,host='0.0.0.0') #允许任意网址访问本站


效果:

Python2022052701.png


https://blog.csdn.net/ityouknow/article/details/105985451

磁盘

通过 psutil 获取磁盘大小、分区、使用率和磁盘IO

在监控平台上每 2 秒请求 url 获取磁盘信息,并动态显示图表

def disk():
    disk_usage = psutil.disk_usage('/')
    disk_used = 0
    # 磁盘已使用大小 = 每个分区的总和
    partitions = psutil.disk_partitions()
    for partition in partitions:
        partition_disk_usage = psutil.disk_usage(partition[1])
        disk_used = partition_disk_usage.used + disk_used

    now = time.strftime('%H:%M:%S', time.localtime(time.time()))
    count = psutil.disk_io_counters()
    read_bytes = count.read_bytes
    write_bytes = count.write_bytes

    # 第一次请求
    if disk_dict['len'] == -1:
        disk_dict['pre_write_bytes'] = write_bytes
        disk_dict['pre_read_bytes'] = read_bytes
        disk_dict['len'] = 0
        return disk_usage.total, disk_used, disk_usage.free

    # 当前速率=现在写入/读取的总字节-前一次请求写入/读取的总字节
    disk_dict['write_bytes'].append((write_bytes - disk_dict['pre_write_bytes'])/1024)
    disk_dict['read_bytes'].append((read_bytes - disk_dict['pre_read_bytes'])/ 1024)
    disk_dict['disk_time'].append(now)
    disk_dict['len'] = disk_dict['len'] + 1

    # 把现在写入/读取的总字节放入前一个请求的变量中
    disk_dict['pre_write_bytes'] = write_bytes
    disk_dict['pre_read_bytes'] = read_bytes

    # 保持在图表中 50 个数据
    if disk_dict['len'] == 51:
        disk_dict['write_bytes'].pop(0)
        disk_dict['read_bytes'].pop(0)
        disk_dict['disk_time'].pop(0)
        disk_dict['len'] = disk_dict['len'] - 1

    return disk_usage.total, disk_used, disk_usage.free


def disk_line() -> Line:
    total, used, free = disk()

    c = (
        Line(init_opts=opts.InitOpts(width="1680px", height="800px"))
        .add_xaxis(xaxis_data=disk_dict['disk_time'])
        .add_yaxis(
            series_name="写入数据",
            y_axis=disk_dict['write_bytes'],
            areastyle_opts=opts.AreaStyleOpts(opacity=0.5),
            linestyle_opts=opts.LineStyleOpts(),
            label_opts=opts.LabelOpts(is_show=False),
        )
        .add_yaxis(
            series_name="读取数据",
            y_axis=disk_dict['read_bytes'],
            yaxis_index=1,
            areastyle_opts=opts.AreaStyleOpts(opacity=0.5),
            linestyle_opts=opts.LineStyleOpts(),
            label_opts=opts.LabelOpts(is_show=False),
        )
        .extend_axis(
            yaxis=opts.AxisOpts(
                name_location="start",
                type_="value",
                is_inverse=True,
                axistick_opts=opts.AxisTickOpts(is_show=True),
                splitline_opts=opts.SplitLineOpts(is_show=True),
                name='KB/2S'
            )
        )
        .set_global_opts(
            title_opts=opts.TitleOpts(
                title="磁盘IO",
                pos_left="center",
                pos_top="top",
            ),
            tooltip_opts=opts.TooltipOpts(trigger="axis", axis_pointer_type="cross"),
            legend_opts=opts.LegendOpts(pos_left="left"),
            xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
            yaxis_opts=opts.AxisOpts(type_="value", name='KB/2S'),
        )
        .set_series_opts(
            axisline_opts=opts.AxisLineOpts(),
        )
    )

    return total, used, free, c

@app.route("/disk")
def get_disk_chart():
    total, used, free, c = disk_line()
    return jsonify({'total': total, 'used': used, 'free': free, 'line': c.dump_options_with_quotes()})


网卡

通过 psutil 获取网络接口和网络连接的信息

在监控平台上每 2 秒请求 url 获取网卡IO,并动态显示图表

def net_io():
    now = time.strftime('%H:%M:%S', time.localtime(time.time()))
    # 获取网络信息
    count = psutil.net_io_counters()
    g_sent = count.bytes_sent
    g_recv = count.bytes_recv

    # 第一次请求
    if net_io_dict['len'] == -1:
        net_io_dict['pre_sent'] = g_sent
        net_io_dict['pre_recv'] = g_recv
        net_io_dict['len'] = 0
        return

    # 当前网络发送/接收的字节速率 = 现在网络发送/接收的总字节 - 前一次请求网络发送/接收的总字节
    net_io_dict['net_io_sent'].append(g_sent - net_io_dict['pre_sent'])
    net_io_dict['net_io_recv'].append(g_recv - net_io_dict['pre_recv'])
    net_io_dict['net_io_time'].append(now)
    net_io_dict['len'] = net_io_dict['len'] + 1

    net_io_dict['pre_sent'] = g_sent
    net_io_dict['pre_recv'] = g_recv

    # 保持在图表中 10 个数据
    if net_io_dict['len'] == 11:
        net_io_dict['net_io_sent'].pop(0)
        net_io_dict['net_io_recv'].pop(0)
        net_io_dict['net_io_time'].pop(0)
        net_io_dict['len'] = net_io_dict['len'] - 1


def net_io_line() -> Line:
    net_io()

    c = (
    Line()
    .add_xaxis(net_io_dict['net_io_time'])
    .add_yaxis("发送字节数", net_io_dict['net_io_sent'], is_smooth=True)
    .add_yaxis("接收字节数", net_io_dict['net_io_recv'], is_smooth=True)
    .set_series_opts(
        areastyle_opts=opts.AreaStyleOpts(opacity=0.5),
        label_opts=opts.LabelOpts(is_show=False),
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="网卡IO", pos_left = "center"),
        xaxis_opts=opts.AxisOpts(
            axistick_opts=opts.AxisTickOpts(is_align_with_label=True),
            is_scale=False,
            boundary_gap=False,
        ),
        yaxis_opts=opts.AxisOpts(type_="value", name='B/2S'),
        legend_opts=opts.LegendOpts(pos_left="left"),
    ))
    return c

进程

通过 psutil 可以获取所有进程的信息

# 所有进程的 pid
>>> psutil.pids()
[0, 1, 134, 135, 138, 139, 140, 141, 144, 145, 147, 152, ..., 30400, 97792]
 
 
# 单个进程
>>> p = psutil.Process(30400)
 
 
# 名称
>>> p.name()
'pycharm'
 
 
# 使用内存负载
>>> p.memory_percent()
12.838459014892578
 
 
# 启动时间
>>> p.create_time()
1587029962.493182
 
 
# 路径
>>> p.exe()
'/Applications/PyCharm.app/Contents/MacOS/pycharm'
 
 
# 状态
>>> p.status()
'running'
 
 
# 用户名
>>> p.username()
'imeng'
 
 
# 内存信息
>>> p.memory_info()
pmem(rss=1093005312, vms=9914318848, pfaults=7813313, pageins=8448)
def process():
    result = []
    process_list = []
    pid = psutil.pids()
    for k, i in enumerate(pid):
        try:
            proc = psutil.Process(i)
            ctime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(proc.create_time()))
            process_list.append((str(i), proc.name(), proc.cpu_percent(), proc.memory_percent(), ctime))
        except psutil.AccessDenied:
            pass
        except psutil.NoSuchProcess:
            pass
        except SystemError:
            pass

        process_list.sort(key=process_sort, reverse=True)
    for i in process_list:
        result.append({'PID': i[0], 'name': i[1], 'cpu': i[2], 'mem': "%.2f%%"%i[3], 'ctime': i[4]})


    return jsonify({'list': result})

def process_sort(elem):
    return elem[3]

@app.route("/process")
def get_process_tab():
    c = process()
    return c

@app.route("/delprocess")
def del_process():
    pid = request.args.get("pid")
    os.kill(int(pid), signal.SIGKILL)
    return jsonify({'status': 'OK'})


代码

app.py

import os, signal
from random import randrange

from flask import Flask, render_template, jsonify, request

from pyecharts import options as opts
from pyecharts.charts import Bar, Line, Liquid, Gauge, Grid
import pyecharts.options as opts
import time
import psutil

app = Flask(__name__, static_folder="templates")

cpu_percent_dict = {}
net_io_dict = {'net_io_time':[], 'net_io_sent': [], 'net_io_recv': [], 'pre_sent': 0, 'pre_recv': 0, 'len': -1}
disk_dict = {'disk_time':[], 'write_bytes': [], 'read_bytes': [], 'pre_write_bytes': 0, 'pre_read_bytes': 0, 'len': -1}

def cpu():
    now = time.strftime('%H:%M:%S', time.localtime(time.time()))
    cpu_percent = psutil.cpu_percent()
    cpu_percent_dict[now] = cpu_percent
    # 保持在图表中 10 个数据
    if len(cpu_percent_dict.keys()) == 11:
        cpu_percent_dict.pop(list(cpu_percent_dict.keys())[0])


def cpu_line() -> Line:
    now = time.strftime('%Y年%m月%d日的', time.localtime(time.time()))
    cpu()
    c = (
        Line()
            .add_xaxis(list(cpu_percent_dict.keys()))
            .add_yaxis('', list(cpu_percent_dict.values()), areastyle_opts=opts.AreaStyleOpts(opacity=0.5))
            .set_global_opts(title_opts=opts.TitleOpts(title = now + "CPU负载",pos_left = "center"),
                             yaxis_opts=opts.AxisOpts(min_=0,max_=100,split_number=10,type_="value", name='%'))
    )
    return c

def memory():
    memory = psutil.virtual_memory()
    swap = psutil.swap_memory()
    return memory.total, memory.used, \
           memory.free, swap.total, swap.used, swap.free, memory.percent


def memory_liquid() -> Gauge:
    mtotal, mused, mfree, stotal, sused, sfree, mpercent = memory()
    c = (
        Gauge()
            .add("", [("", mpercent)])
            .set_global_opts(title_opts=opts.TitleOpts(title="内存负载", pos_left = "center"))
    )
    return mtotal, mused, mfree, stotal, sused, sfree, c



def net_io():
    now = time.strftime('%H:%M:%S', time.localtime(time.time()))
    # 获取网络信息
    count = psutil.net_io_counters()
    g_sent = count.bytes_sent
    g_recv = count.bytes_recv

    # 第一次请求
    if net_io_dict['len'] == -1:
        net_io_dict['pre_sent'] = g_sent
        net_io_dict['pre_recv'] = g_recv
        net_io_dict['len'] = 0
        return

    # 当前网络发送/接收的字节速率 = 现在网络发送/接收的总字节 - 前一次请求网络发送/接收的总字节
    net_io_dict['net_io_sent'].append(g_sent - net_io_dict['pre_sent'])
    net_io_dict['net_io_recv'].append(g_recv - net_io_dict['pre_recv'])
    net_io_dict['net_io_time'].append(now)
    net_io_dict['len'] = net_io_dict['len'] + 1

    net_io_dict['pre_sent'] = g_sent
    net_io_dict['pre_recv'] = g_recv

    # 保持在图表中 10 个数据
    if net_io_dict['len'] == 11:
        net_io_dict['net_io_sent'].pop(0)
        net_io_dict['net_io_recv'].pop(0)
        net_io_dict['net_io_time'].pop(0)
        net_io_dict['len'] = net_io_dict['len'] - 1


def net_io_line() -> Line:
    net_io()

    c = (
    Line()
    .add_xaxis(net_io_dict['net_io_time'])
    .add_yaxis("发送字节数", net_io_dict['net_io_sent'], is_smooth=True)
    .add_yaxis("接收字节数", net_io_dict['net_io_recv'], is_smooth=True)
    .set_series_opts(
        areastyle_opts=opts.AreaStyleOpts(opacity=0.5),
        label_opts=opts.LabelOpts(is_show=False),
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="网卡IO", pos_left = "center"),
        xaxis_opts=opts.AxisOpts(
            axistick_opts=opts.AxisTickOpts(is_align_with_label=True),
            is_scale=False,
            boundary_gap=False,
        ),
        yaxis_opts=opts.AxisOpts(type_="value", name='B/2S'),
        legend_opts=opts.LegendOpts(pos_left="left"),
    ))
    return c


def process():
    result = []
    process_list = []
    pid = psutil.pids()
    for k, i in enumerate(pid):
        try:
            proc = psutil.Process(i)
            ctime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(proc.create_time()))
            process_list.append((str(i), proc.name(), proc.cpu_percent(), proc.memory_percent(), ctime))
        except psutil.AccessDenied:
            pass
        except psutil.NoSuchProcess:
            pass
        except SystemError:
            pass

        process_list.sort(key=process_sort, reverse=True)
    for i in process_list:
        result.append({'PID': i[0], 'name': i[1], 'cpu': i[2], 'mem': "%.2f%%"%i[3], 'ctime': i[4]})


    return jsonify({'list': result})

def process_sort(elem):
    return elem[3]


def disk():
    disk_usage = psutil.disk_usage('/')
    disk_used = 0
    # 磁盘已使用大小 = 每个分区的总和
    partitions = psutil.disk_partitions()
    for partition in partitions:
        partition_disk_usage = psutil.disk_usage(partition[1])
        disk_used = partition_disk_usage.used + disk_used

    now = time.strftime('%H:%M:%S', time.localtime(time.time()))
    count = psutil.disk_io_counters()
    read_bytes = count.read_bytes
    write_bytes = count.write_bytes

    # 第一次请求
    if disk_dict['len'] == -1:
        disk_dict['pre_write_bytes'] = write_bytes
        disk_dict['pre_read_bytes'] = read_bytes
        disk_dict['len'] = 0
        return disk_usage.total, disk_used, disk_usage.free

    # 当前速率=现在写入/读取的总字节-前一次请求写入/读取的总字节
    disk_dict['write_bytes'].append((write_bytes - disk_dict['pre_write_bytes'])/1024)
    disk_dict['read_bytes'].append((read_bytes - disk_dict['pre_read_bytes'])/ 1024)
    disk_dict['disk_time'].append(now)
    disk_dict['len'] = disk_dict['len'] + 1

    # 把现在写入/读取的总字节放入前一个请求的变量中
    disk_dict['pre_write_bytes'] = write_bytes
    disk_dict['pre_read_bytes'] = read_bytes

    # 保持在图表中 50 个数据
    if disk_dict['len'] == 51:
        disk_dict['write_bytes'].pop(0)
        disk_dict['read_bytes'].pop(0)
        disk_dict['disk_time'].pop(0)
        disk_dict['len'] = disk_dict['len'] - 1

    return disk_usage.total, disk_used, disk_usage.free


def disk_line() -> Line:
    total, used, free = disk()

    c = (
        Line(init_opts=opts.InitOpts(width="1680px", height="800px"))
        .add_xaxis(xaxis_data=disk_dict['disk_time'])
        .add_yaxis(
            series_name="写入数据",
            y_axis=disk_dict['write_bytes'],
            areastyle_opts=opts.AreaStyleOpts(opacity=0.5),
            linestyle_opts=opts.LineStyleOpts(),
            label_opts=opts.LabelOpts(is_show=False),
        )
        .add_yaxis(
            series_name="读取数据",
            y_axis=disk_dict['read_bytes'],
            yaxis_index=1,
            areastyle_opts=opts.AreaStyleOpts(opacity=0.5),
            linestyle_opts=opts.LineStyleOpts(),
            label_opts=opts.LabelOpts(is_show=False),
        )
        .extend_axis(
            yaxis=opts.AxisOpts(
                name_location="start",
                type_="value",
                is_inverse=True,
                axistick_opts=opts.AxisTickOpts(is_show=True),
                splitline_opts=opts.SplitLineOpts(is_show=True),
                name='KB/2S'
            )
        )
        .set_global_opts(
            title_opts=opts.TitleOpts(
                title="磁盘IO",
                pos_left="center",
                pos_top="top",
            ),
            tooltip_opts=opts.TooltipOpts(trigger="axis", axis_pointer_type="cross"),
            legend_opts=opts.LegendOpts(pos_left="left"),
            xaxis_opts=opts.AxisOpts(type_="category", boundary_gap=False),
            yaxis_opts=opts.AxisOpts(type_="value", name='KB/2S'),
        )
        .set_series_opts(
            axisline_opts=opts.AxisLineOpts(),
        )
    )

    return total, used, free, c

@app.route("/")
def index():
    return render_template("index.html")


@app.route("/cpu")
def get_cpu_chart():
    c = cpu_line()
    return c.dump_options_with_quotes()

@app.route("/memory")
def get_memory_chart():
    mtotal, mused, mfree, stotal, sused, sfree, c = memory_liquid()
    return jsonify({'mtotal': mtotal, 'mused': mused, 'mfree': mfree, 'stotal': stotal, 'sused': sused, 'sfree': sfree, 'liquid': c.dump_options_with_quotes()})


@app.route("/netio")
def get_net_io_chart():
    c = net_io_line()
    return c.dump_options_with_quotes()

@app.route("/process")
def get_process_tab():
    c = process()
    return c

@app.route("/delprocess")
def del_process():
    pid = request.args.get("pid")
    os.kill(int(pid), signal.SIGKILL)
    return jsonify({'status': 'OK'})

@app.route("/disk")
def get_disk_chart():
    total, used, free, c = disk_line()
    return jsonify({'total': total, 'used': used, 'free': free, 'line': c.dump_options_with_quotes()})

if __name__ == "__main__":
    app.run()


templates/index.html

<!DOCTYPE html>
<html lang="zh-CN">
  <head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>基于psutil的系统监控工具</title>

    <link href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css" rel="stylesheet">
  </head>


<body>
    <h1>{{title}}</h1>
     <table id="customers" class="table table-striped">
    <tr>
      <th>ID</th>
      <th>name</th>
    </tr>

    {% for key, value in my_dict.items() %}
    <tr>
      <td>{{ key }}</td>
      <td>{{ value}}</td>
    </tr>
    {% endfor %}
     </table>
</body>
</html>

效果

Python22031901.png

参考文档 : https://blog.csdn.net/ityouknow/article/details/105985451