Wykład 5 — Apache Spark i Structured Streaming

Analiza danych w czasie rzeczywistym

Apache Spark jako silnik rozproszonego przetwarzania, Structured Streaming, integracja Spark–Kafka, kompletny pipeline real-time analytics.
Note Czas trwania: 1,5h

Cel wykładu: Poznanie Apache Spark jako silnika do rozproszonego przetwarzania danych. Wprowadzenie do Structured Streaming i integracji Spark–Kafka. Przegląd kompletnego pipeline’u real-time analytics.


1 Dlaczego Spark?

Na poprzednich wykładach poznaliśmy Kafkę — system, który transportuje strumienie danych. Ale Kafka sama w sobie nie wykonuje złożonych analiz. Do tego potrzebujemy silnika przetwarzania — i tu wchodzi Apache Spark.

Spark to silnik do rozproszonego przetwarzania danych, który obsługuje zarówno tryb wsadowy (batch), jak i strumieniowy (streaming). Powstał na UC Berkeley w 2009 roku jako odpowiedź na ograniczenia Hadoop MapReduce — głównie na jego powolność wynikającą z ciągłego zapisu na dysk.

Tip Przetwarzanie w pamięci

Dane trzymane w RAM, nie na dysku. Nawet 100x szybciej niż MapReduce.

Tip Jeden silnik, wiele trybów

Batch, streaming, SQL, ML, grafy — wszystko w jednym frameworku.

Tip API w wielu językach

PySpark, Scala, Java, R.

Tip Lazy evaluation

Spark buduje plan wykonania i optymalizuje go zanim cokolwiek obliczy.

flowchart TB
    subgraph "Apache Spark"
        CORE["Spark Core\n(RDD, zarządzanie pamięcią)"]
        SQL["Spark SQL\n& DataFrames"]
        SS["Structured\nStreaming"]
        ML["MLlib\n(Machine Learning)"]
        GR["GraphX\n(Grafy)"]
    end
    SQL --> CORE
    SS --> CORE
    ML --> CORE
    GR --> CORE

    style CORE fill:#FF9800,color:#fff
    style SQL fill:#2196F3,color:#fff
    style SS fill:#F44336,color:#fff
    style ML fill:#4CAF50,color:#fff
    style GR fill:#9C27B0,color:#fff

Ekosystem Apache Spark


2 PySpark — podstawy

2.1 SparkSession — punkt wejścia

Każdy program PySpark zaczyna się od utworzenia SparkSession:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MojaPierwszaAplikacja") \
    .master("local[*]") \
    .getOrCreate()

local[*] oznacza: uruchom Sparka lokalnie, używając wszystkich dostępnych rdzeni procesora. W produkcji zamiast tego podaje się adres klastra.

2.2 DataFrame API

Spark DataFrame to odpowiednik tabeli SQL lub DataFrame z Pandas — ale rozproszony na wielu maszynach.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, count

spark = SparkSession.builder.appName("Demo").master("local[*]").getOrCreate()

# Tworzenie DataFrame
dane = [
    ("Warszawa", "Elektronika", 4299.00),
    ("Kraków", "Odzież", 189.99),
    ("Warszawa", "Żywność", 87.50),
    ("Gdańsk", "Elektronika", 2599.00),
    ("Kraków", "Elektronika", 1299.00),
    ("Warszawa", "Odzież", 349.99),
    ("Gdańsk", "Żywność", 156.00),
]

df = spark.createDataFrame(dane, ["miasto", "kategoria", "kwota"])
df.show()

2.3 Transformacje i akcje

Spark rozróżnia dwa typy operacji:

Definiują co chcesz zrobić, ale nie wykonują obliczeń. Są leniwe (lazy).

Przykłady: filter, groupBy, select, join.

# Transformacje (lazy — nic się jeszcze nie liczy)
wynik = df \
    .filter(col("kwota") > 100) \
    .groupBy("miasto") \
    .agg(
        sum("kwota").alias("suma"),
        count("kwota").alias("liczba"),
        avg("kwota").alias("srednia")
    )

Wyzwalają faktyczne obliczenia.

Przykłady: show, count, collect, write.

# Akcja (tu Spark faktycznie wykonuje obliczenia)
wynik.show()

spark.stop()

Możesz też używać zwykłego SQL-a:

df.createOrReplaceTempView("sprzedaz")

spark.sql("""
    SELECT miasto,
           SUM(kwota) as suma,
           COUNT(*) as liczba
    FROM sprzedaz
    WHERE kwota > 100
    GROUP BY miasto
    ORDER BY suma DESC
""").show()

3 Structured Streaming — strumienie jako tabele

ImportantKluczowa koncepcja

Structured Streaming traktuje strumień danych jak tabelę, do której ciągle dopisywane są nowe wiersze. Dzięki temu piszesz kod strumieniowy prawie identycznie jak batch — używasz tego samego DataFrame API.

flowchart TB
    subgraph "Strumień wejściowy"
        T1["Batch t1"] --> TAB["Nieograniczona\ntabela wejściowa\n(nowe wiersze\ndopisywane ciągle)"]
        T2["Batch t2"] --> TAB
        T3["Batch t3"] --> TAB
        T4["Batch t4..."] --> TAB
    end
    TAB -->|"Zapytanie\n(query)"| RES["Tabela wynikowa\n(aktualizowana\nz każdym triggerem)"]
    RES --> OUT["Wyjście:\nkonsola / Kafka / pliki / baza"]

    style TAB fill:#2196F3,color:#fff
    style RES fill:#4CAF50,color:#fff
    style OUT fill:#FF9800,color:#fff

Structured Streaming: strumień jako nieograniczona tabela

3.1 Tryby wyjścia (Output Modes)

Note Append

Do wyniku dopisywane są tylko nowe wiersze. Domyślny tryb.

Warning Complete

Cała tabela wynikowa jest nadpisywana (np. po agregacji).

Tip Update

Zapisywane są tylko zmienione wiersze.

3.2 Przykład: streaming z plików CSV

Najprostszy przykład — Spark monitoruje katalog i przetwarza nowe pliki CSV w miarę ich pojawiania się:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, sum
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

spark = SparkSession.builder.appName("StreamingDemo").master("local[*]").getOrCreate()

# Schemat danych
schema = StructType() \
    .add("transaction_id", StringType()) \
    .add("amount", DoubleType()) \
    .add("store", StringType()) \
    .add("timestamp", TimestampType())

# Czytanie strumienia z katalogu CSV
streaming_df = spark.readStream \
    .schema(schema) \
    .option("header", True) \
    .csv("/data/incoming/")

# Agregacja w oknach 5-minutowych
result = streaming_df \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("store")
    ) \
    .agg(sum("amount").alias("total"))

# Zapis wyniku do konsoli
query = result.writeStream \
    .outputMode("complete") \
    .format("console") \
    .trigger(processingTime="10 seconds") \
    .start()

# query.awaitTermination()

4 Integracja Spark + Kafka

ImportantSerce kursu

Kafka transportuje dane, Spark je przetwarza. To fundamentalna para w architekturze real-time analytics.

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, avg, count
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("KafkaStreaming") \
    .master("local[*]") \
    .getOrCreate()

# Odczyt strumienia z Kafki
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "transakcje") \
    .option("startingOffsets", "latest") \
    .load()

# Kafka zwraca dane jako bajty — trzeba je sparsować
schema = StructType() \
    .add("id", StringType()) \
    .add("kwota", DoubleType()) \
    .add("sklep", StringType()) \
    .add("czas", TimestampType())

parsed_df = kafka_df \
    .selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")
# Agregacja: średnia kwota i liczba transakcji
# w oknach 5-minutowych per sklep
wynik = parsed_df \
    .withWatermark("czas", "2 minutes") \
    .groupBy(
        window(col("czas"), "5 minutes"),
        col("sklep")
    ) \
    .agg(
        avg("kwota").alias("srednia_kwota"),
        count("*").alias("liczba_transakcji")
    )

# Zapis do konsoli (na lab: do bazy danych lub dashboardu)
query = wynik.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", False) \
    .trigger(processingTime="10 seconds") \
    .start()

# query.awaitTermination()

Zwróć uwagę na .withWatermark("czas", "2 minutes") — to mechanizm watermarkingu, o którym mówiliśmy na Wykładzie 2. Spark toleruje opóźnienie zdarzeń do 2 minut i automatycznie odrzuca te, które przyszły później.


5 Kompletny pipeline — podsumowanie kursu

Łączmy wszystko, co poznaliśmy na wykładach:

flowchart LR
    subgraph "Źródła danych"
        WEB["Aplikacja\nwebowa"]
        PAY["System\npłatności"]
        IOT["Sensory\nIoT"]
    end
    subgraph "Transport"
        K["Apache Kafka\n(W4)"]
    end
    subgraph "Przetwarzanie"
        SS["Spark Streaming\n(W5)"]
        ML["FastAPI + ML\n(W3)"]
    end
    subgraph "Wyniki"
        DASH["Dashboard\n(near RT)"]
        ALERT["Alerty\n(real-time)"]
        REP["Raporty\n(batch)"]
    end

    WEB --> K
    PAY --> K
    IOT --> K
    K --> SS --> DASH
    K --> ML --> ALERT
    SS --> REP

    style K fill:#FF9800,color:#fff
    style SS fill:#2196F3,color:#fff
    style ML fill:#4CAF50,color:#fff
    style DASH fill:#E3F2FD,stroke:#2196F3
    style ALERT fill:#FFEBEE,stroke:#F44336
    style REP fill:#F3E5F5,stroke:#9C27B0

Kompletny pipeline real-time analytics — podsumowanie kursu

Mapa kursu
Komponent Rola Wykład
Typy danych, OLTP/OLAP, Data Lake Kontekst i historia W1
Lambda/Kappa, okna czasowe Architektura W2
ML batch vs online, SGD, anomalie Modele i algorytmy W3
Apache Kafka Transport danych W4
Apache Spark + Structured Streaming Przetwarzanie W5

6 Złożoność algorytmów — uwaga praktyczna

CautionProjektując pipeline real-time, rozważ wymiar obliczeniowy
  • Duże dane, proste obliczenia — np. filtrowanie logów, agregacje. Spark radzi sobie świetnie.
  • Małe dane, ciężkie obliczenia — np. trenowanie modelu deep learning. Lepiej offline na GPU.
  • Duże dane, ciężkie obliczenia — np. analiza wideo w czasie rzeczywistym. Wymaga specjalistycznej architektury (GPU cluster, edge computing).

Dla systemów real-time kluczowe jest, żeby czas przetwarzania jednego zdarzenia był krótszy niż interwał między zdarzeniami. Inaczej kolejka rośnie bez końca.


7 Co dalej — laboratoria

Na laboratoriach zbudujecie kompletny system krok po kroku:

Plan laboratoriów
Lab Temat
1–2 Środowisko (Git, Docker, Python), analiza eksploracyjna danych
3–4 Model ML (scikit-learn), przygotowanie do wdrożenia
5–6 Kafka w Dockerze — producent i konsument w Pythonie
7–8 PySpark + Structured Streaming + Kafka
9–10 Kompletny pipeline + projekt grupowy

8 Podsumowanie

Apache Spark to silnik, który zamienia surowe strumienie danych w wartość biznesową. W połączeniu z Kafką tworzy potężną platformę real-time analytics. Structured Streaming upraszcza programowanie strumieniowe — piszesz kod jak dla batch, a Spark zajmuje się resztą.

Important To był ostatni wykład

Na laboratoriach przełożycie tę wiedzę na praktykę — od pierwszego docker compose up po kompletny system przetwarzania danych w czasie rzeczywistym.

Tip Przed Lab 1

Upewnij się, że masz zainstalowanego Dockera i Gita (patrz strona Narzędzia). Sklonuj repozytorium kursu i uruchom docker compose up — jeśli działa, jesteś gotowy.