%%file konsument.py
from kafka import KafkaConsumer
import json
SERVER = "broker:9092"
TOPIC = "streaming"
# Konsumer do pobierania danych z Kafka
consumer = KafkaConsumer(
TOPIC,
bootstrap_servers=SERVER,
auto_offset_reset='earliest',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
# Pobieranie transakcji w niemal real-time i analiza
for message in consumer:
transaction = message.value
if transaction["values"] > 80:
print(f"🚨 Wykryto dużą transakcję: {transaction}")Consumer w środowisku Python
Przygotuj środowisko i uruchom skrypt producenta.
Rozpatrzmy kod konsumenta czytającego z topicu oraz realizującego prostą regułę decyzyjną.
Kod ten uruchom poleceniem:
python konsument.py