%%file stream.py
import json
import random
import sys
from datetime import datetime, timedelta
from time import sleep
from kafka import KafkaProducer
= 'streaming'
KAFKA_TOPIC = "broker:9092"
SERVER = 1
LAG
if __name__ == "__main__":
= KafkaProducer(
producer =[SERVER],
bootstrap_servers=lambda x: json.dumps(x).encode("utf-8"),
value_serializer=(3, 7, 0),
api_version
)
try:
while True:
= datetime.now() + timedelta(seconds=random.randint(-15, 0))
t
= {
message "time" : str(t),
"id" : random.choice(["a", "b", "c", "d", "e"]),
"values" : random.randint(0,100)
}
=message)
producer.send(KAFKA_TOPIC, value
sleep(LAG)except KeyboardInterrupt:
producer.close()
Producent Apache Kafka
Wersja z dostępem do środowiska
Tą wersję można przejść również posiadając nowy obraz dockerowy i uruchomiony docker desktop na własnym komputerze.
Przejdź do przeglądarki i uruchom stronę ze środowiskiem (w przypadku Docker uruchom
localhost:8888
).Uruchom (w jupyter lab za pomocą ikony terminala) nowy terminal
Przejdź do katalogu głównego i wypisz listę wszystkich elementów. Sprawdź czy na liście znajduje się katalog
kafka
.cd ~ ls -all
Uruchom polecenie sprawdzające listę topiców serwera Kafki
bash kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
Dodaj topic o nazwie streaming
kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic streaming
Sprawdź listę tematów ponownie upewniając się, że posiadasz temat
streaming
Uruchom nowy terminal i utwórz producenta w konsoli generującego dane do nowego topicu
kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic streaming
Aby sprawdzić czy wysyłanie wiadomości działa uruchom kolejne okno terminala i wpisz następującą komendę realizującą konsumenta w konsoli:
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic streaming --from-beginning
Pamiętaj, aby uruchamiać komendy z odpowiedniego katalogu.
Ponizszy kod mozesz wykorzystać do strumieniowego przesyłania danych do Kafki. Pozwala na to pythonowy Producer
.
Uruchom kod poleceniem:
python stream.py
Następnie przejdź do nowego terminala i uruchom konsumenta (w konsoli)
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic streaming --from-beginning