Kafka Consumer
1️⃣ Check topic list
Go to the home directory:
# The kafka/bin/... path below is relative, so it only resolves from the
# home directory (assumes Kafka is unpacked in ~/kafka — TODO confirm).
cd ~
# List the topics currently known to the broker at broker:9092.
kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
2️⃣ Create a new topic named mytopic
# Create the topic "mytopic" on the broker at broker:9092.
# NOTE(review): no --partitions / --replication-factor is passed; presumably
# the broker defaults apply — verify for the Kafka version in use.
kafka/bin/kafka-topics.sh --create --topic mytopic --bootstrap-server broker:9092
3️⃣ Recheck topic list
# List the topics again and keep only lines containing "mytopic";
# empty output means the topic is not in the list.
kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092 | grep mytopic
Kafka producer
from kafka import KafkaConsumer
import json
# Connection settings: broker address and the topic to read from.
SERVER = "broker:9092"
TOPIC = "mytopic"

# Consumer used to fetch data from Kafka.
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=SERVER,
    # With no committed offset to resume from, start at the oldest record.
    auto_offset_reset='earliest',
    # Each record value arrives as bytes: decode as UTF-8, then parse as JSON.
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# Fetch transactions in near real time and analyse them.
# Iterating over the consumer yields one message at a time.
for message in consumer:
    transaction = message.value
    # Report any transaction whose "values" field exceeds 80.
    if transaction["values"] > 80:
        print(f"🚨 BAD TRANSACTION: {transaction}")