batch_counter = {"count": 0}
def process_batch(df, batch_id):
batch_counter["count"] += 1
print(f"Batch ID: {batch_id}")
df.show(truncate=False)
if batch_counter["count"] % 5 == 0:
spark.stop()Zarzadzanie zrodlami danych strumieniowych i segmentacja klientow
🔄 Wprowadzenie
W tym laboratorium zapoznasz sie z roznymi metodami zasilania danych strumieniowych w Apache Spark oraz zastosowaniem prostych transformacji, filtrowania i segmentacji klientow w czasie rzeczywistym.
💡 Pomocnicza funkcja do wyswietlania naszych danych strumieniowych
🔹 rate jako źródło kontrolowanego strumienia
✅ Zadanie 1
- Przygotuj strumien danych z
format('rate'), ustawrowsPerSecondna 5. - Utworz kolumne
user_id:expr("concat('u', cast(rand()*100 as int))") - Dodaj kolumne
event_type:expr("case when rand() > 0.7 then 'purchase' else 'view' end")
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
rate_df = (spark....)
events = (rate_df....)
query = (events.writeStream
.format("console")
.foreachBatch(process_batch)
.start())🔹 Filtrowanie danych bez agregacji (append mode)
Zobacz jak działa poniższy kod i na jego podstawie:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("AppendExample").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# Źródło rate - generuje timestamp + value
rate_df = (spark.readStream
.format("rate")
.option("rowsPerSecond", 5)
.load())
# Filtracja bez potrzeby agregacji (bezstanowe przetwarzanie)
filtered = rate_df.filter(col("value") % 2 == 0) \
.withColumn("info", expr("concat('even:', value)"))
# outputMode = append → pokazuje tylko nowe wiersze, bez stanu
query = (filtered.writeStream
.outputMode("append")
.format("console")
.option("truncate", False)
.foreachBatch(process_batch)
.start()
)- Skorzystaj z danych z poprzedniego zadania.
- Wyfiltruj tylko
purchase.
purchases = events....
query = (purchases.writeStream
.format("console")
.outputMode("append")
.foreachBatch(process_batch)
.start())🔹 Źródło plikowe (JSON)
✅ Generator danych:
%%file generator.py
# generator.py
import json, os, random, time
from datetime import datetime, timedelta
output_dir = "data/stream"
os.makedirs(output_dir, exist_ok=True)
event_types = ["view", "cart", "purchase"]
categories = ["electronics", "books", "fashion", "home", "sports"]
def generate_event():
return {
"user_id": f"u{random.randint(1, 50)}",
"event_type": random.choices(event_types, weights=[0.6, 0.25, 0.15])[0],
"timestamp": (datetime.utcnow() - timedelta(seconds=random.randint(0, 300))).isoformat(),
"product_id": f"p{random.randint(100, 120)}",
"category": random.choice(categories),
"price": round(random.uniform(10, 1000), 2)
}
# Simulate file-based streaming
while True:
batch = [generate_event() for _ in range(50)]
filename = f"{output_dir}/events_{int(time.time())}.json"
with open(filename, "w") as f:
for e in batch:
f.write(json.dumps(e) + "\n")
print(f"Wrote: {filename}")
time.sleep(5)✅ Schemat danych:
{
"user_id": "u123",
"event_type": "purchase", // albo "view", "cart", "click"
"timestamp": "2025-05-09T15:24:00Z",
"product_id": "p456",
"category": "electronics",
"price": 299.99
}- Utwórz zmienną
schema, która zrealizuje schamat danych naszej ramki. WykorzystajStringType(),TimestampType(),DoubleType()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName("RealTimeEcommerce").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# StringType(), TimestampType(), DoubleType()
schema = ...✅ Odczyt danych z katalogu:
stream = (spark.readStream
.schema(schema)
.json("data/stream"))
query = (stream.writeStream
.format("console")
.foreachBatch(process_batch)
.start())🔹 Bezstanowe zliczanie zdarzen
- Przygotuj zmienną agg1 zliczającą zdarzenia należące do danej grupy
event_type.
agg1 = (stream....)
# pamietaj, że agregacje wymagają opcji complete
query = (agg1
.writeStream
.outputMode("complete")
.format("console")
.foreachBatch(process_batch)
.start()
)🔹 Agregacja w oknach czasowych
withWatermark("timestamp", "1 minute")
💡 Do czego służy: Informuje Sparka, że dane przychodzą z opóźnieniem i należy je przetwarzać tylko do określonego limitu wstecz (tutaj: 1 minuta).
🚨 Dlaczego ważne: Bez watermarku Spark trzymałby w pamięci wszystkie dane, by móc je jeszcze pogrupować. Watermark pozwala zwolnić pamięć.
- Pogrupuj typy zdarzen w thumbling window, w oknie co 5 minut
- dodaj watermark z ustawieniem na 1 minutę.
windowed = (stream...)
query = (
windowed.writeStream
.outputMode("append")
.foreachBatch(process_batch)
.format("console")
.start()
)- Zmień thumbling window na sliding window z szerokością okna 5 minut i startem nowego okna co 1 minutę.
windowed = (stream...)
query = (
windowed.writeStream
.outputMode("append")
.foreachBatch(process_batch)
.format("console")
.start()
)🔹 Segmentacja klientow
🧩 Logika segmentacji:
- jeśli był purchase → “Buyer”
- jeśli był cart, ale nie purchase → “Cart abandoner”
- jeśli tylko view → “Lurker”
groupBy(window(...), "user_id")
💡 Do czego służy: Grupujemy dane per użytkownik w konkretnym przedziale czasu (oknie 5-minutowym).
⏱️ window(“timestamp”, “5 minutes”): Funkcja okna czasowego – każda grupa będzie dotyczyć jednego użytkownika w konkretnym 5-minutowym interwale.
agg(collect_set("event_type"))
💡 Do czego służy: Zbiera wszystkie typy zdarzeń (view, cart, purchase) danego użytkownika w danym oknie.
🧠 Dlaczego collect_set a nie collect_list?: collect_set usuwa duplikaty — interesuje nas tylko czy coś się zdarzyło, a nie ile razy.
- withColumn(… expr(…))
💡 Do czego służy: Na podstawie zbioru zdarzeń określamy segment użytkownika.
🔎 array_contains: Funkcja sprawdzająca, czy dany typ zdarzenia znajduje się w tablicy.
🧠 Co warto wiedzieć:
- Segmentacja to klasyczne zastosowanie agregacji i transformacji strumienia.
- Łączenie window + watermark jest kluczowe do kontroli stanu.
- collect_set umożliwia prostą analizę zachowań, bez potrzeby przechowywania surowych danych.
- expr() daje elastyczność, by używać składni SQL wewnątrz kodu DataFrame.
⚙️ Porownanie trybow outputMode
| outputMode | Opis | Kiedy uzywac | Wymagania |
|---|---|---|---|
append |
Wypisywane sa tylko nowe wiersze | Filtrowanie, wzbogacanie bez agregacji | Nie dziala z groupBy |
update |
Wypisywane sa tylko zmienione wiersze | Agregacje z watermarkami | Wymaga zarzadzania stanem |
complete |
Wypisywane jest calosciowe podsumowanie | Podsumowania okien, snapshoty | Moze byc kosztowne |