Apache Kafka producer

Apache Kafka in our environment

To work through this version, you need the new Docker image and Docker Desktop running on your own computer.

  1. Open your web browser and navigate to the environment page (for Docker, open localhost:8888).
  2. Launch a new terminal (in Jupyter Lab, use the terminal icon).
  3. Go to your home directory and list its contents. Check that the kafka directory is on the list.
     cd ~
     ls -all
  4. List the topics on the Kafka server.
     kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
  5. Add a topic named “streaming”.
     kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic streaming
  6. List the topics again and make sure “streaming” is among them.
  7. Open a new terminal in Jupyter Lab and start a console producer that sends data to the new topic. Each line you type is sent as one message.
     kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic streaming
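
The topic listing and creation steps above can also be done from Python. The following is a minimal sketch, assuming the kafka-python package (the same one that provides KafkaProducer) and its KafkaAdminClient. The helper names ensure_topic and main, and the partition and replication values of 1, are illustrative assumptions and not part of the course setup; the console command relies on the broker defaults instead.

```python
def ensure_topic(admin, name, partitions=1, replication=1):
    """Create topic `name` unless the broker already lists it.

    `admin` is expected to be a kafka.admin.KafkaAdminClient (or any object
    with the same list_topics/create_topics methods).
    Returns True if the topic was created, False if it already existed.
    """
    if name in admin.list_topics():
        return False
    # Imported lazily so this module loads even without kafka-python installed.
    from kafka.admin import NewTopic

    admin.create_topics(
        [NewTopic(name=name, num_partitions=partitions, replication_factor=replication)]
    )
    return True


def main(server="broker:9092", topic="streaming"):
    # Assumption: the broker address matches the one used in the console commands.
    # Depending on the installed kafka-python version, an explicit api_version
    # argument may be needed here as well.
    from kafka.admin import KafkaAdminClient

    admin = KafkaAdminClient(bootstrap_servers=server)
    try:
        created = ensure_topic(admin, topic)
        print("created" if created else "already exists")
        print(sorted(admin.list_topics()))
    finally:
        admin.close()
```

Calling main() from a Python session inside the environment should create the topic on the first run and report that it already exists on later runs.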

To check if message sending works, open another terminal window and enter the following command to run a consumer in the console:

kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic streaming --from-beginning

Remember to run these commands from the directory that contains the kafka directory (your home directory), because they use the relative path kafka/bin/.

%%file stream.py

import json
import random
from datetime import datetime, timedelta
from time import sleep

from kafka import KafkaProducer

SERVER = "broker:9092"
TOPIC = "streaming"

if __name__ == "__main__":
    # Serialize every message dict to UTF-8 encoded JSON before sending.
    producer = KafkaProducer(
        bootstrap_servers=[SERVER],
        value_serializer=lambda x: json.dumps(x).encode("utf-8"),
        api_version=(3, 7, 0),
    )

    try:
        while True:
            # The event time lags the current time by 0 to 15 seconds.
            t = datetime.now() + timedelta(seconds=random.randint(-15, 0))

            message = {
                "time": str(t),
                "id": random.choice(["a", "b", "c", "d", "e"]),
                "values": random.randint(0, 100),
            }

            producer.send(TOPIC, value=message)
            sleep(1)  # send one message per second
    except KeyboardInterrupt:
        # Ctrl+C stops the loop; close the producer cleanly.
        producer.close()
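
With stream.py running in one terminal (python stream.py), the messages can be read back from Python instead of the console consumer. The following is a minimal sketch, assuming kafka-python's KafkaConsumer; the function names decode and consume are illustrative, and auto_offset_reset="earliest" is used here as a rough counterpart of --from-beginning for a consumer without stored offsets.

```python
import json


def decode(raw: bytes) -> dict:
    """Inverse of the producer's value_serializer: UTF-8 JSON bytes to dict."""
    return json.loads(raw.decode("utf-8"))


def consume(server="broker:9092", topic="streaming"):
    # Imported lazily so decode() can be tried without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=[server],
        value_deserializer=decode,
        auto_offset_reset="earliest",  # start from the oldest available message
    )
    try:
        # Iterating over the consumer blocks and yields records as they arrive.
        for record in consumer:
            msg = record.value
            print(msg["time"], msg["id"], msg["values"])
    except KeyboardInterrupt:
        pass  # Ctrl+C ends the loop
    finally:
        consumer.close()
```

Calling consume() in a second terminal should print one line per message produced by stream.py. Keeping decode as a separate function makes it easy to check that it undoes the producer's serializer without a running broker.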