%%file konsument.py
from kafka import KafkaConsumer
import json
= "broker:9092"
SERVER = "streaming"
TOPIC
# Konsumer do pobierania danych z Kafka
= KafkaConsumer(
consumer
TOPIC,=SERVER,
bootstrap_servers='earliest',
auto_offset_reset=lambda x: json.loads(x.decode('utf-8'))
value_deserializer
)
# Pobieranie transakcji w niemal real-time i analiza
for message in consumer:
= message.value
transaction if transaction["values"] > 80:
print(f"🚨 Wykryto dużą transakcję: {transaction}")
Consumer w środowisku Python
Przygotuj środowisko i uruchom skrypt producenta.
Rozpatrzmy kod konsumenta czytającego z topicu oraz realizującego prostą regułę decyzyjną.
Kod ten uruchom poleceniem:
python konsument.py