ParallelService Class
Thread-safe parallel processing with built-in observability and concurrency control
The ParallelService offers thread-safe parallel processing with built-in observability, concurrency control, and flexible configuration options.
In particular, it provides dynamically configurable concurrency levels for CPU-bound and I/O-bound operations in .NET applications.
ParallelService is part of Diginsight.Components.
The service supports both synchronous and asynchronous parallel operations with predefined concurrency levels (Low, Medium, High) or custom parallelism settings.
Table of Contents
- 📋 Overview
- 🔍 Additional Details
- ⚙️ Configuration
- 💡 Usage Examples
- 🔧 Troubleshooting
- 📚 Reference
- 💡 Best Practices
📋 Overview
The ParallelService provides a unified interface for executing parallel operations with controlled concurrency. It automatically manages thread allocation, provides built-in observability through Diginsight telemetry, and supports both CPU-bound and I/O-bound workloads with optimized execution strategies.
Key Features
- Controlled Concurrency: Predefined Low, Medium, and High concurrency levels with configurable limits
- Flexible Execution: Support for both synchronous and asynchronous parallel operations
- Built-in Observability: Automatic telemetry and activity tracking through Diginsight observability
- Environment Overrides: Runtime concurrency control via environment variables
- WhenAll Support: Parallel execution of task factories with result collection
- Tuple Decomposition: Type-safe parallel execution with structured result handling
- Exception Safety: Graceful handling of break conditions and operation failures
- Configuration Flexibility: Dynamic and volatile configuration support
Concurrency Levels
The service provides three predefined concurrency levels optimized for different workload types:
| Level | Default Value | Typical Use Case |
|---|---|---|
| Low | 3 | I/O-bound operations, external API calls |
| Medium | 6 | Mixed workloads, moderate CPU usage |
| High | 12 | CPU-intensive operations, computational tasks |
All levels respect the global MaxConcurrency setting when configured.
🔍 Additional Details
Synchronous vs Asynchronous Operations
The ParallelService supports both execution patterns with different underlying implementations:
Synchronous Operations (ForEach):
- Uses Parallel.ForEach from the TPL
- Best for CPU-bound operations
- Automatic work-stealing and load balancing
- Thread-pool-based execution

Asynchronous Operations (ForEachAsync, WhenAllAsync):
- Uses Parallel.ForEachAsync (.NET 6+) or a custom implementation (.NET Framework/.NET Core)
- Optimized for I/O-bound operations
- Supports the async/await pattern
- Better resource utilization for async workloads
Concurrency Control
The service implements a hierarchical concurrency control system:
- Global MaxConcurrency: Sets upper limit for all operations
- Level-specific Settings: LowConcurrency, MediumConcurrency, HighConcurrency
- Runtime Overrides: Environment variables can override settings
- Default Fallbacks: Built-in defaults when no configuration is provided
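A hypothetical sketch of how an effective degree of parallelism might be resolved under this hierarchy (illustrative only, not the library's actual code; the helper name is an assumption):

```csharp
// Illustrative only: resolve the effective limit for one concurrency level.
static int ResolveConcurrency(int levelSetting, int builtInDefault, int configuredMax, string? envMaxOverride)
{
    // Level-specific setting, falling back to the built-in default
    int value = levelSetting > 0 ? levelSetting : builtInDefault;

    // The MaxConcurrency environment variable overrides the configured global limit
    int max = int.TryParse(envMaxOverride, out int env) && env > 0 ? env : configuredMax;

    // Clamp to the global limit when one is set (0 = unlimited, per the defaults table)
    return max > 0 ? Math.Min(value, max) : value;
}
```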
Environment Variable Overrides
The service supports runtime configuration through the MaxConcurrency environment variable:
This override takes precedence over configuration files and is useful for:
- Production tuning without redeployment
- Environment-specific optimization
- Load-testing scenarios
- Container resource limiting
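For example, in a container or deployment environment (variable name per the documentation above; the application command is a placeholder):

```shell
# Cap all parallel operations at 8 for this process
export MaxConcurrency=8
# then start the application, e.g.: dotnet MyApp.dll
```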
Break Loop Exception Handling
The service provides controlled loop termination through BreakLoopException:
The exception allows graceful termination of parallel loops while preserving context information for debugging and telemetry.
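A minimal sketch of the pattern (the exception's constructor arguments and exact termination semantics are assumptions; consult the library source for details):

```csharp
_parallelService.ForEach(items, options, item =>
{
    if (ShouldStop(item)) // hypothetical break condition
    {
        // Gracefully terminates the parallel loop; remaining items are skipped
        throw new BreakLoopException();
    }
    ProcessItem(item);
});
```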
⚙️ Configuration
Configuration in appsettings.json
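A typical appsettings.json fragment, matching the ParallelServiceOptions section name used in the registration code below (the values shown are examples):

```json
{
  "ParallelServiceOptions": {
    "MaxConcurrency": 16,
    "LowConcurrency": 4,
    "MediumConcurrency": 8,
    "HighConcurrency": 16
  }
}
```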
Configuration in the startup sequence
Register the ParallelService in your dependency injection container:
// Basic registration
services.AddScoped<IParallelService, ParallelService>();
// With configuration
services.Configure<ParallelServiceOptions>(options =>
{
options.MaxConcurrency = 16; // Global maximum
options.LowConcurrency = 4; // For I/O operations
options.MediumConcurrency = 8; // For mixed workloads
options.HighConcurrency = 16; // For CPU operations
});
// Register options from configuration
services.Configure<ParallelServiceOptions>(
configuration.GetSection("ParallelServiceOptions"));
Dynamic Configuration
The service supports dynamic configuration updates through the options pattern:
// Dynamic configuration update
public class ParallelServiceController : ControllerBase
{
private readonly IOptionsMonitor<ParallelServiceOptions> _optionsMonitor;
public ParallelServiceController(IOptionsMonitor<ParallelServiceOptions> optionsMonitor)
{
_optionsMonitor = optionsMonitor;
}
[HttpPost("update-concurrency")]
public IActionResult UpdateConcurrency([FromBody] ParallelServiceOptions newOptions)
{
// Options will be automatically updated and applied to new operations
// Note: Existing operations continue with their original settings
return Ok();
}
}
💡 Usage Examples
Basic ForEach Operations
public class DocumentProcessor
{
private readonly IParallelService _parallelService;
public DocumentProcessor(IParallelService parallelService)
{
_parallelService = parallelService;
}
public void ProcessDocuments(IEnumerable<Document> documents)
{
// CPU-intensive processing with high concurrency
var options = new ParallelOptions
{
MaxDegreeOfParallelism = _parallelService.HighConcurrency
};
_parallelService.ForEach(documents, options, document =>
{
// Synchronous processing
ProcessDocument(document);
});
}
public void ValidateDocuments(IEnumerable<Document> documents)
{
// Light validation with low concurrency
var options = new ParallelOptions
{
MaxDegreeOfParallelism = _parallelService.LowConcurrency
};
_parallelService.ForEach(documents, options, document =>
{
ValidateDocument(document);
});
}
}
Async ForEach Operations
public class ApiDataProcessor
{
private readonly IParallelService _parallelService;
private readonly HttpClient _httpClient;
public ApiDataProcessor(IParallelService parallelService, HttpClient httpClient)
{
_parallelService = parallelService;
_httpClient = httpClient;
}
public async Task ProcessUrlsAsync(IEnumerable<string> urls)
{
// I/O-bound operations with medium concurrency
var options = new ParallelOptions
{
MaxDegreeOfParallelism = _parallelService.MediumConcurrency
};
await _parallelService.ForEachAsync(urls, options, async url =>
{
try
{
var response = await _httpClient.GetAsync(url);
var content = await response.Content.ReadAsStringAsync();
await ProcessContentAsync(content);
}
catch (Exception ex)
{
// Handle individual URL failures
LogError($"Failed to process {url}: {ex.Message}");
}
});
}
}
WhenAll Operations
public class DataAggregationService
{
private readonly IParallelService _parallelService;
public async Task<AggregatedData> GetAggregatedDataAsync()
{
var taskFactories = new List<Func<Task>>
{
() => LoadUserDataAsync(),
() => LoadProductDataAsync(),
() => LoadOrderDataAsync(),
() => LoadInventoryDataAsync()
};
var options = new ParallelOptions
{
MaxDegreeOfParallelism = _parallelService.HighConcurrency
};
// Execute all tasks in parallel
await _parallelService.WhenAllAsync(taskFactories, options);
return new AggregatedData();
}
public async Task<List<T>> ProcessMultipleEndpointsAsync<T>(
IEnumerable<Func<Task<T>>> endpointCalls)
{
var options = new ParallelOptions
{
MaxDegreeOfParallelism = _parallelService.LowConcurrency
};
var results = await _parallelService.WhenAllAsync(endpointCalls, options);
return results.ToList();
}
}
Tuple Decomposition
public class UserProfileService
{
private readonly IParallelService _parallelService;
public async Task<UserProfileViewModel> GetUserProfileAsync(int userId)
{
var options = new ParallelOptions
{
MaxDegreeOfParallelism = _parallelService.MediumConcurrency
};
// Execute multiple related operations in parallel
var (user, preferences, activities, notifications) =
await _parallelService.WhenAllAsync(
() => GetUserAsync(userId),
() => GetUserPreferencesAsync(userId),
() => GetRecentActivitiesAsync(userId),
() => GetNotificationsAsync(userId),
options
);
return new UserProfileViewModel
{
User = user,
Preferences = preferences,
Activities = activities,
Notifications = notifications
};
}
// Also supports 2-tuple and 3-tuple overloads
public async Task<(UserData, UserPreferences)> GetBasicProfileAsync(int userId)
{
var options = new ParallelOptions
{
MaxDegreeOfParallelism = _parallelService.LowConcurrency
};
return await _parallelService.WhenAllAsync(
() => GetUserAsync(userId),
() => GetUserPreferencesAsync(userId),
options
);
}
}
🔧 Troubleshooting
Common Issues
1. Poor Performance with High Concurrency
High concurrency doesn't always mean better performance:
// Problem: Too much concurrency for I/O operations
var options = new ParallelOptions
{
MaxDegreeOfParallelism = _parallelService.HighConcurrency // 12 threads
};
await _parallelService.ForEachAsync(apiCalls, options, CallExternalApiAsync);
// Solution: Use appropriate concurrency level
var options = new ParallelOptions
{
MaxDegreeOfParallelism = _parallelService.LowConcurrency // 3 threads
};
await _parallelService.ForEachAsync(apiCalls, options, CallExternalApiAsync);
2. Thread Starvation
When mixing CPU and I/O operations:
// Problem: Blocking the thread pool
_parallelService.ForEach(items, options, item =>
{
// Blocking I/O operation on thread pool thread
var result = httpClient.GetAsync(item.Url).Result;
ProcessResult(result);
});
// Solution: Use async version
await _parallelService.ForEachAsync(items, options, async item =>
{
var result = await httpClient.GetAsync(item.Url);
ProcessResult(result);
});
3. Configuration Not Applied
Ensure proper service registration and configuration:
// Verify registration order
services.Configure<ParallelServiceOptions>(configuration.GetSection("ParallelServiceOptions"));
services.AddScoped<IParallelService, ParallelService>();
// Verify configuration values
var options = serviceProvider.GetRequiredService<IOptions<ParallelServiceOptions>>();
Console.WriteLine($"MaxConcurrency: {options.Value.MaxConcurrency}");
Performance Considerations
CPU-Bound Operations:
- Use ForEach for better performance
- Set concurrency to the CPU core count or slightly higher
- Avoid excessive context switching

I/O-Bound Operations:
- Use ForEachAsync and WhenAllAsync
- Keep concurrency moderate (3-10) to avoid overwhelming external services
- Use appropriate timeout settings

Memory Usage:
- Monitor memory consumption with large datasets
- Consider batching for very large collections
- Be aware of closure captures in lambda expressions
Debugging
Enable detailed logging and use the built-in observability features for troubleshooting:
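For example, a hypothetical appsettings.json fragment raising the log level for the library's namespace (the exact category name is an assumption):

```json
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Diginsight": "Debug"
    }
  }
}
```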
📚 Reference
Classes and Interfaces
- IParallelService: Main service interface for parallel operations
- ParallelService: Default implementation with configurable concurrency levels
- IParallelServiceOptions: Configuration interface for concurrency settings
- ParallelServiceOptions: Configuration implementation with dynamic/volatile support
- BreakLoopException: Exception for controlled loop termination
Methods
ForEach Operations
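Signatures inferred from the usage examples in this document (the exact parameter names and overload set may differ; consult the source for the authoritative definitions):

```csharp
void ForEach<TSource>(
    IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Action<TSource> body);

Task ForEachAsync<TSource>(
    IEnumerable<TSource> source,
    ParallelOptions parallelOptions,
    Func<TSource, Task> body);
```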
WhenAll Operations
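Signatures inferred from the usage examples in this document (the generic overload's return type is shown as IEnumerable&lt;T&gt;; the actual type may be an array or list):

```csharp
Task WhenAllAsync(
    IEnumerable<Func<Task>> taskFactories,
    ParallelOptions parallelOptions);

Task<IEnumerable<T>> WhenAllAsync<T>(
    IEnumerable<Func<Task<T>>> taskFactories,
    ParallelOptions parallelOptions);
```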
Tuple Decomposition Overloads
Task<(T1, T2)> WhenAllAsync<T1, T2>(
Func<Task<T1>> taskFactory1,
Func<Task<T2>> taskFactory2,
ParallelOptions parallelOptions)
Task<(T1, T2, T3)> WhenAllAsync<T1, T2, T3>(
Func<Task<T1>> taskFactory1,
Func<Task<T2>> taskFactory2,
Func<Task<T3>> taskFactory3,
ParallelOptions parallelOptions)
Task<(T1, T2, T3, T4)> WhenAllAsync<T1, T2, T3, T4>(
Func<Task<T1>> taskFactory1,
Func<Task<T2>> taskFactory2,
Func<Task<T3>> taskFactory3,
Func<Task<T4>> taskFactory4,
    ParallelOptions parallelOptions)
Configuration Properties
| Property | Type | Default | Description |
|---|---|---|---|
| MaxConcurrency | int | 0 | Global maximum concurrency limit (0 = unlimited) |
| LowConcurrency | int | 3 | Concurrency level for I/O-bound operations |
| MediumConcurrency | int | 6 | Concurrency level for mixed workloads |
| HighConcurrency | int | 12 | Concurrency level for CPU-intensive operations |
Default Values
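When no configuration is supplied, the service falls back to the built-in defaults listed in the table above; conceptually equivalent to (shown for illustration only):

```csharp
// Built-in defaults, per the configuration properties table above
var defaults = new ParallelServiceOptions
{
    MaxConcurrency = 0,    // 0 = unlimited (level limits below still apply)
    LowConcurrency = 3,    // I/O-bound operations
    MediumConcurrency = 6, // Mixed workloads
    HighConcurrency = 12   // CPU-intensive operations
};
```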
💡 Best Practices
Choosing Concurrency Levels
Low Concurrency (3 threads):
- External API calls
- Database queries
- File I/O operations
- Network requests to rate-limited services

Medium Concurrency (6 threads):
- Mixed CPU/I/O workloads
- Image/document processing
- Data transformation tasks
- Moderate computational work

High Concurrency (12 threads):
- Pure CPU-bound calculations
- Mathematical computations
- Cryptographic operations
- Parallel algorithms
Thread Safety
Ensure thread-safe operations in parallel bodies:
// Thread-safe: Each operation works on independent data
await _parallelService.ForEachAsync(users, options, async user =>
{
user.ProcessedAt = DateTime.UtcNow; // Safe: each user is independent
await SaveUserAsync(user); // Safe: if SaveUserAsync is thread-safe
});
// Not thread-safe: Shared state modification
var totalProcessed = 0;
_parallelService.ForEach(items, options, item =>
{
ProcessItem(item);
totalProcessed++; // Race condition!
});
// Thread-safe alternative: use Interlocked for atomic updates
var totalProcessed = 0;
_parallelService.ForEach(items, options, item =>
{
ProcessItem(item);
Interlocked.Increment(ref totalProcessed); // Safe atomic operation
});
Resource Management
Properly manage resources in parallel operations:
public class ResourceManagedProcessor
{
private readonly IParallelService _parallelService;
private readonly SemaphoreSlim _semaphore;
public ResourceManagedProcessor(IParallelService parallelService)
{
_parallelService = parallelService;
_semaphore = new SemaphoreSlim(5, 5); // Limit concurrent resource usage
}
public async Task ProcessItemsAsync(IEnumerable<Item> items)
{
var options = new ParallelOptions
{
MaxDegreeOfParallelism = _parallelService.MediumConcurrency
};
await _parallelService.ForEachAsync(items, options, async item =>
{
await _semaphore.WaitAsync();
try
{
using var resource = CreateExpensiveResource();
await ProcessWithResourceAsync(item, resource);
}
finally
{
_semaphore.Release();
}
});
}
}
Key Guidelines:
- Always consider the nature of your workload (CPU-bound vs I/O-bound)
- Monitor resource usage and adjust concurrency accordingly
- Use appropriate exception handling for individual operations
- Leverage the built-in observability for performance monitoring
- Test with realistic data volumes and network conditions