Lab 4: Spark Structured Streaming + Kafka

Bridge from Lab 3

In Lab 3 you streamed from rate (built-in generator) and from a JSON file.
Today the same approach, but the source is Kafka. Only the source definition changes:

# Lab 3 — rate:
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Lab 3 — file:
df = spark.readStream.schema(tx_schema).json("data/stream")

# Lab 4 — Kafka:
df = spark.readStream.format("kafka").option("subscribe", "transactions").load()

Difference: Kafka returns data as raw bytes — they must be decoded.
Windows and watermarks from Lab 3 work without any changes.
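
As a preview (worked through step by step in Part 3), the decoding itself is only a couple of lines. A sketch, assuming the same tx_schema as in Lab 3:

from pyspark.sql.functions import col, from_json

# Sketch: decode the binary 'value' column back into flat columns
parsed = (
    df.select(from_json(col("value").cast("string"), tx_schema).alias("tx"))
      .select("tx.*")
)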


Plan

  1. Start the producer
  2. Connect Spark to Kafka (spark-submit vs notebook)
  3. Inspect what really comes from Kafka — raw schema
  4. Step by step: bytes → string → JSON → table
  5. Tumbling windows + watermark
  6. Write alerts to the alerts topic

Part 0: Setup

In a JupyterLab terminal:

kafka/bin/kafka-topics.sh --create --topic transactions --partitions 3 \
  --bootstrap-server broker:9092 --if-not-exists
kafka/bin/kafka-topics.sh --create --topic alerts \
  --bootstrap-server broker:9092 --if-not-exists

# Start the producer from Lab 1
python producer.py

Leave the producer running throughout the lab.


Part 1: Connecting Spark to Kafka

Why do we need an extra package?

Spark has no built-in Kafka support — a connector must be added as an external Maven dependency.
The package name differs depending on Spark and Scala versions:

Environment               Package
Spark 4.0 (Scala 2.13)    org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2
Spark 3.5 (Scala 2.12)    org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0

It can be passed in two ways:

Option A — spark-submit (a .py script outside the notebook)

# Spark 4.0
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2 \
  my_script.py

# Spark 3.5
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  my_script.py

Spark downloads the package from Maven Central (once, then cached locally).
The first run produces many log lines — that is normal.

Option B — from the notebook via os.environ (used today)

Set the environment variable before the first PySpark import.
The code below auto-detects the version and selects the correct package.

import os
import pyspark

# Auto-detect Spark version → correct connector
spark_version = pyspark.__version__
print(f"Detected PySpark version: {spark_version}")

if spark_version.startswith("4"):
    KAFKA_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0-preview2"
else:
    # Spark 3.x
    KAFKA_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0"

print(f"Using connector:         {KAFKA_PACKAGE}")

# Must be set BEFORE SparkSession.builder
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages {KAFKA_PACKAGE} pyspark-shell'
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Lab4-Kafka")
    .config("spark.jars.packages", KAFKA_PACKAGE)
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
print(f"Spark {spark.version} — ready")

Part 2: What Really Comes from Kafka?

Let’s read the stream and inspect its raw schema — before processing anything.

kafka_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "transactions")
    .option("startingOffsets", "earliest")
    .load()
)

kafka_raw.printSchema()

printSchema() output — what does each column mean?

root
 |-- key:           binary    ← message key (optional, null in our case)
 |-- value:         binary    ← MESSAGE CONTENT — our JSON lives here (as bytes)
 |-- topic:         string    ← topic name ("transactions")
 |-- partition:     integer   ← partition number (0, 1 or 2 — we have 3 partitions)
 |-- offset:        long      ← position within the partition (0, 1, 2, … monotonically increasing)
 |-- timestamp:     timestamp ← when Kafka received the message
 |-- timestampType: integer   ← 0 = CreateTime, 1 = LogAppendTime

Key observation: the value column is binary — Kafka stores bytes and knows nothing about the format.
All other columns are Kafka metadata, not business data.

Let’s see what a raw message looks like before any parsing:

from pyspark.sql.functions import col

batch_counter = {"n": 0}

def peek_raw(df, batch_id):
    batch_counter["n"] += 1
    print(f"--- Batch {batch_id}: {df.count()} messages ---")
    df.select(
        "topic",
        "partition",
        "offset",
        "timestamp",
        col("key").cast("string").alias("key"),
        col("value").cast("string").alias("value"),   # bytes → UTF-8 text
    ).show(5, truncate=100)
    if batch_counter["n"] >= 2:
        raise Exception("stop")

q = (
    kafka_raw.writeStream
    .foreachBatch(peek_raw)
    .option("checkpointLocation", "/tmp/chk_lab4_peek")
    .start()
)
try:
    q.awaitTermination()
except:
    q.stop()
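
Note: each demo uses its own checkpointLocation under /tmp. If you re-run a demo from the very beginning, clear its checkpoint first; otherwise the offsets stored in the checkpoint take precedence over startingOffsets = "earliest".

# Optional: reset a demo's checkpoint before re-running it from scratch
import shutil
shutil.rmtree("/tmp/chk_lab4_peek", ignore_errors=True)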

Part 3: Step by Step — Bytes → Table

There are three decoding stages:

value (binary)
  → cast("string")       → "{\"tx_id\": \"TX1234\", \"amount\": 312.32, ...}"
  → from_json(col, schema) → struct<tx_id: string, amount: double, ...>
  → select("tx.*")        → tx_id | user_id | amount | store | ...

Step 1 — value as string

Verify that the bytes decode correctly to JSON text:

# Step 1: binary → string (raw JSON as text)
step1 = kafka_raw.select(
    col("offset"),
    col("partition"),
    col("value").cast("string").alias("raw_json"),
)

batch_counter["n"] = 0

def show_step1(df, batch_id):
    batch_counter["n"] += 1
    print(f"--- Batch {batch_id} ---")
    df.show(3, truncate=120)
    if batch_counter["n"] >= 2:
        raise Exception("stop")

q = step1.writeStream.foreachBatch(show_step1) \
         .option("checkpointLocation", "/tmp/chk_lab4_step1").start()
try:
    q.awaitTermination()
except:
    q.stop()

Step 2 — from_json(): string → struct

from_json() parses JSON text into a struct column — using the schema we provide.
The schema must match the producer’s fields — unknown fields are ignored, missing ones become null.

from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import from_json

tx_schema = StructType([
    StructField("tx_id",     StringType()),
    StructField("user_id",   StringType()),
    StructField("amount",    DoubleType()),
    StructField("store",     StringType()),
    StructField("category",  StringType()),
    StructField("timestamp", StringType()),
])

# Step 2: string → struct (single 'tx' column containing all fields)
step2 = kafka_raw.select(
    from_json(col("value").cast("string"), tx_schema).alias("tx")
)

batch_counter["n"] = 0

def show_step2(df, batch_id):
    batch_counter["n"] += 1
    print(f"--- Batch {batch_id} — schema after from_json ---")
    df.printSchema()
    df.show(3, truncate=False)
    if batch_counter["n"] >= 1:
        raise Exception("stop")

q = step2.writeStream.foreachBatch(show_step2) \
         .option("checkpointLocation", "/tmp/chk_lab4_step2").start()
try:
    q.awaitTermination()
except:
    q.stop()
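
A quick one-off batch parse illustrates the point above: a record with a missing field still parses, the field just comes back as null (sketch, reusing tx_schema):

# Sketch: 'amount' is absent from the JSON, so the parsed column is null
sample = spark.createDataFrame(
    [('{"tx_id": "TX1", "user_id": "U1", "store": "S1"}',)],
    ["value"],
)
sample.select(from_json(col("value"), tx_schema).alias("tx")).select("tx.*").show()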

Step 3 — select("tx.*"): struct → flat columns

select("tx.*") unpacks the struct — each field becomes its own column.
We also convert timestamp from string to TimestampType (required for windows).

from pyspark.sql.functions import to_timestamp

# Step 3: struct → flat columns + timestamp conversion
df = (
    kafka_raw
    .select(from_json(col("value").cast("string"), tx_schema).alias("tx"))
    .select("tx.*")
    .withColumn("timestamp", to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss"))
)

print("Final schema:")
df.printSchema()

batch_counter["n"] = 0

def show_parsed(df, batch_id):
    batch_counter["n"] += 1
    print(f"--- Batch {batch_id} ---")
    df.show(5, truncate=False)
    if batch_counter["n"] >= 2:
        raise Exception("stop")

q = df.writeStream.foreachBatch(show_parsed) \
      .option("checkpointLocation", "/tmp/chk_lab4_parsed").start()
try:
    q.awaitTermination()
except:
    q.stop()

Part 4: Tumbling Windows + Watermark

From this point the code is identical to Lab 2 (batch) and Lab 3 (rate source).
The source changed — the window logic did not.

Watermark — handling late-arriving events

Watermark tells Spark: "wait at most X time for late data, then close the window". For example, with a 30-second watermark the 10:00-10:01 window is finalized only after an event with a timestamp past 10:01:30 arrives; events for that window that arrive later are dropped.

The output mode controls when windowed results are emitted:

Mode        When it emits                                      When to use
append      only when a window is closed (watermark passed)    production, writing to Kafka/DB
complete    the full result table after every micro-batch      debugging

Task 4.1 — 1-minute tumbling windows per store

from pyspark.sql.functions import window, count, sum as _sum, round as _round

windowed = (
    df
    .withWatermark("timestamp", "30 seconds")
    .groupBy(window("timestamp", "1 minute"), "store")
    .agg(
        count("tx_id").alias("tx_count"),
        _round(_sum("amount"), 2).alias("total_amount"),
    )
)

batch_counter["n"] = 0

def show_window(df, batch_id):
    batch_counter["n"] += 1
    print(f"\n=== Batch {batch_id} ===")
    (
        df.select(
            col("window.start").alias("from"),
            col("window.end").alias("to"),
            "store", "tx_count", "total_amount",
        )
        .orderBy("from", "store")
        .show(truncate=False)
    )
    if batch_counter["n"] >= 5:
        raise Exception("stop")

q = (
    windowed.writeStream
    .outputMode("append")
    .foreachBatch(show_window)
    .option("checkpointLocation", "/tmp/chk_lab4_windows")
    .start()
)
try:
    q.awaitTermination()
except:
    q.stop()

Task 4.2 — Windows per category, complete mode

Group by category instead of store and use outputMode("complete").
Observe the difference: results appear after every micro-batch, not only when a window closes.

# YOUR CODE
# windowed_cat = df.withWatermark(...).groupBy(window(...), "category").agg(...)
# outputMode("complete")

Part 5: Writing Alerts to Kafka

Transactions above 3000 are sent to the alerts topic.
Kafka expects the value column as string/bytes — we use to_json() to convert the struct back to JSON.

from pyspark.sql.functions import to_json, struct, lit

alerts = (
    df
    .filter(col("amount") > 3000)
    .select(
        to_json(
            struct(
                "tx_id", "user_id", "amount", "store", "category",
                col("timestamp").cast("string"),
                lit("HIGH").alias("alert_level"),
            )
        ).alias("value")    # Kafka requires a 'value' column
    )
)

alert_query = (
    alerts.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("topic", "alerts")
    .option("checkpointLocation", "/tmp/chk_lab4_alerts")
    .outputMode("append")
    .start()
)
print("Alert stream started. Stop manually: alert_query.stop()")

Task 5.1 — Verify the alerts topic in a terminal

kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server broker:9092 \
  --topic alerts \
  --from-beginning

You should see JSON records with the field "alert_level": "HIGH".
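
The same check also works from the notebook with a one-off batch read of the alerts topic (sketch; assumes the stream has already produced some alerts):

# Read the alerts topic back as a batch and decode value to text
alerts_check = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "alerts")
    .option("startingOffsets", "earliest")
    .load()
    .select(col("value").cast("string").alias("alert_json"))
)
alerts_check.show(5, truncate=False)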

Task 5.2 — Review questions

# 1. Why do the first results in 'append' mode appear with a delay?
#    ANSWER:

# 2. What happens if you set the watermark to "0 seconds"?
#    ANSWER:

# 3. What is the difference between window() in Lab 2 (batch) and here (streaming)?
#    ANSWER:

# 4. Why do we use to_json() when writing to Kafka instead of just select("value")?
#    ANSWER:

# Cleanup: stop the alert stream and the Spark session
alert_query.stop()
spark.stop()

Homework

  1. Build a sliding window (2 minutes / 1-minute step) on the Kafka stream per store.
  2. Add a field ratio = amount / 400.0 (approximate average from Lab 2) to the alerts.
  3. What happens to the results when you stop the producer and wait 2 minutes? Why?

Next lab: Decision rules and scoring on a stream.