Making Apache Kafka and Confluent Schema Registry on docker-compose accessible on the internet
Full example available at https://github.com/alberttwong/onehouse-demos/tree/main/mysql-debezium
So here are my requirements:
- Apache Kafka (not Confluent’s version)
- Confluent Schema Registry (I need it to support Avro and/or Protobuf)
- All software has to run on docker-compose
- Need to somehow make a docker-compose environment available on the internet (using ngrok)
- Enable basic auth on Confluent Schema Registry over HTTP (didn’t bother with HTTPS)
- Configure Kafka Connect to talk to Confluent Schema Registry with auth
- Have a MinIO object store so I can store my Apache Hudi tables on an S3-compatible system
- Support Spark writing Apache Hudi files to S3
- Support Apache Hive Metastore to register Apache Hudi tables for query engine support (StarRocks, Trino)
- Debezium-configured MySQL and Kafka Connect for change data capture (CDC)
So here are all the important bits (look at the ngrok, Kafka, and Confluent Schema Registry configuration):
docker-compose.yml
services:
  ngrok:
    image: ngrok/ngrok:latest
    restart: unless-stopped
    command:
      - "start"
      - "--all"
      - "--config"
      - "/etc/ngrok.yml"
    volumes:
      - ./ngrok.yml:/etc/ngrok.yml
    ports:
      - 4040:4040
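  # The ngrok agent reads its tunnel definitions from the mounted ngrok.yml.
  # A minimal sketch, assuming a single TCP tunnel: the tunnel must be named
  # "kafka", because the kafka entrypoint below polls /api/tunnels/kafka, and
  # it must forward to the NGROK listener port:
  #
  #   version: 2
  #   authtoken: <your-ngrok-authtoken>
  #   tunnels:
  #     kafka:
  #       proto: tcp
  #       addr: kafka:9092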
  zookeeper:
    image: quay.io/debezium/zookeeper:2.7.0.Final
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:2.7.0.Final
    ports:
      - 29092:29092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_LISTENERS=DOCKER://kafka:29092, NGROK://kafka:9092
      - KAFKA_ADVERTISED_LISTENERS=DOCKER://kafka:29092
      - KAFKA_INTER_BROKER_LISTENER_NAME=DOCKER
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=DOCKER:PLAINTEXT,NGROK:PLAINTEXT
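    # The custom entrypoint below blocks until the ngrok tunnel exists, pulls
    # the tunnel's public tcp:// address from the ngrok agent API, rewrites it
    # as a NGROK:// endpoint, appends it to KAFKA_ADVERTISED_LISTENERS, and
    # only then starts the broker, so external clients get the ngrok address
    # back in broker metadata while internal clients keep using
    # DOCKER://kafka:29092.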
    entrypoint:
      - /bin/sh
      - -c
      - |
        echo "Waiting for ngrok tunnel to be created"
        while : ; do
          curl_status=$$(curl -s -o /dev/null -w %{http_code} http://ngrok:4040/api/tunnels/kafka)
          echo -e $$(date) "\tTunnels API HTTP state: " $$curl_status " (waiting for 200)"
          if [ $$curl_status -eq 200 ] ; then
            break
          fi
          sleep 5
        done
        echo "ngrok tunnel is up"
        NGROK_LISTENER=$$(curl -s http://ngrok:4040/api/tunnels/kafka | grep -Po '"public_url":.*?[^\\]",' | cut -d':' -f2- | tr -d ',"' | sed 's/tcp:\/\//NGROK:\/\//g')
        echo $$NGROK_LISTENER
        export KAFKA_ADVERTISED_LISTENERS="$$KAFKA_ADVERTISED_LISTENERS, $$NGROK_LISTENER"
        echo "KAFKA_ADVERTISED_LISTENERS is set to " $$KAFKA_ADVERTISED_LISTENERS
        /docker-entrypoint.sh start
  # apicurio:
  #   image: apicurio/apicurio-registry-mem:2.2.5.Final
  #   environment:
  #     - AUTH_ENABLED=true
  #     - CLIENT_CREDENTIALS_BASIC_AUTH_ENABLED=true
  #   ports:
  #     - 8080:8080
  #     - 8090:8090
  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.15
    ports:
      - 8181:8181
      - 8081:8081
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:29092
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
      - SCHEMA_REGISTRY_AUTHENTICATION_METHOD=BASIC
      - SCHEMA_REGISTRY_AUTHENTICATION_ROLES=write,read,admin
      - SCHEMA_REGISTRY_AUTHENTICATION_REALM=Schema
      # - SCHEMA_REGISTRY_CONFLUENT_LICENSE_REPLICATION_FACTOR=1
      - SCHEMA_REGISTRY_OPTS=-Djava.security.auth.login.config=/tmp/jaas_config.file
      # - SCHEMA_REGISTRY_SCHEMA_REGISTRY_RESOURCE_EXTENSION_CLASS=io.confluent.kafka.schemaregistry.security.SchemaRegistrySecurityResourceExtension
      # - SCHEMA_REGISTRY_CONFLUENT_SCHEMA_REGISTRY_AUTHORIZER_CLASS=io.confluent.kafka.schemaregistry.security.authorizer.schemaregistryacl.SchemaRegistryAclAuthorizer
      # - SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR=1
      # - SCHEMA_REGISTRY_CONFLUENT_SCHEMA_REGISTRY_AUTH_MECHANISM=JETTY_AUTH
    volumes:
      - ./jaas_config.file:/tmp/jaas_config.file
      - ./password-file:/tmp/password-file
    links:
      - kafka
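  # The two files mounted into schema-registry drive Jetty basic auth (see the
  # kafka-security-playbook link at the end). A minimal sketch, assuming the
  # JAAS section is named after the "Schema" realm configured above and the
  # password file uses Jetty's "user: password,role" format:
  #
  #   jaas_config.file:
  #     Schema {
  #       org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
  #       file="/tmp/password-file";
  #     };
  #
  #   password-file:
  #     admin: admin,admin
  #     write: write,write
  #     read: read,read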
  mysql:
    image: quay.io/debezium/example-mysql:2.7.0.Final
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
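  # The example-mysql image ships with binlog enabled, the sample "inventory"
  # database preloaded, and a ready-made "debezium" CDC user, which is what
  # the connector registration example at the end of this post assumes.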
  connect:
    image: quay.io/debezium/connect:2.7.0.Final
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
      - schema-registry
    environment:
      - BOOTSTRAP_SERVERS=kafka:29092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
    volumes:
      - ./kafka/connect/thirdparty:/kafka/connect/thirdparty
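    # Assumption: this mount is how extra jars (e.g. the Confluent Avro
    # converter and its dependencies, which the stock Debezium connect image
    # doesn't bundle) land on the Connect plugin path; the post doesn't show
    # the directory's contents.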
  metastore_db:
    image: postgres:11
    hostname: metastore_db
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore
  hive-metastore:
    hostname: hive-metastore
    image: 'starburstdata/hive:3.1.2-e.18'
    ports:
      - '9083:9083' # Metastore Thrift
    environment:
      HIVE_METASTORE_DRIVER: org.postgresql.Driver
      HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
      HIVE_METASTORE_USER: hive
      HIVE_METASTORE_PASSWORD: hive
      HIVE_METASTORE_WAREHOUSE_DIR: s3a://warehouse/
      S3_ENDPOINT: http://minio:9000
      S3_ACCESS_KEY: admin
      S3_SECRET_KEY: password
      S3_PATH_STYLE_ACCESS: "true"
      REGION: ""
      GOOGLE_CLOUD_KEY_FILE_PATH: ""
      AZURE_ADL_CLIENT_ID: ""
      AZURE_ADL_CREDENTIAL: ""
      AZURE_ADL_REFRESH_URL: ""
      AZURE_ABFS_STORAGE_ACCOUNT: ""
      AZURE_ABFS_ACCESS_KEY: ""
      AZURE_WASB_STORAGE_ACCOUNT: ""
      AZURE_ABFS_OAUTH: ""
      AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
      AZURE_ABFS_OAUTH_CLIENT_ID: ""
      AZURE_ABFS_OAUTH_SECRET: ""
      AZURE_ABFS_OAUTH_ENDPOINT: ""
      AZURE_WASB_ACCESS_KEY: ""
      HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
    depends_on:
      - metastore_db
    healthcheck:
      test: bash -c "exec 6<> /dev/tcp/localhost/9083"
  minio:
    image: minio/minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      default:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]
  mc:
    depends_on:
      - minio
    image: minio/mc
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "
networks:
  default:
    name: datalakehouse
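Once everything is up, here are a few smoke tests. These are hedged sketches: the admin:admin credentials assume the sample password-file sketched earlier, and the connector name, database filters, and ngrok address below are illustrative.

Check that Schema Registry enforces basic auth:

curl -s -o /dev/null -w "%{http_code}\n" http://localhost:8081/subjects
# 401 without credentials
curl -s -u admin:admin http://localhost:8081/subjects
# e.g. [] on a fresh registry

Register the Debezium MySQL connector with Avro converters pointing at the authenticated Schema Registry (per the Confluent docs linked below, the converters take basic.auth.credentials.source=USER_INFO plus basic.auth.user.info):

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
    "schema.history.internal.kafka.topic": "schemahistory.inventory",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.basic.auth.credentials.source": "USER_INFO",
    "key.converter.basic.auth.user.info": "admin:admin",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "admin:admin"
  }
}'

Finally, from any machine on the internet, point a client at the ngrok TCP address the broker now advertises (look it up at http://localhost:4040; 0.tcp.ngrok.io:12345 below is a placeholder):

kcat -b 0.tcp.ngrok.io:12345 -t dbserver1.inventory.customers -C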
Thanks to:
- Getting Apache Kafka to work with ngrok https://rmoff.net/2023/11/01/using-apache-kafka-with-ngrok/
- This tutorial is based on https://github.com/debezium/debezium-examples/tree/main/tutorial
- Setting auth in Confluent Schema Registry https://github.com/Dabz/kafka-security-playbook/tree/master/schema-registry/with-basic-auth
- Having Kafka Connect talk to Confluent Schema Registry using auth https://docs.confluent.io/platform/current/schema-registry/connect.html