From 02095421036bc48af06b4e6c63567cc7f5e2d92a Mon Sep 17 00:00:00 2001 From: Book-smile-man <1351340502@qq.com> Date: Sat, 27 Mar 2021 11:08:46 +0800 Subject: [PATCH 1/9] =?UTF-8?q?=E6=8C=87=E6=A0=87=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- controllers/collect.go | 7 ++- controllers/tj.go | 10 +++ logic/tj.go | 9 +++ models/pluginparam.go | 1 + plugins/swap_pagefault.py | 93 +++++++++++++++++++++++++++ static/index.html | 3 +- test/grafana-JSON/lmp.json | 125 ++++++++++++++++++++++++++++++++++++- 7 files changed, 245 insertions(+), 3 deletions(-) create mode 100644 plugins/swap_pagefault.py diff --git a/controllers/collect.go b/controllers/collect.go index 07d70059..b4e61a3f 100644 --- a/controllers/collect.go +++ b/controllers/collect.go @@ -27,7 +27,12 @@ func Collect(c *gin.Context) { func fillFrontMessage(c *gin.Context) models.ConfigMessage { var m models.ConfigMessage - + if v, ok := c.GetPostForm("swap_pagefault"); ok && v == "true" { + m.Swap_pagefault = true + m.BpfFilePath = append(m.BpfFilePath, settings.Conf.PluginConfig.Path+"swap_pagefault.py") + } else { + m.Swap_pagefault = false + } if v, ok := c.GetPostForm("cpuutilize"); ok && v == "true" { m.Cpuutilize = true m.BpfFilePath = append(m.BpfFilePath, settings.Conf.PluginConfig.Path+"cpuutilize.py") diff --git a/controllers/tj.go b/controllers/tj.go index d8e933d0..4452afa3 100644 --- a/controllers/tj.go +++ b/controllers/tj.go @@ -5,6 +5,16 @@ import ( "github.com/linuxkerneltravel/lmp/logic" "go.uber.org/zap" ) +func QuerySwap_pagefault(c *gin.Context) { + res, err := logic.DoQuerySwap_pagefault() + if err != nil { + zap.L().Error("ERROR in QuerySwap_pagefault():", zap.Error(err)) + ResponseError(c, CodeServerBusy) + return + } + + ResponseSuccess(c, res) +} func QueryIRQ(c *gin.Context) { res, err := logic.DoQueryIRQ() diff --git a/logic/tj.go b/logic/tj.go index 785b4984..d8a3f6f4 100644 --- a/logic/tj.go +++ b/logic/tj.go 
@@ -16,6 +16,15 @@ func DoQueryIRQ() (res []client.Result, err error) { return } +func DoQuerySwap_pagefault() (res []client.Result, err error) { + res, err = influxdb.QueryDB(`select last("duration") from "swap_pagefault"`) + if err != nil { + zap.L().Error("ERROR in Doswap_pagefault():", zap.Error(err)) + return nil, err + } + return +} + func DoQueryCpuUtilize() (res []client.Result, err error) { res, err = influxdb.QueryDB(`select last("perce") from "cpuutilize"`) if err != nil { diff --git a/models/pluginparam.go b/models/pluginparam.go index f8c7fbf4..bf19bc57 100644 --- a/models/pluginparam.go +++ b/models/pluginparam.go @@ -9,6 +9,7 @@ type ConfigMessage struct { Runqlen bool `json:"runqlen"` Vfsstat bool `json:"vfsstat"` Dcache bool `json:"dcache"` + Swap_pagefault bool `json:"swap_pagefault"` // Store the config above to the 'BpfFilePath' BpfFilePath []string `json:"bpfFilePath"` diff --git a/plugins/swap_pagefault.py b/plugins/swap_pagefault.py new file mode 100644 index 00000000..b9bae777 --- /dev/null +++ b/plugins/swap_pagefault.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python +# coding=utf-8 + +from __future__ import print_function +from bcc import BPF +from time import sleep, strftime + +# for influxdb +from influxdb import InfluxDBClient +import lmp_influxdb as db +from db_modules import write2db + +from datetime import datetime + + +DBNAME = 'lmp' + +client = db.connect(DBNAME,user='root',passwd=123456) + +b = BPF(text = ''' + #include + #include + + BPF_HASH(timer, u32, ktime_t); + + int kprobe__do_swap_page(struct pt_regs *ctx) + { + + u32 pid = bpf_get_current_pid_tgid(); + + ktime_t start = bpf_ktime_get_ns(); + timer.update(&pid, &start); + + return 0; + } + + int kretprobe__do_swap_page(struct pt_regs *ctx) + { + + ktime_t end = bpf_ktime_get_ns(); + int ret = PT_REGS_RC(ctx); + + u32 pid = bpf_get_current_pid_tgid(); + + ktime_t delta; + + ktime_t *tsp = timer.lookup(&pid); + if ((ret >= 0) && (tsp != NULL)) + delta = end - *tsp; + + if (delta >= 
10000000) {/* 大于10ms的进行输出 */ + bpf_trace_printk("%lld\\n", delta); + } + + //bpf_trace_printk("%lld\\n", delta); + + return 0; + } + ''') + + +# data structure from template +class lmp_data(object): + def __init__(self,a,b,c): + self.time = a + self.glob = b + self.duration = c + +data_struct = {"measurement":'swap_pagefault', + "time":[], + "tags":['glob'], + "fields":['duration']} + + + +timer = b.get_table("timer") + +#print("%-6s%-6s%-6s%-6s" % ("CPU", "PID", "TGID", "TIME(us)")) +while (1): + try: + sleep(1) + for k, v in timer.items(): + #print("%-6d%-6d%-6d%-6d" % (k.cpu, k.pid, k.tgid, v.value / 1000)) + test_data = lmp_data(datetime.now().isoformat(),'glob', v.value/1000) + write2db(data_struct, test_data, client) + #print("This is success") + timer.clear() + except KeyboardInterrupt: + exit() + + + + diff --git a/static/index.html b/static/index.html index 3b93eab3..e044f7a6 100644 --- a/static/index.html +++ b/static/index.html @@ -46,6 +46,7 @@ runqlen    vfsstat    dcache  + swap_pagefault 
@@ -57,4 +58,4 @@ - \ No newline at end of file + diff --git a/test/grafana-JSON/lmp.json b/test/grafana-JSON/lmp.json index 27d734ca..cfed0b4e 100644 --- a/test/grafana-JSON/lmp.json +++ b/test/grafana-JSON/lmp.json @@ -560,6 +560,129 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 17 + }, + "hiddenSeries": false, + "id": 14, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "percentage": false, + "pluginVersion": "7.1.3", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT \"duration\" FROM \"swap_pagefault\" ", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "swap_pagefault", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + 
"yaxis": { + "align": false, + "alignLevel": null + } + }, { "aliasColors": {}, "bars": false, @@ -713,4 +836,4 @@ "title": "LMP", "uid": "wRfOqufMk", "version": 3 -} \ No newline at end of file +} -- Gitee From a0dbae5d027158a6ad7ac805681113b562033532 Mon Sep 17 00:00:00 2001 From: Your Name Date: Thu, 15 Apr 2021 23:45:02 +0800 Subject: [PATCH 2/9] add bcc ext4 latency --- controllers/collect.go | 16 ++- controllers/tj.go | 28 ++-- logic/tj.go | 21 ++- models/pluginparam.go | 16 +-- plugins/ext4_latency.py | 265 +++++++++++++++++++++++++++++++++++++ static/index.html | 1 + test/grafana-JSON/lmp.json | 241 +++++++++++++++++++++++++++++++++ 7 files changed, 560 insertions(+), 28 deletions(-) create mode 100644 plugins/ext4_latency.py diff --git a/controllers/collect.go b/controllers/collect.go index b4e61a3f..febbc187 100644 --- a/controllers/collect.go +++ b/controllers/collect.go @@ -27,12 +27,18 @@ func Collect(c *gin.Context) { func fillFrontMessage(c *gin.Context) models.ConfigMessage { var m models.ConfigMessage + if v, ok := c.GetPostForm("ext4_latency"); ok && v == "true" { + m.Ext4_latency = true + m.BpfFilePath = append(m.BpfFilePath, settings.Conf.PluginConfig.Path+"ext4_latency.py") + } else { + m.Ext4_latency = false + } if v, ok := c.GetPostForm("swap_pagefault"); ok && v == "true" { - m.Swap_pagefault = true - m.BpfFilePath = append(m.BpfFilePath, settings.Conf.PluginConfig.Path+"swap_pagefault.py") - } else { - m.Swap_pagefault = false - } + m.Swap_pagefault = true + m.BpfFilePath = append(m.BpfFilePath, settings.Conf.PluginConfig.Path+"swap_pagefault.py") + } else { + m.Swap_pagefault = false + } if v, ok := c.GetPostForm("cpuutilize"); ok && v == "true" { m.Cpuutilize = true m.BpfFilePath = append(m.BpfFilePath, settings.Conf.PluginConfig.Path+"cpuutilize.py") diff --git a/controllers/tj.go b/controllers/tj.go index 4452afa3..6c9823b6 100644 --- a/controllers/tj.go +++ b/controllers/tj.go @@ -5,15 +5,27 @@ import ( 
"github.com/linuxkerneltravel/lmp/logic" "go.uber.org/zap" ) + +func QueryExt4_latency(c *gin.Context) { + res, err := logic.DoQueryExt4_latency() + if err != nil { + zap.L().Error("ERROR in QuerySwap_pagefault():", zap.Error(err)) + ResponseError(c, CodeServerBusy) + return + } + + ResponseSuccess(c, res) +} + func QuerySwap_pagefault(c *gin.Context) { - res, err := logic.DoQuerySwap_pagefault() - if err != nil { - zap.L().Error("ERROR in QuerySwap_pagefault():", zap.Error(err)) - ResponseError(c, CodeServerBusy) - return - } - - ResponseSuccess(c, res) + res, err := logic.DoQuerySwap_pagefault() + if err != nil { + zap.L().Error("ERROR in QuerySwap_pagefault():", zap.Error(err)) + ResponseError(c, CodeServerBusy) + return + } + + ResponseSuccess(c, res) } func QueryIRQ(c *gin.Context) { diff --git a/logic/tj.go b/logic/tj.go index d8a3f6f4..f74ebf6a 100644 --- a/logic/tj.go +++ b/logic/tj.go @@ -15,14 +15,21 @@ func DoQueryIRQ() (res []client.Result, err error) { } return } - +func DoQueryExt4_latency() (res []client.Result, err error) { + res, err = influxdb.QueryDB(`select last("latency") from "ext4LatencyTable"`) + if err != nil { + zap.L().Error("ERROR in DoQueryExt4_latency():", zap.Error(err)) + return nil, err + } + return +} func DoQuerySwap_pagefault() (res []client.Result, err error) { - res, err = influxdb.QueryDB(`select last("duration") from "swap_pagefault"`) - if err != nil { - zap.L().Error("ERROR in Doswap_pagefault():", zap.Error(err)) - return nil, err - } - return + res, err = influxdb.QueryDB(`select last("duration") from "swap_pagefault"`) + if err != nil { + zap.L().Error("ERROR in Doswap_pagefault():", zap.Error(err)) + return nil, err + } + return } func DoQueryCpuUtilize() (res []client.Result, err error) { diff --git a/models/pluginparam.go b/models/pluginparam.go index bf19bc57..8471c09f 100644 --- a/models/pluginparam.go +++ b/models/pluginparam.go @@ -2,15 +2,15 @@ package models // ConfigMessage struct type ConfigMessage struct { - 
Cpuutilize bool `json:"cpuutilize"` - Irq bool `json:"irq"` - Memusage bool `json:"memusage"` - Picknexttask bool `json:"picknexttask"` - Runqlen bool `json:"runqlen"` - Vfsstat bool `json:"vfsstat"` - Dcache bool `json:"dcache"` + Cpuutilize bool `json:"cpuutilize"` + Irq bool `json:"irq"` + Memusage bool `json:"memusage"` + Picknexttask bool `json:"picknexttask"` + Runqlen bool `json:"runqlen"` + Vfsstat bool `json:"vfsstat"` + Dcache bool `json:"dcache"` Swap_pagefault bool `json:"swap_pagefault"` - + Ext4_latency bool `json:"ext4_latency"` // Store the config above to the 'BpfFilePath' BpfFilePath []string `json:"bpfFilePath"` // time diff --git a/plugins/ext4_latency.py b/plugins/ext4_latency.py new file mode 100644 index 00000000..a7d401b5 --- /dev/null +++ b/plugins/ext4_latency.py @@ -0,0 +1,265 @@ +#!/usr/bin/python +# @lint-avoid-python-3-compatibility-imports +# +# ext4dist Summarize ext4 operation latency. +# For Linux, uses BCC, eBPF. +# +# USAGE: ext4dist [-h] [-T] [-m] [-p PID] [interval] [count] +# +# Copyright 2016 Netflix, Inc. +# Licensed under the Apache License, Version 2.0 (the "License") +# +# 12-Feb-2016 Brendan Gregg Created this. 
+from __future__ import print_function +from bcc import BPF +from time import sleep, strftime +import argparse +# for influxdb +from influxdb import InfluxDBClient +import lmp_influxdb as db +from db_modules import write2db + +from datetime import datetime + +DBNAME = 'lmp' + +client = db.connect(DBNAME,user='root',passwd=123456) + +# symbols +kallsyms = "/proc/kallsyms" +# arguments +examples = """examples: + ./ext4dist -p 181 # trace PID 181 only + ./ext4dist 1 10 # print 1 second summaries, 10 times + ./ext4dist 5 # 5s summaries, milliseconds +""" +parser = argparse.ArgumentParser( + description="Summarize ext4 operation latency", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=examples) +parser.add_argument("-T", "--notimestamp", action="store_true", + help="don't include timestamp on interval output") +parser.add_argument("-m", "--milliseconds", action="store_true", + help="output in milliseconds") +parser.add_argument("-p", "--pid", + help="trace this PID only") +parser.add_argument("interval", nargs="?", + help="output interval, in seconds") +parser.add_argument("count", nargs="?", default=99999999, + help="number of outputs") +parser.add_argument("--ebpf", action="store_true", + help=argparse.SUPPRESS) +args = parser.parse_args() +pid = args.pid +countdown = int(args.count) +# if args.milliseconds: +# factor = 1000000 +# label = "msecs" +# else: +# factor = 1000 +# label = "usecs" +if args.interval and int(args.interval) == 0: + interval = 1 +else: + interval = args.interval + +debug = 0 +# define BPF program +bpf_text = """ +#include +#include +#include +#include +#define OP_NAME_LEN 8 +typedef struct key_s { + u32 pid; + u32 flag; + u64 ts; +} key_s; +typedef struct val_t { + char op[OP_NAME_LEN]; + char comm[TASK_COMM_LEN]; + char flags; + u64 delta; +} val_t ; +//record start time +BPF_HASH(start, u32,key_s); +//output to userspace +BPF_HASH(dist, key_s, val_t ); +// time operation +int trace_entry(struct pt_regs *ctx,struct kiocb 
*iocb,struct iov_iter *from) +{ + u32 pid = bpf_get_current_pid_tgid(); + if (FILTER_PID) + return 0; + u64 ts = bpf_ktime_get_ns(); + struct file *fp = iocb->ki_filp; + key_s key = { + .pid = pid, + .flag = fp->f_flags, + .ts = ts, + }; + start.update(&pid, &key); + return 0; +} +// old version +EXT4_TRACE_READ_CODE +static int trace_return(struct pt_regs *ctx, const char *op) +{ + key_s *key; + val_t *valp, zero = {}; + u32 pid = bpf_get_current_pid_tgid(); + // fetch timestamp and calculate delta + key = start.lookup(&pid); + if (!key || key->pid != pid) { + return 0; // missed start or filtered + } + //calculate delta + u64 delta = bpf_ktime_get_ns() - key->ts; + // Skip entries with backwards time: temp workaround for #728 + if ((s64) delta < 0) + return 0; + //delta /= FACTOR; + // store as histogram + valp = dist.lookup_or_try_init(key, &zero); + if (valp){ + valp->delta = delta; + bpf_get_current_comm(valp->comm, sizeof(valp->comm)); + __builtin_memcpy(valp->op, op, sizeof(valp->op)); + } + start.delete(&pid); + return 0; +} +int trace_read_return(struct pt_regs *ctx) +{ + char *op = "read"; + return trace_return(ctx, op); +} +int trace_write_return(struct pt_regs *ctx) +{ + char *op = "write"; + return trace_return(ctx, op); +} + +int trace_open_return(struct pt_regs *ctx) +{ + char *op = "open"; + return trace_return(ctx, op); +} +int trace_fsync_return(struct pt_regs *ctx) +{ + char *op = "fsync"; + return trace_return(ctx, op); +} + +""" +# Starting from Linux 4.10 ext4_file_operations.read_iter has been changed from +# using generic_file_read_iter() to its own ext4_file_read_iter(). +# +# To detect the proper function to trace check if ext4_file_read_iter() is +# defined in /proc/kallsyms, if it's defined attach to that function, otherwise +# use generic_file_read_iter() and inside the trace hook filter on ext4 read +# events (checking if file->f_op == ext4_file_operations). 
+if BPF.get_kprobe_functions(b'ext4_file_read_iter'): + ext4_read_fn = 'ext4_file_read_iter' + ext4_trace_read_fn = 'trace_entry' + ext4_trace_read_code = '' +else: + ext4_read_fn = 'generic_file_read_iter' + ext4_trace_read_fn = 'trace_read_entry' + ext4_file_ops_addr = '' + with open(kallsyms) as syms: + for line in syms: + (addr, size, name) = line.rstrip().split(" ", 2) + name = name.split("\t")[0] + if name == "ext4_file_operations": + ext4_file_ops_addr = "0x" + addr + break + if ext4_file_ops_addr == '': + print("ERROR: no ext4_file_operations in /proc/kallsyms. Exiting.") + print("HINT: the kernel should be built with CONFIG_KALLSYMS_ALL.") + exit() + ext4_trace_read_code = """ +int trace_read_entry(struct pt_regs *ctx, struct kiocb *iocb) +{ + u32 pid = bpf_get_current_pid_tgid(); + if (FILTER_PID) + return 0; + // ext4 filter on file->f_op == ext4_file_operations + struct file *fp = iocb->ki_filp; + if ((u64)fp->f_op != %s) + return 0; + u64 ts = bpf_ktime_get_ns(); + key_s key = { + .pid = pid, + .flag = fp->f_flags, + .ts = ts + }; + start.update(&pid, &key); + return 0; +}""" % ext4_file_ops_addr +# code replacements +bpf_text = bpf_text.replace('EXT4_TRACE_READ_CODE', ext4_trace_read_code) +# bpf_text = bpf_text.replace('FACTOR', str(factor)) +if args.pid: + bpf_text = bpf_text.replace('FILTER_PID', 'pid != %s' % pid) +else: + bpf_text = bpf_text.replace('FILTER_PID', '0') +if debug or args.ebpf: + print(bpf_text) + if args.ebpf: + exit() +# load BPF program +b = BPF(text=bpf_text) +b.attach_kprobe(event=ext4_read_fn, fn_name=ext4_trace_read_fn) +b.attach_kprobe(event="ext4_file_write_iter", fn_name="trace_entry") +b.attach_kprobe(event="ext4_file_open", fn_name="trace_entry") +b.attach_kprobe(event="ext4_sync_file", fn_name="trace_entry") +b.attach_kretprobe(event=ext4_read_fn, fn_name='trace_read_return') +b.attach_kretprobe(event="ext4_file_write_iter", fn_name="trace_write_return") +b.attach_kretprobe(event="ext4_file_open", 
fn_name="trace_open_return") +b.attach_kretprobe(event="ext4_sync_file", fn_name="trace_fsync_return") + + +data_struct = {"measurement":'ext4LatencyTable', + "time":[], + "tags":['glob'], + "fields":['comm','pid','operate','latency']} + +class lmp_data(object): + def __init__(self,a,b,c,d,e,f): + self.glob = a + self.comm = b + self.pid = c + self.operate = d + self.latency = e + self.time = f + + + +# print("Tracing ext4 operation latency... Hit Ctrl-C to end.") +# print("%-16s %-6s %-8s %s" % ("COMM", "PID","OP", "LAT(ms)")) +# output +exiting = 0 +while (1): + try: + if args.interval: + sleep(int(args.interval)) + else: + sleep(5) + except KeyboardInterrupt: + exiting = 1 + dist = b.get_table("dist") + # print("The number counted in %d seconds is: %d" %(int(args.interval),dist.__len__())) + # print() + sum =0; + for k,v in dist.items(): + delay = float(v.delta)/1000000 + # print("%-16s %-6d %-8s %f" % (v.comm.decode('utf-8', 'replace'),k.pid,v.op.decode('utf-8', 'replace'),delay)) + test_data = lmp_data('glob',v.comm.decode('utf-8', 'replace'),k.pid,v.op.decode('utf-8', 'replace'),delay,datetime.now().isoformat()) + write2db(data_struct, test_data, client) + # print() + dist.clear() + countdown -= 1 + if exiting or countdown == 0: + exit() \ No newline at end of file diff --git a/static/index.html b/static/index.html index e044f7a6..70672101 100644 --- a/static/index.html +++ b/static/index.html @@ -47,6 +47,7 @@ vfsstat    dcache  swap_pagefault  + ext4_latency 
diff --git a/test/grafana-JSON/lmp.json b/test/grafana-JSON/lmp.json index cfed0b4e..88d16540 100644 --- a/test/grafana-JSON/lmp.json +++ b/test/grafana-JSON/lmp.json @@ -683,6 +683,247 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 9 + }, + "hiddenSeries": false, + "id": 16, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.3.4", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT \"latency\" FROM \"ext4LatencyTable\" WHERE operate = 'read'\n", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT \"latency\" FROM \"ext4LatencyTable\" WHERE operate = 'open'", + "rawQuery": true, + "refId": "B", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + 
"params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT \"latency\" FROM \"ext4LatencyTable\" WHERE operate = 'write'", + "rawQuery": true, + "refId": "C", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT \"latency\" FROM \"ext4LatencyTable\" WHERE operate = 'fsync'", + "rawQuery": true, + "refId": "D", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Ext4 Latency", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:520", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:521", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + }, + "datasource": null + }, { "aliasColors": {}, "bars": false, -- Gitee From 0c17090ae8ea4e3d3d78312ac13c3b8ab5448aa1 Mon Sep 17 00:00:00 2001 From: Piano Date: Mon, 10 May 2021 13:56:43 +0800 Subject: [PATCH 3/9] Delete collect.go --- controllers/collect.go | 93 ------------------------------------------ 1 file changed, 93 deletions(-) delete mode 100644 controllers/collect.go diff --git 
a/controllers/collect.go b/controllers/collect.go deleted file mode 100644 index febbc187..00000000 --- a/controllers/collect.go +++ /dev/null @@ -1,93 +0,0 @@ -package controllers - -import ( - _ "context" - "fmt" - "strconv" - _ "time" - - "github.com/gin-gonic/gin" - "github.com/linuxkerneltravel/lmp/logic" - "github.com/linuxkerneltravel/lmp/models" - "github.com/linuxkerneltravel/lmp/settings" - "go.uber.org/zap" -) - -func Collect(c *gin.Context) { - m := fillFrontMessage(c) - - if err := logic.DoCollect(m); err != nil { - zap.L().Error("error in logic.DoCollect()", zap.Error(err)) - ResponseError(c, CodeInvalidParam) - return - } - - ResponseSuccess(c, fmt.Sprintf("completed")) -} - -func fillFrontMessage(c *gin.Context) models.ConfigMessage { - var m models.ConfigMessage - if v, ok := c.GetPostForm("ext4_latency"); ok && v == "true" { - m.Ext4_latency = true - m.BpfFilePath = append(m.BpfFilePath, settings.Conf.PluginConfig.Path+"ext4_latency.py") - } else { - m.Ext4_latency = false - } - if v, ok := c.GetPostForm("swap_pagefault"); ok && v == "true" { - m.Swap_pagefault = true - m.BpfFilePath = append(m.BpfFilePath, settings.Conf.PluginConfig.Path+"swap_pagefault.py") - } else { - m.Swap_pagefault = false - } - if v, ok := c.GetPostForm("cpuutilize"); ok && v == "true" { - m.Cpuutilize = true - m.BpfFilePath = append(m.BpfFilePath, settings.Conf.PluginConfig.Path+"cpuutilize.py") - } else { - m.Cpuutilize = false - } - if v, ok := c.GetPostForm("irq"); ok && v == "true" { - m.Irq = true - m.BpfFilePath = append(m.BpfFilePath, settings.Conf.PluginConfig.Path+"irq.py") - } else { - m.Irq = false - } - if v, ok := c.GetPostForm("memusage"); ok && v == "true" { - m.Memusage = true - m.BpfFilePath = append(m.BpfFilePath, settings.Conf.PluginConfig.Path+"memusage.py") - } else { - m.Memusage = false - } - if v, ok := c.GetPostForm("picknexttask"); ok && v == "true" { - m.Picknexttask = true - m.BpfFilePath = append(m.BpfFilePath, 
settings.Conf.PluginConfig.Path+"picknext.py") - } else { - m.Picknexttask = false - } - if v, ok := c.GetPostForm("runqlen"); ok && v == "true" { - m.Runqlen = true - m.BpfFilePath = append(m.BpfFilePath, settings.Conf.PluginConfig.Path+"waitingqueuelength.py") - } else { - m.Runqlen = false - } - if v, ok := c.GetPostForm("vfsstat"); ok && v == "true" { - m.Vfsstat = true - m.BpfFilePath = append(m.BpfFilePath, settings.Conf.PluginConfig.Path+"vfsstat.py") - } else { - m.Vfsstat = false - } - if v, ok := c.GetPostForm("dcache"); ok && v == "true" { - m.Dcache = true - m.BpfFilePath = append(m.BpfFilePath, settings.Conf.PluginConfig.Path+"dcache.py") - } else { - m.Dcache = false - } - - if collectTime, ok := c.GetPostForm("collecttime"); ok { - tmpTime, _ := strconv.Atoi(collectTime) - m.CollectTime = tmpTime * 60 - } else { - m.CollectTime = settings.Conf.PluginConfig.CollectTime * 60 - } - - return m -} -- Gitee From b7fd44450fd9e9c6ccf852ec8799dc1fae0f8df1 Mon Sep 17 00:00:00 2001 From: Piano Date: Mon, 10 May 2021 14:00:36 +0800 Subject: [PATCH 4/9] Delete pluginparam.go --- models/pluginparam.go | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100644 models/pluginparam.go diff --git a/models/pluginparam.go b/models/pluginparam.go deleted file mode 100644 index 8471c09f..00000000 --- a/models/pluginparam.go +++ /dev/null @@ -1,18 +0,0 @@ -package models - -// ConfigMessage struct -type ConfigMessage struct { - Cpuutilize bool `json:"cpuutilize"` - Irq bool `json:"irq"` - Memusage bool `json:"memusage"` - Picknexttask bool `json:"picknexttask"` - Runqlen bool `json:"runqlen"` - Vfsstat bool `json:"vfsstat"` - Dcache bool `json:"dcache"` - Swap_pagefault bool `json:"swap_pagefault"` - Ext4_latency bool `json:"ext4_latency"` - // Store the config above to the 'BpfFilePath' - BpfFilePath []string `json:"bpfFilePath"` - // time - CollectTime int `json:"collecttime"` -} -- Gitee From 4c06737494cc1dd0067b1e36438c734708debce2 Mon Sep 17 00:00:00 
2001 From: Piano Date: Mon, 10 May 2021 14:00:55 +0800 Subject: [PATCH 5/9] Delete tj.go --- controllers/tj.go | 95 ----------------------------------------------- 1 file changed, 95 deletions(-) delete mode 100644 controllers/tj.go diff --git a/controllers/tj.go b/controllers/tj.go deleted file mode 100644 index 6c9823b6..00000000 --- a/controllers/tj.go +++ /dev/null @@ -1,95 +0,0 @@ -package controllers - -import ( - "github.com/gin-gonic/gin" - "github.com/linuxkerneltravel/lmp/logic" - "go.uber.org/zap" -) - -func QueryExt4_latency(c *gin.Context) { - res, err := logic.DoQueryExt4_latency() - if err != nil { - zap.L().Error("ERROR in QuerySwap_pagefault():", zap.Error(err)) - ResponseError(c, CodeServerBusy) - return - } - - ResponseSuccess(c, res) -} - -func QuerySwap_pagefault(c *gin.Context) { - res, err := logic.DoQuerySwap_pagefault() - if err != nil { - zap.L().Error("ERROR in QuerySwap_pagefault():", zap.Error(err)) - ResponseError(c, CodeServerBusy) - return - } - - ResponseSuccess(c, res) -} - -func QueryIRQ(c *gin.Context) { - res, err := logic.DoQueryIRQ() - if err != nil { - zap.L().Error("ERROR in QueryIRQ():", zap.Error(err)) - ResponseError(c, CodeServerBusy) - return - } - - ResponseSuccess(c, res) -} - -func QueryCpuUtilize(c *gin.Context) { - res, err := logic.DoQueryCpuUtilize() - if err != nil { - zap.L().Error("ERROR in QueryCpuUtilize():", zap.Error(err)) - ResponseError(c, CodeServerBusy) - return - } - - ResponseSuccess(c, res) -} - -func QueryPickNext(c *gin.Context) { - res, err := logic.DoQueryPickNext() - if err != nil { - zap.L().Error("ERROR in QueryPickNext():", zap.Error(err)) - ResponseError(c, CodeServerBusy) - return - } - - ResponseSuccess(c, res) -} - -func QueryTaskSwitch(c *gin.Context) { - res, err := logic.DoQueryTaskSwitch() - if err != nil { - zap.L().Error("ERROR in QueryTaskSwitch():", zap.Error(err)) - ResponseError(c, CodeServerBusy) - return - } - - ResponseSuccess(c, res) -} - -func QueryHardDiskReadWriteTime(c 
*gin.Context) { - res, err := logic.DoQueryHardDiskReadWriteTime() - if err != nil { - zap.L().Error("ERROR in QueryHardDiskReadWriteTime():", zap.Error(err)) - ResponseError(c, CodeServerBusy) - return - } - - ResponseSuccess(c, res) -} - -func QueryWaterMark(c *gin.Context) { - res, err := logic.DoQueryWaterMark() - if err != nil { - zap.L().Error("ERROR in QueryWaterMark():", zap.Error(err)) - ResponseError(c, CodeServerBusy) - return - } - - ResponseSuccess(c, res) -} -- Gitee From 253524bc2c44b066d9d027d65d2d50422cfc873f Mon Sep 17 00:00:00 2001 From: Piano Date: Mon, 10 May 2021 14:01:22 +0800 Subject: [PATCH 6/9] Delete tj.go --- logic/tj.go | 78 ----------------------------------------------------- 1 file changed, 78 deletions(-) delete mode 100644 logic/tj.go diff --git a/logic/tj.go b/logic/tj.go deleted file mode 100644 index f74ebf6a..00000000 --- a/logic/tj.go +++ /dev/null @@ -1,78 +0,0 @@ -package logic - -import ( - client "github.com/influxdata/influxdb1-client/v2" - "github.com/linuxkerneltravel/lmp/dao/influxdb" - "go.uber.org/zap" -) - -func DoQueryIRQ() (res []client.Result, err error) { - // 调用dao层influxdb API - res, err = influxdb.QueryDB(`select last("duration") from "irq"`) - if err != nil { - zap.L().Error("ERROR in DoQueryIRQ():", zap.Error(err)) - return nil, err - } - return -} -func DoQueryExt4_latency() (res []client.Result, err error) { - res, err = influxdb.QueryDB(`select last("latency") from "ext4LatencyTable"`) - if err != nil { - zap.L().Error("ERROR in DoQueryExt4_latency():", zap.Error(err)) - return nil, err - } - return -} -func DoQuerySwap_pagefault() (res []client.Result, err error) { - res, err = influxdb.QueryDB(`select last("duration") from "swap_pagefault"`) - if err != nil { - zap.L().Error("ERROR in Doswap_pagefault():", zap.Error(err)) - return nil, err - } - return -} - -func DoQueryCpuUtilize() (res []client.Result, err error) { - res, err = influxdb.QueryDB(`select last("perce") from "cpuutilize"`) - if err != 
nil { - zap.L().Error("ERROR in DoQueryIRQ():", zap.Error(err)) - return nil, err - } - return -} - -func DoQueryPickNext() (res []client.Result, err error) { - res, err = influxdb.QueryDB(`select last("duration") from "picknext"`) - if err != nil { - zap.L().Error("ERROR in DoQueryPickNext():", zap.Error(err)) - return nil, err - } - return -} - -func DoQueryTaskSwitch() (res []client.Result, err error) { - res, err = influxdb.QueryDB(`select last("duration") from "taskswitch"`) - if err != nil { - zap.L().Error("ERROR in DoQueryTaskSwitch():", zap.Error(err)) - return nil, err - } - return -} - -func DoQueryHardDiskReadWriteTime() (res []client.Result, err error) { - res, err = influxdb.QueryDB(`select last("lat") from "HardDiskReadWriteTime"`) - if err != nil { - zap.L().Error("ERROR in DoQueryHardDiskReadWriteTime():", zap.Error(err)) - return nil, err - } - return -} - -func DoQueryWaterMark() (res []client.Result, err error) { - res, err = influxdb.QueryDB(`select last("normal") from "memusage"`) - if err != nil { - zap.L().Error("ERROR in DoQueryWaterMark():", zap.Error(err)) - return nil, err - } - return -} -- Gitee From 83a8246e7cb509c4e6f8a88b24af27a993dffd8d Mon Sep 17 00:00:00 2001 From: Piano Date: Mon, 10 May 2021 14:03:00 +0800 Subject: [PATCH 7/9] Delete index.html --- static/index.html | 62 ----------------------------------------------- 1 file changed, 62 deletions(-) delete mode 100644 static/index.html diff --git a/static/index.html b/static/index.html deleted file mode 100644 index 70672101..00000000 --- a/static/index.html +++ /dev/null @@ -1,62 +0,0 @@ - - - - - Title - - - -
-

Linux Microscope

-
- -
-
-
-
- cpuutilize    - irq    - memusage    - picknexttask    -
- runqlen    - vfsstat    - dcache  - swap_pagefault  - ext4_latency  - - -
-
-
- -
-
-
- - - -- Gitee From 1fd05cb694af90b738f441cba591098db4cd0560 Mon Sep 17 00:00:00 2001 From: Your Name Date: Tue, 18 May 2021 15:06:31 +0800 Subject: [PATCH 8/9] "add bcc plugin biotop" --- plugins/biotop.py | 273 +++++++++++++++++++++++++++++++++++++ test/grafana-JSON/lmp.json | 237 ++++++++++++++++++++++++++++++++ 2 files changed, 510 insertions(+) create mode 100755 plugins/biotop.py diff --git a/plugins/biotop.py b/plugins/biotop.py new file mode 100755 index 00000000..1f081890 --- /dev/null +++ b/plugins/biotop.py @@ -0,0 +1,273 @@ +#!/usr/bin/python +# @lint-avoid-python-3-compatibility-imports +# +# biotop block device (disk) I/O by process. +# For Linux, uses BCC, eBPF. +# +# USAGE: biotop.py [-h] [-C] [-r MAXROWS] [interval] [count] +# +# This uses in-kernel eBPF maps to cache process details (PID and comm) by I/O +# request, as well as a starting timestamp for calculating I/O latency. +# +# Copyright 2016 Netflix, Inc. +# Licensed under the Apache License, Version 2.0 (the "License") +# +# 06-Feb-2016 Brendan Gregg Created this. 
+ +from __future__ import print_function +from bcc import BPF +from time import sleep, strftime +import argparse +import signal +from subprocess import call +# for influxdb +from const import DatabaseType +from influxdb import InfluxDBClient +import lmp_influxdb as db +from db_modules import write2db + +from datetime import datetime + +DBNAME = 'lmp' + +client = db.connect(DBNAME,user='root',passwd=123456) +# arguments +examples = """examples: + ./biotop # block device I/O top, 1 second refresh + ./biotop -C # don't clear the screen + ./biotop 5 # 5 second summaries + ./biotop 5 10 # 5 second summaries, 10 times only +""" +parser = argparse.ArgumentParser( + description="Block device (disk) I/O by process", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=examples) +parser.add_argument("-C", "--noclear", action="store_true", + help="don't clear the screen") +parser.add_argument("-r", "--maxrows", default=20, + help="maximum rows to print, default 20") +parser.add_argument("interval", nargs="?", default=1, + help="output interval, in seconds") +parser.add_argument("count", nargs="?", default=99999999, + help="number of outputs") +parser.add_argument("--ebpf", action="store_true", + help=argparse.SUPPRESS) +args = parser.parse_args() +interval = int(args.interval) +countdown = int(args.count) +maxrows = int(args.maxrows) +clear = not int(args.noclear) + +# linux stats. 
0.27 0.25 0.20 2/624 2955 +loadavg = "/proc/loadavg" + +# 反映了当前操作系统在内存中的运行情况,前三个数字是1、5、15分钟内的平均进程数,一个的分子是正在运行的进程数,分母是进程总数;另一个是最近运行的进程ID号。 + +diskstats = "/proc/diskstats" +# 内核通过diskstats文件,将通用块设备层的一些重要指标以文件的形式呈现给用户。 + +# signal handler +def signal_ignore(signal_value, frame): + print() + +# load BPF program +bpf_text = """ +#include +#include + +// for saving process info by request +struct who_t { + u32 pid; + char name[TASK_COMM_LEN]; +}; + +// the key for the output summary +struct info_t { + u32 pid; + int rwflag; + int major; + int minor; + char name[TASK_COMM_LEN]; +}; + +// the value of the output summary +struct val_t { + u64 bytes; + u64 us; + u32 io; +}; + +BPF_HASH(start, struct request *); +BPF_HASH(whobyreq, struct request *, struct who_t); +BPF_HASH(counts, struct info_t, struct val_t); + +// cache PID and comm by-req +int trace_pid_start(struct pt_regs *ctx, struct request *req) +{ + struct who_t who = {}; + + if (bpf_get_current_comm(&who.name, sizeof(who.name)) == 0) { + who.pid = bpf_get_current_pid_tgid() >> 32; + whobyreq.update(&req, &who); + } + + return 0; +} + +// time block I/O +int trace_req_start(struct pt_regs *ctx, struct request *req) +{ + u64 ts; + + ts = bpf_ktime_get_ns(); + start.update(&req, &ts); + + return 0; +} + +// output +int trace_req_completion(struct pt_regs *ctx, struct request *req) +{ + u64 *tsp; + + // fetch timestamp and calculate delta + tsp = start.lookup(&req); + if (tsp == 0) { + return 0; // missed tracing issue + } + + struct who_t *whop; + struct val_t *valp, zero = {}; + u64 delta_us = (bpf_ktime_get_ns() - *tsp) / 1000; + + // setup info_t key + struct info_t info = {}; + info.major = req->rq_disk->major; + info.minor = req->rq_disk->first_minor; +/* + * The following deals with a kernel version change (in mainline 4.7, although + * it may be backported to earlier kernels) with how block request write flags + * are tested. We handle both pre- and post-change versions here. 
class lmp_data(object):
    """One bioTopTable sample.

    The attribute names are the same as the 'tags' and 'fields' entries of
    data_struct, plus 'time'.
    NOTE(review): presumably write2db reads the values by these names --
    confirm against db_modules.
    """

    def __init__(self, glob, comm, pid, operate, maj, min, disk, io, kbytes, avgms, time):
        # Pair every attribute name with its constructor argument and assign
        # them in declaration order.
        columns = (
            ("glob", glob),
            ("comm", comm),
            ("pid", pid),
            ("operate", operate),
            ("maj", maj),
            ("min", min),
            ("disk", disk),
            ("io", io),
            ("kbytes", kbytes),
            ("avgms", avgms),
            ("time", time),
        )
        for attr, val in columns:
            setattr(self, attr, val)
Hit Ctrl-C to end' % interval) + +# cache disk major,minor -> diskname +disklookup = {} +with open(diskstats) as stats: + for line in stats: + a = line.split() + disklookup[a[0] + "," + a[1]] = a[2] + +# output +exiting = 0 +while 1: + try: + sleep(interval) + except KeyboardInterrupt: + exiting = 1 + + # header + if clear: + call("clear") + else: + print() + # with open(loadavg) as stats: + # print("%-8s loadavg: %s" % (strftime("%H:%M:%S"), stats.read())) + # print("%-6s %-16s %1s %-3s %-3s %-8s %5s %7s %6s" % ("PID", "COMM", + # "D", "MAJ", "MIN", "DISK", "I/O", "Kbytes", "AVGms")) + + # by-PID output + counts = b.get_table("counts") + line = 0 + for k, v in reversed(sorted(counts.items(), + key=lambda counts: counts[1].bytes)): + + # lookup disk + disk = str(k.major) + "," + str(k.minor) + if disk in disklookup: + diskname = disklookup[disk] + else: + diskname = "?" + + # print line + avg_ms = (float(v.us) / 1000) / v.io + # print("%-6d %-16s %1s %-3d %-3d %-8s %5s %7s %6.2f" % (k.pid, + # k.name.decode('utf-8', 'replace'), "W" if k.rwflag else "R", + # k.major, k.minor, diskname, v.io, v.bytes / 1024, avg_ms)) + test_data = lmp_data('glob',k.name.decode('utf-8', 'replace'),k.pid,"W" if k.rwflag else "R",k.major,k.minor,diskname, v.io, v.bytes / 1024, avg_ms,datetime.now().isoformat()) + write2db(data_struct, test_data, client,DatabaseType.INFLUXDB.value) + line += 1 + if line >= maxrows: + break + counts.clear() + + countdown -= 1 + if exiting or countdown == 0: + # print("Detaching...") + exit() diff --git a/test/grafana-JSON/lmp.json b/test/grafana-JSON/lmp.json index 88d16540..8b01bd23 100644 --- a/test/grafana-JSON/lmp.json +++ b/test/grafana-JSON/lmp.json @@ -1046,6 +1046,243 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": null, + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, 
+ "w": 12, + "x": 0, + "y": 0 + }, + "hiddenSeries": false, + "id": 18, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.3.4", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT \"avgms\" FROM \"bioTopTable\" WHERE operate = 'R'", + "rawQuery": true, + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT \"avgms\" FROM \"bioTopTable\" WHERE operate = 'W'", + "rawQuery": true, + "refId": "B", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + "type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT \"kbytes\" FROM \"bioTopTable\" WHERE operate = 'R'", + "rawQuery": true, + "refId": "C", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + }, + { + "groupBy": [ + { + "params": [ + "$__interval" + ], + 
"type": "time" + }, + { + "params": [ + "null" + ], + "type": "fill" + } + ], + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT \"kbytes\" FROM \"bioTopTable\" WHERE operate = 'W'", + "rawQuery": true, + "refId": "D", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "value" + ], + "type": "field" + }, + { + "params": [], + "type": "mean" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "BioTop", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "refresh": "5s", -- Gitee From 51d6361e5b8f09fe9429057250062233168aaac3 Mon Sep 17 00:00:00 2001 From: szp Date: Tue, 10 Aug 2021 02:06:16 +0000 Subject: [PATCH 9/9] Collect I/O performance indicators in different configurations. 
--- plugins/io_collector/config/sysfs/sysfs.go | 62 +++++++ plugins/io_collector/dao/args_dao.go | 77 ++++++++ plugins/io_collector/dao/dict_dao.go | 31 ++++ plugins/io_collector/db/db_conn.go | 37 ++++ plugins/io_collector/monitor/collect.go | 199 +++++++++++++++++++++ plugins/io_collector/utils/string_utils.go | 20 +++ 6 files changed, 426 insertions(+) create mode 100644 plugins/io_collector/config/sysfs/sysfs.go create mode 100644 plugins/io_collector/dao/args_dao.go create mode 100644 plugins/io_collector/dao/dict_dao.go create mode 100644 plugins/io_collector/db/db_conn.go create mode 100644 plugins/io_collector/monitor/collect.go create mode 100644 plugins/io_collector/utils/string_utils.go diff --git a/plugins/io_collector/config/sysfs/sysfs.go b/plugins/io_collector/config/sysfs/sysfs.go new file mode 100644 index 00000000..f19be3c9 --- /dev/null +++ b/plugins/io_collector/config/sysfs/sysfs.go @@ -0,0 +1,62 @@ +package config + +import ( + "collector/src/utils" + _ "collector/src/utils" + "fmt" + "io" + "io/ioutil" + "log" + "os" + "regexp" +) +type Sysfs struct { + + Option string +} + +func NewSysfs() *Sysfs { + return &Sysfs{ + Option: "/sys", + } +} + +func (this *Sysfs)Get(key string) string { + + data,err :=ioutil.ReadFile(utils.Format("%s/%s",this.Option,key)) + if err != nil { + log.Printf("Open file faild ! : %+v\n",err) + return "" + } + pattern := regexp.MustCompile(".*\\[(.*)\\].*") + searchObj := pattern.FindStringSubmatch(string(data)) + + if searchObj != nil { + return searchObj[1] + } + return string(data) +} +func (this *Sysfs)Set(key string,value interface{}) { + var format string + switch value.(type){ + case string: + format = "%s" + break + case int: + format = "%d" + break + default: + log.Fatalf("Pramater is vaild! 
: %+v", value) + return + } + fp,e := os.OpenFile(utils.Format("%s/%s",this.Option,key), os.O_RDWR|os.O_TRUNC, 0666) //打开文件 + if e != nil { + log.Fatalf("Open file error: %+v", e) + } + + _, err := io.WriteString(fp, fmt.Sprintf(format, value)) + if err != nil { + log.Fatalf("Set parm: %+v, error: %+v", value, err) + } +} + diff --git a/plugins/io_collector/dao/args_dao.go b/plugins/io_collector/dao/args_dao.go new file mode 100644 index 00000000..3064df74 --- /dev/null +++ b/plugins/io_collector/dao/args_dao.go @@ -0,0 +1,77 @@ +package dao + +import ( + "collector/src/db" + "log" + "strconv" +) + +type Arg struct { + Id int + Name string + Value string + ValType int + Path string + PreId int + PreArgVal string +} + +func (this * Arg)String() string{ + return "id:"+ strconv.Itoa(this.Id)+", name:"+this.Name+", value:"+this.Value+", valType:"+strconv.Itoa(this.ValType)+", path:"+this.Path+", preid:"+strconv.Itoa(this.PreId)+", preArgVal:"+this.PreArgVal +} + + +//查询操作 +func QueryArgList() []Arg{ + var args []Arg + var arg = new(Arg) + rows, e := db.DB.Query("select id,name,value,val_type,path,pre_id,pre_arg_val from args") + if e != nil { + log.Fatalf("query incur error: %+v", e) + return nil + } + for rows.Next() { + e := rows.Scan(&arg.Id,&arg.Name, &arg.Value,&arg.ValType, &arg.Path,&arg.PreId,&arg.PreArgVal ) + if e == nil { + args = append(args,*arg) + } + } + rows.Close() + return args + //db.DB.QueryRow("select * from arg where id=1").Scan(arg.age, arg.id, arg.name, arg.phone, arg.sex) + // + //stmt, e := db.DB.Prepare("select * from arg where id=?") + //query, e := stmt.QueryArgList(1) + //query.Scan() +} + + +func QueryArgByName(name string) *Arg{ + stmt, e := db.DB.Prepare("select id,name,value,val_type,path,pre_id,pre_arg_val from args where name=?") + if e!=nil { + log.Fatalf("query incur error: %+v", e) + } + query := stmt.QueryRow(name) + var arg = new(Arg) + query.Scan(&arg.Id,&arg.Name, &arg.Value,&arg.ValType, &arg.Path ,&arg.PreId,&arg.PreArgVal ) 
// Dict is one row of the dict table, holding the id, name, num and arg_id
// columns selected by QueryByNumAndArgID.
type Dict struct {
	Id    int64
	Name  string
	Num   int
	ArgId int
}

// String renders the row as "id:<Id>, name:<Name>, Num:<Num>, ArgId:<ArgId>".
//
// All numbers are printed in decimal. The ID used to be formatted in base 11,
// so for example ID 11 was printed as "10".
func (d *Dict) String() string {
	return "id:" + strconv.FormatInt(d.Id, 10) +
		", name:" + d.Name +
		", Num:" + strconv.Itoa(d.Num) +
		", ArgId:" + strconv.Itoa(d.ArgId)
}
and num=?") + if e!=nil { + log.Fatalf("query incur error: %+v", e) + } + defer stmt.Close() + query := stmt.QueryRow(argId,num) + err = query.Scan(&dict.Id,&dict.Name, &dict.Num,&dict.ArgId) + return +} diff --git a/plugins/io_collector/db/db_conn.go b/plugins/io_collector/db/db_conn.go new file mode 100644 index 00000000..7cfbf820 --- /dev/null +++ b/plugins/io_collector/db/db_conn.go @@ -0,0 +1,37 @@ +package db + +import ( + "database/sql" + _ "github.com/go-sql-driver/mysql" + "log" + "strings" +) + +//数据库配置 +const ( + userName = "root" + password = "root" + ip = "127.0.0.1" + port = "3306" + dbName = "tuner" +) +//Db数据库连接池 +var DB *sql.DB + +func InitDB() { + //构建连接:"用户名:密码@tcp(IP:端口)/数据库?charset=utf8" + path := strings.Join([]string{userName, ":", password, "@tcp(",ip, ":", port, ")/", dbName, "?charset=utf8"}, "") + + //打开数据库,前者是驱动名,所以要导入: _ "github.com/go-sql-driver/mysql" + DB, _ = sql.Open("mysql", path) + //设置数据库最大连接数 + DB.SetConnMaxLifetime(10) + //设置上数据库最大闲置连接数 + DB.SetMaxIdleConns(5) + //验证连接 + if err := DB.Ping(); err != nil{ + log.Fatalf("Connnect database fail! 
%+v",err) + return + } + log.Println("Connnect Mysql Success!") +} diff --git a/plugins/io_collector/monitor/collect.go b/plugins/io_collector/monitor/collect.go new file mode 100644 index 00000000..bd5a0e67 --- /dev/null +++ b/plugins/io_collector/monitor/collect.go @@ -0,0 +1,199 @@ +package monitor + +import ( + "bufio" + config "collector/src/config/sysfs" + "collector/src/dao" + "collector/src/utils" + "context" + "encoding/csv" + "io" + "log" + "os" + "os/exec" + "regexp" + "strconv" + "strings" + "sync" + "time" +) + +//待写到配置文件中 +var perfDataChan = make(chan []string) +var columns = []string{"ops", "opss", "throughput", "reads","writes","latency","latencys"} +var samplesPath = "./src/data/samples.csv" +var cmdString = "sudo filebench -f /home/szp/workloads/fileserver.f" +var dataPath = "./src/data/data.csv" + +func ReadCsv() { + sysfs := config.NewSysfs() + + fs, err := os.Open(samplesPath) + if err!=nil { + log.Fatal("cant not open the file , err is %+v",err) + } + defer fs.Close() + + r := csv.NewReader(fs) + + var ( + index = 0 + attrs []string + ) + for { + row, err := r.Read() + if err != nil && err != io.EOF { + log.Fatalf("can not read, err is %+v", err) + } + if index == 0 { + attrs =row + index++ + writeCsv(append(attrs,columns...),dataPath,os.O_CREATE|os.O_RDWR) + continue + } + if err == io.EOF { + break + } + var arg *dao.Arg + for j , _ := range row{ + + + arg = dao.QueryArgByName(attrs[j]) + + if arg == nil{ + log.Fatalf("can not query arg, err is %+v", err) + break; + } + log.Println("本次参数:",arg) + if arg.PreId != 0 {//判断前置是否设置 + preArg := dao.QueryArgById(arg.PreId) + preCurrentVal := sysfs.Get(preArg.Path+preArg.Name) + if(arg.PreArgVal != preCurrentVal){ + log.Printf("忽略本次参数设置!PreArgVal:%s,CurrentVal:%s\n", arg.PreArgVal, preCurrentVal) + + currentVal:=sysfs.Get(arg.Path+arg.Name) + + if("" ==currentVal ){ + row[j]=strconv.Itoa(0) + }else { + row[j]=currentVal + } + log.Println("忽略设置。row[j]:%v",row[j]) + continue + } + } + //离散变量 + if 
arg.ValType==0 { + _,float:=utils.IsFloat(row[j]) + dict, err :=dao.QueryByNumAndArgID(utils.Round(float),arg.Id) + if err == nil { + sysfs.Set(arg.Path+arg.Name, dict.Name) + + row[j]=dict.Name + + log.Printf("离散变量,字符串类型。setArg:%v;row[j]:%v",dict.Name,row[j]) + }else { + intArg := utils.Round(float) + sysfs.Set(arg.Path+arg.Name, intArg) + row[j]=strconv.Itoa(intArg) + + log.Printf("离散变量,数字类型。float:%f;intArg:%d;row[j]:%v",float,intArg,row[j]) + } + }else { + if bool,float:=utils.IsFloat(row[j]); bool==true{ + intArg := utils.Round(float) + sysfs.Set(arg.Path+arg.Name, intArg) + row[j]= strconv.Itoa(intArg) + log.Printf("连续变量,浮点类型。float:%f;intArg:%d;row[j]:%v",float,intArg,row[j]) + }else { + sysfs.Set(arg.Path+arg.Name, row[j]) + log.Printf("连续变量,非浮点类型。row[j]:%v",row[j]) + } + } + } + + ctx, cancel := context.WithCancel(context.Background()) + + go writeCsvData(row,dataPath) + Command(ctx, cmdString) + + log.Println("Wait 10s ...") + time.Sleep(10 * time.Second) + cancel() + log.Println("Mission Complete: "+ strconv.Itoa(index)) + index++ + } + +} + +func writeCsvData(attrs []string, path string) { + perfData := <-perfDataChan + attrs = append(attrs,perfData...) 
+ writeCsv(attrs,path,os.O_CREATE|os.O_RDWR|os.O_APPEND) +} +func writeCsv(data []string, path string,flag int) { + file, err := os.OpenFile(path,flag , 0644) + if err != nil { + log.Println("open file is failed, err: ", err) + } + defer file.Close() + w := csv.NewWriter(file) + w.Write(data) + w.Flush() +} + +func read(ctx context.Context, wg *sync.WaitGroup, std io.ReadCloser) { + reader := bufio.NewReader(std) + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + readString, err := reader.ReadString('\n') + if err != nil || err == io.EOF { + return + } + + if strings.Contains(readString,"IO Summary:") { + AnalysisData(readString) + } + + + } + } +} +func AnalysisData(line string) { + //解析正则表达式,如果成功返回解释器 + reg := regexp.MustCompile("([0-9]+[.]?[0-9]+)") + if reg == nil { + log.Println("regexp err") + return + } + //根据规则提取关键信息 + result := reg.FindAllString(line,-1) + perfDataChan <- result + log.Println("result1 = ", result) +} + +func Command(ctx context.Context, cmd string) error { + + c := exec.CommandContext(ctx, "bash", "-c", cmd) + stdout, err := c.StdoutPipe() + if err != nil{ + return err + } + stderr, err := c.StderrPipe() + if err != nil{ + return err + } + var wg sync.WaitGroup + + wg.Add(2) + go read(ctx, &wg, stderr) + go read(ctx, &wg, stdout) + + err = c.Start() + wg.Wait() + return err +} diff --git a/plugins/io_collector/utils/string_utils.go b/plugins/io_collector/utils/string_utils.go new file mode 100644 index 00000000..8b9ae687 --- /dev/null +++ b/plugins/io_collector/utils/string_utils.go @@ -0,0 +1,20 @@ +package utils + +import ( + "fmt" + "math" + "strconv" +) + +func Format(format string, a ...interface{}) string{ + s := fmt.Sprintf(format, a...) + return s +} + +func Round(x float64) int { + return int(math.Ceil(x-0.5)) +} +func IsFloat(s string) (bool,float64) { + float, err := strconv.ParseFloat(s, 64) + return err == nil,float +} \ No newline at end of file -- Gitee