2025-05-02
Simple CDC with Debezium + NATS + Docker
I needed to get real-time events from a proprietary system that uses a MySQL database.
The de facto standard for CDC (Change Data Capture) solutions today is Debezium. Under the hood it uses Kafka and all the tooling that comes with it for cluster management. I wanted to avoid technologies made up of many complex components. It turned out there is a lightweight version — Debezium Server, which allows using alternative message brokers.
I chose NATS as the message queue.
Eventually everything worked. Here are the configs.
NOTE: MySQL must have Binlog enabled.
Files needed to build the image:
debezium/conf/application.properties
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=<db host>
debezium.source.database.port=3306
debezium.source.database.user=debezium
debezium.source.database.password=<password>
debezium.source.database.server.id=1
debezium.source.database.include.list=table1,table2
debezium.source.database.history.internal=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.internal.file.filename=/tmp/dbhistory.dat
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=data/schema_history.dat
debezium.source.topic.prefix=ivms
debezium.sink.type=nats-jetstream
debezium.sink.nats-jetstream.url=nats://nats:4222
debezium.sink.nats-jetstream.create-stream=true
debezium.sink.nats-jetstream.auth.user=ruser
debezium.sink.nats-jetstream.auth.password=pass
debezium.sink.nats-jetstream.storage=file
Dockerfile
services:
nats:
image: nats:latest
container_name: nats-jetstream
command: --name=nats1 --debug --http_port=8222 --js --sd=/data
ports:
- "8222:8222"
- "4222:4222"
- "6222:6222"
volumes:
- nats-data:/data
networks:
- debezium-net
restart: always
debezium-server:
image: quay.io/debezium/server:latest
volumes:
- ./debezium/conf:/debezium/config
container_name: debezium-mysql-nats-jetstream
ports:
- "8080:8080"
depends_on:
- nats
networks:
- debezium-net
restart: always
networks:
debezium-net:
driver: bridge
volumes:
nats-data:
PHP client
composer require basis-company/nats
worker.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Basis\Nats\Client;
use Basis\Nats\Configuration;
use Basis\Nats\Stream\RetentionPolicy;
use Basis\Nats\Stream\StorageBackend;
use Basis\Nats\Consumer\Configuration as ConsumerConfiguration;
use Basis\Nats\Consumer\DeliverPolicy;
$configuration = new Configuration(
host: 'localhost',
port: 4222,
user: 'ruser',
pass: 'pass'
);
$client = new Client($configuration);
$accountInfo = $client->getApi()->getInfo();
$client->ping();
$stream = $client->getApi()->getStream('DebeziumStream');
$stream->getConfiguration()->setRetentionPolicy(RetentionPolicy::WORK_QUEUE)->setSubjects(['ivms', 'ivms.>']);
$configuration = (new ConsumerConfiguration($stream->getName()));
var_dump($configuration);
$consumer = $stream->getConsumer('attendance_events');
$consumer->create();
$consumer->handle(function ($msg) {
echo "handle---------\n";
var_dump($msg);
});
Run with php worker.php
comments powered by Disqus