| 
<?php
 /** @noinspection PhpComposerExtensionStubsInspection */
 
 declare(strict_types=1);
 
 // Report every notice, warning and error while the benchmark runs.
 error_reporting(E_ALL);
 // Pin the default timezone used by PHP's date/time functions to UTC.
 date_default_timezone_set('UTC');
 // Load the autoloader from the vendor directory one level above this script.
 include __DIR__ . '/../vendor/autoload.php';
 
 use Doctrine\DBAL\Connection;
 use Doctrine\DBAL\DriverManager;
 use MySQLReplication\Config\ConfigBuilder;
 use MySQLReplication\Definitions\ConstEventType;
 use MySQLReplication\Event\DTO\UpdateRowsDTO;
 use MySQLReplication\Event\EventSubscribers;
 use MySQLReplication\MySQLReplicationFactory;
 
 /**
  * Simple benchmark to test how fast events are consumed.
  *
  * The constructor (re)creates a scratch database with two single-row MEMORY
  * tables and wires a replication stream that counts update-row events.
  * {@see benchmark::run()} then forks: the child issues UPDATE statements in
  * an endless loop while the parent consumes the resulting events and prints
  * the observed throughput every 1000 events.
  */
 class benchmark
 {
     private const DB_NAME = 'mysqlreplication_test';
     private const DB_USER = 'root';
     private const DB_PASS = 'root';
     private const DB_HOST = '127.0.0.1';
     private const DB_PORT = 3306;

     private MySQLReplicationFactory $binLogStream;

     /**
      * Prepares the scratch database and builds the replication stream with its
      * throughput-reporting subscriber.
      */
     public function __construct()
     {
         // Connect WITHOUT a default schema: the scratch database may not exist
         // yet, and asking the server to select a missing schema makes the
         // connection itself fail before DROP/CREATE DATABASE can run.
         $conn = $this->getConnection(null);
         $conn->executeStatement('DROP DATABASE IF EXISTS ' . self::DB_NAME);
         $conn->executeStatement('CREATE DATABASE ' . self::DB_NAME);
         $conn->executeStatement('USE ' . self::DB_NAME);
         $conn->executeStatement('CREATE TABLE test (i INT) ENGINE = MEMORY');
         $conn->executeStatement('INSERT INTO test VALUES(1)');
         $conn->executeStatement('CREATE TABLE test2 (i INT) ENGINE = MEMORY');
         $conn->executeStatement('INSERT INTO test2 VALUES(1)');
         // Start from a clean binary log so the setup statements above are not replayed.
         $conn->executeStatement('RESET MASTER');

         $config = (new ConfigBuilder())
             ->withUser(self::DB_USER)
             ->withPassword(self::DB_PASS)
             ->withHost(self::DB_HOST)
             ->withPort(self::DB_PORT)
             ->withEventsOnly(
                 [
                     ConstEventType::UPDATE_ROWS_EVENT_V2->value,
                     // for mariadb v1
                     ConstEventType::UPDATE_ROWS_EVENT_V1->value,
                 ]
             )
             ->withSlaveId(9999)
             ->withDatabasesOnly([self::DB_NAME])
             ->build();

         $this->binLogStream = new MySQLReplicationFactory($config);

         $this->binLogStream->registerSubscriber(
             new class () extends EventSubscribers {
                 /** Timestamp (seconds, with microseconds) taken when the subscriber was created. */
                 private float $start;

                 /** Number of update events seen so far. */
                 private int $counter = 0;

                 public function __construct()
                 {
                     $this->start = microtime(true);
                 }

                 /**
                  * Counts the event and, on every 1000th one, prints the average
                  * events-per-second rate since the subscriber was created.
                  */
                 public function onUpdate(UpdateRowsDTO $event): void
                 {
                     ++$this->counter;
                     if ($this->counter % 1000 !== 0) {
                         return;
                     }

                     $elapsed = microtime(true) - $this->start;
                     echo (int) ($this->counter / $elapsed)
                         . ' event by seconds (' . $this->counter . ' total)' . PHP_EOL;
                 }
             }
         );
     }

     /**
      * Forks the process: the parent consumes replication events, the child
      * produces them.
      *
      * @throws InvalidArgumentException when the process cannot be forked
      */
     public function run(): void
     {
         $pid = pcntl_fork();
         if ($pid === -1) {
             throw new InvalidArgumentException('Could not fork');
         }

         if ($pid) {
             // Parent: read the stream, then reap the child once consuming returns.
             $this->consume();
             pcntl_wait($status);
         } else {
             // Child: generate the workload.
             $this->produce();
         }
     }

     /**
      * Opens a DBAL connection to the benchmark server.
      *
      * @param string|null $dbName Schema to select on connect; null connects
      *                            without a default schema, which is required
      *                            while the scratch database does not exist yet.
      */
     private function getConnection(?string $dbName = self::DB_NAME): Connection
     {
         $params = [
             'user' => self::DB_USER,
             'password' => self::DB_PASS,
             'host' => self::DB_HOST,
             'port' => self::DB_PORT,
             'driver' => 'pdo_mysql',
         ];

         if ($dbName !== null) {
             $params['dbname'] = $dbName;
         }

         return DriverManager::getConnection($params);
     }

     /**
      * Runs the replication stream, dispatching events to the registered subscriber.
      */
     private function consume(): void
     {
         $this->binLogStream->run();
     }

     /**
      * Endlessly increments the single row of both test tables so the server
      * keeps emitting update-row events. Never returns.
      */
     private function produce(): void
     {
         // The scratch database exists by now, so the default schema can be selected.
         $conn = $this->getConnection();

         echo 'Start insert data' . PHP_EOL;

         /** @phpstan-ignore-next-line */
         while (true) {
             $conn->executeStatement('UPDATE  test SET i = i + 1;');
             $conn->executeStatement('UPDATE test2 SET i = i + 1;');
         }
     }
 }
 
 // Build the benchmark (its constructor prepares the database and stream), then start it.
 $benchmark = new benchmark();
 $benchmark->run();
 
 |