반응형
Spark Docs에 나오는 Spark Streaming 예제
localhost:9999에서 입력받은 글자 단어 세기
0. 실행 환경
AWS EC2 t2.xlarge
OS : Red Hat 9.1
Python : 3.9
Spark : 3.3.1
Scala : 2.12.15
Java : OpenJDK 64-Bit Server VM, 1.8.0_352
1. Streaming Test
1-1. streaming.py 생성
vi streaming.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Create SparkSession
spark = SparkSession \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
# localhost:9999 streaming input -> Create DataFrame
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# Split input by " " as word
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Count words
wordCounts = words.groupBy("word").count()
# Print number of words
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
# DataFrame으로 실행
words_df = lines_df.select(expr("explode(split(value, ' ')) as word"))
counts_df = words_df.groupBy("word").count()
word_count_query = counts_df.writeStream.format("console")\
.outputMode("complete")\
.option("checkpointLocation", ".checkpoint")\
.start()
word_count_query.awaitTermination()
1-2. streaming 실행
spark-submit structured_network_wordcount.py localhost 9999
1-3. Netcat실행
# 추가 세션 실행 후 명령어 입력
nc -lk 9999
# -> 글자 입력
)
1-4. 결과
)
2. readSteam Options
# socket(테스트용 : UTF-8 읽어옴. fault-tolerant 보장 x)
readStream("socket") \
.option("host", "localhost")\
.option("port", 9999)\
# rate source(테스트용 : 초당 지정된 수 만큼 데이터 생성)
# kafka source
readStream("kafka")\
.option("subscribe", "topic1") \
.load()
# file source
# 지원 파일 형식 : text, csv, json, orc, parquet
serSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
.readStream \
.option("sep", ";") \
.schema(userSchema) \
.csv("/path/to/directory")
Q.
- readstream("socket").option("host", HOST)
HOST에 locahost말고 되는지? - kafka, 파일로 테스트해보기
- checkpointLocation 오류 해결
word_count_query = df.writeStream.format("console")\ .outputMode("complete")\ .option("checkpointLocation", ".checkpoint")\ .start()
참고 자료
structured-streaming-programming-guide
structured-streaming-programming-guide - KO
728x90
반응형
'Data > Data Engineering' 카테고리의 다른 글
[Snowflake] Badge 1 획득 (0) | 2023.01.08 |
---|---|
[Airflow] Airflow 설치하기(pip) (0) | 2023.01.08 |
[Spark] SQL 연습하기 (1) | 2022.12.18 |
[MongoDB] DB, Data 기본 CRUD 명령어 정리 (0) | 2022.11.26 |
[MongoDB] root(admin) 계정 생성하기 (0) | 2022.11.26 |