私たちは時々、典型的な営業日などの営業期間や、細分化されたシフト期間などに基づいて InfluxDB にクエリを行う、というコミュニティからの要求を目にしてきました。次の要件を考えてみましょう:8月全体のデータを、月曜日から金曜日の午前8時から午後5時までの営業時間だけサマリーするにはどうすればよいですか?InfluxDB は現在、時間に基づいてフィルタリングする機能を持っていません。私たちに出来るのはこのようにすることです:
SELECT * FROM “mymeasurement” WHERE time >= ‘2017-08-01 08:00:00.000000’ and time <= ‘2017-08-31 17:00:00.000000’;
では、どうすればこれを達成できるでしょうか。提供されている Telegraf のプラグインは、通常、ただタイムスタンプの値を送信し、設定されたプラグインに関連したメトリクスやタグに加えて、いくつかの静的なタグを送ることができます。解決法は、Kapacitor を「プリプロセッサ」として用い、クエリをかけたい期間を表現する値を算出してデータを「装飾」つまり使いやすくすることです。
この記事では、Telegraf、InfluxDB および Kapacitor を 'localhost' で動かしていますが、本格的な環境ではこれらは異なるホスト上で動作します。
最初のステップは Telegrafを、直接 InfluxDBにではなく、Kapacitor にデータを書き込むように設定することです。telegraf.conf
ファイルの [[outputs.influxdb]]
セクションに、考慮すべき3つの重要な設定があります。
[[outputs.influxdb]]
urls = [http://localhost:9092]
database = “kap_telegraf”
retention_policy = “autogen”
urls パラメータは、InfluxDBのポート 8086 ではなく、ポート 9092 (Kapacitor のデフォルトのリスニングポート)を指している必要があります。database パラメータは、存在しないデータベースを指しているべきです(データベースが見つからないというTelegraf の警告は無視できます)。retention_policy
パラメータは、"autogen" もしくはインスタンス内で以前に作成した特定の リテンションポリシー(retention policy) に設定するのが良いでしょう。
注意: retention_policy
を ""(デフォルト)に設定したままにすることは、InfluxDBの初期化時にデフォルトのリテンションポリシーとして指定された "autogen" にすることと同じではありません。
telegraf.conf
の、その他の全ての設定は、インスタンスに対して通常通り設定できます。
次のステップは、Telegraf から来るデータを処理する TICKscript を作成することです。この例で私たちが関心があるのは、データポイントが上記で説明した営業時間内にあるかどうかに応じて、真または偽の値を含むタグを作成することです。
stream
|from()
.database('kap_telegraf')
|eval(lambda: if(((weekday("time") >= 1 AND weekday("time") <= 5) AND (hour("time") >= 8 AND (hour("time")*100+minute("time")) <= 1700)), 'true', 'false'))
.as('business_hours')
.tags('business_hours')
.keep()
|delete()
.field('business_hours')
|influxDBOut()
.database('telegraf')
.retentionPolicy('autogen')
.tag('kapacitor_augmented','true')
このTICKscript では、上記の telegraf.conf
ファイルで設定した、存在しないデータベース "kap_telegraf" からストリーミングしています。stream()
ノードの .from()
メソッドは、マッチするデータベース名のみを必要とします。次に制御が eval()
ノードに渡り、データポイントが指定したウインドウ内に到着したかどうかが評価されます。この例では "time" の値を評価するために http://docs.influxdata.com/kapacitor/v1.3/tick/expr/#time-functions に記載されている weekday()
、hour()
および minute()
関数を使っています。条件の最初のパートは曜日が月曜(1)から金曜(5)の間にあるかどうかを評価します。条件の2番目のパートは時間の値を評価しますが、私たちは時間が17(午後5時)の "00" 分で停止したいということを注意深く考えてみてください。そうするには、時間を100倍し、minute()
関数の結果を加えたものを、終了時刻の1700と比較します。time
の値がこの範囲にあれば、eval()
ノードは business_hours
というフィールドとして真の値を返します。しかし、私たちはこの値に基づいてクエリを行いたいので、これをタグとしておく必要があります。そのため .tags()
メソッドをつなげて、値を business_hours
というタグに変更します。
重要: eval()
ノードは、ストリームから他の全てのフィールドとタグを削除します。そのため、これらの値をストリームに保持するため、.keep()
メソッドを指定します。
この時点で、 eval()
ノードの出力を含む business_hours
というフィールドとタグの両方があります。delete()ノードを呼び出し、.fields()
メソッドを使用して削除するフィールドを指定し、これをストリームから取り除く必要があります。
最後に、ストリームの制御を influxDBOut()
ノードに渡し、書き込みを行う宛て先のデータベースとリテンションポリシーを指定します。この例では、kapacitor-augmented
という追加の静的タグを追加しました。 メジャメント(measurement)名のような他のすべてのデータは持ち越され、必要なのは新しい情報を提供することだけです。 この場合、 telegraf
データベースの autogen
リテンションポリシーに書き込みます。
TICKscript (以下 businesshours.tick
) を作成したら、私たちは Kapacitor にこれを実行したいことを伝えなければなりません。 この例では、Kapacitor CLIを使用してタスクを設定します。
$ kapacitor define business_hours -type stream -dbrp kap_telegraf.autogen -tick business_hours.tick
このコマンドは、上記 Telegraf で設定したデータベース名とリテンションポリシー名である kap_telegraf.autogen
への書き込みを受け取る、business_hours
というストリームタイプのタスクを定義します。タスクが正常に作成されたら、Kapacitorによって処理されるように、有効にする必要があります。
$ kapacitor enable business_hours
ステータスを確認するには、Kapacitor に既存のタスクを一覧表示させることができます:
$ kapacitor list tasks
ID Type Status Executing Databases and Retention Policies
business_hours stream enabled true ["kap_telegraf"."autogen"]
データが Kapacitor を通って InfluxDB へ流れ始めると、指定した最初のクエリに条件 AND business_hours='true'
を追加できます。
SELECT * FROM “mymeasurement” WHERE time >= ‘2017-08-01 08:00:00.000000’ and time <= ‘2017-08-31 17:00:00.000000’ AND business_hours=’true’;
要約すると、Kapacitor を使用して Telegraf から InfluxDB へのデータを使いやすくする簡単な例を示しました。 この方法を使用して他の種類のデータを追加することができます。これにより、ビジネスニーズに合わせてカスタムTelegrafプラグインを作成する必要を無くせるかもしれません。