안녕하세요, 카카오뱅크 투자개발팀에서 펀드시스템 개발과 운영을 담당하는 Sean입니다. 이번 글에서는 2024년 1월 오픈한 펀드 시스템에서 중요한 역할을 담당하고 있는 ‘펀드 배치 시스템’에 대해서 설명드리고자 합니다. 카카오뱅크 펀드 배치 시스템은 Airflow라는 오픈소스로 만들어졌습니다. Airflow는 에어비앤비(Airbnb)의 엔지니어링 팀에서 복잡한 워크플로우를 관리하기 위해 개발한 솔루션입니다. 현재는 아파치(Apache) 재단에서 오픈소스로 관리되고 있으며, 많은 기업에서 Airflow를 활용하여 데이터 파이프라인을 구축하고 있습니다.
특히 금융 서비스의 배치 시스템은 각종 규제를 준수하면서도 효율적으로 수많은 배치 작업을 수행해야 하는데요. 이런 환경에서 Airflow를 어떻게 활용하여 배치 시스템을 구축했는지에 대해 소개드리겠습니다.
카카오뱅크 펀드 서비스 오픈 ☁️
먼저, 카카오뱅크 펀드 서비스가 탄생한 대략적인 배경을 소개드리겠습니다. 카카오뱅크의 펀드 서비스는 온프레미스(On-Premise) 서버에 구축된 계정계와 채널계의 영역에서 벗어나, 당행 금융 서비스 중 최초로 퍼블릭 클라우드 환경에 구축됐습니다. 이는 2019년 전자금융 감독규정 개정을 통해 금융회사도 퍼블릭 클라우드를 이용할 수 있는 길이 열린 덕분입니다.
감독규정 개정 이후, 여러 금융회사들이 자사 IT 시스템의 클라우드 도입을 마케팅 포인트로 활용해 왔습니다. 하지만 대부분 가상 머신(Virtual Machine, VM) 사용 등 IaaS(Infrastructure as a Service)를 활용하는 데에 머물러 있었는데요. 이는 실질적으로 인프라만 클라우드 환경에 구축된 것일 뿐, 활용하는 기술과 개발 방식의 대대적인 변화를 수반하지는 않았습니다. 또한 대부분의 클라우드 시스템을 자사의 주요 서비스를 올리기 보단, 주로 비중요 시스템의 운영에 활용해 왔었습니다.
카카오뱅크에서는 이전부터 다양한 CSP(Cloud Service Provider)에서 사용할 수 있는 클라우드 기술에 대한 내부 검토를 진행해 왔습니다. 그리고 클라우드 환경에 서비스를 구축한다면 그 장점을 제대로 활용하기 위해 아예 설계부터 클라우드 네이티브 기술을 적용하여 시작해야 한다는데 의견이 모아졌습니다. 이를 기반으로 당행의 핵심 금융 시스템에도 당연히 클라우드를 도입할 수 있어야 한다고 생각한 카카오뱅크는 K8s, Managed Database, 분산스토리지, API GW 등 클라우드 네이티브 기술을 활용하여 서비스를 설계하고 구축하였습니다.
그렇게 탄생한 첫번째 사례가 바로 카카오뱅크 펀드 프로젝트입니다. 카카오뱅크는 ‘펀드 판매 라이센스(금융투자업 인가)’ 획득 단계부터 금융당국에 클라우드 네이티브 아키텍처를 갖춘 펀드 시스템을 제안했습니다. 평가 결과, 당행에서 제안한 펀드 시스템은 1등급 전자 금융 시스템을 영위하기에 충분한 전산 설비, 정보보호체계, 데이터 백업 시스템을 갖추고 있는 것으로 인정받았습니다. 카카오뱅크는 펀드 판매 라이센스를 획득하고 성공적으로 지난 2024년 1월 16일 서비스를 런칭했으며, 안정적으로 시스템을 운영하고 있습니다.
클라우드용 배치 시스템이 필요해 😇
본격적인 배치 시스템 이야기를 시작하기 전에, 잠시 카카오뱅크의 아키텍처를 살펴보겠습니다. 펀드 서비스 개발을 시작할 당시, 카카오뱅크 시스템은 아래 [그림 2]와 같은 형태를 띠고 있었는데요. 고객이 휴대폰 단말기로 앱을 사용하면 채널계와 계정계(Core Banking System, CBS)를 거쳐 고객의 금융 정보가 오고가며 다양한 금융 업무를 볼 수 있는 구조입니다.
그런데 클라우드 환경 위에 펀드 시스템을 구축하면서, 이 시스템이 단순히 펀드와 관련된 기능 연계를 제공하는 게 아니라 직접 펀드 거래를 수행할 수 있도록 채널계와 계정계에 대응되는 시스템들이 새롭게 구축되었습니다. 이에 따라 클라우드 펀드 시스템에서의 펀드 거래를 위해 핵심 금융 거래 원장도 클라우드에 구축되었습니다. 그렇게 카카오뱅크의 시스템은 아래 [그림 3]과 같은 형태로 진화하게 됩니다.
펀드 서비스가 퍼블릭 클라우드 환경 위에서 새로이 구축되다 보니, 기존에 존재하던 대부분의 당행 온프레미스 시스템들을 활용하는 데에 많은 제약이 있었습니다. 그 중 가장 큰 문제는 바로 사용할 수 있는 클라우드 환경을 위한 배치(Batch) 시스템이 마땅히 존재하지 않는다는 점이었습니다. 펀드 시스템의 아키텍처 설계 단계에서 배치 업무 수행 방안에 대한 고민하던 당시, 온프레미스 환경에서 운영 중이던 배치 시스템은 클라우드 환경을 지원하지 않았습니다. 결국 저희는 클라우드 네이티브 서비스를 위한 자체적인 배치 솔루션을 도입하기로 결정했습니다.
Airflow 도입 배경
카카오뱅크에서 신규 시스템을 구축하다 보면 ‘금융 도메인’이 가지는 특수성에서 비롯된 여러 요구사항을 마주하게 됩니다. 금융 서비스는 주로 ‘개인의 금융 활동’이라는 매우 민감한 정보를 다루고 있기 때문입니다. 그렇기에 은행에서의 개발은 필요한 규제를 준수하며 시스템의 안정성을 확보하고, 데이터의 정합성과 신뢰성을 보장할 수 있어야 합니다.
저희가 클라우드 환경에서 새로운 배치 시스템을 구축할 당시, 최우선적으로 고려했던 필수 요구사항은 다음과 같습니다.
- 모든 거래 및 활동을 추적할 수 있도록 감사(Auditing) 및 로깅(Logging) 시스템이 갖춰져야 합니다.
- 시스템 내외부로부터의 비인가된 접근을 방지하고 민감한 데이터를 보호하기 위한 적절한 접근 제어 설정이 가능해야 합니다.
- 고가용성과 신뢰성을 보장해야 합니다.
- 일관된 데이터 처리를 보장해야 합니다.
그리고 아래와 같은 희망 요구사항이 있었습니다.
- 단순히 배치 작업을 트리거하는 것뿐만 아니라 ETL(Extract, Transform, Load) 수행 등 복잡한 워크플로우 로직을 작성할 수 있어야 합니다.
- 작성된 작업에서 데이터가 어떻게 흐르는지, 어떤 조건에 따라 분기가 발생하는지 쉽게 파악할 수 있어야 합니다.
- 실시간으로 작업이 어떻게 수행되고 있는지 모니터링이 용이해야 합니다.
- 퍼블릭 클라우드에서 운영되는 시스템이기 때문에 AWS, GCP 등 클라우드 인프라와의 연동이 매끄러워야 합니다.
저희는 위와 같은 요구사항을 모두 만족시킬 워크플로우 플랫폼으로 가장 대중적인 Airflow를 고려해보게 되었습니다.
Airflow는 복잡한 워크플로우를 코드로 작성, 스케줄링, 모니터링하기 위한 오픈소스 플랫폼입니다. 여기서 ‘워크플로우(Workflow)‘란, 의존성으로 연결된 작업들의 집합을 의미하며, Airflow에서는 워크플로우와 데이터 파이프라인을 코드로 정의할 수 있습니다. 풍부한 플러그인과 커뮤니티 지원 덕분에 다양한 데이터 소스와의 연동 및 타사 도구와의 통합이 용이합니다. 또한, 유연한 스케줄링 기능을 제공하며 시스템의 확장성도 갖추고 있습니다. 그 밖에도 Web UI로 제공되는 시각화 도구를 통해 워크플로우 상태를 모니터링하고 디버깅할 수도 있습니다. 특히, 저희가 가장 필요로 했던 강력한 감사 및 접근 제어 기능은 도입 의사결정에 큰 영향을 미쳤습니다.
그럼 Airflow를 어떻게 핵심 금융거래를 수행하는 서비스의 배치 시스템으로 활용했는지 설명드리겠습니다.
Airflow의 기본 구성 및 작동 원리
📊 워크플로우 구성 요소: DAG, 태스크, 인스턴스
Airflow에서 각 워크플로우는 DAG(Directed Acyclic Graph)를 통해 표현됩니다. 이때 DAG는 태스크(Task)들로 구성되어 있으며, 태스크들 간의 순환이나 반복을 허용하지 않습니다.
각 태스크들이 수행하는 작업은 오퍼레이터(Operators)를 통해 정의됩니다. BashOperator
, PythonOperator
등의 기본 오퍼레이터는 Airflow 기본 패키지에 내장되어 있으며, API 호출을 위한 HttpOperator
, 데이터베이스 연동을 위한 JdbcOperator
등 일반적으로 필요한 기능들은 대부분 Airflow 커뮤니티에서 이미 제공되므로 손쉽게 프로젝트에 가져와 사용할 수 있습니다. 만일 추가로 필요한 기능이 있다면, 오퍼레이터의 확장성 덕분에 직접 필요한 오퍼레이터를 정의하여 사용할 수도 있습니다.
DAG가 실행되면 DAGrun 인스턴스(DAGrun Instance)가 생성되고, DAGrun 인스턴스 내에 각각의 독립적인 태스크 실행 인스턴스가 생성되는데 이를 태스크 인스턴스(Task Instance)라고 부릅니다.
⚙️ Airflow 컴포넌트
Airflow를 설치할 때 요구되는 최소한의 컴포넌트들은 다음과 같습니다.
• 익스큐터(Executor): 스케줄러 프로세스와 함께 동작하는 구성 요소로, 각 운영환경에 따라 필요한 익스큐터를 설정해 사용할 수 있습니다. CeleryExecutor, KubernetesExecutor 등이 있으며, 익스큐터 종류에 따라 실제로 태스크가 수행되는 워커(Worker)가 결정됩니다.
• 웹 서버: DAG, 태스크의 상태를 모니터링하고 트리거하기 위한 UI를 제공합니다.
• DAG 파일 폴더: 스케줄러가 어떤 태스크를 언제 실행시킬지를 읽어오기 위한 파일을 저장하는 곳입니다.
• 메타데이터 DB: 워크플로우와 태스크의 상태를 저장하기 위한 데이터베이스입니다.
Airflow는 DAG 파일 폴더에서 개발자가 파이썬으로 작성한 DAG를 읽어와 스케줄링하고, 해당 DAG의 실행 시각이 되면 익스큐터에게 태스크를 넘겨줍니다. 익스큐터는 워커에게 태스크 실행을 일임하며, 그 실행 상태는 메타데이터 DB에 저장됩니다. 이때, 업무를 실행시키는 방법을 결정하기 위해 어떤 익스큐터를 사용할지 결정해야 합니다.
⚖️ 익스큐터(Executor) 결정
Airflow는 기본 설정으로 Sequential Executor를 사용합니다. Sequential Executor는 병렬성을 제공하지 않아, 한 번에 하나의 태스크만 실행할 수 있어 실제 운영 환경에는 적합하지 않습니다. 소규모 단일 머신 프로덕션 환경의 경우에는 Local Executor를 대신 사용해야 하며, 다중 머신/클라우드 프로덕션 환경에서는 원격 익스큐터 중 하나를 선택해 사용해야 합니다. 이 글은 ‘클라우드 환경의 배치 시스템 구축’에 대해 설명하므로, 원격 익스큐터에 대해서만 다루도록 하겠습니다.
첫 번째로 설명드릴 Celery Executor는 Celery 백엔드(RabbitMQ, Redis 등)를 이용하여 수행 예정인 태스크들을 대기열에 등록하는 구조로 만들어져 있습니다. 대기열에 등록된 태스크는 워커가 대기열에서 읽어가서 개별적으로 처리합니다. 즉, 여러 대의 머신에서 다수의 워커가 동작하며 분산 환경에서 병렬로 태스크를 실행할 수 있게 됩니다. 이 방식으로 익스큐터가 동작하기 위해서는 워커가 노드를 상시 점유하며 대기열을 구독하는 형태로 구동되어야 합니다.
다음으로 소개드릴 Kubernetes Executor는 그 이름 그대로 쿠버네티스(K8S, Kubernetes)에서 동작하는 익스큐터입니다. 각 태스크들은 쿠버네티스 클러스터 내에서 독립된 파드로 실행됩니다. 클라우드 환경에서는 모든 것이 비용과 직결되므로 워커가 상시 노드를 점유하지 않도록 하여, 비용을 절감할 필요가 있었습니다. 이에 따라 카카오뱅크의 펀드 시스템은 Kubernetes Executor를 사용하기로 결정했습니다. 물론 Kubernetes Executor는 워크플로우를 실행하기 위해 파드가 초기화되고 트리거되는 데 시간이 소요된다는 단점이 있지만, 대략 1분 내외의 짧은 시간으로 이 정도 트레이드오프는 펀드 시스템 특성상 허용 가능한 범위에 있어 도입에 문제가 되지 않았습니다.
Airflow에서 배치 업무 작성하기
Step 1. DAG 작성 컨벤션
Airflow에는 DAG와 태스크를 정의하는 여러 가지 방법이 있습니다. 저희는 각자의 업무를 정의하고 워크플로우를 코드로 옮기기에 앞서, 팀 내에서 어떤 문법을 사용할 것인지 컨벤션을 맞춰 DAG 파일을 템플릿화했습니다. 최초 작성 이후 수많은 변경을 겪으며 DAG가 복잡해지는 경우, 가독성을 해치지 않도록 통일된 코드 작성 규칙이 필요했습니다.
그렇게 저희는 아래의 코드처럼 DAG를 정의할 때 데코레이터(airflow decorators)를 사용하고, 태스크를 정의할 때는 클래스의 표준 생성자를 사용하는 방법을 선택했습니다.
import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow.timetables.interval import CronDataIntervalTimetable
@dag(
dag_id="daily-batch",
start_date=datetime(2024, 1, 12, pendulum.timezone("Asia/Seoul")),
schedule=CronDataIntervalTimetable(cron="...", timezone="Asia/Seoul"),
tags=['fund'],
description="Daily Batch",
)
def daily_batch():
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
task1 >> [task2, task3]
generate_dag()
Step 2. 비즈니스 로직 작성
펀드 시스템의 특징 중 하나는 배치 업무에 핵심 비즈니스 로직이 있다는 것입니다. 왜 그럴까요?
펀드는 투자자로부터 모은 자금을 자산운용회사가 투자하고 운용한 후 그 결과를 돌려주는 간접 투자 상품입니다. 펀드도 주식과 같은 직접 투자 상품과 마찬가지로 사고팔 때 의사결정의 기준이 되는 가격이 매겨지는데, 이를 ‘기준가’라고 부릅니다. 실시간으로 변동하는 주식의 주가와 달리, 펀드의 기준가는 하루에 한 번 계산됩니다. 이는 펀드가 주식, 채권 등 여러 종류의 자산으로 운용되고 있으며, 해당 자산들의 모든 가치를 반영해서 기준가를 계산하기 때문입니다. 또한, 연 1회 수행되는 결산이라는 업무를 통해 펀드의 수익을 분배하고 기준가를 초기화하기도 합니다. 따라서 매일 오전, 기준가를 입력하고 펀드의 주문을 처리하는 각종 배치 업무가 아래와 같이 수행됩니다.
이러한 오전 배치 업무에 포함된 복잡한 비즈니스 로직을 파이썬 코드로 옮겨 다시 작성하게 되면, 테스트가 어려워지고 로직이 여기저기 분산됨에 따라 업무의 일관성과 무결성을 보장할 수 없게 됩니다. 따라서 펀드의 오전 배치 업무 중에서 펀드의 핵심 비즈니스 로직을 사용해야 하는 유즈케이스는 펀드 코어 애플리케이션에 그대로 두고, 해당 유즈케이스만 트리거할 수 있도록 DAG를 작성해야 했습니다.
이를 위해 Airflow는 애플리케이션 이미지로 pod를 생성하고 유즈케이스만 트리거할 수 있도록 KubernetesPodOperator를 사용했습니다.
아래와 같이 배치 업무 수행용 pod를 생성해서 업무를 수행합니다.
@dag(
dag_id="fund-daily-batch",
start_date=datetime(2024, 1, 12, tzinfo=pendulum.timezone("Asia/Seoul")),
# ...,
description="펀드 일일 배치",
)
def fund_daily_batch():
# ...
selling_confirmation = KubernetesPodOperator(
container_resources=k8s.V1ResourceRequirements(
limits={"cpu": "1", "memory": "4Gi"},
requests={"cpu": "1", "memory": "2Gi"}
),
image_pull_policy="Always",
image_pull_secrets=[k8s.V1LocalObjectReference('auth_token')],
task_id="selling_confirmation",
env_vars=fund_env_vars,
secrets=[fund_secrets],
arguments=['selling_confirmation', '--date', execution_date],
name="selling_confirmation_batch",
image="kakaobank-registry/.../fundapp:######",
pod_template_file='.../pod_template.yaml',
)
# ...
기준가_입력 >> 결산_처리 >> 기준가_확정_대기
기준가_입력_검증 >> selling_confirmation >> 환매_결제
환매_결제 >> 매입_결제
# ...
fund_daily_batch()
Step 3. 시간 관리
위 fund_daily_batch
DAG에서는 execution_date
라는 값이 보입니다. 시스템에 장애가 발생하여 배치 업무가 실패했을 때, 업무 재처리를 위해 익영업일에 다시 실행시키더라도 전날 실행한 것과 같은 결과를 보장하기 위해 펀드의 모든 배치 업무에는 날짜 파라미터를 전달하고 있습니다.
현재는 사용이 중단(deprecated)되고 {{ logical_date }}
라는 이름으로 변경되었지만, Airflow의 템플릿에는 {{ execution_date }}
라는 값이 있었습니다. 그러나 이 값을 그대로 사용하면 의도치 않은 결과를 초래할 수 있습니다.
Airflow에서 사용하는 execution_date
는 이름과 달리 태스크가 실제로 실행되는 시간이 아니라, ‘해당 태스크가 처리하고자 하는 Time Window의 시작 지점’을 의미합니다. 예를 들어, 실행 간격이 1일인 태스크가 2024년 1월 2일에 실행되었다면, 해당 태스크는 2024년 1월 1일 08:00에 Data Interval이 시작되고, 2024년 1월 2일 08:00에 Data Interval이 종료됩니다. 이 시간 간격을 Schedule Interval이라고 하며, DAG의 실행 주기를 나타냅니다.
이는 Airflow 공식 문서에도 잘 설명되어 있습니다.
(원문) A DAG run is usually scheduled after its associated data interval has ended, to ensure the run is able to collect all the data within the time period. In other words, a run covering the data period of 2020-01-01 generally does not start to run until 2020-01-01 has ended, i.e. after 2020-01-02 00:00:00.
(한글 해석 🇰🇷) DAG 실행은 일반적으로 연관된 data interval이 종료된 후로 스케줄링되며, 이렇게 함으로써 DAG 실행이 해당 기간 내의 모든 데이터를 수집할 수 있도록 보장합니다. 즉, 2020년 1월 1일 중의 데이터를 다루는 DAG 실행은 일반적으로 그 날(2020년 1월 1일)이 끝날 때까지 실행되지 않습니다. 즉, 2020년 1월 2일 00:00:00 이후에 실행됩니다.
즉, Airflow는 작업을 트리거할 때 해당 작업의 시작시간을 Left-bound 하여 사용하고 있는 것을 확인할 수 있습니다. 이를 그림으로 나타내면 다음과 같습니다.
따라서 펀드 배치 업무 처리를 위해 Airflow에서 pod를 트리거할 때, 실제로 실행한 날짜를 인자로 전달해야 하는 요구사항이 있을 경우, 트리거 시점의 일시를 사용하기 위해 {{ data_interval_end }}
을 사용하고 있습니다. 아울러 개발자의 실수를 방지하기 위해 아래와 같이 스크립트를 작성하여 사용하고 있습니다.
# DagUtil.py
def target_date(no_dash: bool = False) -> str:
if no_dash:
return target_date_nodash()
else:
return "{{ dag_run.conf.target_date | d(macros.fund.ktc_dash(data_interval_end)) }}"
def target_date_nodash() -> str:
return "{{ dag_run.conf.target_date | d(macros.fund.ktc_dash(data_interval_end)) | replace('-','') }}"
def target_date_yesterday(no_dash: bool = False) -> str:
if no_dash:
return target_date_yesterday_nodash()
else:
return "{{ dag_run.conf.target_date | d(macros.fund.ktc_before_dash(data_interval_end)) }}"
def target_date_yesterday_nodash() -> str:
return "{{ dag_run.conf.target_date | d(macros.fund.ktc_before_dash(data_interval_end)) | replace('-','') }}"
#fundmacros.py
# Asia/Seoul YYYY-MM-DD pattern
def ktc_dash(dt: datetime) -> str:
return dt.astimezone(LOCAL_TZ).strftime("%Y-%m-%d")
# Asia/Seoul YYYYMMDD pattern
def ktc_nodash(dt: datetime) -> str:
return ktc_dash(dt).replace("-", "")
# Asia/Seoul before YYYY-MM-DD pattern
def ktc_before_dash(dt: datetime) -> str:
return (dt - timedelta(days=1)).astimezone(LOCAL_TZ).strftime("%Y-%m-%d")
# Asia/Seoul before YYYYMMDD pattern
def ktc_before_nodash(dt: datetime) -> str:
return ktc_before_dash(dt).replace("-", "")
class FundMacrosPlugin(AirflowPlugin):
name = "fund"
macros = [ktc_dash, ktc_nodash, ktc_before_dash, ktc_before_nodash]
금융 규제를 충족하는 Airflow 시스템 구축
감사 로그(Audit Log)
앞서 은행의 개발자는 규제를 준수하며 개발해야 된다고 설명드렸었죠? 🏦
저희가 배치 시스템을 구축하며 참고했던, 전자금융감독규정 제 30조(일괄작업에 대한 통제)입니다.
1. 일괄작업은 작업요청서에 의한 책임자의 승인을 받은 후 수행할 것
2. 일괄작업은 최대한 자동화하여 오류를 최소화할 것
3. 일괄작업 수행 과정에서 오류가 발생하였을 경우 반드시 책임자의 확인을 받을 것
4. 모든 일괄작업의 작업내용을 기록ㆍ관리할 것
5. 책임자는 일괄작업 수행자의 주요업무 관련 행위를 모니터링할 것
해당 규정을 통해 알 수 있듯이 모든 일괄 작업(배치 업무)의 작업 내용은 모두 기록·관리되어야 하며, 책임자의 승인 없이는 수행될 수 없습니다. 책임자의 승인에 관한 내용은 아래에서 조금 더 자세히 다루겠습니다. 다행이 Airflow에서는 DAG 및 태스크 실행에 연관된 사용자의 행동과 시스템 이벤트를 추적하는 감사 로그(Audit Log) 기능을 지원합니다. 해당 감사 로그는 DB에 저장되며, Airflow UI를 통해 접근할 수 있습니다.
Airflow 공식 문서에 정의된 이벤트의 종류는 다음과 같습니다.
• [variable,connection].create: 사용자가 연결 또는 변수를 생성함
• [variable,connection].edit: 사용자가 연결 또는 변수를 수정함
• [variable,connection].delete: 사용자가 연결 또는 변수를 삭제함
• delete: 사용자가 DAG 또는 태스크를 삭제함
• failed: Airflow 또는 사용자가 태스크를 실패로 설정함
• success: Airflow 또는 사용자가 태스크를 성공으로 설정함
• retry: Airflow 또는 사용자가 태스크 인스턴스를 재시도함
• clear: 사용자가 태스크의 상태를 초기화함
• cli_task_run: Airflow가 태스크 인스턴스를 트리거함
이외에도 사용자가 특정 태스크 인스턴스의 로그를 확인하거나, 사용자 권한을 변경하는 등의 기록들이 감사 로그에 기록됩니다.
접근통제 (RBAC, Role-Based Access Control)
다음으로, 배치 시스템을 구축하며 참고한 전자금융감독규정 제26조(직무의 분리)입니다.
1. 프로그래머와 오퍼레이터
2. 응용프로그래머와 시스템프로그래머
3. 시스템보안관리자와 시스템프로그래머
4. 전산자료관리자(librarian)와 그 밖의 업무 담당자
5. 업무운영자와 내부감사자
6. 내부인력과 전자금융보조업자 및 유지보수업자 등을 포함한 외부인력
7. 정보기술부문인력과 정보보호인력
8. 그 밖에 내부통제와 관련하여 직무의 분리가 요구되는 경우
위의 규정에 따르면, 프로그래머와 오퍼레이터 직무는 분리되어야 합니다. 즉, ‘일괄작업 프로그램의 실행’은 오퍼레이터의 직무가 담당하는 업무이고, 프로그래머의 담당이 아니므로 프로그래머가 일괄작업을 직접 수행할 수 없다는 뜻이 됩니다. 따라서 프로그래머에 해당하는 개발자가 직접 배치처리 업무를 트리거하지 않고, 오퍼레이터를 통해 배치 업무를 트리거할 수 있도록 적절한 접근통제 방안이 마련되어야 합니다.
DAG 레벨로 접근통제를 지정하려면 아래 코드와 같이 access_control={'$Role' : { '$권한', '$권한',... }}
을 DAG 설정에 추가해 주면 됩니다.
DAG 레벨로 접근 통제를 지정하려면 아래 코드와 같이 access_control={'$Role': {'$권한', '$권한', ... }}
을 DAG 설정에 추가하면 됩니다. 권한은 Airflow 공식 문서의 Access Control 섹션을 참고하여 작성하시면 됩니다.
@dag(dag_id=“example_acl”, start_date=datetime.datetime(2024, 9, 1),
access_control={
“fund-operator”: { “can_edit”, “can_read” } ,
})
def sample_dag():
task = EmptyOperator(task_id=“hello”)
task
sample_dag()
하지만 모든 DAG에 위와 같은 코드를 삽입하는 것은 매우 번거롭기에, 아래와 같이 Airflow 설정 파일에 스크립트를 추가하여 작업을 자동화했습니다.
localSettings:
## the full content of the `airflow_local_settings.py` file (as a string)
## - docs for airflow cluster policies:
## https://airflow.apache.org/docs/apache-airflow/stable/concepts/cluster-policies.html
##
## ____ EXAMPLE _______________
## stringOverride: |
## # use a custom `xcom_sidecar` image for KubernetesPodOperator()
## from airflow.kubernetes.pod_generator import PodDefaults
## PodDefaults.SIDECAR_CONTAINER.image = "gcr.io/PROJECT-ID/custom-sidecar-image"
##
stringOverride: |
from airflow.models import DAG
from airflow.security import permissions
BANK_TAG_GROUP = ["fund"]
def dag_policy(dag: DAG):
if not dag.tags:
return
dag_tags = list(filter(lambda x: x in BANK_TAG_GROUP, dag.tags))
if not dag_tags:
return
for role in dag_tags:
op_role = role+"-operator"
_access_control = dag.access_control or {}
_access_control.setdefault(role, set())
_access_control.setdefault(op_role, set())
_access_control[role].update({permissions.ACTION_CAN_READ})
_access_control[op_role].update({permissions.ACTION_CAN_EDIT})
dag.access_control = _access_control
마무리하며
지금까지 클라우드 환경에서 펀드 시스템을 구축하고, Apache Airflow 오픈소스로 배치 업무 수행 환경을 어떻게 구성했는지에 대해 설명드렸습니다. 배치 시스템 구축은 퍼블릭 클라우드 환경에서 금융 서비스를 개발하는 과정에서 직면한 많은 과제 중 하나였습니다. 특히, 펀드 도메인 특유의 복잡한 비즈니스 로직을 안정적으로 실행하고 관리하기 위해 규제를 준수하면서도 다양한 기술적 고민과 해결 방안을 모색해야 했습니다. 몇 번의 실패와 도전을 겪으면서도, 팀원들의 협력과 열정 덕분에 현재의 시스템을 완성할 수 있었습니다.
서비스를 오픈한 이후에도 여전히 많은 도전 과제가 남아있습니다. 하지만 제가 소속된 카카오뱅크의 투자개발팀은 안정적이고 혁신적인 펀드 서비스를 제공하기 위해 항상 최선을 다하고 있습니다. 앞으로도 고객 여러분의 기대에 부응할 수 있도록, 시스템의 안정성을 유지하며 더 나은 펀드 서비스를 제공하기 위해 펀드 시스템을 지속적으로 개선해 나가겠습니다. 카카오뱅크 펀드 서비스에 앞으로도 많은 기대와 관심 부탁드립니다. 감사합니다. 😆