URL: https://dzone.com/articles/scheduler-agent-supervisor-pattern-reliable-task-o

Scheduler-Agent-Supervisor Pattern: Reliable Task Orchestration in Distributed Systems

The Scheduler-Agent-Supervisor (SAS) pattern enables scalable, resilient, and maintainable distributed task execution through clear role separation.

The Scheduler-Agent-Supervisor (SAS) pattern is a powerful architectural approach for managing distributed, asynchronous, and long-running tasks in a reliable and scalable way. It is particularly well-suited for systems where work needs to be orchestrated across many independent units—each capable of failing and retrying—while maintaining observability and idempotency.

This pattern divides responsibilities into three well-defined roles:

  1. Scheduler: Initiates workflows and tracks high-level progress
  2. Agent: Executes individual task units
  3. Supervisor: Monitors and manages task execution
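To make the division of labor concrete, here is a minimal in-memory sketch of the three roles. The types (WorkItem, WorkStatus, IAgent, Supervisor, Scheduler) are hypothetical illustrations, not part of any framework. All state lives in memory, whereas a real deployment would persist it so the supervisor can recover after a crash.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical in-memory model of the three SAS roles. A production system would keep
// WorkItem state in a durable store that all three components can read and update.
public enum WorkStatus { Pending, Succeeded, Failed }

public sealed class WorkItem
{
    public WorkItem(string id) => Id = id;
    public string Id { get; }
    public WorkStatus Status { get; set; } = WorkStatus.Pending;
    public int Attempts { get; set; }
}

// Agent: performs one unit of work and signals failure by throwing
public interface IAgent
{
    Task ExecuteAsync(string itemId);
}

// Supervisor: dispatches items to the agent, records their state, and retries failures
public sealed class Supervisor
{
    private readonly IAgent _agent;
    private readonly int _maxAttempts;

    public Supervisor(IAgent agent, int maxAttempts)
    {
        _agent = agent;
        _maxAttempts = maxAttempts;
    }

    public async Task RunAsync(IReadOnlyList<WorkItem> items)
    {
        // Each pass re-dispatches every item that has not succeeded yet
        for (int pass = 0; pass < _maxAttempts; pass++)
        {
            var pending = items.Where(i => i.Status == WorkStatus.Pending).ToList();
            if (pending.Count == 0) return;
            await Task.WhenAll(pending.Select(item => RunOneAsync(item)));
        }

        // Items still pending have used up all their attempts
        foreach (var item in items.Where(i => i.Status == WorkStatus.Pending))
            item.Status = WorkStatus.Failed;
    }

    private async Task RunOneAsync(WorkItem item)
    {
        item.Attempts++;
        try
        {
            await _agent.ExecuteAsync(item.Id);
            item.Status = WorkStatus.Succeeded;
        }
        catch (Exception)
        {
            // Leave the item Pending so the next pass retries it
        }
    }
}

// Scheduler: decides what to run and hands the batch to the supervisor
public static class Scheduler
{
    public static async Task<IReadOnlyList<WorkItem>> StartWorkflowAsync(
        IEnumerable<string> itemIds, Supervisor supervisor)
    {
        var items = itemIds.Select(id => new WorkItem(id)).ToList();
        await supervisor.RunAsync(items);
        return items;
    }
}
```

The scheduler only decides what to run, the agent only performs a single unit of work, and the supervisor alone owns retry policy and final status.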

Key Components With C# Implementation

1. Scheduler Component

The scheduler triggers the workflow. Here's a C# example using a timer:

C#
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class DataExportScheduler : BackgroundService
{
    // Daily run time: 2 AM, local server time
    private static readonly TimeSpan RunAt = TimeSpan.FromHours(2);

    private readonly ILogger<DataExportScheduler> _logger;
    private readonly ISupervisorClient _supervisorClient;

    public DataExportScheduler(ILogger<DataExportScheduler> logger, ISupervisorClient supervisorClient)
    {
        _logger = logger;
        _supervisorClient = supervisorClient;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Task.Delay is timer-based; unlike a fire-and-forget Timer callback, this loop
        // honors the host's stopping token and needs no async void method
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await Task.Delay(GetNextRunTime(), stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break; // The host is shutting down
            }

            await ExecuteScheduledJobAsync();
        }
    }

    private static TimeSpan GetNextRunTime()
    {
        var now = DateTime.Now;
        var nextRun = now.Date + RunAt; // Today at 2 AM
        if (nextRun <= now)
        {
            nextRun = nextRun.AddDays(1); // Already past 2 AM, so run tomorrow
        }
        return nextRun - now;
    }

    private async Task ExecuteScheduledJobAsync()
    {
        _logger.LogInformation("Initiating data export workflow");

        try
        {
            var fileList = await GetFileListToProcess();
            await _supervisorClient.StartWorkflowAsync(fileList);
        }
        catch (Exception ex)
        {
            // Log and keep the loop alive; the next run is still scheduled
            _logger.LogError(ex, "Failed to initiate workflow");
        }
    }

    private Task<List<string>> GetFileListToProcess()
    {
        // Placeholder: fetch the list of files from storage
        return Task.FromResult(new List<string> { "file1.csv", "file2.csv" /* ... */ });
    }
}
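The daily scheduling rule is easier to verify when the current time is passed in rather than read from DateTime.Now inside the method. A minimal sketch, using a hypothetical DailySchedule helper, that also covers a service starting before that day's 2 AM slot:

```csharp
using System;

// Hypothetical helper: computes the next daily run from an explicit "now",
// so the rule can be unit-tested without waiting on the real clock.
public static class DailySchedule
{
    public static DateTime NextRun(DateTime now, TimeSpan timeOfDay)
    {
        var todaysRun = now.Date + timeOfDay;
        // If today's slot is still ahead, use it; otherwise run at the same time tomorrow
        return todaysRun > now ? todaysRun : todaysRun.AddDays(1);
    }
}
```

In the hosted service, the delay would then be DailySchedule.NextRun(now, TimeSpan.FromHours(2)) - now, with now read once from DateTime.Now. Because the arithmetic uses local time, a daylight-saving change can shift one run by an hour.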


2. Agent Component

Agents perform the actual work. Here's an idempotent file processor:

C#
using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;

public class FileProcessingAgent
{
    private readonly ILogger<FileProcessingAgent> _logger;
    private readonly IBlobStorageService _storageService;
    private readonly IDatabaseRepository _repository;

    public FileProcessingAgent(
        ILogger<FileProcessingAgent> logger,
        IBlobStorageService storageService,
        IDatabaseRepository repository)
    {
        _logger = logger;
        _storageService = storageService;
        _repository = repository;
    }

    [FunctionName("ProcessFile")]
    public async Task ProcessFile([ActivityTrigger] string fileName)
    {
        // Idempotency check: a retried or duplicate activity call becomes a no-op
        if (await _repository.IsFileProcessed(fileName))
        {
            _logger.LogInformation("File {FileName} already processed. Skipping.", fileName);
            return;
        }

        try
        {
            _logger.LogInformation("Processing file: {FileName}", fileName);

            // 1. Download file
            var fileContent = await _storageService.DownloadFileAsync(fileName);

            // 2. Parse content
            var records = CsvParser.Parse(fileContent);

            // 3. Transform data
            var transformedData = DataTransformer.Transform(records);

            // 4. Upload to database
            await _repository.BulkInsertAsync(transformedData);

            // 5. Mark as completed. Ideally steps 4 and 5 share one database transaction,
            //    so a crash between them cannot leave rows inserted but the file unmarked.
            await _repository.MarkFileAsProcessed(fileName);

            _logger.LogInformation("Successfully processed {FileName}", fileName);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process file {FileName}", fileName);

            // Clean up any partial state so a retry starts from scratch
            await _repository.RollbackFileProcessing(fileName);

            throw; // Rethrow so the supervisor's retry policy can retry this activity
        }
    }
}
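In ProcessFile, the IsFileProcessed check and the MarkFileAsProcessed write are separate steps, so two agent instances handling the same file at the same moment can both pass the check. One way to close that window is to claim the item atomically before doing any work and to release the claim on failure. The sketch below shows the idea with a hypothetical in-memory ProcessingLedger. In a distributed deployment the claim would be a conditional insert or unique key in a shared database, and it would need a lease or timeout so that a crashed agent does not hold its claim forever.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical in-memory ledger; production code would use a unique key or a
// conditional write in a shared database so every agent instance sees the same claims.
public sealed class ProcessingLedger
{
    private enum State { InProgress, Completed }

    private readonly ConcurrentDictionary<string, State> _entries = new();

    // Runs work at most once per key. Returns false if the key was already claimed
    // (either completed or currently being processed by another caller).
    public async Task<bool> RunOnceAsync(string key, Func<Task> work)
    {
        // TryAdd is atomic: only one caller can claim a given key
        if (!_entries.TryAdd(key, State.InProgress))
            return false;

        try
        {
            await work();
            _entries[key] = State.Completed;
            return true;
        }
        catch
        {
            // Release the claim so a later retry can process the item again
            _entries.TryRemove(key, out _);
            throw;
        }
    }

    public bool IsCompleted(string key) =>
        _entries.TryGetValue(key, out var state) && state == State.Completed;
}
```

Inside an agent, the download, parse, and insert steps would form the work delegate, keyed by file name.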


3. Supervisor Component

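The supervisor, implemented here as a Durable Functions orchestrator, fans out one agent call per file, retries failures, and reports the files that still fail: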

C#
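using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;

public class FileProcessingSupervisor
{
    private readonly ILogger<FileProcessingSupervisor> _logger;
    private readonly INotificationService _notificationService;

    public FileProcessingSupervisor(
        ILogger<FileProcessingSupervisor> logger,
        INotificationService notificationService)
    {
        _logger = logger;
        _notificationService = notificationService;
    }

    [FunctionName("FileProcessingOrchestrator")]
    public async Task RunOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        // Orchestrator code is replayed, so use a replay-safe logger to avoid duplicate entries
        var log = context.CreateReplaySafeLogger(_logger);

        var files = context.GetInput<List<string>>();
        var retryOptions = new RetryOptions(
            firstRetryInterval: TimeSpan.FromSeconds(30),
            maxNumberOfAttempts: 3);

        log.LogInformation("Starting processing of {FileCount} files", files.Count);

        // Fan out: one agent activity per file, each with its own retry policy.
        // Keep each file name next to its task; Task.AsyncState does not carry the input.
        var processingTasks = files
            .Select(file => (File: file,
                             Work: context.CallActivityWithRetryAsync("ProcessFile", retryOptions, file)))
            .ToList();

        try
        {
            // Fan in: Task.WhenAll waits for every task, even after one has failed
            await Task.WhenAll(processingTasks.Select(p => p.Work));
            log.LogInformation("All files processed successfully");
        }
        catch (Exception ex)
        {
            log.LogError(ex, "Some files failed processing after retries");

            // Get failed files
            var failedFiles = processingTasks
                .Where(p => p.Work.IsFaulted)
                .Select(p => p.File)
                .ToList();

            // Orchestrator code must be deterministic, so side effects such as alerts run
            // in activities rather than calling the notification service directly
            await context.CallActivityAsync("SendFailureAlert", failedFiles);

            // Persist failed files for manual intervention
            await context.CallActivityAsync("PersistFailedFiles", failedFiles);

            // The other files have already been processed; rethrow so this
            // orchestration instance is recorded as failed
            throw;
        }
    }

    // Activity added so the alert is sent outside the orchestrator's replay loop
    [FunctionName("SendFailureAlert")]
    public Task SendFailureAlert([ActivityTrigger] List<string> failedFiles)
    {
        return _notificationService.SendAlert(
            "File Processing Failure",
            $"Failed to process {failedFiles.Count} files: {string.Join(", ", failedFiles)}");
    }
}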
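The fan-out/fan-in shape of the orchestrator does not depend on Durable Functions. The plain C# sketch below (FanOut, RetryAsync, and ProcessAllAsync are hypothetical names) retries each item with a fixed delay, waits for all of them, and reports the items that still failed, pairing each task with its item as the orchestrator must. What it lacks is Durable Functions' checkpointing: if this process crashes, its progress is lost, whereas an orchestration resumes from its recorded history.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for CallActivityWithRetryAsync + Task.WhenAll, without Durable Functions
public static class FanOut
{
    // Runs the operation up to maxAttempts times in total, waiting a fixed delay between attempts
    public static async Task RetryAsync(Func<Task> operation, int maxAttempts, TimeSpan delay)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                await operation();
                return;
            }
            catch when (attempt < maxAttempts)
            {
                await Task.Delay(delay);
            }
        }
    }

    // Processes every item in parallel and returns the items that still failed after all retries
    public static async Task<List<string>> ProcessAllAsync(
        IEnumerable<string> items, Func<string, Task> process, int maxAttempts, TimeSpan delay)
    {
        // Keep each item next to its task so failures can be reported by name
        var work = items
            .Select(item => (Item: item, Work: RetryAsync(() => process(item), maxAttempts, delay)))
            .ToList();

        try
        {
            await Task.WhenAll(work.Select(w => w.Work));
        }
        catch
        {
            // Awaiting WhenAll surfaces only the first exception; inspect each task below instead
        }

        return work.Where(w => w.Work.IsFaulted).Select(w => w.Item).ToList();
    }
}
```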


Complete System Integration

Here's how to wire up the components in a .NET application:

C#
var builder = Host.CreateDefaultBuilder(args)
 .ConfigureServices((context, services) =>
 {
 // Register components
 services.AddHostedService<DataExportScheduler>();
 services.AddSingleton<ISupervisorClient, DurableFunctionsSupervisorClient>();
 services.AddSingleton<IAgentClient, AzureFunctionsAgentClient>();

 // Register dependencies
 services.AddSingleton<IBlobStorageService, AzureBlobStorageService>();
 services.AddSingleton<IDatabaseRepository, SqlDatabaseRepository>();
 services.AddSingleton<INotificationService, EmailNotificationService>();

 // Configure Durable Functions (in an Azure Functions app, these settings usually live in host.json under extensions.durableTask)
 services.AddDurableTask(options =>
 {
 options.HubName = "FileProcessingHub";
 options.StorageProvider["maxQueuePollingInterval"] = "00:00:10";
 });
 })
 .ConfigureLogging(logging =>
 {
 logging.AddApplicationInsights();
 logging.AddConsole();
 });

await builder.Build().RunAsync();


When to Use the SAS Pattern

Ideal Use Cases:

  • ETL pipelines: Processing large volumes of data with reliability requirements
  • Order fulfillment systems: Where each step must be tracked and retried
  • Distributed computations: Breaking large problems into smaller, parallel tasks

Anti-Patterns:

  • Simple CRUD operations: Where the overhead isn't justified
  • Real-time processing: Consider event streaming patterns instead
  • Synchronous workflows: Where immediate response is required

Best Practices

Idempotency:

C#
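// Example idempotent operation
public async Task ProcessOrder(Order order)
{
    // Skip orders that were already saved (handles redelivered messages). Two concurrent
    // deliveries can both pass this check, so a unique key on the order ID should back it.
    if (await _repository.OrderExists(order.Id))
        return;

    // Process with transaction
    using var transaction = await _repository.BeginTransactionAsync();
    try
    {
        await _inventoryService.ReserveItems(order.Items);
        await _paymentService.ProcessPayment(order.Payment);
        await _repository.SaveOrder(order);

        await transaction.CommitAsync();
    }
    catch
    {
        // Rollback undoes database writes only; the inventory reservation and the payment
        // are external side effects, so those services need their own idempotency
        // (or a compensating action) to make a retry safe
        await transaction.RollbackAsync();
        throw;
    }
}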
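A database transaction can only roll back database writes. The inventory reservation and the payment in ProcessOrder are calls to other services, so a retry after a partial failure may repeat them. A common remedy is for the caller to send an idempotency key (for example, one derived from the order ID) and for the receiving service to return its stored result when it sees that key again. The sketch below shows that receiving side with a hypothetical IdempotentPaymentHandler that keeps keys in memory; a real service would persist them durably.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical receiving side of an external call (e.g., the payment service).
// It stores one result per idempotency key, so a retried request gets the original
// result back instead of triggering a second charge.
public sealed class IdempotentPaymentHandler
{
    private readonly ConcurrentDictionary<string, Lazy<Task<string>>> _results = new();
    private int _chargeCount;

    public int ChargeCount => Volatile.Read(ref _chargeCount);

    public Task<string> ChargeAsync(string idempotencyKey, decimal amount)
    {
        // GetOrAdd may build more than one Lazy under contention, but only the stored
        // one is ever evaluated, so the charge runs once per key
        return _results.GetOrAdd(
            idempotencyKey,
            key => new Lazy<Task<string>>(() => ChargeOnceAsync(key, amount))).Value;
    }

    private async Task<string> ChargeOnceAsync(string key, decimal amount)
    {
        Interlocked.Increment(ref _chargeCount);
        await Task.Yield(); // Stands in for the call to the real payment provider
        return $"receipt:{key}:{amount}";
    }
}
```

The sketch also caches failures: a charge that faulted stays faulted for its key. Whether a failed charge may be retried under the same key is a policy decision for the real service.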


Observability:

C#
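// Enhanced logging with correlation IDs (Stopwatch requires using System.Diagnostics)
public async Task ProcessItem(string itemId, string correlationId)
{
    // Use the caller's correlation ID (for example, the orchestration instance ID) so log
    // lines from the scheduler, supervisor, and agents can be joined; a Guid generated
    // here would only tie together the lines of this one call
    using var scope = _logger.BeginScope(new Dictionary<string, object>
    {
        ["CorrelationId"] = correlationId,
        ["ItemId"] = itemId
    });

    _logger.LogInformation("Starting processing");
    var stopwatch = Stopwatch.StartNew();

    try
    {
        // Processing logic...
        _logger.LogInformation("Processing completed in {ElapsedMs}ms",
            stopwatch.ElapsedMilliseconds);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Processing failed after {ElapsedMs}ms",
            stopwatch.ElapsedMilliseconds);
        throw;
    }
}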
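Rather than minting a fresh correlation ID inside each call, a .NET service can reuse one the runtime already propagates. System.Diagnostics.Activity, which ASP.NET Core and the Application Insights SDK build on, gives each operation a W3C TraceId that child activities inherit. A minimal sketch (the activity names ExportWorkflow and ProcessItem are hypothetical):

```csharp
using System;
using System.Diagnostics;

// Sketch: an Activity started while another Activity is current becomes its child and
// inherits its TraceId, so the ID flows from the workflow to each item automatically.
public static class CorrelationDemo
{
    public static (string WorkflowTraceId, string ItemTraceId, bool ItemHasParent) Run()
    {
        using var workflow = new Activity("ExportWorkflow")
            .SetIdFormat(ActivityIdFormat.W3C)
            .Start();

        // No explicit parent: Start() picks up Activity.Current, which is "workflow"
        using var item = new Activity("ProcessItem")
            .SetIdFormat(ActivityIdFormat.W3C)
            .Start();

        // This is the value a log scope would record as "CorrelationId"
        var correlationId = Activity.Current?.TraceId.ToHexString();

        return (workflow.TraceId.ToHexString(), correlationId ?? "", item.Parent == workflow);
    }
}
```

Inside an agent, Activity.Current?.TraceId.ToHexString() can then supply the CorrelationId value in the logging scope.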


Circuit Breakers:

C#
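// Using Polly (v7-style API) for resilient HTTP calls
using System;
using System.Net.Http;
using System.Threading.Tasks;
using Polly;
using Polly.CircuitBreaker;

public class ExternalServiceClient
{
    private readonly HttpClient _httpClient;

    // Open the circuit after 3 consecutive handled failures and reject calls for 1 minute.
    // On .NET 5 and later, HttpClient reports a timeout as a TaskCanceledException
    // wrapping a TimeoutException, so that is the exception to handle.
    private static readonly AsyncCircuitBreakerPolicy CircuitBreaker = Policy
        .Handle<HttpRequestException>()
        .Or<TaskCanceledException>(ex => ex.InnerException is TimeoutException)
        .CircuitBreakerAsync(
            exceptionsAllowedBeforeBreaking: 3,
            durationOfBreak: TimeSpan.FromMinutes(1));

    public ExternalServiceClient(HttpClient httpClient) => _httpClient = httpClient;

    public Task<string> CallExternalService()
    {
        // While the circuit is open, Polly throws BrokenCircuitException without calling the API
        return CircuitBreaker.ExecuteAsync(async () =>
        {
            var response = await _httpClient.GetAsync("https://api.example.com/data");
            response.EnsureSuccessStatusCode();
            return await response.Content.ReadAsStringAsync();
        });
    }
}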
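To see what exceptionsAllowedBeforeBreaking and durationOfBreak control, here is a deliberately small, single-threaded circuit breaker with an injectable clock. SimpleCircuitBreaker is hypothetical and not part of Polly: Closed counts consecutive failures, Open rejects calls until the break duration passes, and Half-Open lets the next call decide whether to close or re-open the circuit.

```csharp
using System;

public enum CircuitState { Closed, Open, HalfOpen }

// Hypothetical, single-threaded circuit breaker (not part of Polly) mirroring two Polly settings:
// failureThreshold ~ exceptionsAllowedBeforeBreaking, breakDuration ~ durationOfBreak.
// It counts every exception as a failure and is not thread-safe.
public sealed class SimpleCircuitBreaker
{
    private readonly int _failureThreshold;
    private readonly TimeSpan _breakDuration;
    private readonly Func<DateTime> _clock;
    private int _consecutiveFailures;
    private DateTime _openedAt;
    private CircuitState _state = CircuitState.Closed;

    public SimpleCircuitBreaker(int failureThreshold, TimeSpan breakDuration, Func<DateTime> clock)
    {
        _failureThreshold = failureThreshold;
        _breakDuration = breakDuration;
        _clock = clock;
    }

    public CircuitState State
    {
        get
        {
            // Once the break has elapsed, move to half-open and allow a trial call
            if (_state == CircuitState.Open && _clock() - _openedAt >= _breakDuration)
                _state = CircuitState.HalfOpen;
            return _state;
        }
    }

    public T Execute<T>(Func<T> action)
    {
        if (State == CircuitState.Open)
            throw new InvalidOperationException("Circuit is open; call rejected without executing.");

        try
        {
            T result = action();
            _consecutiveFailures = 0;
            _state = CircuitState.Closed; // Any success, including a trial call, closes the circuit
            return result;
        }
        catch
        {
            _consecutiveFailures++;
            // A failed trial call, or too many consecutive failures, (re)opens the circuit
            if (_state == CircuitState.HalfOpen || _consecutiveFailures >= _failureThreshold)
            {
                _state = CircuitState.Open;
                _openedAt = _clock();
            }
            throw;
        }
    }
}
```

Polly adds what this sketch leaves out: thread safety, exception filtering through the Handle/Or clauses, and a dedicated BrokenCircuitException for rejected calls.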


Conclusion

The Scheduler-Agent-Supervisor pattern provides a robust framework for building distributed systems that require:

  1. Resilience: Automatic retries and failure handling
  2. Scalability: Parallel processing of independent tasks
  3. Maintainability: Clear separation of concerns
  4. Auditability: Comprehensive tracking of task states