URL: https://deepwiki.com/friendsofhyperf/components/6.2-event-processing-and-trigger-execution

Last indexed: 14 February 2026 (15d5ca)
Event Processing and Trigger Execution

This document explains how binlog events are processed and dispatched to user-defined triggers in the MySQL Binlog Trigger System. It covers the subscriber pattern, event filtering and transformation, channel-based async execution, and the user trigger interface.

For information about the Consumer architecture and binlog connection management, see Architecture and Consumer Process. For distributed locking and health monitoring, see Health Monitoring and Fault Tolerance.

Subscriber Pattern

The trigger system uses the subscriber pattern from the php-mysql-replication library. Subscribers implement EventSubscriberInterface and register for specific binlog event types through the MySQLReplicationFactory.

Subscriber Registration

The Consumer registers subscribers during replication factory initialization (src/trigger/src/Consumer.php:228-236).


Default subscribers are defined in the configuration at src/trigger/publish/trigger.php:27-30:

| Subscriber Class | Purpose |
|---|---|
| TriggerSubscriber | Processes INSERT/UPDATE/DELETE events and dispatches to user triggers |
| SnapshotSubscriber | Updates health monitor with current binlog position |

The SubscriberManager loads subscribers from both configuration and @Subscriber annotations (src/trigger/src/SubscriberManager.php:37-84), registering them in priority order using SplPriorityQueue.
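
The priority ordering can be illustrated with SplPriorityQueue on its own. This is a minimal sketch, not the SubscriberManager code: the subscriber names are plain strings and the priority values are invented for the example.

```php
<?php
declare(strict_types=1);

// Hypothetical subscriber names and priorities, for illustration only.
$queue = new SplPriorityQueue();
$queue->insert('AuditSubscriber', 0);
$queue->insert('TriggerSubscriber', 10);
$queue->insert('SnapshotSubscriber', 5);

// SplPriorityQueue is a max-heap: extract() returns the highest priority first.
$ordered = [];
while (!$queue->isEmpty()) {
    $ordered[] = $queue->extract();
}

echo implode(' > ', $ordered), PHP_EOL;
```

With distinct priorities the extraction order is deterministic; entries that share a priority come out in an unspecified order, so a registry that needs stable ordering has to break ties itself.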

Sources: src/trigger/src/Consumer.php:228-236, src/trigger/src/SubscriberManager.php:37-84, src/trigger/publish/trigger.php:27-30

Event Types and Dispatching


The TriggerSubscriber subscribes to three event types (src/trigger/src/Subscriber/TriggerSubscriber.php:53-60).

These map to the binlog event names defined in the ConstEventsNames enum at src/trigger/src/ConstEventsNames.php:17-31:

| Event Name | Trigger Method | SQL Operation |
|---|---|---|
| write | onWrite() | INSERT |
| update | onUpdate() | UPDATE |
| delete | onDelete() | DELETE |

Sources: src/trigger/src/Subscriber/TriggerSubscriber.php:53-60, src/trigger/src/ConstEventsNames.php:17-31

TriggerSubscriber Event Processing

The TriggerSubscriber is the core event processor that transforms binlog events into user trigger invocations.

Event Filtering and Data Extraction

When an event is received, TriggerSubscriber::allEvents() first filters for RowsDTO events (src/trigger/src/Subscriber/TriggerSubscriber.php:62-74).

The process() method extracts database and table information using compatibility logic for different php-mysql-replication versions (src/trigger/src/Subscriber/TriggerSubscriber.php:81-98).


The key format {connection}.{database}.{table}.{event_type} is used to look up registered triggers from the TriggerManager.
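
As a rough sketch of that lookup, the key can be assembled from its four parts and used as an array index. The function name buildTriggerKey, the sample connection/database/table values, and the in-memory $registry are assumptions for illustration; the real lookup goes through TriggerManager.

```php
<?php
declare(strict_types=1);

// Build the lookup key: {connection}.{database}.{table}.{event_type}.
function buildTriggerKey(string $connection, string $database, string $table, string $eventType): string
{
    return implode('.', [$connection, $database, $table, $eventType]);
}

// Hypothetical in-memory registry standing in for TriggerManager.
// Each key maps to a list of [class, method] pairs.
$registry = [
    'default.shop.orders.write'  => [['OrderTrigger', 'onWrite']],
    'default.shop.orders.update' => [['OrderTrigger', 'onUpdate'], ['AuditTrigger', 'onUpdate']],
];

$key = buildTriggerKey('default', 'shop', 'orders', 'update');
$callbacks = $registry[$key] ?? []; // no entry means there is nothing to dispatch

echo $key, ' -> ', count($callbacks), ' trigger(s)', PHP_EOL;
```

Including the connection name in the key keeps triggers for identically named tables on different connections apart.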

Sources: src/trigger/src/Subscriber/TriggerSubscriber.php:62-114

Value Transformation by Event Type

Event values are transformed based on the event type at src/trigger/src/Subscriber/TriggerSubscriber.php:100-114:

| Event Type | Arguments Passed to Trigger |
|---|---|
| write | [$newRow] |
| update | [$oldRow, $newRow] |
| delete | [$oldRow] |
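
The mapping above could be written as a single function. This sketch assumes the raw value shapes: a plain row array for write and delete, and an array with 'before' and 'after' keys for update. The function name toTriggerArgs is hypothetical and is not taken from TriggerSubscriber.

```php
<?php
declare(strict_types=1);

/**
 * Turn one raw row value into the argument list for a trigger method.
 * Assumed input shapes: a plain row for write/delete, and
 * ['before' => row, 'after' => row] for update.
 */
function toTriggerArgs(string $eventType, array $value): array
{
    return match ($eventType) {
        'write'  => [$value],                            // [$newRow]
        'update' => [$value['before'], $value['after']], // [$oldRow, $newRow]
        'delete' => [$value],                            // [$oldRow]
        default  => throw new InvalidArgumentException("Unsupported event type: {$eventType}"),
    };
}

$args = toTriggerArgs('update', [
    'before' => ['id' => 1, 'status' => 'pending'],
    'after'  => ['id' => 1, 'status' => 'paid'],
]);

echo count($args), ' argument(s) for update', PHP_EOL;
```

Returning a list lets the caller spread it into the trigger method with the `...` operator, so one dispatch path serves all three event types.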

Sources: src/trigger/src/Subscriber/TriggerSubscriber.php:100-114

Channel-Based Execution Pipeline


The TriggerSubscriber uses a Channel for async execution (src/trigger/src/Subscriber/TriggerSubscriber.php:144-209). The channel is initialized with a configurable size.


Configuration options from src/trigger/publish/trigger.php:51-53:

| Option | Default | Purpose |
|---|---|---|
| concurrent.limit | 1 | Maximum concurrent trigger executions |
| channel.size | 65535 | Channel buffer size for pending triggers |

Triggers are pushed to the channel (src/trigger/src/Subscriber/TriggerSubscriber.php:120-138).


Sources: src/trigger/src/Subscriber/TriggerSubscriber.php:39-51, src/trigger/src/Subscriber/TriggerSubscriber.php:120-138, src/trigger/publish/trigger.php:51-53

Concurrency Control and Execution

A consumer coroutine processes the channel (src/trigger/src/Subscriber/TriggerSubscriber.php:150-196).

The Concurrent limiter ensures that at most concurrent.limit triggers execute simultaneously. Without a limiter, each trigger runs in its own coroutine with no upper bound on concurrency.

A shutdown coroutine waits for the channel to drain (src/trigger/src/Subscriber/TriggerSubscriber.php:199-207).
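
The pop, limit, and drain steps can be simulated without a coroutine runtime. The sketch below is an approximation under stated assumptions: it needs PHP 8.1 for Fibers, SplQueue stands in for the channel, and FiberLimiter is a hypothetical class written for this example rather than the Concurrent limiter the component uses.

```php
<?php
declare(strict_types=1);

// Hypothetical limiter built on PHP 8.1 Fibers; a stand-in for a coroutine-based one.
final class FiberLimiter
{
    /** @var Fiber[] fibers that have started but not yet finished */
    private array $running = [];

    /** Highest number of simultaneously running tasks observed. */
    public int $peak = 0;

    public function __construct(private int $limit)
    {
    }

    /** Start a task, first waiting for a free slot if the limit is reached. */
    public function create(callable $task): void
    {
        while (count($this->running) >= $this->limit) {
            $this->tick();
        }
        $fiber = new Fiber($task);
        $this->running[] = $fiber;
        $this->peak = max($this->peak, count($this->running));
        $fiber->start();
        $this->reap();
    }

    /** Block until every started task has finished (the drain step). */
    public function drain(): void
    {
        while ($this->running !== []) {
            $this->tick();
        }
    }

    /** Resume every suspended task once, then drop the finished ones. */
    private function tick(): void
    {
        foreach ($this->running as $fiber) {
            if ($fiber->isSuspended()) {
                $fiber->resume();
            }
        }
        $this->reap();
    }

    private function reap(): void
    {
        $this->running = array_values(array_filter(
            $this->running,
            static fn (Fiber $fiber): bool => !$fiber->isTerminated(),
        ));
    }
}

// SplQueue stands in for the channel; each entry is one pending trigger call.
$channel = new SplQueue();
foreach ([1, 2, 3, 4, 5] as $id) {
    $channel->enqueue($id);
}

$log = [];
$limiter = new FiberLimiter(2); // plays the role of concurrent.limit

// Consumer loop: pop from the channel and hand each job to the limiter.
while (!$channel->isEmpty()) {
    $id = $channel->dequeue();
    $limiter->create(function () use ($id, &$log): void {
        $log[] = "start {$id}";
        Fiber::suspend(); // simulated I/O wait
        $log[] = "end {$id}";
    });
}
$limiter->drain();

echo implode(', ', $log), PHP_EOL;
echo "peak concurrency: {$limiter->peak}", PHP_EOL;
```

With a limit of 2, the recorded peak never exceeds 2 even though five jobs were queued: the consumer loop stalls inside create() until a running task finishes, which is the back-pressure a limiter adds on top of the channel buffer.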


Sources: src/trigger/src/Subscriber/TriggerSubscriber.php:150-207

SnapshotSubscriber and Position Tracking


The SnapshotSubscriber maintains the current binlog position by updating the HealthMonitor for every event (src/trigger/src/Subscriber/SnapshotSubscriber.php:23-41).

The HealthMonitor periodically persists the position (src/trigger/src/Monitor/HealthMonitor.php:62-84).

The position is stored in Redis via RedisBinLogCurrentSnapshot (src/trigger/src/Snapshot/RedisBinLogCurrentSnapshot.php:32-36).


Configuration options from src/trigger/publish/trigger.php:45-49:

| Option | Default | Purpose |
|---|---|---|
| snapshot.version | 1.0 | Version string in Redis key |
| snapshot.expires | 86400 (24h) | Redis key TTL in seconds |
| snapshot.interval | 10 | Seconds between persists |
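
The split between updating the position on every event and persisting it only every snapshot.interval seconds might look like the following. ThrottledSnapshot is a hypothetical class, the array $store stands in for Redis, and the position fields ('file', 'pos') are invented; the real HealthMonitor may schedule persistence differently, for example with a timer.

```php
<?php
declare(strict_types=1);

// Hypothetical throttled writer: remembers the latest position in memory and
// persists it only when at least $interval seconds have passed.
final class ThrottledSnapshot
{
    private ?array $latest = null;
    private ?int $lastPersistedAt = null;

    /** @var array<int, array> persisted positions keyed by timestamp (stand-in for Redis) */
    public array $store = [];

    public function __construct(private int $interval = 10)
    {
    }

    /** Cheap in-memory update, intended to be called for every event. */
    public function update(array $position): void
    {
        $this->latest = $position;
    }

    /** Persist the latest position if the interval has elapsed since the last write. */
    public function tick(int $now): void
    {
        if ($this->latest === null) {
            return;
        }
        if ($this->lastPersistedAt !== null && $now - $this->lastPersistedAt < $this->interval) {
            return;
        }
        $this->store[$now] = $this->latest;
        $this->lastPersistedAt = $now;
    }
}

$snapshot = new ThrottledSnapshot(10);
$snapshot->update(['file' => 'mysql-bin.000001', 'pos' => 100]);
$snapshot->tick(0);  // first write goes through
$snapshot->update(['file' => 'mysql-bin.000001', 'pos' => 200]);
$snapshot->tick(5);  // too soon, skipped
$snapshot->update(['file' => 'mysql-bin.000001', 'pos' => 300]);
$snapshot->tick(10); // interval elapsed, latest position is written

echo count($snapshot->store), ' write(s) persisted', PHP_EOL;
```

The trade-off is that a crash can lose up to one interval of progress, so on restart some events may be replayed from the last persisted position.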

Sources: src/trigger/src/Subscriber/SnapshotSubscriber.php:23-41, src/trigger/src/Monitor/HealthMonitor.php:62-84, src/trigger/src/Snapshot/RedisBinLogCurrentSnapshot.php:32-36, src/trigger/publish/trigger.php:45-49

User Trigger Interface

Users define triggers by implementing the TriggerInterface (src/trigger/src/Contract/TriggerInterface.php:14-21).

The AbstractTrigger base class provides empty implementations (src/trigger/src/Trigger/AbstractTrigger.php:16-29).


Trigger Method Arguments

| Method | Parameters | Description |
|---|---|---|
| onWrite(array $new) | $new: New row data | Called for INSERT operations |
| onUpdate(array $old, array $new) | $old: Row before update; $new: Row after update | Called for UPDATE operations |
| onDelete(array $old) | $old: Deleted row data | Called for DELETE operations |

The row data arrays use column names as keys.
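
A user trigger written against that contract could look like this. The interface RowTriggerSketch is a local stand-in that mirrors the three methods in the table; the `void` return types, the OrderStatusTrigger class, and the column names are assumptions made for the example.

```php
<?php
declare(strict_types=1);

// Local sketch of the three-method contract described in the table above.
// The real interface lives in the trigger component; return types here are assumed.
interface RowTriggerSketch
{
    public function onWrite(array $new): void;

    public function onUpdate(array $old, array $new): void;

    public function onDelete(array $old): void;
}

// Hypothetical user trigger that records what happened to each order row.
final class OrderStatusTrigger implements RowTriggerSketch
{
    public array $log = [];

    public function onWrite(array $new): void
    {
        $this->log[] = "created order {$new['id']}";
    }

    public function onUpdate(array $old, array $new): void
    {
        // Columns whose value differs between the two row images.
        $changed = array_keys(array_diff_assoc($new, $old));
        $this->log[] = "order {$new['id']} changed: " . implode(',', $changed);
    }

    public function onDelete(array $old): void
    {
        $this->log[] = "deleted order {$old['id']}";
    }
}

// Rows are associative arrays keyed by column name.
$trigger = new OrderStatusTrigger();
$trigger->onUpdate(
    ['id' => 7, 'status' => 'pending', 'total' => '19.90'],
    ['id' => 7, 'status' => 'paid', 'total' => '19.90'],
);

echo $trigger->log[0], PHP_EOL;
```

Because both row images arrive as arrays with the same keys, array_diff_assoc is enough to find the changed columns in onUpdate.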


Triggers are resolved from the DI container (src/trigger/src/Subscriber/TriggerSubscriber.php:168), which allows dependency injection.
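
To show why container resolution matters, the sketch below resolves a trigger whose constructor needs a collaborator and then invokes it from a [class, method, args] tuple. TinyContainer, Mailer, and WelcomeTrigger are hypothetical names; the component itself relies on the framework's container rather than anything like this.

```php
<?php
declare(strict_types=1);

// Hypothetical dependency that a trigger wants injected.
final class Mailer
{
    public array $sent = [];

    public function send(string $message): void
    {
        $this->sent[] = $message;
    }
}

// Hypothetical trigger with a constructor dependency.
final class WelcomeTrigger
{
    public function __construct(private Mailer $mailer)
    {
    }

    public function onWrite(array $new): void
    {
        $this->mailer->send("welcome {$new['email']}");
    }
}

// Tiny stand-in for a container: factories keyed by class name, shared instances.
final class TinyContainer
{
    private array $instances = [];

    public function __construct(private array $factories)
    {
    }

    public function get(string $id): object
    {
        return $this->instances[$id] ??= ($this->factories[$id])($this);
    }
}

$container = new TinyContainer([
    Mailer::class => static fn (): Mailer => new Mailer(),
    WelcomeTrigger::class => static fn (TinyContainer $c): WelcomeTrigger
        => new WelcomeTrigger($c->get(Mailer::class)),
]);

// One queued invocation in the [class, method, args] shape.
[$class, $method, $args] = [WelcomeTrigger::class, 'onWrite', [['id' => 1, 'email' => 'a@example.com']]];

// Resolve the instance from the container, then spread the args into the method.
$container->get($class)->{$method}(...$args);

echo $container->get(Mailer::class)->sent[0], PHP_EOL;
```

Queuing only the class name and resolving it at execution time keeps the queued payload small and lets the container decide how the trigger and its dependencies are built.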


Sources: src/trigger/src/Contract/TriggerInterface.php:14-21, src/trigger/src/Trigger/AbstractTrigger.php:16-29, src/trigger/src/Subscriber/TriggerSubscriber.php:164-168

Complete Event Flow


The complete flow from binlog to user trigger execution:

  1. Binlog Consumption: MySQLReplicationFactory::consume() reads events from the MySQL binlog stream (src/trigger/src/Consumer.php:103)

  2. Event Dispatch: Events are dispatched to registered subscribers (both SnapshotSubscriber and TriggerSubscriber)

  3. Position Tracking: SnapshotSubscriber extracts BinLogCurrent and updates HealthMonitor, which persists to Redis every snapshot.interval seconds

  4. Event Processing: TriggerSubscriber filters for RowsDTO events, extracts metadata, and builds lookup key {connection}.{database}.{table}.{type}

  5. Trigger Lookup: TriggerManager returns list of registered trigger class/method pairs for the key

  6. Value Transformation: Event values are transformed into method arguments based on event type (INSERT, UPDATE, DELETE)

  7. Channel Push: Each trigger invocation is pushed as [class, method, args] to the channel

  8. Async Execution: Consumer coroutine pops from channel, applies concurrency limit, resolves class from container, and invokes method

  9. User Trigger: User-defined onWrite(), onUpdate(), or onDelete() method executes with row data

This architecture provides:

  • Decoupling: Channel decouples event processing from trigger execution
  • Concurrency Control: Concurrent limiter prevents resource exhaustion
  • Fault Tolerance: Channel buffering handles temporary trigger backlog
  • Graceful Shutdown: Worker exit waits for the channel to drain (src/trigger/src/Subscriber/TriggerSubscriber.php:199-207)

Sources: src/trigger/src/Consumer.php:96-109, src/trigger/src/Subscriber/TriggerSubscriber.php:62-209, src/trigger/src/Subscriber/SnapshotSubscriber.php:23-41, src/trigger/src/Monitor/HealthMonitor.php:62-84