Making Apache Kafka and Confluent Schema Registry on docker-compose accessible on the internet

Albert Wong
3 min read · Aug 1, 2024


Full example available at https://github.com/alberttwong/onehouse-demos/tree/main/mysql-debezium

So here are my requirements:

  • Apache Kafka (not Confluent’s distribution)
  • Confluent Schema Registry (I need it to support Avro and/or Protobuf)
  • All software has to run on docker-compose
  • The docker-compose environment has to be reachable from the internet somehow (using ngrok)
  • Basic auth enabled on Confluent Schema Registry over http (I didn’t bother with https)
  • Apache Kafka configured to connect to Confluent Schema Registry with auth
  • A MinIO object store so I can keep my Apache Hudi tables on an S3-compatible system
  • Spark support for writing Apache Hudi files to S3
  • Apache Hive Metastore support for registering Apache Hudi tables with query engines (StarRocks, Trino)
  • MySQL and Kafka Connect configured with Debezium for change data capture (CDC)

So here are all the important bits (look at the ngrok, Kafka, and Confluent Schema Registry configuration):

docker-compose.yml

services:

  ngrok:
    image: ngrok/ngrok:latest
    restart: unless-stopped
    command:
      - "start"
      - "--all"
      - "--config"
      - "/etc/ngrok.yml"
    volumes:
      - ./ngrok.yml:/etc/ngrok.yml
    ports:
      - 4040:4040
  zookeeper:
    image: quay.io/debezium/zookeeper:2.7.0.Final
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:2.7.0.Final
    ports:
      - 29092:29092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_LISTENERS=DOCKER://kafka:29092,NGROK://kafka:9092
      - KAFKA_ADVERTISED_LISTENERS=DOCKER://kafka:29092
      - KAFKA_INTER_BROKER_LISTENER_NAME=DOCKER
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=DOCKER:PLAINTEXT,NGROK:PLAINTEXT
    entrypoint:
      - /bin/sh
      - -c
      - |
        echo "Waiting for ngrok tunnel to be created"
        while : ; do
          curl_status=$$(curl -s -o /dev/null -w %{http_code} http://ngrok:4040/api/tunnels/kafka)
          echo -e $$(date) "\tTunnels API HTTP state: " $$curl_status " (waiting for 200)"
          if [ $$curl_status -eq 200 ] ; then
            break
          fi
          sleep 5
        done
        echo "ngrok tunnel is up"
        NGROK_LISTENER=$$(curl -s http://ngrok:4040/api/tunnels/kafka | grep -Po '"public_url":.*?[^\\]",' | cut -d':' -f2- | tr -d ',"' | sed 's/tcp:\/\//NGROK:\/\//g')
        echo $$NGROK_LISTENER
        export KAFKA_ADVERTISED_LISTENERS="$$KAFKA_ADVERTISED_LISTENERS, $$NGROK_LISTENER"
        echo "KAFKA_ADVERTISED_LISTENERS is set to " $$KAFKA_ADVERTISED_LISTENERS
        /docker-entrypoint.sh start
  # apicurio:
  #   image: apicurio/apicurio-registry-mem:2.2.5.Final
  #   environment:
  #     - AUTH_ENABLED=true
  #     - CLIENT_CREDENTIALS_BASIC_AUTH_ENABLED=true
  #   ports:
  #     - 8080:8080
  #     - 8090:8090
  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.15
    ports:
      - 8181:8181
      - 8081:8081
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:29092
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
      - SCHEMA_REGISTRY_AUTHENTICATION_METHOD=BASIC
      - SCHEMA_REGISTRY_AUTHENTICATION_ROLES=write,read,admin
      - SCHEMA_REGISTRY_AUTHENTICATION_REALM=Schema
      # - SCHEMA_REGISTRY_CONFLUENT_LICENSE_REPLICATION_FACTOR=1
      - SCHEMA_REGISTRY_OPTS=-Djava.security.auth.login.config=/tmp/jaas_config.file
      # - SCHEMA_REGISTRY_SCHEMA_REGISTRY_RESOURCE_EXTENSION_CLASS=io.confluent.kafka.schemaregistry.security.SchemaRegistrySecurityResourceExtension
      # - SCHEMA_REGISTRY_CONFLUENT_SCHEMA_REGISTRY_AUTHORIZER_CLASS=io.confluent.kafka.schemaregistry.security.authorizer.schemaregistryacl.SchemaRegistryAclAuthorizer
      # - SCHEMA_REGISTRY_KAFKASTORE_TOPIC_REPLICATION_FACTOR=1
      # - SCHEMA_REGISTRY_CONFLUENT_SCHEMA_REGISTRY_AUTH_MECHANISM=JETTY_AUTH
    volumes:
      - ./jaas_config.file:/tmp/jaas_config.file
      - ./password-file:/tmp/password-file
    links:
      - zookeeper
  mysql:
    image: quay.io/debezium/example-mysql:2.7.0.Final
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  connect:
    image: quay.io/debezium/connect:2.7.0.Final
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
      - schema-registry
    environment:
      - BOOTSTRAP_SERVERS=kafka:29092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
    volumes:
      - ./kafka/connect/thirdparty:/kafka/connect/thirdparty

  metastore_db:
    image: postgres:11
    hostname: metastore_db
    environment:
      POSTGRES_USER: hive
      POSTGRES_PASSWORD: hive
      POSTGRES_DB: metastore

  hive-metastore:
    hostname: hive-metastore
    image: 'starburstdata/hive:3.1.2-e.18'
    ports:
      - '9083:9083' # Metastore Thrift
    environment:
      HIVE_METASTORE_DRIVER: org.postgresql.Driver
      HIVE_METASTORE_JDBC_URL: jdbc:postgresql://metastore_db:5432/metastore
      HIVE_METASTORE_USER: hive
      HIVE_METASTORE_PASSWORD: hive
      HIVE_METASTORE_WAREHOUSE_DIR: s3a://warehouse/
      S3_ENDPOINT: http://minio:9000
      S3_ACCESS_KEY: admin
      S3_SECRET_KEY: password
      S3_PATH_STYLE_ACCESS: "true"
      REGION: ""
      GOOGLE_CLOUD_KEY_FILE_PATH: ""
      AZURE_ADL_CLIENT_ID: ""
      AZURE_ADL_CREDENTIAL: ""
      AZURE_ADL_REFRESH_URL: ""
      AZURE_ABFS_STORAGE_ACCOUNT: ""
      AZURE_ABFS_ACCESS_KEY: ""
      AZURE_WASB_STORAGE_ACCOUNT: ""
      AZURE_ABFS_OAUTH: ""
      AZURE_ABFS_OAUTH_TOKEN_PROVIDER: ""
      AZURE_ABFS_OAUTH_CLIENT_ID: ""
      AZURE_ABFS_OAUTH_SECRET: ""
      AZURE_ABFS_OAUTH_ENDPOINT: ""
      AZURE_WASB_ACCESS_KEY: ""
      HIVE_METASTORE_USERS_IN_ADMIN_ROLE: "admin"
    depends_on:
      - metastore_db
    healthcheck:
      test: bash -c "exec 6<> /dev/tcp/localhost/9083"

  minio:
    image: minio/minio
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
    networks:
      default:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]
  mc:
    depends_on:
      - minio
    image: minio/mc
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/warehouse;
      /usr/bin/mc mb minio/warehouse;
      /usr/bin/mc policy set public minio/warehouse;
      tail -f /dev/null
      "

networks:
  default:
    name: datalakehouse
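
The compose file mounts a local ngrok.yml into the ngrok container, and the Kafka entrypoint polls the agent API for a tunnel named kafka, so that file needs to declare a TCP tunnel pointing at the broker’s NGROK listener. A minimal sketch of what that looks like (the authtoken is a placeholder, not the repo’s value; ngrok requires a registered authtoken for TCP tunnels):

ngrok.yml

version: 2
authtoken: <your-ngrok-authtoken> # placeholder, from your ngrok account
tunnels:
  kafka:
    proto: tcp
    addr: kafka:9092 # the NGROK listener from KAFKA_LISTENERS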
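
On the Schema Registry side, SCHEMA_REGISTRY_OPTS points Java at /tmp/jaas_config.file, and SCHEMA_REGISTRY_AUTHENTICATION_REALM=Schema means that file must define a JAAS login context with that name. Following Confluent’s documented basic-auth setup, it uses Jetty’s PropertyFileLoginModule against the mounted password file:

jaas_config.file

Schema {
  org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
  file="/tmp/password-file"
  debug="false";
};

The password file uses Jetty’s user: password,role format, with roles drawn from SCHEMA_REGISTRY_AUTHENTICATION_ROLES. The credentials below are placeholder sketches, not the repo’s actual values:

password-file

admin: admin-secret,admin
writer: writer-secret,write
reader: reader-secret,read

A quick smoke test once the stack is up: curl -u admin:admin-secret http://localhost:8081/subjects should return [] on a fresh registry, while the same request without credentials gets a 401.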
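
With everything running, the CDC half gets wired up by POSTing a connector config to Kafka Connect on port 8083. Here’s a sketch based on the standard Debezium MySQL tutorial (the example-mysql image ships a debezium/dbz user and an inventory database); the converter lines pass the Schema Registry credentials through, assuming the Confluent Avro converter jars have been dropped into the mounted kafka/connect/thirdparty directory and that the placeholder admin:admin-secret credentials from above are in use:

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
    "schema.history.internal.kafka.topic": "schemahistory.inventory",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.basic.auth.credentials.source": "USER_INFO",
    "key.converter.basic.auth.user.info": "admin:admin-secret",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "admin:admin-secret"
  }
}'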
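
To prove the internet-facing part works, grab the public tcp:// address from the ngrok dashboard at http://localhost:4040 and bootstrap through it from a machine outside the docker network. The host and port below are placeholders for whatever ngrok assigns:

# list brokers and topics through the tunnel -- if the broker endpoint shown
# is the ngrok address, the advertised-listener rewrite in the entrypoint worked
kcat -b 0.tcp.ngrok.io:12345 -L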
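
For the lakehouse half of the requirements, Spark jobs that write Hudi tables to the warehouse bucket need the S3A endpoint and the Hive Metastore wired in. This is a sketch of the session configuration rather than the repo’s exact invocation; the Hudi bundle version is an assumption, and it presumes a Spark container attached to the same datalakehouse network:

spark-sql \
  --packages org.apache.hudi:hudi-spark3.4-bundle_2.12:0.15.0 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
  --conf spark.hadoop.fs.s3a.endpoint=http://minio:9000 \
  --conf spark.hadoop.fs.s3a.access.key=admin \
  --conf spark.hadoop.fs.s3a.secret.key=password \
  --conf spark.hadoop.fs.s3a.path.style.access=true \
  --conf spark.hadoop.hive.metastore.uris=thrift://hive-metastore:9083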
