Kafka Consumer
Befor we start go to the home directory
cd ~
1️⃣ Check topic list
kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
2️⃣ Create new topis with name mytopic
kafka/bin/kafka-topics.sh --create --topic mytopic --bootstrap-server broker:9092
3️⃣ Recheck topic list
kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092 | grep mytopic
Kafka Consumer code
Kafka consumer - check if transaction value is $ > 80$.
from kafka import KafkaConsumer
import json
= "broker:9092"
SERVER = "mytopic"
TOPIC
# Consumer config
= KafkaConsumer(
consumer
TOPIC,=SERVER,
bootstrap_servers='earliest',
auto_offset_reset=lambda x: json.loads(x.decode('utf-8'))
value_deserializer
)
for message in consumer:
= message.value
transaction if transaction["values"] > 80:
print(f"🚨 BAD TRANSACTION: {transaction}")