Возникла задача получать реалтайм события из одной проприетарной штуки, которая использует бд MySQL.

Де-факто стандарт для CDC-решений (Change Data Capture) на сегодня – Debezim. Под капотом используется Kafka и все что идет в комплекте для создания кластеров. Хотелось обойтись без технологий, состоящих из множества сложных компонентов. Обнаружилось, что есть легковесная версия – Debezium Server, которая даёт возможность использовать альтернативные брокеры.

Взял NATS в качестве очереди.

В конце концов всё завелось. Выкладываю конфиги.

NOTE: В MySQL должен быть включён Binlog.

Файлы необходимые для сборки образа:

debezium/conf/application.properties

debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=<db host>
debezium.source.database.port=3306
debezium.source.database.user=debezium
debezium.source.database.password=<password>
debezium.source.database.server.id=1
debezium.source.database.include.list=table1,table2
debezium.source.database.history.internal=io.debezium.relational.history.FileDatabaseHistory
debezium.source.database.history.internal.file.filename=/tmp/dbhistory.dat
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=data/schema_history.dat

debezium.source.topic.prefix=ivms

debezium.sink.type=nats-jetstream
debezium.sink.nats-jetstream.url=nats://nats:4222
debezium.sink.nats-jetstream.create-stream=true
debezium.sink.nats-jetstream.auth.user=ruser
debezium.sink.nats-jetstream.auth.password=pass
debezium.sink.nats-jetstream.storage=file
.properties

Dockerfile

services:
  nats:
    image: nats:latest
    container_name: nats-jetstream
    command: --name=nats1 --debug --http_port=8222 --js --sd=/data

    ports:
      - "8222:8222"
      - "4222:4222"
      - "6222:6222"
    volumes:
      - nats-data:/data
    networks:
      - debezium-net
    restart: always

  debezium-server:
    image: quay.io/debezium/server:latest
    volumes:
      - ./debezium/conf:/debezium/config
    container_name: debezium-mysql-nats-jetstream
    ports:
      - "8080:8080"
    depends_on:
      - nats
    networks:
      - debezium-net
    restart: always

networks:
  debezium-net:
    driver: bridge

volumes:
  nats-data:
Docker

Клиент на php

composer require basis-company/nats

worker.php

<?php
require_once __DIR__ . '/vendor/autoload.php';

use Basis\Nats\Client;
use Basis\Nats\Configuration;

use Basis\Nats\Stream\RetentionPolicy;
use Basis\Nats\Stream\StorageBackend;

use Basis\Nats\Consumer\Configuration as ConsumerConfiguration;
use Basis\Nats\Consumer\DeliverPolicy;


$configuration = new Configuration(
        host: 'localhost',
        port: 4222,
        user: 'ruser',
        pass: 'pass'
);

$client = new Client($configuration);

$accountInfo = $client->getApi()->getInfo();

$client->ping();

$stream = $client->getApi()->getStream('DebeziumStream');

$stream->getConfiguration()->setRetentionPolicy(RetentionPolicy::WORK_QUEUE)->setSubjects(['ivms', 'ivms.>']);


$configuration = (new ConsumerConfiguration($stream->getName()));

var_dump($configuration);

$consumer = $stream->getConsumer('attendance_events');


$consumer->create();

$consumer->handle(function ($msg) {
        echo "handle---------\n";
        var_dump($msg);
});
PHP

запускаем php worker.php