1. 패키지 선택

 - 방법: poll하는 패키지의 기반 언어 차이를 통한 성능 향상

 - 상황: broker의 메시지를 감당하지 못해, kafka-python에서 confluent-kafka로 전환

 - 변경점: kafka-python은 python 기반 패키지라 poll한 객체에 대해 property로 바로 접근. confluent-kafka는 librdkafka라는 C기반 라이브러리 기반이기 때문에 C의 객체 반환 방향성에 맞춰 method로 접근. (record.timestamp -> record.timestamp())

 

2. 구조 측면

이외 parallel consumer라는 패키지도 있으나(역시나 confluent에서 만든 패키지) 기본적으로는 자바 기반이다. (python으로 쓰려면 별도 그냥 쓰레드 구성해야 함)

+ parallel consumer가 만능 키같은 게 아니다. 결국 partition을 늘리기 힘든 브로커 고부하 상황에서, partition 내부까지 병렬 처리함으로써 성능 향상을 꾀하는 건데 안정성, 관리 용이성에 어느 정도 trade-off가 있어 보인다. (https://p-bear.tistory.com/85)

개인적으로 아직까지 보기엔 python 환경에서 구현한다면, docker를 partition 개수만큼 띄우고 병렬 처리하는 게 가장 부작용이 적어 보인다. 이렇게 하면 로컬 테스트도 쉽고 변경이 필요할 때도 구조가 간단한데 병렬 처리는 자연스럽게 가능하다.

 

그래도 2가지의 불편함&찝찝함이 존재하는데,

1. docker의 리소스 반환이 즉각적이지 않다. 동시에 restart해서 rebalancing하지 않으면, static하게 특정 docker에 과하게 리소스가 부여되는 현상이 관찰되었다. 일반적으로는 먼저 start된 docker에 그런 현상이 나타났다. 물론 이게 docker 형태일 때 피할 수 없는 불편함인가는 아직 잘 모르겠다. (환경은 HyperViser 내의 VM)

2. 지금 다루는 데이터의 크기가 적당히 커서(하루 parquet 기준 500~1000GB 수준) 이 구조가 가장 편한 거 아닐까? 하는 의문은 있다. 이보다 훨씬 커진다면 소스 내에서의 병렬성까지 확보해야 하지 않을까 싶다. 소스내 병렬성까지 생각해야 하는 수준까지 된다면 데이터 순서는 데이터 내부에서 구별해야 하므로, 항상 정확히 정의된 timestamp가 produce부터 고려될 필요가 있다.

 

이를 바탕으로 내가 겪어본 조합을 떠올려 보면 다음과 같다.

1. raw 데이터가 druid 수준에서 저장된다 => docker 병렬성 가능(druid에서 버티면 웬만큼 가능)

2. raw 데이터가 athena, s3 수준에서 저장된다 + python 환경이다 => docker 병렬성 / 소스내 병렬성 과도기 고민(어쩔 수 없을 때 소스내 병렬성 선택)

3.raw 데이터가 athena, s3 수준에서 저장된다 + python 환경일 필요가 없다 => broker 성능에 따라 미리 parallel consumer를 세팅하는 게 나쁘지 않아 보임

4. raw 데이터가 elastic search, open search, athena, s3, big query 수준에서 저장된다 + python 환경일 필요가 없다. => 거의 parallel consumer는 필수고, partition수를 계획적으로 조정, 테스트할 필요가 있음

 

 

개별 단위 최적화는 습관적으로 할 수 있다면 좋다.

몇몇 최적화들은 파이프라인 내 worker들의 부하를 획기적으로 줄여주기도 한다.

다만 어떤 단위 최적화는 요구사항에 대한 확장성을 방해하기도 한다.

문제는 데이터 엔지니어가 도메인, 비지니스에 대한 경험이 꽤 많다고 하더라도,

생각보다 데이터 크기 변화가 예측이 안 되는 케이스가 많다는 점이다.

 

예시를 들자면 이런 케이스가 있다.

원본 데이터는 매우 무거운데 그 중에 특정 조건을 필터링해서 로드하는 프로세스가 있다.

당연히 output data는 작은데 원본이 무겁기 때문에,

편집에서는 메모리를 많이 쓰고 이후 로드에서는 거의 메모리나 네트워크 리소스를 많이 쓰지 않을 것으로 예상된다.

그래서 데이터를 로그할 때 저장소와 메모리를 아끼기 위해 버퍼에 올리지 않고 스트림을 통해 로드하는 방식을 택했다.

 

어느 날 이 프로세스에서 요구사항이 변경되어 조건이 변경되거나 혹은 같은 조건 하에서 데이터의 변화가 있어 output이 엄청나게 커졌다. (꽤 흔히 일어나는 케이스다)

버퍼에 올리지 않았기 때문에 업로드가 굉장히 느려진다. 대신 메모리는 아끼는 구조로 짰기 때문에 많이 아끼게 될 거다.

이때 메모리는 오토스케일링으로 충분히 커버되지만 업로드가 느려져 원래의 비지니스 요구사항을 못 맞추는 리스크가 생기는 환경이라면? 기존의 결정은 최적화가 맞았을까?

또는 이와 비슷하게 어떤 최적화는 요구사항 변경을 반영하기 위해 코드의 생산성과 가독성을 많이 포기해야 하는 경우도 생긴다.

 

어떤 분들은 기계적으로 단위 최적화를 하고 내가 더 잘짰어! 라는 주장을 많이들 하신다. 

단위내 최적화를 신경 쓰다가 놓친 비지니스를 위한 커뮤니케이션이나 손해본 생산성, 겪지 않아도 됐을 이슈 등도 기회비용에 포함해야 한다.

비용이 크지 않다면 데이터가 커질 때 이슈를 적게 겪는 방향, 가독성이 좋은 방향으로 가는 여유가 필요하다.

 

 

  1. s3fs 방식
    1. 장점: 메모리를 효율적으로 씀. 파일을 로컬에 쓰지 않은 상태에서 스트림으로 올라감
    2. 단점: 대용량 파일에서 업로드가 상대적으로 더 느림
    3. 예시
      1. df.to_parquet(s3_path, engine="pyarrow")
         to_parquet 안에서 s3fs를 사용하고 있어서 s3 경로를 입력하면 알아서 s3에 업로드
      2. 단, IAM이든 credentials든 인증 확인 필요
  2. buffer + byteIO 방식
    1. 장점: 메모리에 한번에 올려서 상대적으로 부하가 큼
    2. 단점: 업로드가 상대적으로 더 빠름. (대용량 파일 업로드에 적합)
    3. 예시
      1. 바이트 배열을 만들어서 버퍼에 담고 그걸 S3로 올리는 형태 
      2. parquet_bytes = df.to_parquet(compression='gzip', engine='pyarrow', index=False) buffer = BytesIO(parquet_bytes) boto.upload_fileobj_s3(buffer, bucket_name, parquet_path + parquet_name)

 

 

Requirements

 - 6억건 이상의 데이터를 매시간 insert하고 select 퍼포먼스도 유지해야 함(insert 시간은 < 5분)

 - No SQL은 지양하고 RDB에서 구현되어야 함(데이터 정합성이 매우 중요한 pricing 데이터)

 - dimension 확장으로 인한 record counts를 효과적으로 압축해야 함

 

클러스터링

 - 특정 dimension들의 value가 같은 경우 grouping하고 grouping key를 부여

 - metric을 적용하기 위해 dimension condition을 조회하는 경우 매핑 테이블과 엮여서 조회

 - 어떤 dimension들의 조합이 압축은 잘 되면서 확장성을 가지는지 테스트

 

클러스터링 방식의 특징

 - dimension들의 조합이 과해지면 debug가 어려워지고, 클러스터링이 된 테이블 자체의 크기가 커질 수 있음(결국 Load 부하를 분산하는 형태이기 때문)

 - dimension들이 모두 significant하게 작용해 dimension별 metric차이가 심할 경우, 효과가 떨어짐(궁극적으로는 특정 metric을 targeting해서 grouping 하기 때문. 정보량에서 손해를 본다는 cost가 전제되나 이것이 무시할 수 있을 정도임이 합의 되어야 함.)

환경: Load balancer가 있는 airflow 2.5로 배치. worker는 2~3대

AS-IS: worker에서 output하는 경로를 지정(/app/data/) -> output하는 경로는 worker들간 file이 동기화가 될 수 있게 처리

이슈: dag별로 다른 worker에서 처리가 되는 상황에서(로드밸런싱) druid input하는 dag와 output file을 정리하는 dag가 각각 다른 worker에서 실행. 동기화가 되기 이전에 init dag가 실행되면 에러. 또는 동기화 과정에 stuck이 생겨 제대로 진행되지 않으면 각 dag가 같은 곳에서 실행되지 않는 한 에러남.

 

해결책

결국 중앙처리를 해야 하는데, 생각나는 해결법은 2가지다.

1. kafka로 produce, consume

2. s3 같은 cloud나 NAS 같은 공동 space에 작업

1은 기존 소스 기준 변경점이 크고 scale도 크지 않아서 2로 작업 계획

1을 잘 모듈화 시켜놓으면 kafka가 있을만한 규모의 아키텍쳐에선 괜찮을 거 같다.

이슈 : sql을 dataframe으로 변형하는 과정에서 groupby를 할 때 groupby key값에 NaN이 존재하는 경우 해당 row들이 삭제되어 정합성이 맞지 않음

원인 : groupby시 기본 dropna=True로 설정돼 있음

조치 :

1. groupby 옵션에서 dropna=False로 변경 

2. insert시 NaN이 있으면 문제가 있기 때문에

df.replace({np.nan: None}, inplace=True)

로 NaN을 None으로 replace해줌

들여야 할 습관 : groupby해서 NaN인 경우를 살려야 되는지 꼭 체크하고 dropna를 바꿔주자

이슈 : 쿼리로 한번에 가져온 결과와 쿼리를 각각 가져와 가공해 만든 groupby 결과가 매우 다름

원인 : member number 같은 series가 object로 잡혀있는데 concat 하는 과정에서 각각 다른 type으로 인식됨. None이나 'nomember' 같은 값이 있느냐에 따라 type이 무엇으로 인식될지가 결정.

조치 : 해당 series의 type을 str로 명시함.

 

다음부터는 ddl로 type도 맞추고 들어가는 습관을 들이면 좋겠다

1. Insert 하는 방식

Druid의 Insert는 spec과 raw data에 관한 JSON을 서버와 통신하며 이뤄진다.

JSON에는 데이터 JSON이 담긴 위치와 granularity, segment, dimension spec 등이 저장된다.

https://druid.apache.org/docs//0.15.1-incubating/ingestion/ingestion-spec.html

 

Druid | Ingestion Spec

Table of Contents <!-- ~ Licensed to the Apache Software Foundation (ASF) under one ~ or more contributor license agreements. See the NOTICE file ~ distributed with this work for additional information ~ regarding copyright ownership. The ASF licenses this

druid.apache.org

raw data는 new line으로 구분된 json이다. [다른 형태로도 가능하게 해놨는지는 잘 모르겠다]

spec에 관한 json과 raw data json이 준비됐다면,

curl -X 'POST' -H 'Content-Type:application/json' -d @{spec에관한JSON경로} http://{druid주소 및 포트}/druid/indexer/v1/task

형태로 insert 할 수 있다.

 

 

2. Delete 하는 방식

TSDB류의 특성상 DELETE는 까다로운 편이다

일단 __time의 Interval을 지정해 row를 unused로 mark해 주어야 한다.

curl -X 'POST' -H 'Content-Type:application/json' -d '{ "interval" : "1990-01-03T00:00:00.000Z/2222-02-09T00:00:00.000Z" }' http://{druid주소 및 포트}/druid/coordinator/v1/datasources/{datasource 이름}/markUnused

이렇게 하면 interval에 속하는 row들이 unused로 마크되고,

 

Datasource - Actions에서 Delete 가능하게 바뀐다.

+ Recent posts