VOOZH about

URL: https://deepwiki.com/hypervel/queue/5.3-concurrency-and-resource-management

⇱ Concurrency and Resource Management | hypervel/queue | DeepWiki


Loading...
Menu

Concurrency and Resource Management

This document explains how the worker system manages resources and concurrency. The system uses Hyperf's coroutine-based concurrency to process multiple jobs in parallel, implements timeout monitoring to terminate long-running jobs, enforces memory limits to prevent resource exhaustion, and provides graceful shutdown mechanisms for worker termination.

For information about the overall Worker class architecture and job processing flow, see page 5.1. For details on configuration parameters, see page 5.2.


Concurrency Architecture

The worker system uses Hyperf's coroutine-based concurrency model to process multiple jobs simultaneously without the complexity of multi-threading or multi-processing. This approach provides efficient parallel execution while maintaining shared memory state within a single process.

Concurrent Job Execution Model


Sources: src/Worker.php135-208

The concurrency system operates through the following components:

ComponentTypePurpose
ConcurrentHyperf\Coroutine\ConcurrentManages the concurrency limit and spawns coroutines
$runningJobsarrayTracks all currently executing jobs with metadata
$options->concurrencyintMaximum number of jobs that can run simultaneously
WaiterHypervel\Coroutine\WaiterEnsures synchronous behavior for job popping

Concurrency Limit Enforcement

The worker enforces the concurrency limit at src/Worker.php169-172 before popping new jobs:


This check ensures that:

  1. No new jobs are accepted if the concurrency limit is reached
  2. No new jobs are accepted if any timeout jobs exist (to prevent further damage)

Sources: src/Worker.php169-172 src/Worker.php145-146

Coroutine Creation Flow

When a job is popped and the concurrency limit has not been reached, the worker spawns a new coroutine at src/Worker.php184:


This creates a non-blocking coroutine that executes runJob(), allowing the main daemon loop to continue processing additional jobs immediately.

Sources: src/Worker.php178-185


Timeout Monitoring System

The timeout monitoring system is a separate timer-based mechanism that runs concurrently with job processing. It periodically scans all running jobs and terminates those that have exceeded their timeout limits.

Monitoring Architecture


Sources: src/Worker.php213-240 src/Worker.php245-255 src/Worker.php487-496

Monitor Timer Implementation

The monitoring system is initialized at src/Worker.php148-151 before the daemon loop begins:


The monitorTimeoutJobs() method at src/Worker.php213-240 creates a periodic timer that runs every monitorInterval seconds (default 1 second):

ConfigurationDefaultSource
$options->monitorInterval1 secondsrc/WorkerOptions.php38
$this->monitorInterval1 secondsrc/Worker.php126

The timer callback is protected by a monitorLocked flag to prevent overlapping scans if a previous scan is still executing.

Sources: src/Worker.php224-239 src/Console/WorkCommand.php54 src/Console/WorkCommand.php154


Job Registration and Tracking

Every job that begins execution is registered in the $runningJobs array with metadata necessary for timeout detection.

Registration Process


Sources: src/Worker.php454 src/Worker.php487-496 src/Worker.php472-474

Running Jobs Data Structure

Each entry in $runningJobs contains:

FieldTypeDescription
jobJobContractThe job instance being executed
start_atfloatThe microtime when the job started
expires_atfloatThe microtime when the job times out

The expires_at value is calculated at src/Worker.php492:


The timeoutForJob() method at src/Worker.php303-306 determines the timeout value:


This allows individual jobs to override the default worker timeout.

Sources: src/Worker.php487-496 src/Worker.php303-306


Timeout Detection and Termination

The timeout detection system periodically scans running jobs and identifies those that have exceeded their time limits.

Timeout Scanning Process

The terminateTimeoutJobs() method at src/Worker.php245-255 implements the scanning logic:


This method:

  1. Gets the current time
  2. Iterates through all running jobs
  3. Compares each job's expires_at time to the current time
  4. If expired, adds the job ID to $timeoutJobIds
  5. Removes the job from $runningJobs
  6. Calls handleTimeoutJob() to process the timeout

Sources: src/Worker.php245-255

Timeout Job Handling


Sources: src/Worker.php273-298

The handleTimeoutJob() method at src/Worker.php273-298 performs three checks:

  1. Max Attempts Check src/Worker.php275-280: Marks the job as failed if it will exceed the maximum retry attempts
  2. Max Exceptions Check src/Worker.php282-286: Marks the job as failed if it will exceed the maximum exception count
  3. Should Fail On Timeout Check src/Worker.php288-292: Marks the job as failed if the job's shouldFailOnTimeout() method returns true

Finally, it dispatches a JobTimedOut event at src/Worker.php294-297

Worker Shutdown on Timeout

When timeout jobs are detected, the worker initiates a graceful shutdown at src/Worker.php233-236:


The kill() method at src/Worker.php764-779 waits for all active coroutines to complete before exiting:


This ensures that running jobs can complete gracefully while preventing new jobs from being accepted.

Sources: src/Worker.php233-236 src/Worker.php764-779 src/Worker.php260-263 src/Worker.php268-271


Timeout Prevention in Job Processing

The job processing flow includes a check to prevent further processing if a timeout has occurred.

Timeout Check During Execution


Sources: src/Worker.php461-464

At src/Worker.php461-464 after a job executes, the worker checks if the job ID is in the $timeoutJobIds array:


This prevents the worker from dispatching success events or deleting a job that has already been marked as timed out by the monitoring system. The timeout handling was already completed by handleTimeoutJob().

Sources: src/Worker.php461-464


Memory Limits

The worker system monitors memory usage and automatically stops when consumption exceeds configured limits. This prevents memory exhaustion and ensures system stability.

Memory Monitoring

The memoryExceeded() method at src/Worker.php744-747 checks current memory usage:


This method:

  1. Uses memory_get_usage(true) to get real memory allocation including system overhead
  2. Converts bytes to megabytes by dividing by 1024 twice
  3. Compares against the configured $memoryLimit

Sources: src/Worker.php744-747

Memory Limit Enforcement

Memory limits are checked in the stopIfNecessary() method at src/Worker.php331-342:


When memory exceeds the limit, the worker returns EXIT_MEMORY_LIMIT (exit code 12), which triggers worker shutdown at src/Worker.php204-206

Sources: src/Worker.php331-342 src/Worker.php335 src/Worker.php41

Memory Configuration

ParameterTypeDefaultDescription
memoryint128Maximum memory in megabytes before worker stops

The memory limit is configured through:

Sources: src/WorkerOptions.php28 src/Console/WorkCommand.php46 src/Console/WorkCommand.php142


Graceful Shutdown Mechanisms

The worker system implements multiple graceful shutdown mechanisms to ensure clean termination without data loss.

Shutdown Triggers


Sources: src/Worker.php331-342 src/Worker.php752-757 src/Worker.php764-779 src/Worker.php233-236

Exit Codes

The worker defines three exit codes at src/Worker.php37-41:

ConstantValueDescription
EXIT_SUCCESS0Normal shutdown, worker should restart
EXIT_ERROR1Error occurred (currently unused)
EXIT_MEMORY_LIMIT12Memory limit exceeded, worker should restart

Sources: src/Worker.php37-41

Stop Method

The stop() method at src/Worker.php752-757 performs a normal shutdown:


This method:

  1. Dispatches the WorkerStopping event for cleanup hooks
  2. Returns the status code to the daemon loop
  3. Allows the daemon loop to exit naturally

Sources: src/Worker.php752-757

Kill Method

The kill() method at src/Worker.php764-779 performs a forced shutdown:


This method:

  1. Dispatches the WorkerStopping event
  2. Waits for all active coroutines to complete at src/Worker.php770-772
  3. Sends SIGKILL signal to self if POSIX is available
  4. Exits with the specified status code

Sources: src/Worker.php764-779 src/Worker.php260-263

Signal Handling

The worker listens for POSIX signals at src/Worker.php723-731:


Signal handling is enabled at src/Worker.php137-139 if PCNTL extension is available:

SignalActionPurpose
SIGQUITshouldQuit = trueGraceful shutdown after current job
SIGTERMshouldQuit = trueGraceful shutdown after current job
SIGUSR2paused = truePause job processing
SIGCONTpaused = falseResume job processing

Sources: src/Worker.php723-731 src/Worker.php137-139 src/Worker.php736-739

Pause Mechanism

The worker supports pausing via the $paused flag at src/Worker.php102 When paused:

  1. The daemon loop checks daemonShouldRun() at src/Worker.php157
  2. If paused, pauseWorker() is called at src/Worker.php158
  3. The worker sleeps and checks for stop conditions at src/Worker.php321-326
  4. No new jobs are accepted until resumed

Sources: src/Worker.php102 src/Worker.php157-165 src/Worker.php321-326 src/Worker.php311-316


Resource Management Configuration

The worker system is configured through WorkerOptions and command-line arguments.

Configuration Parameters

ParameterTypeDefaultDescriptionSource
concurrencyint1Number of jobs to process simultaneouslysrc/WorkerOptions.php37
timeoutint60Seconds before a job times outsrc/WorkerOptions.php29
memoryint128Memory limit in megabytessrc/WorkerOptions.php28
maxJobsint0Maximum jobs before restart (0 = unlimited)src/WorkerOptions.php34
maxTimeint0Maximum seconds before restart (0 = unlimited)src/WorkerOptions.php35
monitorIntervalint1Seconds between timeout scanssrc/WorkerOptions.php38
stopWhenEmptyboolfalseStop when queue is emptysrc/WorkerOptions.php33

Sources: src/WorkerOptions.php25-40

Command-Line Configuration

Command-Line Options to WorkerOptions Mapping


Sources: src/Console/WorkCommand.php133-156

The gatherWorkerOptions() method at src/Console/WorkCommand.php133-156 reads command-line options:

Sources: src/Console/WorkCommand.php43-54 src/Console/WorkCommand.php133-156


Integration with Worker Daemon

The resource management systems are integrated throughout the worker daemon loop.

Complete Daemon Loop with Resource Management


Sources: src/Worker.php135-208

Stop Condition Evaluation Order

The stopIfNecessary() method at src/Worker.php331-342 evaluates stop conditions in priority order:

















































PriorityConditionExit CodeReason
1shouldQuit = trueEXIT_SUCCESSSignal received or timeout jobs detected
2Memory exceededEXIT_MEMORY_LIMITMemory limit reached
3Queue restartEXIT_SUCCESSRestart command issued
4Stop when emptyEXIT_SUCCESSQueue empty and flag set
5Max timeEXIT_SUCCESSTime limit reached
6Max jobsEXIT_SUCCESSJob count limit reached

Sources: src/Worker.php331-342

Resource Management Integration Points

The daemon loop at src/Worker.php135-208 integrates resource management at these points:

  1. Signal Setup src/Worker.php137-139: Enables signal handling for shutdown control
  2. Monitor Initialization src/Worker.php148-151: Starts timeout monitor before loop
  3. Concurrency Limit src/Worker.php169-172: Prevents job popping at capacity
  4. Job Spawning src/Worker.php184: Creates coroutines within concurrency limit
  5. Resource Checks src/Worker.php196-206: Evaluates all stop conditions after each iteration

Sources: src/Worker.php135-208


State Management

The worker maintains several state arrays to coordinate concurrency and timeout management:

State Arrays

ArrayTypePurposeLifecycle
$runningJobsarray<string, array>Tracks all currently executing jobs with metadataEntry added in registerCoroutineJob(), removed in finally block or on timeout
$timeoutJobIdsarray<int, string>Stores UUIDs of jobs that have timed outEntry added in terminateTimeoutJobs(), cleared on worker shutdown

Sources: src/Worker.php77 src/Worker.php82 src/Worker.php487-496 src/Worker.php245-255

State Flags

FlagTypePurposeSet By
$monitorLockedboolPrevents overlapping timeout scansMonitor timer callback src/Worker.php229
$shouldQuitboolSignals worker should exitTimeout detection src/Worker.php234 signal handlers src/Worker.php727-728
$pausedboolIndicates worker is pausedSignal handler src/Worker.php729

Sources: src/Worker.php92 src/Worker.php97 src/Worker.php102 src/Worker.php229 src/Worker.php234 src/Worker.php729

Thread-Safety Considerations

While the worker uses coroutines (not threads), the $monitorLocked flag at src/Worker.php225-227 and src/Worker.php238 prevents race conditions that could occur if a timer tick happens while a previous tick is still executing:


This ensures the timeout scanning process is atomic and prevents double-processing of timed-out jobs.

Sources: src/Worker.php225-238

Refresh this wiki

On this page