From 6018d96fd1bb8ba0819a105f6be42e4bf4f42f81 Mon Sep 17 00:00:00 2001
From: leonardoleano
Date: Wed, 14 Aug 2024 15:01:42 -0300
Subject: [PATCH] using kafka and pyspark

---
 pyspark_example/pyspark_example.py | 58 ++++++++++++++++++++++++++++++
 pyspark_example/readme.md          |  9 ++++++
 pyspark_example/requirements.txt   |  2 ++
 3 files changed, 69 insertions(+)
 create mode 100644 pyspark_example/pyspark_example.py
 create mode 100644 pyspark_example/readme.md
 create mode 100644 pyspark_example/requirements.txt

diff --git a/pyspark_example/pyspark_example.py b/pyspark_example/pyspark_example.py
new file mode 100644
index 0000000..3e1ffb5
--- /dev/null
+++ b/pyspark_example/pyspark_example.py
@@ -0,0 +1,58 @@
+"""Read messages from a Kafka topic with Spark Structured Streaming.
+
+Counts occurrences of each (key, value) pair on the ``pageview`` topic
+and prints the running totals to the console.
+"""
+
+import os
+
+from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql.streaming.query import StreamingQuery
+
+# Tell spark-submit to pull in the Kafka connector package.
+os.environ["PYSPARK_SUBMIT_ARGS"] = (
+    "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2 pyspark-shell"
+)
+
+# Create the SparkSession.  NOTE: do not create a SparkContext first --
+# the session must own the context, otherwise the `spark.jars.packages`
+# config below is silently ignored and the Kafka source cannot be found.
+spark = (
+    SparkSession
+    .builder
+    .appName("Streaming from Kafka")
+    .config("spark.streaming.stopGracefullyOnShutdown", True)
+    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2")
+    .config("spark.sql.shuffle.partitions", 4)
+    .master("local[*]")
+    .getOrCreate()
+)
+spark.sparkContext.setLogLevel("ERROR")
+
+# Streaming DataFrame reading from the local Kafka broker.
+streaming_df = (
+    spark.readStream
+    .format("kafka")
+    .option("kafka.bootstrap.servers", "localhost:9092")
+    .option("subscribe", "pageview")
+    .option("startingOffsets", "earliest")
+    .load()
+)
+
+# Kafka delivers key and value as binary; cast both to strings.
+data_string = streaming_df.selectExpr(
+    "CAST(key AS STRING)", "CAST(value AS STRING)")
+
+# Running count per (key, value) pair.
+grouped: DataFrame = data_string.groupBy("key", "value").count()
+
+# Print the complete aggregation to the console on every trigger.
+query: StreamingQuery = (
+    grouped
+    .writeStream
+    .outputMode("complete")
+    .format("console")
+    .start()
+)
+
+query.awaitTermination()
diff --git a/pyspark_example/readme.md b/pyspark_example/readme.md
new file mode 100644
index 0000000..0c75c52
--- /dev/null
+++ b/pyspark_example/readme.md
@@ -0,0 +1,9 @@
+kafka-topics.sh --create --topic pageview --bootstrap-server localhost:9092
+
+references:
+https://subhamkharwal.medium.com/pyspark-structured-streaming-read-from-kafka-64c40767155f
+
+https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
+
+https://stackoverflow.com/questions/70725346/failed-to-find-data-source-please-deploy-the-application-as-per-the-deployment
+
diff --git a/pyspark_example/requirements.txt b/pyspark_example/requirements.txt
new file mode 100644
index 0000000..c839ee6
--- /dev/null
+++ b/pyspark_example/requirements.txt
@@ -0,0 +1,2 @@
+py4j==0.10.9.7
+pyspark==3.5.2