Archive

[Spark] Spark Application 최적화 및 튜닝 (Dynamic Allocation/Executor Memory/Partitions) 본문

------- DE -------/Spark

[Spark] Spark Application 최적화 및 튜닝 (Dynamic Allocation/Executor Memory/Partitions)

enent 2022. 9. 30. 01:15
반응형

0. Overview

대규모의 Spark Workload는 매일 야간에 혹은 몇 시간 간격으로 배치 잡으로 실행되는 경우가 많다. TB급 이상의 데이터를 처리하다 보면 resource 부족이나 점진적인 성능 저하에 따른 Job failure을 겪을 수가 있다. Resource 사용 최적화, Task의 병렬 실행, 다수의 Task에 의한 Bottleneck 문제를 어떻게 피하는지 등 문제 해결에 도움을 줄 Spark Configuration 들을 살펴보고자 한다.

1. Spark Configuration Setting

Spark Configuration 설정을 하는 방법들 중에서 우선순위가 존재한다.
spark-default.conf -> spark-submit을 통해 설정된 값 -> Spark Application 내 설정된 SparkSession을 통해 설정된 값순으로 읽으며, 순서대로 읽으면서 중복된 Setting은 뒷 순서에서 읽는 Setting으로 초기화 된다.

2. Static / Dynamic Resource Allocation

Static resource allocation
spark-submit 혹은 Spark application 내에서 Static 하게 자원을 지정한다면, 후에 Task들이 기다리는 상황이 발생하더라도 Spark에서 지정한 Config값 이상의 추가적인 resource를 할당할 수 없다.

Dynamic allocation을 사용하면, 요청에 맞게 Computing resource를 더 할당하거나 줄이도록 할 수 있다. Streaming이나 혹은 Daily로 유입되는 Data 양의 차이가 있는 workload들을 처리할 때, 혹은 Data 분석에 사용할 경우 필요한 옵션이다.


Dynamic allocation 을 사용하기 위해 True로 설정해야 하는 옵션은 아래와 같다. ( Spark 공식 문서에서 안내하는 방법이나, dynamicAllocatio.enabled=true 만 설정해도 사용엔 문제가 없는 것으로 보임 )

① spark.dynamicAllocation.enabled = true + spark.shuffle.service.enabled = true
② spark.dynamicAllocation.enabled = true + spark.dynamicAllocation.shuffleTracking.enabled = true

* spark.shuffle.service.enabled : external shuffle service를 가능하게 하는 옵션으로 executor가 쓴 shuffle file들을 보전하여 안전하게 executor를 제거할 수 있게 해 줌.
* spark.dynamicAllocation.shuffleTracking.enabled : executor에 대한 shuffle file tracking을 enable하게 하여 external shuffle service 없이도 Dynamic allocation을 가능하게 함.




추가적으로 설정해 줄 수 있는 옵션들은 공식문서 를 확인할 수 있다. 참고할 만한 config들이 있다.

Option Default Setting Description
spark.dynamicAllocation.minExecutors 0 최소 executor 수
spark.dynamicAllocation.schedulerBacklogTimeout 1s 해당 backlog timeout 시간이 이상 Scheduling 되지 않은 Task가 있으면 Driver가 해당 Executor가 실행되도록 요청
spark.dynamicAllocation.maxExecutors Inf 최대 executor 수
spark.dynamicAllocation.executorIdleTimeout 60s Task가 완료하고 해당 시간동안 Executor내 진행할 작업이 없으면 Driver는 Executor를 중지

 

3. Configuring Spark Executors Memory

1) Executor Memory Structure

Executor들이 Memory가 부족하거나 JVM Garbage Collection 문제를 겪게 하지 않으려면, Executor Memory에 대한 설정도 필요하다.

spark.executor.memory 는 각 Executor 에서 총 사용 가능한 Memory를 뜻한다.
내부적으로는 아래 그림처럼 실행 메모리 ( Execution Memory ), 저장 메모리 ( Storage Memory ), 예비 메모리 ( Reserved Memory )로 나뉘어 있다.

spark.memroy.fraction Default 설정(0.6)에 의하면 Reserved Memory 에 300MB, Execution Memory에 60%, Storage Memory에 40% 가 할당된다.

  • Reserved Memory : OOM Error 예방을 위해 할당된 메모리 (Java Heap Memory)
  • Execution Memory : Spark 의 Shuffle, Join, Sort, Aggregation 사용되는 메모리
  • Storage Memory : 사용자 데이터 구조 Caching, Dataframe에서 온 Partition을 저장하데 사용하는 메모리

나뉘어 있긴 하지만 일반적으로 Execution Memory, Storage Memory는 공유되는 영역이다. Execution Memory의 영역을 넘어서 Storage Memory로 사용 가능하고, 그 반대의 경우도 가능하다. 다만 Full로 한쪽을 사용하는 것에 있어서는, Storage Memory 에는 Threhold가 있어서, 총 Storage Usage가 Threshold 밑으로 떨어질 때 Threshold 영역을 제외한 나머지 공간을 Execution Memory로 사용할 수 있다. 이는 어떤 상황에서도 Cache 된 영역이 남겨진다는 것이다. 반대로 Execution Memory는 구현이 복잡하게 되어있어서 Storage Memory로 Executor Memory를 Full로 사용하는 일은 없을 수 있다.

spark.memory.storageFraction (Default 0.5) config를 통해 Storage Memory 로서 최소 보장되는 Memory 양을 설정할 수 있다.
이러한 구조는 불필요한 Disk Spill 이 일어나는 것을 방지하고, 다양한 Workload에 대해 합리적인 성능을 제공한다.

2) Map / Shuffle 작업 시 Executor Memory

Map / Shuffle 작업 시 Spark 는 Disk의 Shuffle File에 데이터를 쓰고 읽으므로 이때, 큰 I/O 작업이 발생한다. 때문에 이런 작업이 유난히 많은 대용량 Spark 잡 수행 시 병목 현상이 발생할 수 있다.
Map, Spill, Merge 작업 중 비 효율적인 I/O로 인한 영향도를 줄이고, 최종 Shuffle Partition을 Disk에 쓰기 전에 Buffer Memory 를 잘 이용할 수 있도록 Config를 조정할 수 있다. 아래의 Config들을 잘 조정하면 대용량 Spark Workload의 성능을 전체적으로 향상시키는 데에 도움이 된다. 보다 자세한 내용은 링크를 참고 바라며, 추후 여력이 될 때 한번 자세히 정리해 볼 예정이다.

Configuration Default Recommendation Description
spark.driver.memory 1GB   Driver가 Executor들에게 Data를 받기 위해 할당되는 memory 양.
collect() 같은 함수를 통해 driver에 많은 데이터를 받아와야 하는 경우 memory 가 부족할 수 있어 조정이 필요하다.
spark.shuffle.file.buffer 32KB 1MB 해당 값을 높임으로써 Spark 에서 Map 결과를 Disk 에 쓰기 전에 Buffering을 더 많이 할 수 있어 Disk 에 쓰는 횟수를 줄일 수 있다.
spark.file.transferTo true false False로 바꾸면 Spark 가 Disk에 쓰기 전에 File Buffer를 사용할 수 있어서 I/O 횟수를 줄일 수 있다.
spark.shuffle.unsafe.file.output.buffer 32KB 1MB Shuffle 작업 중 File Merge시 가능한 Buffer 크기에 대한 내용이다. 대용량 workload에서는 크게 가져가고, 기본 workload에선 default로 가져가도 문제 없다.
spark.io.compression.lz4.blockSize 32KB 512KB 압축하는 Block 단위를 크게 함으로써 Shuffle File Size를 줄일 수 있다.
spark.shuffle.service.index.cache.size 100MB   Cache entry들은 지정된 Memory 양에 따라 제한된다
spark.shuffle.registration.timeout 5000ms 120,000ms Shuffle service 등록을 위한 최대 대기 시간
spark.shuffle.registration.maxAttempts 3 5 Shuffle service 등록 실패시 재시도 횟수를 결정함

 

4. Partitions

Partition은 데이터를 관리 가능하고 쉽게 읽을 수 있도록 Disk의 연속된 위치에 Chunk나 Block의 모음으로 나눠서 저장하는 것이다. HDFS나 S3의 경우 Block사이즈는 128MB 이고, 연속된 Block 모음들이 한 개의 Partition을 만든다. Partition의 크기가 너무 작으면 Disk I/O가 많아지고, 잦은 File system 작업 등으로 인해 성능 저하가 생길 수 있고, 너무 크면 필요한 Memory 가 너무 늘어나 전체적인 Memory 양이 부족해질 수 있기 때문에 적절한 파티션 크기와 개수의 설정은 중요하다.

Partition을 용도에 따라 세가지로 분류할 수 있다.
① Input Partition
Spark 에서 처음 Parquet, JSON, ORC 같은 파일을 읽을 때 생성하는 Partition으로 spark.sql.files.maxPartitionBytes (Default 128MB)을 통해 설정할 수 있다. 대부분의 경우, 필요한 칼럼만 골라서 읽고 쓰기 때문에 Default 값인 128MB보다 작다.

② Output Partition
파일을 저장할 때 생성하는 Partition 으로 Dataframe API를 사용하는 코드 상에서 .repartition(num), .coalesce(num) 을 통해 생성할 수 있다. 목적에 따라서 Partition의 크기를 늘리거나(ex. groupBy 집계 후 File size를 늘릴 때) 줄이는(ex. 결과값 저장 시 파편화 방지) 목적으로 사용한다.

③ Shuffle Partition
Shuffle 단계에서 생성되는 Shuffle Partition의 경우, spark.sql.shuffle.partitions (Default 50MB) 옵션을 통해 설정 가능하며 Input Data의 크기에 따라 해당 config를 조정하여 너무 작은 Partition들이 Executor들에게 할당되지 않게 하는 것이 좋다.
그렇다고 Partition의 크기가 크고 연산에 쓰이는 메모리가 부족하다면 Shuffle Spill(데이터를 직렬화하고 스토리지에 저장, 처리 이후에는 역 직렬 화하고 연산 재개함)이 일어날 수 있고, 이에 따라 Task가 지연되고 에러가 발생할 수 있다.

Shuffle Partition 개수를 정하는 공식은 없으며 Workload, Resource 활용 방식, Core 개수, Executor 메모리 가용량에 따라 달라지기 때문에 Test를 반복하여 맞는 개수를 찾아가는 수밖에 없다. 일반적으로는, 하나의 Shuffle Partition 크기가 100~200MB 정도 나올 수 있도록 spark.sql.shuffle.partitions 수를 조절하는 것이 최적이라고 한다.

 

5. Maximizing Spark parallelism

Partition은 Parallel하게, Independent 하게 처리할 수 있어서 Spark 가 대규모의 Parallel 한 데이터 처리가 가능하다.

Spark에서 최대한 Resource를 활용하기 위해서는, Executor에 할당된 Core의 개수만큼 Partition들이 최소한으로 할당되는 것이다. 여기서 말하는 최소한은 1 개의 Core당 1 개의 Partition이 할당되는 상황을 의미한다. 1개의 Core에 여러 개의 Partition이 할당된다면 여러 개의 Thread를 통해 처리하여 Core가 계속 바쁜 상태를 유지할 것이다. 때문에 1개의 Core에 1개의 Thread로 1개의 Partition을 처리하는 것이 이상적이다.

 

Reference
Learning Spark(2nd) - 7. Optimizing and Tuning Spark Applications
https://spark.apache.org/docs/latest/tuning.html#memory-management-overview
https://tech.kakao.com/2021/10/08/spark-shuffle-partition/

 

반응형
Comments