URL: https://deepwiki.com/inclusionAI/AReaL/10.7-distributed-data-service

Last indexed: 7 May 2026 (2e12c1)

Distributed Data Service

The Distributed Data Service is a microservice-based architecture that decouples data loading from the main training and inference processes. By offloading dataset management, tokenization, and sampling to dedicated worker processes, AReaL keeps training engines from blocking on I/O-bound data operations. This matters most in large-scale RL experiments, where data preparation (e.g., image processing for VLMs or complex reward filtering) can become a bottleneck.

Architectural Overview

The service follows a hierarchical microservice pattern consisting of a DataController, a Gateway, a Router, and multiple DataWorkers.

System Components

| Component | Role | Code Entity |
| --- | --- | --- |
| DataController | Orchestrator that manages the lifecycle of the entire service. | DataController (areal/infra/data_service/controller/controller.py:35-72) |
| DataGateway | Entry point for clients; handles authentication and request forwarding. | create_gateway_app (areal/infra/data_service/gateway/app.py:78-79) |
| DataRouter | Maintains worker health and provides load balancing (Round Robin). | create_router_app (areal/infra/data_service/router/app.py:49-53) |
| DataWorker | Loads the actual datasets and serves samples over HTTP. | create_worker_app (areal/infra/data_service/worker/app.py:52-75) |
| RDataset | Client-side proxy that implements the PyTorch Dataset interface. | RDataset (areal/infra/data_service/rdataset.py:157-176) |
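
The Round Robin load balancing attributed to the DataRouter can be illustrated with a minimal sketch. The class name `RoundRobinRouter` and its methods are hypothetical and are not taken from the AReaL source; the sketch only assumes that the router keeps a list of registered worker addresses plus a health set, and skips workers that are currently unhealthy.

```python
import threading


class RoundRobinRouter:
    """Hypothetical stand-in for round-robin worker selection (not AReaL code)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._workers = []      # registered addresses, in registration order
        self._healthy = set()   # addresses currently considered healthy
        self._next = 0          # position of the next candidate worker

    def register(self, addr):
        with self._lock:
            if addr not in self._workers:
                self._workers.append(addr)
            self._healthy.add(addr)

    def mark_unhealthy(self, addr):
        with self._lock:
            self._healthy.discard(addr)

    def pick(self):
        """Return the next healthy worker, cycling through the list."""
        with self._lock:
            n = len(self._workers)
            for _ in range(n):
                addr = self._workers[self._next % n]
                self._next = (self._next + 1) % n
                if addr in self._healthy:
                    return addr
            raise RuntimeError("no healthy workers registered")
```

Each call to `pick()` advances the cursor by at least one position, so requests spread evenly over healthy workers and an unhealthy worker is passed over without being removed from the rotation.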

Service Topology and Data Flow

The following diagram illustrates how the components interact to serve data to a training rank.

[Diagram: Data Service Microservice Interaction]


Sources: areal/infra/data_service/rdataset.py:36-54, areal/infra/data_service/gateway/app.py:23-36, areal/infra/data_service/router/app.py:121-136

DataController Lifecycle

The DataController is responsible for launching the service infrastructure. It uses the Scheduler to create RPCGuard workers, which then fork the specific service processes.

  1. Initialization: Launches RPCGuard workers based on the DataServiceConfig (areal/infra/data_service/controller/controller.py:142-181).
  2. Forking: Commands the guards to fork DataWorker, DataRouter, and DataGateway processes (areal/infra/data_service/controller/controller.py:197-251).
  3. Registration: Registers each DataWorker address with the DataRouter (areal/infra/data_service/controller/controller.py:253-263).
  4. Dataset Registration: When a dataset is added, the controller broadcasts a load command to all workers and generates a unique API key via the gateway (areal/infra/data_service/gateway/app.py:109-177).
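
The four steps above can be sketched with in-memory stand-ins. Everything in this block (`FakeWorker`, `FakeRouter`, `FakeGateway`, `ControllerSketch`, and their methods) is hypothetical and only mirrors the ordering of the lifecycle; the real controller launches RPCGuard workers through the Scheduler and talks to separate processes, which this sketch does not attempt to reproduce.

```python
import secrets
from dataclasses import dataclass, field


@dataclass
class FakeWorker:
    addr: str
    datasets: set = field(default_factory=set)

    def load(self, dataset_id):
        self.datasets.add(dataset_id)


@dataclass
class FakeRouter:
    workers: list = field(default_factory=list)

    def register(self, addr):
        self.workers.append(addr)


@dataclass
class FakeGateway:
    api_keys: dict = field(default_factory=dict)

    def issue_key(self, dataset_id):
        # Assumption: the key is an opaque random token mapped to one dataset.
        key = secrets.token_hex(16)
        self.api_keys[key] = dataset_id
        return key


class ControllerSketch:
    """Hypothetical controller that follows the four lifecycle steps."""

    def __init__(self, num_workers):
        self.num_workers = num_workers
        self.workers, self.router, self.gateway = [], None, None

    def initialize(self):
        # Steps 1-2: stand-in for launching guards and forking the processes.
        self.workers = [FakeWorker(f"worker-{i}") for i in range(self.num_workers)]
        self.router = FakeRouter()
        self.gateway = FakeGateway()
        # Step 3: register every worker address with the router.
        for worker in self.workers:
            self.router.register(worker.addr)

    def add_dataset(self, dataset_id):
        # Step 4: broadcast the load command, then obtain an API key.
        for worker in self.workers:
            worker.load(dataset_id)
        return self.gateway.issue_key(dataset_id)
```

Calling `initialize()` once and then `add_dataset(...)` per dataset reflects the split between service startup and per-dataset registration described in the list.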

Sources: areal/infra/data_service/controller/controller.py:77-186, areal/infra/data_service/controller/config.py:11-30

Remote Dataset (RDataset) and Prefetching

The RDataset acts as a local proxy for a dataset residing on the remote workers. To hide network latency, it employs a _PrefetchBuffer that runs in a background thread.

Prefetching Mechanism

The _PrefetchBuffer maintains a local cache of samples. It observes the index order provided by the sampler and proactively fetches chunks of data.
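
A minimal sketch of this kind of prefetching is shown below. It is not the actual `_PrefetchBuffer`: the class name `PrefetchBufferSketch`, the `fetch_fn` callback, and the `chunk_size` and `capacity` parameters are assumptions made for illustration. The sketch also assumes that indices are unique within an epoch and that the consumer requests them in sampler order.

```python
import threading


class PrefetchBufferSketch:
    """Hypothetical background prefetcher keyed by sample index."""

    def __init__(self, fetch_fn, index_order, chunk_size=4, capacity=8):
        assert capacity >= chunk_size
        self._fetch_fn = fetch_fn          # callable: list of indices -> list of samples
        self._order = list(index_order)    # index order announced by the sampler
        self._chunk_size = chunk_size
        self._capacity = capacity          # maximum number of cached samples
        self._cache = {}
        self._cond = threading.Condition()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        for start in range(0, len(self._order), self._chunk_size):
            chunk = self._order[start:start + self._chunk_size]
            with self._cond:
                # Block until the cache has room for the whole chunk.
                self._cond.wait_for(
                    lambda: len(self._cache) + len(chunk) <= self._capacity
                )
            samples = self._fetch_fn(chunk)  # remote call happens outside the lock
            with self._cond:
                self._cache.update(zip(chunk, samples))
                self._cond.notify_all()

    def get(self, idx, timeout=5.0):
        """Return the sample for idx, waiting for the prefetcher if needed."""
        with self._cond:
            if not self._cond.wait_for(lambda: idx in self._cache, timeout=timeout):
                raise TimeoutError(f"sample {idx} was not prefetched in time")
            sample = self._cache.pop(idx)
            self._cond.notify_all()  # wake the prefetcher: space was freed
            return sample
```

Fetching in chunks amortizes the per-request overhead, while the capacity bound keeps the background thread from running arbitrarily far ahead of the consumer.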

Data Fetching Flow

[Diagram: Code Entity Mapping: Request to Sample]


Sources: areal/infra/data_service/rdataset.py:78-155, areal/infra/data_service/rdataset.py:210-230

DataWorker Implementation

Each DataWorker is a FastAPI application that manages multiple datasets in memory.

Key API Endpoints

| Endpoint | Request Type | Description |
| --- | --- | --- |
| /datasets/load | WorkerLoadDatasetRequest | Loads a dataset using _get_custom_dataset and initializes a StatefulDataLoader (areal/infra/data_service/worker/app.py:104-143) |
| /v1/samples/fetch | FetchSamplesRequest | Retrieves serialized samples for the requested indices (areal/infra/data_service/worker/app.py:185-189) |
| /datasets/state/save | WorkerStateSaveRequest | Saves the StatefulDataLoader state to disk for checkpointing (areal/infra/data_service/worker/app.py:228-243) |
| /datasets/unload | WorkerUnloadDatasetRequest | Clears the dataset from worker memory (areal/infra/data_service/worker/app.py:191-226) |
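
As an illustration of how a client might address the fetch endpoint, the sketch below builds (but does not send) an HTTP request with the standard library. Only the path `/v1/samples/fetch` comes from the table above. The JSON field names `dataset_id` and `indices`, the `Bearer` authorization scheme, and the helper name `build_fetch_request` are assumptions, and the real `FetchSamplesRequest` schema may differ.

```python
import json
import urllib.request


def build_fetch_request(base_url, api_key, dataset_id, indices):
    """Build a POST request for the fetch endpoint (hypothetical payload shape)."""
    # Assumed payload fields; check FetchSamplesRequest for the real schema.
    payload = {"dataset_id": dataset_id, "indices": list(indices)}
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/v1/samples/fetch",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # Assumed auth scheme for the per-dataset API key.
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )
```

The returned object can be passed to `urllib.request.urlopen` once a service is actually running at `base_url`.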

Concurrency and Locking

The worker uses a dual-locking strategy to ensure thread safety:

  1. datasets_lock: Guards the global dictionary of loaded datasets and the _loading_ids reservation set (areal/infra/data_service/worker/app.py:63).
  2. state.lock: A per-dataset lock that ensures operations like epoch resets or state saving do not conflict with sample fetching (areal/infra/data_service/worker/app.py:49).
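
The two locks can be illustrated with a small sketch. The names `datasets_lock`, `_loading_ids`, and `state.lock` follow the description above, but the surrounding classes (`DatasetState`, `WorkerRegistrySketch`) and their methods are hypothetical and simplified; in particular, the sketch assumes that a dataset ID is reserved under the global lock so that the slow load can run without holding it.

```python
import threading


class DatasetState:
    """Hypothetical per-dataset state with its own lock."""

    def __init__(self, samples):
        self.samples = samples
        self.epoch = 0
        self.lock = threading.Lock()  # per-dataset lock (state.lock)


class WorkerRegistrySketch:
    def __init__(self):
        self.datasets = {}           # dataset_id -> DatasetState
        self._loading_ids = set()    # IDs reserved by in-flight loads
        self.datasets_lock = threading.Lock()

    def load(self, dataset_id, loader):
        with self.datasets_lock:
            if dataset_id in self.datasets or dataset_id in self._loading_ids:
                return False  # already loaded or currently loading
            self._loading_ids.add(dataset_id)  # reserve the ID
        try:
            state = DatasetState(loader())  # slow work outside the global lock
            with self.datasets_lock:
                self.datasets[dataset_id] = state
            return True
        finally:
            with self.datasets_lock:
                self._loading_ids.discard(dataset_id)

    def fetch(self, dataset_id, indices):
        with self.datasets_lock:
            state = self.datasets[dataset_id]
        with state.lock:  # excludes concurrent epoch resets on this dataset
            return [state.samples[i] for i in indices]

    def reset_epoch(self, dataset_id):
        with self.datasets_lock:
            state = self.datasets[dataset_id]
        with state.lock:
            state.epoch += 1
            return state.epoch
```

Holding the global lock only for dictionary lookups means a long operation on one dataset does not stall requests for other datasets.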

Sources: areal/infra/data_service/worker/app.py:41-63, areal/infra/data_service/types.py:11-45

Distributed Sampling and Evaluation

The service supports both standard training and evaluation sampling.

Sources: areal/infra/data_service/worker/app.py:124-143, areal/infra/data_service/worker/config.py:9-15