import os
import pyspark
# Auto-detect Spark version → correct connector
spark_version = pyspark.__version__
print(f"Detected PySpark version: {spark_version}")
if spark_version.startswith("4"):
KAFKA_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2"
else:
# Spark 3.x
KAFKA_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"
print(f"Using connector: {KAFKA_PACKAGE}")
# Must be set BEFORE SparkSession.builder
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {KAFKA_PACKAGE} pyspark-shell'

Lab 4: Spark Structured Streaming + Kafka
Bridge from Lab 3
In Lab 3 you streamed from rate (built-in generator) and from a JSON file.
Today we take the same approach, but the source is Kafka. The change is literally one line:
# Lab 3 — rate:
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
# Lab 3 — file:
df = spark.readStream.schema(tx_schema).json("data/stream")
# Lab 4 — Kafka:
df = spark.readStream.format("kafka").option("subscribe", "transactions").load()Difference: Kafka returns data as raw bytes — they must be decoded.
Windows and watermarks from Lab 3 work without any changes.
Plan
- Start the producer
- Connect Spark to Kafka (spark-submit vs notebook)
- Inspect what really comes from Kafka — raw schema
- Step by step: bytes → string → JSON → table
- Tumbling windows + watermark
- Write alerts to the alerts topic
Part 0: Setup
In a JupyterLab terminal:
kafka/bin/kafka-topics.sh --create --topic transactions \
  --bootstrap-server broker:9092 --if-not-exists

kafka/bin/kafka-topics.sh --create --topic alerts \
  --bootstrap-server broker:9092 --if-not-exists
# Start the producer from Lab 1
python producer.py

Leave the producer running throughout the lab.
Part 1: Connecting Spark to Kafka
Why do we need an extra package?
Spark has no built-in Kafka support — a connector must be added as an external Maven dependency.
The package name differs depending on Spark and Scala versions:
| Environment | Package |
|---|---|
| Spark 4.0 (Scala 2.13) | org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2 |
| Spark 3.5 (Scala 2.12) | org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 |
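If the connector is missing, the very first attempt to read with format("kafka") fails, typically with an AnalysisException along the lines of "Failed to find data source: kafka"; that message is the clearest sign the package was not passed.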
It can be passed in two ways:
Option A — spark-submit (a .py script outside the notebook)
# Spark 4.0
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2 \
  my_script.py

# Spark 3.5
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  my_script.py

Spark downloads the package from Maven Central (once, then it is cached locally).
The first run produces many log lines — that is normal.
Option B — from the notebook via os.environ (used today)
Set the environment variable before the SparkSession is created. The setup cell at the top of this notebook does this: it auto-detects the Spark version and selects the correct package, stored in KAFKA_PACKAGE. The session is then created with that package:
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("Lab4-Kafka")
.config("spark.jars.packages", KAFKA_PACKAGE)
.getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
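# Optional sanity check (a sketch, not required): the connector package should
# now appear in the Spark config and contain KAFKA_PACKAGE.
print("spark.jars.packages =", spark.sparkContext.getConf().get("spark.jars.packages", "<not set>"))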
print(f"Spark {spark.version} — ready")Part 2: What Really Comes from Kafka?
Let’s read the stream and inspect its raw schema — before processing anything.
kafka_raw = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "transactions")
.option("startingOffsets", "earliest")
.load()
)
kafka_raw.printSchema()

printSchema() output — what does each column mean?
root
|-- key: binary ← message key (optional, null in our case)
|-- value: binary ← MESSAGE CONTENT — our JSON lives here (as bytes)
|-- topic: string ← topic name ("transactions")
|-- partition: integer ← partition number (0, 1 or 2 — we have 3 partitions)
|-- offset: long ← position within the partition (0, 1, 2, … monotonically increasing)
|-- timestamp: timestamp ← when Kafka received the message
|-- timestampType: integer ← 0 = CreateTime, 1 = LogAppendTime
Key observation: the value column is binary — Kafka stores bytes and knows nothing about the format.
All other columns are Kafka metadata, not business data.
Let’s see what a raw message looks like before any parsing:
from pyspark.sql.functions import col
batch_counter = {"n": 0}
def peek_raw(df, batch_id):
batch_counter["n"] += 1
print(f"--- Batch {batch_id}: {df.count()} messages ---")
df.select(
"topic",
"partition",
"offset",
"timestamp",
col("key").cast("string").alias("key"),
col("value").cast("string").alias("value"), # bytes → UTF-8 text
).show(5, truncate=100)
if batch_counter["n"] >= 2:
raise Exception("stop")
q = (
kafka_raw.writeStream
.foreachBatch(peek_raw)
.option("checkpointLocation", "/tmp/chk_lab4_peek")
.start()
)
try:
q.awaitTermination()
except:
q.stop()

Part 3: Step by Step — Bytes → Table
There are three decoding stages:
value (binary)
→ cast("string") → "{\"tx_id\": \"TX1234\", \"amount\": 312.32, ...}"
→ from_json(col, schema) → struct<tx_id: string, amount: double, ...>
→ select("tx.*") → tx_id | user_id | amount | store | ...
Step 1 — value as string
Verify that the bytes decode correctly to JSON text:
# Step 1: binary → string (raw JSON as text)
step1 = kafka_raw.select(
col("offset"),
col("partition"),
col("value").cast("string").alias("raw_json"),
)
batch_counter["n"] = 0
def show_step1(df, batch_id):
batch_counter["n"] += 1
print(f"--- Batch {batch_id} ---")
df.show(3, truncate=120)
if batch_counter["n"] >= 2:
raise Exception("stop")
q = step1.writeStream.foreachBatch(show_step1) \
.option("checkpointLocation", "/tmp/chk_lab4_step1").start()
try:
q.awaitTermination()
except:
q.stop()

Step 2 — from_json(): string → struct
from_json() parses JSON text into a struct column — using the schema we provide.
The schema must match the producer’s fields — unknown fields are ignored, missing ones become null.
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import from_json
tx_schema = StructType([
StructField("tx_id", StringType()),
StructField("user_id", StringType()),
StructField("amount", DoubleType()),
StructField("store", StringType()),
StructField("category", StringType()),
StructField("timestamp", StringType()),
])
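
# Quick batch sanity check (a sketch, not part of the stream): from_json() fills
# missing fields with null and returns a fully-null struct for malformed input.
sanity = spark.createDataFrame(
    [('{"tx_id": "TX1", "amount": 10.5}',), ("not json at all",)],
    ["value"],
)
sanity.select(from_json(col("value"), tx_schema).alias("tx")).select("tx.*").show(truncate=False)
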
# Step 2: string → struct (single 'tx' column containing all fields)
step2 = kafka_raw.select(
from_json(col("value").cast("string"), tx_schema).alias("tx")
)
batch_counter["n"] = 0
def show_step2(df, batch_id):
batch_counter["n"] += 1
print(f"--- Batch {batch_id} — schema after from_json ---")
df.printSchema()
df.show(3, truncate=False)
if batch_counter["n"] >= 1:
raise Exception("stop")
q = step2.writeStream.foreachBatch(show_step2) \
.option("checkpointLocation", "/tmp/chk_lab4_step2").start()
try:
q.awaitTermination()
except:
q.stop()

Step 3 — select("tx.*"): struct → flat columns
select("tx.*") unpacks the struct — each field becomes its own column.
We also convert timestamp from string to TimestampType (required for windows).
from pyspark.sql.functions import to_timestamp
# Step 3: struct → flat columns + timestamp conversion
df = (
kafka_raw
.select(from_json(col("value").cast("string"), tx_schema).alias("tx"))
.select("tx.*")
.withColumn("timestamp", to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss"))
)
print("Final schema:")
df.printSchema()
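# Note: if the format string above does not match the producer's timestamps,
# to_timestamp() silently yields null instead of raising an error; null values in
# the 'timestamp' column of the output below are the usual symptom.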
batch_counter["n"] = 0
def show_parsed(df, batch_id):
batch_counter["n"] += 1
print(f"--- Batch {batch_id} ---")
df.show(5, truncate=False)
if batch_counter["n"] >= 2:
raise Exception("stop")
q = df.writeStream.foreachBatch(show_parsed) \
.option("checkpointLocation", "/tmp/chk_lab4_parsed").start()
try:
q.awaitTermination()
except:
q.stop()

Part 4: Tumbling Windows + Watermark
From this point the code is identical to Lab 2 (batch) and Lab 3 (rate source).
The source changed — the window logic did not.
Watermark — handling late-arriving events
Watermark tells Spark: “wait at most X time for late data, then close the window”.
| Mode | When it emits | When to use |
|---|---|---|
| append | only when the window is closed | production, writing to Kafka/DB |
| complete | after every micro-batch — the full table | debugging |
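For intuition: with withWatermark("timestamp", "30 seconds") and 1-minute windows, the watermark is roughly the maximum event time seen so far minus 30 seconds. The window [12:00, 12:01) is therefore emitted in append mode only after an event with a timestamp past 12:01:30 arrives, because only then does the watermark pass the window's end.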
Task 4.1 — 1-minute tumbling windows per store
from pyspark.sql.functions import window, count, sum as _sum, round as _round
windowed = (
df
.withWatermark("timestamp", "30 seconds")
.groupBy(window("timestamp", "1 minute"), "store")
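    # window() adds a struct column named 'window' with 'start' and 'end' fields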
.agg(
count("tx_id").alias("tx_count"),
_round(_sum("amount"), 2).alias("total_amount"),
)
)
batch_counter["n"] = 0
def show_window(df, batch_id):
batch_counter["n"] += 1
print(f"\n=== Batch {batch_id} ===")
(
df.select(
col("window.start").alias("from"),
col("window.end").alias("to"),
"store", "tx_count", "total_amount",
)
.orderBy("from", "store")
.show(truncate=False)
)
if batch_counter["n"] >= 5:
raise Exception("stop")
q = (
windowed.writeStream
.outputMode("append")
.foreachBatch(show_window)
.option("checkpointLocation", "/tmp/chk_lab4_windows")
.start()
)
try:
q.awaitTermination()
except:
q.stop()

Task 4.2 — Windows per category, complete mode
Group by category instead of store and use outputMode("complete").
Observe the difference: results appear after every micro-batch, not only when a window closes.
# YOUR CODE
# windowed_cat = df.withWatermark(...).groupBy(window(...), "category").agg(...)
# outputMode("complete")Part 5: Writing Alerts to Kafka
Transactions above 3000 are sent to the alerts topic.
Kafka expects the value column as string/bytes — we use to_json() to convert the struct back to JSON.
from pyspark.sql.functions import to_json, struct, lit
alerts = (
df
.filter(col("amount") > 3000)
.select(
to_json(
struct(
"tx_id", "user_id", "amount", "store", "category",
col("timestamp").cast("string"),
lit("HIGH").alias("alert_level"),
)
).alias("value") # Kafka requires a 'value' column
)
)
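# Note: besides the required "value" column, the Kafka sink also accepts optional
# "key" and "topic" columns; here the target topic is set via .option("topic", "alerts") below.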
alert_query = (
alerts.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("topic", "alerts")
.option("checkpointLocation", "/tmp/chk_lab4_alerts")
.outputMode("append")
.start()
)
print("Alert stream started. Stop manually: alert_query.stop()")Task 5.1 — Verify the alerts topic in a terminal
kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server broker:9092 \
  --topic alerts \
  --from-beginning

You should see JSON records with the field "alert_level": "HIGH".
Task 5.2 — Review questions
# 1. Why do the first results in 'append' mode appear with a delay?
# ANSWER:
# 2. What happens if you set the watermark to "0 seconds"?
# ANSWER:
# 3. What is the difference between window() in Lab 2 (batch) and here (streaming)?
# ANSWER:
# 4. Why do we use to_json() when writing to Kafka instead of just select("value")?
# ANSWER:

alert_query.stop()
spark.stop()

Homework
- Build a sliding window (2 minutes / 1-minute step) on the Kafka stream per store.
- Add a field ratio = amount / 400.0 (approximate average from Lab 2) to the alerts.
- What happens to the results when you stop the producer and wait 2 minutes? Why?
Next lab: Decision rules and scoring on a stream.