34
loading...
This website collects cookies to deliver better user experience
v2.7.0
in our machine when we discussed Kafka installation on EC2, but the following commands are helpful if you'll be working on Kafka infrastructure that's been built by others. The latest release and the current stable version as of this writing is v2.8.0.
[root@hcptstkafka1 kafka]# kafka-topics.sh --version
2.7.0 (Commit:448719dc99a19793)
# On terminal 1
zookeeper-server-start.sh "$KAFKA_HOME/config/zookeeper.properties"
# On terminal 2
kafka-server-start.sh "$KAFKA_HOME/config/server.properties"
kafka-topics.sh
--create --topic
, followed by the topic name --partitions
, followed by how many partitions you want, --replication-factor
, followed by a number which should be equal to or less than the number of brokers. --zookeeper
, followed by the local machine's address and port 2181 --bootstrap-server
which runs on port 9092
instead.localhost
and 127.0.0.1
can be used interchangeably.
[root@hcptstkafka1 ~]# kafka-topics.sh \
--topic test-topic-1 --create \
--partitions 3 --replication-factor 1 \
--zookeeper 127.0.0.1:2181
Created topic test-topic-1.
[root@hcptstkafka1 ~]# kafka-topics.sh \
--create --topic test-topic-2 \
--partitions 4 --replication-factor 1 \
--bootstrap-server 127.0.0.1:9092
Created topic test-topic-2.
[root@hcptstkafka1 ~]# kafka-topics.sh \
--create --topic test-topic-3 \
--partitions 5 --replication-factor 1 \
--zookeeper localhost:2181
Created topic test-topic-3.
[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--bootstrap-server localhost:9092
test-topic-1
test-topic-2
test-topic-3
[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--zookeeper localhost:2181
test-topic-1
test-topic-2
test-topic-3
--delete
with the new approach, no output is returned.--delete
will return a message saying that the topic is marked for deletion.
[root@hcptstkafka1 ~]# kafka-topics.sh \
--delete --topic test-topic-2 \
--zookeeper localhost:2181
Topic test-topic-2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@hcptstkafka1 ~]#
[root@hcptstkafka1 ~]# kafka-topics.sh \
--delete --topic test-topic-3 \
--bootstrap-server localhost:9092
[root@hcptstkafka1 ~]#
[root@hcptstkafka1 ~]# kafka-topics.sh --list \
--bootstrap-server localhost:9092
test-topic-1
[root@hcptstkafka1 ~]# kafka-topics.sh \
--describe --topic test-topic-1 \
--bootstrap-server localhost:9092
Topic: test-topic-1 PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test-topic-1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-1 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-1 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create --topic test-eden-1 \
--partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092
Created topic test-eden-1.
[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create --topic test-tina-1 \
--partitions 3 --replication-factor 1 \
--bootstrap-server localhost:9092
Created topic test-tina-1.
[root@hcptstkafka1 ~]# cd /usr/local/bin/kafka/data/kafka/
[root@hcptstkafka1 kafka]# ll
total 20
-rw-r--r-- 1 root root 4 Aug 15 07:25 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 4 Aug 15 07:38 log-start-offset-checkpoint
-rw-r--r-- 1 root root 88 Aug 15 06:13 meta.properties
-rw-r--r-- 1 root root 151 Aug 15 07:38 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 151 Aug 15 07:39 replication-offset-checkpoint
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-0
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-1
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-eden-1-2
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-0
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-1
drwxr-xr-x 2 root root 141 Aug 15 07:34 test-tina-1-2
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-0
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-1
drwxr-xr-x 2 root root 141 Aug 15 06:15 test-topic-1-2
[root@hcptstkafka1 kafka]#
[root@hcptstkafka1 kafka]# kafka-topics.sh --list --bootstrap-server localhost:9092
test-eden-1
test-tina-1
test-topic-1
broker-list
- deprecatedbootstrap-server
topic
[root@hcptstkafka1 ~] kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test-topic-1
>Hello world!
>This is the first message that we sent to a topic! D D
>^Z
[root@hcptstkafka1 ~] kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1
>Awesome job on this!
>Alright!
>^Z
[root@hcptstkafka1 ~] kafka-console-producer.sh \
--broker-list localhost:9092 --topic test-topic-10
> Third set of messages!
[2021-06-29 19:57:39,480] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test-topic-10=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>Wait there's error but I'm still able to type in a message
>^Z
# the new topic should now appear when you try to list the topics
[root@hcptstkafka1 ~] kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
test-topic-1
test-topic-10
describe.
[root@hcptstkafka1 ~] kafka-topics.sh \
--describe --topic test-topic-10 \
--bootstrap-server localhost:9092
Topic: test-topic-10 PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test-topic-10 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
config/server.properties
[root@hcptstkafka1 ~] pwd
/usr/local/bin/kafka
[root@hcptstkafka1 kafka]
[root@hcptstkafka1 kafka] vi config/server.properties
## Some of the output omitted ##
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/usr/local/bin/kafka/data/kafka
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=4
kafka-server-start.sh
was run, hit Ctrl-C then run the command again.
# Send the message
[root@hcptstkafka1 ~] kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test-topic-100
>Another message sent!
[2021-06-29 20:12:57,142] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test-topic-100=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>Oops! But hey, still sending message!
>^Z
# List the topics and get the details of the newly-created topic.
[root@hcptstkafka1 ~] kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
test-topic-1
test-topic-10
test-topic-100
[root@hcptstkafka1 ~] kafka-topics.sh \
--describe --topic test-topic-100 \
--bootstrap-server localhost:9092
Topic: test-topic-100 PartitionCount: 4 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test-topic-100 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-100 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-100 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: test-topic-100 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
[root@hcptstkafka1 ~] kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic-1 --producer-property acks=all
> This is my second batch of message sent to the topic.
>I should get confirmation from the topic
bootstrap-server
topic
kafka-console-consumer
is run. kafka-console-consumer
and then run kafka-console-producer
on another window and send a message again to the topic, the console consumer will now be able to see it in real-time.
# In producer's window
[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-10
>Sending a message to the second topic - test-topic-10!
>yeahboii!
>over and out!
# In consumer's window.
# Note that each line of message arrives in real-time.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-10
Sending a message to the second topic - test-topic-10!
yeahboii!
over and out!
--from-beginning. To see the messages we sent to test-topic-1 earlier,
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--from-beginning
Alright!
I should get confirmation from the topic
This is the first message that we sent to a topic!
This is my second batch of message sent to the topic.
--partition
parameter followed by the partition number.
[root@hcptstkafka1 ~]# kafka-console-producer.sh \
--topic test-eden-1 \
--bootstrap-server localhost:9092
>This topic will contain a list of names
>Barney
>Marshall
>Ted
>Robin
>Lily
>Sheldon
>Rajesh
>Penny
>Leonard
>Howard
>Bernadette
>Amy
[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-eden-1 \
--from-beginning \
--bootstrap-server localhost:9092
This topic will contain a list of names
Marshall
Lily
Rajesh
Leonard
Amy
Barney
Ted
Sheldon
Penny
Bernadette
Robin
Howard
[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--partition 1 \
--from-beginning \
--bootstrap-server localhost:9092
This topic will contain a list of names
Marshall
Lily
Rajesh
Leonard
Amy
--from-beginning
parameter.
[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--partition 1 \
--offset 2 \
--bootstrap-server localhost:9092
Lily
Rajesh
Leonard
Amy
[root@hcptstkafka1 ~]# kafka-console-consumer.sh --topic test-eden-1 \
--offset 2 \
--bootstrap-server localhost:9092
The partition is required when offset is specified.
# Create new topic
[root@hcptstkafka1 kafka]# kafka-topics.sh \
--create \
--topic test-topic-10 \
--partitions 3 \
--replication-factor 1 \
--bootstrap-server localhost:9092
Created topic test-topic-10.
# Runs consumer 1 on terminal 1
[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092
# Runs consumer 2 on terminal 2
[root@hcptstkafka1 ~]# kafka-console-consumer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092
[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--topic test-topic-10 \
--bootstrap-server localhost:9092
>10
>20
>30
>40
>50
>60
--group
when we run consumers.kafka-console-consumer
with --group
on both consumers.
# Run command in two terminals for the consumers
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group
# Run command in terminal for producer
[root@hcptstkafka1 kafka]# kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1
kafka-console-producer
and begin sending messages. You'll see that messages will be distributed between the two consumers.--from-beginning
with the kafka-console-consumer
and specified the --group
, you'll be able to see all of the messages sent to the topic.--from-beginning
, it will return nothing. This is because the first time this command was run, all the messages from the start have been committed.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group \
--from-beginning
let try sending messages! which consumer will receive this color?
consumer will receive this color?
red
orange
yellow
green
blue
# When you rerun the command again, it returns nothing.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group \
--from-beginning
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 --list
my-awesome-app-group
--describe
and specify the group.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--describe
Consumer group 'my-awesome-app-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-awesome-app-group test-topic-1 2 9 11 2 - - -
my-awesome-app-group test-topic-1 1 10 10 0 - - -
my-awesome-app-group test-topic-1 0 19 20 1 - - -
kafka-console-consumer
again.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group
violet
white
black
describe
on the consumer group, the lag will now be zero.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--describe
Consumer group 'my-awesome-app-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-awesome-app-group test-topic-1 2 11 11 0 - - -
my-awesome-app-group test-topic-1 1 10 10 0 - - -
my-awesome-app-group test-topic-1 0 20 20 0 - - -
kafka-console-consumer.sh
and specifying the group will read from the offset - which is the last position in the partition where it left off. kafka-consumer-groups.sh
command without any parameter, it will not push through and instead will return the available parameters that you can append to the command.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh
# Some parts of the output omitted
--reset-offsets Reset offsets of consumer group.
Supports one consumer group at the
time, and instances should be
inactive
Has 2 execution options: --dry-run
(the default) to plan which offsets
to reset, and --execute to update
the offsets. Additionally, the --
export option is used to export the
results to a CSV format.
You must choose one of the following
reset specifications: --to-datetime,
--by-period, --to-earliest, --to-
latest, --shift-by, --from-file, --
to-current.
To define the scope use --all-topics
or --topic. One scope must be
specified unless you use '--from-
file'.
--reset-offsets
and specify that we want to start at the beginning --to-earliest
. We also need to define if we want to reset the offset for just one topic or for all topics. --execute
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets --to-earliest \
--topic test-topic-1 \
--execute
GROUP TOPIC PARTITION NEW-OFFSET
my-awesome-app-group test-topic-1 0 0
my-awesome-app-group test-topic-1 1 0
my-awesome-app-group test-topic-1 2 0
test-topic-1
, we'll be able to see all the messages from the beginning.
[root@hcptstkafka1 kafka]# kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic-1 \
--group my-awesome-app-group
Alright!
I should get confirmation from the topic
the first
ahh right,
orange
orange
green
This is the first message thatwe sent to a topic!
This is my second batch of message sent to the topic.
Let's see where this will go
now lets try colors
yellow
red
violet
Hello world!
Awesome job on this!
which consumer
or second
its distributes
red\
green
blue
let try sending messages! which consumer will receive this color?
yellow
blue
white
--shift-by
and append a positive number to advance forward with the number of steps or a negative number to move backward.
# Moving the offset three steps forward
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by 3 \
--topic test-topic-1 \
--execute
GROUP TOPIC PARTITION NEW-OFFSET
my-awesome-app-group test-topic-1 0 3
my-awesome-app-group test-topic-1 1 3
my-awesome-app-group test-topic-1 2 3
# Moving the offset another step.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by 1 \
--topic test-topic-1 \
--execute
GROUP TOPIC PARTITION NEW-OFFSET
my-awesome-app-group test-topic-1 0 4
my-awesome-app-group test-topic-1 1 4
my-awesome-app-group test-topic-1 2 4
# Moving the offset three steps backward.
[root@hcptstkafka1 kafka]# kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-awesome-app-group \
--reset-offsets \
--shift-by -3 \
--topic test-topic-1 \
--execute
GROUP TOPIC PARTITION NEW-OFFSET
my-awesome-app-group test-topic-1 0 1
my-awesome-app-group test-topic-1 1 1
my-awesome-app-group test-topic-1 2 1
ERROR [KafkaServer id=0] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: Socket server failed to bind to 0.0.0.0:9092: Address already in use
netstat
or lsof.
[root@hcptstkafka1 kafka]# netstat -tulpn |grep 9092
tcp6 0 0 :::9092 :::* LISTEN 23999/java
[root@hcptstkafka1 kafka]# lsof -n -i :9092 | grep LISTEN
java 23999 root 134u IPv6 52852 0t0 TCP *:XmlIpcRegSvc (LISTEN)
# send a kill signal
[root@hcptstkafka1 kafka]# kill -9 23999
# process should now disappear
[root@hcptstkafka1 kafka]# lsof -n -i :9092 | grep LISTEN
[root@hcptstkafka1 kafka]# netstat -tulpn |grep 9092
# retry running kafka
[root@hcptstkafka1 kafka]# kafka-server-start.sh config/server.properties
kafka-console-producer \
--broker-list 127.0.0.1:9092 \
--topic first_topic \
--property parse.key=true \
--property key.separator=,
> key,value
> another key,another value
kafka-console-consumer \
--bootstrap-server 127.0.0.1:9092 \
--topic first_topic --from-beginning \
--property print.key=true \
--property key.separator=,
Basic KAFKA Commands
START ZOOKEEPER
bin/zookeeper-server-start.sh config/zookeeper.properties
START KAFKA BROKER
bin/kafka-server-start.sh config/server.properties
CREATE TOPIC
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--replication-factor 1 \
--partitions 3 \
--topic test
LIST TOPICS
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list
TOPIC DETAILS
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--describe \
--topic test
START CONSOLE PRODUCER
bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic test
START CONSOLE CONSUMER
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test
START CONSOLE CONSUMER AND READ MESSAGES FROM BEGINNING
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test \
--from-beginning
START CONSOLE CONSUMER WITH SPECIFIC CONSUMER GROUP
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test \
--group test \
--from-beginning
LIST CONSUMER GROUPS
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
CONSUMER GROUP DETAILS
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group test \
--describe
Apache Kafka Series - Learn Apache Kafka for Beginners v2 by Stephane Maarek
Getting Started with Apache Kafka by Ryan Plant
Apache Kafka A-Z with Hands on Learning by Learnkart Technology Private Limited
The Complete Apache Kafka Practical Guide by Bogdan Stashchuk