59
loading...
This website collects cookies to deliver better user experience
key-value pair. Those elements can be anything from an integer to a Protobuf message, provided the right serializer and deserializer are configured.
public class Main {
/**
 * Sends a single Avro-encoded {@code AvroHelloMessage} to the {@code hello_topic_avro} topic,
 * keyed by a {@code Long}.
 *
 * <p>The producer is opened in a try-with-resources block so it is closed (which flushes any
 * buffered records) before {@code main} returns; otherwise a record still waiting in the
 * producer's buffer ({@code linger.ms}) could be lost when the JVM exits.
 *
 * @param args unused
 * @throws Exception if the send fails or is interrupted
 */
public static void main(String[] args) throws Exception {
    // Configure your producer
    Properties producerProperties = new Properties();
    producerProperties.put("bootstrap.servers", "localhost:29092");
    producerProperties.put("acks", "all");
    producerProperties.put("retries", 0);
    producerProperties.put("linger.ms", 1);
    producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
    producerProperties.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    producerProperties.put("schema.registry.url", "http://localhost:8081");
    // Initialize a producer; closing it at the end of the block flushes pending records.
    try (Producer<Long, AvroHelloMessage> producer = new KafkaProducer<>(producerProperties)) {
        AvroHelloMessage message = new AvroHelloMessage(1L, "this is a message", 2.4f, 1);
        // send() takes a record (topic, key, value), not a bare value. Waiting on the returned
        // future makes a failed send (retries = 0) surface as an exception from main.
        producer.send(new org.apache.kafka.clients.producer.ProducerRecord<>(
                "hello_topic_avro", 1L, message)).get();
    }
}
}
/**
 * Consumes Avro-encoded {@code AvroHelloMessage} records from {@code hello_topic_avro} on a
 * dedicated thread until the main thread asks it to stop.
 */
public class Main {
    /**
     * Builds the consumer configuration: {@code Long} keys, Avro values resolved through the
     * schema registry at {@code http://localhost:8081}, consumer group {@code HelloConsumer}.
     *
     * @return a new {@link Properties} instance holding the consumer settings
     */
    public static Properties configureConsumer() {
        Properties consumerProperties = new Properties();
        consumerProperties.put("bootstrap.servers", "localhost:29092");
        consumerProperties.put("group.id", "HelloConsumer");
        consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        consumerProperties.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        consumerProperties.put("schema.registry.url", "http://localhost:8081");
        // Configure Avro deserializer to convert the received data to a SpecificRecord (i.e. AvroHelloMessage)
        // instead of a GenericRecord (i.e. schema + array of deserialized data).
        consumerProperties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return consumerProperties;
    }

    /**
     * Starts the polling thread, lets the application do other work, then signals the thread to
     * stop and waits for it to close the consumer.
     *
     * @param args unused
     * @throws Exception if interrupted while waiting for the consumer thread
     */
    public static void main(String[] args) throws Exception {
        // Initialize a consumer
        final Consumer<Long, AvroHelloMessage> consumer = new KafkaConsumer<>(configureConsumer());
        // Choose the topics you will be polling from.
        // You can subscribe to all topics matching a regex.
        consumer.subscribe(Pattern.compile("hello_topic_avro"));
        final AtomicBoolean shouldStop = new AtomicBoolean(false);
        // From here on, only the consumer thread touches the consumer.
        Thread consumerThread = new Thread(() -> pollUntilStopped(consumer, shouldStop));
        // Start consuming && do other things
        consumerThread.start();
        // [...]
        // End consumption from the consumer
        shouldStop.set(true);
        consumerThread.join();
    }

    /**
     * Polls {@code consumer} until {@code shouldStop} becomes {@code true}, then closes it.
     *
     * @param consumer   the subscribed consumer; owned and closed by this method
     * @param shouldStop flag set by another thread to request shutdown
     */
    private static void pollUntilStopped(Consumer<Long, AvroHelloMessage> consumer,
                                         AtomicBoolean shouldStop) {
        final Duration timeout = Duration.ofSeconds(5);
        try {
            // poll() returns the records available from the current consumer offset and blocks up
            // to `timeout` when there are none, so no extra sleep is needed between polls. The
            // timeout also bounds how long it takes to notice shouldStop.
            while (!shouldStop.get()) {
                for (ConsumerRecord<Long, AvroHelloMessage> record : consumer.poll(timeout)) {
                    // Use your record
                    AvroHelloMessage value = record.value();
                }
            }
        } finally {
            // Close even if record handling throws, so the consumer's resources are released.
            consumer.close(timeout);
        }
    }
}
Further Reading:
Available Libraries
Producer Configuration
Consumer Configuration
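The following Kafka Streams example reads the topic `streams-plaintext-input`, containing string values, without necessarily providing keys. The few lines configuring the `StreamsBuilder` will:
- split each value into lowercase words,
- group the records by word and count the occurrences of each word in a state store named `counts-store`,
- write the running counts to the topic `streams-wordcount-output`.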
/**
 * Kafka Streams word count: reads text lines from {@code streams-plaintext-input}, counts every
 * word in the {@code counts-store} state store and publishes the running counts to
 * {@code streams-wordcount-output}. Runs until the JVM receives a shutdown signal.
 */
public class Main {
    /**
     * Builds and starts the topology, then blocks until the shutdown hook has closed the streams
     * instance.
     *
     * @param args unused
     * @throws Exception if interrupted while waiting for shutdown
     */
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("streams-plaintext-input")
                // A null value would throw in toLowerCase(); such records carry no words.
                .filter((key, value) -> value != null)
                // Locale.ROOT keeps casing independent of the JVM's default locale, so the same
                // word always maps to the same key.
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
                // split() yields "" for a line starting with a non-word character (and for an
                // empty line); that is not a word and must not be counted.
                .filter((key, word) -> !word.isEmpty())
                .groupBy((key, word) -> word)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);
        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });
        // The consumer loop is handled by the library
        streams.start();
        latch.await();
    }
}
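The following sink connector reads the records of the topic `mongo-source` to insert them into the collection `sink` of the database named `kafka_connect`. The credentials are read from an external file (`/auth.properties`) through Kafka Connect's file config provider, which keeps secrets out of the connector configuration; the `file` provider must be enabled on the Connect worker for the `${file:...}` placeholders to be resolved.
{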
"name": "mongo-sink",
"config": {
"topics": "mongo-source",
"tasks.max": "1",
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri": "mongodb://${file:/auth.properties:username}:${file:/auth.properties:password}@mongo:27017",
"database": "kafka_connect",
"collection": "sink",
"max.num.retries": "1",
"retries.defer.timeout": "5000",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
"post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder",
"delete.on.null.values": "false",
"writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy"
}
}
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
http://localhost:8083/connectors -d @sink-conf.json
- Persistent queries (`CREATE TABLE <name> WITH (...)`, or `CREATE STREAM`): Create a new stream or table that will be automatically updated.
- Pull queries (`SELECT * FROM <table|stream> WHERE ID = 1`): Behave similarly to a standard DBMS, fetching data as an instant snapshot and then closing the connection.
- Push queries (`SELECT * FROM <table|stream> EMIT CHANGES`): Request a persistent connection to the server, which asynchronously pushes updated values.

Topics can be listed using `list topics`, and their content displayed using `print <name>`.
ksql> list topics;
Kafka Topic | Partitions | Partition Replicas
----------------------------------------------------
hello_topic_json | 1 | 1
----------------------------------------------------
ksql> print 'hello_topic_json' from beginning;
Key format: KAFKA_BIGINT or KAFKA_DOUBLE or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/05/25 08:44:20.922 Z, key: 1, value: {"user_id":1,"message":"this is a message","value":2.4,"version":1}
rowtime: 2021/05/25 08:44:20.967 Z, key: 1, value: {"user_id":1,"message":"this is another message","value":2.4,"version":2}
rowtime: 2021/05/25 08:44:20.970 Z, key: 2, value: {"user_id":2,"message":"this is another message","value":2.6,"version":1}
-- Let's create a table from the previous topic
ksql> CREATE TABLE messages (user_id BIGINT PRIMARY KEY, message VARCHAR)
> WITH (KAFKA_TOPIC = 'hello_topic_json', VALUE_FORMAT='JSON');
-- We can see the list and details of each table
ksql> list tables;
Table Name | Kafka Topic | Key Format | Value Format | Windowed
---------------------------------------------------------------------------
MESSAGES | hello_topic_json | KAFKA | JSON | false
---------------------------------------------------------------------------
ksql> describe messages;
Name : MESSAGES
Field | Type
-----------------------------------------------
USER_ID | BIGINT (primary key)
MESSAGE | VARCHAR(STRING)
-----------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
-- Apart from some additions to the language, queries are written in nearly standard SQL.
ksql> select * from messages EMIT CHANGES;
+--------+------------------------+
|USER_ID |MESSAGE |
+--------+------------------------+
|1 |this is another message |
|2 |this is another message |
59