[InfluxDB] Flux (사용법/문법/InfluxQL/timezone 설정)
이전 회사에서 자주 썼던 influxDB
다시 보니 또 재밌어서 잊지 않도록 정리 겸 블로깅을 해보려한다.
2022.04.11 - [Database] - [InfluxDB] InfluxDB 입문 (설치 / 사용법 /Tick Stack / 시계열 데이터베이스 / Flux)
이번에 쓸 내용은 바로 Flux
위에 포스팅에도 언급했듯이 원래 influxDB에는 기존 SQL문과 비슷한 형태의 InfluxQL이라는 언어가 있었고 여기에 join 등 함수 사용에 제한이 많아 InfluxDB 2.0 버전부터는 Flux라는 언어가 새로 등장했다.
여전히 InfluxQL을 사용할 순 있지만 강력하게 Flux가 권장된다고 한다.
문법설명은 공식 문서가 정말 잘 되어있지만 헷갈릴만한 간단한 팁들을 정리해보겠다.
Flux 기본 문법
InfluxDB는 시계열 데이터다보니 Flux 에는 data / time range / filter 를 기본적으로 사용하게 된다.
문서에서 소개하는 Flux 쿼리 과정은 이렇다
데이터를 가져오고 필터링하고 가공 처리해서 결과를 낸다.
이 과정을 하수 처리 방법에 비유한다.
data (source)
from(bucket: "example-bucket")
Filter on time range
// start 주어진 경우. Stop 은 now가 기본
from(bucket:"example-bucket")
|> range(start: -1h)
// start - stop 존재하는 경우
from(bucket:"example-bucket")
|> range(start: -1h, stop: -10m)
from(bucket:"example-bucket")
|> range(start: 2021-01-01T00:00:00Z, stop: 2021-01-01T12:00:00Z)
- 일분 전 : -1m
- 한시간 전 : -1h
- 하루 전 : -1d
- 일주일 전 : -1w
- 한달 전 : -1mo
- 일년 전 : -1y
range는 시간의 범위를 말한다.
influxDB는 시계열 DB이니 필수조건이다.
Filter on column values
from(bucket: "example-bucket")
|> range(start: -15m)
|> filter(fn: (r) => r[_measurement] == "cpu" and r[_field] == "usage_system" and r[cpu] == "cpu-total")
// dot ( . ) 표기로도 가능
|> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage_system" and r.cpu == "cpu-total"
sql을 써본 사람들은 그래도 대강 가져올 컬럼 정의라는 걸 알 듯하다.
filter은 fn 을 사용해야하고 두가지방법으로 표기할 수 있다.
shape
우리가 기본적으로 값을 구성하는 부분이다.
예시만 봐도 어떻게 사용하면 될지 감이 올 것이다.
- group( ) : group by 함수
from(bucket:"example-bucket")
|> range(start: -12h)
|> filter(fn: (r) => r._measurement == "system" and r._field == "uptime" )
|> group(columns:["host", "_value"])
- join( ) : join 함수
dataStream1 = from(bucket: "example-bucket1")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "network" and r._field == "bytes-transferred")
dataStream2 = from(bucket: "example-bucket2")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "httpd" and r._field == "requests-per-sec")
join(tables: {d1: dataStream1, d2: dataStream2}, on: ["_time", "host"])
- pivot( ) : pivot 함수
from(bucket: "example-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu" and r.cpu == "cpu-total")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
업무에서 정말~ 많이 사용했던게 pivot 함수이다.
엑셀을 했던 사람들은 다 알텐데 이 함수는 열을 행으로 만들어준다.
예시를 해석해보자면
["_time"]을 기준으로 행을, ["_field"]를 열로, ["_value"]를 값으로 사용하여 데이터를 구성해 가져오라는 말이다.
이 외에도 엄청 많은 shape들이 있다.
- window( ) : time 데이터 단위로 나눠 보여줌
- keep( ) : 특정 컬럼 고정
- drop( ) : 특정 컬럼 삭제
process
값을 가공처리하는 부분이다.
이름만 봐도 무슨 역할인지 알 수 있고 문서도 잘 되어 있으니 자세한 예시는 생략하겠다.
- Aggregate Data
- sum( ) : 합계 구하는 함수
- count( ) : 갯수 구하는 함수
- mean( ) : 평균을 구하는 함수
- spread( ) : max값 - min값
- Select specific data points
- mode( ) : 가장 많이 존재하는 값
- distinct( ) : 중복 제거
- first( ) : 첫번째 값
- last( ) : 마지막 값
- top( ) : sort한 결과 가장 위에 있는 컬럼 값
- bottom( ) : sort한 결과 가장 밑에 있는 컬럼 값
- limit( ) : 제한된 결과 값만 가져오기
- Rewrite rows
- map( ) : 값을 특정 조건에 따라 매핑해 출력
- Send notifications
- http.endPoint( ) : url 지정해 http post 요청으로 alert 줌
- slack.endPoint( ) : slack으로 alert 줌
기타 함수
- yield(name : ‘결과값’) : 쿼리 결과를 이름지어줌 (as 함수)
- aggregateWindow( ) : window를 custom function처럼 사용 가능
- sort( ) : 정렬 함수
from(bucket: "example-bucket")
|> range(start: -12h)
|> filter(fn: (r) => r._measurement == "system" and r._field == "uptime")
|> sort(columns: ["region", "host", "_value"])
InfluxQL vs Flux
InfluxQL | Flux |
SELECT | filter() |
WHERE | filter(), range() |
GROUP BY | group() |
INTO | to() |
ORDER BY | sort() |
LIMIT | limit() |
SHOW DATABASES | bucket() |
SHOW MEASUREMENTS | schema.measurements |
SHOW FIELD KEYS | keys() |
InfluxQL | Flux | |
Time Range | SELECT … FROM "netdatatsdb"."autogen".… WHERE (time > now() -1d) |
from(bucket: "netdatatsdb/autogen") |> range(start: -1d) |
Filter |
SELECT … FROM "netdatatsdb"."autogen"."vpsmetrics" WHERE (time > now() -1d) |
from(bucket: "netdatatsdb/autogen") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "vpsmetrics") |
Flux자체도 어렵진 않지만 InfluxQL에 비해 조금 헷갈릴 수 있다.
그래도 자주 사용하다보면 패턴이 있으니 금방 익힐 수 있다.
자주 사용하는 Flux Query
- 해당 시간 범위내 가장 마지막으로 들어간 데이터 가져오기
from(bucket: "example-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "postgresql" and r["_field"] == "st_v")
// last( ) 사용
|> last( )
// top( ) 사용
|> top(n:1, columns: ["_time"])
// limit( ) 사용
|> sort(columns: ["_time"], desc: true)
|> limit(n: 1)
- 두개의 컬럼 값을 더해 새로운 컬럼으로 데이터 가져오기
from(bucket: "example-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "postgresql"
and r["_field"] == "dc_p" or r["_field"] == "dc_v")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> map(fn: (r) => ({r with _value: r.dc_p + r.dc_v}))
⭐ Timezone 설정
aggregateWindow 사용시 UTC 기준의 windowPeriod 이 들어가기 때문에 따로 설정 필요하다.
이게 무슨 말이냐면 aggregateWindow 함수는 시간을 기준으로 데이터를 잘라내 집계하는데 사용하는 함수인데
이때 들어가는 시간이 UTC가 되어버려 값이 예상한 값과 달라지게 된다.
import "timezone"
option location = timezone.fixed(offset: 9h)
from(bucket: "example-bucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "daily_history")
|> filter(fn: (r) => r["g_id"] == "${gId}")
|> filter(fn: (r) => r["_field"] == "op_power")
|> group(columns: ["host", "_measurement"], mode:"by")
|> aggregateWindow(every: 1d, fn: sum, timeSrc: "_start")
|> fill(value: 0.0)
이럴경우 timezone을 설정해야한다.
timezone.location(name: "America/Los_Angeles") 이런식으로 설정도 가능하지만
아쉬운건 Aisa/Seoul 은 안된다.
오랜만에 복습한 InfluxDB 언제 또 쓸 일이 있었으면 좋겠다.