I needed to get real-time events from a proprietary system that uses a MySQL database.

The de facto standard for CDC (Change Data Capture) today is Debezium. In its usual deployment it runs on Kafka Connect, which brings in Kafka and all the tooling needed to manage a cluster. I wanted to avoid a stack made up of many complex components. It turned out there is a lightweight variant, Debezium Server, which can publish to alternative message brokers.

I chose NATS (with JetStream) as the message broker.

Eventually everything worked. Here are the configs.

NOTE: MySQL must have the binary log (binlog) enabled, and Debezium expects row-based logging (binlog_format=ROW).
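A quick way to verify this before starting Debezium is to read the relevant server variables. The sketch below assumes the usual expectations of the Debezium MySQL connector (log_bin=ON, binlog_format=ROW, binlog_row_image=FULL). The function name findBinlogProblems is made up for this example, and the connection details in the usage comment are placeholders.

```php
<?php
declare(strict_types=1);

/**
 * Compare MySQL server variables with the values the Debezium MySQL
 * connector is assumed to need. $vars maps variable name => value,
 * as returned by SHOW VARIABLES. Returns a list of problems; an empty
 * list means the settings look fine.
 */
function findBinlogProblems(array $vars): array
{
    $expected = [
        'log_bin'          => 'ON',
        'binlog_format'    => 'ROW',
        'binlog_row_image' => 'FULL',
    ];

    $problems = [];
    foreach ($expected as $name => $want) {
        // A missing variable is treated as an empty value.
        $actual = strtoupper((string) ($vars[$name] ?? ''));
        if ($actual !== $want) {
            $problems[] = sprintf('%s is "%s", expected "%s"', $name, $actual, $want);
        }
    }

    return $problems;
}

// Usage against a live server (host and credentials are placeholders):
// $pdo  = new PDO('mysql:host=<db host>;port=3306', 'debezium', '<password>');
// $vars = $pdo->query(
//     "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image')"
// )->fetchAll(PDO::FETCH_KEY_PAIR);
// print_r(findBinlogProblems($vars));
```

The check is kept separate from the database call so it can be run against any array of variables, for example one copied from a MySQL console.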

Files needed to run the stack:

debezium/conf/application.properties

debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=<db host>
debezium.source.database.port=3306
debezium.source.database.user=debezium
debezium.source.database.password=<password>
# server.id must be unique among all servers and replication clients of this MySQL instance
debezium.source.database.server.id=1
# database.include.list takes database names; tables go in table.include.list as <database>.<table>
debezium.source.database.include.list=<database>
debezium.source.table.include.list=<database>.table1,<database>.table2
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=data/schema_history.dat

debezium.source.topic.prefix=ivms

debezium.sink.type=nats-jetstream
debezium.sink.nats-jetstream.url=nats://nats:4222
debezium.sink.nats-jetstream.create-stream=true
debezium.sink.nats-jetstream.auth.user=ruser
debezium.sink.nats-jetstream.auth.password=pass
debezium.sink.nats-jetstream.storage=file

docker-compose.yml

services:
  nats:
    image: nats:latest
    container_name: nats-jetstream
    command: --name=nats1 --debug --http_port=8222 --js --sd=/data
    ports:
      - "8222:8222"
      - "4222:4222"
      - "6222:6222"
    volumes:
      - nats-data:/data
    networks:
      - debezium-net
    restart: always

  debezium-server:
    image: quay.io/debezium/server:latest
    volumes:
      - ./debezium/conf:/debezium/config
    container_name: debezium-mysql-nats-jetstream
    ports:
      - "8080:8080"
    depends_on:
      - nats
    networks:
      - debezium-net
    restart: always

networks:
  debezium-net:
    driver: bridge

volumes:
  nats-data:

PHP client

composer require basis-company/nats

worker.php

<?php
require_once __DIR__ . '/vendor/autoload.php';

use Basis\Nats\Client;
use Basis\Nats\Configuration;
use Basis\Nats\Stream\RetentionPolicy;

// Connection settings; user and pass match the Debezium sink configuration.
$configuration = new Configuration(
    host: 'localhost',
    port: 4222,
    user: 'ruser',
    pass: 'pass'
);

$client = new Client($configuration);

// Fail early if the server is unreachable.
$client->ping();

// The stream that Debezium Server creates (create-stream=true).
$stream = $client->getApi()->getStream('DebeziumStream');

// Client-side stream settings; they take effect only if the stream is created or updated from here.
$stream->getConfiguration()
    ->setRetentionPolicy(RetentionPolicy::WORK_QUEUE)
    ->setSubjects(['ivms', 'ivms.>']);

// Durable consumer; create() registers it on the server.
$consumer = $stream->getConsumer('attendance_events');
$consumer->create();

// Process messages as they arrive.
$consumer->handle(function ($msg) {
    echo "handle---------\n";
    var_dump($msg);
});

Run with php worker.php
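
The handler above only dumps each message. The message body is a Debezium change event in JSON. With Debezium Server's default JSON format, the envelope (before, after, source, op) sits under a top-level "payload" key next to "schema"; if schemas are disabled, the envelope is the top-level object. A minimal sketch of decoding it follows. The function name decodeChangeEvent and the shape of the returned array are illustrative and not part of Debezium or the NATS client. How the raw body string is obtained from $msg depends on the client library version.

```php
<?php
declare(strict_types=1);

/**
 * Decode one Debezium change event from its JSON body.
 * Returns null for empty bodies and for messages without an "op"
 * field (for example schema change events).
 * Throws JsonException if the body is not valid JSON.
 */
function decodeChangeEvent(string $json): ?array
{
    if ($json === '') {
        return null;
    }

    $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
    if (!is_array($data)) {
        return null;
    }

    // With schemas enabled the envelope is nested under "payload".
    $envelope = $data['payload'] ?? $data;
    if (!is_array($envelope) || !isset($envelope['op'])) {
        return null;
    }

    // Debezium operation codes mapped to readable names.
    $operations = [
        'c' => 'insert',
        'u' => 'update',
        'd' => 'delete',
        'r' => 'snapshot',
        't' => 'truncate',
    ];

    return [
        'operation' => $operations[$envelope['op']] ?? $envelope['op'],
        'database'  => $envelope['source']['db'] ?? null,
        'table'     => $envelope['source']['table'] ?? null,
        'before'    => $envelope['before'] ?? null,
        'after'     => $envelope['after'] ?? null,
    ];
}
```

Inside the handler, the body string would be passed to decodeChangeEvent() and the code would branch on the 'operation' and 'table' keys of the result.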