Real-Time Analytics Lectures for SGH students

Apache Kafka

Apache Kafka

Producent Kafki (plik producer.py)

import json
import random
import sys
from datetime import datetime, timedelta
from time import sleep

from kafka import KafkaProducer


if __name__ == "__main__":
    # Demo producer: publishes one random JSON event per second to the
    # "topicX" topic until it is interrupted with Ctrl+C.
    #
    # Usage: python producer.py [bootstrap_server]
    # The first command-line argument, when present, is the broker address;
    # otherwise "localhost:9092" is used. Extra arguments are ignored rather
    # than silently discarding the supplied address.
    server = sys.argv[1] if len(sys.argv) > 1 else "localhost:9092"

    producer = KafkaProducer(
        bootstrap_servers=[server],
        # Every message dict is serialized to UTF-8 encoded JSON bytes.
        value_serializer=lambda x: json.dumps(x).encode("utf-8"),
        # NOTE(review): presumably pinned to match the 2.7.0 broker used in
        # the lecture setup - confirm if the broker version changes.
        api_version=(2, 7, 0),
    )

    try:
        while True:
            message = {
                # Local (naive) wall-clock time shifted back by a random
                # 0-15 seconds, so consecutive timestamps are not
                # necessarily increasing.
                "time": str(
                    datetime.now() + timedelta(seconds=random.randint(-15, 0))
                ),
                "id": random.choice(["a", "b", "c", "d", "e"]),
                "value": random.randint(0, 100),
            }
            producer.send("topicX", value=message)
            sleep(1)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the demo; exit quietly.
        pass
    finally:
        # Close the producer on every exit path, not only on Ctrl+C, so it
        # is also released when send() or serialization raises.
        producer.close()

ZMIEŃ KATALOG

cd kafka_2.12-2.7.0

WYKONAJ - terminal 1 - start ZooKeepera

bin/zookeeper-server-start.sh config/zookeeper.properties

WYKONAJ - terminal 2 - start serwera Kafki

bin/kafka-server-start.sh config/server.properties

WYKONAJ - terminal 3 - utworzenie tematu topicX

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topicX

WYKONAJ - terminal 4 - start producenta

python producer.py