= {"count": 0}
batch_counter
def process_batch(df, batch_id):
"count"] += 1
batch_counter[print(f"Batch ID: {batch_id}")
=False)
df.show(truncateif batch_counter["count"] % 5 == 0:
spark.stop()
Zarzadzanie zrodlami danych strumieniowych i segmentacja klientow
🔄 Wprowadzenie
W tym laboratorium zapoznasz sie z roznymi metodami zasilania danych strumieniowych w Apache Spark oraz zastosowaniem prostych transformacji, filtrowania i segmentacji klientow w czasie rzeczywistym.
💡 Pomocnicza funkcja do wyswietlania naszych danych strumieniowych
🔹 rate jako źródło kontrolowanego strumienia
✅ Zadanie 1
- Przygotuj strumien danych z
format('rate')
, ustawrowsPerSecond
na 5. - Utworz kolumne
user_id
:expr("concat('u', cast(rand()*100 as int))")
- Dodaj kolumne
event_type
:expr("case when rand() > 0.7 then 'purchase' else 'view' end")
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
= SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark "WARN")
spark.sparkContext.setLogLevel(
= (spark....)
rate_df
= (rate_df....)
events
= (events.writeStream
query format("console")
.
.foreachBatch(process_batch) .start())
🔹 Filtrowanie danych bez agregacji (append mode)
Zobacz jak działa poniższy kod i na jego podstawie:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
= SparkSession.builder.appName("AppendExample").getOrCreate()
spark "ERROR")
spark.sparkContext.setLogLevel(
# Źródło rate - generuje timestamp + value
= (spark.readStream
rate_df format("rate")
."rowsPerSecond", 5)
.option(
.load())
# Filtracja bez potrzeby agregacji (bezstanowe przetwarzanie)
= rate_df.filter(col("value") % 2 == 0) \
filtered "info", expr("concat('even:', value)"))
.withColumn(
# outputMode = append → pokazuje tylko nowe wiersze, bez stanu
= (filtered.writeStream
query "append")
.outputMode(format("console")
."truncate", False)
.option(
.foreachBatch(process_batch)
.start() )
- Skorzystaj z danych z poprzedniego zadania.
- Wyfiltruj tylko
purchase
.
= events....
purchases
= (purchases.writeStream
query format("console")
."append")
.outputMode(
.foreachBatch(process_batch) .start())
🔹 Źródło plikowe (JSON)
✅ Generator danych:
%%file generator.py
# generator.py
import json, os, random, time
from datetime import datetime, timedelta
= "data/stream"
output_dir =True)
os.makedirs(output_dir, exist_ok
= ["view", "cart", "purchase"]
event_types = ["electronics", "books", "fashion", "home", "sports"]
categories
def generate_event():
return {
"user_id": f"u{random.randint(1, 50)}",
"event_type": random.choices(event_types, weights=[0.6, 0.25, 0.15])[0],
"timestamp": (datetime.utcnow() - timedelta(seconds=random.randint(0, 300))).isoformat(),
"product_id": f"p{random.randint(100, 120)}",
"category": random.choice(categories),
"price": round(random.uniform(10, 1000), 2)
}
# Simulate file-based streaming
while True:
= [generate_event() for _ in range(50)]
batch = f"{output_dir}/events_{int(time.time())}.json"
filename with open(filename, "w") as f:
for e in batch:
+ "\n")
f.write(json.dumps(e) print(f"Wrote: {filename}")
5) time.sleep(
✅ Schemat danych:
{
"user_id": "u123",
"event_type": "purchase", // albo "view", "cart", "click"
"timestamp": "2025-05-09T15:24:00Z",
"product_id": "p456",
"category": "electronics",
"price": 299.99
}
- Utwórz zmienną
schema
, która zrealizuje schamat danych naszej ramki. WykorzystajStringType()
,TimestampType()
,DoubleType()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
= SparkSession.builder.appName("RealTimeEcommerce").getOrCreate()
spark "ERROR")
spark.sparkContext.setLogLevel(
# StringType(), TimestampType(), DoubleType()
= ... schema
✅ Odczyt danych z katalogu:
= (spark.readStream
stream
.schema(schema)"data/stream"))
.json(
= (stream.writeStream
query format("console")
.
.foreachBatch(process_batch) .start())
🔹 Bezstanowe zliczanie zdarzen
- Przygotuj zmienną agg1 zliczającą zdarzenia należące do danej grupy
event_type
.
= (stream....)
agg1
# pamietaj, że agregacje wymagają opcji complete
= (agg1
query
.writeStream"complete")
.outputMode(format("console")
.
.foreachBatch(process_batch)
.start() )
🔹 Agregacja w oknach czasowych
withWatermark("timestamp", "1 minute")
💡 Do czego służy: Informuje Sparka, że dane przychodzą z opóźnieniem i należy je przetwarzać tylko do określonego limitu wstecz (tutaj: 1 minuta).
🚨 Dlaczego ważne: Bez watermarku Spark trzymałby w pamięci wszystkie dane, by móc je jeszcze pogrupować. Watermark pozwala zwolnić pamięć.
- Pogrupuj typy zdarzen w thumbling window, w oknie co 5 minut
- dodaj watermark z ustawieniem na 1 minutę.
= (stream...)
windowed
= (
query
windowed.writeStream"append")
.outputMode(
.foreachBatch(process_batch)format("console")
.
.start() )
- Zmień thumbling window na sliding window z szerokością okna 5 minut i startem nowego okna co 1 minutę.
= (stream...)
windowed
= (
query
windowed.writeStream"append")
.outputMode(
.foreachBatch(process_batch)format("console")
.
.start() )
🔹 Segmentacja klientow
🧩 Logika segmentacji:
- jeśli był purchase → “Buyer”
- jeśli był cart, ale nie purchase → “Cart abandoner”
- jeśli tylko view → “Lurker”
groupBy(window(...), "user_id")
💡 Do czego służy: Grupujemy dane per użytkownik w konkretnym przedziale czasu (oknie 5-minutowym).
⏱️ window(“timestamp”, “5 minutes”): Funkcja okna czasowego – każda grupa będzie dotyczyć jednego użytkownika w konkretnym 5-minutowym interwale.
agg(collect_set("event_type"))
💡 Do czego służy: Zbiera wszystkie typy zdarzeń (view, cart, purchase) danego użytkownika w danym oknie.
🧠 Dlaczego collect_set
a nie collect_list
?: collect_set usuwa duplikaty — interesuje nas tylko czy coś się zdarzyło, a nie ile razy.
- withColumn(… expr(…))
💡 Do czego służy: Na podstawie zbioru zdarzeń określamy segment użytkownika.
🔎 array_contains
: Funkcja sprawdzająca, czy dany typ zdarzenia znajduje się w tablicy.
🧠 Co warto wiedzieć:
- Segmentacja to klasyczne zastosowanie agregacji i transformacji strumienia.
- Łączenie window + watermark jest kluczowe do kontroli stanu.
- collect_set umożliwia prostą analizę zachowań, bez potrzeby przechowywania surowych danych.
- expr() daje elastyczność, by używać składni SQL wewnątrz kodu DataFrame.
⚙️ Porownanie trybow outputMode
outputMode | Opis | Kiedy uzywac | Wymagania |
---|---|---|---|
append |
Wypisywane sa tylko nowe wiersze | Filtrowanie, wzbogacanie bez agregacji | Nie dziala z groupBy |
update |
Wypisywane sa tylko zmienione wiersze | Agregacje z watermarkami | Wymaga zarzadzania stanem |
complete |
Wypisywane jest calosciowe podsumowanie | Podsumowania okien, snapshoty | Moze byc kosztowne |