Spark History MCP + AI Agent로 Spark 분석 자동화하기
- 승은(Sandy)/피플&컬쳐팀/Dreamus 이
- 2025년 12월 19일
- 7분 분량
안녕하세요! FLO 데이터&추천팀의 Ken, Julian입니다.
저희 데이터&추천팀에서는 매일 수십 개의 Spark 배치 작업을 운영하고 있습니다. 추천 모델 학습, 사용자 행동 데이터 집계, 데이터마트 생성, 사용자 추천 데이터 추출 등 다양한 ETL 파이프라인이 Amazon EMR 클러스터에서 돌아가고 있죠. 그런데 이 Spark 작업들이 실패하면... 정말 머리가 아픕니다.😱
"어? 새벽 3시에 돌린 작업이 실패했네? 로그 보러 가야겠다..."
"Stage 47이 실패했는데 왜 실패했지? Executor 로그를 뒤져봐야 하나?"
"Data Skew인가? OOM인가? 아니면 설정 문제인가?"
이런 상황에서 Spark UI, CloudWatch Logs, Amazon EMR 콘솔을 오가며 몇 시간씩 디버깅하는 건 일상다반사였습니다. 특히 주말이나 휴일 새벽에 장애가 발생하면... (더 이상 말하지 않겠습니다.😭)
우리가 겪었던 Spark 디버깅의 고통
데이터&추천팀에서 운영하는 Spark 작업은 다음과 같은 특징이 있습니다:
1. 복잡한 데이터 파이프라인
하루 수십 GB의 사용자 음원 청취 로그 처리
긴 Stage들로 구성된 DAG
다양한 데이터 소스 Join (S3, RDS, MongoDB..)
2. 빈번한 에러 발생
OOM (Out of Memory): Executor 메모리 부족으로 작업 실패
Data Skew: 특정 파티션에 데이터 집중으로 인한 타임아웃
Shuffle Spill: 과도한 Shuffle로 인한 성능 저하
네트워크 이슈: S3 read timeout, connection refused
3. 디버깅의 어려움
그리고 문제 발생 후, 트러블슈팅을 위한 과정을 구분해서 나열하자면 다음의 순서로 진행합니다.

이 과정이 평균 1시간, 복잡한 경우 그 이상 소요되었습니다.
더 큰 문제는 Spark 내부 동작에 대한 깊은 이해가 필요하다는 점이었습니다. Lazy Evaluation, Physical Plan, Catalyst Optimizer 등을 이해하지 못하면 근본 원인을 찾기 어려웠죠.
AI가 대신 분석해주면 어떨까?
어느 날 팀 회의에서 이런 이야기가 나왔습니다.
"우리가 매번 Spark UI 보고, 로그 뒤지고, 메트릭 분석하는데... GPT한테 물어보면 안 될까?"
처음엔 반신반의했지만, AWS에서 Spark History Server MCP를 오픈소스로 공개했다는 소식을 접하고 본격적으로 프로젝트를 시작했습니다.
우리가 만들고 싶었던 것
Spark 작업이 실패하면 자동으로 감지
AI가 Spark History Server 데이터를 조회하여 원인 분석
분석 결과를 Slack으로 전송
해결책까지 구체적으로 제시
이렇게 하면 새벽에 에러가 발생해도 아침에 출근해서 Slack만 보면 어느 정도 원인과 해결책을 알 수 있지 않을까요? 🤔
해결 방안: SparkListener + n8n AI Agent
저희는 다음과 같은 아키텍처를 설계했습니다.

EMR 영역: Spark 에러를 실시간으로 감지하는 곳
왼쪽의 파란색 EMR 영역은 실제 Spark 작업이 실행되는 공간입니다. 이 영역의 핵심은 Spark와 EventListener의 결합입니다. 이 리스너는 Spark 작업이 실행되는 동안 모든 이벤트를 실시간으로 모니터링합니다.
EMR 영역 아래쪽에는 Spark History Server (a.k.a. SHS)가 있습니다. 이건 EMR이 기본적으로 제공하는 서비스로, 모든 Spark 작업의 히스토리를 저장합니다. Application 목록, Stage와 Task의 실행 메트릭, Executor의 리소스 사용량, SQL 실행 계획까지 모든 걸 REST API로 제공하죠. 나중에 AI Agent가 이 서버에서 필요한 데이터를 가져와서 분석합니다.
AI Workflow 영역: 에러를 분석하고 해결책을 제시하는 곳
오른쪽의 초록색 AI Workflow 영역은 n8n으로 구성된 자동화 파이프라인입니다. 가장 먼저 HTTP Webhook 노드가 Spark에서 전송된 에러 정보를 받습니다. 이 노드는 Spark의 EventListener가 Stage 실패를 감지하면 이 주소로 JSON 데이터를 전송합니다. JSON에는 Application ID, EMR Cluster ID, 에러 타입, 에러 메시지, Stage 정보 등이 모두 담겨 있습니다.
저희는 OpenAI의 GPT-5-mini 모델을 사용했는데, 이 모델은 빠른 응답 속도와 저렴한 비용이 장점이면서도 Spark 에러 분석에는 충분한 품질을 제공합니다. AI 에이전트는 "j-2AXXXXXXGAPLF 클러스터의 application_1761710205960_0056 애플리케이션 에러 원인을 분석해 주고 해결책을 자세히 제시해 줘"(물론 실제 프롬프트는 훨씬 복잡합니다.😇)라는 자연어 프롬프트를 받아서 분석을 시작합니다.
AI 에이전트의 아래는 Spark History MCP Server가 연결되어 있습니다. MCP는 Model Context Protocol의 약자로, AI가 외부 시스템의 데이터를 구조적으로 조회할 수 있게 해주는 표준 프로토콜입니다. AWS에서 오픈소스로 공개한 이 MCP 서버는 Spark History Server의 REST API를 AI가 자연어로 호출할 수 있게 해주는 브릿지 역할을 합니다.
핵심 컴포넌트
1. SparkListener: Spark의 숨겨진 모니터링 인터페이스 🎯
Spark를 사용하다 보면 Spark UI에서 Stage, Task, Executor 정보를 볼 수 있습니다. 그런데 이 정보들이 어떻게 실시간으로 업데이트될까요? 바로 SparkListener 덕분입니다.
SparkListener란?
SparkListener는 Spark 내부에서 발생하는 모든 이벤트를 실시간으로 구독(Subscribe)할 수 있는 Observer 패턴 기반의 인터페이스입니다. Spark는 작업이 진행되는 동안 수백, 수천 개의 이벤트를 발생시키는데, SparkListener를 구현하면 이 이벤트들을 프로그래밍 방식으로 캡처할 수 있습니다.

이렇게 SparkListener를 활용하면 Spark가 "왜 실패했는지"를 실시간으로 알 수 있고, 이를 AI에게 전달하여 자동으로 원인을 분석할 수 있습니다.
2. n8n AI Agent (자연어 분석)
n8n은 no code 워크플로우 도구입니다. 우리는 다음과 같은 워크플로우를 구성했습니다:
[Webhook]
↓ (에러 정보 수신)
[AI Agent]
├─ OpenAI GPT-5-mini
└─ Spark History Server MCP (메트릭 조회)
↓ (분석 결과)
[Slack 알림]
3. Spark History Server MCP (메트릭 조회)
Spark History Server MCP는 기존 Apache Spark 인프라와 AI 에이전트를 연결하여 다음과 같은 일을 할 수 있게 해 줍니다.
🔍 자연어로 작업 세부 정보를 물어보고 조회할 수 있습니다.
📊 여러 애플리케이션의 성능 지표를 모아서 분석할 수 있습니다.
🔄 여러 번 실행된 작업을 비교해, 이번 실행이 예전보다 느려진 지점을(regressions) 찾아낼 수 있습니다.
🚨 상세한 오류 정보를 바탕으로 실패 원인을 분석할 수 있습니다.
📈 과거 실행 데이터를 기반으로 다양한 인사이트를 얻을 수 있습니다.
구현 과정
저희는 위 구조의 파란 영역, 초록 영역을 2명의 인원이 각자 맡아서 개발을 진행했습니다. 각 영역은 마치 Server-Client API 개발하듯 HTTP 요청/응답 규격만 맞추면 되는 작업이라 커플링 없이 독립적으로 개발을 했습니다.
1. SparkListener 개발 가장 먼저 저희가 해결해야 했던 문제는 "어떻게 Python으로 Java 인터페이스를 구현할 것인가?"였습니다. Spark는 Scala와 Java로 작성되어 있고, SparkListener도 엄연히 Java 인터페이스입니다. 그런데 PySpark는 내부적으로 Py4J라는 라이브러리를 사용해서 Python과 JVM을 연결합니다. 이 Py4J가 제공하는 특별한 패턴을 사용하면 Python 클래스를 Java 인터페이스처럼 보이게 만들 수 있습니다.
핵심은 Java 클래스 선언을 Python 안에 만드는 것입니다. Python 클래스 안에 class Java라는 내부 클래스를 만들고, 그 안에 implements = ["org.apache.spark.scheduler.SparkListenerInterface"]라고 선언하면 됩니다. 이렇게 하면 Py4J가 이 Python 객체를 Java 쪽에서 SparkListener로 인식하게 됩니다.

2. SparkContext에 리스너 등록
리스너를 구현했으면 이제 Spark에 등록해야 합니다. 여기서도 Py4J의 도움이 필요합니다. PySpark의 SparkContext는 Python 객체처럼 보이지만 실제로는 내부에 Java의 SparkContext를 가지고 있습니다. 이 Java SparkContext에 접근하려면 spark.sparkContext._jsc.sc()라는 경로를 타고 들어가야 합니다.

3. ZIP 파일로 패키징하고 배포
리스너를 구현하고 등록하는 코드를 작성했으면 이제 배포 단계입니다. PySpark의 --py-files 옵션은 Python 패키지를 배포할 수 있게 해주는데, 여기서 중요한 점은 반드시 ZIP 파일이어야 한다는 것입니다. Wheel(.whl) 파일은 지원하지 않습니다. 저희가 만든 SparkListener를 ZIP 파일은 S3에 업로드했고, 다음과 같이 사용하게 됩니다.

이렇게 세 단계만 거치면 끝입니다. Python으로 SparkListener를 구현하고, SparkContext에 등록하고, ZIP 파일로 배포하면 Spark 작업이 실행되는 동안 자동으로 에러를 감지해서 N8n으로 전송합니다. 처음엔 복잡해 보였지만 막상 해보니 PySpark의 내부 구조를 이해하는 좋은 기회였습니다.
4. Spark History MCP 서버
AWS의 Spark History Server MCP는 항상 구동(Running) 중인 EMR 클러스터에서만 통신이 가능하도록 구현되어 있습니다. 저희 팀은 Transient EMR 클러스터를 사용하고 작업이 끝나면 클러스터가 자동으로 종료되는데, AI가 분석하는 동안 EMR이 종료될 수 있다는 가능성이 있었습니다.
다행히 Amazon EMR은 Persistent Spark History Server를 제공합니다. EMR 클러스터가 종료되어도 Spark 이벤트 로그를 S3에 저장해두고, 별도의 영구적인 History Server를 통해 조회할 수 있는 기능입니다. 하지만 오픈소스 MCP 서버는 이 Persistent History Server를 지원하지 않았습니다..🥲 또르르
결국 오픈소스 코드를 직접 수정하여 Persistent History Server와 통신할 수 있도록 개선할 수밖에 없었습니다.
실제 사용 예시
어떤 워크플로우에서 장애가 발생한 상황이고, Stage 50에서 발생한 에러, 구체적으로는 OOM 때문에 해당 배치 프로세스가 비정상 종료된 상태입니다. 여기서 AI Analysis는 다음과 같이 2개의 전략을 제시해 줬습니다.
단기 조치: 긴급 장애 대응
장기 조치: 단기 조치로 대응한 워크플로우의 근본적인 원인 해결법
결론부터 말씀드리면, 단기 조치는 리소스 조정이었고, 장기 조치는 코드 수정이었습니다. AI가 제시한 단기 조치를 적용해서 급한 불은 끌 수 있었고, 장기 조치로 근본적인 문제를 해결할 수 있었습니다.
우선, 다음 스크린샷처럼 OOM이 발생한 Stage 50번의 Spark UI에서는 사람이 분석하기엔 쉽지 않습니다.🤔

그렇다면 이 상황을 AI는 어떻게 분석했을까요?🤖

단기 조치
분석 결과에서 아래와 같이 즉시 조치 설정만 적용을 해보았습니다.
spark.executor.memory: 12g → 18g
spark.executor.memoryOverhead: default → 4g
AI Agent가 알려준 위의 단기 조치(리소스 증설)로 긴급 대응을 할 수 있었습니다. 그렇게 긴급 대응 후에, 아래의 장기 조치 지침을 따라 코드 분석을 해보았습니다.
장기 조치

AI Agent가 Spark History Server 로그 분석을 통해 제시한 '장기 조치'의 핵심은 UDF(User Defined Function)의 사용과 대규모 리스트에 대한 collect_list 연산을 주요 병목 원인으로 지목한 것이었습니다. 이에 따라 다음과 같은 구체적인 코드 개선 작업을 수행했습니다.
PySpark Native Function 전환

수정 전 (UDF)

수정 후 (PySpark Native Function)
UDF는 직렬화(Serialization) 오버헤드가 발생하고 Catalyst Optimizer의 최적화를 받지 못하는 단점이 있었습니다. 이를 PySpark Native Function으로 전면 교체하여 실행 계획이 최적화되도록 수정했습니다.
또한 collect_list 집계 전 데이터를 필터링하고 길이를 제한(truncate)하여 연산 부하를 근본적으로 줄였습니다.
AI Agent의 분석을 토대로 코드를 개선한 최종적인 결과는 다음과 같습니다.
리소스 효율화(단기 조치): Executor 메모리를 기존 12GB에서 8GB로 약 33% 축소한 환경에서도 OOM이나 에러 없이 배치가 안정적으로 완주되었습니다.
성능 향상(장기 조치): Spark Native 함수 활용과 전처리 최적화 덕분에 배치 실행 시간이 약 60분에서 35분으로 약 40% 단축되었습니다.
결론적으로, 코드를 보지 않고 Spark History Server 로그만으로 문제의 원인을 짚어낸 AI Agent의 분석은 실제 코드 레벨의 원인과 일치했습니다. 이는 분석 시스템이 단순한 로그 정리를 넘어, 엔지니어에게 실질적인 인사이트를 제공할 수 있음을 보여주었습니다.
개선 효과
저희 트러블슈팅 프로세스를 다시 살펴볼까요? 프로젝트를 시작하기 전, 이렇게 6단계로 트러블슈팅을 진행했었습니다.

이 과정이 이제는 다음과 같이 개선되었습니다.

물론 이런 개선 효과를 정량적으로 측정하기는 쉽지 않습니다. 에러의 복잡도에 따라 분석 시간이 천차만별이고, 사람마다 Spark에 대한 이해도도 다르기 때문이죠. 하지만 저희는 엔지니어들이고, 엔지니어라면 체감한 개선 효과를 그래도 수치로 표현해 봐야 하지 않을까요? 그래서 다음과 같이 수치로 나타내보면,
개선지표
항목 | 성과 | 지표 |
트러블슈팅 단계 | 6단계 → 3단계 | 50% 🔻 |
사람의 수동 분석 시간 | 최소 30분 → 0분 | 불필요 |
VPN 접속 및 여러 UI 탐색 필요 | 필수 → 불필요 | 불필요 (이 항목이 가장 만족스러운 성과입니다) |
이 프로젝트는 단순한 자동화를 넘어서 Spark의 Observability를 확보한 것이라고 할 수 있습니다. 그동안 저희 팀은 Spark 작업 장애가 발생했을 때 주로 사후 로그 분석에 의존해 왔고, 실시간 모니터링과 구조화된 메트릭 기반의 분석 측면에서는 개선 여지가 있었습니다. 이제는 SparkListener를 통해 모든 이벤트를 실시간으로 캡처하고, Spark History Server의 메트릭을 구조화된 방식으로 조회하며, AI를 통해 자동으로 분석하는 시스템을 갖추게 되었습니다.
향후 계획
1. AI 메모리(Long-term Memory)로 진화하는 SparkBot
a. 프롬프트의 한계와 메모리의 필요성
좋은 답변을 얻으려면 구체적이고 명확한 프롬프트가 필수적이라는 사실은 누구나 알고 있습니다. 하지만 우리가 실무에서 얻은 방대한 Spark 튜닝 지식과 히스토리를 매번 프롬프트에 무한정 Append 하는 것은 불가능합니다. 토큰 제한(Context Window)이 있을뿐더러 비효율적이기 때문입니다. 따라서 휘발되는 지식을 외부 메모리(Vector DB)로 구조화하여 AI가 언제든 꺼내 쓸 수 있는 지식 베이스로 만들어야 합니다.
b. 단순 반복 분석에서 '지식 기반' 분석으로
지금의 AI는 매번 'Reset' 상태로 분석을 시작합니다.
"UDF 성능 병목은 Query Plan의 특정 연산자에서(ArrowEvalPython) 확인해야 한다"
"DataFrame Write 단계와 UDF 실행 시에는 On-heap이 아닌 Off-heap 메모리를 집중적으로 사용한다"
이런 중요한 기술적 맥락을 매번 새로 설명하지 않으면 AI는 겉핥기 식 답변만 내놓게 됩니다. "지난주에 똑같은 OOM 에러를 16GB로 해결했었는데..."라는 기억도 전무하여 똑같은 API를 호출하고 똑같은 질문을 반복합니다.
c. 해결책: 과거 경험과 심층 지식의 결합
벡터 DB를 연결하면 AI는 팀의 과거 분석 결과, 해결책, 그리고 고도화된 튜닝 지식을 모두 저장하고 학습합니다. 에러가 발생하면 단순히 로그만 보는 것이 아니라 과거 기록과 기술 지식을 검색(RAG)하여 다음과 같이 제안할 수 있게 됩니다.
"Stage 47에서 OOM이 또 발생했네요. 로그를 분석해 보니 UDF 실행 구간입니다. 기억된 지식(Knowledge Base)에 따르면 UDF와 Write 작업은 Off-heap 메모리를 사용합니다. 지난번(2025-01-15) 유사 사례에서도 Heap 대신 Off-heap 설정을 튜닝하여 해결했던 이력이 있습니다. 이번에도 spark.memory.offHeap.enabled 옵션과 메모리 크기 조정을 권장합니다."
2. 사람 개입 없이 스스로 해결하는 시스템(완전 자동화)
가장 야심찬 계획은 사람의 개입 없이 에러를 자동으로 해결하는 시스템을 만드는 것입니다. 지금은 AI가 분석 결과를 Slack으로 보내면, 어쨌든 사람이 AI가 제시한 해결 방법을 적용하여 재실행을 수행해야 합니다. 하지만 많은 경우 AI가 제시한 해결책이 명확하고 안전하다면, 굳이 사람이 개입할 필요가 없지 않을까요?