Archive

[Spark] AQE (Adaptive Query Execution) / DPP ( Dynamic Partition Pruning ) 본문

------- DE -------/Spark

[Spark] AQE (Adaptive Query Execution) / DPP ( Dynamic Partition Pruning )

enent 2022. 10. 5. 21:10
반응형

Spark 3.0에 들어서면서 Spark Core와 SQL 엔진에서 쿼리 속도를 높이기 위해 변화된 것 중 DPP(Dynamic Partition Pruning)과 AQE (Adaptive Query Execution)에 대해 설명한다.

1. 동적 파티션 정리 (Dynamic Partition Pruning) [링크]

[SPARK-11150] Dynamic partition pruning - ASF JIRA

Implements dynamic partition pruning by adding a dynamic-partition-pruning filter if there is a partitioned table and a filter on the dimension table. The filter is then planned using a heuristic approach: As a broadcast relation if it is a broadcast hash

issues.apache.org


DPP는 한 마디로 쿼리 결과에 필요한 데이터만을 동적으로 처리하는 것이다.

DPP는 Filter Push Down을 기반으로 Optimizing을 한다. Filter Push Down은 Filter를 Scan연산 보다 앞에 실행하여 Filtering 한 값을 하는 방법이다. Full Scan을 피하고 원하는 값만 읽어올 수 있다.

Filter Push Down


Partition Pruning이란 Partitioned File 에 대해 Filter Push Down 방법을 적용한 것으로, Partitioned File 에 Filter를 우선 적용하여 Filter 기준과 일치하는 Partitioned File만을 Scan 하는 것이다.

DPP 의 대표적인 예시로 Fact Table과 특정 Filter를 거친 Dimemsion table과의 Join을 들 수 있다. 아래의 그림처럼 Dimension Table로부터 Filtering 한 결과를 Fact Table에 대한 Scan 연산의 일부 결과로 집어넣어 처음부터 Data를 읽어 들이는 범위를 제한한다.



만약 12.2 그림처럼 Dimemsion Table이 Fact Table보다 작고 Join을 수행하는 경우라면, Broadcast Join을 시도할 것이다. 이 때, Spark는 더 큰 Fact Table에서 읽는 데이터 양을 최소화하기 위해 아래와 같은 작업을 한다.

① Dimension Table쪽에서 Spark는 Filter Query의 일부분이 될(=Build Relation) Hash Table을 구축한다.
② Spark Query 결과를 Hash Table에 연결하고(=Build RElation을 실행) 이를 Broadcast 변수에 할당한다. 이후 Spark는 Join 연산에 관여하는 Executor들한테 Broadcast 변수들을 배포한다.
③ Spark는 각 Executor에 배포된 Hash Table을 확인하여 Fact Table에서 어떤 데이터들이 관계가 될지 결정한다.
해당 Filter를 Fact Table File Scan 연산에 동적으로 적용하고 Broadcast 변수의 결과를 재사용한다. 이 방식으로 Filter와 맞는 Partition들만을 Scan 하여 필요한 데이터들만 읽을 수 있다.


DPP를 활성화하는 spark.sql.optimizer.dynamicPartitionPruning.enabled 옵션은 default값이 true이며, DPP를 적용할 수 있다고 판단될 때 실행된다.



2. AQE (Adaptive Query Execution) [링크]

[SPARK-31412] New Adaptive Query Execution in Spark SQL - ASF JIRA

SPARK-9850 proposed the basic idea of adaptive execution in Spark. In DAGScheduler, a new API is added to support submitting a single map stage.  The current implementation of adaptive execution in Spark SQL supports changing the reducer number at runtime

issues.apache.org

AQE는 런타임에 수집된 통계 정보를 바탕으로 물리 실행 계획을 재 최적화하는 것으로 기존에 사용되던 CBO(Cost-Based Optimizer)를 보완하는 개념이다.

이를 위한 방법으로는 아래와 같은 것들이 있다

  • Shuffle Partition의 개수를 줄여서 Shufle Stage에서 Reducer의 개수를 줄인다.
  • Query의 물리 실행을 최적화한다. (ex. SortMergeJoin을 BroadcastHashJoin으로 바꿈)
  • Join 하는 동안 데이터 skew처리

이 모든 Adaptive 한 측정은 실행 계획이 실제 실행되는 사이에 이루어진다.

사용하기 위한 방법은 spark.sql.adaptvie.enabled=true이며, 세부적으로 spark.sql.adaptive.coalescePartitions.enabled=true,
spark.sql.adaptive.skewJoin.enabled=true를 통해 어떤 작업을 허용할지를 설정할 수 있다.

AQE Framework

한 쿼리에서 Spark 연산들은 Pipeling으로 배치되고 Parallel 하게 실행된다. 하지만 Shuffle이나 Broadcast exchange 같은 경우는 한 stage의 output이 다음 stage의 input이 되어야 한다. 이렇게 되면 stage가 실행될 때까지 대기해야 하는, 즉, Parallel 한 실행이 끊기는 시점이 있다. 이를 Meterializaion Point, Pipe Broke Point라고 부른다. 이 시점에는 모든 파티션에 대한 통계치가 있고 이후의 연산이 시작되지 않은 시점이기 때문에, 이때 쿼리를 재검토하고 다시 최적화할 수 있다.



AQE's Conceptual steps

AQE Framework 가 반복적으로 수행하는 개념적 단계이다.


1. 각 스테이지에서 모든 Scan 연산 같은 Leaf Node들이 실행된다
2. 한번 Meterialization Point가 끝나면 Complete으로 표시하고, 실행 중 수집된 모든 통계정보들이 Logical Plan에서 갱신된다.
3. 통계 정보 ( 읽은 Partition 개수 등 )에 기반하여 Catalyst Optimizer를 다시 실행하여 아래의 내용을 수행할 수 있는지 파악한다.
a. 파티션의 개수를 줄여서 Shuffle 데이터를 읽는 Reducer의 개수를 줄인다 (Partition Coalescing)
b. SortMergeJoin을 테이블에서 읽어 들이는 데이터 크기에 기반해 Brodcast Join으로 변경한다 (Join Strategy Swtiching)
c. Skew Join을 해결하고자 시도한다. (Skew Join Optimization)
d. 새로운 Optimized Logical Plan을 작성하고, 그에 따라 새로운 Optimized Physical Plan을 만든다.

1) Partition Coalescing

Shuffle Partition의 설정은 매우 중요하며, Partition의 크기에 따라 너무 작으면 오버헤드가 많아지고, 너무 크면 Disk Spill 이 발생한다.
AQE는 동적으로 Shuffle Partition의 수를 조정할 수 있는데, 처음에 설정된 Partition 수 대로 처리할 때 너무 작은 Partition 이 있다면 Shuffle Stage에서 Coalescing을 통해 Dynamic 하게 Partition 수를 줄여 적절한 Partition 크기와 개수로 설정한다.



2) Join Strategy Switching

Spark 에선 기본적으로 Join 할 두 Dataset 중 하나 이상이 Memory에 올리기 충분하다면 Brodcast Hash Join을 한다.
때문에 Runtime시 정보를 바탕으로 SortMerge Join 등 다른 Join Algorithm을 Brodcast Hash Join으로 로 바꿀 수 있다면 실행계획을 Brodcast Hash Join로 바꾸어 실행한다.


3) Skew Join Optimization

Join 연산을 하다 보면 Shuffle 작업 시 특정 Key에 값이 Skew 되는 경우가 존재한다. Shuffle시에는 해당 Shuffle의 Parallel 처리가 모두 종료돼야 다음 Stage를 진행할 수 있어서, Shuffle 수행 시간이 전체 작업시간에 영향을 미치게 된다.

AQE를 사용하면 Runtime 통계적 정보들을 바탕으로 Join 시에 Skew Reader가 Partition Size로부터 Skew를 감지하고, Skew Partition을 더 작은 Partition 들로 나누어 준다. 이렇게 효율적인 처리를 통해 전체 작업시간을 줄일 수 있다.






Reference
Learning Spark (2nd) - 12. Epilogue: Apache Spark 3.0
https://eyeballs.tistory.com/248
https://dzone.com/articles/dynamic-partition-pruning-in-spark-30
https://kadensungbincho.tistory.com/89
https://www.slideshare.net/databricks/adaptive-query-execution-speeding-up-spark-sql-at-runtime
https://www.databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html

반응형
Comments