Archive

[Spark] Core of Spark SQL Engine ( Catalyst Optimizer / Tungsten ) 본문

------- DE -------/Spark

[Spark] Core of Spark SQL Engine ( Catalyst Optimizer / Tungsten )

enent 2022. 8. 19. 21:44
반응형

Spark SQL 엔진의 핵심으로 Catalyst Optimizer와 Tungsten이 있다.

Catalyst 는 쿼리를 Optimizing 하는 데 이용하고, Tungsten은 이 Optimized Query를 바탕으로  Optimized Code를 생성하는데 이용된다.  이 둘의 조합으로 Spark SQL 엔진은 최적화된 고수준의 Dataframe / Dataset API 및 SQL 쿼리 등을 지원한다.

 

1. Catalyst Optimizer

Catalyst Optimizer 는 Query 최적화 프로그램을 구축하는 Spark 엔진의 핵심으로, Logical Query 들을 Physical Query Execute Plan으로 변환한다. 실행 계획으로 변환하기 위한 과정은 분석 -> 논리적 최적화 -> 물리 계획 수립 -> 코드 생성 으로 이루어진다. 

1) Spark Computations's Four-Phase

① 분석 ( Analysis )

Unresolved Logical Plan을 생성한다. Spark SQL엔진은 SQL이나 Dataframe Query 를 위한 Abstract Syntax Tree(AST)를 생성한다. 초기 단계에는 어떤 Column이나 Table 이름이든 간에 Column, Data Type, Function, Table, Database 의 네임 리스트를 가지고 있는 Spark Programming Interface인 Catalog 객체로 접근하여 가져올 수 있다. 이 생성된 Unresolved Logical Plan은 Logical Optimization 의 Input이 된다.

 

② 논리적 최적화 ( Logical Optimization )

표준적인 규칙을 기반으로 하는 최적화 접근 방식을 적용하면서, 먼저 여러 계획들을 수립한 후 CBO ( Cost Based Optimizer)를 사용하여 각 계획에 비용을 책정한다. 이 계획들은 Operator Trees 로 배열이 된다. 예를 들어, 이 트리들은 조건절을 아래 배치하고, Column을 걸러내고 Boolean 연산 단순화들을 진행한다. 이 Logical Plan은 Pysical Planning의 Input이 된다.

 

③ 물리 계획 수립 ( Physical Planning )

Spark 실행 엔진에서 선택된 Logical Plan을 바탕으로 대응되는 Pysical operators 를 사용해 최적의 Pysical Plan을 생성한다. 

 

④ 코드 생성 ( Code Generation ) 

각 머신에서 실행할 Java Byte Code 생성한다. 이 때, Tunsten을 통해 최적화된 Code를 생성하게 된다. 

 

 

 

2) Execute Plan

일련의 실행 계획을 .explain() 함수를 통해 확인할 수 있다.

 

아래는 특정 Column들을 groupby Count 한 후 정렬하는 Dataframe의 실행계획에 대한 예시이다.

count_mnm_df = (mnm_df
    .select("State", "Color", "Count")
    .groupBy("State", "Color")
    .agg(count("Count")
    .alias("Total"))
    .orderBy("Total", ascending=False))

count_mnm_df.show(5)
    
+-----+------+-----+
|State| Color|Total|
+-----+------+-----+
|   CA|Yellow| 1807|
|   WA| Green| 1779|
|   OR|Orange| 1743|
|   TX| Green| 1737|
|   TX|   Red| 1725|
+-----+------+-----+
only showing top 5 rows

 

== Parsed Logical Plan ==
'Sort ['Total DESC NULLS LAST], true
+- Aggregate [State#201, Color#202], [State#201, Color#202, count(Count#203) AS Total#214L]
   +- Project [State#201, Color#202, Count#203]
      +- Relation [State#201,Color#202,Count#203] csv

== Analyzed Logical Plan ==
State: string, Color: string, Total: bigint
Sort [Total#214L DESC NULLS LAST], true
+- Aggregate [State#201, Color#202], [State#201, Color#202, count(Count#203) AS Total#214L]
   +- Project [State#201, Color#202, Count#203]
      +- Relation [State#201,Color#202,Count#203] csv

== Optimized Logical Plan ==
Sort [Total#214L DESC NULLS LAST], true
+- Aggregate [State#201, Color#202], [State#201, Color#202, count(Count#203) AS Total#214L]
   +- Relation [State#201,Color#202,Count#203] csv

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [Total#214L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(Total#214L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [id=#259]
      +- HashAggregate(keys=[State#201, Color#202], functions=[count(Count#203)], output=[State#201, Color#202, Total#214L])
         +- Exchange hashpartitioning(State#201, Color#202, 200), ENSURE_REQUIREMENTS, [id=#256]
            +- HashAggregate(keys=[State#201, Color#202], functions=[partial_count(Count#203)], output=[State#201, Color#202, count#228L])
               +- FileScan csv [State#201,Color#202,Count#203] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/zacks/Git/Data Science Projects/LearningSparkV2/chapter2/p..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<State:string,Color:string,Count:string>

 

 

2. Tungsten

Tungsten은 엔진의 성능 향상을 목적으로 하며, Apache Spark의 실행 엔진에 Off-Heap Memory Management, Cache Locality, Whole-Stage Code Generation 기능을 적용하여 Spark 애플리케이션의 CPU/Memory를 최대한 사용할 수 있도록 효율성을 개선하는 역할을 한다. 이는 곧 Spark SQL 의 Pysical Query Optimization 단계에 해당한다.

 

Whole-Stage Code Generation 의 경우 불필요한 호출이나 중간 데이터를 위한 CPU Register를 제거하여 전체 Query를 하나의 함수로 합치는 과정으로 이루어진다. Off-Heap Memory Management JVM 객체가 아닌 Tungsten Row Format을 사용하고 명시적으로 Memory를 관리하는 기능이며,  Cache Locality는 높은 Cache Hite Rate을 위한 것이다. 

 

 

Tungsten Format ( Tungsten Binary Row / UnSafeRow )

1) Background

JVM(Java Virtual Machine)은 메모리를Stack과 Heap으로 나눈다. 새로운 객체가 생성되면 항상 Heap 공간에 생성되고, 이 객체에 대한 참조는 Stack에 저장된다. Java Garbage Collector는 Java 프로그램의 메모리를 자동으로 관리하는 프로세스이다. Heap 내의 일부 객체가 더 이상 필요하지 않다면 GC는 해당 객체를 찾아 삭제하여 Heap 공간을 비운다. 따라서 Heap에 저장되는 모든 데이터는 GC의 대상이 되기 때문에 Heap에 저장되는 데이터가 커지면 GC로 소모하는 시간이 훨씬 길어진다. 

 

Java 객체에는 Header Info, Hashcode, Unicode Info 등 큰 Overhead가 있다. 때문에 'abcd'와 같은 간단한 string도 4B가 아닌 48B를 사용한다. 때문에 Spark는 DataFrame / Dataset을 위한 JVM 기반의 객체를 생성하지 않고 Off-Heap 메모리에 할당한 후(Off-Heap 메모리는 JVM 외부에 있기 때문에 GC의 대상이 아님), 데이터를 레이아웃 하고, Encoder를 사용하여 Data를 In-Memory 형식의 Tunsten Format에서 JVM 객체로 변환한다. 

 

Off-Heap에 객체를 저장하기 위해서는 직렬화(Serialization)를 사용하여 JVM 객체를 Byte Stream으로 변환하는 과정이 필요하고, 데이터를 읽을 땐 역직렬화(Deserialization)을 통해서 객체를 읽는다. 즉, Encoder[T] 는 Spark 내부 Tungsten 형식에서 Dataset[T] 로 변환된다. 

 

Off-Heap에 Spark 가 내부적으로 저장하는 Tungsten Format을 사용하면 아래와 같이 데이터들이 인접한 방식으로 저장되고 Pointer 연산과 Offset을 통해 빠른 Serialization -> Deserialization 과정을 거쳐 빠르게 데이터에 접근할 수 있다.

 

 

2) Serialization and Deserialization (SerDe)

분산 처리 시스템에서는 Cluster 내 컴퓨터 노드들 사이에 데이터가 네트워크 통해 자주 이동한다. 이때 네트워크를 통해 데이터를 공유하기 위해선 Binary 형식이어야 한다. 때문에 데이터를 보낼 때는 Sender가 Binary Format으로 Encoding(Serialization) 하여 노드로 보내고, 받는 노드가 데이터를 받으면 Receiver가 Binary Format에서 Tungsten Format으로 Decoding(Deserialization) 하여 데이터를 읽는다.

 

Tungsten Binary Format은 Java Heap Memory에 객체를 저장하고, 크기가 JVM 객체보다 작아 공간을 적게 차지한다. Encoder는 메모리 주소와 Offset이 있는 Pointer 계산을 통해 빠르게 직렬화 / 역직렬화 할 수 있기 때문에 JVM GC pause로 인한 영향이 거의 없다.

 

 

3) Cost

Encoding 방식 도입 전과 비교했을 때보다는 훨씬 비용 효율적이지만, 그럼에도 대규모 데이터 셋과 많은 Query에서 SerDe 연산이 일어난다면 성능에 영향을 줄 수 있다.

 

과도한 SerDe를 방지하기 위해 Lambda 함수 대신 DSL (Domain Specific Language)를 사용하거나, SerDe 를 최소화할 수 있도록 Query를 함께 연결하여 사용하는 것이다. Lambda <-> DSL로 이동할 때마다 SerDe 연산이 발생하므로 DSL 로만 구성하면 비용을 줄일 수 있다.

 

아래는  Lambda <-> DSL를

Person(id: Integer, firstName: String, middleName: String, lastName: String,
gender: String, birthDate: String, ssn: String, salary: String)

import java.util.Calendar
val earliestYear = Calendar.getInstance.get(Calendar.YEAR) - 40

personDS
 // Everyone above 40: lambda-1
 .filter(x => x.birthDate.split("-")(0).toInt > earliestYear)

 // Everyone earning more than 80K
 .filter($"salary" > 80000)

170 | Chapter 6: Spark SQL and Datasets
 // Last name starts with J: lambda-2
 .filter(x => x.lastName.startsWith("J"))

 // First name starts with D
 .filter($"firstName".startsWith("D"))
 .count()

 

이렇듯 무의식 중의 과도한 SerDe 연산이 발생시킬 수 있으므로 아래와 같이 DSL 로만 구성하는 것이 효율적이다.

personDS
 .filter(year($"birthDate") > earliestYear) // Everyone above 40
 .filter($"salary" > 80000) // Everyone earning more than 80K
 .filter($"lastName".startsWith("J")) // Last name starts with J
 .filter($"firstName".startsWith("D")) // First name starts with D
 .count()


 

 

 

Reference
https://www.databricks.com/kr/glossary/catalyst-optimizer
https://www.databricks.com/kr/glossary/tungsten
Learning Spark (2nd) - 3. Apache Spark’s Structured APIs / 6. Spark SQL and Datasets
https://mengu.tistory.com/49
https://www.linkedin.com/pulse/catalyst-tungsten-apache-sparks-speeding-engine-deepak-rajak/
https://betterprogramming.pub/a-deep-dive-into-spark-datasets-and-dataframes-using-scala-a268b4af7491#47b8

 

반응형
Comments