The Azure Cosmos DB Linux Emulator, which can be used for local development and testing without creating an Azure subscription or incurring any costs.
And Docker Compose, a tool for defining and running multi-container Docker applications. It will orchestrate all the components our setup requires, including the Azure Cosmos DB emulator, Kafka, Zookeeper, the Kafka connectors, etc.
Step 0: A simple scenario to check that our setup is functional.
How to handle streaming JSON data
How to handle streaming JSON data which is not compatible with Azure Cosmos DB
How to handle Avro data using Schema Registry
It is assumed that you're comfortable with Kafka and have an understanding of Kafka Connect.
At the time of writing, the Azure Cosmos DB Linux Emulator is in preview.
git clone https://github.com/Azure-Samples/cosmosdb-kafka-connect-docker
cd cosmosdb-kafka-connect-docker
Azure Cosmos DB emulator
Kafka and Zookeeper
Azure Cosmos DB and Datagen connectors (run as separate Kafka Connect workers)
Confluent Schema Registry (optional)
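For orientation, here is a rough sketch of how those components map to services in the Compose file (illustrative only, not the actual file; the zookeeper and datagen-connector service names are my guesses, while cosmosdb, kafka, schema-registry, and cosmosdb-connector all appear later in this walkthrough):
services:
  cosmosdb:            # Azure Cosmos DB Linux emulator, listening on 8081
  zookeeper:           # confluentinc/cp-zookeeper
  kafka:               # confluentinc/cp-kafka, reachable inside the Compose network as kafka:29092
  schema-registry:     # confluentinc/cp-schema-registry
  cosmosdb-connector:  # Kafka Connect worker hosting the Cosmos DB connectors (REST API on localhost:8083)
  datagen-connector:   # Kafka Connect worker hosting the Datagen connectors (REST API on localhost:8080)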
docker pull confluentinc/cp-zookeeper:latest
docker pull confluentinc/cp-kafka:latest
docker pull confluentinc/cp-schema-registry:latest
docker-compose -p cosmosdb-kafka-docker up --build
docker-compose -p cosmosdb-kafka-docker ps
The next step is to import the emulator's SSL certificate into the Java trust store of the Cosmos DB connector container. Although this process can be automated, I am doing it manually to make things clear.
docker exec --user root -it cosmosdb-kafka-docker_cosmosdb-connector_1 /bin/bash
# execute the below command inside the container
curl -k https://cosmosdb:8081/_explorer/emulator.pem > ~/emulatorcert.crt && keytool -noprompt -storepass changeit -keypass changeit -keystore /usr/lib/jvm/zulu11-ca/lib/security/cacerts -importcert -alias emulator_cert -file ~/emulatorcert.crt
You should see this output: Certificate was added to keystore
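To double-check that the certificate was imported, you can list the keystore entry (run this inside the same container):
keytool -list -storepass changeit -keystore /usr/lib/jvm/zulu11-ca/lib/security/cacerts -alias emulator_cert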
Database named testdb
Containers: inventory, orders, and orders_avro (ensure that the partition key for all the containers is /id). You can create the database and containers using the emulator's Data Explorer at https://localhost:8081/_explorer/index.html.
curl -X POST -H "Content-Type: application/json" -d @cosmosdb-inventory-connector_1.json http://localhost:8083/connectors
# to check the connector status
curl http://localhost:8083/connectors/inventory-sink/status
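If everything is healthy, the status endpoint returns the standard Kafka Connect REST response, roughly of this shape (worker IDs elided):
{"name":"inventory-sink","connector":{"state":"RUNNING","worker_id":"..."},"tasks":[{"id":0,"state":"RUNNING","worker_id":"..."}],"type":"sink"}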
docker exec -it kafka bash -c 'cd /usr/bin && kafka-console-producer --topic inventory_topic --bootstrap-server kafka:29092'
{"id": "5000","quantity": 100,"productid": 42}
{"id": "5001","quantity": 99,"productid": 43}
{"id": "5002","quantity": 98,"productid": 44}
curl -X DELETE http://localhost:8083/connectors/inventory-sink/
curl -X POST -H "Content-Type: application/json" -d @cosmosdb-inventory-connector_2.json http://localhost:8083/connectors
# to check the connector status
curl http://localhost:8083/connectors/inventory-sink/status
curl -X POST -H "Content-Type: application/json" -d @datagen-inventory-connector.json http://localhost:8080/connectors
# to check the connector status
curl http://localhost:8080/connectors/datagen-inventory/status
Note that we use port 8080 for the Datagen connector since it runs in a separate Kafka Connect container.
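If you lose track of which worker is running what, you can list the connectors registered with each one:
# Cosmos DB sink worker
curl http://localhost:8083/connectors
# Datagen worker
curl http://localhost:8080/connectors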
docker exec -it kafka bash -c 'cd /usr/bin && kafka-console-consumer --topic inventory_topic1 --bootstrap-server kafka:29092'
{"id":5,"quantity":5,"productid":5}
{"id":6,"quantity":6,"productid":6}
{"id":7,"quantity":7,"productid":7}
...
"transforms": "Cast",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "id:string"
curl -X DELETE http://localhost:8080/connectors/datagen-inventory
curl -X DELETE http://localhost:8083/connectors/inventory-sink/
curl -X POST -H "Content-Type: application/json" -d @cosmosdb-orders-connector_1.json http://localhost:8083/connectors
# to check the connector status
curl http://localhost:8083/connectors/orders-sink/status
curl -X POST -H "Content-Type: application/json" -d @datagen-orders-connector.json http://localhost:8080/connectors
# to check the connector status
curl http://localhost:8080/connectors/datagen-orders/status
docker exec -it kafka bash -c 'cd /usr/bin && kafka-console-consumer --topic orders_topic --bootstrap-server kafka:29092'
{"ordertime":1496251410176,"orderid":3,"itemid":"Item_869","orderunits":3.2897805449886226,"address":{"city":"City_99","state":"State_46","zipcode":50570}}
{"ordertime":1500129505219,"orderid":4,"itemid":"Item_339","orderunits":3.6719921257659918,"address":{"city":"City_84","state":"State_55","zipcode":88573}}
{"ordertime":1498873571020,"orderid":5,"itemid":"Item_922","orderunits":8.4506812669258,"address":{"city":"City_48","state":"State_66","zipcode":55218}}
{"ordertime":1513855504436,"orderid":6,"itemid":"Item_545","orderunits":7.82561522361042,"address":{"city":"City_44","state":"State_71","zipcode":87868}}
...
"transforms": "RenameField,Cast",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "orderid:id",
"transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec": "id:string"
Depending on your use case, renaming or removing a field may not be an ideal solution, but it's good to know that the options exist. Also, remember that the original data in the Kafka topic is still there, untouched, and other downstream applications can still leverage it.
curl -X DELETE http://localhost:8080/connectors/datagen-orders
curl -X DELETE http://localhost:8083/connectors/orders-sink/
curl -X POST -H "Content-Type: application/json" -d @cosmosdb-orders-connector_2.json http://localhost:8083/connectors
# to check the connector status
curl http://localhost:8083/connectors/orders-sink/status
curl -X POST -H "Content-Type: application/json" -d @datagen-orders-connector-avro.json http://localhost:8080/connectors
# to check the connector status
curl http://localhost:8080/connectors/datagen-orders/status
docker exec -it kafka bash -c 'cd /usr/bin && kafka-console-consumer --topic orders_avro_topic --bootstrap-server kafka:29092'
�����VItem_185lqf�@City_61State_73��
����WItem_219[�C��@City_74State_77��
�����VItem_7167Ix�dF�?City_53State_53��
���֩WItem_126*���?@City_58State_21��
�����VItem_329X�2,@City_49State_79��
�����XItem_886��>�|�@City_88State_27��
��V Item_956�r#�!@City_45State_96��
�ѼҕW"Item_157E�)$���?City_96State_63��
...
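The output looks garbled because the records are Avro-encoded. To read them, you could use kafka-avro-console-consumer instead, which looks up the schema in Schema Registry to deserialize each record (assuming the Schema Registry container is reachable as schema-registry, just as the Kafka one is reachable as kafka):
docker exec -it schema-registry bash -c 'kafka-avro-console-consumer --topic orders_avro_topic --bootstrap-server kafka:29092 --property schema.registry.url=http://schema-registry:8081 --from-beginning'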
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
...
curl -X DELETE http://localhost:8080/connectors/datagen-orders/
curl -X DELETE http://localhost:8083/connectors/orders-sink/
docker-compose -p cosmosdb-kafka-docker down -v