%%file stream.py
import json
import random
import sys
from datetime import datetime, timedelta
from time import sleep
from kafka import KafkaProducer
if __name__ == "__main__":
= "broker:9092"
SERVER
= KafkaProducer(
producer =[SERVER],
bootstrap_servers=lambda x: json.dumps(x).encode("utf-8"),
value_serializer=(3, 7, 0),
api_version
)
try:
while True:
= datetime.now() + timedelta(seconds=random.randint(-15, 0))
t
= {
message "time" : str(t),
"id" : random.choice(["a", "b", "c", "d", "e"]),
"values" : random.randint(0,100)
}
"streaming", value=message)
producer.send(1)
sleep(except KeyboardInterrupt:
producer.close()
Producent Apache Kafka
Wersja z dostępem do środowiska
Tą wersję można przejść również posiadając nowy obraz dockerowy i uruchomiony docker desktop na własnym komputerze.
Przejdź do przeglądarki i uruchom stronę ze środowiskiem (w przypadku Docker uruchom
localhost:8888
).Uruchom (w jupyter lab za pomocą ikony terminala) nowy terminal
Przejdź do katalogu głównego i wypisz listę wszystkich elementów. Sprawdź czy na liście znajduje się katalog
kafka
.cd ~ ls -all
Uruchom polecenie sprawdzające listę topiców serwera Kafki
bash kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
Dodaj topic o nazwie streaming
kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic streaming
Sprawdź listę tematów ponownie upewniając się, że posiadasz temat
streaming
Uruchom nowy terminal na notatniku i utwórz producenta w konsoli generującego dane do nowego topicu
kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic streaming
Aby sprawdzić czy wysyłanie wiadomości działa uruchom kolejne okno terminala i wpisz następującą komendę realizującą konsumenta w konsoli:
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic streaming --from-beginning
Pamiętaj aby uruchamiać komendy z odpowiedniego katalogu.
Wersja obraz Docker
Przejdź do katalogu jupyterlab i uruchom obraz poleceniem
docker compose up
Otwórz nowy terminal i sprawdź listę topiców
docker exec broker kafka-topics --list --bootstrap-server broker:9092
Lista ta zazwyczaj jest pusta.
dodaj topic o nazwie
streaming
docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic streaming
- Sprawdź listę tematów ponownie upewniając się, że posiadasz temat
streaming
- Uruchom nowy terminal na swoim komputerze i utwórz producenta generującego dane do nowego topicu
docker exec --interactive --tty broker kafka-console-producer --bootstrap-server broker:9092 --topic streaming
Aby sprawdzić czy wysyłanie wiadomości działa uruchom kolejne okno terminala i wpisz następującą komendę realizującą consumenta:
docker exec --interactive --tty broker kafka-console-consumer --bootstrap-server broker:9092 --topic streaming --from-beginning