0%

如何为 telegraf 写插件

Abstract

telegraf 的插件分为四类,数据源(Input)、指标处理(Processor)、指标聚合(Aggregator)、数据输出(Output)。
telegraf 提供插件运行的框架和标准,将所有 Input 收集到的时序数据通过 Processor 处理,之后再给到 Aggregator 进行聚合计算,最后输出到 Output。只要符合 telegraf 的插件接口和数据标准,即可接入 telegraf。

约束

本文写成时, telegraf 最新版本 1.13.4, Go 语言最新版本: 1.14.

Input 输入插件

Input 插件用来收集或者自己产生指标, 比如使用操作系统接口获取CPU/MEM/DISK 使用情况, 或者接受其他系统传过来的指标数据等

接口标准

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 普通 Input 插件需要实现的接口
type Input interface {
// SampleConfig: 需要返回当前插件的默认配置
SampleConfig() string

// Description: 一句话说明这个插件是干啥的
Description() string

// Gather: 把收集到的指标上报到 Accumulator
// 这个函数是周期性被 telegraf 调用的, 默认配置在 telegraf.conf > agent.interval 指定
// 也可以自定义 interval, 需要在配置项中声明 Interval 变量, 返回整数时间, 单位是秒
Gather(Accumulator) error
}

// 特殊类型的 Input 插件, ServiceInput
// 可以用来启动一个并行的服务, 并自行决定什么时机上报数据
type ServiceInput interface {
// 需要实现普通 Input 的接口
Input

// ServiceInput 插件会被显式的调用 Start 接口来启动服务
// Accumulator 是数据上报的接口, ServiceInput 插件可以将这个保存下来.
// 通过 Accumlator, 插件可以自己决定什么时候上报什么数据, 也可以通过 Gather 接口被动上报
Start(Accumulator) error

// Stop: 停止 ServiceInput 插件
// 主要是清理不再需要的资源, 关闭链接, 关闭 channel
Stop()
}

Output 输出插件

Output 插件是将时序数据写到某个地方,比如数据库、其他网络服务、消息队列或者文件等。

接口标准

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 普通 Output 输出插件
type Output interface {
// Connect: 初始化到目的端的链接, 如链接存储服务器等
Connect() error

// Close: 关闭到目的端的链接
// 会在 telegraf 退出时执行
Close() error

// Description: 一句话描述这个插件是干啥的
Description() string

// SampleConfig: 返回文本形式的默认配置, toml 规则
SampleConfig() string

// Write: 接收时序数据, 输出到目的端
// 这个是 telegraf 周期调用的方法, telegraf 会自己缓存 Interval 时长的数据, 批量写入
// Interval 默认配置在 telegraf.conf > agent > interval
// 也可以自己在插件中新增这个配置项, 插件 struct 中该配置必须名为 Interval, 才可被 telegraf 探测到自定义配置
// 整数, 单位 秒
// 注意: 这里的执行时间需要控制, 过长(超过一个interval)会出错.
Write(metrics []Metric) error
}

Processor 数据变换处理插件

Processor 插件一般是对数据格式、列名、数据单位等进行变换, 比如对指标重命名, 将华氏度的上报改成摄氏度, 将经纬度变换为地址,将UNIX的秒时间变换成 human-readable 时间等.
在 Input 插件之后, Aggregator 插件之前 和 之后运行.

接口标准

1
2
3
4
5
6
7
8
9
10
11
12
type Processor interface {
// SampleConfig: 返回文本格式的默认配置项, toml 格式
SampleConfig() string

// Description: 一句话描述这个插件是干啥的
Description() string

// Apply: 对给定的指标进行变换处理.
// 这个函数必须是幂等的, 因为每个指标都会过一遍, 需要尽量高效
// 如果有网络 IO 等高耗时的操作, 会导致整个 telegraf 卡死
Apply(in ...Metric) []Metric
}

Aggregator 数据聚合插件

Aggregator 插件会将一个时间周期内的指标进行聚合/计算等, 然后产生新的指标输出, 时间周期是可以配置的. 一般做的操作有 计算最大值/最小值/均值/标准差等等. 插件可以指定对应的原始指标项是否抛弃, 通过 Add 方法的返回值实现。
这个插件的接口比较多, 在实际运行时, telegraf 会保证 Add/Push/Reset 不会并发执行, 所以开发时可不考虑并发问题.
运行在 Processors 插件之后, 在聚合完成后, telegraf 会让新产生的指标重新过一遍 Processor 插件.

接口标准

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Aggregator interface {
// SampleConfig: 返回文本格式的默认配置项, toml 格式
SampleConfig() string

// Description: 一句话描述这个插件是干啥的
Description() string

// Add: 由 telegraf 调用, 不断的给插件喂指标, 喂完的计算结果自己存储好即可
Add(in Metric)

// Push: 周期性执行的, 要求将汇总计算出来的结果上报给 Accumulator
// 主要步骤是将存储的计算结果写成 telegraf.Metric 格式, 使用 acc.AddFiled/... 接口上报即可
Push(acc Accumulator)

// Reset: 一个周期执行完, telegraf 会请求重置插件, 这里将上一周期的计算结果清空即可
Reset()
}

插件开发示例

这里实现一个简单的 Output 插件, 插件名字叫 fat_rabbit, 插件功能是将指标变换成自定义的样式输出到屏幕上。

代码

在 telegraf源码/plugins/outputs 新建一个目录 fat_rabbit, 并新建源码文件 fat_rabbit.go, 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package fat_rabbit

import (
"fmt"
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
)

type FatRabbit struct {
StarCount int `toml:"star_count"` // 在打印指标名时加多少个小星星 *
Log telegraf.Logger `toml:"-"`
}

func (s *FatRabbit) Description() string {
return "output plugin output stars"
}

func (s *FatRabbit) SampleConfig() string {
return `
star_count = 3
`
}

// 这个方法 telegraf 框架内部在 InitPlugin 的时候主动调用, 但是并非必须, 可以不写
func (s *FatRabbit) Init() error {
return nil
}

func (s *FatRabbit) Connect() error {
return nil
}

func (s *FatRabbit) Close() error {
return nil
}

func (s *FatRabbit) Write(metrics []telegraf.Metric) error {
for _, metric := range metrics {
name := strings.Repeat("*", s.StarCount) + metric.Name() + strings.Repeat("*", s.StarCount)
s.Log.Errorf("%s:%d", name, metric.HashID() )
}
return nil
}

// 在包的默认初始化函数中, 告知 telegraf 该插件
func init() {
outputs.Add("fat_rabbit", func() telegraf.Output { return &FatRabbit{} })
}

注册

在 telegraf源码/plugins/outputs/all/all.go 文件中, 新增 fat_rabbit 包的引用:

1
import _ "github.com/influxdata/telegraf/plugins/outputs/fat_rabbit"

配置文件

在 telegraf源码 根目录执行 make, 编译, 生成的可执行文件就在当前目录。

1
2
3
4
5
6
$ make
go mod download
go build -ldflags " -X main.commit=e9e4f2c3 -X main.branch=master" ./cmd/telegraf

### 生成默认配置文件, SampleConfig 内容由telegraf 自身配置和各插件的 SampleConfig 输出组成
$ ./telegraf config > telegraf.conf

打开 telegraf.conf 搜索 fat_rabbit 就可以找到我们 fat_rabbit 插件的配置,内容如下:

1
2
3
# # output plugin output stars
# [[outputs.fat_rabbit]]
# star_count = 3

去掉注释, 让组件生效, 如:
1
2
3
# output plugin output stars
[[outputs.fat_rabbit]]
star_count = 3

telegraf 默认放开了 CPU/MEM/DISKIO/… 的指标采集, 在 inputs 插件非注释的列表里可以看到, 为了方便, 我们只放开 inputs.cpu.totalcpu

运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
./telegraf --config telegraf.conf
2020-02-29T10:27:02Z I! Starting Telegraf
2020-02-29T10:27:02Z I! Loaded inputs: cpu
2020-02-29T10:27:02Z I! Loaded aggregators:
2020-02-29T10:27:02Z I! Loaded processors:
2020-02-29T10:27:02Z I! Loaded outputs: fat_rabbit
2020-02-29T10:27:02Z I! Tags enabled:
2020-02-29T10:27:02Z I! [agent] Config: Interval:10s, Quiet:false, Hostname:"", Flush Interval:10s
2020-02-29T10:27:02Z D! [agent] Initializing plugins
2020-02-29T10:27:02Z D! [agent] Connecting outputs
2020-02-29T10:27:02Z D! [agent] Attempting connection to [outputs.fat_rabbit]
2020-02-29T10:27:02Z D! [agent] Successfully connected to outputs.fat_rabbit
2020-02-29T10:27:02Z D! [agent] Starting service inputs
2020-02-29T10:27:20Z D! [outputs.fat_rabbit] Buffer fullness: 0 / 10000 metrics
### 这里就是我们插件的输出, 指标名 cpu 两边各有三个 *
### 之所以有多个指标且名字一样, 是因为内部的 tag 不同, 可以自己打出来看看
2020-02-29T10:27:30Z E! [outputs.fat_rabbit] ***cpu***:8038969306203302764
2020-02-29T10:27:30Z E! [outputs.fat_rabbit] ***cpu***:659057056757823603
2020-02-29T10:27:30Z E! [outputs.fat_rabbit] ***cpu***:658104879687982102
2020-02-29T10:27:30Z E! [outputs.fat_rabbit] ***cpu***:11999125212488521182
2020-02-29T10:27:30Z E! [outputs.fat_rabbit] ***cpu***:12000077389558362683
2020-02-29T10:27:30Z E! [outputs.fat_rabbit] ***cpu***:12012495273884917792
2020-02-29T10:27:30Z E! [outputs.fat_rabbit] ***cpu***:12013456247047784981
2020-02-29T10:27:30Z E! [outputs.fat_rabbit] ***cpu***:12010604113884773322
2020-02-29T10:27:30Z E! [outputs.fat_rabbit] ***cpu***:12011556290954614823
2020-02-29T10:27:30Z E! [outputs.fat_rabbit] ***cpu***:12008668973419500412
2020-02-29T10:27:30Z E! [outputs.fat_rabbit] ***cpu***:12009647538768418977
2020-02-29T10:27:30Z E! [outputs.fat_rabbit] ***cpu***:12006760221233304566
2020-02-29T10:27:30Z E! [outputs.fat_rabbit] ***cpu***:12007712398303146067
2020-02-29T10:27:30Z D! [outputs.fat_rabbit] Wrote batch of 13 metrics in 279.149µs

总结

其他的插件也是一样的方式集成到 telegraf.
下载 telegraf 源码后, 可以看到各个插件的源码, 可以针对性的学习下。