Wprowadzenie do Apache Spark
Apache Spark to ogólnego przeznaczenia, przetwarzający dane w pamięci silnik obliczeniowy. Spark może być używany wraz z Hadoopem, Yarnem i innymi komponentami Big Data, aby w pełni wykorzystać jego możliwości oraz poprawić wydajność aplikacji. Oferuje wysokopoziomowe interfejsy API w językach Scala, Java, Python, R i SQL.
Architektura Spark
Apache Spark działa w architekturze mistrz-podwładny (master-slave), gdzie główny węzeł nazywany jest „Driver”, a węzły podrzędne to „Workers”. Punktem startowym aplikacji Spark jest sc
, czyli instancja klasy SparkContext, która działa wewnątrz Drivera.
Główne komponenty Apache Spark
Spark Core
Często nazywany również samym „Spark”. Podstawowym elementem Sparka jest RDD (Resilient Distributed Dataset) — odporna na błędy, rozproszona kolekcja danych, przetwarzana równolegle na wielu węzłach klastra.
Spark SQL
W tej bibliotece dane reprezentowane są jako DataFrame, czyli struktura danych podobna do tabeli relacyjnej. Spark SQL umożliwia analizę danych z użyciem składni podobnej do SQL oraz funkcji przetwarzających dane.
Spark Streaming
Biblioteka ta wprowadza pojęcie D-Stream (Discretized Stream) — strumienia danych dzielonego na małe porcje (mikropartie), które można przetwarzać niemal w czasie rzeczywistym.
MLlib
Biblioteka do uczenia maszynowego, zawierająca popularne algorytmy, takie jak filtrowanie kolaboracyjne, klasyfikacja, klasteryzacja czy regresja.
GraphX
Biblioteka służąca do przetwarzania grafów. Umożliwia rozwiązywanie problemów z zakresu teorii grafów, takich jak PageRank, komponenty spójne i inne.
Uruchomienie Apache Spark
from pyspark.sql import SparkSession
= SparkSession.builder.appName("Lab6").getOrCreate() spark
spark
- in-memory
SparkSession
SparkContext
...
Przykład 1 - dane ralizujące szereg czasowy
= ((12.5, "2019-01-02 12:00:00"),
czujnik_temperatury 17.6, "2019-01-02 12:00:20"),
(14.6, "2019-01-02 12:00:30"),
(22.9, "2019-01-02 12:01:15"),
(17.4, "2019-01-02 12:01:30"),
(25.8, "2019-01-02 12:03:25"),
(27.1, "2019-01-02 12:02:40"),
( )
Dane realizujące pomiar temperatury w czasie.
Aby wygenerować DataFrame należy użyć metod createDataFrame
. Jednak należy pamiętać aby zdefiniować typy danych.
W następnych laboratoriach szerzej opiszemy typy danych w Sparku.
Zdefiuniujmy schemat naszych danych.
from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
= StructType([
schema "temperatura", DoubleType(), True),
StructField("czas", StringType(), True),
StructField( ])
Jak widać praktycznie wszystkie elementy poza nazwą kolumny oraz parametrem True są przedstawione jako obiekty.
= (spark.createDataFrame(czujnik_temperatury, schema=schema)
df "czas", to_timestamp("czas"))) .withColumn(
Sprawdźmy jak wygląda schemat utworzonej tabeli.
df.printSchema()
root|-- temperatura: double (nullable = true)
|-- czas: timestamp (nullable = true)
Następnie możemy sprawdzić jak przedstawia się sama tabela.
df.show()+-----------+-------------------+
|temperatura| czas|
+-----------+-------------------+
| 12.5|2019-01-02 12:00:00|
| 17.6|2019-01-02 12:00:20|
| 14.6|2019-01-02 12:00:30|
| 22.9|2019-01-02 12:01:15|
| 17.4|2019-01-02 12:01:30|
| 25.8|2019-01-02 12:03:25|
| 27.1|2019-01-02 12:02:40|
+-----------+-------------------+
Spark jako SQL
Ramki danych w sparku pozwalają wykorzystać język sql:
"czujnik_temperatury")
df.createOrReplaceTempView("SELECT * FROM czujnik_temperatury where temperatura > 21").show()
spark.sql(+-------------------+-----------+
| czas|temperatura|
+-------------------+-----------+
|2019-01-02 12:01:15| 22.9|
|2019-01-02 12:03:25| 25.8|
|2019-01-02 12:02:40| 27.1|
+-------------------+-----------+
Grupowanie danych
Standardowy grupowanie danych w sparku po zmiennej “czas” wygeneruje nam liczbę wierszy w każdym grupie. Ze względu, iż zmienne czasowe mają różne wartości, ilość otrzymanych grup będzie równa ilości wierszy w tabeli.
= df.groupBy("czas").count()
df2 df2.show()
Wykorzystując funkcję window
możemy wygenerować grupy czasowe w zależności od wybranego okna czasowego.
# Thumbling window
import pyspark.sql.functions as F
= df.groupBy(F.window("czas","30 seconds")).count()
df2 =False)
df2.show(truncate
+------------------------------------------+-----+
|window |count|
+------------------------------------------+-----+
|{2019-01-02 12:00:00, 2019-01-02 12:00:30}|2 |
|{2019-01-02 12:00:30, 2019-01-02 12:01:00}|1 |
|{2019-01-02 12:01:00, 2019-01-02 12:01:30}|1 |
|{2019-01-02 12:01:30, 2019-01-02 12:02:00}|1 |
|{2019-01-02 12:03:00, 2019-01-02 12:03:30}|1 |
|{2019-01-02 12:02:30, 2019-01-02 12:03:00}|1 |
+------------------------------------------+-----+
Sprawdźmy schemat
df2.printSchema()
root|-- window: struct (nullable = false)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- count: long (nullable = false)
Podstawowa różnica między pandasowymi ramkami danych i sparkowymi jest taka, że w komórce danych sparkowych można używać typów złożonych - np struct
.
🔌 Źródła danych w Spark Structured Streaming
Spark Structured Streaming pozwala na przetwarzanie danych w czasie rzeczywistym z różnych źródeł strumieniowych. Najpopularniejsze z nich to:
✅ rate — źródło testowe
- Automatycznie generuje dane: co sekundę dodaje wiersz.
- Każdy wiersz zawiera:
- timestamp – znacznik czasu,
- value – licznik rosnący (0, 1, 2, …).
- Używane do testowania logiki strumieniowania bez konieczności podpinania zewnętrznych źródeł.
= spark.readStream.format("rate").option("rowsPerSecond", 1).load() df
📡 Inne źródła strumieni:
Socket
(do testów np. nc -lk 9999): To źródło nasłuchuje na wskazanym porcie gniazda (socket) i wczytuje dowolne dane do Spark Streaming. Również służy wyłącznie do celów testowych.
Plik
(File): Nasłuchuje określonego katalogu i traktuje pojawiające się tam pliki jako dane strumieniowe. Obsługuje formaty takie jak CSV, JSON, ORC oraz Parquet (np. .csv, .json, .parquet).
Kafka
: Odczytuje dane z Apache Kafka® i jest kompatybilne z brokerami w wersji 0.10.0 lub wyższej.
📤 Output Modes – tryby wypisywania wyników
outputMode określa jak Spark wypisuje dane po każdej mikroserii (micro-batch). Dostępne tryby to:
append
Wypisuje tylko nowe wiersze, które zostały dodane w tej mikroserii. Najczęściej używany.
update
Wypisuje zmienione wiersze - czyli zaktualizowane agregaty.
complete
Wypisuje całą tabelę agregacji po każdej mikroserii. Wymaga pełnej agregacji (np. groupBy).
Utwórzmy nasz pierwszy strumieniowy DataFrame w Sparku, korzystając ze źródła danych typu rate
.
Możemy zrealizować kod skryptu
%%file streamrate.py
## uruchom przez spark-submit streamrate.py
from pyspark.sql import SparkSession
= SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark "WARN")
spark.sparkContext.setLogLevel(
= (spark.readStream
df format("rate")
."rowsPerSecond", 1)
.option(
.load()
)
= (df.writeStream
query format("console")
."append")
.outputMode("truncate", False)
.option(
.start()
)
query.awaitTermination()
Albo uruchomić kod w notatniku
from pyspark.sql import SparkSession
= SparkSession.builder.appName("StreamingDemo").getOrCreate()
spark "WARN")
spark.sparkContext.setLogLevel(
def process_batch(df, batch_id, tstop=5):
print(f"Batch ID: {batch_id}")
=False)
df.show(truncateif batch_id == tstop:
df.stop()
= (spark.readStream
df format("rate")
."rowsPerSecond", 1)
.option(
.load()
)
= (df.writeStream
query format("console")
."append")
.outputMode(
.foreachBatch(process_batch)"truncate", False)
.option(
.start() )