Consumer w środowisku Python

Przygotuj środowisko i uruchom skrypt producenta.

Rozpatrzmy kod konsumenta czytającego z topicu oraz realizującego prostą regułę decyzyjną.

%%file konsument.py
from kafka import KafkaConsumer
import json  

SERVER = "broker:9092"
TOPIC  = "streaming"

# Konsumer do pobierania danych z Kafka
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=SERVER,
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Pobieranie transakcji w niemal real-time i analiza
for message in consumer:
    transaction = message.value
    if transaction["values"] > 80:
        print(f"🚨 Wykryto dużą transakcję: {transaction}")

Kod ten uruchom poleceniem:

python konsument.py