1. 이슈: config로 쓰고 있던 google sheet 문서가 손상되어 접근이 불가능한 상태. 사본 만들기 및 import range 함수를 통한 불러오기도 내부 에러로 실패. 중요한 hourly dag였기 때문에 바로 fix하고 배치가 돌아야 하는 상황.

2. 추정 원인: 구글 내부 에러(잦은 api 호출 등으로 막히는 상황도 고려했으나 그 정도의 대량 호출은 없었음. 구글 드라이브 인스턴스가 죽었다고 하기에는 소유자의 다른 문서들은 잘 열리는 상황. 저장중 모종의 이유로 인해 손상된 것으로 보임.)

3. 해결: daily로 google sheet로 된 모든 config를 S3로 xlsx로 백업하는 dag가 있어 해당 파일을 통해 복원. 권한 등의 적절한 복원을 위해 '편집자'는 confluence 등에 별도 작성할 필요가 있음

 

교훈: 외부화된 config는 손상되거나 접근이 불가능해질 경우 대처가 안 되므로 항상 백업을 진행하는 것이 좋음

 

cf) google sheet에 대한 api 접근은 은근히 많이 실패하는 편이다. queue limit과도 별 상관없고 짧은 기간 동안 특정 region에 할당된 인스턴스가 죽으면서 발생하는 것으로 보였다. N mins 단위로 도는 배치에서는 이중삼중으로 retry를 할 필요가 있다. 현재 진행하는 프로젝트에서는 여러 gcp project의 service account를 통해 retry를 할 수 있게 구성했다. 그럼에도 불구하고 문서 자체가 손상되면 어쩔 수가 없다. 원본 문서는 다음날 복구 되었으나, deprecated 처리했다.

현업과 소통하다 보면,

오케스트레이터에 직접 현업용 아이디를 만들어주고 권한을 컨트롤 하는 것보다,

구글시트에 config를 만드고 입력하라고 하는 게 커뮤니케이션상 훨씬 편할 때가 많다.

크게 느끼는 장점은 4가지이다.

1. 오케에서 Asset은 잘못 변경 시 audit로그를 뒤져야 하지만, 구글시트에서는 history가 바로 보인다

2. 새로운 인원이 추가되더라도 해당 인원의 아이디만 권한 허용해주면 끝이다

3. 현업들은 대부분 엑셀에 훨씬 익숙하다

4. 다른 과제의 Asset을 변경하는 등의 변수를 신경쓰지 않아도 된다. 물론 오케에서도 할 수는 있지만 시트가 훨씬 직관적이다

 

 

Asset에 해당하는 값을 여러 개 입력하기 때문에 컬럼을 그만큼 늘리는 건 가독성이 안 좋고,

Asset명 / 설정값으로 세팅하는데 그런 경우 가져오는 패턴을 기록해 본다.

dicConfig = dtConfig.AsEnumerable().ToDictionary(Function(row) row.Field(Of String)(" Asset명"), Function(row) row.Field(Of object)("설정값"))

 

+ 기존 config에 추가되게 하고 싶다면 For each row를 사용해 InitAllSettings에 있는 패턴처럼 하면 된다.

<for each row> dicConfig(row(" Asset명").ToString) = row(" 설정값").ToString </for each row>

collection간 비교가 필요할 때 자주 사용하는 패턴이 보여서 정리하려고 한다

1. list vs list

  i) 공통된 원소 비교 set(a) & set(b)
  ii) 완전히 동일한 리스트인지 a == b
  iii) 원소 같고 중복도 같은지 (순서 X) Counter(a) == Counter(b)
  iv) 순서/중복 무시하고 포함 여부 확인 set(a).issubset(set(b))
  v) 교집합 외 개수 비교 len(set(a) & set(b))

2. set vs set (보통은 그룹별 키가 얼마나 겹치나 볼 때 썼음)

  i) 공통된 원소 a & b, a.intersection(b) 교집합
  ii) 두 set의 모든 원소 (중복 제거) a | b, a.union(b) 합집합
  iii) a에만 있는 원소 a - b, a.difference(b) 차집합
  iv) 서로 다른 원소만 a ^ b, a.symmetric_difference(b) 대칭 차집합
  v) a가 b의 (진)부분집합인지 a <= b, a.issubset(b) 부분집합 관계 확인
  vi) a가 b의 진부분집합인지 a < b 진부분집합
  vii) 겹치는 원소가 있는지 not a.isdisjoint(b) 공통 원소 존재 여부 판단

3. df vs df

| 완전 동일 여부     | `df1.equals(df2)`               | 정렬까지 같아야 함 |
| 값 위치별 비교     | `df1 == df2`                    | 마스크 형태 결과. matrix 비교라 같은 행,렬 위치에 있는 값들끼리 == 비교한 것  |
| 행 단위 차이 추출   | `merge(..., indicator=True)`    | 유용|
| key 기준 변경 추적 | `merge` + suffix + 비교           | 특정 컬럼 비교   |
| 열 값 변경 추적    | `df1.compare(df2)`              | 깔끔한 차이만 추출 |
| 전체 row 존재 여부 | `set(map(tuple, df.values))` 방식 | 빠른 비교용     |

1. 패키지 선택

 - 방법: poll하는 패키지의 기반 언어 차이를 통한 성능 향상

 - 상황: broker의 메시지를 감당하지 못해, kafka-python에서 confluent-kafka로 전환

 - 변경점: kafka-python은 python 기반 패키지라 poll한 객체에 대해 property로 바로 접근. confluent-kafka는 librdkafka라는 C기반 라이브러리 기반이기 때문에 C의 객체 반환 방향성에 맞춰 method로 접근. (record.timestamp -> record.timestamp())

 

2. 구조 측면

이외 parallel consumer라는 패키지도 있으나(역시나 confluent에서 만든 패키지) 기본적으로는 자바 기반이다. (python으로 쓰려면 별도 그냥 쓰레드 구성해야 함)

+ parallel consumer가 만능 키같은 게 아니다. 결국 partition을 늘리기 힘든 브로커 고부하 상황에서, partition 내부까지 병렬 처리함으로써 성능 향상을 꾀하는 건데 안정성, 관리 용이성에 어느 정도 trade-off가 있어 보인다. (https://p-bear.tistory.com/85)

개인적으로 아직까지 보기엔 python 환경에서 구현한다면, docker를 partition 개수만큼 띄우고 병렬 처리하는 게 가장 부작용이 적어 보인다. 이렇게 하면 로컬 테스트도 쉽고 변경이 필요할 때도 구조가 간단한데 병렬 처리는 자연스럽게 가능하다.

 

그래도 2가지의 불편함&찝찝함이 존재하는데,

1. docker의 리소스 반환이 즉각적이지 않다. 동시에 restart해서 rebalancing하지 않으면, static하게 특정 docker에 과하게 리소스가 부여되는 현상이 관찰되었다. 일반적으로는 먼저 start된 docker에 그런 현상이 나타났다. 물론 이게 docker 형태일 때 피할 수 없는 불편함인가는 아직 잘 모르겠다. (환경은 HyperViser 내의 VM)

2. 지금 다루는 데이터의 크기가 적당히 커서(하루 parquet 기준 500~1000GB 수준) 이 구조가 가장 편한 거 아닐까? 하는 의문은 있다. 이보다 훨씬 커진다면 소스 내에서의 병렬성까지 확보해야 하지 않을까 싶다. 소스내 병렬성까지 생각해야 하는 수준까지 된다면 데이터 순서는 데이터 내부에서 구별해야 하므로, 항상 정확히 정의된 timestamp가 produce부터 고려될 필요가 있다.

 

이를 바탕으로 내가 겪어본 조합을 떠올려 보면 다음과 같다.

1. raw 데이터가 druid 수준에서 저장된다 => docker 병렬성 가능(druid에서 버티면 웬만큼 가능)

2. raw 데이터가 athena, s3 수준에서 저장된다 + python 환경이다 => docker 병렬성 / 소스내 병렬성 과도기 고민(어쩔 수 없을 때 소스내 병렬성 선택)

3.raw 데이터가 athena, s3 수준에서 저장된다 + python 환경일 필요가 없다 => broker 성능에 따라 미리 parallel consumer를 세팅하는 게 나쁘지 않아 보임

4. raw 데이터가 elastic search, open search, athena, s3, big query 수준에서 저장된다 + python 환경일 필요가 없다. => 거의 parallel consumer는 필수고, partition수를 계획적으로 조정, 테스트할 필요가 있음

 

 

날짜 레벨에서 정책의 유효 기간을 체크하고 알람하거나, 적용해야 하는 비지니스 요구사항이 꽤 많다.

하다 보면 한번씩 순수한 '날짜'를 제대로 편집하지 않아서 임계점에서 이슈가 생기기 마련이다.

일반적으로는 datetime으로 변환해서 대소 비교를 하는데, 비교점을 만들 때 시간을 replace 하는 것을 깜빡하면 불연속적인 정책에서 문제가 생긴다.

예시) 2월 3일 시작 ~ 3월 24일 끝인 정책이 있다고 할 때 3월 24일에 시간을 제거하지 않고 체크하면 이 정책은 3월 24일 기준 유효하지 않은 것으로 판정

 

따라서 datetime 객체를 비교할 때는 어느 레벨에서 비교하는지 생각하고 습관적으로 그 아래 세부 날짜 info를 모두 0으로 만들어주는 습관이 필요하다.

개별 단위 최적화는 습관적으로 할 수 있다면 좋다.

몇몇 최적화들은 파이프라인 내 worker들의 부하를 획기적으로 줄여주기도 한다.

다만 어떤 단위 최적화는 요구사항에 대한 확장성을 방해하기도 한다.

문제는 데이터 엔지니어가 도메인, 비지니스에 대한 경험이 꽤 많다고 하더라도,

생각보다 데이터 크기 변화가 예측이 안 되는 케이스가 많다는 점이다.

 

예시를 들자면 이런 케이스가 있다.

원본 데이터는 매우 무거운데 그 중에 특정 조건을 필터링해서 로드하는 프로세스가 있다.

당연히 output data는 작은데 원본이 무겁기 때문에,

편집에서는 메모리를 많이 쓰고 이후 로드에서는 거의 메모리나 네트워크 리소스를 많이 쓰지 않을 것으로 예상된다.

그래서 데이터를 로그할 때 저장소와 메모리를 아끼기 위해 버퍼에 올리지 않고 스트림을 통해 로드하는 방식을 택했다.

 

어느 날 이 프로세스에서 요구사항이 변경되어 조건이 변경되거나 혹은 같은 조건 하에서 데이터의 변화가 있어 output이 엄청나게 커졌다. (꽤 흔히 일어나는 케이스다)

버퍼에 올리지 않았기 때문에 업로드가 굉장히 느려진다. 대신 메모리는 아끼는 구조로 짰기 때문에 많이 아끼게 될 거다.

이때 메모리는 오토스케일링으로 충분히 커버되지만 업로드가 느려져 원래의 비지니스 요구사항을 못 맞추는 리스크가 생기는 환경이라면? 기존의 결정은 최적화가 맞았을까?

또는 이와 비슷하게 어떤 최적화는 요구사항 변경을 반영하기 위해 코드의 생산성과 가독성을 많이 포기해야 하는 경우도 생긴다.

 

어떤 분들은 기계적으로 단위 최적화를 하고 내가 더 잘짰어! 라는 주장을 많이들 하신다. 

단위내 최적화를 신경 쓰다가 놓친 비지니스를 위한 커뮤니케이션이나 손해본 생산성, 겪지 않아도 됐을 이슈 등도 기회비용에 포함해야 한다.

비용이 크지 않다면 데이터가 커질 때 이슈를 적게 겪는 방향, 가독성이 좋은 방향으로 가는 여유가 필요하다.

 

 

  1. s3fs 방식
    1. 장점: 메모리를 효율적으로 씀. 파일을 로컬에 쓰지 않은 상태에서 스트림으로 올라감
    2. 단점: 대용량 파일에서 업로드가 상대적으로 더 느림
    3. 예시
      1. df.to_parquet(s3_path, engine="pyarrow")
         to_parquet 안에서 s3fs를 사용하고 있어서 s3 경로를 입력하면 알아서 s3에 업로드
      2. 단, IAM이든 credentials든 인증 확인 필요
  2. buffer + byteIO 방식
    1. 장점: 메모리에 한번에 올려서 상대적으로 부하가 큼
    2. 단점: 업로드가 상대적으로 더 빠름. (대용량 파일 업로드에 적합)
    3. 예시
      1. 바이트 배열을 만들어서 버퍼에 담고 그걸 S3로 올리는 형태 
      2. parquet_bytes = df.to_parquet(compression='gzip', engine='pyarrow', index=False) buffer = BytesIO(parquet_bytes) boto.upload_fileobj_s3(buffer, bucket_name, parquet_path + parquet_name)

 

 

Requirements

 - 6억건 이상의 데이터를 매시간 insert하고 select 퍼포먼스도 유지해야 함(insert 시간은 < 5분)

 - No SQL은 지양하고 RDB에서 구현되어야 함(데이터 정합성이 매우 중요한 pricing 데이터)

 - dimension 확장으로 인한 record counts를 효과적으로 압축해야 함

 

클러스터링

 - 특정 dimension들의 value가 같은 경우 grouping하고 grouping key를 부여

 - metric을 적용하기 위해 dimension condition을 조회하는 경우 매핑 테이블과 엮여서 조회

 - 어떤 dimension들의 조합이 압축은 잘 되면서 확장성을 가지는지 테스트

 

클러스터링 방식의 특징

 - dimension들의 조합이 과해지면 debug가 어려워지고, 클러스터링이 된 테이블 자체의 크기가 커질 수 있음(결국 Load 부하를 분산하는 형태이기 때문)

 - dimension들이 모두 significant하게 작용해 dimension별 metric차이가 심할 경우, 효과가 떨어짐(궁극적으로는 특정 metric을 targeting해서 grouping 하기 때문. 정보량에서 손해를 본다는 cost가 전제되나 이것이 무시할 수 있을 정도임이 합의 되어야 함.)

+ Recent posts