0%

Telegraf 源码阅读笔记(1)

背景

telegraf 是 influxdata 公司开源的一款插件化的,用来做指标收集、变换、聚合、输出的时序数据收集器。
influxdata 公司本身是做物联网数据存储、分析的公司,核心资产是 influxdb 时序数据库,其核心解决方案是 TICK 套装(Telegraf + Influxdb + Chronograf + Kapacitor)。

约束

写本文时, telegraf 最新版本 = v1.14

主流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
proc main():
# 加载配置
config = LoadConfig()

# 初始化插件
InitPlugins( config )

# Output 插件链接初始化, 类似 influxDB/MySQL 是需要链接到对端服务器的
# 这里也只在全局做了一次初始化,实际写入数据时链接出错, 还是要自己根据实际情况处理
ConnectOutputs( config.Outputs )

# 定义消息流, 每个 Queue 都有生产者和消费者
# 在 Go 里用 channel 实现,我们还是用 MessageQueue 表含义, 按生产者命名
# 数据流 input --> MQInput --> MQProcessor --> MQAggregator --> output
MessageQueue MQInput
MessageQueue MQProcessor
MessageQueue MQAggregator

# ServiceInput 插件:
# 需要启动一个后台并行服务的 Input 插件, 如 statsd, 会启动一个 statsd server
# Input 插件的进一步延伸, 需要实现除 Init 外所有接口 + Start() + Stop()
StartServiceInputs( config.Inputs, MQInput )

# Input 插件: 数据来源, 主要从数据源中获取需要的数据, 并转化成 telegraf 要求的标准协议样式
go StartInputs( config, MQInput )

# Processor 插件: 主要做数据预处理/变换等
go StartProcessors( config, MQInput, MQProcessor)

# Aggregator 插件: 数据聚合计算,如求最大/最小/出现次数/平均值等
go StartAggregators( config, MQProcessor, MQAggregator )

# Output 插件: 将上述处理过的数据,写到目标地址中,可以是文件/数据库/屏幕等
go StartOutputs( config, MQAggregator )

# 阻塞到上述 goroutines 都退了
block until all coroutine exited

# 与 ConnectOutputs 对应,关闭 Output 插件的链接
CloseOutputs( config )

子流程

InitPlugins 插件初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
proc InitPlugins( config ):
foreach input in config.Inputs:
call input.Init() if has Init()
exit on error

foreach processor in config.Processors:
call processor.Init() if has Init()
exit on error

foreach aggregator in config.Aggregators:
call aggregator.Init() if has Init()
exit on error

foreach output in config.Outputs:
call output.Init() if has Init()
exit on error

ConnectOutputs Output 插件链接初始化

1
2
3
4
5
6
proc ConnectOutputs( config ):
foreach output in config.Outputs:
call output.Connect()
sleep 15s on error
retry call output.Connect()
exit on error

StartServiceInput service 插件启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
proc StartServiecInputs( config, MQInput ):
foreach input in config.Inputs:
if input is instance of ServiceInput:
# Start 接口是 ServiceInput 插件必须实现的接口
call input.Start( MQInput )
else:
continue

# ServiceInput 插件的初始化函数, 不一定非要处理 MQInput, 有需要时保存
proc ServiceInput.Start( MQInput ):
bind MQInput if needed

# 如果有需要, 可以直接在后台服务中往消息队列写入指标
# 因为 ServiceInput 也是一种特殊的 Input 插件, 所以也可以不立即写, 先本地缓存/聚合,
# 等 telegraf 调用插件 Gather 时再写入。
proc ServiceInput.BackgroundEventProcess():
write telegraf.Metric to MQInput

go StartInputs() 启动 Input 插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
proc go StartInputs( config, MQInput ):
foreach input in config.Inputs:
# 每个 Input 插件起单独的 goroutine
go RunInput( input )
block until all RunInput exit
StopServiceInput()
close MQInput

proc go RunInput( input, MQInput ):
# 定时器, 每 Interval 秒执行一次
loop every input.Interval:
# Gather 是 Input 插件必须实现的接口, 上传数据到 telegraf.Accumulator
input.Gather( MQInput )
until exit signal recvd
exit

# 数据收集
proc Input.Gather( MQInput ):
foreach metric in cache:
covert metric to telegraf.Metric
write telegraf.Metric to MQInput
reset cache
exit

go StartProcessors() 启动数据预处理插件

1
2
3
4
5
6
7
8
9
10
11
proc go StartProcessors( config, MQInput, MQProcessor ):
# 对 MQInput 内的每个消息, 都用 Apply 接口处理, 处理完写入 MQProcessor 队列供后续处理
# 如果 MQInput 里没有消息, 则读取操作会阻塞到 MQInput 有新消息到达或者 MQInput 被关闭
loop read metrics from MQInput:
foreach processor in config.Processors:
# Apply 是每个 processor 插件必须实现的接口, 实现对指标的预处理(如改名等)
metrics = processor.Apply( metrics )
for metric in metrics:
write metric to MQProcessor
until MQInput closed
close MQProcessor

go StartAggregators() 启动数据聚合插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
proc go StartAggregators( config, MQProcessor, MQAggregator ):
# 启动 goroutine 给每个聚合插件喂指标数据
go run AggregatorAddMetric( config, MQProcessor )
# 临时消息队列, 汇聚各个聚合插件的输出
MessageQueue MQTmp
# 聚合插件指标周期性的将指标写入消息队列
go run AggregatorRunPeriodPush( config, MQTmp )
# 从 MQTmp 取数据再次使用 Processor 处理后转发到 MQAggregator
loop read metric from MQTmp:
foreach processor in config.Processors:
metric = processor.Apply( metric )
write metric to MQAggregator
unitl MQTmp closed

# 阻塞, 一直到上述所有 goroutine 退出
wait until AggregatorAddMetric and AggregatorPushMetric exit

# 将从 Processor 读来的指标发给每个聚合插件
# 某个指标只要任何一个聚合插件返回需要丢弃, 则丢弃该指标, 否则原样塞到消息队列继续处理
proc go AggregatorAddMetric( config, MQProcessor ):
loop read metrics from MQProcessor:
needDropMeric = false
foreach aggregator in config.Aggregators:
# Add 接口是每个聚合插件需要实现的接口功能, 完成指标计算
needDrop = aggregator.Add( metric )
if needDrop is true:
needDropMetric = true
if needDropMetric:
drop( metric )
until MQProcessor closed

# 针对每个聚合插件, 启动一个周期性的定时器
proc go AggregatorRunPeriodPush( config, MQTmp ):
foreach aggregator in config.Aggregators:
go runPeriodPush( aggregator, MQTmp )
wait until all runPeriodPush exit
# 等生产者 PeriodPush 都推出后, 关闭 MQTmp
close MQTmp

# 针对每次加进来的瞬间指标进行聚合计算
# 比如计算最小值/最大值/平均值/...
proc Aggregator.Add( metric ):
calculate min/max/average/...
save new value to aggregator.data
return true if drop the original metric else return false

# 周期性的将指标推送到临时聚合消息队列 MQTmp
proc Aggregator.Push( aggregator, MQTmp ):
loop every aggregator.Period:
write aggregator.data to MQTmp
reset aggregator.data
until main process exit

go StartOutputs() 启动数据输出插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
proc go StartOuputs( config, MQAggregator ):
# 每个 Input 插件起单独的定时器 goroutine
foreach output in config.Outputs:
go RunOutput( output )
# 读消息并写到每个 output 的 buffer 里
loop read metrics from MQAggregator:
foreach output in config.Outputs:
output.WriteBuffer( metrics )
until MQAggregator closed

block until all RunOutput exit

proc go RunOutput( output ):
# 定时器, 每 Interval 秒执行一次
# 将 output buffer 内的数据批量写入对端
loop every output.Interval:
output.Write() output buffer to remote
reset output.Buffer
until exit signal recvd
exit

CloseOutputs() 进行清理

1
2
3
4
proc go CloseOutputs( config ):
foreach output in config.Outputs:
# Close 接口是每个输出插件必须实现的接口, 需要清理资源, 或者优雅结束
output.Close()

主要数据结构/接口

数据流