URL: https://deepwiki.com/friendsofhyperf/components/6-mysql-binlog-trigger-system

Last indexed: 14 February 2026 (15d5ca)

MySQL Binlog Trigger System

The MySQL Binlog Trigger System provides real-time database change capture by consuming MySQL binary logs and dispatching events to user-defined trigger classes. This component enables applications to react to INSERT, UPDATE, and DELETE operations on MySQL tables without using database-level triggers, supporting use cases like cache invalidation, data synchronization, audit logging, and event-driven architectures.

This page covers the trigger system's architecture, consumer lifecycle, event processing, and fault tolerance mechanisms. For information about model lifecycle observers, see Model Enhancements.


System Architecture

The trigger system operates as a standalone process per database connection, consuming binlog events via the php-mysql-replication library and dispatching them through a subscriber pattern to user-defined trigger classes.

Component Diagram


Sources: src/trigger/src/Consumer.php:29-240, src/trigger/src/ConsumerManager.php:22-76, src/trigger/src/Subscriber/TriggerSubscriber.php:31-221, src/trigger/src/Subscriber/SnapshotSubscriber.php:17-42, src/trigger/src/Mutex/RedisServerMutex.php:22-131, src/trigger/src/Monitor/HealthMonitor.php:23-92


Configuration

The trigger system is configured via config/autoload/trigger.php, supporting multiple database connections with independent settings.

Configuration Structure

Configuration Key | Type | Default | Description
connections.{name}.enable | bool | true | Enable/disable this connection
connections.{name}.host | string | 127.0.0.1 | MySQL host
connections.{name}.port | int | 3306 | MySQL port
connections.{name}.user | string | root | MySQL user with replication privileges
connections.{name}.password | string | root | MySQL password
connections.{name}.databases_only | array | [] | Filter specific databases
connections.{name}.tables_only | array | [] | Filter specific tables
connections.{name}.heartbeat_period | float | 3.0 | Heartbeat interval in seconds
connections.{name}.subscribers | array | See below | Subscriber classes
connections.{name}.server_mutex.enable | bool | true | Enable distributed locking
connections.{name}.server_mutex.prefix | string | trigger:server_mutex: | Redis key prefix
connections.{name}.server_mutex.expires | int | 30 | Lock TTL in seconds
connections.{name}.server_mutex.keepalive_interval | int | 10 | Lock renewal interval in seconds
connections.{name}.server_mutex.retry_interval | int | 10 | Retry interval in seconds for acquiring the lock
connections.{name}.health_monitor.enable | bool | true | Enable health monitoring
connections.{name}.health_monitor.interval | int | 30 | Monitor check interval in seconds
connections.{name}.snapshot.version | string | 1.0 | Snapshot format version
connections.{name}.snapshot.expires | int | 86400 | Snapshot TTL in Redis, in seconds
connections.{name}.snapshot.interval | int | 10 | Snapshot save interval in seconds
connections.{name}.concurrent.limit | int | 1 | Max concurrent trigger executions
connections.{name}.channel.size | int | 65535 | Event channel buffer size
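
The dotted keys in the table map onto nested arrays in config/autoload/trigger.php. The fragment below is a minimal sketch of a single connection named default, filled in with the defaults listed above. It is not a copy of the published file: the connection name is illustrative, and the two subscriber class names are assumed from the subscribers this page describes.

```php
<?php

declare(strict_types=1);

// Sketch of config/autoload/trigger.php for one connection named "default".
// Values mirror the defaults in the table above. The subscriber list is an
// assumption based on the two subscribers mentioned on this page.
return [
    'connections' => [
        'default' => [
            'enable' => true,
            'host' => '127.0.0.1',
            'port' => 3306,
            'user' => 'root',
            'password' => 'root',
            'databases_only' => [],
            'tables_only' => [],
            'heartbeat_period' => 3.0,
            'subscribers' => [
                \FriendsOfHyperf\Trigger\Subscriber\SnapshotSubscriber::class,
                \FriendsOfHyperf\Trigger\Subscriber\TriggerSubscriber::class,
            ],
            'server_mutex' => [
                'enable' => true,
                'prefix' => 'trigger:server_mutex:',
                'expires' => 30,
                'keepalive_interval' => 10,
                'retry_interval' => 10,
            ],
            'health_monitor' => [
                'enable' => true,
                'interval' => 30,
            ],
            'snapshot' => [
                'version' => '1.0',
                'expires' => 86400,
                'interval' => 10,
            ],
            'concurrent' => [
                'limit' => 1,
            ],
            'channel' => [
                'size' => 65535,
            ],
        ],
    ],
];
```

Additional connections would be added as sibling keys next to default, each with its own independent settings.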

Default Subscribers

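The subscribers key lists the subscriber classes that receive binlog events. The defaults are defined in the published configuration file, src/trigger/publish/trigger.php.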


Sources: src/trigger/publish/trigger.php:13-56, src/trigger/src/Config.php:16-31


Consumer Lifecycle

Consumer Initialization and Startup


Sources: src/trigger/src/Consumer.php:72-117, src/trigger/src/ConsumerManager.php:54-75, src/trigger/src/Monitor/HealthMonitor.php:34-86

MySQLReplicationFactory Configuration

The Consumer::makeReplication() method constructs the replication factory with position recovery.


The consumer merges configured filters with dynamically registered triggers:

  • databases_only: Union of config array and TriggerManager::getDatabases()
  • tables_only: Union of config array and TriggerManager::getTables()

If a BinLogCurrent snapshot exists in Redis, the factory resumes from that position; otherwise, it starts from the current binlog end.
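
The filter merge and the start-position decision can be sketched in plain PHP as follows. This is not the code of Consumer::makeReplication(): the function names mergeFilter and resolveStartPosition, and the array shape used for the snapshot, are hypothetical and only illustrate the logic described above.

```php
<?php

declare(strict_types=1);

/**
 * Union of the statically configured names and the names collected from
 * registered triggers, without duplicates.
 */
function mergeFilter(array $configured, array $registered): array
{
    return array_values(array_unique(array_merge($configured, $registered)));
}

/**
 * Decide where replication should start.
 *
 * Returns ['file' => string, 'position' => int] when a usable snapshot exists,
 * or null to signal "start from the current end of the binlog".
 * The snapshot array shape is an assumption made for this sketch.
 */
function resolveStartPosition(?array $snapshot): ?array
{
    if ($snapshot === null) {
        return null; // no snapshot stored
    }

    $file = $snapshot['file'] ?? '';
    $position = $snapshot['position'] ?? 0;

    if (! is_string($file) || $file === '' || ! is_int($position) || $position <= 0) {
        return null; // snapshot present but unusable
    }

    return ['file' => $file, 'position' => $position];
}

// Example: one database from config, two from registered triggers.
$databases = mergeFilter(['shop'], ['shop', 'crm']);
$start = resolveStartPosition(['file' => 'mysql-bin.000003', 'position' => 154]);
```

When resolveStartPosition() returns a position, the real consumer would hand the file name and offset to the replication config builder; when it returns null, no position is set and replication begins at the current end of the binlog.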

Sources: src/trigger/src/Consumer.php:182-239


Event Processing Pipeline

Subscriber Event Flow


Sources: src/trigger/src/Subscriber/TriggerSubscriber.php:53-221, src/trigger/src/Subscriber/SnapshotSubscriber.php:23-41

TriggerSubscriber Processing Logic

The TriggerSubscriber::process() method performs the following steps:

  1. Extract table metadata from RowsDTO event (database, table, event type)
  2. Build trigger key: {connection}.{database}.{table}.{event_type}
  3. Extract row values: handles both php-mysql-replication v7.x (getValues()) and v8+ (property access)
  4. Transform arguments based on event type:
    • WRITE: [[$new]]
    • UPDATE: [[$old, $new]] for each affected row
    • DELETE: [[$old]]
  5. Query TriggerManager for registered triggers matching the key
  6. Push to channel: Each [class, method, args] tuple is pushed to the channel
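
Steps 2 and 4 can be illustrated with two small functions. This is a sketch, not the body of TriggerSubscriber::process(): the function names are hypothetical, the lowercase event type strings are an assumption, and the row shapes (plain rows for write and delete, before/after pairs for update) are assumed to match what the replication library delivers.

```php
<?php

declare(strict_types=1);

/** Build the lookup key in the form {connection}.{database}.{table}.{event_type}. */
function buildTriggerKey(string $connection, string $database, string $table, string $eventType): string
{
    return implode('.', [$connection, $database, $table, $eventType]);
}

/**
 * Turn the rows of one binlog event into one argument list per affected row.
 *
 * Assumption: update rows arrive as ['before' => [...], 'after' => [...]],
 * while write and delete rows arrive as plain column => value arrays.
 */
function buildTriggerArgs(string $eventType, array $rows): array
{
    $args = [];

    foreach ($rows as $row) {
        $args[] = match ($eventType) {
            'write', 'delete' => [$row],                   // [$new] or [$old]
            'update' => [$row['before'], $row['after']],   // [$old, $new]
            default => throw new InvalidArgumentException("Unsupported event type: {$eventType}"),
        };
    }

    return $args;
}

$key = buildTriggerKey('default', 'shop', 'orders', 'update');
$args = buildTriggerArgs('update', [
    ['before' => ['id' => 1, 'qty' => 1], 'after' => ['id' => 1, 'qty' => 2]],
]);
```

Each entry of the returned list corresponds to one trigger invocation, so an UPDATE statement touching three rows yields three [$old, $new] argument lists.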

The channel consumer coroutine then:

  1. Pops payloads from the channel
  2. Resolves the trigger class from the container
  3. Invokes the method with arguments
  4. Optionally uses Concurrent for rate limiting
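
The pop, resolve, and invoke steps can be sketched without coroutines. In the sketch below an SplQueue stands in for the coroutine channel and a closure stands in for the container, so it runs synchronously and does not show the Concurrent limiter. OrderAuditTrigger and drainPayloads are hypothetical names, and catching exceptions per payload is a choice made for this sketch rather than a statement about the real subscriber.

```php
<?php

declare(strict_types=1);

/** Hypothetical trigger used only for this illustration. */
final class OrderAuditTrigger
{
    /** @var list<string> */
    public static array $log = [];

    public function onUpdate(array $old, array $new): void
    {
        self::$log[] = sprintf('order %d: qty %d -> %d', $new['id'], $old['qty'], $new['qty']);
    }
}

/**
 * Drain a queue of [class, method, args] payloads and invoke each trigger.
 * $resolve stands in for the DI container's get() method.
 * Returns the number of payloads handled without an exception.
 */
function drainPayloads(SplQueue $queue, callable $resolve): int
{
    $handled = 0;

    while (! $queue->isEmpty()) {
        [$class, $method, $args] = $queue->dequeue();

        try {
            $resolve($class)->{$method}(...$args);
            ++$handled;
        } catch (Throwable $e) {
            // This sketch keeps going after a failing trigger and only records the error.
            error_log($e->getMessage());
        }
    }

    return $handled;
}

$queue = new SplQueue();
$queue->enqueue([OrderAuditTrigger::class, 'onUpdate', [['id' => 7, 'qty' => 1], ['id' => 7, 'qty' => 3]]]);

$handled = drainPayloads($queue, static fn (string $class): object => new $class());
```

Decoupling the binlog reader from trigger execution through a queue means a slow trigger delays other triggers but does not block reading from MySQL until the buffer is full.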

Sources: src/trigger/src/Subscriber/TriggerSubscriber.php:79-139, src/trigger/src/Subscriber/TriggerSubscriber.php:144-209

Concurrent Execution Control

When concurrent.limit is configured (default: 1), the TriggerSubscriber uses Hyperf's Concurrent class to limit parallel executions.


This prevents overwhelming downstream services when processing high-volume binlog events.

Sources: src/trigger/src/Subscriber/TriggerSubscriber.php:39-51, src/trigger/src/Subscriber/TriggerSubscriber.php:177-180


Fault Tolerance and High Availability

Distributed Locking with RedisServerMutex

The RedisServerMutex ensures only one consumer process runs per connection across a cluster.


Lock Parameters:

Parameter | Default | Description
expires | 30 | Lock TTL in seconds
keepalive_interval | 10 | Renewal frequency in seconds
retry_interval | 10 | Acquisition retry frequency in seconds
owner | Util::getInternalIp() | Unique identifier
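
How the three timing parameters interact can be shown with an in-memory stand-in for Redis. The sketch below is not RedisServerMutex: FakeLockStore is a hypothetical class that imitates "set if absent with TTL" and "extend TTL if I am the owner", the key name is assumed to be the configured prefix followed by the connection name, and the clock is passed in explicitly so the example is deterministic.

```php
<?php

declare(strict_types=1);

/**
 * In-memory stand-in for the Redis operations a lock needs.
 * Hypothetical helper for illustration only.
 */
final class FakeLockStore
{
    /** @var array<string, array{owner: string, expiresAt: int}> */
    private array $locks = [];

    /** Take the lock only if nobody holds an unexpired one (like SET NX EX). */
    public function acquire(string $key, string $owner, int $ttl, int $now): bool
    {
        $current = $this->locks[$key] ?? null;
        if ($current !== null && $current['expiresAt'] > $now) {
            return false;
        }

        $this->locks[$key] = ['owner' => $owner, 'expiresAt' => $now + $ttl];

        return true;
    }

    /** Extend the TTL, but only for the current owner of a live lock. */
    public function renew(string $key, string $owner, int $ttl, int $now): bool
    {
        $current = $this->locks[$key] ?? null;
        if ($current === null || $current['owner'] !== $owner || $current['expiresAt'] <= $now) {
            return false;
        }

        $this->locks[$key]['expiresAt'] = $now + $ttl;

        return true;
    }
}

$store = new FakeLockStore();
$key = 'trigger:server_mutex:default'; // assumed: prefix + connection name

$store->acquire($key, '10.0.0.1', 30, 0);   // node A takes the lock at t=0
$store->acquire($key, '10.0.0.2', 30, 10);  // node B retries at t=10 and is refused
$store->renew($key, '10.0.0.1', 30, 10);    // node A keepalive at t=10, lock now valid until t=40
```

With expires at 30 and keepalive_interval at 10, the owner has three renewal attempts before the lock lapses. If the owner dies, a waiting node takes over within roughly expires plus retry_interval seconds.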

Sources: src/trigger/src/Mutex/RedisServerMutex.php:66-113

Position Persistence with RedisBinLogCurrentSnapshot

The RedisBinLogCurrentSnapshot class persists the current binlog position to Redis:

Redis Key Structure:

trigger:snapshot:binLogCurrent:{version}:{connection}

Snapshot Data Structure:


Persistence Flow:

  1. SnapshotSubscriber receives every binlog event
  2. Extracts BinLogCurrent from EventInfo
  3. Passes to HealthMonitor::setBinLogCurrent()
  4. HealthMonitor ticks every snapshot.interval seconds
  5. Compares with cached position to detect stalls
  6. Saves to Redis via RedisBinLogCurrentSnapshot::set()
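
The persistence flow can be sketched as a small monitor object that is fed positions and ticked on a timer. The sketch is an illustration under stated assumptions, not the HealthMonitor source: PositionMonitor and snapshotKey are hypothetical names, the position is modelled as a plain array, and the save callback stands in for the Redis write.

```php
<?php

declare(strict_types=1);

/** Redis key for a connection's snapshot, following the key structure above. */
function snapshotKey(string $version, string $connection): string
{
    return sprintf('trigger:snapshot:binLogCurrent:%s:%s', $version, $connection);
}

/**
 * Hypothetical monitor: remembers the position seen on the previous tick and
 * reports a stall when the position has not moved between two ticks.
 */
final class PositionMonitor
{
    private ?array $current = null;   // latest position reported by the subscriber
    private ?array $previous = null;  // position seen on the previous tick

    /** @param Closure(array): void $save persists the position, e.g. to Redis */
    public function __construct(private Closure $save)
    {
    }

    /** Called for every binlog event (steps 1 to 3). */
    public function setPosition(string $file, int $position): void
    {
        $this->current = ['file' => $file, 'position' => $position];
    }

    /** Runs once per snapshot interval (steps 4 to 6). Returns true on a stall. */
    public function tick(): bool
    {
        if ($this->current === null) {
            return false; // nothing received yet, nothing to save
        }

        $stalled = $this->previous !== null && $this->previous === $this->current;

        ($this->save)($this->current);
        $this->previous = $this->current;

        return $stalled;
    }
}

$saved = [];
$monitor = new PositionMonitor(function (array $position) use (&$saved): void {
    $saved[snapshotKey('1.0', 'default')] = $position;
});

$monitor->setPosition('mysql-bin.000003', 154);
$monitor->tick(); // first save, no stall
$monitor->tick(); // position unchanged since the last tick, reported as a stall
```

A stall in this sense only means that no new position arrived during one interval, which also happens on an idle database, so alerting on it usually needs a threshold of several consecutive intervals.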

Recovery Flow:

  1. Consumer::makeReplication() calls binLogCurrentSnapshot->get()
  2. If snapshot exists, configures ConfigBuilder with:
    • withBinLogFileName($binFileName)
    • withBinLogPosition($binLogPosition)
  3. If snapshot invalid or missing, starts from current position

Sources: src/trigger/src/Snapshot/RedisBinLogCurrentSnapshot.php:23-75, src/trigger/src/Subscriber/SnapshotSubscriber.php:23-41, src/trigger/src/Monitor/HealthMonitor.php:62-84

Health Monitoring

The HealthMonitor performs two critical functions:

1. Binlog Progress Monitoring

Every health_monitor.interval seconds (default: 30), the monitor logs the current binlog position for debugging.


2. Stall Detection

Every snapshot.interval seconds (default: 10), the monitor checks whether the position has changed since the previous check.


This enables applications to detect replication lag or stalls and trigger alerts.

Sources: src/trigger/src/Monitor/HealthMonitor.php:34-86


User Trigger Interface

TriggerInterface Contract

User-defined triggers must implement the TriggerInterface.


The AbstractTrigger class provides empty implementations for convenience, so a trigger only overrides the events it cares about.
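
The shape of a user trigger can be sketched with local stand-ins so the example runs on its own. TriggerContract and NoopTrigger below are hypothetical substitutes for the package's TriggerInterface and AbstractTrigger. The method names onWrite, onUpdate, and onDelete and their parameters are assumed from the three event types and the argument shapes described earlier on this page, and should be checked against src/trigger/src/Contract/TriggerInterface.php.

```php
<?php

declare(strict_types=1);

// Local stand-ins so the example is self-contained. In a real project the
// trigger would extend the package's AbstractTrigger instead.
interface TriggerContract
{
    public function onWrite(array $new): void;

    public function onUpdate(array $old, array $new): void;

    public function onDelete(array $old): void;
}

abstract class NoopTrigger implements TriggerContract
{
    public function onWrite(array $new): void
    {
    }

    public function onUpdate(array $old, array $new): void
    {
    }

    public function onDelete(array $old): void
    {
    }
}

/** Hypothetical trigger that drops cache entries when a row changes or disappears. */
final class ProductCacheTrigger extends NoopTrigger
{
    /** @param array<string, mixed> $cache in-memory cache used for the illustration */
    public function __construct(public array $cache = [])
    {
    }

    public function onUpdate(array $old, array $new): void
    {
        unset($this->cache['product:' . $new['id']]);
    }

    public function onDelete(array $old): void
    {
        unset($this->cache['product:' . $old['id']]);
    }
}

$trigger = new ProductCacheTrigger(['product:1' => 'cached', 'product:2' => 'cached']);
$trigger->onWrite(['id' => 3, 'name' => 'new product']);               // inherited no-op
$trigger->onUpdate(['id' => 1, 'name' => 'a'], ['id' => 1, 'name' => 'b']); // evicts product:1
```

Because inserts cannot invalidate an existing cache entry, the sketch leaves onWrite as the inherited no-op, which is the convenience the abstract base class is meant to provide.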


Sources: src/trigger/src/Contract/TriggerInterface.php:14-21, src/trigger/src/Trigger/AbstractTrigger.php:16-29

Trigger Registration

Triggers are registered via the TriggerManager using annotations or configuration. The TriggerSubscriber queries the manager by constructing a key in the form {connection}.{database}.{table}.{event_type}.


Each trigger class is resolved from the container and invoked with row data.

Sources: src/trigger/src/Subscriber/TriggerSubscriber.php:93-138


Graceful Shutdown

The consumer handles graceful shutdown through Hyperf's coordinator system.


Shutdown Sequence:

  1. Worker exit signal triggers CoordinatorManager::until(Constants::WORKER_EXIT)->yield()
  2. Consumer calls stop(), setting $this->stopped = true
  3. Consumer releases the server mutex
  4. Channel consumer waits for channel to drain (checks every 100ms)
  5. Channel closes
  6. Consumer loop exits on next iteration when isStopped() returns true
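
Step 4 of the sequence, waiting for the channel to drain, amounts to a polling loop. The sketch below shows that loop in plain PHP under stated assumptions: waitForDrain is a hypothetical function, the emptiness check is passed in as a callback, and the upper bound on waiting time is an addition of this sketch that the page does not describe.

```php
<?php

declare(strict_types=1);

/**
 * Poll until the pending work is gone or the time budget is used up.
 *
 * @param callable(): bool $isEmpty reports whether the channel has drained
 * @param int $pollMs pause between checks (100 ms matches the sequence above)
 * @param int $maxWaitMs safety limit; an assumption added for this sketch
 * @return bool true if the channel drained, false if the limit was reached
 */
function waitForDrain(callable $isEmpty, int $pollMs = 100, int $maxWaitMs = 5000): bool
{
    $waitedMs = 0;

    while (! $isEmpty()) {
        if ($waitedMs >= $maxWaitMs) {
            return false;
        }

        usleep($pollMs * 1000);
        $waitedMs += $pollMs;
    }

    return true;
}

// Example: pending work that finishes after the third check.
$checks = 0;
$drained = waitForDrain(function () use (&$checks): bool {
    return ++$checks >= 3;
}, 1, 50);
```

Only after such a wait returns would the channel be closed, so payloads that were already queued when the exit signal arrived still reach their triggers.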

Sources: src/trigger/src/Consumer.php:90-109, src/trigger/src/Consumer.php:171-180, src/trigger/src/Subscriber/TriggerSubscriber.php:199-207


Component Registration

The trigger system registers itself through the standard Hyperf ConfigProvider:

Services:

  • Mutex\ServerMutexInterface::class → Mutex\RedisServerMutex::class
  • Snapshot\BinLogCurrentSnapshotInterface::class → Snapshot\RedisBinLogCurrentSnapshot::class
  • Contract\LoggerInterface::class → Hyperf\Contract\StdoutLoggerInterface::class

Commands:

  • Command\ConsumeCommand::class - Manual consumer start
  • Command\ServerMutexCommand::class - List/release locks
  • Command\SubscribersCommand::class - List registered subscribers
  • Command\TriggersCommand::class - List registered triggers

Aspects:

  • Aspect\BinaryDataReaderAspect::class - Fixes MySQLReplication library bugs

The BindTriggerProcessesListener (commented out by default) can be enabled to auto-register consumers on server start.

Sources: src/trigger/src/ConfigProvider.php:14-48, src/trigger/src/Listener/BindTriggerProcessesListener.php:21-47


Summary

The MySQL Binlog Trigger System provides:

Feature | Implementation
Real-time Change Capture | MySQLReplicationFactory consuming binary logs
Event Dispatching | TriggerSubscriber with channel-based async execution
Position Recovery | RedisBinLogCurrentSnapshot persisting BinLogCurrent
High Availability | RedisServerMutex ensuring single-consumer-per-connection
Health Monitoring | HealthMonitor detecting stalls and logging progress
Graceful Shutdown | Coordinator-based channel draining and cleanup
Concurrent Control | Configurable Concurrent limiting parallel trigger executions
Fault Tolerance | Automatic reconnection, position restoration, lock renewal

The system builds on Hyperf's process management, dependency injection, and coroutine runtime, and combines position recovery, distributed locking, and health monitoring to support deployments that need reliable database change streaming.