VOOZH about

URL: https://deepwiki.com/hypervel/bus/9.2-database-implementation

⇱ Database Implementation | hypervel/bus | DeepWiki


Loading...
Menu

Database Implementation

Purpose and Scope

This document explains the DatabaseBatchRepository implementation, which persists batch data to a relational database. It covers the database table structure, atomic job count updates, serialization handling, and the factory pattern used for instantiation.

For information about the repository contracts and interface methods, see Batch Repository Interface. For batch cleanup operations, see Pruning Old Batch Data.


Class Structure and Dependencies

The DatabaseBatchRepository implements the PrunableBatchRepository contract and relies on several collaborating components to persist and retrieve batch data.


Sources: src/DatabaseBatchRepository.php17-335 src/DatabaseBatchRepositoryFactory.php1-24 src/BatchFactory.php1-41

The DatabaseBatchRepository class src/DatabaseBatchRepository.php17 depends on:

DependencyPurpose
BatchFactoryCreates Batch instances from database records
ConnectionResolverInterfaceResolves database connections by name
string $tableDatabase table name (default: job_batches)
string $connectionDatabase connection name (optional, uses default if null)

Configuration and Instantiation

The DatabaseBatchRepositoryFactory src/DatabaseBatchRepositoryFactory.php11-24 creates DatabaseBatchRepository instances using configuration from Hyperf's config system.


Configuration Keys:

Config KeyDefault ValuePurpose
queue.batching.tablejob_batchesDatabase table name
queue.batching.databasenullDatabase connection name (null uses default)

Sources: src/DatabaseBatchRepositoryFactory.php13-22


Database Schema

The DatabaseBatchRepository stores batch data in a single table with the following structure (inferred from src/DatabaseBatchRepository.php72-83):

ColumnTypeDescription
idstring (UUID)Unique batch identifier (ordered UUID)
namestringHuman-readable batch name
total_jobsintegerTotal number of jobs in the batch
pending_jobsintegerNumber of jobs not yet completed
failed_jobsintegerNumber of jobs that failed
failed_job_idsJSON arrayIDs of jobs that failed
optionsserializedBatch configuration options and callbacks
created_attimestampUnix timestamp when batch was created
cancelled_attimestamp (nullable)When batch was cancelled
finished_attimestamp (nullable)When batch completed

Sources: src/DatabaseBatchRepository.php72-83


Storing New Batches

The store method src/DatabaseBatchRepository.php68-86 creates a new batch record in the database:


Key Implementation Details:

  1. UUID Generation src/DatabaseBatchRepository.php70: Uses Str::orderedUuid() for time-ordered UUIDs
  2. Initial Counters: All job counts start at 0 and are incremented as jobs are added
  3. Options Serialization src/DatabaseBatchRepository.php79: Batch options (including callbacks) are serialized using serialize()
  4. Return Value: The method retrieves and returns the newly created Batch object

Sources: src/DatabaseBatchRepository.php68-86


Retrieving Batches

The repository provides two methods for retrieving batch data:

Paginated Batch Listing

The get method src/DatabaseBatchRepository.php40-50 retrieves multiple batches in descending order by ID:






















ParameterTypePurpose
$limitintMaximum number of batches to return (default: 50)
$beforemixedCursor for pagination (batch ID)

Sources: src/DatabaseBatchRepository.php40-50

Single Batch Retrieval

The find method src/DatabaseBatchRepository.php55-63 retrieves a specific batch by ID, using the write PDO connection to ensure read-after-write consistency:


The useWritePdo() call src/DatabaseBatchRepository.php58 ensures the query reads from the primary database node, avoiding potential replication lag issues.

Sources: src/DatabaseBatchRepository.php55-63


Atomic Job Count Updates

The repository uses database-level locking to ensure atomic updates to job counters, preventing race conditions when multiple workers process batch jobs simultaneously.


Sources: src/DatabaseBatchRepository.php91-152

Increment Total Jobs

The incrementTotalJobs method src/DatabaseBatchRepository.php91-98 uses SQL expressions for efficiency:


This approach avoids the need for locking since the database engine handles the atomic increment internally. The method also resets finished_at to null when jobs are added to an existing batch.

Sources: src/DatabaseBatchRepository.php91-98

Decrement Pending Jobs

The decrementPendingJobs method src/DatabaseBatchRepository.php103-117 uses pessimistic locking:


This method removes the job ID from the failed_job_ids array if it exists src/DatabaseBatchRepository.php109 ensuring accurate failure tracking when failed jobs are retried successfully.

Sources: src/DatabaseBatchRepository.php103-117

Increment Failed Jobs

The incrementFailedJobs method src/DatabaseBatchRepository.php122-136 follows the same locking pattern but increments the failure counter and adds the job ID to the failed_job_ids array src/DatabaseBatchRepository.php128

Sources: src/DatabaseBatchRepository.php122-136

Atomic Update Implementation

The updateAtomicValues helper src/DatabaseBatchRepository.php141-152 provides the locking mechanism:

  1. Start Transaction src/DatabaseBatchRepository.php143: Wraps the entire operation in a database transaction
  2. Acquire Lock src/DatabaseBatchRepository.php144-146: Uses lockForUpdate() to acquire a pessimistic row lock
  3. Read Current State: Retrieves the current batch record
  4. Compute New Values src/DatabaseBatchRepository.php148: Executes the provided callback to calculate updates
  5. Apply Changes src/DatabaseBatchRepository.php149: Updates the batch record
  6. Commit: Transaction commits automatically when the closure returns

Sources: src/DatabaseBatchRepository.php141-152


State Management Operations

The repository provides methods for managing batch lifecycle states:

MethodPurposeUpdates
markAsFinished() src/DatabaseBatchRepository.php157-162Mark batch as completeSets finished_at timestamp
cancel() src/DatabaseBatchRepository.php167-173Cancel batch executionSets cancelled_at and finished_at
delete() src/DatabaseBatchRepository.php178-181Remove batch recordDeletes row from table

All three methods perform simple updates without locking, as they represent terminal state transitions that don't require atomic read-modify-write operations.

Sources: src/DatabaseBatchRepository.php157-181


Data Serialization and PostgreSQL Support

The repository handles serialization differently depending on the database driver to work around PostgreSQL's binary data handling.


Sources: src/DatabaseBatchRepository.php262-296

Serialization Logic

The serialize method src/DatabaseBatchRepository.php262-269 checks if the connection is PostgreSQL src/DatabaseBatchRepository.php274-278:

  • PostgreSQL connections (pgsql, pgsql-swoole): Serialized data is base64-encoded
  • Other databases: Serialized data is stored directly

Deserialization Logic

The unserialize method src/DatabaseBatchRepository.php283-296 reverses the process:

  1. Check if connection is PostgreSQL
  2. If yes, check if the string lacks : or ; characters (indicating base64 encoding)
  3. Decode from base64 if necessary
  4. Unserialize the data
  5. Return empty array on error src/DatabaseBatchRepository.php292-294

Sources: src/DatabaseBatchRepository.php262-296


Converting Database Records to Batch Objects

The toBatch method src/DatabaseBatchRepository.php301-316 transforms raw database records into Batch instances using the BatchFactory.


Sources: src/DatabaseBatchRepository.php301-316 src/BatchFactory.php26-40

Field Transformations

The method performs several transformations on the raw database record:

Database FieldTransformationResult Type
idDirect pass-throughstring
nameDirect pass-throughstring
total_jobsCast to integerint
pending_jobsCast to integerint
failed_jobsCast to integerint
failed_job_idsJSON decode to arrayarray
optionsUnserializearray
created_atConvert to CarbonImmutableCarbonImmutable
cancelled_atConvert to CarbonImmutable or null?CarbonImmutable
finished_atConvert to CarbonImmutable or null?CarbonImmutable

Timestamp Handling src/DatabaseBatchRepository.php312-314: The method uses CarbonImmutable::createFromTimestamp() with the default timezone for timestamp conversion.

Sources: src/DatabaseBatchRepository.php301-316


Transaction Support

The repository implements the transaction methods required by the BatchRepository contract:

Execute Within Transaction

The transaction method src/DatabaseBatchRepository.php246-249 wraps operations in a database transaction:


This allows batch operations to be grouped atomically with other database operations.

Manual Rollback

The rollBack method src/DatabaseBatchRepository.php254-257 provides manual transaction rollback:


Sources: src/DatabaseBatchRepository.php246-257


Connection Management

The repository provides methods for managing database connections:

Getting the Current Connection

The connection method src/DatabaseBatchRepository.php321-324 resolves the database connection:


If no connection name was specified in the constructor, this resolves to the default database connection.

Setting the Connection

The setConnection method src/DatabaseBatchRepository.php329-334 allows runtime connection switching:


This is useful for multi-tenant applications or when different batches need to be stored in different databases.

Sources: src/DatabaseBatchRepository.php321-334


Summary

The DatabaseBatchRepository provides a robust, production-ready implementation for persisting batch data with the following key characteristics:

FeatureImplementation
PersistenceSingle database table with configurable name and connection
ConcurrencyPessimistic locking for atomic counter updates
CompatibilitySpecial serialization handling for PostgreSQL
ScalabilityChunked deletion in pruning methods (see Pruning Old Batch Data)
Type SafetyStrong typing with contract interfaces
Factory PatternDecouples Batch object creation from repository logic

Sources: src/DatabaseBatchRepository.php17-335 src/DatabaseBatchRepositoryFactory.php1-24 src/BatchFactory.php1-41