본문 바로가기
● Data Processing

PySpark로 대규모 JSON 중첩 배열 평탄화 가이드

by DataFolio.lab 2026. 6. 11.
반응형

PySpark로 중첩 JSON/XML 반정형 데이터를 정형 테이블로 변환할 때 성능이 극대화되는 핵심은 explode와 내장 함수만 사용한 단일 파싱 파이프라인, 파이프라인 초기 필터링으로 데이터 양 축소, Parquet/Delta 저장 형식 사용, 그리고 Python UDF 대신 내장 표현식 또는 Pandas UDF 활용입니다. 

PySpark로 대규모 JSON 중첩 배열 평탄화 가이드

반응형

PySpark 대규모 JSON/XML 파싱과 플래티닝의 핵심 원리

반정형 데이터의 특성과 PySpark의 장점

JSON과 XML은 반정형 데이터로, 스키마 정보를 데이터와 함께 제공합니다. 하지만 관계형 데이터베이스와 달리 명확한 테이블 구조가 없어서 파싱 과정이 복잡해집니다. PySpark는 분산 컴퓨팅 프레임워크로, 대규모 JSON/XML 데이터를 여러 노드에서 병렬 처리할 수 있어 성능 면에서 압도적입니다. Switching to PySpark future‑proofs your pipelines and lets you flatten JSON files across a cluster. 


중첩 배열 구조를 평탄화하는 기본 패턴

중첩된 JSON 데이터를 정형 테이블로 변환하는 가장 기본적이고 효율적인 방법은 explode 함수를 사용하는 것입니다. 배열을 개별 행으로 분해하고, 중첩된 구조체는 도트 표기법으로 최상위 레벨로 펼칩니다.

 

다음은 실제 동작하는 전체 코드입니다:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, explode_outer, struct
from pyspark.sql.types import StructType, ArrayType, StructField, StringType

spark = SparkSession.builder \
    .appName("jsonFlatten") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

df = spark.read.option("multiline", "true").json("path_to_your_json_file.json")

df = df.select(
    "name",
    "address.city",
    "address.street",
    "phoneNumbers"
)

df = df.withColumn("phoneNumber", explode("phoneNumbers"))

df = df.select("name", "city", "street", "phoneNumber")

df.show()

 

이 코드는 name, city, street, phoneNumber 컬럼을 가진 평탄화된 DataFrame을 생성하며, 각 전화번호가 별도의 행으로 표시됩니다. 


재귀적 플래티닝: 임의 깊이 중첩 처리

실무에서는 3~4 단계 깊이의 중첩 구조가 자주 등장합니다. 이럴 때는 반복 루프로 StructType과 ArrayType을 모두 처리하는 일반화 함수가 필요합니다.

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import StructType, ArrayType

def flatten_schema(df: DataFrame) -> DataFrame:
    while True:
        has_struct = False
        for field in df.schema.fields:
            if isinstance(field.dataType, StructType):
                has_struct = True
                expanded_cols = [
                    col(f"{field.name}.{nested_field.name}").alias(f"{field.name}_{nested_field.name}")
                    for nested_field in field.dataType.fields
                ]
                df = df.select("*", *expanded_cols).drop(field.name)
                break
            elif isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, StructType):
                has_struct = True
                df = df.withColumn(field.name, explode_outer(field.name))
                break
        if not has_struct:
            break
    return df

flat_df = flatten_schema(jdf)

 

이 함수는 StructType을 최상위 레벨로 펼치고, ArrayType이면서 내부가 StructType인 경우 explode_outer로 배열을 풀어냅니다.


explode vs explode_outer: 어떤 것을 써야 할까?

구분 explode explode_outer
빈 배열 행 제거 null로 변환, 행 유지
null 값 행 제거 null 유지, 행 출력
중복 데이터 개별 행으로 분할 개별 행으로 분할
중첩 리스트 평탄화 안 함 평탄화 안 함

 

explode_outer는 데이터 손실을 방지할 때 유용하며, 특히 비즈니스에서 누락된 데이터를 보존해야 할 때 선호됩니다 


성능 최적화 핵심 테크닉 5 가지

1. 셔플(Shuffle) 최소화

셔플은 네트워크 IO와 소트를 발생시키는 가장 무거운 연산입니다. 조인과 집계는 대표적인 셔플 원인입니다.

  • 조인 전 필터링/컬럼 프루닝: 조인 대상 데이터를 최소화
  • 브로드캐스트 조인: 작은 참조 테이블은 broadcast()로 메모리에 올려 셔플 감소
  • 스큐 키 완화: salting 기법으로 파티션 불균형 해소
from pyspark.sql.functions import broadcast
joined = big.join(broadcast(small_dim), big.key == small_dim.key, "left")

2. Narrow Transformation 먼저 실행

  • Narrow: map, filter → 파티션 내 처리, 셔플 없음
  • Wide: groupBy, join → 셔플 발생, 성능 비용 큼

가능하면 Narrow 변환으로 먼저 데이터량을 줄이고 Wide 변환을 실행합니다.

# Bad: join 후 필터
out = big.join(other, "id").filter(col("dt") >= cutoff)

# Good: 필터 후 join
big2 = big.filter(col("dt") >= cutoff).select("id","x")
out = big2.join(broadcast(other), "id")

3. Python UDF 대신 내장 함수 사용

Python UDF는 Catalyst 최적화와 벡터화 이점을 잃어 성능 저하를 유발합니다. 가능하면 내장 함수를 사용하고, 불가피하면 Pandas UDF(vectorized) 를 고려합니다.

# Bad: 일반 UDF
@udf("double")
def f(x): return x * 1.1
df2 = df.withColumn("y", f("x"))

# Good: 내장 표현식
from pyspark.sql.functions import col
df2 = df.withColumn("y", col("x") * 1.1)

4. Lazy Evaluation과 캐시 활용

PySpark는 Lazy evaluation을 사용합니다. Transformation은 즉시 실행되지 않고, Action이 호출될 때 실제 Job이 실행됩니다. 반복 사용할 DataFrame은 cache()persist()로 저장합니다.

features = preprocess(df).cache()
_ = features.count()
train = features.filter("split = 'train'")
valid = features.filter("split = 'valid'")

5. 출력 파일 크기 관리

Parquet 권장 파일 크기는 128~512MB입니다. 작은 파일이 많다면 coalesce()로 병합하고, 큰 데이터는 repartition()으로 분할합니다.

(df.repartition(800)
   .write.mode("overwrite")
   .parquet("s3://bucket/mart/orders/"))

XML 데이터 파싱 전략

XML 도 JSON 과 유사하게 반정형 데이터입니다. PySpark 는 기본적으로 XML 을 지원하지 않지만, spark-xml 라이브러리를 사용하면 됩니다.

df = spark.read \
    .option("rowTag", "record") \
    .xml("path_to_xml_file.xml")

 

그 후 JSON 과 동일한 explodeflatten_schema 함수를 적용하면 됩니다. XML 는 중첩 깊이가 깊은 경우가 많으므로 재귀적 플래티닝이 필수입니다.


Parquet/Delta 저장 형식 사용

Parquet 는 컬럼 지향 압축 형식으로, 쿼리 성능이 높습니다. JSON 대신 Parquet 로 저장하면 추후 쿼리 속도가 5~10 배 빨라집니다. Delta 형식은 ACID 를 보장하고, Upsert, 스키마 진화, Time Travel 을 지원합니다.

flat_df.write.format("parquet").save("output/path")
flat_df.write.format("delta").save("delta/path")

실제 생산 환경 배포 팁

  • AQE(Adaptive Query Execution) 활성화: 동적 셔플 파티션 조정으로 성능 향상
  • Spark UI 로 실행 계획 분석: 병목 구간 파악
  • 파티션 수 조정: spark.sql.shuffle.partitions 기본 200 → 데이터 양에 맞게 조정
  • 메모리 관리: executor-memory, driver-memory 적절히 설정

AQE 는 좋은 시작점입니다. what it does and why it does and how it's done 를 배우세요. 


전체 통합 코드 예제

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode_outer, broadcast
from pyspark.sql.types import StructType, ArrayType

spark = SparkSession.builder \
    .appName("LargeJSONFlatten") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

df = spark.read.option("multiline", "true").json("s3://bucket/input/*.json")

df = df.filter(col("status") == "active").select(
    "id",
    "user.name",
    "user.email",
    "orders"
)

def flatten_schema(df):
    while True:
        has_struct = False
        for field in df.schema.fields:
            if isinstance(field.dataType, StructType):
                has_struct = True
                expanded_cols = [
                    col(f"{field.name}.{nested_field.name}").alias(f"{field.name}_{nested_field.name}")
                    for nested_field in field.dataType.fields
                ]
                df = df.select("*", *expanded_cols).drop(field.name)
                break
            elif isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, StructType):
                has_struct = True
                df = df.withColumn(field.name, explode_outer(field.name))
                break
        if not has_struct:
            break
    return df

flat_df = flatten_schema(df)

small_dim = spark.table("dim_table")
joined = flat_df.join(broadcast(small_dim), flat_df.id == small_dim.id, "left")

cached = joined.cache()
_ = cached.count()

cached.write.mode("overwrite").parquet("s3://bucket/output/parquet/")

 

반응형

놓치면 아쉬운 추천 글, 함께 읽어보세요!

  • 추천 글을 불러오는 중입니다...