Producent Apache Kafka

Wersja z dostępem do środowiska

Tą wersję można przejść również posiadając nowy obraz dockerowy i uruchomiony docker desktop na własnym komputerze.

  1. Przejdź do przeglądarki i uruchom stronę ze środowiskiem (w przypadku Docker uruchom localhost:8888).

  2. Uruchom (w jupyter lab za pomocą ikony terminala) nowy terminal

  3. Przejdź do katalogu głównego i wypisz listę wszystkich elementów. Sprawdź czy na liście znajduje się katalog kafka.

    cd ~
    ls -all
  4. Uruchom polecenie sprawdzające listę topiców serwera Kafki bash kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092

  5. Dodaj topic o nazwie streaming

    kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic streaming
  6. Sprawdź listę tematów ponownie upewniając się, że posiadasz temat streaming

  7. Uruchom nowy terminal na notatniku i utwórz producenta w konsoli generującego dane do nowego topicu

kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic streaming

Aby sprawdzić czy wysyłanie wiadomości działa uruchom kolejne okno terminala i wpisz następującą komendę realizującą konsumenta w konsoli:

kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic streaming --from-beginning

Pamiętaj aby uruchamiać komendy z odpowiedniego katalogu.

%%file stream.py

import json
import random
import sys
from datetime import datetime, timedelta
from time import sleep

from kafka import KafkaProducer

if __name__ == "__main__":
    SERVER = "broker:9092"

    producer = KafkaProducer(
        bootstrap_servers=[SERVER],
        value_serializer=lambda x: json.dumps(x).encode("utf-8"),
        api_version=(3, 7, 0),
    )
    
    try:
        while True:
            
            t = datetime.now() + timedelta(seconds=random.randint(-15, 0))
            
            message = {
                "time" : str(t),
                "id" : random.choice(["a", "b", "c", "d", "e"]),
                "values" : random.randint(0,100)
            }
            
            
            producer.send("streaming", value=message)
            sleep(1)
    except KeyboardInterrupt:
        producer.close()

Wersja obraz Docker

  1. Przejdź do katalogu jupyterlab i uruchom obraz poleceniem

    docker compose up
  2. Otwórz nowy terminal i sprawdź listę topiców

    docker exec broker kafka-topics --list --bootstrap-server broker:9092

    Lista ta zazwyczaj jest pusta.

  3. dodaj topic o nazwie streaming

docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic streaming
  1. Sprawdź listę tematów ponownie upewniając się, że posiadasz temat streaming
  2. Uruchom nowy terminal na swoim komputerze i utwórz producenta generującego dane do nowego topicu
docker exec --interactive --tty broker kafka-console-producer --bootstrap-server broker:9092 --topic streaming

Aby sprawdzić czy wysyłanie wiadomości działa uruchom kolejne okno terminala i wpisz następującą komendę realizującą consumenta:

docker exec --interactive --tty broker kafka-console-consumer --bootstrap-server broker:9092 --topic streaming --from-beginning