투자팀 테드입니다.

렌딧 투자자는 5일, 15일, 25일로부터 6영업일에 수익금을 지급 받아 한 달이면 총 3번의 지급을 받게됩니다. 정확한 수익금 지급을 위해 여러 로직으로 촘촘한 검증 프로세스를 거칩니다. 현재의 프로세스를 자동화하여 지급을 더 편리하고 정확하게 개편한 개발 과정을 기록했습니다.

테드Ted(LENDIT Invest팀 Back-end Engineer)
투자와 대출로 나뉘는 렌딧 서비스 중 투자 환경의 시스템 개선에 힘쓰고 있습니다. 투자에서 지급까지의 프로세스를 투자자가 더 편리하게 느끼고 동시에 그 과정이 안정되도록 단단한 설계를 지향합니다.

WHAT

투자자에게 수동으로 지불하던 수익금을 자동화로 지급

WHY

한 달간 세 번의 지급, 지급당 세네 번의 검증을 수동으로 진행하는 것에 따른 업무 피로도 개선

자, 지급을 시작하지.

지급일이면 우선 지급에 앞서 검증 프로세스를 시작합니다.

지급용 인스턴스를 시작 > 어플리케이션을 새로 배포 > 검증 API를 실행 > 지급 검증 쿼리를 실행 > 인스턴스 종료


지급 프로세스도 비슷한 과정을 거칩니다.

지급용 인스턴스를 시작 > 어플리케이션을 새로 배포 > 지급 API를 실행 > 투자자들에게 전송할 문자와 이메일 생성 및 테스트 > 신탁에 지급 요청 발송 > 지급이 완료되면 만들어뒀던 문자와 이메일 발송 > 인스턴스 종료


이런 일련의 과정을 한달이면 대략 15번 정도 진행하며 언제나 긴장 상태인 개발자들을 위해 지급 프로세스 개편이 시작됐습니다.

자동화가 아니었던 이유

렌딧은 2015년 7월 서비스를 시작했습니다. 사업 초기에는 돈을 지급하는 과정만큼은 직접 모니터링해야 한다는 판단으로 자동화하지 않았습니다.
그러나 시간이 흘러 렌딧이 이만큼이나 성장해버리니, 자동화가 아니고선 안된다는 판단이 섰습니다. 자동화를 해도 될만큼 프로세스가 고도화 되었기에 가능한 작업이기도 하구요.

HOW

결론부터 말씀드리면 위에 말씀드린 일련의 프로세스들을 자동화하기 위해 Airflow를 사용하여 해결하였습니다.

아래는 Airflow에 대한 설명입니다.

“Airflow is a platform to programmatically author, schedule and monitor workflows.
Use Airflow to author workflows as Directed Acyclic Graphs (DAGs) of tasks.
The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.”

왜 Airflow를 사용했나?

Airflow는 복잡한 Workflow, 대용량 데이터 처리 Batch를 분산 시스템을 통해 원활하게 처리하기 위해 에어비앤비가 만든 시스템입니다. 프로그래밍 방식의 Workflow 관리를 지원하고 Workflows의 스케쥴링, 모니터링(Slack 알람, 로그 리모트 전송 등), 에러 핸들링(실패시 Retry 등)이 잘 되어있죠.

Airflow architecture
마스터(스케쥴러) > 메세지 브로커 > 워커의 구성으로 되어있어 손쉽게 워크로드를 분산해 high performance를 가져올 수 있습니다.

렌딧은 기존에 데이터 레이크에 Athena를, ETL 파이프라인에 Airflow를 사용하고 있었고 이번에 Batch성 작업에 Airflow를 사용하며 만족도가 높아 기존의 Batch 작업들도 전부 Airflow로 옮겨올 예정입니다.

Airflow로 자동화하기

1. 개념 다지기 3분컷

프로세스 진행과정에 대해 본격적으로 이야기하기 전에 Airflow Concepts부터 짚고 넘어가도록 하죠.

  • DAG : Directed acyclic graph. 실행하고 싶은 일(task)들의 실행 순서롤 구조화하고 실행 context를 제공
  • Task : dag 안에 하나의 일의 단위
  • Task instance : 각 Task의 개별 상태를 의미하며 ‘running’, ‘success’, ‘failed’, ‘skipped’, ‘up for retry’ 등의 상태값 보유
  • Operator : 실제 task 가 어떻게 동작해야하는지에 대한 표현한 구현체
  • Hook : 외부 플랫폼(aws, slack 등) or 데이터베이스(mysql, hive 등)들을 쉽게 사용할 수 있게 정의해둔 오퍼레이터의 한 종류
  • Sensor : 지정된 행동이 성공했는지 주기적으로 확인하는데 쓰이는 오퍼레이터

위의 개념과 함께 connections, pools, queues 등 Airflow를 운영하시려면 알아두셔야할 많은 기능들이 있습니다.

또 operator, hook, sensor 등의 오퍼레이터를 손쉽게 커스텀하게 작성해서 쓸 수도 있습니다.

예를 들어 Aws ec2를 평소에는 꺼두다 Python 오퍼레이터에서 Boto를 사용하여 ec2를 키고 Sensor를 통해 원하는 ec2가 켜져있는지 체크한 후 다음 과정을 진행하는 등 필요에 따라 유연하게 작성해 사용할 수 있습니다.

ted : 저는 Airflow 때문에 Python을 이번에 처음 접했는데 2주 안에 만들수 있었어요!

2. 단단한 지급과정

지급 데이터의 교차 검증

렌딧은 채권 구매에 대한 세부적인 데이터를 저장하는 저장소로 Mongodb를, 메인 database로는 Mysql을 사용합니다. 지급 준비과정 마지막에는 Java와 Mysql 로직으로 구한 지급 금액과 Python, 그리고 Mongodb를 이용해 구한 투자자의 지급액을 비교하는 교차 검증 과정을 거칩니다. 기존에는 별도 서버에 스크립트를 실행하여 검증을 진행했는데 Airflow의 Mongo 오퍼레이터와 Pandas를 통해 구현하여 쉽게 교차 검증 가능해졌습니다.

지급 검증 쿼리 실행

지급 과정의 한 단계가 끝났을 때엔 쿼리상의 검증이 필요하면 데이터베이스에 미리 정의한 검증 쿼리를 실행해 지급 시스템의 안정성을 높이도록 했습니다. 검증 쿼리 자체는 데이터베이스에 저장되어 있기 때문에 Workflow를 모르더라도 개발자나 데이터 사이언티스트가 쉽게 접근해 지급 과정마다 필요한 검증 쿼리를 고도화할 수 있습니다.

3. 지급 검증 DAG의 동적 등록

Airflow의 DAG는 start_date와 schedule_interval을 통해 Cron처럼 주기적으로 실행하는 패턴이 일반적입니다. 스케줄링을 통해 실행되는 방법 뿐만 아니라 외내부 이벤트를 통해서도 dag들을 실행시킬 수 있습니다. 외부에서 이벤트를 발생시키는 방법으론 Rest api나 Airflow cli를 통해 dag들을 실행시킬 수 있고 내부적으로는 TriggerDagRunOperator 라는 오퍼레이터를 통해서 원하는 시점에 다른 DAG 들을 실행시킬 수 있습니다.

검증 과정은 한 달에 3번 지급일 전에 실행됩니다. 내부적으로 공휴일과 임시 공휴일 등을 고려한 지급일 테이블에 따라 지급일이 다가올 때 아래와 같이 Triggerdagrunoperator를 통해 지급 검증 dag를 실행해서 검증 과정을 효율적으로 실행하도록 했습니다.

1
dag_instance = DAG(
2
    dag_id='register_payment',
3
    default_args=default_args,
4
    start_date=pendulum.create(2020, 1, 1, hour=18, tz=timezone_name),
5
    schedule_interval="0 18 * * *",  # 매일 18시
6
    catchup=False
7
)
8
9
with dag_instance as dag:
10
    /*
11
        생략 ...
12
    */
13
14
    def make_prepare_payment_dag(**context):
15
16
        kst_dt = pendulum.now(tz=_tz_name)
17
        kst_ds = kst_dt.strftime(_date_fmt)
18
19
        target_payment_ds = get_target_payment_date(kst_ds)
20
        target_payment_dt = pendulum.parse(target_payment_ds, tz=_tz_name)
21
22
        if _need_prepare_payment(target_payment_dt):
23
            if check_tomorrow_is_payment_day(kst_dt):
24
                execute_dt = kst_dt.replace(hour=22, minute=0)
25
            else:
26
                execute_dt = kst_dt.add(days=1).replace(hour=2, minute=0)
27
            execute_dt = execute_dt.in_timezone(tz='utc')
28
            _notify_prepare_payment(target_payment_ds, execute_dt)
29
            dag_run_op = TriggerDagRunOperator(
30
                task_id='register_prepare_payment',
31
                trigger_dag_id='payment_prepare_v1',
32
                execution_date=execute_dt,
33
                dag=dag,
34
            )
35
            dag_run_op.execute(context)
36
37
38
    PythonOperator(
39
        task_id='make_prepare_payment_dag',
40
        provide_context=True,
41
        python_callable=make_prepare_payment_dag
42
    )

AH? AH!

작업 중에 여러 문제점에 맞닥뜨리며 해결방법을 찾아 적용하는 과정에 대한 기록입니다.

API 호출의 완료를 어떻게 알 수 있을까?

Airflow의 오퍼레이터들은 하나의 오퍼레이터가 끝나면 Workflow의 상태가 변경되기 마련입니다. 지급 검증 및 지급 과정은 remote로 서버의 동작을 rest api로 컨트롤 하는 형태이기 때문에 API 호출 상태에 따라 변경되는 렌딧 서버에서의 지급 상태와 Airflow의 workflow 상태를 맞춰야 합니다.

또 다른 문제는 지급 검증 및 지급 모두 하나의 스텝이 굉장히 긴 작업이므로 보통의 http connection timeout의 범위를 훌쩍 넘어섭니다. 그래서 rest API 콜 하나당 오퍼레이터 하나를 엮어 dag를 구성하면 timeout으로 실패하는 이슈를 해결해야 했습니다.

  1. 지급 마이크로 서비스를 실행가능한 어플리케이션으로 빌드한 후 ssh 로 접속해서 실행
  2. rest api 를 실행한 후 다른 방법으로 api 호출 결과를 모니터링

제가 생각한 두개의 해결방법입니다.

결론적으론 두번째 방법으로 진행하게 됐습니다. 현재 렌딧의 시스템은 모노리스에서 마이크로 서비스로 차근차근 이동하고 있습니다. 마이크로 서비스로 변경하기 위해서는 바운디드 컨텍스트에 대한 논의, 배포 환경 설정 등 해결되어야할 과제가 많아서 첫번째 방법은 아쉽게도 다음을 기약하게 됐네요.

두번째 방법은 Spring AOP와 상태를 정의한 annotation을 사용해 해결했습니다.

1
@TaskState(taskName = TaskStateKt.PREPARE_PAYMENT_TASK, nextState = PreparePaymentState.CLEANED, description = "과거 지급 예정 내역 삭제", initialState = true)

지급 과정마다 하나의 스텝이 종료되면 상태를 정의한 후 위와 같이 데이터베이스에 업데이트해서 현재 상태를 기록합니다
Airflow에는 미리 정의된 지급 검증, 지급 상태를 렌딧에서 기록한 상태 정보를 sensor 오퍼레이터를 통해서 모니터링하여 Airflow workflow 상태와 싱크를 맞춥니다.

지급의 워크플로우는 최대한 보수적으로

목표했던 요구조건은 위의 과정들을 통해 충족되었지만 추가적 요건들도 있었는데요. 그중 하나가 지급 과정 중 실행하는 API들의 중복 실행과 이전 스템에 대한 호출을 시스템적으로 막는 것이었습니다.

이전 스텝에 대한 호출 방지는 위에서 만들어던 @TaskState 를 통해서 구현할 수 있었습니다. 현재 상태에서 진행할 수 있는 상태를 정의한 다음 허용된 상태로의 전이만 진행하게 하였습니다. 예를들어 A -> [B, C] -> D 의 상태를 가지는 state machine 에서 B나 C의 상태에서는 D로 전이가 가능하지만 다시 B로 상태전이가 일어나는 이벤트가 발생했을 때는 무시하게 하는 시스템적 제약을 추가하여 혹시 모를 휴먼 에러에 대해서 방지할 수 있었습니다.

상태 전이의 중복 실행 방지는 Mysql 의 exclusive_lock을 사용했습니다. 지급에 대한 모든 요청의 경우 요청이 시작되기 전에 다음 상태로 진행할 수 있는지 현재 상태를 조회합니다. 해당 조회 로직에 xlock을 사용한 다음, 상태 전이 요청이 끝나기 전까지 lock을 잡습니다. 상태 전이 중에 다른 요청이 들어왔을 때도 다시 xlock을 잡으려 시도하고 이전의 xlock때문에 이전 요청이 lock을 풀기 전까지 대기하게 되며, 이전 요청이 lock을 놓았을 때는 대기하게 됩니다. 이전 요청이 끝났을 때는 이미 상태 전이가 되었으므로 언급했던 상태 전이 기능을 체크하는 로직에서 현재 요청이 실패하게 됩니다.

마치며

지급 자동화 결과
어쩌면 렌딧에서 가장 중요하다 할 수 있는 지급 검증 및 지급 과정을 Airflow를 사용해 자동화할 수 있었고 이로 인해 기존에 이 작업을 위해 쏟았던 한달 중 약 30시간 정도를 벌 수 있게 됐습니다.

복잡한 workflow 자동화를 고민하시는 분들에게 도움이 되는 글이었으면 좋겠습니다.