Apache Kafka on docker

To install Kafka we need two things:

  1. An instance of ZooKeeper

  2. An instance of Kafka

Therefore we will create two services in docker-compose.yaml. ZooKeeper stores metadata about the topic partitions, including which broker is the leader for each partition.

Write the docker-compose.yaml file:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper_ex
    ports:
     - "2181:2181"
    restart: unless-stopped

  kafka:
    image: wurstmeister/kafka
    container_name: kafka_ex
    ports:
     - "9092:9092"
    expose:
     - "9093"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_CREATE_TOPICS: "test_topic:1:1"
      KAFKA_LOG_RETENTION_HOURS: 1                  # delete segments older than 1 hour
      KAFKA_LOG_RETENTION_BYTES: 4073741824         # cap each partition's log size
      KAFKA_LOG_SEGMENT_BYTES: 1073741824           # roll a new segment at 1 GB
      KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000 # check retention every 5 minutes
    volumes:
     - /var/run/docker.sock:/var/run/docker.sock
    restart: unless-stopped
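A note on KAFKA_CREATE_TOPICS: the wurstmeister image reads it as a comma-separated list of topic:partitions:replicas entries, so "test_topic:1:1" creates test_topic with one partition and one replica. A sketch of a multi-topic variant (metrics_topic is a hypothetical name, not part of this tutorial):

```yaml
# Format: <name>:<partitions>:<replicas>[,<name>:<partitions>:<replicas>...]
# "metrics_topic" is a hypothetical second topic for illustration.
KAFKA_CREATE_TOPICS: "test_topic:1:1,metrics_topic:3:1"
```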

Once we have created the docker-compose.yaml file, bring the containers up with the following command:

docker-compose up

Open a new terminal and check the status of the containers:

docker ps

Create two Python files named consumer.py and producer.py in the folder where we created the docker-compose.yaml file, and install kafka-python in the terminal:

pip install kafka-python
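Both scripts exchange JSON: the producer serializes each message with json.dumps plus UTF-8 encoding, and the consumer deserializes with json.loads. A quick way to sanity-check this serializer/deserializer pair is to round-trip a sample payload locally, with no broker involved (a minimal sketch; the variable names are illustrative):

```python
import json

# The same callables we will pass to kafka-python below.
serialize = lambda v: json.dumps(v).encode("utf-8")
deserialize = json.loads

sample = {"test_data": {"random_value": 3}, "value_status": "low"}
wire_bytes = serialize(sample)   # what the producer puts on the wire
print(deserialize(wire_bytes))   # what the consumer sees
```

If the round-tripped value equals the original dict, the pair is consistent.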

consumer.py

import json
from kafka import KafkaConsumer

kafka_server = ["localhost:9092"]

topic = "test_topic"

consumer = KafkaConsumer(
    bootstrap_servers=kafka_server,
    value_deserializer=json.loads,   # decode each message from JSON
    auto_offset_reset="latest",      # start from new messages only
)

# subscribe() expects a list of topic names
consumer.subscribe([topic])

# KafkaConsumer is an iterator that blocks until a message arrives
for message in consumer:
    print(message)
    print(message.value)

producer.py

import json
from datetime import datetime
from time import sleep
from random import choice
from kafka import KafkaProducer

kafka_server = ["localhost:9092"]

topic = "test_topic"
producer = KafkaProducer(
    bootstrap_servers=kafka_server,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),  # encode each message as JSON
)

random_values = [1, 2, 3, 4, 5, 6, 7]

while True:
    random_value = choice(random_values)
    data = {
        "test_data": {
            "random_value": random_value
        },
        "timestamp": str(datetime.now()),  # datetime.now(), not the class itself
        "value_status": "High" if random_value > 5 else "Low",
    }

    print(data)
    producer.send(topic, data)

    producer.flush()  # block until the message has actually been sent
    sleep(3)
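The payload logic in the loop can be pulled out into a small helper and exercised without a running broker. A sketch (build_message is a hypothetical name introduced here, not part of the tutorial):

```python
from datetime import datetime

def build_message(random_value):
    """Mirror the producer's payload shape (hypothetical helper)."""
    return {
        "test_data": {"random_value": random_value},
        "timestamp": str(datetime.now()),  # current wall-clock time
        "value_status": "High" if random_value > 5 else "Low",
    }

print(build_message(6)["value_status"])  # High
print(build_message(2)["value_status"])  # Low
```

Keeping payload construction in a plain function like this makes the threshold logic testable independently of Kafka.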

Open two new terminals and run the commands below:

python producer.py  # terminal 1
python consumer.py  # terminal 2