51 changes: 51 additions & 0 deletions pyspark_example/pyspark_example.py
@@ -0,0 +1,51 @@
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.streaming import StreamingQuery

import os

# Have spark-submit fetch the Kafka connector package it needs
# (also set via spark.jars.packages below, so either one suffices)
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2 pyspark-shell'

# Create SparkSession
spark = (
    SparkSession
    .builder
    .appName("Streaming from Kafka")
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2")
    .config("spark.sql.shuffle.partitions", 4)
    .master("local[*]")
    .getOrCreate()
)

# setLogLevel returns None, so call it on the session's context rather than
# chaining it off the SparkContext constructor
spark.sparkContext.setLogLevel("ERROR")

# Create a streaming DataFrame that reads from the Kafka topic
streaming_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "pageview")
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka delivers key and value as binary; cast both to strings
data_string = streaming_df.selectExpr(
    "CAST(key AS STRING)", "CAST(value AS STRING)")

# Running count of each distinct key/value pair
grouped: DataFrame = data_string.groupBy("key", "value").count()

# "complete" mode re-emits the full aggregation table on every trigger,
# which is only practical for small result sets
query: StreamingQuery = (
    grouped
    .writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

query.awaitTermination()
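
The string value is often a JSON payload rather than plain text. A minimal sketch of parsing it with from_json, assuming a hypothetical pageview schema with user_id and url fields (adapt to the real messages):

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema; the actual pageview payload may differ
pageview_schema = StructType([
    StructField("user_id", StringType()),
    StructField("url", StringType()),
])

parsed = (
    data_string
    .withColumn("json", from_json(col("value"), pageview_schema))
    .select("key", "json.user_id", "json.url")
)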
9 changes: 9 additions & 0 deletions pyspark_example/readme.md
@@ -0,0 +1,9 @@
kafka-topics.sh --create --topic pageview --bootstrap-server localhost:9092
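
To exercise the stream, publish a few test messages and then run the example (assuming a local broker on localhost:9092 and the requirements installed):

kafka-console-producer.sh --topic pageview --bootstrap-server localhost:9092

python pyspark_example.py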

References:
https://subhamkharwal.medium.com/pyspark-structured-streaming-read-from-kafka-64c40767155f

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

https://stackoverflow.com/questions/70725346/failed-to-find-data-source-please-deploy-the-application-as-per-the-deployment

2 changes: 2 additions & 0 deletions pyspark_example/requirements.txt
@@ -0,0 +1,2 @@
py4j==0.10.9.7
pyspark==3.5.2