1. baseDir 및 하위 filter는 files로 대체하자

 - baseDir은 웬만하면 쓰지 말자. Directory로 지정해서 쓰면 디버깅할 때, 어떤 작업에서 정확히 어떤 parquet가 쓰였는가를 찾기가 거의 불가능하다.

 - 이것때문에 아예 baseDir에 있는 파일을 미리 삭제하거나 이동시키고 작업하는 케이스도 있는데 그럼 이제 아예 작업했던 데이터를 까볼 방법이 없어진다.

 - 특히 local에 떨군 parquet를 사용해 load한다면, directory에 있는 모든 파일을 업로드해 버린다. files를 같이 써도 둘 다 적용돼 버린다. ingestion 성능을 위해서라도 필요하다.

 - files로 설정하고 spec 로깅을 남기면 spec 로깅만 보고 바로 어떤 파일이 문제였나 알 수 있다.

 - 공식 문서 기준으로 그냥 local이 아니라 s3 같은 곳에서 가져오는 경우 files가 우선이라고 하던데, 그렇다고 한들 굳이 baseDir을 쓰는 장점이 없다. spec 작성을 손으로 할 거면 모를까.

2. transformSpec을 사용하기 보다는 전처리하고 그대로 올리자

 - data type을 위해서 dimensionsSpec을 쓰는 건 ingestion에 크게 지장이 없는데, transformSpec은 데이터가 크면 ingestion에 영향을 끼친다

 - 결과적으로 transformSpec을 쓰면 worker가 해야 할 일을 druid한테 시키는 행위가 된다. 개별 worker 단위에서 어차피 데이터를 extract해서 메모리에 load하고 있는 시점이 있을텐데 굳이 druid에 넘기는 건 합리적이지 않다. (심지어 둘 중에 일반적으로는 druid 리소스가 더 귀하지 않나 싶다.)

 - 문제는 이를 활용해 만들어진 dataset은 안 쓰고 싶다고 일부만 바꾸지는 못 하는 것처럼 보인다. 최소한 나는 그랬는데, Success로 뜨고 기존 __time에 해당하는 segment가 일부 지워지는 현상을 겪었다.

3. granularitySpec

 - segmentGranularity, queryGranularity 이렇게 2개가 핵심.

 - segmentGranularity는 데이터를 저장할 때 쪼개는 시간 단위인데 기본적으로는 data의 가장 작은 단위랑 맞춰주는 게 안전하다. 데이터 활용에 대한 요구사항을 어느 정도 숙지하고 있다면, 그 부분에 맞춰서 지정해주면 좋다. 일반적으로는 HOUR 미만으로 들어가려면 최신 데이터를 빠르게 따라가는 topic이라는 거라 파이프라인 전체 관리를 타이트하게 해야 한다.

 - Load하는 관점에서 보면, segmentGranularity가 MONTH라면 5월 레코드를 하나만 넣어도, 기존 5월 data는 모두 삭제되고 지금 넣은 레코드로 업데이트 된다. MONTH로 해놨는데 5월 30일 데이터가 이상하다? 그럼 5월 전체를 다시 넣어야 된다. 하지만 데이터 활용을 거의 주로 월단위로 한다면? 그땐 선택이다. 가용 리소스 대비 데이터가 너무 크다? 그럼 불편해도 MONTH 쓰자. 아니라면 웬만해선 DAY를 초과하지 않는 걸 추천한다.

 - queryGranularity는 rollup하는 시간 단위다. druid가 group by 성능이 좋은 가장 큰 이유는, 미리 agg를 다 해놓는다는 거다. 일종의 답안지를 미리 작성해 놓는 건데, 이건 웬만하면 raw data의 최소치에 맞추는 게 좋다. 원본이 HOUR면 HOUR로 DAY면 DAY로 하자. 집계를 해버리는 거라 데이터의 사용 입장에서 시간 해상도를 더 낮추기 어렵기 때문이다. 이건 너무 크게 하면 마치 라면을 짜게 끓인 거랑 비슷하다. 밍밍하면 소금 더 타면 되지만 짤 때 물을 더 타면 이미 그건 우리가 아는 라면이 아닐 가능성이 높기 때문.

4. "dropExisting": true

 - 이건 드루이드에 있는 약간의 변태성 때문에 효율이 생기는 옵션이다. 드루이드는 seg를 교체하면 그 전에 이 seg 뭐였어 라는 걸 다 남겨두기 때문에 이 옵션을 주지 않으면 druid 저장 공간이 빠르게 차는 경험을 하게 된다.

 - superset 같은 BI툴을 연결해 사용한다면, 더더욱 latest data만 활용하기 때문에 웬만하면 기본 옵션처럼 넣는 것을 추천한다.

 - 대부분 데이터 크기가 엄청 크지 않다면 드라마틱한 효과는 없겠으나, 드루이드 관리자가 슬퍼지지 않게 설정해주자

1. 패키지 선택

 - 방법: poll하는 패키지의 기반 언어 차이를 통한 성능 향상

 - 상황: broker의 메시지를 감당하지 못해, kafka-python에서 confluent-kafka로 전환

 - 변경점: kafka-python은 python 기반 패키지라 poll한 객체에 대해 property로 바로 접근. confluent-kafka는 librdkafka라는 C기반 라이브러리 기반이기 때문에 C의 객체 반환 방향성에 맞춰 method로 접근. (record.timestamp -> record.timestamp())

 

2. 구조 측면

이외 parallel consumer라는 패키지도 있으나(역시나 confluent에서 만든 패키지) 기본적으로는 자바 기반이다. (python으로 쓰려면 별도 그냥 쓰레드 구성해야 함)

+ parallel consumer가 만능 키같은 게 아니다. 결국 partition을 늘리기 힘든 브로커 고부하 상황에서, partition 내부까지 병렬 처리함으로써 성능 향상을 꾀하는 건데 안정성, 관리 용이성에 어느 정도 trade-off가 있어 보인다. (https://p-bear.tistory.com/85)

개인적으로 아직까지 보기엔 python 환경에서 구현한다면, docker를 partition 개수만큼 띄우고 병렬 처리하는 게 가장 부작용이 적어 보인다. 이렇게 하면 로컬 테스트도 쉽고 변경이 필요할 때도 구조가 간단한데 병렬 처리는 자연스럽게 가능하다.

 

그래도 2가지의 불편함&찝찝함이 존재하는데,

1. docker의 리소스 반환이 즉각적이지 않다. 동시에 restart해서 rebalancing하지 않으면, static하게 특정 docker에 과하게 리소스가 부여되는 현상이 관찰되었다. 일반적으로는 먼저 start된 docker에 그런 현상이 나타났다. 물론 이게 docker 형태일 때 피할 수 없는 불편함인가는 아직 잘 모르겠다. (환경은 HyperViser 내의 VM)

2. 지금 다루는 데이터의 크기가 적당히 커서(하루 parquet 기준 500~1000GB 수준) 이 구조가 가장 편한 거 아닐까? 하는 의문은 있다. 이보다 훨씬 커진다면 소스 내에서의 병렬성까지 확보해야 하지 않을까 싶다. 소스내 병렬성까지 생각해야 하는 수준까지 된다면 데이터 순서는 데이터 내부에서 구별해야 하므로, 항상 정확히 정의된 timestamp가 produce부터 고려될 필요가 있다.

 

이를 바탕으로 내가 겪어본 조합을 떠올려 보면 다음과 같다.

1. raw 데이터가 druid 수준에서 저장된다 => docker 병렬성 가능(druid에서 버티면 웬만큼 가능)

2. raw 데이터가 athena, s3 수준에서 저장된다 + python 환경이다 => docker 병렬성 / 소스내 병렬성 과도기 고민(어쩔 수 없을 때 소스내 병렬성 선택)

3.raw 데이터가 athena, s3 수준에서 저장된다 + python 환경일 필요가 없다 => broker 성능에 따라 미리 parallel consumer를 세팅하는 게 나쁘지 않아 보임

4. raw 데이터가 elastic search, open search, athena, s3, big query 수준에서 저장된다 + python 환경일 필요가 없다. => 거의 parallel consumer는 필수고, partition수를 계획적으로 조정, 테스트할 필요가 있음

 

 

개별 단위 최적화는 습관적으로 할 수 있다면 좋다.

몇몇 최적화들은 파이프라인 내 worker들의 부하를 획기적으로 줄여주기도 한다.

다만 어떤 단위 최적화는 요구사항에 대한 확장성을 방해하기도 한다.

문제는 데이터 엔지니어가 도메인, 비지니스에 대한 경험이 꽤 많다고 하더라도,

생각보다 데이터 크기 변화가 예측이 안 되는 케이스가 많다는 점이다.

 

예시를 들자면 이런 케이스가 있다.

원본 데이터는 매우 무거운데 그 중에 특정 조건을 필터링해서 로드하는 프로세스가 있다.

당연히 output data는 작은데 원본이 무겁기 때문에,

편집에서는 메모리를 많이 쓰고 이후 로드에서는 거의 메모리나 네트워크 리소스를 많이 쓰지 않을 것으로 예상된다.

그래서 데이터를 로그할 때 저장소와 메모리를 아끼기 위해 버퍼에 올리지 않고 스트림을 통해 로드하는 방식을 택했다.

 

어느 날 이 프로세스에서 요구사항이 변경되어 조건이 변경되거나 혹은 같은 조건 하에서 데이터의 변화가 있어 output이 엄청나게 커졌다. (꽤 흔히 일어나는 케이스다)

버퍼에 올리지 않았기 때문에 업로드가 굉장히 느려진다. 대신 메모리는 아끼는 구조로 짰기 때문에 많이 아끼게 될 거다.

이때 메모리는 오토스케일링으로 충분히 커버되지만 업로드가 느려져 원래의 비지니스 요구사항을 못 맞추는 리스크가 생기는 환경이라면? 기존의 결정은 최적화가 맞았을까?

또는 이와 비슷하게 어떤 최적화는 요구사항 변경을 반영하기 위해 코드의 생산성과 가독성을 많이 포기해야 하는 경우도 생긴다.

 

어떤 분들은 기계적으로 단위 최적화를 하고 내가 더 잘짰어! 라는 주장을 많이들 하신다. 

단위내 최적화를 신경 쓰다가 놓친 비지니스를 위한 커뮤니케이션이나 손해본 생산성, 겪지 않아도 됐을 이슈 등도 기회비용에 포함해야 한다.

비용이 크지 않다면 데이터가 커질 때 이슈를 적게 겪는 방향, 가독성이 좋은 방향으로 가는 여유가 필요하다.

 

 

  1. s3fs 방식
    1. 장점: 메모리를 효율적으로 씀. 파일을 로컬에 쓰지 않은 상태에서 스트림으로 올라감
    2. 단점: 대용량 파일에서 업로드가 상대적으로 더 느림
    3. 예시
      1. df.to_parquet(s3_path, engine="pyarrow")
         to_parquet 안에서 s3fs를 사용하고 있어서 s3 경로를 입력하면 알아서 s3에 업로드
      2. 단, IAM이든 credentials든 인증 확인 필요
  2. buffer + byteIO 방식
    1. 장점: 메모리에 한번에 올려서 상대적으로 부하가 큼
    2. 단점: 업로드가 상대적으로 더 빠름. (대용량 파일 업로드에 적합)
    3. 예시
      1. 바이트 배열을 만들어서 버퍼에 담고 그걸 S3로 올리는 형태 
      2. parquet_bytes = df.to_parquet(compression='gzip', engine='pyarrow', index=False) buffer = BytesIO(parquet_bytes) boto.upload_fileobj_s3(buffer, bucket_name, parquet_path + parquet_name)

 

 

Requirements

 - 6억건 이상의 데이터를 매시간 insert하고 select 퍼포먼스도 유지해야 함(insert 시간은 < 5분)

 - No SQL은 지양하고 RDB에서 구현되어야 함(데이터 정합성이 매우 중요한 pricing 데이터)

 - dimension 확장으로 인한 record counts를 효과적으로 압축해야 함

 

클러스터링

 - 특정 dimension들의 value가 같은 경우 grouping하고 grouping key를 부여

 - metric을 적용하기 위해 dimension condition을 조회하는 경우 매핑 테이블과 엮여서 조회

 - 어떤 dimension들의 조합이 압축은 잘 되면서 확장성을 가지는지 테스트

 

클러스터링 방식의 특징

 - dimension들의 조합이 과해지면 debug가 어려워지고, 클러스터링이 된 테이블 자체의 크기가 커질 수 있음(결국 Load 부하를 분산하는 형태이기 때문)

 - dimension들이 모두 significant하게 작용해 dimension별 metric차이가 심할 경우, 효과가 떨어짐(궁극적으로는 특정 metric을 targeting해서 grouping 하기 때문. 정보량에서 손해를 본다는 cost가 전제되나 이것이 무시할 수 있을 정도임이 합의 되어야 함.)

환경: Load balancer가 있는 airflow 2.5로 배치. worker는 2~3대

AS-IS: worker에서 output하는 경로를 지정(/app/data/) -> output하는 경로는 worker들간 file이 동기화가 될 수 있게 처리

이슈: dag별로 다른 worker에서 처리가 되는 상황에서(로드밸런싱) druid input하는 dag와 output file을 정리하는 dag가 각각 다른 worker에서 실행. 동기화가 되기 이전에 init dag가 실행되면 에러. 또는 동기화 과정에 stuck이 생겨 제대로 진행되지 않으면 각 dag가 같은 곳에서 실행되지 않는 한 에러남.

 

해결책

결국 중앙처리를 해야 하는데, 생각나는 해결법은 2가지다.

1. kafka로 produce, consume

2. s3 같은 cloud나 NAS 같은 공동 space에 작업

1은 기존 소스 기준 변경점이 크고 scale도 크지 않아서 2로 작업 계획

1을 잘 모듈화 시켜놓으면 kafka가 있을만한 규모의 아키텍쳐에선 괜찮을 거 같다.

이슈 : sql을 dataframe으로 변형하는 과정에서 groupby를 할 때 groupby key값에 NaN이 존재하는 경우 해당 row들이 삭제되어 정합성이 맞지 않음

원인 : groupby시 기본 dropna=True로 설정돼 있음

조치 :

1. groupby 옵션에서 dropna=False로 변경 

2. insert시 NaN이 있으면 문제가 있기 때문에

df.replace({np.nan: None}, inplace=True)

로 NaN을 None으로 replace해줌

들여야 할 습관 : groupby해서 NaN인 경우를 살려야 되는지 꼭 체크하고 dropna를 바꿔주자

이슈 : 쿼리로 한번에 가져온 결과와 쿼리를 각각 가져와 가공해 만든 groupby 결과가 매우 다름

원인 : member number 같은 series가 object로 잡혀있는데 concat 하는 과정에서 각각 다른 type으로 인식됨. None이나 'nomember' 같은 값이 있느냐에 따라 type이 무엇으로 인식될지가 결정.

조치 : 해당 series의 type을 str로 명시함.

 

다음부터는 ddl로 type도 맞추고 들어가는 습관을 들이면 좋겠다

+ Recent posts