proc InitPlugins( config ): foreach inputin config.Inputs: call input.Init() if has Init() exit on error foreach processor in config.Processors: call processor.Init() if has Init() exit on error foreach aggregator in config.Aggregators: call aggregator.Init() if has Init() exit on error foreach output in config.Outputs: call output.Init() if has Init() exit on error
ConnectOutputs Output 插件链接初始化
1 2 3 4 5 6
proc ConnectOutputs( config ): foreach output in config.Outputs: call output.Connect() sleep 15s on error retry call output.Connect() exit on error
proc go StartAggregators( config, MQProcessor, MQAggregator ): # 启动 goroutine 给每个聚合插件喂指标数据 go run AggregatorAddMetric( config, MQProcessor ) # 临时消息队列, 汇聚各个聚合插件的输出 MessageQueue MQTmp # 聚合插件指标周期性的将指标写入消息队列 go run AggregatorRunPeriodPush( config, MQTmp ) # 从 MQTmp 取数据再次使用 Processor 处理后转发到 MQAggregator loop read metric from MQTmp: foreach processor in config.Processors: metric = processor.Apply( metric ) write metric to MQAggregator unitl MQTmp closed
# 阻塞, 一直到上述所有 goroutine 退出 wait until AggregatorAddMetric and AggregatorPushMetric exit
# 将从 Processor 读来的指标发给每个聚合插件 # 某个指标只要任何一个聚合插件返回需要丢弃, 则丢弃该指标, 否则原样塞到消息队列继续处理 proc go AggregatorAddMetric( config, MQProcessor ): loop read metrics from MQProcessor: needDropMeric = false foreach aggregator in config.Aggregators: # Add 接口是每个聚合插件需要实现的接口功能, 完成指标计算 needDrop = aggregator.Add( metric ) if needDrop is true: needDropMetric = true if needDropMetric: drop( metric ) until MQProcessor closed
# 针对每个聚合插件, 启动一个周期性的定时器 proc go AggregatorRunPeriodPush( config, MQTmp ): foreach aggregator in config.Aggregators: go runPeriodPush( aggregator, MQTmp ) wait until all runPeriodPush exit # 等生产者 PeriodPush 都推出后, 关闭 MQTmp close MQTmp
# 针对每次加进来的瞬间指标进行聚合计算 # 比如计算最小值/最大值/平均值/... proc Aggregator.Add( metric ): calculate min/max/average/... save new value to aggregator.data return true if drop the original metric elsereturn false
# 周期性的将指标推送到临时聚合消息队列 MQTmp proc Aggregator.Push( aggregator, MQTmp ): loop every aggregator.Period: write aggregator.data to MQTmp reset aggregator.data until main process exit