
URL: https://deepwiki.com/friendsofhyperf/components/6.1-architecture-and-consumer-process

Last indexed: 14 February 2026 (15d5ca)

Architecture and Consumer Process

This document explains the architecture of the Trigger component's consumer process, which connects to MySQL binlog streams and processes database change events. It covers the Consumer class design, how ConsumerManager orchestrates multiple consumer processes, the integration with MySQLReplicationFactory, and the lifecycle of a consumer from startup to shutdown.

For information about event processing and trigger execution within the consumer, see Event Processing and Trigger Execution. For health monitoring and fault tolerance mechanisms, see Health Monitoring and Fault Tolerance.


Consumer Class Architecture

The Consumer class src/trigger/src/Consumer.php29-240 is the core component that manages a single MySQL binlog replication connection. Each consumer instance connects to one MySQL server, subscribes to binlog events, and coordinates the event processing pipeline.

Consumer Components and Dependencies


Sources: src/trigger/src/Consumer.php29-70 src/trigger/src/Config.php16-31

The Consumer constructor src/trigger/src/Consumer.php45-70 initializes:

| Property | Type | Purpose | Default |
|---|---|---|---|
| connection | string | Database connection name | 'default' |
| name | string | Process name for identification | trigger.{connection} |
| identifier | string | Coordinator identifier | trigger.{connection} |
| config | Config | Configuration wrapper | Constructed from options |
| binLogCurrentSnapshot | BinLogCurrentSnapshotInterface | Tracks binlog position | RedisBinLogCurrentSnapshot |
| healthMonitor | ?HealthMonitor | Monitors replication health | Created if enabled |
| serverMutex | ?ServerMutexInterface | Distributed lock for HA | Created if enabled |

Sources: src/trigger/src/Consumer.php45-70 src/trigger/publish/trigger.php14-56
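The naming defaults in the table can be illustrated with a minimal sketch. ConsumerSketch is a hypothetical stand-in, not the component's Consumer class. The `name`, `identifier`, and `health_monitor.enable` option keys are assumptions taken from the property and configuration names on this page. The sketch needs PHP 8.1 or later for readonly properties.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the Consumer constructor; only the defaults
// listed in the table are modelled, not the real dependencies.
final class ConsumerSketch
{
    public readonly string $name;
    public readonly string $identifier;
    public readonly bool $healthMonitorEnabled;

    /** @param array<string, mixed> $options connection options (assumed shape) */
    public function __construct(
        public readonly string $connection = 'default',
        public readonly array $options = [],
    ) {
        // Process name and coordinator identifier both default to trigger.{connection}.
        $this->name = $options['name'] ?? 'trigger.' . $connection;
        $this->identifier = $options['identifier'] ?? 'trigger.' . $connection;
        // Optional collaborators are only created when their section is enabled.
        $this->healthMonitorEnabled = (bool) ($options['health_monitor']['enable'] ?? true);
    }
}

$consumer = new ConsumerSketch('orders');
echo $consumer->name, PHP_EOL; // trigger.orders
```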


MySQLReplicationFactory Integration

The Consumer creates and configures a MySQLReplicationFactory instance from the php-mysql-replication library, which performs the actual MySQL binlog protocol communication.

Replication Factory Configuration


Sources: src/trigger/src/Consumer.php182-239

The makeReplication() method src/trigger/src/Consumer.php182-239 performs these steps:

  1. Aggregate database and table filters src/trigger/src/Consumer.php186-194:

    • Merges databases_only from config with databases from TriggerManager
    • Merges tables_only from config with tables from TriggerManager
  2. Build configuration src/trigger/src/Consumer.php196-208:

    • Connection parameters: user, host, password, port from config
    • Random slaveId between 10000 and 9999999
    • heartbeatPeriod for connection keepalive (default 3 seconds)
    • UUID, set only when php-mysql-replication >= 8.0 is installed
  3. Restore binlog position src/trigger/src/Consumer.php210-218:

    • Retrieves BinLogCurrent from snapshot
    • Sets binLogFileName and binLogPosition if snapshot exists
    • Logs position for debugging
  4. Register subscribers src/trigger/src/Consumer.php228-236:

    • Gets subscriber classes for this connection from SubscriberManager
    • Instantiates each subscriber with consumer and logger dependencies
    • Registers them with the factory

Sources: src/trigger/src/Consumer.php182-239
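The first three steps above can be sketched as a plain function. This is an illustration under stated assumptions, not the body of makeReplication(). buildReplicationSettings and the shapes of its arguments are hypothetical, and it returns an array rather than a configured MySQLReplicationFactory so that it runs without the library. Subscriber registration (step 4) is left out.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper mirroring steps 1-3.
 *
 * @param array<string, mixed> $config connection options from trigger.php
 * @param array{databases: string[], tables: string[]} $triggerFilters filters collected from triggers
 * @param array{file: string, position: int}|null $snapshot last saved binlog position, if any
 * @return array<string, mixed>
 */
function buildReplicationSettings(array $config, array $triggerFilters, ?array $snapshot): array
{
    // Step 1: merge the configured filters with those declared by triggers.
    $merge = static fn (array $a, array $b): array => array_values(array_unique(array_merge($a, $b)));

    // Step 2: connection parameters, a random slave id, and the heartbeat period.
    $settings = [
        'user' => $config['user'] ?? '',
        'host' => $config['host'] ?? '',
        'password' => $config['password'] ?? '',
        'port' => (int) ($config['port'] ?? 3306),
        'slaveId' => random_int(10000, 9999999),
        'heartbeatPeriod' => (float) ($config['heartbeat_period'] ?? 3),
        'databasesOnly' => $merge($config['databases_only'] ?? [], $triggerFilters['databases']),
        'tablesOnly' => $merge($config['tables_only'] ?? [], $triggerFilters['tables']),
    ];

    // Step 3: resume from the saved position only when a snapshot exists.
    if ($snapshot !== null) {
        $settings['binLogFileName'] = $snapshot['file'];
        $settings['binLogPosition'] = $snapshot['position'];
    }

    return $settings;
}

$settings = buildReplicationSettings(
    ['host' => '127.0.0.1', 'user' => 'repl', 'databases_only' => ['shop']],
    ['databases' => ['shop', 'crm'], 'tables' => ['orders']],
    ['file' => 'mysql-bin.000042', 'position' => 1540],
);
echo implode(',', $settings['databasesOnly']), PHP_EOL; // shop,crm
```

Without a snapshot the binlog keys are simply absent, so the position is left to the replication library's defaults.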


Consumer Lifecycle

The consumer follows a structured lifecycle with distinct startup, running, and shutdown phases.

Startup and Main Loop


Sources: src/trigger/src/Consumer.php72-117

The start() method src/trigger/src/Consumer.php72-117 orchestrates the consumer lifecycle:

  1. Reset state src/trigger/src/Consumer.php77:

    • Sets stopped = false
  2. Start health monitor src/trigger/src/Consumer.php80:

  3. Create replication src/trigger/src/Consumer.php82:

    • Calls makeReplication() to create MySQLReplicationFactory
  4. Resume coordinator src/trigger/src/Consumer.php85:

    • Resumes the coordinator registered under identifier, releasing any coroutines waiting on it
  5. Register exit handler src/trigger/src/Consumer.php90-94:

    • Creates a coroutine that waits for Constants::WORKER_EXIT
    • Calls stop() when the worker exit signal is received
  6. Main consume loop src/trigger/src/Consumer.php96-109:

    • Checks isStopped() each iteration
    • Calls replication->consume() which blocks until binlog event arrives
    • Catches exceptions, logs error, and calls stop()

Sources: src/trigger/src/Consumer.php72-117
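The control flow of the main loop (steps 1 and 6) can be sketched on its own. ReplicationSketch and ConsumeLoopSketch are hypothetical names. The health monitor, coordinator, and worker-exit coroutine are omitted, and a fake replication object stands in for MySQLReplicationFactory so the example runs without a MySQL server.

```php
<?php

declare(strict_types=1);

// Hypothetical interface covering the one call the loop relies on.
interface ReplicationSketch
{
    public function consume(): void;
}

final class ConsumeLoopSketch
{
    private bool $stopped = true;

    /** @var string[] collected log lines, in place of a real logger */
    public array $log = [];

    public function __construct(private ReplicationSketch $replication)
    {
    }

    public function start(): void
    {
        $this->stopped = false; // reset state

        while (! $this->isStopped()) {
            try {
                $this->replication->consume(); // blocks until an event arrives
            } catch (Throwable $e) {
                $this->log[] = 'error: ' . $e->getMessage();
                $this->stop(); // ends the loop on the next check
            }
        }
    }

    public function stop(): void
    {
        $this->stopped = true;
    }

    public function isStopped(): bool
    {
        return $this->stopped;
    }
}

// Fake replication: delivers two events, then fails like a dropped connection.
$fake = new class implements ReplicationSketch {
    public int $consumed = 0;

    public function consume(): void
    {
        if ($this->consumed === 2) {
            throw new RuntimeException('connection lost');
        }
        ++$this->consumed;
    }
};

$loop = new ConsumeLoopSketch($fake);
$loop->start();
echo $fake->consumed, ' events, ', $loop->log[0], PHP_EOL; // 2 events, error: connection lost
```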

Stop and Cleanup

The stop() method src/trigger/src/Consumer.php171-175 performs cleanup:



Sources: src/trigger/src/Consumer.php171-175


ConsumerManager and Process Orchestration

The ConsumerManager src/trigger/src/ConsumerManager.php22-76 is responsible for creating and registering consumer processes based on configuration.

Process-Per-Connection Model


Sources: src/trigger/src/ConsumerManager.php22-76 src/trigger/src/Listener/BindTriggerProcessesListener.php22-47

Registration Process

The register() method src/trigger/src/ConsumerManager.php30-44 creates processes:

  1. Read configuration src/trigger/src/ConsumerManager.php33:

    • Gets trigger.connections array from config
    • Each key is a connection name, value is connection options
  2. Filter enabled connections src/trigger/src/ConsumerManager.php36-38:

    • Skips connections where enable is false
  3. Create process src/trigger/src/ConsumerManager.php40:

    • Calls createProcess($connection, $options)
  4. Register with ProcessManager src/trigger/src/ConsumerManager.php42:

    • Adds process to Hyperf's process registry

Sources: src/trigger/src/ConsumerManager.php30-44
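The registration steps reduce to a filter-and-create loop, sketched below. registerConsumers is a hypothetical function. The process factory is passed in as a callable, and the returned array stands in for Hyperf's process registry. Treating a missing enable key as true is an assumption based on the documented default.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch of the register() flow.
 *
 * @param array<string, array<string, mixed>> $connections the trigger.connections array
 * @param callable(string, array): object $createProcess stand-in for createProcess()
 * @return object[] processes that would be handed to the process registry
 */
function registerConsumers(array $connections, callable $createProcess): array
{
    $registered = [];

    foreach ($connections as $connection => $options) {
        // Skip connections that are explicitly disabled.
        if (! ($options['enable'] ?? true)) {
            continue;
        }

        $registered[] = $createProcess((string) $connection, $options);
    }

    return $registered;
}

$processes = registerConsumers(
    ['default' => ['enable' => true], 'legacy' => ['enable' => false]],
    fn (string $connection, array $options): object => (object) ['name' => 'trigger.' . $connection],
);
echo count($processes), ' ', $processes[0]->name, PHP_EOL; // 1 trigger.default
```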

Anonymous Process Creation

The createProcess() method src/trigger/src/ConsumerManager.php54-75 creates an anonymous process class:


Sources: src/trigger/src/ConsumerManager.php54-75

The anonymous process src/trigger/src/ConsumerManager.php56-74:

Sources: src/trigger/src/ConsumerManager.php54-75
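The general pattern of wrapping a consumer in an anonymous process class can be sketched as follows. The details of the real anonymous class are not reproduced on this page, so everything here is an assumption: ProcessSketch stands in for Hyperf's process base class, createProcessSketch is a hypothetical name, and the closure stands in for starting the consumer.

```php
<?php

declare(strict_types=1);

// Hypothetical base class standing in for the framework's process type.
abstract class ProcessSketch
{
    public string $name = 'process';

    abstract public function handle(): void;
}

// Returns one anonymous process per connection, named after that connection.
function createProcessSketch(string $connection, Closure $startConsumer): ProcessSketch
{
    return new class($connection, $startConsumer) extends ProcessSketch {
        public function __construct(string $connection, private Closure $startConsumer)
        {
            $this->name = 'trigger.' . $connection;
        }

        public function handle(): void
        {
            // The process body only delegates to the consumer.
            ($this->startConsumer)();
        }
    };
}

$started = [];
$process = createProcessSketch('default', function () use (&$started): void {
    $started[] = 'default';
});
$process->handle();
echo $process->name, PHP_EOL; // trigger.default
```

An anonymous class avoids declaring one named process class per connection while still giving each process its own name and consumer.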


Configuration Structure

The trigger system is configured through the trigger.php configuration file, organized by connection.

Configuration Schema

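| Section | Key | Description | Default |
|---|---|---|---|
| connections.{name} | enable | Enable this connection | true |
| connections.{name} | host | MySQL host | '' |
| connections.{name} | port | MySQL port | 3306 |
| connections.{name} | user | MySQL user | '' |
| connections.{name} | password | MySQL password | '' |
| connections.{name} | databases_only | Database whitelist | [] |
| connections.{name} | tables_only | Table whitelist | [] |
| connections.{name} | heartbeat_period | Keepalive seconds | 3 |
| connections.{name} | connect_retries | Retry attempts | 10 |
| connections.{name} | subscribers | Subscriber classes | [TriggerSubscriber, SnapshotSubscriber] |
| server_mutex | enable | Enable distributed lock | true |
| server_mutex | prefix | Redis key prefix | {APP_NAME}:server_mutex: |
| server_mutex | expires | Lock TTL seconds | 30 |
| server_mutex | keepalive_interval | Renewal interval seconds | 10 |
| server_mutex | retry_interval | Acquisition retry seconds | 10 |
| health_monitor | enable | Enable health monitoring | true |
| health_monitor | interval | Monitor interval seconds | 30 |
| snapshot | version | Snapshot key version | 1.0 |
| snapshot | expires | Snapshot TTL seconds | 86400 |
| snapshot | interval | Snapshot interval seconds | 10 |
| concurrent | limit | Concurrent trigger limit | 1 |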

Sources: src/trigger/publish/trigger.php13-56
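As an illustration, a configuration file using the documented defaults might look like the sketch below. Several things are assumptions rather than facts from the published file: the nesting of server_mutex, health_monitor, snapshot, and concurrent under each connection, the subscriber namespace, and the example host and user values.

```php
<?php

declare(strict_types=1);

// Sketch of a trigger.php configuration built from the table above.
// Nesting and class namespaces are assumptions; check the published file.
return [
    'connections' => [
        'default' => [
            'enable' => true,
            'host' => '127.0.0.1',  // example value; documented default is ''
            'port' => 3306,
            'user' => 'replication', // example value; documented default is ''
            'password' => '',
            'databases_only' => [],
            'tables_only' => [],
            'heartbeat_period' => 3,
            'connect_retries' => 10,
            'subscribers' => [
                \FriendsOfHyperf\Trigger\Subscriber\TriggerSubscriber::class,  // assumed namespace
                \FriendsOfHyperf\Trigger\Subscriber\SnapshotSubscriber::class, // assumed namespace
            ],
            'server_mutex' => [
                'enable' => true,
                'prefix' => (getenv('APP_NAME') ?: 'app') . ':server_mutex:',
                'expires' => 30,
                'keepalive_interval' => 10,
                'retry_interval' => 10,
            ],
            'health_monitor' => [
                'enable' => true,
                'interval' => 30,
            ],
            'snapshot' => [
                'version' => '1.0',
                'expires' => 86400,
                'interval' => 10,
            ],
            'concurrent' => [
                'limit' => 1,
            ],
        ],
    ],
];
```

With these values keepalive_interval (10) stays well below expires (30), so a healthy lock holder renews the lock several times before it can lapse.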

Dependency Injection Bindings

The ConfigProvider src/trigger/src/ConfigProvider.php14-48 registers these bindings:

| Interface | Implementation | Purpose |
|---|---|---|
| ServerMutexInterface | RedisServerMutex | Distributed locking via Redis |
| BinLogCurrentSnapshotInterface | RedisBinLogCurrentSnapshot | Position persistence via Redis |
| Contract\LoggerInterface | StdoutLoggerInterface | Logging adapter |

Sources: src/trigger/src/ConfigProvider.php24-28


Integration with Subscriber System

The consumer delegates event processing to registered subscribers. Each connection can have multiple subscribers that implement EventSubscriberInterface.

Subscriber Registration Flow


Sources: src/trigger/src/ConsumerManager.php30-44 src/trigger/src/Consumer.php228-236 src/trigger/src/SubscriberManager.php37-85 src/trigger/src/Listener/BindTriggerProcessesListener.php41-46

The default subscribers registered src/trigger/publish/trigger.php27-30 are:

  1. TriggerSubscriber src/trigger/src/Subscriber/TriggerSubscriber.php31-221:

  2. SnapshotSubscriber src/trigger/src/Subscriber/SnapshotSubscriber.php17-42:

    • Subscribes to all events
    • Updates HealthMonitor with current BinLogCurrent
    • Enables position tracking for snapshots

Sources: src/trigger/src/Subscriber/TriggerSubscriber.php53-60 src/trigger/src/Subscriber/SnapshotSubscriber.php23-41


Consumer-to-Code Entity Mapping

This diagram shows how consumer concepts map to concrete code entities:


Sources: All files referenced in diagram labels


Process Isolation and Resource Management

Each consumer runs in an isolated process with dedicated resources:

Per-Consumer Resources

| Resource | Scope | Implementation |
|---|---|---|
| MySQL connection | Per consumer | Created by MySQLReplicationFactory |
| Redis lock | Per connection name | RedisServerMutex with unique key |
| Binlog snapshot | Per connection | Redis key trigger:snapshot:binLogCurrent:{version}:{connection} |
| Health monitor timer | Per consumer | Timer instance in HealthMonitor |
| Event channel | Per consumer | Channel in TriggerSubscriber with configurable size |
| Concurrent limiter | Per consumer | Concurrent instance with configured limit |

Sources: src/trigger/src/Consumer.php45-70 src/trigger/src/Mutex/RedisServerMutex.php36-59 src/trigger/src/Snapshot/RedisBinLogCurrentSnapshot.php65-74 src/trigger/src/Subscriber/TriggerSubscriber.php39-50

This isolation ensures:

  • Independent failure domains - one connection failure doesn't affect others
  • Resource isolation - each consumer has dedicated memory and connections
  • Horizontal scalability - add connections without interference
  • Independent restart - stop/start individual consumers without affecting others

Sources: src/trigger/src/ConsumerManager.php54-75
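Per-connection isolation of Redis state follows from the key format in the resource table, which embeds the connection name. The sketch below only rebuilds that documented key pattern. snapshotKey is a hypothetical helper, not a method of RedisBinLogCurrentSnapshot.

```php
<?php

declare(strict_types=1);

// Hypothetical helper reproducing the documented snapshot key pattern:
// trigger:snapshot:binLogCurrent:{version}:{connection}
function snapshotKey(string $version, string $connection): string
{
    return sprintf('trigger:snapshot:binLogCurrent:%s:%s', $version, $connection);
}

echo snapshotKey('1.0', 'default'), PHP_EOL; // trigger:snapshot:binLogCurrent:1.0:default
echo snapshotKey('1.0', 'orders'), PHP_EOL;  // trigger:snapshot:binLogCurrent:1.0:orders
```

Because the two keys differ, consumers for different connections never overwrite each other's saved binlog position. Changing the snapshot version likewise yields a new key, so positions stored under the old version are no longer read.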