※ 혼자 책보고 공부하면서 적는거라 틀린 부분 있으면 댓글로 알려주세요!
https://www.yes24.com/Product/Goods/67116641
CHAP2. 스파크 간단히 살펴보기
스파크 애플리케이션
스파크 애플리케이션 = 드라이버(명령) + 익스큐터(실행 및 진행상황 보고)
드라이버가 익스큐터에 작업 할당 → 익스큐터가 데이터 처리 및 드라이버에 보고 → 작업 완료 시 드라이버가 결과 반환
- 스파크(Spark): 대규모 데이터 처리 시스템. 병렬처리를 통해 빠른 속도로 작업 가능
- 드라이버(Driver): 스파크 애플리케이션의 중심. 명령을 실행하고 익스큐터에게 작업 분배
- 익스큐터(executor): 실제 데이터를 처리하는 노드들. 진행상황을 드라이버에게 보고
- 클러스터 매니저(cluster manager): 클러스터 내에서 자원 할당 및 클러스터 관리 담당
- 다양한 클러스터 매니저: Hadoop의 YARN, MESOS 등
스파크 대화형 콘솔 실행 & SparkSession
스파크 애플리케이션은 sparksession으로 제어됨.
1. cmd를 열고 스파크를 설치한 디렉터리로 변경(cd) 후 pyspark를 입력했다.
cd C:\spark\spark-3.5.3-bin-hadoop\bin #본인이 설치한 스파크 디렉터리로 설정
pyspark
2. spark를 입력하면 SparkSession객체가 출력된다.
3. 1개의 컬럼과 1000개 로우로 구성된 데이터프레임 생성
myRange = spark.range(1000).toDF("number")
myRange.show() #잘 된건지 확인하려고 show() 함수를 써봤따
파티션
데이터를 효율적으로 분산하기 위해 사용되는 개념. 데이터를 나누는 개념으로, 데이터를 여러 파티션으로 나누면 각 파티션을 병렬로 처리할 수 있다.
트랜스포메이션, 데이터를 변환하는 연산
스파크의 핵심 데이터 구조는 불변성을 가진다. 따라서 데이터 구조를 '변경'하려면 원하는 변경 방법을 스파크에 알려줘야한다.
트랜스포메이션 연산은 데이터 파이프라인을 정의하는 단계로, 액션을 호출하지 않으면 실제 트랜스포메이션을 수행하지 않는다.
1. 위에서 만든 데이터프레임 myRange에서 짝수만 남기는 코드를 작성했지만, 아무일도 일어나지 않았다.
divisBy2=myRange.where("number %2=0")
주요 트랜스포메이션 연산 (by.chatgpt)
- map(func): 각 요소에 func를 적용하여 새로운 RDD를 반환합니다.
- flatMap(func): 각 요소를 여러 개의 요소로 분리하여 새로운 RDD를 반환합니다.
- filter(func): 조건을 만족하는 요소만 필터링하여 새로운 RDD를 반환합니다.
- distinct(): 중복을 제거한 새로운 RDD를 반환합니다.
- union(other): 두 RDD를 합쳐 새로운 RDD를 반환합니다.
- intersection(other): 두 RDD의 교집합을 반환합니다.
- groupByKey(): 키를 기준으로 데이터를 그룹화하여 새로운 RDD를 반환합니다.
- reduceByKey(func): 키를 기준으로 데이터를 집계하여 새로운 RDD를 반환합니다.
- join(): 두 RDD나 DataFrame을 특정 조건에 맞춰 병합하여 새로운 RDD나 DataFrame을 반환합니다.
- withColumn(): DataFrame에 새로운 컬럼을 추가하거나 기존 컬럼을 수정합니다.
- select(): 원하는 컬럼만 선택하여 새로운 DataFrame을 반환합니다.
지연 연산 & 액션
지연연산이란, 스파크가 연산을 처리하지 직전까지 기다리는 동작 방식이다. 트랜스포메이션은 즉시 실행되지 않고, 액션이 호출될 때 실행된다. 그러니까 작업 정의만하고, 실제 작업은 최종적으로 결과가 필요할 때 수행된다.
액션이란, 실제 데이터 처리를 즉시 수행하고, 익스큐터에서 처리해 처리 결과를 반환한다.
1. 위에 걸어둔 트랜스포메이션에 대해 액션 명령을 내리면 다음과 같이 실행결과가 즉시 반환된다.
주요 액션 명령 (by.chatgpt)
collect() | RDD나 DataFrame의 모든 데이터를 드라이버로 가져와 리스트 형태로 반환합니다. 대규모 데이터에는 부적합할 수 있음. |
count() | RDD나 DataFrame의 요소 개수를 반환합니다. |
reduce(func) | RDD의 모든 요소를 결합하여 단일 값으로 집계합니다. 주로 합계, 최대값, 최소값 등을 계산할 때 사용. |
first() | RDD나 DataFrame에서 첫 번째 요소를 반환합니다. |
take(n) | RDD나 DataFrame에서 처음 n개의 요소를 반환합니다. |
takeSample(withReplacement, num, seed) | RDD에서 무작위로 샘플링하여 n개의 요소를 반환합니다. withReplacement로 복원 여부, seed로 난수 시드를 지정 가능. |
saveAsTextFile(path) | RDD를 텍스트 파일로 외부 파일 시스템에 저장합니다. |
countByValue() | RDD에서 각 값의 출현 빈도를 계산하여 반환합니다. |
foreach(func) | RDD의 각 요소에 대해 함수를 실행하지만, 결과를 반환하지 않습니다. 보통 로그 기록이나 외부 시스템에 데이터를 저장할 때 사용. |
show() | DataFrame에서 최초 몇 개의 행을 출력합니다. 기본적으로 20개를 출력합니다. (옵션으로 개수를 변경 가능) |
reduceByKey(func) | RDD에서 키를 기준으로 집계하여 새로운 RDD를 반환합니다. reduceByKey는 각 키별로 데이터를 결합하는 방식입니다. |
aggregate(func)() | RDD에서 집계 연산을 수행하는데, reduceByKey보다 더 복잡한 집계 처리가 가능합니다. |
foreachPartition(func) | RDD의 각 파티션에 대해 함수를 실행합니다. (효율적 분산 처리를 위해 파티션 단위로 실행됨) |
toLocalIterator() | RDD의 데이터를 로컬 이터레이터 형태로 반환합니다. 이 방식은 메모리에 모든 데이터를 로드하지 않고 하나씩 처리할 수 있도록 합니다. |
헉 그러고보니 앞에서 했던 Show()도 액션 명령이었다!
트랜스포메이션 vs 액션
트랜스포메이션(transformation) | 액션(action) | |
실행 | 지연 실행 | 즉시 실행 |
결과 | 새로운 RDD, 데이터프레임 반환(실제 계산은 안함) | 결과값 반환, 저장소에 저장 |
목적 | 데이터 변환 | 데이터 처리 및 결과 반환 |
일단 여기까지는 어느정도 감이 잡힌 것 같다. 내일 예제 보면서 잘 이해했는지 확인해봐야겠다.
'데이터사이언티스트로 살아남기 > Spark' 카테고리의 다른 글
[스파크 완벽 가이드] 2장. 스파크 간단히 살펴보기(2) (0) | 2024.12.17 |
---|---|
[Spark] Window10 로컬 환경에 Spark 설치하기 (1) | 2024.12.10 |