batch_counter = {"count": 0}
def process_batch(df, batch_id):
    batch_counter["count"] += 1
    print(f"Batch ID: {batch_id}")
    df.show(truncate=False)
    if batch_counter["count"] % 5 == 0:
        spark.stop()Zarzadzanie zrodlami danych strumieniowych i segmentacja klientow
🔄 Wprowadzenie
W tym laboratorium zapoznasz sie z roznymi metodami zasilania danych strumieniowych w Apache Spark oraz zastosowaniem prostych transformacji, filtrowania i segmentacji klientow w czasie rzeczywistym.
💡 Pomocnicza funkcja do wyswietlania naszych danych strumieniowych
🔹 rate jako źródło kontrolowanego strumienia
✅ Zadanie 1
- Przygotuj strumien danych z 
format('rate'), ustawrowsPerSecondna 5. - Utworz kolumne 
user_id:expr("concat('u', cast(rand()*100 as int))") - Dodaj kolumne 
event_type:expr("case when rand() > 0.7 then 'purchase' else 'view' end") 
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
spark = SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
rate_df = (spark....)
events = (rate_df....)
    
query = (events.writeStream
         .format("console")
         .foreachBatch(process_batch)
         .start())🔹 Filtrowanie danych bez agregacji (append mode)
Zobacz jak działa poniższy kod i na jego podstawie:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("AppendExample").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# Źródło rate - generuje timestamp + value
rate_df = (spark.readStream
           .format("rate")
           .option("rowsPerSecond", 5)
           .load())
# Filtracja bez potrzeby agregacji (bezstanowe przetwarzanie)
filtered = rate_df.filter(col("value") % 2 == 0) \
                  .withColumn("info", expr("concat('even:', value)"))
# outputMode = append → pokazuje tylko nowe wiersze, bez stanu
query = (filtered.writeStream 
    .outputMode("append") 
    .format("console") 
    .option("truncate", False) 
    .foreachBatch(process_batch)
    .start()
        )- Skorzystaj z danych z poprzedniego zadania.
 - Wyfiltruj tylko 
purchase. 
purchases = events....
query = (purchases.writeStream
         .format("console")
         .outputMode("append")
         .foreachBatch(process_batch)
         .start())🔹 Źródło plikowe (JSON)
✅ Generator danych:
%%file generator.py
# generator.py
import json, os, random, time
from datetime import datetime, timedelta
output_dir = "data/stream"
os.makedirs(output_dir, exist_ok=True)
event_types = ["view", "cart", "purchase"]
categories = ["electronics", "books", "fashion", "home", "sports"]
def generate_event():
    return {
        "user_id": f"u{random.randint(1, 50)}",
        "event_type": random.choices(event_types, weights=[0.6, 0.25, 0.15])[0],
        "timestamp": (datetime.utcnow() - timedelta(seconds=random.randint(0, 300))).isoformat(),
        "product_id": f"p{random.randint(100, 120)}",
        "category": random.choice(categories),
        "price": round(random.uniform(10, 1000), 2)
    }
# Simulate file-based streaming
while True:
    batch = [generate_event() for _ in range(50)]
    filename = f"{output_dir}/events_{int(time.time())}.json"
    with open(filename, "w") as f:
        for e in batch:
            f.write(json.dumps(e) + "\n")
    print(f"Wrote: {filename}")
    time.sleep(5)✅ Schemat danych:
{
  "user_id": "u123",
  "event_type": "purchase", // albo "view", "cart", "click"
  "timestamp": "2025-05-09T15:24:00Z",
  "product_id": "p456",
  "category": "electronics",
  "price": 299.99
}- Utwórz zmienną 
schema, która zrealizuje schamat danych naszej ramki. WykorzystajStringType(),TimestampType(),DoubleType() 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder.appName("RealTimeEcommerce").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# StringType(), TimestampType(), DoubleType()
schema = ...✅ Odczyt danych z katalogu:
stream = (spark.readStream
          .schema(schema)
          .json("data/stream"))
query = (stream.writeStream
         .format("console")
         .foreachBatch(process_batch)
         .start())🔹 Bezstanowe zliczanie zdarzen
- Przygotuj zmienną agg1 zliczającą zdarzenia należące do danej grupy 
event_type. 
agg1 = (stream....)
# pamietaj, że agregacje wymagają opcji complete
query = (agg1
         .writeStream
         .outputMode("complete")
         .format("console")
         .foreachBatch(process_batch)
         .start()
        )🔹 Agregacja w oknach czasowych
withWatermark("timestamp", "1 minute")
💡 Do czego służy: Informuje Sparka, że dane przychodzą z opóźnieniem i należy je przetwarzać tylko do określonego limitu wstecz (tutaj: 1 minuta).
🚨 Dlaczego ważne: Bez watermarku Spark trzymałby w pamięci wszystkie dane, by móc je jeszcze pogrupować. Watermark pozwala zwolnić pamięć.
- Pogrupuj typy zdarzen w thumbling window, w oknie co 5 minut
 - dodaj watermark z ustawieniem na 1 minutę.
 
windowed = (stream...)
query = (
    windowed.writeStream
    .outputMode("append")
    .foreachBatch(process_batch)
    .format("console")
    .start()
)- Zmień thumbling window na sliding window z szerokością okna 5 minut i startem nowego okna co 1 minutę.
 
windowed = (stream...)
query = (
    windowed.writeStream
    .outputMode("append")
    .foreachBatch(process_batch)
    .format("console")
    .start()
)🔹 Segmentacja klientow
🧩 Logika segmentacji:
- jeśli był purchase → “Buyer”
 - jeśli był cart, ale nie purchase → “Cart abandoner”
 - jeśli tylko view → “Lurker”
 
groupBy(window(...), "user_id")
💡 Do czego służy: Grupujemy dane per użytkownik w konkretnym przedziale czasu (oknie 5-minutowym).
⏱️ window(“timestamp”, “5 minutes”): Funkcja okna czasowego – każda grupa będzie dotyczyć jednego użytkownika w konkretnym 5-minutowym interwale.
agg(collect_set("event_type"))
💡 Do czego służy: Zbiera wszystkie typy zdarzeń (view, cart, purchase) danego użytkownika w danym oknie.
🧠 Dlaczego collect_set a nie collect_list?: collect_set usuwa duplikaty — interesuje nas tylko czy coś się zdarzyło, a nie ile razy.
- withColumn(… expr(…))
 
💡 Do czego służy: Na podstawie zbioru zdarzeń określamy segment użytkownika.
🔎 array_contains: Funkcja sprawdzająca, czy dany typ zdarzenia znajduje się w tablicy.
🧠 Co warto wiedzieć:
- Segmentacja to klasyczne zastosowanie agregacji i transformacji strumienia.
 - Łączenie window + watermark jest kluczowe do kontroli stanu.
 - collect_set umożliwia prostą analizę zachowań, bez potrzeby przechowywania surowych danych.
 - expr() daje elastyczność, by używać składni SQL wewnątrz kodu DataFrame.
 
⚙️ Porownanie trybow outputMode
| outputMode | Opis | Kiedy uzywac | Wymagania | 
|---|---|---|---|
append | 
Wypisywane sa tylko nowe wiersze | Filtrowanie, wzbogacanie bez agregacji | Nie dziala z groupBy | 
update | 
Wypisywane sa tylko zmienione wiersze | Agregacje z watermarkami | Wymaga zarzadzania stanem | 
complete | 
Wypisywane jest calosciowe podsumowanie | Podsumowania okien, snapshoty | Moze byc kosztowne |