Lab 2: Apache Spark — Batch Analytics on Transaction Data

Why Spark?

In Lab 1 you built a consumer that counts transactions per store. It works — but imagine:

  • You need to know how many transactions happened per hour (not all-time totals)
  • You have 10 million transactions per day instead of 50
  • Your consumer crashes — you lose all in-memory state (the counters) and have to start over

For tasks like these we use Apache Spark — a distributed computation engine that:

  • processes data in parallel (across multiple cores / machines)
  • has built-in time windows (window)
  • works on both files (batch) and live streams (streaming)

Plan

  1. Load 10,000 transactions from a file
  2. Basic aggregations — familiar groupBy
  3. Tumbling windows — counts per hour / per 30 minutes
  4. Sliding windows — overlapping intervals
  5. Preview: the same queries on a live Kafka stream

Part 1: Load the Data

SparkSession — the entry point

Before anything else, create a SparkSession. Think of it as a database connection — one object for the entire session.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Lab2-Transactions")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
print(f"Spark {spark.version} — ready")

Load the JSON file

The data is in data/transactions_10k.jsonl — 10,000 e-commerce transactions from 08:00–11:00. Each line is one JSON record — the same format as Kafka events in Lab 1.

df = spark.read.json("data/transactions_10k.jsonl")

print(f"Record count: {df.count()}")
df.printSchema()
df.show(10, truncate=False)

Convert the timestamp column

Spark loaded timestamp as a string. To use time windows we need to convert it to TimestampType.

Think of it as changing a cell format in Excel from “text” to “date”.

from pyspark.sql.functions import to_timestamp, col

df = df.withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))

df.printSchema()  # timestamp should now be 'timestamp (nullable = true)'
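One thing to watch for: by default, rows that don't match the pattern don't raise an error — to_timestamp silently turns them into null. The behavior can be sketched in plain Python with strptime (the format %Y-%m-%d %H:%M:%S is the strptime equivalent of Spark's yyyy-MM-dd HH:mm:ss):

```python
from datetime import datetime

def parse_like_to_timestamp(value, fmt="%Y-%m-%d %H:%M:%S"):
    """Mimic Spark's to_timestamp: a datetime on success, None (null) on failure."""
    try:
        return datetime.strptime(value, fmt)
    except (ValueError, TypeError):
        return None  # Spark puts null in the column instead of raising

print(parse_like_to_timestamp("2024-03-01 09:15:00"))  # datetime(2024, 3, 1, 9, 15)
print(parse_like_to_timestamp("not-a-date"))           # None
```

If the count of rows changes unexpectedly after windowing, check for nulls first: df.filter(col("timestamp").isNull()).count().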

Part 2: Basic Aggregations

Before we get to windows, let’s warm up with groupBy — it works just like GROUP BY in SQL or groupby in Pandas.

Task 2.1 — Transaction count and total revenue per store

from pyspark.sql.functions import count, sum as _sum, avg, round as _round

store_summary = (
    df.groupBy("store")
    .agg(
        count("tx_id").alias("tx_count"),
        _round(_sum("amount"), 2).alias("total_PLN"),
        _round(avg("amount"), 2).alias("avg_PLN"),
    )
    .orderBy("store")
)
store_summary.show()

Task 2.2 — Stats per category

Count transactions, total revenue, min and max amount for each category.

from pyspark.sql.functions import min as _min, max as _max

# YOUR CODE
# df.groupBy("category").agg(...).orderBy("category").show()

Part 3: Tumbling Windows

What is a window?

The groupBy("store") above counted all records together — regardless of when a transaction occurred.

A window splits the time axis into intervals and groups data separately within each.

Tumbling window — non-overlapping, back-to-back intervals:

08:00 ──────── 09:00 ──────── 10:00 ──────── 11:00
  [ window 1 ]  [ window 2 ]  [ window 3 ]

Each transaction falls into exactly one window.
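The bucketing rule itself is simple arithmetic: a timestamp's window start is the timestamp floored down to a whole multiple of the window duration. A pure-Python sketch of the idea (not Spark's actual implementation; Spark aligns windows to the Unix epoch by default, which this mirrors):

```python
from datetime import datetime, timedelta

def tumbling_window_start(ts: datetime, duration: timedelta) -> datetime:
    """Floor a timestamp to the start of its tumbling window."""
    epoch = datetime(1970, 1, 1)
    elapsed = (ts - epoch) // duration   # number of whole windows since the epoch
    return epoch + elapsed * duration

ts = datetime(2024, 3, 1, 9, 47, 12)
print(tumbling_window_start(ts, timedelta(hours=1)))     # 2024-03-01 09:00:00
print(tumbling_window_start(ts, timedelta(minutes=30)))  # 2024-03-01 09:30:00
```

Because the floor operation maps every timestamp to exactly one start, no transaction can land in two tumbling windows.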

Task 3.1 — Transaction count per hour (tumbling 1h)

from pyspark.sql.functions import window

hourly = (
    df.groupBy(window("timestamp", "1 hour"))    # 1-hour tumbling window
    .agg(
        count("tx_id").alias("tx_count"),
        _round(_sum("amount"), 2).alias("total_PLN"),
    )
    .orderBy("window")
)
hourly.show(truncate=False)

The window column is a struct with start and end fields. We can extract them for cleaner output:

(
    hourly
    .select(
        col("window.start").alias("from"),
        col("window.end").alias("to"),
        "tx_count",
        "total_PLN",
    )
    .show(truncate=False)
)

Task 3.2 — 30-minute windows per store

Count transactions and total revenue per store in each 30-minute window. Sort by window, then by store within each window.

# YOUR CODE
# df.groupBy(window("timestamp", "30 minutes"), "store").agg(...).orderBy(...).show()

Task 3.3 — Which hour had the highest revenue for store “Kraków”?

Filter by store first, then apply an hourly window, sort descending by total.

from pyspark.sql.functions import desc

# YOUR CODE

Part 4: Sliding Windows

Sliding window — intervals overlap:

08:00 ────────────────────────────────── 11:00
  [──── 1h ────]
       [──── 1h ────]
            [──── 1h ────]

Two parameters: windowDuration (width) and slideDuration (step size).

Note: one transaction can appear in multiple windows — that is expected.
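You can work out which windows an event lands in with the same floor arithmetic as before — starting from the last slide-aligned window at or before the timestamp, step backwards one slideDuration at a time while the window still covers it. A pure-Python sketch (assuming epoch-aligned, end-exclusive windows, which matches Spark's defaults):

```python
from datetime import datetime, timedelta

def sliding_windows_for(ts: datetime, width: timedelta, slide: timedelta):
    """Return (start, end) of every sliding window containing ts (end exclusive)."""
    epoch = datetime(1970, 1, 1)
    # Last window start at or before ts, aligned to the slide step
    start = epoch + ((ts - epoch) // slide) * slide
    windows = []
    while start + width > ts:            # window end is exclusive, so strict >
        windows.append((start, start + width))
        start -= slide                   # step back to the previous window
    return sorted(windows)

for start, end in sliding_windows_for(
    datetime(2024, 3, 1, 9, 30), timedelta(hours=1), timedelta(minutes=30)
):
    print(start, "→", end)
```

In general an event falls into windowDuration / slideDuration windows, which is why a sliding query returns more rows than a tumbling one with the same width.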

Task 4.1 — 1h window, 30-minute step

sliding = (
    df.groupBy(window("timestamp", "1 hour", "30 minutes"))  # 1h width, 30min step
    .agg(
        count("tx_id").alias("tx_count"),
        _round(_sum("amount"), 2).alias("total_PLN"),
    )
    .select(
        col("window.start").alias("from"),
        col("window.end").alias("to"),
        "tx_count",
        "total_PLN",
    )
    .orderBy("from")
)
sliding.show(truncate=False)

Task 4.2 — Compare tumbling vs sliding

Count the total number of result rows in both approaches. Why does sliding produce more rows?

tumbling_rows = (
    df.groupBy(window("timestamp", "1 hour"))
    .agg(count("tx_id"))
    .count()
)
sliding_rows = (
    df.groupBy(window("timestamp", "1 hour", "30 minutes"))
    .agg(count("tx_id"))
    .count()
)
print(f"Tumbling (1h):         {tumbling_rows} windows")
print(f"Sliding  (1h / 30min): {sliding_rows} windows")

# Answer in a comment: why does sliding produce more rows?
# YOUR ANSWER:

Part 5: Review Questions

# Answer in comments:

# 1. How many transactions are in the 09:00–10:00 window?
#    Check the result from Task 3.1.
#    ANSWER:

# 2. What is the difference between groupBy("store") and groupBy(window(...), "store")?
#    ANSWER:

# 3. In the sliding window (1h / 30min), how many windows contain transactions from 09:30?
#    Hint: draw a timeline.
#    ANSWER:

Preview: Lab 3

Today you applied windows to a file (historical batch data).

In the next lab the same code will work on a live Kafka stream — the only change is one line:

# Batch (today):
df = spark.read.json("data/transactions_10k.jsonl")

# Streaming (Lab 3):
df = spark.readStream.format("kafka").option(...).load()

The window() aggregations stay exactly the same.

Homework

  1. Find the hour in which store Gdańsk had the lowest average transaction amount.
  2. Count how many transactions per category occurred in the 09:00–09:30 window.
  3. Use a 15-minute window and find which quarter-hour had the peak transaction volume (all stores combined).

Next lab: The same window() on a live Kafka stream — watermarks and handling late-arriving events.

spark.stop()