Archive

[Spark] Spark Optimization 전략 본문

------- DE -------/Spark

[Spark] Spark Optimization 전략

enent 2022. 11. 18. 16:48
반응형

1. Row 및 Column 필터링은 최대한 앞 쪽에서 

데이터를 변환하기 전, filter 함수를 먼저 적용하고 필요한 Column 만 선택하여 필요한 데이터만 처리할 수 있도록 한다. 처리 시간을 줄이고 Spark Application을 좀 더 효율적으로 만들 수 있다. 

 

2. 적절한 File Format 선택

Avro와 같은 Row 기반 파일 형식은 Write는 빠르지만 Read는 느리고, ORC와 Parquet과 같은 Column 기반 파일 형식은 Avro에 비해  Write는 느리지만 Read는 더 빠르다. 때문에 Avro 같은 Row기반 파일 형식은 Write Once, Read Once 일 때 적합하고, ORC나 Parquet처럼 Column 기반 파일 형식은 더 자주 Read 하고 Transform 이 일어나는 파일일 때 적합하다.

Parquet은 스파크에서 가장 많이 사용되는 파일 형식이다. Data Skipping을 이용한 빠른 Query 성능을 제공하고, Storage에서 항상 압축되어 저장된다. Data Skipping을 통해 Spark 가 전체 파일을 읽지 않고도 필요한 Row와 Column만 읽어 처리할 수 있다. 


3. Caching

  • Cache : 데이터를 Memory에 저장하고 Spill 된 데이터를 Disk에 저장한다.
    - 많은 양의 데이터가 Memory에 캐시 될 때 Compute memory를 차지할 수 있으므로 데이터를 처리하기에 충분한 Compute Memory가 있는지 확인해야 한다.
  • Persist : 데이터를 Memory 나 Disk 중 어디에 Cache 할지 위치를 지정할 수 있다.
    - Disk 에 캐시 하게 되면, Compute power를 차지하지 않게 된다. 또한 unpersist() 함수를 통해 계산이 완료되고 나면 캐시 된 Dataframe을 Unpersist 해주는 것도 좋은 방법이다. 

 

4. Enable AQE

Adaptive Query Execution은 스파크 3.0에 도입된 새로운 기능으로 스파크 3.2부터 기본적으로 활성화된다. AQE는 Query가 실행되는 동안 Spark가 Runtime에 Query Plan을 변경할 수 있도록 하여 더 빠른 쿼리 성능을 제공한다.

 

spark.conf.set("spark.sql.adaptive.enabled", True)를 사용하여 활성화할 수 있다.

AQE를 사용하게 되면 Runtime 통계 정보에 기반하여 Shuffle 되는 파일의 Data 크기를 알 수 있다. 따라서, Runtime에 Query Plan을 실행할 때, Shuffle 파일의 크기에 따라서 Shuffle Sort Join이나 Broadcast Join 중에 Join 전략을 Runtime에 바꿀 수 있고, Shuffle Partition의 개수를 Coalesce 하거나, Skew Join을 최적화할 수 있다.

 

AQE의 3가지 장점

1) Join Strategy Switch

처음에 Query Plan을 만드는 동안 쿼리 내에 Join 조건이 있으면, Spark는 Data가 아직 Load 되지 않은 상태의 Table 크기를 읽은 후 테이블들 중 한 개의 사이즈가 15MB 보다 작을 경우 Broadast Join을 사용한다. 15MB보다 크다면 일반적으로 사용하는 Sort-Merge Join 사용한다. 

  • Broadcast Join : Table을 모든 Executor의 Memory에 Push 하며 Join시 Data Shuffle이 일어나지 않음
  • Sort Merge Join : Node들 간의 Data Shuffling을  유발하여 비용이 많이 듦

 

AQE를 통해 Spark는 물리적인 Query Plan을 Runtime 통계정보를 사용하여 Runtime에 수정할 수 있다. 예를 들어, 쿼리가 실행될 때 처음에 Spark는 Join 할 두 테이블의 사이즈가 15MB가 넘는다는 것을 파악한 후 Sort-Merge Join을 선택할 후 있지만, Run time에 Spark는 Filter를 직접 Data Source에 적용하는 Filter Push down을 예측한다.

Filter Pushdown 이란 Filter 연산을 수행할 때 Filed Casting 등의 연산이 있는 경우를 제외하고 자동으로 Data Source 에 직접 Filtering을 적용되는 것이다. 이를 통해 I/O를 줄일 수 있다.

즉, Filter Pushdown을 통해 Load 되는 테이블의 크기를 15MB 미만으로 줄일 수 있게 되고 이 때 AQE가 활성화 되어있다면  Spark가 Query Plan을 Sort-Merge Join에서 Broadcast Join로 변경하여 쿼리 성능을 향상할 수 있다. 

 

 

2) Coalesce Shuffle Partition

spark.sql.sql.partitions는 개발자가 모든 Job에 대해 설정해야 한다. spark.sql.sql.partitions 를 설정하면 설정된 값으로 모든 동일한 Partiition을 출력한다. Default 값은 200이다. Cluster의 총 Core 수와 비례하는 Shuffle Partition을 설정하는 것이 좋다.

 

만약 AQE가 활성화 되어있으면, Spark는 Runtime 통계정보를 기반으로 최적의 Shuffle Partition을 선택한다. 조금 더 디테일 하게는, Spark 가 Runtime 통계를 기반으로 Shuffle Partition을 자동으로 Scale down 할 수 있도록 총 Core 수에 비례하여 더 높은 Shuffle Partition을 설정한다.  (AQE의 역할 자체가 너무 많은 수의 작은 Partition들을 크고 적은 Partition으로 줄여주는 역할이기 때문이다)

3) Optimize Skew Join

Data Skew는 Memory 내 파티션들이 불균형하게 분포되어 있을 때 발생한다. 
AQE가 활성화 되어 있으면 Spark는 Query가 실행되는 동안 Shuffle File 통계정보에서 Skewness를 감지할 수 있고, 각 작업이 거의 동일한 시간에 완료되도록 Skewed 된 데이터에 대해 Partition을 하위로 더 생성하여 Skewness를 방지할 수 있다. 

 

 

5. Broadcast Join

Broadcast Join은 기본적으로 모든 Worker Node에서 Master Node로 Table Data를 수집하고, 전체 Table을 다시 모든 Worker Node로 다시 배포한다. Broadcast Join을 사용하는 것은 결국 Shuffle을 방지하여 Join 연산의 속도를 높이는 것이며 일반적으로 Join 하려는 테이블 중 하나의 크기가 15MB일 경우에 사용된다. 전체 Table이 모든 Worker Node에 존재하기 때문에 Data Shuffle이 발생하지 않고 이에 따라 성능이 향상될 수 있다. Spark에서 Default Broadcast Size는 10MB이다.

Spark.conf.set("spark.sql.autoBroadcastJoinThreshold", ["수정할 값"]) 을 통해 값을 변경할 수 있다.

 

AQE가 활성화 되어있으면, Spark는 자동으로 최적의 Join연산을 직접 선택한다. 

 

6. Optimize shuffle partition

AQE 장점 2) 에서 말했듯, 개발자는 모든 Spark Application에 대해 Shuffle partition을 설정해야 한다. Default Partition Size는 200이다. 대부분의 경우 Shuffle Partition을 Cluster의 Core수와 동일하게 설정한다.(Core 수 = Partition 수)
만약 Cluster에 비해 파일 사이즈가 매우 큰 파일이 있다면, Shuffle Partition의 크기를 Cluster의 Core 수에 비례하게 설정할 수 있다. Cluster보다 큰 파일에 대해 작은 Shuffle Partition을 선택하면, 한 개의 Partition 이 가지고 있는 데이터가 많아져 OOM Error가 발생한다.

 

 

7. Avoid Skew and Spill

Skew 된 상태라는 것은 Partition들 중 하나가 매우 크다는 것을 의미한다. 이렇게 되면, Skew된 Data Partition이 다른 Task보다 더 오래 실행되고, 최종적으로는 Skew Data Partition이 다 처리된 후에야 Stage가 완료된다.

또한 충분한 Executor Memory가 없는 경우에 Skew는 Spill을 야기할 수 있다. 이 때, Spill은 사용 가능한 Memory가 없어서 Spark 가 Disk에 중간 과정의 데이터를 쓰는 것을 의미한다.

AQE가 활성화 되어 있다면, Partition을 현재 가용 상태에 맞게 Scale down을 하기 때문에 Skew를 피할 수 있고 Spark Memory가 충분한 상태인지 자동으로 파악하여 Spill을 방지할 수 있다. Skew와 Spill의 경우 Spark UI에서 확인할 수 있다.

 

 

8. Pass schema manually

Spark는 Schema를 추론하기 위해 CSV, JSON 파일 형식의 경우 전체 파일을 스캔해야 하고 이를 위해 내부적으로는 가능할 때마다 스키마를 수동으로 전달한다. Parquet 파일의 경우엔, Spark는 Schema를 추론하기 위해 적어도 한 개의 Part 파일을 읽어야 한다. 

 

9. Avoid scanning issue

Table이 년, 월, 일처럼 잘게 Partitioned 되어있는 파일은 Table로 등록한다. Table로 등록하면 Metastore에 데이터를 미리 등록할 수 있어 Query 성능이 빨라진다. 또한 너무 많은 작은 파일이 생성되는 것을 coalesce 등을 통해 피하는 것이 좋다. 파일이 작으면 폴더 내 파일들을 스캔하는데 더 많은 시간이 걸려 Query 성능이 느려지게 된다.

 


10. 고차 함수 및 UDF 사용 피하기

Pyspark의 경우 모든 Executer에 Python과 Java 사이에 코드 직렬화라는 과정이 있어 Query 성능이 느려질 수 있다. 때문에 가능한 UDF 함수 대신 Spark SQL 함수를 사용하는 것이 좋다.

 

 

11.  Bucketing

Bucketing은 쉽게 말해 데이터를 파일별로 나누어 버켓에 저장하는 것이다. 데이터를 미리 Shuffle 하여 특정 Partition 혹은 Part File에 위치하도록 하는 것이다. 즉, Partition 내의 Partition이라고 볼 수 있다. 이 때, Part File의 위치와 내용은 Metastore에 의해 추적된다. 

 

Bucketing을 통해 Table사이즈가 TB 혹은 그 이상이 될 때의 Shuffle 연산을 피할 수 있다. 주로 TB 이상의 Dataset에서 자주 Query 혹은 Join 연산이 일어나는 경우에만 사용해야 한다.

Bucketing을 실행하려면, 두 테이블의 크기가 현저히 달라도 ( Ex. 한 테이블은 TB, 다른 한 테이블은 MB) 두 테이블 모두 같은 Column과 같은 개수의 Bucket으로 Bucketing 되어 있어야 한다

디스크에서 파일을 읽을 때 Spark Partition의 수는 Dataset의 Bucket 수와 같다. Partition을 계산할 때, defaultMaxPartitionBytes 에 설정된 값은 무시된다.


Bucketing은 두 테이블을 매우 자주 Join 하여 Shuffle을 방지하고자 할 때 유용하다. Bucketing 된 데이터를 가져오기 위해 미리 데이터를 Shuffle 한 후 Bucketing 된 데이터와 join 연산을 하기 때문에 쿼리 성능이 향상된다. 자주 사용하는 테이블에 유용하며 만약 Join 연산을 한 번 수행 되는 상황이라면 bucketing을 하지 않는 것이 낫다.

 


12. Disk Partitioning

Disk Partition의 장점은 Spark가 파일에서 필요한 Row와 Column만 읽을 수 있는 Prediate Pushdown을 할 수 있다는 것이다.

Predicate Pushdown은 데이터를 읽을 때 Filter 후에 Scan 이 일어나는 것으로, 데이터를 읽을 때부터 효율적인 처리가 가능하다.

즉, Column Name과 Data Type을 하위에서부터 즉, 실제 데이터를 읽을 때가 아닌 Scanning 중에 유추할 수 있기 때문에 궁극적으로는 I/O가 줄일 수 있다.

 

 

13. Z-order Indexing

Z-Order Indexing은 Disk Partitioning과 비슷한 역할을 하나 아쉽게도 현재는 Databricks에서만 사용할 수 있다.

Z-Order Indexing은 Parquet File을 최적의 Size로 결합하여 Small File Size 이슈를 방지하고, 주요한 Column들에 대해 빠른 Query 성능을 제공한다.

또한 수십억 Record를 가지고 있는 Dataset에서 단 한 번만 발생한 Record를 Filtering하는 것과 같은  매우 선택적인 Query에 대해서도 Scan time을 줄여준다. Cardinality 가 높은 Column(Primary Key)를 사용하여 특정 Record를 Filtering 하기 때문에 가능하다.

Z-Order 알고리즘을 사용하여 Disk에서 Spark로 데이터를 읽는 동안 Data Skipping을 수행하기도 한다.다.


Z-Ordering를 Column 수에 상관없이 사용해도 되지만 가장 사용빈도가 높은 Column에 대해서 먼저 Ordering 하는 것이 좋다. 추가적으로, Column을 추가하는 것은 Z-Ordering의 성능을 감소시킨다.

 

When to use partitioning vs z-ordering:

- Partitioning : Low Cardinality Columns (부서 / 연도 등)처럼 Filter를 걸어 Record Group을 추출할 때

- Z-Ordering : High Cardinality Columns (ID / PK 등)처럼 10억 개의 Record table에서 한 개의 record를 추출할 때

 

Summary 

 

1. Shuffle Partition을 Cluster Core 수와 같거나 비례하도록 설정한다

 

2. Disk Partition은 Year, Month, Date와 같은 Low Cardinality Columns에 대해 Filter가 Record Group을 추출할 때 사용한다. Disk Partition은 일부 파일만 읽는 대신에 Partition Column 값을 Directory와 하위 Directiory에서 추론할 수 있어 Predicate Push Down 및 I/O 를 줄일 수 있다.

 

3. Bucketing의 경우 TB 범위에서 자주 Join 되는 테이블에 유용하다. Bucketing을 사용하면 Bucket 연산을 하는 동안 데이터가 이미 Shuffle되고 Join 할 때 Bucketing 된 Table은 Shuffle을 하지 않아도 되기 때문에 쿼리 성능이 향상된다. Bucketing은 Meta Store에 Part File에 대한 위치와 내용을 저장한다.

 

4. Z-Order Indexing는 10억 개의 Row에서 한 개의 Record를 가져오는 것과 같이 High Cardinal Column에서 자주 실행될 때 사용되며 Small File 이슈를 방지한다. 어떤 Column에 Z-Order를 적용한 경우, 모든 Parquet 파일이 최적의 사이즈의 Parquet 파일로 압축된다.

 

5. 자주 사용하는 Dataframe을 Cache 한다. 다만, 데이터를 처리하기에 충분한 Memory가 남아있는지 확인해야 한다.

 

6. 최상의 쿼리 성능을 위해 AQE를 사용해라. Data Skewness도 방지할 수 있고, 자동으로 Shuffle Partition의 사이즈를 조정하고 Runtime에 Query Plan을 수정할 수 있다.

 

7. 다른 기술로는 Filter, File Format Selection, File Formation Selection 등이 있다. 

 

https://surenderpa.medium.com/apache-spark-optimization-techniques-b7ced0b9c218
반응형
Comments