Start with a minimal Docker Compose file that runs Postgres:

```yaml
version: '3.9'
services:
  db:
    image: postgres:latest
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=arctype
```

After running `docker-compose up`, we should have a functioning Postgres database:

```
db_1 | 2021-05-22 03:03:59.860 UTC [47] LOG: database system is ready to accept connections
```
```shell
$ psql -h 127.0.0.1 -U postgres
Password for user postgres:
postgres@postgres=#
```

We are connected and get a `psql` prompt.

Next, extend the Compose file with Zookeeper, Kafka, and Debezium's Kafka Connect service:

```yaml
version: '3.9'
services:
  db:
    image: postgres:latest
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_PASSWORD=arctype
  zookeeper:
    image: debezium/zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
  kafka:
    image: debezium/kafka
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
  connect:
    image: debezium/connect
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on:
      - zookeeper
      - kafka
```
Note the `GROUP_ID` and `*_STORAGE_TOPIC` settings on the `connect` service; more details on Connect configuration are available in the Kafka Connect documentation. Once everything is up, let's create a table to stream:

```sql
create table test (
  id serial primary key,
  name varchar
);
```
Now we need to set up a Debezium connector to start streaming data from Postgres. Debezium's logical decoding requires `wal_level` to be `logical`, but by default Postgres ships with it set to `replica`. Let's change that now:

```sql
psql> alter system set wal_level to 'logical';
```
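`alter system` only records the new setting; `wal_level` takes effect after Postgres restarts. A quick way to apply and verify it, assuming the service is named `db` as in the Compose file above:

```shell
# Restart only the Postgres service so the new wal_level is loaded.
docker-compose restart db

# Confirm the setting from inside the container; it should print "logical".
# (Inside the official postgres image, local connections need no password.)
docker exec -ti $(docker ps | grep postgres | awk '{ print $1 }') \
  psql -U postgres -c 'show wal_level;'
```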
We also need a logical decoding output plugin. Debezium can work with either `wal2json` or `protobuf`; for this tutorial, we will use `wal2json`. As its name implies, it converts Postgres' write-ahead logs to JSON format. Install it inside the Postgres container:

```shell
$ docker ps
CONTAINER ID   IMAGE
c429f6d35017   debezium/connect
7d908378d1cf   debezium/kafka
cc3b1f05e552   debezium/zookeeper
4a10f43aad19   postgres:latest
$ docker exec -ti 4a10f43aad19 bash
$ apt-get update && apt-get install postgresql-13-wal2json
```
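Before leaving the container, you can optionally confirm the plugin landed where Postgres will look for it. This check is an assumption about the Debian package layout, not a required step; `pg_config --pkglibdir` prints the directory the server loads extension libraries from:

```shell
# Inside the postgres container: look for the wal2json shared library
# in Postgres' plugin directory.
ls "$(pg_config --pkglibdir)" | grep wal2json
```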
Next, we'll put the connector configuration in a JSON file and use cURL to send it to Debezium:

```shell
$ echo '
{
  "name": "arctype-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "wal2json",
    "database.hostname": "db",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "arctype",
    "database.dbname": "postgres",
    "database.server.name": "ARCTYPE",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "snapshot.mode": "always"
  }
}
' > debezium.json
$ curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  127.0.0.1:8083/connectors/ \
  --data "@debezium.json"
```
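Kafka Connect exposes a REST API on port 8083, so you can also ask it whether the connector actually started; a quick check, assuming the stack from the Compose file is running:

```shell
# List all registered connectors.
curl -s 127.0.0.1:8083/connectors/

# Inspect ours; a healthy connector reports state RUNNING
# for both the connector and its single task.
curl -s 127.0.0.1:8083/connectors/arctype-connector/status
```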
A successful request echoes our configuration back along with task and type metadata:

```json
{
  "name": "arctype-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "wal2json",
    "database.hostname": "db",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "arctype",
    "database.dbname": "postgres",
    "database.server.name": "ARCTYPE",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "snapshot.mode": "always",
    "name": "arctype-connector"
  },
  "tasks": [],
  "type": "source"
}
```
Debezium should now be streaming, and Kafka should have a topic for the test table. Let's list the topics:

```shell
$ docker exec -it \
    $(docker ps | grep arctype-kafka_kafka | awk '{ print $1 }') \
    /kafka/bin/kafka-topics.sh \
    --bootstrap-server localhost:9092 --list
ARCTYPE.public.test
__consumer_offsets
my_connect_configs
my_connect_offsets
my_connect_statuses
```
`docker exec` allows you to execute a command inside of a container without having to enter its shell. Docker requires you to specify the container ID when you use `docker exec`, but the ID changes every time you re-create a container, making it futile to memorize. `$(docker ps | grep arctype-kafka_kafka | awk '{ print $1 }')` finds the correct container ID by listing all active Docker containers (`docker ps`), running them through `grep` to find the one running pure Kafka, and then using `awk` to cherry-pick just the first column of the output, which is the container ID. The `$()` syntax runs a command and inserts its output in place.

Kafka's command-line tools take a `--bootstrap-server` argument. They refer to it as "bootstrap" because you'll usually run Kafka as a cluster with several nodes, and your consumer only needs one public-facing node to "enter the mix"; Kafka handles the rest on its own.

The topic name is `ARCTYPE.public.test`. The first part, `ARCTYPE`, is a prefix that we set with the `database.server.name` field in the JSON configuration. The second part is the Postgres schema the table is in, and the last part is the table name. Once you write more Kafka producers and stream applications you'll have many more topics, so it's helpful to set the prefix to make it easy to identify which topics are pure SQL tables.

To consume the topic:

```shell
$ docker exec -it \
    $(docker ps | grep arctype-kafka_kafka | awk '{ print $1 }') \
    /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic ARCTYPE.public.test
```
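The container-ID lookup used above works on any columnar text, so you can try the same `grep`/`awk`/`$()` pipeline without Docker; the sample text below is a stand-in for real `docker ps` output, not actual container IDs:

```shell
# Stand-in for `docker ps` output (container ID in the first column).
sample='c429f6d35017 debezium/connect
7d908378d1cf debezium/kafka
4a10f43aad19 postgres:latest'

# grep picks the matching row, awk keeps column 1,
# and $() captures the result into a variable.
id=$(printf '%s\n' "$sample" | grep postgres | awk '{ print $1 }')
echo "$id"   # -> 4a10f43aad19
```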
To replay all messages in the topic from the start, add `--from-beginning` to the console command. Now let's run an `INSERT` and watch for output:

```sql
postgres=# insert into test (name) values ('Arctype Kafka Test!');
INSERT 0 1
```
```shell
$ docker exec -it $(docker ps | grep arctype-kafka_kafka | awk '{ print $1 }') /kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic ARCTYPE.public.test
...
{
  "before": null,
  "after": {
    "id": 8,
    "name": "Arctype Kafka Test!"
  },
  "source": {
    "version": "1.5.0.Final",
    "connector": "postgresql",
    "name": "ARCTYPE",
    "ts_ms": 1621913280954,
    "snapshot": "false",
    "db": "postgres",
    "sequence": "[\"22995096\",\"22995096\"]",
    "schema": "public",
    "table": "test",
    "txId": 500,
    "lsn": 22995288,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1621913280982,
  "transaction": null
}
```
Success! The `after` section of the message contains the `name` field of the record we inserted!
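The `op` field distinguishes event types: `c` for create (insert), `u` for update, `d` for delete, and `r` for rows read during a snapshot. With the console consumer still running, you can generate the other event types; a sketch against the same `test` table (the `id` value is the one from the sample output above):

```sql
-- Emits an update event ("op": "u"); note that "before" stays null
-- unless the table's replica identity is set to FULL.
update test set name = 'Arctype Kafka Update!' where id = 8;

-- Emits a delete event ("op": "d"), followed by a null "tombstone"
-- message for the key, which Debezium sends by default.
delete from test where id = 8;
```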