Producent Apache Kafka

Wersja z dostępem do środowiska

Tą wersję można przejść również posiadając nowy obraz dockerowy i uruchomiony docker desktop na własnym komputerze.

  1. Przejdź do przeglądarki i uruchom stronę ze środowiskiem (w przypadku Docker uruchom localhost:8888).

  2. Uruchom (w jupyter lab za pomocą ikony terminala) nowy terminal

  3. Przejdź do katalogu głównego i wypisz listę wszystkich elementów. Sprawdź czy na liście znajduje się katalog kafka.

    cd ~
    ls -all
  4. Uruchom polecenie sprawdzające listę topiców serwera Kafki bash kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092

  5. Dodaj topic o nazwie streaming

    kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic streaming
  6. Sprawdź listę tematów ponownie upewniając się, że posiadasz temat streaming

  7. Uruchom nowy terminal i utwórz producenta w konsoli generującego dane do nowego topicu

kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic streaming

Aby sprawdzić czy wysyłanie wiadomości działa uruchom kolejne okno terminala i wpisz następującą komendę realizującą konsumenta w konsoli:

kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic streaming --from-beginning

Pamiętaj, aby uruchamiać komendy z odpowiedniego katalogu.

Ponizszy kod mozesz wykorzystać do strumieniowego przesyłania danych do Kafki. Pozwala na to pythonowy Producer.

%%file stream.py

import json
import random
import sys
from datetime import datetime, timedelta
from time import sleep

from kafka import KafkaProducer

KAFKA_TOPIC = 'streaming'
SERVER = "broker:9092"
LAG = 1

if __name__ == "__main__":
    
    producer = KafkaProducer(
        bootstrap_servers=[SERVER],
        value_serializer=lambda x: json.dumps(x).encode("utf-8"),
        api_version=(3, 7, 0),
    )
    
    try:
        while True:
            
            t = datetime.now() + timedelta(seconds=random.randint(-15, 0))
            
            message = {
                "time" : str(t),
                "id" : random.choice(["a", "b", "c", "d", "e"]),
                "values" : random.randint(0,100)
            }
            
            producer.send(KAFKA_TOPIC, value=message)
            sleep(LAG)
    except KeyboardInterrupt:
        producer.close()

Uruchom kod poleceniem:

python stream.py

Następnie przejdź do nowego terminala i uruchom konsumenta (w konsoli)

kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic streaming --from-beginning