Airflow 플러그인 제대로 활용하기 — Slack/Teams 알람부터 하나의 DAG 멀티 스케줄까지
Summary
Airflow 를 한동안 쓰다 보면 “DAG 만 잘 짜면 되는 거 아니야?” 하던 시기가 지나고, 운영을 편하게 만드는 건 DAG 바깥에 있다는 걸 깨닫게 돼요. 실패했는지 매번 웹 UI 들어가서 확인해야 하고, 평일 아침과 주말 오후에 다르게 돌려야 하는 작업을 두 개의 DAG 로 쪼개놓고 헷갈려하고… 이런 마찰들이 쌓입니다.
이 마찰을 줄여주는 게 Airflow 의 플러그인 시스템이에요. 이 글에서는 플러그인이 실제로 뭘 해주는지, 그리고 제가 운영에서 자주 쓰는 구성 세 가지 — Slack/Teams 실패 알람, 하나의 DAG 에 여러 스케줄을 거는 커스텀 Timetable, UI 확장 — 을 돌아가는 코드와 함께 정리합니다.
💡 이 글에서 다루는 것
- 플러그인이 해주는 일과 안 해주는 일 (오해 정리)
- 플러그인의 기본 구조 —
AirflowPlugin- Slack 실패 알람 —
on_failure_callback으로 붙이기- Teams 실패 알람 — Adaptive Card 웹훅으로 보내기
- 알람을 모든 DAG 에 한 줄로 —
default_args구성- 하나의 DAG 에 여러 스케줄 — 커스텀 Timetable
- UI 확장 — 태스크에 “로그/대시보드 열기” 버튼 달기
- 운영에서 자주 밟는 지뢰
Airflow 2.4 이상을 기준으로 합니다. (schedule= 인자, Timetable API 가 안정화된 버전대예요.)
1. 플러그인이 해주는 일, 그리고 안 해주는 일
가장 먼저 오해부터 풀고 가요. 예전 Airflow 1.x 시절에는 커스텀 Operator·Hook·Sensor 도 플러그인으로 등록해야 했어요. 그래서 “플러그인 = 커스텀 오퍼레이터 만드는 곳” 으로 기억하는 분이 많은데, 지금은 그렇지 않습니다.
2.x 부터 오퍼레이터·훅·센서는 그냥 평범한 파이썬 모듈에 두고 import 하면 돼요. 플러그인으로 등록할 필요가 없어졌습니다. 그럼 플러그인은 이제 뭘 하느냐, 정리하면 다음과 같아요.
| 플러그인으로 등록하는 것 | 무엇을 해주나 |
|---|---|
timetables |
커스텀 스케줄링 규칙. 하나의 DAG 에 여러 스케줄을 걸 때 핵심 |
macros |
템플릿({{ ... }})에서 쓸 수 있는 전역 함수 추가 |
global_operator_extra_links |
모든 태스크 상세 화면에 버튼(링크) 추가 |
operator_extra_links |
특정 오퍼레이터에만 버튼 추가 |
flask_blueprints / appbuilder_views |
웹서버에 커스텀 페이지 추가 |
appbuilder_menu_items |
상단 메뉴에 링크 항목 추가 |
listeners |
DAG/태스크 상태 변화 같은 이벤트 훅 등록 |
반대로 플러그인이 안 해주는 것도 명확히 해두면 좋아요.
- 커스텀 Operator/Hook/Sensor → 그냥 모듈로 두고 import (플러그인 불필요)
- Connection/Variable 관리 → UI·환경변수·시크릿 백엔드의 몫
- 의존 패키지 설치 →
requirements.txt/ 이미지 빌드의 몫
✅ 정리하면, 플러그인은 “Airflow 코어와 웹 UI 자체를 확장하는” 자리예요. DAG 안에서 쓰는 부품(오퍼레이터)이 아니라, DAG 바깥의 스케줄링·알림·화면을 손보는 도구라고 보면 됩니다.
2. 플러그인의 기본 구조
플러그인은 AirflowPlugin 을 상속한 클래스 하나로 시작해요. 이 클래스의 클래스 속성에 위 표의 항목들을 채워 넣으면 Airflow 가 기동할 때 자동으로 읽어 들입니다.
가장 작은 형태는 이래요.
# plugins/my_plugin.py
from airflow.plugins_manager import AirflowPlugin
class MyCompanyPlugin(AirflowPlugin):
name = "my_company_plugin" # 필수. 플러그인 식별 이름
# 아래는 전부 선택 — 채운 것만 등록됨
timetables = []
macros = []
global_operator_extra_links = []
파일을 어디에 두느냐가 중요한데, Airflow 는 $AIRFLOW_HOME/plugins 디렉토리를 자동으로 스캔합니다. 그래서 운영에서는 보통 이런 구성을 권장해요.
$AIRFLOW_HOME/
├── dags/
│ └── ... # DAG 정의
└── plugins/
├── my_plugin.py # AirflowPlugin 등록 지점 (얇게 유지)
├── timetables/
│ └── multi_cron.py # 커스텀 Timetable
├── callbacks/
│ └── alerts.py # Slack / Teams 알람 함수
└── extra_links/
└── links.py # UI 버튼
💡 등록 지점인
my_plugin.py는 얇게 두고, 실제 구현은 하위 모듈로 빼는 걸 추천드려요. 콜백 함수 같은 건 DAG 에서도from callbacks.alerts import slack_fail_alert로 직접 import 하는 경우가 많은데,plugins/가 파이썬 경로에 잡혀 있어서 그대로 import 가 됩니다.
이제 이 구조 위에 알람부터 하나씩 채워볼게요.
3. Slack 으로 실패 알람 받기
가장 가성비 좋은 첫 구성이 실패 알람이에요. 태스크가 죽으면 슬랙으로 바로 날아오게 해두면, 더 이상 UI 를 들여다보며 “얘 돌고 있나?” 걱정하지 않아도 됩니다.
핵심은 Airflow 의 콜백이에요. 태스크에는 on_failure_callback, on_success_callback, on_retry_callback 을 걸 수 있고, 실패하면 Airflow 가 그 함수를 context 인자 하나와 함께 호출해줍니다.
먼저 Slack 쪽 준비. Slack 앱에서 Incoming Webhook 을 하나 만들고, 그 URL 을 Airflow Connection 으로 등록해요.
# Slack Webhook 커넥션 등록 (CLI)
airflow connections add slack_alert \
--conn-type slackwebhook \
--conn-password "T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
🚨 웹훅 URL 의 토큰 부분(
T.../B.../...)은 그 자체가 비밀번호예요. 코드·로그에 박지 말고 위처럼 커넥션(또는 시크릿 백엔드)으로만 다루세요. 위 값은 자리표시용 더미입니다.
이제 콜백 함수를 작성합니다. apache-airflow-providers-slack 패키지의 SlackWebhookHook 을 쓰면 깔끔해요.
# plugins/callbacks/alerts.py
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
def slack_fail_alert(context):
ti = context["task_instance"]
text = (
":red_circle: *Airflow 태스크 실패*\n"
f"*DAG*: `{ti.dag_id}`\n"
f"*Task*: `{ti.task_id}`\n"
f"*실행시각*: {context['logical_date']}\n"
f"*로그*: <{ti.log_url}|로그 열기>"
)
SlackWebhookHook(slack_webhook_conn_id="slack_alert").send(text=text)
여기서 context 가 뭘 담고 있는지 궁금할 텐데, 실제로 찍어보면 이렇게 보여요.
# 콜백 안에서 디버깅용으로 일부만 출력
def slack_fail_alert(context):
print(context["dag"].dag_id)
print(context["task_instance"].task_id)
print(context["task_instance"].log_url)
print(context["logical_date"])
print(repr(context.get("exception")))
etl_daily_sales
transform
http://airflow.example.com/log?dag_id=etl_daily_sales&task_id=transform&...
2026-06-29T01:30:00+00:00
ValueError('컬럼 amount 가 비어 있습니다')
ti.log_url 이 로그로 바로 가는 링크라서, 알람에 이거 하나만 박아두면 클릭 한 번으로 실패 로그까지 도달합니다. 이게 진짜 편해요.
마지막으로 DAG(또는 태스크)에 이 콜백을 걸어주면 끝.
from callbacks.alerts import slack_fail_alert
default_args = {
"on_failure_callback": slack_fail_alert,
"retries": 1,
}
4. Teams 로 실패 알람 받기
사내에서 Slack 대신 Microsoft Teams 를 쓰는 경우도 많죠. Teams 는 공식 Airflow 프로바이더가 따로 없다시피 해서, 웹훅에 JSON 을 POST 하는 방식으로 직접 보냅니다. 콜백 구조는 Slack 과 완전히 똑같고, 보내는 페이로드만 달라요.
⚠️ 예전에 많이 쓰던 Teams 의 “Office 365 Connector(MessageCard)” 웹훅은 Microsoft 가 단계적으로 종료하는 방향이에요. 새로 만든다면 Power Automate 의 Workflows 웹훅 + Adaptive Card 조합을 권장합니다. 채널 →
...→ 워크플로 → “웹훅 요청을 받으면 채널에 게시” 템플릿으로 URL 을 하나 받으면 돼요.
받은 워크플로 웹훅 URL 을 커넥션에 넣어둡니다. (URL 전체를 host 에 저장하는 방식이 다루기 편해요.)
airflow connections add teams_webhook \
--conn-type http \
--conn-host "https://prod-00.westus.logic.azure.com:443/workflows/........"
콜백 함수는 Adaptive Card 한 장을 만들어 POST 합니다.
# plugins/callbacks/alerts.py (이어서)
import requests
from airflow.hooks.base import BaseHook
def teams_fail_alert(context):
ti = context["task_instance"]
webhook_url = BaseHook.get_connection("teams_webhook").host
card = {
"type": "message",
"attachments": [{
"contentType": "application/vnd.microsoft.card.adaptive",
"content": {
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"type": "AdaptiveCard",
"version": "1.4",
"body": [
{"type": "TextBlock", "size": "Large", "weight": "Bolder",
"color": "Attention", "text": "🔴 Airflow 태스크 실패"},
{"type": "FactSet", "facts": [
{"title": "DAG", "value": ti.dag_id},
{"title": "Task", "value": ti.task_id},
{"title": "실행시각", "value": str(context["logical_date"])},
]},
],
"actions": [
{"type": "Action.OpenUrl", "title": "로그 열기", "url": ti.log_url},
],
},
}],
}
resp = requests.post(webhook_url, json=card, timeout=10)
resp.raise_for_status()
FactSet 으로 DAG·Task·시각을 표처럼 정리하고, Action.OpenUrl 로 로그 버튼을 달았어요. 카드가 채널에 이렇게 떠요.
🔴 Airflow 태스크 실패
DAG etl_daily_sales
Task transform
실행시각 2026-06-29 01:30:00+00:00
[ 로그 열기 ]
DAG 에 거는 방법은 Slack 과 동일합니다. 두 군데 다 보내고 싶으면 콜백을 합쳐도 돼요.
def notify_fail(context):
slack_fail_alert(context)
teams_fail_alert(context)
default_args = {"on_failure_callback": notify_fail}
5. 알람을 모든 DAG 에 한 줄로
DAG 마다 on_failure_callback 을 복붙하는 것도 금방 귀찮아져요. 두 가지 방법이 있습니다.
(1) default_args 공유 — 콜백을 담은 default_args 딕셔너리를 공용 모듈에 두고 각 DAG 에서 가져다 씁니다. 가장 단순하고 명시적이에요.
# plugins/callbacks/defaults.py
from callbacks.alerts import notify_fail
DEFAULT_ARGS = {
"on_failure_callback": notify_fail,
"retries": 1,
"retry_delay": __import__("datetime").timedelta(minutes=5),
}
# dags/etl_daily_sales.py
from callbacks.defaults import DEFAULT_ARGS
with DAG("etl_daily_sales", default_args=DEFAULT_ARGS, ...) as dag:
...
(2) 클러스터 전역 콜백 — DAG 를 아예 안 건드리고 모든 DAG 에 일괄 적용하고 싶으면, airflow.cfg 의 [core] 섹션에 있는 클러스터 정책을 쓰면 됩니다. airflow_local_settings.py 에 dag_policy 를 정의해두면 모든 DAG 로딩 시점에 후킹돼요.
# config/airflow_local_settings.py
from callbacks.alerts import notify_fail
def dag_policy(dag):
# 콜백이 안 걸린 DAG 에만 기본 실패 알람을 주입
if dag.on_failure_callback is None:
dag.on_failure_callback = notify_fail
💡 (1)은 DAG 마다 import 한 줄이 들어가지만 어떤 DAG 가 알람을 받는지 코드에 드러나서 추적이 쉬워요. (2)는 한 곳에서 강제하니 빠뜨릴 일이 없지만 암묵적이라 신규 팀원이 헷갈릴 수 있어요. 저는 표준 알람은 (2)로 깔고, 특수 알람(예: 특정 DAG 만 PagerDuty)은 (1)로 덮는 조합을 씁니다.
6. 하나의 DAG 에 여러 스케줄 — 커스텀 Timetable
여기서부터가 플러그인의 진짜 매력이에요. 예를 들어 이런 요구가 있다고 해봐요.
“평일에는 아침 9시에 한 번, 주말에는 오후 2시에 한 번 돌고 싶다.”
예전엔 이걸 DAG 두 개(job_weekday, job_weekend)로 쪼개거나, 매 시간 돌면서 “지금 돌 시간 맞아?” 를 분기하는 식으로 우회했어요. 둘 다 지저분하죠. Airflow 2.2 부터는 Timetable 로 스케줄 규칙 자체를 코드로 정의할 수 있고, 이걸 플러그인으로 등록하면 DAG 에서 그대로 가져다 쓸 수 있습니다.
여러 cron 표현식을 받아서 그중 가장 가까운 다음 시각으로 스케줄하는 타임테이블을 만들어볼게요. (croniter 패키지를 씁니다.)
# plugins/timetables/multi_cron.py
from datetime import datetime, timedelta
import pendulum
from croniter import croniter
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
class MultiCronTimetable(Timetable):
def __init__(self, crons, timezone="Asia/Seoul"):
self.crons = crons # 예: ["0 9 * * 1-5", "0 14 * * 6,0"]
self.timezone = timezone
@property
def summary(self): # 웹 UI 에 표시될 요약 문자열
return " | ".join(self.crons)
# DAG 직렬화 시 호출 — 스케줄러가 타임테이블을 저장/복원할 때 필요
def serialize(self):
return {"crons": self.crons, "timezone": self.timezone}
@classmethod
def deserialize(cls, data):
return cls(data["crons"], data["timezone"])
# 수동 트리거 시 data interval 을 어떻게 잡을지
def infer_manual_data_interval(self, *, run_after):
return DataInterval(start=run_after, end=run_after)
# 등록된 cron 들 중 after 이후로 가장 빠른 시각을 고른다
def _earliest_next(self, after):
tz = pendulum.timezone(self.timezone)
base = after.in_timezone(tz)
nexts = [
pendulum.instance(croniter(c, base).get_next(datetime))
for c in self.crons
]
return min(nexts)
# 스케줄러가 "다음 실행은 언제?" 를 물을 때 호출하는 핵심 메서드
def next_dagrun_info(self, *, last_automated_data_interval, restriction):
if last_automated_data_interval is not None:
last_run = last_automated_data_interval.end
else:
if restriction.earliest is None:
return None # start_date 가 없으면 스케줄 불가
# 시작점 자체가 스케줄 시각일 수도 있으니 1초 당겨서 포함시킴
last_run = restriction.earliest - timedelta(seconds=1)
next_time = self._earliest_next(last_run)
if restriction.latest is not None and next_time > restriction.latest:
return None # end_date 를 넘으면 멈춤
return DagRunInfo.interval(start=next_time, end=next_time)
커스텀 타임테이블에서 꼭 채워야 하는 자리는 네 군데예요. 정리하면 다음과 같습니다.
| 메서드/속성 | 역할 |
|---|---|
summary |
웹 UI 의 “Schedule” 칸에 보일 문자열 |
serialize / deserialize |
스케줄러가 타임테이블을 DB 에 저장/복원 |
infer_manual_data_interval |
사람이 손으로 트리거했을 때의 data interval |
next_dagrun_info |
다음 실행 시각 계산 — 가장 중요 |
이제 이걸 플러그인으로 등록합니다.
# plugins/my_plugin.py
from airflow.plugins_manager import AirflowPlugin
from timetables.multi_cron import MultiCronTimetable
class MyCompanyPlugin(AirflowPlugin):
name = "my_company_plugin"
timetables = [MultiCronTimetable]
🚨 타임테이블 클래스는 반드시 플러그인의
timetables에 등록해야 합니다. 등록 안 하고 DAG 에서 직접 import 해서 쓰면, DAG 정의 자체는 통과해도 스케줄러가 직렬화를 못 해서DAG ... has timetable ... which is not registered식으로 깨져요. 알람 콜백은 그냥 import 로 쓰는데 타임테이블만 등록이 필요한 게 헷갈리는 지점이에요.
마지막으로 DAG 에서 schedule= 에 인스턴스를 넘겨요.
# dags/multi_schedule_job.py
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from timetables.multi_cron import MultiCronTimetable
with DAG(
dag_id="multi_schedule_job",
start_date=pendulum.datetime(2026, 1, 1, tz="Asia/Seoul"),
# 평일 09:00, 주말 14:00 — 한 DAG 에서 두 스케줄
schedule=MultiCronTimetable(["0 9 * * 1-5", "0 14 * * 6,0"]),
catchup=False,
) as dag:
EmptyOperator(task_id="run")
이제 웹 UI 의 Schedule 칸에는 0 9 * * 1-5 | 0 14 * * 6,0 으로 뜨고, 실제 실행은 평일 아침 9시 · 주말 오후 2시에 각각 한 번씩 일어나요. DAG 하나로 깔끔하게 끝납니다.
💡 위 구현은 개념을 보여주기 위해 단순화한 버전이에요. 실제 운영에 넣을 때는
catchup동작(과거 구간 backfill 여부), DST(서머타임) 경계, 그리고 data interval 의 의미(여기서는 시점-구간으로 0폭 처리)를 요구사항에 맞게 더 다듬는 걸 추천드려요. 단순히 “여러 cron 중 가장 가까운 시각” 만 필요하면 Airflow 내장CronTriggerTimetable을 cron 마다 따로 쓰는 것보다 이렇게 하나로 묶는 게 관리가 편합니다.
7. UI 확장 — 태스크에 버튼 달기
마지막으로 작지만 체감이 큰 기능 하나. 태스크 상세 화면에 “Grafana 대시보드 열기”, “Datadog 로그 보기” 같은 버튼을 달 수 있어요. 매번 URL 을 손으로 조합하던 걸 클릭 한 번으로 바꿔줍니다.
BaseOperatorLink 를 상속해서 링크를 정의하고, 플러그인의 global_operator_extra_links 에 등록하면 모든 태스크에 버튼이 붙어요.
# plugins/extra_links/links.py
from airflow.models.baseoperator import BaseOperatorLink
class GrafanaLink(BaseOperatorLink):
name = "Grafana 대시보드"
def get_link(self, operator, *, ti_key=None):
# ti_key 로 dag_id / task_id / run_id 를 받아 URL 을 조립할 수 있음
dag_id = ti_key.dag_id if ti_key else operator.dag_id
return f"https://grafana.example.com/d/airflow?var-dag={dag_id}"
# plugins/my_plugin.py (이어서)
from extra_links.links import GrafanaLink
class MyCompanyPlugin(AirflowPlugin):
name = "my_company_plugin"
timetables = [MultiCronTimetable]
global_operator_extra_links = [GrafanaLink()]
이러면 태스크를 클릭했을 때 뜨는 패널에 Grafana 대시보드 버튼이 생기고, 누르면 해당 DAG 의 대시보드로 바로 이동해요. 비슷한 식으로 상단 메뉴(appbuilder_menu_items)에 사내 위키·런북 링크를 박아두는 것도 운영팀이 좋아하는 구성입니다.
8. 운영에서 자주 밟는 지뢰
마지막으로 제가 직접 밟아본 것들 위주로 정리할게요.
- 플러그인 변경은 재기동이 필요해요. DAG 는 파일을 고치면 스케줄러가 다시 읽지만, 플러그인은 프로세스 기동 시점에 한 번 로드돼요. 타임테이블·extra link 를 고쳤는데 안 바뀌면 웹서버/스케줄러 재시작을 먼저 의심하세요.
- 콜백 안에서 또 예외가 나면 알람이 조용히 사라져요. 웹훅 호출에
timeout을 꼭 주고, 콜백 자체의 예외는 삼키거나 로깅만 하도록 방어하는 게 안전합니다. 알람 보내려다 알람이 죽으면 제일 황당해요. - 타임테이블은 등록(
timetables=[...])을 빼먹기 쉬워요. 6장의 🚨 그대로, import 만으로는 안 됩니다. logical_date는 UTC 예요. 알람 메시지에 그대로 박으면 한국 시각과 9시간 차이 나서 헷갈립니다. 보낼 때context['logical_date'].in_timezone('Asia/Seoul')로 변환하면 깔끔해요.- 웹훅 URL 은 비밀이에요. Slack/Teams 둘 다 URL 하나만 있으면 누구나 메시지를 보낼 수 있으니, 커넥션·시크릿 백엔드 밖으로 내보내지 마세요.
플러그인은 “있어도 그만” 처럼 보이지만, 한 번 알람과 멀티 스케줄을 붙여놓으면 운영 피로도가 확 줄어요. DAG 만 잘 짜는 단계를 넘어 운영을 편하게 만드는 단계로 넘어가는 가장 빠른 길이라고 생각합니다.
일단 오늘은 여기까지…..
다음 글에서는 Airflow 의 listener 와 OpenLineage 로 데이터 계보(lineage)를 자동 수집하는 구성을 정리해볼게요.
다음 글 →: Airflow listener 와 OpenLineage 로 데이터 계보(lineage) 자동 수집하기