In the example above, we simply wrote our data to stdout with echo and piped it to kafkacat. We only needed two simple flags: -b for the broker and -t for the topic. kafkacat realizes that we are sending it data and switches into producer mode. Now, we can read that data with the exact same kafkacat command:
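A sketch of that consume command, assuming a local broker on localhost:9092 and a placeholder topic named test-topic:

```
# with no data arriving on stdin, kafkacat auto-selects consumer mode
kafkacat -b localhost:9092 -t test-topic
```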
To produce records with keys, we need to tell kafkacat which delimiter separates the key from the value, using the -K flag. In this case, we'll use a colon:
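For example, with the same placeholder broker and topic and a made-up record:

```
# everything before the ":" becomes the key, everything after it the value
echo '1:{"title":"Aliens"}' | kafkacat -b localhost:9092 -t test-topic -K:
```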
We can leave the -K flag off when reading, if we only want the value:
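Roughly, with the same placeholders:

```
# without -K, only the values are printed
kafkacat -b localhost:9092 -t test-topic

# passing -K: again prints each record as key:value
kafkacat -b localhost:9092 -t test-topic -K:
```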
Piping stdout to kafkacat, as we did above, will spin up a producer, send the data, and then shut the producer down. To start a producer and leave it running to continue sending data, use the -P flag, as suggested by the auto-selecting message above. When consuming, kafkacat will keep running and wait for more data, just as kafka-console-consumer would. In order to consume from a topic and immediately exit, we can use the -e flag.
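For instance, with the same placeholder broker and topic:

```
# long-running producer: type one message per line, Ctrl-D to finish
kafkacat -b localhost:9092 -t test-topic -P

# consume whatever is currently in the topic, then exit instead of waiting
kafkacat -b localhost:9092 -t test-topic -e
```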
To read data that is in Avro format, we can use the -s flag. This flag can be used for the whole record (-s avro), for just the key (-s key=avro), or for just the value (-s value=avro). Here's an example using the movies topic from the popular movie rating tutorial:
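A sketch of that command, assuming Kafka on localhost:9092 and Schema Registry on localhost:8081; the -r flag points a serdes-enabled kafkacat build at the Schema Registry so it can decode the Avro values:

```
# read the movie records, deserializing the Avro-encoded values via Schema Registry
kafkacat -b localhost:9092 -t movies -s value=avro -r http://localhost:8081 -e
```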
There is plenty more that kafkacat can do; kafkacat -h will provide the complete list. For more great examples of kafkacat in action, check out related posts on Robin Moffatt’s blog. One piece missing from kafkacat is the ability to produce data in Avro format. As we saw, we can consume Avro with kafkacat using the Confluent Schema Registry, but we can't produce it. This leads us to our next tool.

Confluent REST Proxy

Confluent REST Proxy provides a RESTful interface to a Kafka cluster, so we can produce to and consume from topics with curl
or something similar. It can also be used with tools such as Postman to build a user-friendly Kafka UI. Here's an example of producing to a topic with Postman (the Content-Type and Accept headers were set under the “Headers” tab):
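The curl version of that produce request might look roughly like this, assuming REST Proxy on localhost:8082, a movies topic, and a minimal value schema:

```
# produce one Avro-encoded record; the value schema travels with the request
curl -X POST \
  -H "Content-Type: application/vnd.kafka.avro.v2+json" \
  -H "Accept: application/vnd.kafka.v2+json" \
  --data '{"value_schema": "{\"type\":\"record\",\"name\":\"Movie\",\"fields\":[{\"name\":\"title\",\"type\":\"string\"}]}", "records": [{"value": {"title": "Aliens"}}]}' \
  http://localhost:8082/topics/movies
```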
In both the curl and Postman versions, REST Proxy does require that the schema for Avro
messages be passed in with each produce request. A tool like Postman, which allows you to build up a library of saved queries, can make this easier to manage. To consume from topics with REST Proxy, we first create a consumer in a consumer group, then subscribe to a topic or topics, and finally fetch records to our heart’s content. We'll switch back to curl
so that we can see all the necessary bits at once. First, we POST
to the consumer’s endpoint with our consumer group name. In this POST
request, we will pass a name for our new consumer instance, the internal data format (in this case, Avro
), and the auto.offset.reset
value.
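A sketch of that request, with placeholder proxy address, consumer group, and consumer name:

```
# create a new consumer instance in the group "my_group"
curl -X POST \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"name": "my_consumer", "format": "avro", "auto.offset.reset": "earliest"}' \
  http://localhost:8082/consumers/my_group
```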
The response includes the instance id and base URI
of the newly created consumer instance. Next, we'll use that URI
to subscribe to a topic with a POST
to the subscription
endpoint.
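Using the base URI returned above (still with placeholder names), the subscription request might look like:

```
# subscribe the consumer instance to the movies topic
curl -X POST \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  --data '{"topics": ["movies"]}' \
  http://localhost:8082/consumers/my_group/instances/my_consumer/subscription
```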
If all goes well, we'll get a 204 response. Now we can use a GET
request to the records
endpoint of that same URI
to fetch records.
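For example; note that the Accept header has to match the avro format we chose when creating the consumer:

```
# fetch any available records for this consumer instance
curl -X GET \
  -H "Accept: application/vnd.kafka.avro.v2+json" \
  http://localhost:8082/consumers/my_group/instances/my_consumer/records
```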
We can repeat this GET request anytime to check for new data. If we no longer need this consumer, we can DELETE
it using the base URI.
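Something like:

```
# remove the consumer instance when we're done with it
curl -X DELETE \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  http://localhost:8082/consumers/my_group/instances/my_consumer
```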
We can also use REST Proxy to get information about our cluster, such as brokers, topics, and partitions, with simple GET requests.
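For example, with the same placeholder proxy address and topic:

```
# cluster and topic metadata
curl http://localhost:8082/brokers
curl http://localhost:8082/topics
curl http://localhost:8082/topics/movies/partitions
```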
Each of these calls returns JSON data, which we'll leave off for the sake of space. However, this does lead us nicely to our next tool.
jq: A command line processor for JSON

jq
is an incredibly helpful tool when working with other command line utilities that return JSON
data. jq
is a command line utility that allows us to format, manipulate, and extract data from the JSON
output of other programs. Instructions for downloading and installing jq
can be found on GitHub, along with links to tutorials and other resources. Consider the output of the GET
call to our consumer above. It's not the largest blob of JSON
out there, but it’s still a bit hard to read. Let's try again, this time piping the output to jq:
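For example, reusing the records request from above and letting jq pretty-print the response:

```
# "." is the identity filter, so jq just formats the JSON nicely
curl -s -X GET \
  -H "Accept: application/vnd.kafka.avro.v2+json" \
  http://localhost:8082/consumers/my_group/instances/my_consumer/records | jq '.'
```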
That's much easier to read. But jq can do a lot more than pretty-printing; we can also pull out and reshape just the fields we care about with a short jq program:
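Here's the sort of two-step program being described; the movie field names are placeholders:

```
# step 1 (.[]) iterates over the array of records;
# step 2 ({...}) builds a new object from each one
curl -s -X GET \
  -H "Accept: application/vnd.kafka.avro.v2+json" \
  http://localhost:8082/consumers/my_group/instances/my_consumer/records \
  | jq '.[] | {id: .value.id, title: .value.title}'
```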
Let's break down what we did there. The bit between the single quotes is a jq
program with two steps. jq
uses the same pipe character to pass the output of one step to the input of another. The first step is an iterator, which will read each movie record from the array and pass it to the next step. The second step
creates a new JSON
object from each record. The keys are arbitrary, but the values are derived from the input using jq's identity operator, '.'. There is a lot more that you can do with jq
, and you can read all about it in the documentation. The way that jq
operates on a stream of JSON
data, allowing us to combine different operations in order to achieve our desired results, reminds me of Kafka Streams, bringing us to our final tool. For Kafka Streams applications, we can get a text description of our topology from the Topology::describe
method. For ksqlDB persistent queries, we can get the same information with the EXPLAIN
command. First, find the executing query:
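In the ksqlDB CLI, that would be:

```
ksql> SHOW QUERIES;
```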
Then, use the query ID, CSAS_SHIPPED_ORDERS_0, to get the topology:
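Again in the ksqlDB CLI; the Kafka Streams topology appears as part of the EXPLAIN output:

```
ksql> EXPLAIN CSAS_SHIPPED_ORDERS_0;
```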