%%file stream.py
import json
import random
import sys
from datetime import datetime, timedelta
from time import sleep

from kafka import KafkaProducer

if __name__ == "__main__":
    SERVER = "broker:9092"
    producer = KafkaProducer(
        bootstrap_servers=[SERVER],
        # Serialize each message dict to UTF-8 JSON bytes.
        value_serializer=lambda x: json.dumps(x).encode("utf-8"),
        api_version=(3, 7, 0),
    )
    try:
        while True:
            # Timestamp jittered up to 15 seconds into the past.
            t = datetime.now() + timedelta(seconds=random.randint(-15, 0))
            message = {
                "time": str(t),
                "id": random.choice(["a", "b", "c", "d", "e"]),
                "values": random.randint(0, 100),
            }
            # Send one message per second to the "streaming" topic.
            producer.send("streaming", value=message)
            sleep(1)
    except KeyboardInterrupt:
        producer.close()
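Because kafka-python only delivers the bytes produced by `value_serializer`, you can sanity-check the message format offline, with no broker running. A minimal sketch that builds and serializes one message the same way `stream.py` does:

```python
import json
import random
from datetime import datetime, timedelta

# Build one message exactly as stream.py does.
t = datetime.now() + timedelta(seconds=random.randint(-15, 0))
message = {
    "time": str(t),
    "id": random.choice(["a", "b", "c", "d", "e"]),
    "values": random.randint(0, 100),
}

# Apply the same value_serializer the producer is configured with.
payload = json.dumps(message).encode("utf-8")

# The broker receives plain UTF-8 JSON bytes.
print(payload)
```

Decoding the payload with `json.loads` gives back the original dict, which is what a matching consumer-side deserializer will rely on.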
Apache Kafka producer
Apache Kafka in our environment
You can work through this version on your own computer using the prepared Docker image, with Docker Desktop running.
- Open your web browser and navigate to the environment page (for Docker, open localhost:8888).
- Launch a new terminal (in Jupyter Lab, using the terminal icon).
- Navigate to the home directory and list its contents. Check that the kafka directory is on the list.
cd ~
ls -al
- Run the command to check the list of Kafka server topics.
kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
- Add a topic named “streaming”.
kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic streaming
- Check the list of topics again, making sure you have the “streaming” topic.
- Open a new terminal in the notebook and start a console producer that generates data for the new topic.
kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic streaming
To check if message sending works, open another terminal window and enter the following command to run a consumer in the console:
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic streaming --from-beginning
Remember to run commands from the appropriate directory.
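To read the stream from Python rather than the console tool, kafka-python's `KafkaConsumer` accepts a `value_deserializer` that mirrors the producer's serializer. The decoding step itself needs no broker; a minimal round-trip check, with the consumer wiring shown only as a commented sketch (the helper name `deserialize` is illustrative, not from the script above):

```python
import json

# Inverse of the producer's value_serializer: UTF-8 JSON bytes -> dict.
def deserialize(raw: bytes) -> dict:
    return json.loads(raw.decode("utf-8"))

# Round-trip check against the format stream.py sends.
sample = {"time": "2025-01-01 12:00:00", "id": "c", "values": 57}
assert deserialize(json.dumps(sample).encode("utf-8")) == sample

# With the broker running, the same function plugs into kafka-python, e.g.:
# consumer = KafkaConsumer(
#     "streaming",
#     bootstrap_servers=["broker:9092"],
#     value_deserializer=deserialize,
#     auto_offset_reset="earliest",  # read from the start, like --from-beginning
# )
```

Passing the deserializer to the consumer keeps the encoding decision in one place: both ends of the pipeline agree that a message is a UTF-8 JSON object.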