환경: Load balancer가 있는 airflow 2.5로 배치. worker는 2~3대

AS-IS: worker에서 output하는 경로를 지정(/app/data/) -> output하는 경로는 worker들간 file이 동기화가 될 수 있게 처리

이슈: dag별로 다른 worker에서 처리가 되는 상황에서(로드밸런싱) druid input하는 dag와 output file을 정리하는 dag가 각각 다른 worker에서 실행. 동기화가 되기 이전에 init dag가 실행되면 에러. 또는 동기화 과정에 stuck이 생겨 제대로 진행되지 않으면 각 dag가 같은 곳에서 실행되지 않는 한 에러남.

 

해결책

결국 중앙처리를 해야 하는데, 생각나는 해결법은 2가지다.

1. kafka로 produce, consume

2. s3 같은 cloud나 NAS 같은 공동 space에 작업

1은 기존 소스 기준 변경점이 크고 scale도 크지 않아서 2로 작업 계획

1을 잘 모듈화 시켜놓으면 kafka가 있을만한 규모의 아키텍쳐에선 괜찮을 거 같다.

이슈 : sql을 dataframe으로 변형하는 과정에서 groupby를 할 때 groupby key값에 NaN이 존재하는 경우 해당 row들이 삭제되어 정합성이 맞지 않음

원인 : groupby시 기본 dropna=True로 설정돼 있음

조치 :

1. groupby 옵션에서 dropna=False로 변경 

2. insert시 NaN이 있으면 문제가 있기 때문에

df.replace({np.nan: None}, inplace=True)

로 NaN을 None으로 replace해줌

들여야 할 습관 : groupby해서 NaN인 경우를 살려야 되는지 꼭 체크하고 dropna를 바꿔주자

이슈 : 쿼리로 한번에 가져온 결과와 쿼리를 각각 가져와 가공해 만든 groupby 결과가 매우 다름

원인 : member number 같은 series가 object로 잡혀있는데 concat 하는 과정에서 각각 다른 type으로 인식됨. None이나 'nomember' 같은 값이 있느냐에 따라 type이 무엇으로 인식될지가 결정.

조치 : 해당 series의 type을 str로 명시함.

 

다음부터는 ddl로 type도 맞추고 들어가는 습관을 들이면 좋겠다

1. Insert 하는 방식

Druid의 Insert는 spec과 raw data에 관한 JSON을 서버와 통신하며 이뤄진다.

JSON에는 데이터 JSON이 담긴 위치와 granularity, segment, dimension spec 등이 저장된다.

https://druid.apache.org/docs//0.15.1-incubating/ingestion/ingestion-spec.html

 

Druid | Ingestion Spec

Table of Contents <!-- ~ Licensed to the Apache Software Foundation (ASF) under one ~ or more contributor license agreements. See the NOTICE file ~ distributed with this work for additional information ~ regarding copyright ownership. The ASF licenses this

druid.apache.org

raw data는 new line으로 구분된 json이다. [다른 형태로도 가능하게 해놨는지는 잘 모르겠다]

spec에 관한 json과 raw data json이 준비됐다면,

curl -X 'POST' -H 'Content-Type:application/json' -d @{spec에관한JSON경로} http://{druid주소 및 포트}/druid/indexer/v1/task

형태로 insert 할 수 있다.

 

 

2. Delete 하는 방식

TSDB류의 특성상 DELETE는 까다로운 편이다

일단 __time의 Interval을 지정해 row를 unused로 mark해 주어야 한다.

curl -X 'POST' -H 'Content-Type:application/json' -d '{ "interval" : "1990-01-03T00:00:00.000Z/2222-02-09T00:00:00.000Z" }' http://{druid주소 및 포트}/druid/coordinator/v1/datasources/{datasource 이름}/markUnused

이렇게 하면 interval에 속하는 row들이 unused로 마크되고,

 

Datasource - Actions에서 Delete 가능하게 바뀐다.

1. Druid에 Load

Data를 가져와서(Extract) 가공(Transform)한 후 Druid에 올린다(Load). [전형적인 ETL]

드루이드에 올릴 때는 json으로 input source 및 dataSchema에 대한 정보를 적어주고 api로 전송한다.

curl -X 'POST' -H 'Content-Type:application/json' -d @{spec에관한JSON경로[Datasource에 관한 내용도 포함되어 있음]} http://{druid주소}/druid/indexer/v1/task

 

2. Airflow 자동화

Airflow는 Metadata와 설정된 DAG들을 참고하여 Scheduler가 Worker에 일을 시키는 구조를 지닌 워크플로우 관리 플랫폼이다.

Airflow Architecture

일반적으로 파이썬을 통해 ETL을 자동화 구성한다면,

DAG를 설정하는 소스를 Scheduler에 등록하고 Worker에 소스를 Deploy해서 쓴다. [각주:1]

3. 전체적인 자동화 ETL 구조

Superset까지 포함한 대략적인 구조도

1) scheduler가 cron으로 정해진 시간대에 worker한테 정해진 DAG를 실행시킨다.

2) worker는 Datasource에 가서 쿼리를 날리거나 수집하는 형태로 정해진 데이터를 받는다

3) worker에서 가공하여 Druid로 보낸다.

4) Druid는 Superset에 연계되어 데이터 시각화 및 통계를 제공한다.

  1. Worker와 Scheduler 및 Metadata DB는 각자 분리해 구성하는 것이 합리적으로 보인다. Worker들을 여러 대 관리해야 Airflow의 효율이 더 좋기 때문이다. 또한 한 Device로 scheduler에 Worker까지 구성하면 시스템이 확장될수록 scheduler의 기능적 제약이 있을 수도 있다. [본문으로]

History

초기에는 Click Stream을 기록하기 위한 수단으로서 활용되었음

기존 관계형DB가 아닌 TSDB와 거의 비슷한 개념임

가능한 Group by 조합에 모두 Index를 부과하여 퍼포먼스를 제고하는 방식

다만 TSDB와는 달리 통계량을 뽑아내는 것에 대해 퍼포먼스를 높이도록 조직됨

기존에 이런 방식으로 작동하는 OLAB이 있었으나 솔루션 가격이 매우 비쌌음

그러던 중 Imply Data라는 곳에서 자신들의 기술력을 알리기 위해 OLAB 기반으로 개발하여

아파치 재단을 통해 오픈소스로 출범시켜 버림[각주:1]

이후 에어비엔비에서 드루이드를 기반한 솔루션을 개발한 것이 Superset임

초기 Superset은 연결할 수 있는 Database로 드루이드만 지원됐으나,

지금은 Postgre, Mysql 등 다양한 DB와 Connection 할 수 있게 발전함

특징

Druid와 비슷한 TSDB로 Click House, SnowFlake, BigQuery(구글), RedShift(아마존)가 있음

다만 TSDB와는 달리 Join하기가 어려움

따라서 Join하지 않아도 되는 큰 테이블을 만들어 처리하는 것이 편리

오픈소스이기 때문에 시스템이 안정적이진 않음[각주:2]

Superset은 문자형 컬럼, 숫자형 컬럼으로 나뉨

숫자형 컬럼을 Dimension으로 선택하면 새로운 통계량을 그래프로 쉽게 나타낼 수 있음

데이터를 요청자 스스로 조정하여 Group-by graph를 얻는 데 용이함

 

개발

ETL을 통해 Database에 올림

Ingestion은 Job History가 나오고 Query탭을 통해 업로드한 Data를 확인할 수 있음

보통 Dictionary로 데이터를 저장하고 JSON 형태로 Transform함

TS 기반이면서 실시간성을 띌 수 있게 조직되었기 때문에 데이터 분할 term에 대한 segment가 존재함[각주:3]

가령 Hour segment라면 1시 1분에 올리고 1시 59분에 올리면 덮어쓰기가 됨

Data Update가 어려운 TSDB 특성상 Segment 단위를 통째로 다시 올려야 제대로 업데이트가 됨

기준 데이터에서 컬럼을 삭제하면 바로 적용되어 제대로 출력되지 않음

컬럼을 추가하는 경우 'Sync Columns From Source'를 눌러 새로고침함

  1. 이로인해 OLAB의 가격이 급락하며 어떤 분야든 데이터 분석 시대가 도래했다는 후문 [본문으로]
  2. 불안정한데도 Druid의 유지비용이 다른 솔루션들에 비해 저렴한 편은 아니라고 함 [본문으로]
  3. 이를 Granularity라고 한다. 작은 양을 짧게 자주한다면 fine-grained 반대로 큰 양을 가끔한다면 coarse-grained라고 한다. Granularity는 이 두 개념의 합집합 정도로 생각할 수 있다. [본문으로]

+ Recent posts