(2/3) 검증 레이어를 적재 파이프라인에 꿰기 — 배치 인입·멱등 적재·사람 확인 큐
- PDF 에서 데이터를 뽑아 DB 에 넣기 — 구조체 파싱 vs OCR, 뭘 언제 쓰나
- 검증 레이어를 적재 파이프라인에 꿰기 — 배치 인입·멱등 적재·사람 확인 큐 ← 지금 글
- 검토 로그를 되먹여 양식별 파서 붙이기 — 양식 클러스터링·앵커 추출·회귀 테스트
Summary
1편에서는 PDF 에서 값을 뽑는 두 갈래 — 텍스트 레이어를 그대로 읽는 구조체 파싱 과 픽셀을 추론하는 OCR — 을 정확도 관점에서 갈라 썼어요. 그리고 어느 쪽이든 뽑은 값을 바로 DB 에 넣지 말고 검증 레이어를 거치라 는 얘기로 끝맺었죠. 타입 검증, 합계 재계산, confidence 게이팅 — 이 세 가지 함수를 만들었습니다.
그런데 함수 세 개를 만들어 둔 것과, 그걸 매일 수천 장이 들어오는 파이프라인 안에서 실제로 돌리는 것 은 다른 문제예요. 같은 문서가 두 번 들어오면? 검증에서 떨어진 문서는 어디로 보내죠? API 가 타임아웃 나면 재시도해야 하는데, 검증에서 떨어진 건 재시도하면 안 되고. 이런 걸 정리하지 않으면, 좋은 검증 함수를 만들어놓고도 어딘가에서 틀린 값이 새거나 같은 값이 두 번 들어가요.
이 글에서는 그 검증 레이어를 인입 → 검증 → 적재 → 사람 확인 으로 이어지는 파이프라인에 꿰는 구조를 정리합니다. 목표는 1편과 같아요 — 틀린 값을 DB 문 앞에서 돌려보내는 것. 다만 이번엔 그걸 한 장이 아니라 수천 장 규모에서, 재처리해도 안전하게.
💡 이 글에서 다루는 것
- 문서에 상태(status) 를 붙이는 이유 — 파이프라인은 상태 머신이에요
- 멱등 적재 — 같은 문서를 두 번 돌려도 중복이 안 생기게 (지문 해시 + UPSERT)
- 검증 게이트 — 통과는 적재로, 거절은 사람 확인 큐로 라우팅
- 사람 확인 큐 — 원본 근거(페이지·좌표)를 붙여 넘기고, 고친 값을 다시 흘려보내기
- 재시도와 실패 격리 — 일시적 오류는 재시도, 영구적 오류는 격리(재시도 금지)
- 관측 — 자동 적재율·거절률을 재서 양식 변화를 감지
1편을 안 봤어도 따라올 수 있게 썼지만, 검증 함수 세 개(parse_amount·rows_consistent·needs_review)는 1편에 있으니 같이 보시는 걸 추천드려요.
1. 파이프라인은 상태 머신이에요
문서 한 장을 “받아서 넣는다” 로 뭉뚱그리면 재처리·실패·검토를 다룰 수가 없어요. 그래서 문서마다 지금 어디까지 왔는지 를 상태로 기록합니다. 이게 파이프라인 전체의 뼈대예요.
from enum import Enum
class Status(str, Enum):
RECEIVED = "received" # 인입됨, 아직 추출 전
EXTRACTED = "extracted" # 값은 뽑음, 검증 전
VALIDATED = "validated" # 검증 통과, 적재 직전
LOADED = "loaded" # DB 적재 완료 (종착)
NEEDS_REVIEW = "needs_review" # 검증 거절 → 사람 확인 큐
FAILED = "failed" # 처리 불가 (손상 파일 등)
각 문서는 이 상태를 하나씩 밟아 나가요. 정상 경로는 received → extracted → validated → loaded, 검증에서 걸리면 extracted → needs_review, 파일이 깨졌으면 failed. 상태를 테이블 한 칸에 저장해두면 세 가지가 공짜로 따라와요.
- 재시작 안전 — 파이프라인이 중간에 죽어도,
loaded가 아닌 문서만 다시 집으면 돼요. - 가시성 —
needs_review가 몇 건인지 한 줄 쿼리로 나와요. - 멱등성의 토대 — 이미
loaded인 문서를 또 만나면 건너뛰면 돼요.
💡 상태를 문서 자체(파일명·메모리)가 아니라 DB 테이블에 두는 게 중요해요. 프로세스가 죽어도 살아남아야 재처리가 안전하거든요.
2. 멱등 적재 — 두 번 돌려도 한 번만 들어가게
파이프라인에서 재시도·재처리는 피할 수 없어요. 네트워크가 끊기고, 배치가 중간에 죽고, 어제 실패한 폴더를 오늘 다시 돌리죠. 이때 같은 문서가 DB 에 두 번 들어가면 그것도 “틀린 데이터” 예요. 합계가 두 배로 잡히니까요.
해법은 문서마다 변하지 않는 고유 키 를 정하는 거예요. 가장 튼튼한 건 파일 내용 자체의 해시(지문)예요.
import hashlib
def doc_fingerprint(pdf_bytes: bytes) -> str:
"""파일 내용으로 만드는 고유 지문. 같은 파일이면 항상 같은 값."""
return hashlib.sha256(pdf_bytes).hexdigest()[:16]
raw = open("invoice_born_digital.pdf", "rb").read()
print(doc_fingerprint(raw))
print(doc_fingerprint(raw)) # 같은 파일 → 같은 지문
9f2a41c0b7e8d135
9f2a41c0b7e8d135
이 지문을 테이블의 유니크 키로 걸고, 적재는 INSERT 가 아니라 UPSERT 로 해요. 같은 지문이 이미 있으면 새로 넣지 않고 갱신만 합니다.
INSERT INTO invoice (fingerprint, vendor, total, loaded_at)
VALUES (%(fingerprint)s, %(vendor)s, %(total)s, now())
ON CONFLICT (fingerprint) DO UPDATE
SET vendor = EXCLUDED.vendor,
total = EXCLUDED.total,
loaded_at = now();
이렇게 하면 같은 배치를 열 번 돌려도 결과가 똑같아요. 이게 멱등성(idempotency) 이고, 재시도를 마음 놓고 할 수 있게 해주는 안전장치예요.
⚠️ 파일 해시 대신 “거래번호+날짜” 같은 문서 안의 자연키 를 쓸 수도 있어요. 다만 그 값 자체를 OCR 로 뽑았다면 오인식 위험이 있으니, 자연키는 구조체 파싱으로 확실히 읽히는 문서에서만. 스캔본이 섞이면 파일 지문이 더 안전합니다.
3. 검증 게이트 — 통과는 적재로, 거절은 큐로
1편에서 만든 검증 함수들을 여기서 하나의 게이트 로 묶어요. 핵심은 “통과/거절” 만 내는 게 아니라, 거절이면 왜 거절인지 이유를 같이 내는 거예요. 그래야 사람이 검토할 때 어디를 볼지 알거든요.
def validate_invoice(rec: dict) -> tuple[bool, list[str]]:
"""검증 게이트 — 통과 여부와 거절 사유 목록을 함께 반환."""
problems = []
if rec["total"] is None: # 1편 parse_amount 가 None 준 경우
problems.append("합계란 숫자 파싱 실패")
elif sum(i["amount"] for i in rec["line_items"]) != rec["total"]:
problems.append("항목 합계 ≠ 합계란") # 1편 rows_consistent
if rec["min_conf"] < 0.90: # 1편 needs_review
problems.append(f"낮은 confidence {rec['min_conf']:.2f}")
return (len(problems) == 0, problems)
깨끗한 레코드와 OCR 이 흔들린 레코드를 하나씩 넣어보면 이렇게 갈려요.
clean = {"total": 125000, "min_conf": 0.98,
"line_items": [{"amount": 80000}, {"amount": 45000}]}
dirty = {"total": None, "min_conf": 0.61,
"line_items": [{"amount": 80000}, {"amount": 45000}]}
print(validate_invoice(clean))
print(validate_invoice(dirty))
(True, [])
(False, ['합계란 숫자 파싱 실패', '낮은 confidence 0.61'])
그리고 이 게이트를 상태 전이에 연결해요. 통과면 멱등 적재하고 loaded, 거절이면 사유와 함께 사람 확인 큐로 보내고 needs_review.
def route(rec: dict) -> Status:
ok, problems = validate_invoice(rec)
if ok:
upsert_invoice(rec) # 2장의 UPSERT
return Status.LOADED
enqueue_review(rec, reasons=problems) # 큐에 사유까지 같이
return Status.NEEDS_REVIEW
여기서 1편에서 강조한 원칙이 그대로 살아요 — 틀린 값을 그럴듯하게 자동 보정하지 않는다. 의심스러우면 무조건 큐로 보내고, 통과한 것만 테이블에 들어가요. 자동 적재율이 좀 낮아지더라도, 잘못된 레코드가 조용히 들어가는 것보다 훨씬 싸요.
4. 사람 확인 큐 — 근거를 붙여 넘기고, 고친 값을 되돌리기
needs_review 로 떨어진 문서는 사람이 봐야 해요. 이때 검토 화면에 원본 문서의 어느 부분에서 나온 값인지 를 같이 띄워주는 게 핵심이에요. 사람이 PDF 를 처음부터 뒤지게 만들면 검토가 병목이 되거든요.
그래서 큐에 넣을 때 값만 넣지 말고 출처(provenance) 를 같이 실어요.
def enqueue_review(rec: dict, reasons: list[str]) -> None:
review_rows.insert({
"fingerprint": rec["fingerprint"], # 어떤 문서
"field": rec["field_name"], # 어떤 필드
"raw_text": rec["raw_text"], # OCR/파싱이 읽은 원본 문자열
"source_page": rec["page"], # 몇 페이지
"bbox": rec["bbox"], # 그 페이지 어디(좌표)
"reasons": reasons, # 왜 걸렸는지
"status": Status.NEEDS_REVIEW,
})
source_page 와 bbox(좌표)가 있으면, 검토 UI 에서 그 페이지의 해당 영역을 하이라이트해 보여줄 수 있어요. 검토자는 “이 값이 8O,OOO 로 읽혔는데 실제론 뭔가요?” 를 몇 초 만에 판단하죠.
사람이 값을 고치면, 그 레코드를 버리지 말고 파이프라인으로 되돌려 보내요. 고친 값도 똑같은 검증 게이트를 다시 통과해야 하고, 통과하면 같은 UPSERT 로 적재돼요.
def apply_correction(fingerprint: str, field: str, corrected: str) -> Status:
rec = load_record(fingerprint)
rec[field] = corrected
rec["min_conf"] = 1.0 # 사람이 확정한 값
return route(rec) # 같은 게이트·같은 멱등 적재로 재진입
멱등 적재라서 이 재진입이 안전해요. 이미 부분 적재됐든 아니든, UPSERT 가 최종 상태를 하나로 맞춰주거든요.
✅ 사람이 자주 고치는 필드·양식을 로그로 쌓아두면, 나중에 그 양식 전용 후처리 규칙을 더하거나 OCR 전처리를 손볼 근거가 돼요. 검토 큐는 부담이 아니라 개선 신호의 출처 이기도 해요.
5. 재시도와 실패 격리 — 다 재시도하면 안 돼요
파이프라인에서 오류는 두 종류예요. 이걸 안 나누면 크게 손해 봅니다.
- 일시적(transient) 오류 — OCR API 타임아웃, 5xx, 네트워크 끊김. 다시 하면 성공 할 수 있어요.
- 영구적(permanent) 오류 — 파일 손상, 검증 실패. 다시 해도 똑같이 실패해요.
일시적 오류만 재시도하고, 영구적 오류는 재시도 없이 격리해야 해요. 검증에서 떨어진 문서를 재시도 루프에 넣으면, 같은 문서를 무한히 다시 돌리면서 API 비용만 태워요.
class TransientError(Exception): ... # 타임아웃·5xx — 재시도 대상
class PermanentError(Exception): ... # 손상·검증 실패 — 격리 대상
def process_with_retry(doc, max_tries: int = 3) -> Status:
for attempt in range(1, max_tries + 1):
try:
return process(doc)
except TransientError:
wait = 2 ** attempt # 2s, 4s, 8s … 지수 백오프
log(f"일시적 오류 — 재시도 {attempt}/{max_tries} ({wait}s 후)")
sleep(wait)
except PermanentError as e:
return dead_letter(doc, reason=str(e)) # 재시도 없이 격리
return dead_letter(doc, reason="재시도 소진")
dead_letter 로 보낸 문서는 DLQ(dead-letter queue) 에 쌓여요. 나중에 사람이 원인을 보고 다시 흘리거나, 애초에 처리 대상이 아닌 문서(빈 페이지·다른 양식)를 걸러내는 곳이에요.
⚠️ 재시도는 반드시 멱등 적재 위에서만 안전해요. 2장의 UPSERT 가 없으면, 재시도가 성공할 때마다 중복 레코드를 쌓아요. 멱등성과 재시도는 한 세트로 봐야 합니다.
6. 관측 — 거절률이 곧 품질 신호
파이프라인을 돌리기 시작하면, 매 배치의 상태 분포 를 재세요. 이게 데이터 품질을 보는 가장 싼 계기판이에요.
from collections import Counter
def batch_report(results: list[Status]) -> dict:
c = Counter(results)
total = len(results)
return {
"loaded": c[Status.LOADED],
"review": c[Status.NEEDS_REVIEW],
"failed": c[Status.FAILED],
"auto_rate": round(c[Status.LOADED] / total, 3), # 자동 적재율
}
results = ([Status.LOADED] * 182
+ [Status.NEEDS_REVIEW] * 15
+ [Status.FAILED] * 3)
print(batch_report(results))
{'loaded': 182, 'review': 15, 'failed': 3, 'auto_rate': 0.91}
자동 적재율(auto_rate)을 매일 기록해두면, 이게 양식이 바뀌었는지 알려주는 경보 가 돼요. 평소 0.91 이던 게 갑자기 0.6 으로 떨어지면, 거래처가 계산서 양식을 바꿨거나 스캔 품질이 나빠진 거예요. 사람이 큐를 다 처리하느라 지치기 전에, 숫자가 먼저 알려주죠.
💡 거절률이 특정 거래처·양식에 몰려 있으면 그 양식 전용 파서를 하나 더 두는 게 이득일 때가 많아요. 관측이 있어야 “어디에 노력을 쏟을지” 가 보여요.
7. 정리 — 전체 그림
두 편을 한 장으로 잇으면 이래요.
| 단계 | 하는 일 | 지키는 원칙 |
|---|---|---|
| 인입 | 판별(텍스트 레이어 유무) → 구조체 파싱 or OCR | 1편: 맞는 도구로 뽑기 |
| 지문 | 파일 해시로 고유 키 부여 | 재처리해도 중복 없음 |
| 검증 게이트 | 타입·합계·confidence + 거절 사유 | 틀린 값은 통과 금지 |
| 적재 | 통과분만 UPSERT | 멱등 — 두 번 돌려도 안전 |
| 사람 확인 큐 | 거절분을 근거와 함께 검토 → 재진입 | 자동 보정 금지, 사람이 확정 |
| 재시도·DLQ | 일시적만 재시도, 영구적은 격리 | 무한 재처리·비용 폭주 방지 |
| 관측 | 자동 적재율·거절률 기록 | 양식 변화를 숫자로 감지 |
가장 중요한 세 가지만 다시 짚을게요.
- 파이프라인은 상태 머신. 문서마다 상태를 DB 에 남기면 재시작·가시성·멱등성이 한꺼번에 따라와요.
- 멱등성과 재시도는 한 세트. UPSERT 없이 재시도하면 중복이 쌓여요. 지문 키 + UPSERT 를 먼저 깔고 재시도를 얹으세요.
- 틀린 값은 문 앞에서 돌려보낸다. 검증에서 걸린 건 자동 보정하지 말고 근거와 함께 사람 확인 큐로. 통과한 값만 테이블에 들어가야, PDF 에서 왔다는 이유로 데이터를 의심하지 않아도 돼요.
1편에서 “좋은 추출 도구보다 검증을 붙였느냐가 정확도를 만든다” 고 했는데, 2편은 그 검증을 수천 장 규모에서도 안전하게 굴리는 골격 이었어요. 판별로 시작해서, 검증 게이트로 거르고, 멱등 적재로 안전하게 넣고, 큐로 사람과 협업하는 — 이 골격 하나면 PDF 뭉치를 믿을 수 있는 테이블로 바꿀 수 있어요.
일단 오늘은 여기까지…..
PDF 두 편으로 판별부터 적재까지 한 바퀴 돌았네요. 다음엔 이 큐에 쌓인 검토 로그를 되먹여서 양식별 파서를 자동으로 붙이는 얘기를 풀어볼게요.
← 이전 글: (1/3) PDF 에서 데이터를 뽑아 DB 에 넣기 — 구조체 파싱 vs OCR, 뭘 언제 쓰나 | 다음 글 →: (3/3) 검토 로그를 되먹여 양식별 파서 붙이기 — 양식 클러스터링·앵커 추출·회귀 테스트