Lab 3: Spark Structured Streaming — Windows on a Live Stream

from pyspark.sql import SparkSession

# Local Spark session for the batch warm-up portion of the lab.
spark = SparkSession.builder.appName("lab3").getOrCreate()
spark  # notebook cell output: displays the SparkSession summary

# Hand-written sample readings: (temperature, event-time string).
# NOTE: the last two rows are out of event-time order (12:03:25 precedes
# 12:02:40 in arrival order) — useful later when discussing event time.
temperature_sensor = (
    (12.5, "2019-01-02 12:00:00"),
    (17.6, "2019-01-02 12:00:20"),
    (14.6, "2019-01-02 12:00:30"),
    (22.9, "2019-01-02 12:01:15"),
    (17.4, "2019-01-02 12:01:30"),
    (25.8, "2019-01-02 12:03:25"),
    (27.1, "2019-01-02 12:02:40"),
)
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Readings arrive as (double, string); declare that shape explicitly.
schema = StructType([
    StructField("temperature", DoubleType(), True),
    StructField("time",        StringType(), True),
])

# Build the DataFrame, then parse the string column into a real timestamp.
df = spark.createDataFrame(temperature_sensor, schema=schema)
df = df.withColumn("time", to_timestamp("time"))

df.printSchema()
df.show(3)

# Register the DataFrame for SQL access and query the "warm" readings.
df.createOrReplaceTempView("df")
spark.sql("SELECT time, temperature FROM df WHERE temperature > 21").show(5)
# Tumbling window on batch data
import pyspark.sql.functions as F

# Fixed, non-overlapping 30-second buckets; count readings per bucket.
counts_per_window = df.groupBy(F.window("time", "30 seconds")).count()
counts_per_window.show(truncate=False)

spark.stop()

Streaming Sources in Spark Structured Streaming

Spark Structured Streaming can ingest data from various sources. The most common:

rate — test source

Automatically generates rows at a fixed rate. Each row contains:

- `timestamp` — event time
- `value` — monotonically increasing counter (0, 1, 2, …)

Used to test streaming logic without any external dependency.

df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

Other sources

Socket (nc -lk 9999) — listens on a TCP port; for testing only.

File — watches a directory and treats new files as a stream; supports CSV, JSON, Parquet.

Kafka — reads from Apache Kafka topics (broker version ≥ 0.10.0).


Output Modes

outputMode controls what Spark writes after each micro-batch:

| Mode     | What is written                                 | When to use                  |
|----------|-------------------------------------------------|------------------------------|
| append   | Only new rows added in this micro-batch         | Most common; stateless ops   |
| update   | Only rows whose aggregate value changed         | Streaming aggregations       |
| complete | The entire result table after every micro-batch | Debugging; requires groupBy  |

Let’s create our first streaming DataFrame using the rate source.

%%file streamrate.py
## Run with: spark-submit streamrate.py

from pyspark.sql import SparkSession

# Minimal streaming job: pipe the built-in "rate" test source to the console.
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# One auto-generated row per second, each carrying (timestamp, value).
rate_stream = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Append mode: print only the rows added in each micro-batch, untruncated.
query = (
    rate_stream.writeStream
               .format("console")
               .outputMode("append")
               .option("truncate", False)
               .start()
)

# Block forever (stop with Ctrl-C / spark-submit kill).
query.awaitTermination()
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")


def process_batch(df, batch_id, tstop=5):
    """Print each micro-batch; raise after `tstop` batches to end the demo.

    Raising inside foreachBatch fails the query, and the failure surfaces
    through query.awaitTermination() — the simplest way to stop an
    otherwise endless rate-source stream.
    """
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id >= tstop:
        raise Exception("stop")


# One auto-generated (timestamp, value) row per second.
df = (
    spark.readStream
         .format("rate")
         .option("rowsPerSecond", 1)
         .load()
)

# NOTE: "truncate" is a console-sink option with no effect under
# foreachBatch (df.show above controls truncation), so it is dropped here.
query = (
    df.writeStream
      .outputMode("append")
      .foreachBatch(process_batch)
      .start()
)
try:
    query.awaitTermination()
except Exception:
    # The deliberate "stop" (or any streaming failure) lands here. A bare
    # `except:` would also swallow KeyboardInterrupt/SystemExit, so we
    # catch Exception explicitly and shut the query down cleanly.
    query.stop()

Simulator

Let’s make the generated data more interesting.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")


def process_batch(df, batch_id, tstop=5):
    """Print each micro-batch; raise after `tstop` batches to end the demo."""
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id >= tstop:
        raise Exception("stop")


# One auto-generated (timestamp, value) row per second.
df = (
    spark.readStream
         .format("rate")
         .option("rowsPerSecond", 1)
         .load()
)

# Simulated sensor: keep the event time, replace the counter with a
# uniform random temperature in [20, 30).
stream = (
    df.withColumn("time",        col("timestamp"))
      .withColumn("temperature", expr("20 + rand() * 10"))
      .select("time", "temperature")
)

# "truncate" is a console-sink option and has no effect under foreachBatch,
# so it is dropped here.
query = (
    stream.writeStream
          .outputMode("append")
          .foreachBatch(process_batch)
          .start()
)
try:
    query.awaitTermination()
except Exception:
    # Catch Exception (not bare `except:`) so Ctrl-C still interrupts;
    # the deliberate "stop" raised in process_batch lands here.
    query.stop()

1. Stateless Transformation

Any operation that does not depend on historical data — e.g. filtering, column derivation.

✅ Example: filter temperatures above 28 °C

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")


def process_batch(df, batch_id, tstop=5):
    """Print each micro-batch; raise after `tstop` batches to end the demo."""
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id >= tstop:
        raise Exception("stop")


# One auto-generated (timestamp, value) row per second.
df = (
    spark.readStream
         .format("rate")
         .option("rowsPerSecond", 1)
         .load()
)

# Simulated sensor: (event time, random temperature in [20, 30)).
stream = (
    df.withColumn("time",        col("timestamp"))
      .withColumn("temperature", expr("20 + rand() * 10"))
      .select("time", "temperature")
)

# Stateless transformation: each row is kept or dropped on its own value,
# with no reference to historical data.
stream_filtered = stream.filter(col("temperature") > 28)

# "truncate" is a console-sink option and has no effect under foreachBatch,
# so it is dropped here.
query = (
    stream_filtered.writeStream
                   .outputMode("append")
                   .foreachBatch(process_batch)
                   .start()
)
try:
    query.awaitTermination()
except Exception:
    # Catch Exception (not bare `except:`) so Ctrl-C still interrupts;
    # the deliberate "stop" raised in process_batch lands here.
    query.stop()

2. Simple Stateless Anomaly Detector

Let’s flag temperatures above 29.5 °C as anomalies by adding an anomaly column.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, when

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")


def process_batch(df, batch_id, tstop=10):
    """Print each micro-batch; raise after `tstop` batches to end the demo."""
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id >= tstop:
        raise Exception("stop")


# One auto-generated (timestamp, value) row per second.
df = (
    spark.readStream
         .format("rate")
         .option("rowsPerSecond", 1)
         .load()
)

# Simulated sensor: (event time, random temperature in [20, 30)).
stream = (
    df.withColumn("time",        col("timestamp"))
      .withColumn("temperature", expr("20 + rand() * 10"))
      .select("time", "temperature")
)

# Stateless anomaly flag: only the current row's value is inspected.
stream_anomaly = stream.withColumn(
    "anomaly",
    when(col("temperature") > 29.5, "YES").otherwise("NO")
)

# "truncate" is a console-sink option and has no effect under foreachBatch,
# so it is dropped here.
query = (
    stream_anomaly.writeStream
                  .outputMode("append")
                  .foreachBatch(process_batch)
                  .start()
)
try:
    query.awaitTermination()
except Exception:
    # Catch Exception (not bare `except:`) so Ctrl-C still interrupts;
    # the deliberate "stop" raised in process_batch lands here.
    query.stop()

This is also stateless — only the current value is evaluated.

3. Tumbling Window

Groups data into fixed, non-overlapping intervals of e.g. 10 seconds.

⚠️ Note the output mode change to complete — for a streaming aggregation, append is only allowed with a watermark, so here we emit the full result table after every micro-batch (update would also work).

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, window

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")


def process_batch(df, batch_id, tstop=30):
    """Print each micro-batch; raise after `tstop` batches to end the demo."""
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id >= tstop:
        raise Exception("stop")


# One auto-generated (timestamp, value) row per second.
df = (
    spark.readStream
         .format("rate")
         .option("rowsPerSecond", 1)
         .load()
)

# Simulated sensor: (event time, random temperature in [20, 30)).
stream = (
    df.withColumn("time",        col("timestamp"))
      .withColumn("temperature", expr("20 + rand() * 10").cast("double"))
      .select("time", "temperature")
)

# Tumbling window: fixed, non-overlapping 10-second buckets keyed by
# event time; Spark keeps the running average per bucket as state.
tumbling_window = (
    stream
    .groupBy(window(col("time"), "10 seconds"))
    .avg("temperature")
)

# "complete" re-emits the full aggregate table after every micro-batch
# (append would need a watermark for a streaming aggregation).
# "truncate" is a console-sink option with no effect under foreachBatch,
# so it is dropped here.
query = (
    tumbling_window.writeStream
                   .outputMode("complete")
                   .foreachBatch(process_batch)
                   .start()
)
try:
    query.awaitTermination()
except Exception:
    # Catch Exception (not bare `except:`) so Ctrl-C still interrupts;
    # the deliberate "stop" raised in process_batch lands here.
    query.stop()

4. Sliding Window

The same event can fall into multiple overlapping windows.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, window

spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")


def process_batch(df, batch_id, tstop=30):
    """Print each micro-batch; raise after `tstop` batches to end the demo."""
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id >= tstop:
        raise Exception("stop")


# One auto-generated (timestamp, value) row per second.
df = (
    spark.readStream
         .format("rate")
         .option("rowsPerSecond", 1)
         .load()
)

# Simulated sensor: (event time, random temperature in [20, 30)).
stream = (
    df.withColumn("time",        col("timestamp"))
      .withColumn("temperature", expr("20 + rand() * 10").cast("double"))
      .select("time", "temperature")
)

# Sliding window: 10-second windows starting every 5 seconds, so each
# event contributes to two overlapping windows.
sliding = (
    stream
    .groupBy(window(col("time"), "10 seconds", "5 seconds"))  # 10s window, 5s step
    .avg("temperature")
)

# "truncate" is a console-sink option with no effect under foreachBatch,
# so it is dropped here.
query = (
    sliding.writeStream
           .outputMode("complete")
           .foreachBatch(process_batch)
           .start()
)
try:
    query.awaitTermination()
except Exception:
    # Catch Exception (not bare `except:`) so Ctrl-C still interrupts;
    # the deliberate "stop" raised in process_batch lands here.
    query.stop()

File Source (JSON)

Data generator

%%file generator.py
# generator.py  —  run in a separate terminal: python generator.py
"""Endless generator of fake sales transactions for the file-source demo.

Every 5 seconds it writes one JSON-lines file (2 transactions per file)
into data/stream/, which the Spark streaming file source watches.
"""
import json
import os
import random
import time
from datetime import datetime

output_dir = "data/stream"
os.makedirs(output_dir, exist_ok=True)

stores     = ['Warsaw', 'Krakow', 'Gdansk', 'Wroclaw']
categories = ['electronics', 'clothing', 'food', 'books']


def generate_transaction():
    """Return one random transaction record as a plain dict."""
    return {
        'tx_id':     f'TX{random.randint(1000, 9999)}',
        'user_id':   f'u{random.randint(1, 20):02d}',
        'amount':    round(random.uniform(5.0, 5000.0), 2),
        'store':     random.choice(stores),
        'category':  random.choice(categories),
        'timestamp': datetime.now().isoformat(),
    }


if __name__ == "__main__":
    # Guarded so the module can be imported (e.g. for testing) without
    # starting the infinite file-writing loop; `python generator.py`
    # behaves exactly as before.
    while True:
        batch = [generate_transaction() for _ in range(2)]
        filename = f"{output_dir}/events_{int(time.time())}.json"
        with open(filename, "w") as f:
            for e in batch:
                f.write(json.dumps(e) + "\n")
        # BUG FIX: original printed the literal text "(unknown)" instead
        # of interpolating the path of the file just written.
        print(f"Wrote: {filename}")
        time.sleep(5)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jsonDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# The schema must be supplied up front: streaming file sources do not
# infer it from data by default.
tx_schema = StructType([
    StructField("tx_id",     StringType()),
    StructField("user_id",   StringType()),
    StructField("amount",    DoubleType()),
    StructField("store",     StringType()),
    StructField("category",  StringType()),
    StructField("timestamp", StringType()),
])


def process_batch(df, batch_id, tstop=5):
    """Print each micro-batch; raise after `tstop` batches to end the demo.

    (The original also bumped a module-level `batch_counter` dict that was
    never read anywhere — that dead state has been removed.)
    """
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_id >= tstop:
        raise Exception("stop")


# In a terminal: python generator.py
stream = (
    spark.readStream
         .schema(tx_schema)
         .json("data/stream")
)

query = (
    stream.writeStream
          .foreachBatch(process_batch)
          .start()
)
try:
    query.awaitTermination()
except Exception:
    # Catch Exception (not bare `except:`) so Ctrl-C still interrupts;
    # the deliberate "stop" raised in process_batch lands here.
    query.stop()