%%file stream.py
import json
import random
import sys
from datetime import datetime, timedelta
from time import sleep
from kafka import KafkaProducer
KAFKA_TOPIC = 'streaming'
SERVER = "broker:9092"
LAG = 1
if __name__ == "__main__":
producer = KafkaProducer(
bootstrap_servers=[SERVER],
value_serializer=lambda x: json.dumps(x).encode("utf-8"),
api_version=(3, 7, 0),
)
try:
while True:
t = datetime.now() + timedelta(seconds=random.randint(-15, 0))
message = {
"time" : str(t),
"id" : random.choice(["a", "b", "c", "d", "e"]),
"values" : random.randint(0,100)
}
producer.send(KAFKA_TOPIC, value=message)
sleep(LAG)
except KeyboardInterrupt:
producer.close()Producent Apache Kafka
Wersja z dostępem do środowiska
Tą wersję można przejść również posiadając nowy obraz dockerowy i uruchomiony docker desktop na własnym komputerze.
Przejdź do przeglądarki i uruchom stronę ze środowiskiem (w przypadku Docker uruchom
localhost:8888).Uruchom (w jupyter lab za pomocą ikony terminala) nowy terminal
Przejdź do katalogu głównego i wypisz listę wszystkich elementów. Sprawdź czy na liście znajduje się katalog
kafka.cd ~ ls -allUruchom polecenie sprawdzające listę topiców serwera Kafki
bash kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092Dodaj topic o nazwie streaming
kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic streamingSprawdź listę tematów ponownie upewniając się, że posiadasz temat
streamingUruchom nowy terminal i utwórz producenta w konsoli generującego dane do nowego topicu
kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic streamingAby sprawdzić czy wysyłanie wiadomości działa uruchom kolejne okno terminala i wpisz następującą komendę realizującą konsumenta w konsoli:
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic streaming --from-beginningPamiętaj, aby uruchamiać komendy z odpowiedniego katalogu.
Ponizszy kod mozesz wykorzystać do strumieniowego przesyłania danych do Kafki. Pozwala na to pythonowy Producer.
Uruchom kod poleceniem:
python stream.pyNastępnie przejdź do nowego terminala i uruchom konsumenta (w konsoli)
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic streaming --from-beginning