%%file test_key_value.py
from pyspark.sql import SparkSession
= 'broker:9092'
KAFKA_BROKER = 'mytopic'
KAFKA_TOPIC
= SparkSession.builder.getOrCreate()
spark "WARN")
spark.sparkContext.setLogLevel(
= (spark.readStream.format("kafka")
df "kafka.bootstrap.servers", KAFKA_BROKER)
.option("subscribe", KAFKA_TOPIC)
.option("startingOffsets", "earliest")
.option(
.load()
)
# Konwersja danych binarnych na stringi
"CAST(key AS STRING)", "CAST(value AS STRING)") \
df.selectExpr(\
.writeStream format("console") \
."append") \
.outputMode(\
.start() .awaitTermination()
Apache Kafka - Wprowadzenie
Apache Kafka to system przetwarzania strumieniowego (event streaming), który działa jako rozproszony broker wiadomości. Pozwala na przesyłanie i przetwarzanie danych w czasie rzeczywistym.
Domyślnym adresem naszego brokera jest broker:9092
.
W Apache Kafka dane są przechowywane w strukturach zwanych topicami, które pełnią funkcję kolejek komunikacyjnych.
Zarządzanie Kafką odbywa się za pomocą skryptów. W naszym przypadku będą to skrypty .sh
.
1️⃣ Sprawdź listę topiców
Pamiętaj, aby przejść do katalogu domowego:
cd ~
kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
2️⃣ Utwórz nowy topic o nazwie mytopic
kafka/bin/kafka-topics.sh --create --topic mytopic --bootstrap-server broker:9092
3️⃣ Utwórz producenta
Ten skrypt pozwoli Ci wprowadzać eventy ręcznie przez terminal. Opcje --property
są dodatkowe i służą do analizy w tym przykładzie.
kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic mytopic --property "parse.key=true" --property "key.separator=:"
4️⃣ Consumer w Sparku
Otwórz nowy terminal w miejscu, gdzie znajduje się plik test_key_value.py
, i uruchom program Consumera w Sparku.
Pamiętaj, że Apache Spark nie posiada domyślnego konektora do Kafki, dlatego uruchom proces za pomocą spark-submit
i pobierz odpowiedni pakiet w Scali:
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 test_key_value.py
5️⃣ Przetestuj przesyłanie danych
W terminalu z uruchomionym producentem wpisz tekst w postaci:
jan:45
alicja:20
Sprawdź, co pojawia się w oknie aplikacji Consumera.
6️⃣ Zakończenie procesu
Po zakończeniu pokazu użyj Ctrl+C
, aby zamknąć zarówno okno producenta, jak i aplikację Spark.