flowchart TB
subgraph "Apache Spark"
CORE["Spark Core\n(RDD, zarządzanie pamięcią)"]
SQL["Spark SQL\n& DataFrames"]
SS["Structured\nStreaming"]
ML["MLlib\n(Machine Learning)"]
GR["GraphX\n(Grafy)"]
end
SQL --> CORE
SS --> CORE
ML --> CORE
GR --> CORE
style CORE fill:#FF9800,color:#fff
style SQL fill:#2196F3,color:#fff
style SS fill:#F44336,color:#fff
style ML fill:#4CAF50,color:#fff
style GR fill:#9C27B0,color:#fff
Wykład 5 — Apache Spark i Structured Streaming
Analiza danych w czasie rzeczywistym
1 Dlaczego Spark?
Na poprzednich wykładach poznaliśmy Kafkę — system, który transportuje strumienie danych. Ale Kafka sama w sobie nie wykonuje złożonych analiz. Do tego potrzebujemy silnika przetwarzania — i tu wchodzi Apache Spark.
Spark to silnik do rozproszonego przetwarzania danych, który obsługuje zarówno tryb wsadowy (batch), jak i strumieniowy (streaming). Powstał na UC Berkeley w 2009 roku jako odpowiedź na ograniczenia Hadoop MapReduce — głównie na jego powolność wynikającą z ciągłego zapisu na dysk.
Dane trzymane w RAM, nie na dysku. Nawet 100x szybciej niż MapReduce.
Batch, streaming, SQL, ML, grafy — wszystko w jednym frameworku.
PySpark, Scala, Java, R.
Spark buduje plan wykonania i optymalizuje go zanim cokolwiek obliczy.
2 PySpark — podstawy
2.1 SparkSession — punkt wejścia
Każdy program PySpark zaczyna się od utworzenia SparkSession:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MojaPierwszaAplikacja") \
.master("local[*]") \
.getOrCreate()local[*] oznacza: uruchom Sparka lokalnie, używając wszystkich dostępnych rdzeni procesora. W produkcji zamiast tego podaje się adres klastra.
2.2 DataFrame API
Spark DataFrame to odpowiednik tabeli SQL lub DataFrame z Pandas — ale rozproszony na wielu maszynach.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, count
spark = SparkSession.builder.appName("Demo").master("local[*]").getOrCreate()
# Tworzenie DataFrame
dane = [
("Warszawa", "Elektronika", 4299.00),
("Kraków", "Odzież", 189.99),
("Warszawa", "Żywność", 87.50),
("Gdańsk", "Elektronika", 2599.00),
("Kraków", "Elektronika", 1299.00),
("Warszawa", "Odzież", 349.99),
("Gdańsk", "Żywność", 156.00),
]
df = spark.createDataFrame(dane, ["miasto", "kategoria", "kwota"])
df.show()2.3 Transformacje i akcje
Spark rozróżnia dwa typy operacji:
Definiują co chcesz zrobić, ale nie wykonują obliczeń. Są leniwe (lazy).
Przykłady: filter, groupBy, select, join.
# Transformacje (lazy — nic się jeszcze nie liczy)
wynik = df \
.filter(col("kwota") > 100) \
.groupBy("miasto") \
.agg(
sum("kwota").alias("suma"),
count("kwota").alias("liczba"),
avg("kwota").alias("srednia")
)Wyzwalają faktyczne obliczenia.
Przykłady: show, count, collect, write.
# Akcja (tu Spark faktycznie wykonuje obliczenia)
wynik.show()
spark.stop()Możesz też używać zwykłego SQL-a:
df.createOrReplaceTempView("sprzedaz")
spark.sql("""
SELECT miasto,
SUM(kwota) as suma,
COUNT(*) as liczba
FROM sprzedaz
WHERE kwota > 100
GROUP BY miasto
ORDER BY suma DESC
""").show()3 Structured Streaming — strumienie jako tabele
Structured Streaming traktuje strumień danych jak tabelę, do której ciągle dopisywane są nowe wiersze. Dzięki temu piszesz kod strumieniowy prawie identycznie jak batch — używasz tego samego DataFrame API.
flowchart TB
subgraph "Strumień wejściowy"
T1["Batch t1"] --> TAB["Nieograniczona\ntabela wejściowa\n(nowe wiersze\ndopisywane ciągle)"]
T2["Batch t2"] --> TAB
T3["Batch t3"] --> TAB
T4["Batch t4..."] --> TAB
end
TAB -->|"Zapytanie\n(query)"| RES["Tabela wynikowa\n(aktualizowana\nz każdym triggerem)"]
RES --> OUT["Wyjście:\nkonsola / Kafka / pliki / baza"]
style TAB fill:#2196F3,color:#fff
style RES fill:#4CAF50,color:#fff
style OUT fill:#FF9800,color:#fff
3.1 Tryby wyjścia (Output Modes)
Do wyniku dopisywane są tylko nowe wiersze. Domyślny tryb.
Cała tabela wynikowa jest nadpisywana (np. po agregacji).
Zapisywane są tylko zmienione wiersze.
3.2 Przykład: streaming z plików CSV
Najprostszy przykład — Spark monitoruje katalog i przetwarza nowe pliki CSV w miarę ich pojawiania się:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, sum
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
spark = SparkSession.builder.appName("StreamingDemo").master("local[*]").getOrCreate()
# Schemat danych
schema = StructType() \
.add("transaction_id", StringType()) \
.add("amount", DoubleType()) \
.add("store", StringType()) \
.add("timestamp", TimestampType())
# Czytanie strumienia z katalogu CSV
streaming_df = spark.readStream \
.schema(schema) \
.option("header", True) \
.csv("/data/incoming/")
# Agregacja w oknach 5-minutowych
result = streaming_df \
.groupBy(
window(col("timestamp"), "5 minutes"),
col("store")
) \
.agg(sum("amount").alias("total"))
# Zapis wyniku do konsoli
query = result.writeStream \
.outputMode("complete") \
.format("console") \
.trigger(processingTime="10 seconds") \
.start()
# query.awaitTermination()4 Integracja Spark + Kafka
Kafka transportuje dane, Spark je przetwarza. To fundamentalna para w architekturze real-time analytics.
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, window, avg, count
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
spark = SparkSession.builder \
.appName("KafkaStreaming") \
.master("local[*]") \
.getOrCreate()
# Odczyt strumienia z Kafki
kafka_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "transakcje") \
.option("startingOffsets", "latest") \
.load()
# Kafka zwraca dane jako bajty — trzeba je sparsować
schema = StructType() \
.add("id", StringType()) \
.add("kwota", DoubleType()) \
.add("sklep", StringType()) \
.add("czas", TimestampType())
parsed_df = kafka_df \
.selectExpr("CAST(value AS STRING) as json") \
.select(from_json(col("json"), schema).alias("data")) \
.select("data.*")# Agregacja: średnia kwota i liczba transakcji
# w oknach 5-minutowych per sklep
wynik = parsed_df \
.withWatermark("czas", "2 minutes") \
.groupBy(
window(col("czas"), "5 minutes"),
col("sklep")
) \
.agg(
avg("kwota").alias("srednia_kwota"),
count("*").alias("liczba_transakcji")
)
# Zapis do konsoli (na lab: do bazy danych lub dashboardu)
query = wynik.writeStream \
.outputMode("update") \
.format("console") \
.option("truncate", False) \
.trigger(processingTime="10 seconds") \
.start()
# query.awaitTermination()Zwróć uwagę na .withWatermark("czas", "2 minutes") — to mechanizm watermarkingu, o którym mówiliśmy na Wykładzie 2. Spark toleruje opóźnienie zdarzeń do 2 minut i automatycznie odrzuca te, które przyszły później.
5 Kompletny pipeline — podsumowanie kursu
Łączmy wszystko, co poznaliśmy na wykładach:
flowchart LR
subgraph "Źródła danych"
WEB["Aplikacja\nwebowa"]
PAY["System\npłatności"]
IOT["Sensory\nIoT"]
end
subgraph "Transport"
K["Apache Kafka\n(W4)"]
end
subgraph "Przetwarzanie"
SS["Spark Streaming\n(W5)"]
ML["FastAPI + ML\n(W3)"]
end
subgraph "Wyniki"
DASH["Dashboard\n(near RT)"]
ALERT["Alerty\n(real-time)"]
REP["Raporty\n(batch)"]
end
WEB --> K
PAY --> K
IOT --> K
K --> SS --> DASH
K --> ML --> ALERT
SS --> REP
style K fill:#FF9800,color:#fff
style SS fill:#2196F3,color:#fff
style ML fill:#4CAF50,color:#fff
style DASH fill:#E3F2FD,stroke:#2196F3
style ALERT fill:#FFEBEE,stroke:#F44336
style REP fill:#F3E5F5,stroke:#9C27B0
| Komponent | Rola | Wykład |
|---|---|---|
| Typy danych, OLTP/OLAP, Data Lake | Kontekst i historia | W1 |
| Lambda/Kappa, okna czasowe | Architektura | W2 |
| ML batch vs online, SGD, anomalie | Modele i algorytmy | W3 |
| Apache Kafka | Transport danych | W4 |
| Apache Spark + Structured Streaming | Przetwarzanie | W5 |
6 Złożoność algorytmów — uwaga praktyczna
- Duże dane, proste obliczenia — np. filtrowanie logów, agregacje. Spark radzi sobie świetnie.
- Małe dane, ciężkie obliczenia — np. trenowanie modelu deep learning. Lepiej offline na GPU.
- Duże dane, ciężkie obliczenia — np. analiza wideo w czasie rzeczywistym. Wymaga specjalistycznej architektury (GPU cluster, edge computing).
Dla systemów real-time kluczowe jest, żeby czas przetwarzania jednego zdarzenia był krótszy niż interwał między zdarzeniami. Inaczej kolejka rośnie bez końca.
7 Co dalej — laboratoria
Na laboratoriach zbudujecie kompletny system krok po kroku:
| Lab | Temat |
|---|---|
| 1–2 | Środowisko (Git, Docker, Python), analiza eksploracyjna danych |
| 3–4 | Model ML (scikit-learn), przygotowanie do wdrożenia |
| 5–6 | Kafka w Dockerze — producent i konsument w Pythonie |
| 7–8 | PySpark + Structured Streaming + Kafka |
| 9–10 | Kompletny pipeline + projekt grupowy |
8 Podsumowanie
Apache Spark to silnik, który zamienia surowe strumienie danych w wartość biznesową. W połączeniu z Kafką tworzy potężną platformę real-time analytics. Structured Streaming upraszcza programowanie strumieniowe — piszesz kod jak dla batch, a Spark zajmuje się resztą.
Na laboratoriach przełożycie tę wiedzę na praktykę — od pierwszego docker compose up po kompletny system przetwarzania danych w czasie rzeczywistym.
Upewnij się, że masz zainstalowanego Dockera i Gita (patrz strona Narzędzia). Sklonuj repozytorium kursu i uruchom docker compose up — jeśli działa, jesteś gotowy.