CosmosDbObservableExtensions Class

reference
cosmos-db
observability
extensions
A comprehensive reference for the CosmosDbObservableExtensions class that provides observable extensions for CosmosDB operations, offering enhanced logging, metrics, and OpenTelemetry integration for database operations.
Author

Diginsight Team

Published

October 4, 2025

The CosmosDbObservableExtensions class provides observable extensions for CosmosDB operations.

In particular, it offers observable versions of common CosmosDB SDK methods that:

  • provide logging of queries and CRUD operations together with their payload sizes, the databases and collections they are executed against and, optionally, their full request and response payloads.
  • provide metrics to OpenTelemetry about latencies, inbound and outbound data volumes and Request Unit (RU) consumption for every operation. These metrics are tagged with database, container, operation type and caller information to allow detailed performance analysis.

CosmosDbObservableExtensions is part of the Diginsight.Components.Azure package.

📋 Overview

The CosmosDbObservableExtensions class provides observable versions of standard CosmosDB SDK methods, automatically adding:

  • Structured Logging: Detailed logs with connection info, queries, and results
  • OpenTelemetry Activities: Distributed tracing with custom tags
  • Error Handling: Comprehensive exception logging and re-throwing
  • Performance Metrics: Automatic RU consumption tracking
  • Context Enrichment: Container, database, and endpoint information

Key Features

  • Zero Configuration: Works out-of-the-box with existing CosmosDB code
  • Non-Intrusive: Drop-in replacements for standard SDK methods
  • Rich Telemetry: Comprehensive observability without manual instrumentation
  • Error Resilience: Exceptions are logged and re-thrown, so existing error handling keeps working
  • Query Cost Tracking: Automatic Request Unit (RU) consumption monitoring
  • Structured Data: JSON serialization of entities and responses

Observable Operations

All major CosmosDB operations have observable counterparts:

| Category | Operations | Count |
| --- | --- | --- |
| Query | FeedIterator, LINQ Queryable | 8 methods |
| CRUD | Create, Read, Upsert, Replace, Delete | 10 methods |
| Batch | ReadMany, DeleteAllByPartitionKey, TransactionalBatch | 5 methods |
| Patch | PatchItem operations | 2 methods |
| Utility | AsyncEnumeration, Response reading | 3 methods |

🔍 Additional Details

Synchronous vs Asynchronous Operations

The observable extensions maintain the same patterns as the original CosmosDB SDK while adding comprehensive telemetry. Every observable operation creates rich diagnostic information including:

Operation Context:

  • Database endpoint and authentication details
  • Container and partition information
  • Request timing and concurrency patterns

Performance Metrics:

  • Request Units (RU) consumption per operation
  • Latency measurements and distribution
  • Throughput and operation success rates

Query Analysis:

  • Full query text with parameter values
  • Query execution plans and optimization hints
  • Result set sizes and pagination information

Activity Tracking

Every observable operation creates an OpenTelemetry activity with rich context:

Standard Activity Tags:

  • query: SQL query text or operation type
  • container: CosmosDB container name
  • database: CosmosDB database name
  • endpoint: CosmosDB endpoint URI
  • operation_type: Type of operation (query, read, create, etc.)

Query Cost Integration:

  • query_cost: Request Units consumed (when available)
  • Automatic integration with QueryCostMetricRecorder

Example Activity:

Activity: CosmosDbObservableExtensions.GetItemQueryIteratorObservable
  - query: "SELECT * FROM c WHERE c.Status = @status"
  - container: "users"
  - database: "myapp"
  - endpoint: "https://myaccount.documents.azure.com:443/"
  - query_cost: 12.45
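The tag layout above maps directly onto the standard System.Diagnostics activity API. The following is a minimal sketch of that pattern, not Diginsight's implementation: the source name "Example.CosmosDb", the QueryTracing class and the TraceQuery helper are assumptions made up for illustration, and only the tag names are taken from this page.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// A listener is required; without one StartActivity returns null.
var captured = new List<Activity>();
using var listener = new ActivityListener
{
    ShouldListenTo = source => source.Name == QueryTracing.SourceName,
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
    ActivityStopped = captured.Add
};
ActivitySource.AddActivityListener(listener);

QueryTracing.TraceQuery("SELECT * FROM c WHERE c.Status = @status", "users", "myapp", 12.45);

// Print every stopped activity together with its tags.
foreach (var activity in captured)
{
    Console.WriteLine($"Activity: {activity.OperationName}");
    foreach (var tag in activity.TagObjects)
    {
        Console.WriteLine($"  - {tag.Key}: {tag.Value}");
    }
}

// Hypothetical helper: wraps one "operation" in an activity and tags it.
public static class QueryTracing
{
    public const string SourceName = "Example.CosmosDb"; // assumed name, not the library's
    private static readonly ActivitySource Source = new(SourceName);

    public static void TraceQuery(string query, string container, string database, double queryCost)
    {
        using var activity = Source.StartActivity("GetItemQueryIterator");
        activity?.SetTag("query", query);
        activity?.SetTag("container", container);
        activity?.SetTag("database", database);
        // A real wrapper would execute the query here and read the cost from the response.
        activity?.SetTag("query_cost", queryCost);
    }
}
```

The activity is stopped when the using scope ends, which is when the listener's ActivityStopped callback sees the final set of tags.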

Logging

Comprehensive structured logging with consistent patterns across all operations:

Query Operations

🔍 CosmosDB query for class 'User' in database https://myaccount.documents.azure.com:443/, container users, collection 'users'
🔍 Query: "SELECT * FROM c WHERE c.Status = 'Active'"

Item Operations

📦 CosmosDB create item for class 'User' in database https://myaccount.documents.azure.com:443/, container users, collection 'users'  
📦 entity:{"Id":"user123","Name":"John Doe","Status":"Active"}

🔍 CosmosDB read item for id 'user123' in database https://myaccount.documents.azure.com:443/, container users, collection 'users'
🔍 partitionKey:["users"]

🔄 CosmosDB upsert for class 'User' in database https://myaccount.documents.azure.com:443/, container users, collection 'users'
🔄 entity:{"Id":"user123","Name":"John Updated","Status":"Active"}

🗑️ CosmosDB delete item for class 'User' with id 'user123' in database https://myaccount.documents.azure.com:443/, container users, collection 'users'
🗑️ partitionKey:["users"]

Patch Operations

✂️ CosmosDB patch item for class 'User' with id 'user123' in database https://myaccount.documents.azure.com:443/, container users, collection 'users'
✂️ partitionKey:["users"]  
✂️ patchOperations:Replace /status "Inactive", Set /modifiedDate "2023-12-01T10:30:00Z"

Error Logging

❌ Error creating item in CosmosDB for type User: Request rate is large. ActivityId: 12345678-1234-1234-1234-123456789012

Metrics

When integrated with QueryCostMetricRecorder, observable operations automatically contribute to:

  • diginsight.query_cost histogram: Request Unit consumption tracking
  • Custom tags: Method names, callers, container, and database information
  • Query normalization: Reduced cardinality through query pattern analysis
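To make the histogram and the query normalization idea concrete, here is a small sketch built only on System.Diagnostics.Metrics. It is an illustration under stated assumptions, not the QueryCostMetricRecorder itself: the meter name "Example.CosmosDb", the instrument name "example.query_cost", the QueryCostSketch class and the regex-based Normalize rule are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Text.RegularExpressions;

// Collect measurements in-process so the effect of normalization is visible.
var recorded = new List<(double Value, string Query)>();
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == QueryCostSketch.MeterName) l.EnableMeasurementEvents(instrument);
};
listener.SetMeasurementEventCallback<double>((instrument, value, tags, state) =>
{
    foreach (var tag in tags)
    {
        if (tag.Key == "query") recorded.Add((value, (string)tag.Value!));
    }
});
listener.Start();

QueryCostSketch.Record("SELECT * FROM c WHERE c.Id = 'user123'", "users", 2.9);
QueryCostSketch.Record("SELECT * FROM c WHERE c.Id = 'user456'", "users", 3.1);

foreach (var (value, query) in recorded)
{
    Console.WriteLine($"{value} RU -> {query}");
}

public static class QueryCostSketch
{
    public const string MeterName = "Example.CosmosDb"; // assumed name
    private static readonly Meter Meter = new(MeterName);
    private static readonly Histogram<double> QueryCost =
        Meter.CreateHistogram<double>("example.query_cost", unit: "RU");

    // Replace string and numeric literals with '?' so that queries differing
    // only in their values share one tag value (lower metric cardinality).
    public static string Normalize(string query)
    {
        string withoutStrings = Regex.Replace(query, @"'[^']*'", "?");
        return Regex.Replace(withoutStrings, @"\b\d+(\.\d+)?\b", "?");
    }

    public static void Record(string query, string container, double requestCharge)
    {
        QueryCost.Record(
            requestCharge,
            new KeyValuePair<string, object?>("query", Normalize(query)),
            new KeyValuePair<string, object?>("container", container));
    }
}
```

Both calls end up with the same "query" tag value, which is the point of normalizing before tagging: the number of distinct time series stays bounded by the number of query shapes rather than the number of literal values.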

Error Handling

Comprehensive error handling with detailed logging:

try 
{
    var response = await container.CreateItemObservableAsync(duplicateUser);
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.Conflict)
{
    // Conflict automatically logged:
    // ❌ Error creating item in CosmosDB for type User: Item with the specified id already exists
    
    // Handle duplicate item scenario
    await HandleDuplicateItemAsync(duplicateUser);
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests)
{
    // Rate limiting automatically logged:
    // ❌ Error creating item in CosmosDB for type User: Request rate is large
    
    // Implement retry logic
    await Task.Delay(ex.RetryAfter ?? TimeSpan.FromSeconds(1));
    return await container.CreateItemObservableAsync(duplicateUser);
}

Error Categories Handled:

  • Rate Limiting (429): Too Many Requests with retry guidance
  • Conflicts (409): Item already exists scenarios
  • Not Found (404): Missing items or containers
  • Timeout: Request timeout scenarios
  • General Exceptions: Network, authentication, and service errors
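The rate-limiting handler above retries only once. A bounded retry loop can be sketched as a small generic helper; this is an assumption-laden illustration, not part of the library: Retry.ExecuteAsync and ThrottledException are hypothetical names, and ThrottledException merely stands in for a CosmosException with status 429 so that the sample runs without the SDK.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

int calls = 0;
string result = await Retry.ExecuteAsync(
    operation: _ =>
    {
        calls++;
        // Simulate two throttled attempts before a success.
        if (calls < 3) throw new ThrottledException(TimeSpan.FromMilliseconds(10));
        return Task.FromResult("created");
    },
    getRetryAfter: ex => ex is ThrottledException t ? (TimeSpan?)t.RetryAfter : null,
    maxAttempts: 5);

Console.WriteLine($"{result} after {calls} attempts"); // prints "created after 3 attempts"

// Hypothetical stand-in for a throttling exception that carries a retry hint.
public sealed class ThrottledException : Exception
{
    public ThrottledException(TimeSpan retryAfter) : base("Request rate is large") => RetryAfter = retryAfter;
    public TimeSpan RetryAfter { get; }
}

public static class Retry
{
    // Runs the operation and retries only when getRetryAfter returns a delay
    // for the thrown exception; any other exception, or the last failed
    // attempt, propagates to the caller unchanged.
    public static async Task<T> ExecuteAsync<T>(
        Func<CancellationToken, Task<T>> operation,
        Func<Exception, TimeSpan?> getRetryAfter,
        int maxAttempts = 3,
        CancellationToken cancellationToken = default)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await operation(cancellationToken);
            }
            catch (Exception ex) when (attempt < maxAttempts && getRetryAfter(ex) is TimeSpan delay)
            {
                await Task.Delay(delay, cancellationToken);
            }
        }
    }
}
```

With the Cosmos SDK, the getRetryAfter callback would inspect CosmosException.StatusCode and CosmosException.RetryAfter instead. Because the observable methods log and re-throw, each failed attempt would still appear in the logs.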

⚙️ Configuration

Basic Usage

Replace standard CosmosDB SDK method calls with their observable counterparts by adding Observable to the method name:

// Standard SDK
var iterator = container.GetItemQueryIterator<User>("SELECT * FROM c WHERE c.Type = 'Active'");

// Observable version
var iterator = container.GetItemQueryIteratorObservable<User>("SELECT * FROM c WHERE c.Type = 'Active'");

// Standard SDK
var response = await container.ReadItemAsync<User>("user123", new PartitionKey("users"));

// Observable version  
var response = await container.ReadItemObservableAsync<User>("user123", new PartitionKey("users"));

Initial Setup

Ensure Diginsight observability is configured in your application:

// Program.cs
var builder = WebApplication.CreateBuilder(args);

// Add Diginsight observability
builder.Services.AddObservability();

// Configure OpenTelemetry (optional - for metrics)
builder.Services.AddOpenTelemetry()
    .WithMetrics(metrics => metrics.AddMeter("Diginsight.Components.Azure"));

var app = builder.Build();

Dependency Injection

Register CosmosDB services with observable extensions:

// Program.cs
var builder = WebApplication.CreateBuilder(args);

// Add Diginsight observability
builder.Services.AddObservability();

// Add CosmosDB with observable extensions
builder.Services.AddSingleton(sp =>
{
    var connectionString = builder.Configuration.GetConnectionString("CosmosDB");
    return new CosmosClient(connectionString);
});

builder.Services.AddScoped<IUserRepository, UserRepository>();

// UserRepository.cs
public class UserRepository : IUserRepository
{
    private readonly Container _container;
    
    public UserRepository(CosmosClient cosmosClient)
    {
        _container = cosmosClient.GetContainer("MyApp", "users");
    }
    
    public async Task<User> GetUserAsync(string id)
    {
        // Use observable extension - automatic telemetry
        var response = await _container.ReadItemObservableAsync<User>(
            id: id,
            partitionKey: new PartitionKey("users")
        );
        
        return response.Resource;
    }
    
    public async IAsyncEnumerable<User> GetActiveUsersAsync()
    {
        var iterator = _container.GetItemQueryIteratorObservable<User>(
            "SELECT * FROM c WHERE c.Status = 'Active'"
        );
        
        // Use async enumeration utility
        await foreach (var user in iterator.GetAsyncItems())
        {
            yield return user;
        }
    }
}

OpenTelemetry Setup

Configure OpenTelemetry to capture all observable telemetry:

// Program.cs
builder.Services.AddOpenTelemetry()
    .WithTracing(tracing =>
    {
        tracing.AddAspNetCoreInstrumentation();
        tracing.AddHttpClientInstrumentation();
        
        // Add Diginsight activity sources
        tracing.AddSource("Diginsight.Components.Azure");
        
        // Configure exporters
        tracing.AddConsoleExporter();
        tracing.AddJaegerExporter();
    })
    .WithMetrics(metrics =>
    {
        metrics.AddAspNetCoreInstrumentation();
        metrics.AddHttpClientInstrumentation();
        
        // Add Diginsight meters for query cost tracking
        metrics.AddMeter("Diginsight.Components.Azure");
        
        // Configure exporters
        metrics.AddConsoleExporter();
        metrics.AddPrometheusExporter();
    });

// Add query cost metric recording
builder.Services.AddCosmosDbQueryCostMetricRecorder(options =>
{
    options.AddNormalizedQueryTag = true;  // Include query patterns in metrics
    options.AddQueryCallers = 2;           // Include caller context
});

Logging Configuration

Configure logging to capture observable operations:

// appsettings.json
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Information",
      
      // Enable CosmosDB observable logging
      "Diginsight.Components.Azure": "Debug",
      "Diginsight.Components.Azure.Extensions.CosmosDbObservableExtensions": "Information"
    }
  }
}

// Program.cs - Configure structured logging
builder.Services.Configure<LoggerFilterOptions>(options =>
{
    // Show CosmosDB operations at Info level
    options.AddFilter("Diginsight.Components.Azure.Extensions", LogLevel.Information);
    
    // Show query cost metrics at Debug level
    options.AddFilter("Diginsight.Components.Azure.Metrics", LogLevel.Debug);
});

💡 Usage Examples

Query Operations

ToFeedIteratorObservable()

Creates an observable FeedIterator from an IQueryable with automatic telemetry.

// Method signature
public static FeedIterator<T> ToFeedIteratorObservable<T>(this IQueryable<T> query, Container? container = null)
public static FeedIterator<T> ToFeedIteratorObservable<T>(this Container container, IQueryable<T> query)

// Usage
var query = container.GetItemLinqQueryable<User>()
    .Where(u => u.Status == "Active")
    .OrderBy(u => u.CreatedDate);

var iterator = query.ToFeedIteratorObservable(container);
// or
var iterator = container.ToFeedIteratorObservable(query);

Telemetry Added:

  • Activity with query text and container information
  • Log messages with query details
  • Automatic query cost tracking when used with ReadNextObservableAsync()

GetItemQueryIteratorObservable()

Observable version of GetItemQueryIterator<T>() with multiple overloads for different query scenarios.

// String query
var iterator = container.GetItemQueryIteratorObservable<User>(
    query: "SELECT * FROM c WHERE c.Status = 'Active'",  // string overload cannot bind parameters; use QueryDefinition for @status
    continuationToken: null,
    requestOptions: new QueryRequestOptions 
    { 
        PartitionKey = new PartitionKey("users"),
        MaxItemCount = 100 
    }
);

// QueryDefinition
var queryDef = new QueryDefinition("SELECT * FROM c WHERE c.Status = @status")
    .WithParameter("@status", "Active");
    
var iterator = container.GetItemQueryIteratorObservable<User>(
    queryDefinition: queryDef,
    continuationToken: null,
    requestOptions: requestOptions
);

// With FeedRange for parallel processing
var iterator = container.GetItemQueryIteratorObservable<User>(
    feedRange: feedRange,
    queryDefinition: queryDef,
    continuationToken: token,
    requestOptions: requestOptions
);

Logged Information:

  • Database endpoint and container details
  • Full query text or QueryDefinition
  • Query parameters (when using QueryDefinition)

GetItemQueryStreamIteratorObservable()

Observable version for stream-based query operations, useful for large result sets or custom deserialization.

// String query
var streamIterator = container.GetItemQueryStreamIteratorObservable(
    query: "SELECT * FROM c",
    continuationToken: null,
    requestOptions: requestOptions
);

// QueryDefinition
var streamIterator = container.GetItemQueryStreamIteratorObservable(
    queryDefinition: queryDef,
    continuationToken: null,
    requestOptions: requestOptions
);

// Process stream results
while (streamIterator.HasMoreResults)
{
    using var response = await streamIterator.ReadNextAsync();
    // Custom stream processing
}

GetItemLinqQueryableObservable()

Creates observable LINQ queryables with optional transformation functions.

// Basic queryable
var queryable = container.GetItemLinqQueryableObservable<User>(
    allowSynchronousQueryExecution: false,
    continuationToken: null,
    requestOptions: requestOptions,
    linqSerializerOptions: null
);

// With transformation
var filteredQueryable = container.GetItemLinqQueryableObservable<User>(
    transform: q => q.Where(u => u.Status == "Active").OrderBy(u => u.Name),
    allowSynchronousQueryExecution: false,
    continuationToken: null,
    requestOptions: requestOptions
);

Use Cases:

  • Building dynamic LINQ queries
  • Complex filtering and sorting
  • Query composition patterns
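The transform argument shown above is a function from one queryable to another, so several such functions can be composed before they are handed to the container. The sketch below illustrates only that composition idea against an in-memory list; the QueryTransforms.Compose helper and the User record are hypothetical, and an in-memory AsQueryable() stands in for the CosmosDB queryable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory stand-in for a container queryable.
var users = new List<User>
{
    new("u1", "Carol", "Active"),
    new("u2", "Alice", "Active"),
    new("u3", "Bob", "Inactive"),
}.AsQueryable();

// Build the transform from independent, reusable steps.
Func<IQueryable<User>, IQueryable<User>> transform = QueryTransforms.Compose<User>(
    q => q.Where(u => u.Status == "Active"),
    q => q.OrderBy(u => u.Name));

var names = transform(users).Select(u => u.Name).ToList();
Console.WriteLine(string.Join(",", names)); // prints "Alice,Carol"

public record User(string Id, string Name, string Status);

public static class QueryTransforms
{
    // Applies each step to the result of the previous one, left to right.
    public static Func<IQueryable<T>, IQueryable<T>> Compose<T>(
        params Func<IQueryable<T>, IQueryable<T>>[] steps) =>
        query => steps.Aggregate(query, (current, step) => step(current));
}
```

A composed function of this shape could then be passed wherever a single transform lambda is accepted, assuming the parameter type matches.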

Item Operations

CreateItemObservableAsync()

Observable version of CreateItemAsync<T>() with comprehensive logging and error handling.

var newUser = new User 
{ 
    Id = "user123", 
    Name = "John Doe", 
    Status = "Active" 
};

var response = await container.CreateItemObservableAsync(
    item: newUser,
    partitionKey: new PartitionKey("users"),
    requestOptions: new ItemRequestOptions { EnableContentResponseOnWrite = true },
    cancellationToken: cancellationToken
);

Console.WriteLine($"Created item with RU cost: {response.RequestCharge}");

Telemetry Features:

  • Logs full entity details using Stringify() extension
  • Tracks creation success/failure
  • Records RU consumption
  • Activity output includes full response details

ReadItemObservableAsync()

Observable version of ReadItemAsync<T>() with detailed logging.

try 
{
    var response = await container.ReadItemObservableAsync<User>(
        id: "user123",
        partitionKey: new PartitionKey("users"),
        requestOptions: new ItemRequestOptions { ConsistencyLevel = ConsistencyLevel.Session },
        cancellationToken: cancellationToken
    );
    
    var user = response.Resource;
    Console.WriteLine($"Read user: {user.Name}, RU cost: {response.RequestCharge}");
}
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
{
    // Item not found - automatically logged by observable extension
    Console.WriteLine("User not found");
}

UpsertItemObservableAsync()

Observable version of UpsertItemAsync<T>() for insert-or-update operations.

var user = new User 
{ 
    Id = "user123", 
    Name = "John Updated", 
    Status = "Active",
    ModifiedDate = DateTime.UtcNow
};

var response = await container.UpsertItemObservableAsync(
    item: user,
    partitionKey: new PartitionKey("users"),
    requestOptions: new ItemRequestOptions 
    { 
        IndexingDirective = IndexingDirective.Include,
        EnableContentResponseOnWrite = true
    },
    cancellationToken: cancellationToken
);

bool wasCreated = response.StatusCode == HttpStatusCode.Created;
bool wasUpdated = response.StatusCode == HttpStatusCode.OK;

ReplaceItemObservableAsync()

Observable version of ReplaceItemAsync<T>() for updating existing items.

var updatedUser = existingUser with { Status = "Inactive", ModifiedDate = DateTime.UtcNow };

var response = await container.ReplaceItemObservableAsync(
    item: updatedUser,
    id: updatedUser.Id,
    partitionKey: new PartitionKey("users"),
    requestOptions: new ItemRequestOptions 
    { 
        IfMatchEtag = existingUser.ETag  // Optimistic concurrency
    },
    cancellationToken: cancellationToken
);

DeleteItemObservableAsync()

Observable version of DeleteItemAsync<T>() with deletion logging.

var response = await container.DeleteItemObservableAsync<User>(
    id: "user123",
    partitionKey: new PartitionKey("users"),
    requestOptions: new ItemRequestOptions 
    { 
        IfMatchEtag = user.ETag  // Ensure no concurrent modifications
    },
    cancellationToken: cancellationToken
);

Console.WriteLine($"Deleted user, RU cost: {response.RequestCharge}");

Stream Operations

For high-performance scenarios, stream-based operations avoid object serialization overhead:

// Create with stream
using var stream = new MemoryStream(JsonSerializer.SerializeToUtf8Bytes(user));
var response = await container.CreateItemStreamObservableAsync(
    streamPayload: stream,
    partitionKey: new PartitionKey("users"),
    requestOptions: requestOptions,
    cancellationToken: cancellationToken
);

// Read with stream  
var streamResponse = await container.ReadItemStreamObservableAsync(
    id: "user123",
    partitionKey: new PartitionKey("users"),
    requestOptions: requestOptions,
    cancellationToken: cancellationToken
);

if (streamResponse.IsSuccessStatusCode)
{
    var user = await JsonSerializer.DeserializeAsync<User>(streamResponse.Content);
}

Batch Operations

CreateTransactionalBatchObservable()

Creates an observable transactional batch for atomic operations within a single partition.

// Create a transactional batch for a specific partition
var batch = container.CreateTransactionalBatchObservable(new PartitionKey("user-partition"));

// Add operations to the batch
batch.CreateItem(newUser);
batch.UpsertItem(existingUser);
batch.DeleteItem("item-to-delete");

// Execute the batch atomically
var response = await batch.ExecuteObservableAsync(cancellationToken);

if (response.IsSuccessStatusCode)
{
    Console.WriteLine($"Batch executed successfully, RU cost: {response.RequestCharge}");
    
    // Process individual operation results
    for (int i = 0; i < response.Count; i++)
    {
        var operationResult = response[i];
        Console.WriteLine($"Operation {i}: {operationResult.StatusCode}");
    }
}

ExecuteObservableAsync()

Executes a transactional batch with full observability.

var batch = container.CreateTransactionalBatch(new PartitionKey("partition-key"));
batch.CreateItem(item1);
batch.UpsertItem(item2);

// Execute with observable telemetry
var response = await batch.ExecuteObservableAsync(cancellationToken);

Console.WriteLine($"Batch execution completed with {response.RequestCharge} RU consumed");

ReadManyItemsObservableAsync()

Efficiently read multiple items in a single request.

var itemsToRead = new List<(string id, PartitionKey partitionKey)>
{
    ("user1", new PartitionKey("users")),
    ("user2", new PartitionKey("users")),
    ("user3", new PartitionKey("users"))
};

var response = await container.ReadManyItemsObservableAsync<User>(
    items: itemsToRead,
    readManyRequestOptions: new ReadManyRequestOptions 
    { 
        ConsistencyLevel = ConsistencyLevel.Session 
    },
    cancellationToken: cancellationToken
);

foreach (var user in response)
{
    Console.WriteLine($"Read user: {user.Name}");
}

Console.WriteLine($"Read {response.Count} users with {response.RequestCharge} RU");

DeleteAllItemsByPartitionKeyStreamObservableAsync()

Delete all items in a partition key efficiently.

var response = await container.DeleteAllItemsByPartitionKeyStreamObservableAsync(
    partitionKey: new PartitionKey("inactive-users"),
    requestOptions: new RequestOptions 
    { 
        IfMatchEtag = partitionETag  // Optional optimistic concurrency
    },
    cancellationToken: cancellationToken
);

if (response.IsSuccessStatusCode)
{
    Console.WriteLine($"Deleted all items in partition, RU cost: {response.Headers.RequestCharge}");
}

Patch Operations

CosmosDB patch operations allow efficient partial updates:

PatchItemObservableAsync()

Observable version of PatchItemAsync<T>() for partial item updates.

var patchOperations = new[]
{
    PatchOperation.Replace("/status", "Inactive"),
    PatchOperation.Set("/modifiedDate", DateTime.UtcNow),
    PatchOperation.Add("/tags/-", "archived")  // Add to array
};

var response = await container.PatchItemObservableAsync<User>(
    id: "user123",
    partitionKey: new PartitionKey("users"),
    patchOperations: patchOperations,
    requestOptions: new PatchItemRequestOptions 
    { 
        IfMatchEtag = user.ETag,
        EnableContentResponseOnWrite = true 
    },
    cancellationToken: cancellationToken
);

var patchedUser = response.Resource;
Console.WriteLine($"Patched user, RU cost: {response.RequestCharge}");

Patch Operation Types:

  • Replace: Update existing property value
  • Add: Add new property or append to array
  • Remove: Delete property or array element
  • Set: Add property if missing, replace if exists
  • Increment: Numeric increment operation

Utility Methods

ReadNextObservableAsync()

Observable version of ReadNextAsync() for FeedIterator processing with enhanced telemetry.

var iterator = container.GetItemQueryIteratorObservable<User>("SELECT * FROM c");

while (iterator.HasMoreResults)
{
    var response = await iterator.ReadNextObservableAsync(cancellationToken);
    
    foreach (var user in response)
    {
        Console.WriteLine($"Processing user: {user.Name}");
    }
    
    Console.WriteLine($"Page RU cost: {response.RequestCharge}");
    Console.WriteLine($"Continuation token: {response.ContinuationToken}");
}

Enhanced Telemetry:

  • Detailed logging of page results and RU consumption
  • Automatic query cost metric recording (when configured)
  • Activity tags for performance monitoring

GetAsyncItems()

Convert FeedIterator to IAsyncEnumerable for modern async processing patterns.

var iterator = container.GetItemQueryIteratorObservable<User>("SELECT * FROM c WHERE c.Status = 'Active'");

// Process items as they arrive
await foreach (var user in iterator.GetAsyncItems())
{
    Console.WriteLine($"Processing user: {user.Name}");
    
    // Process each user individually
    await ProcessUserAsync(user);
}

Benefits:

  • Memory efficient streaming processing
  • Compatible with LINQ async operators
  • Natural integration with async/await patterns
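Conceptually, this kind of conversion reads one page at a time and yields its items before asking for the next page. The following sketch shows that pattern in isolation; it is not the library's code. IPagedSource, InMemoryPagedSource and ToAsyncItems are hypothetical names, with the in-memory source standing in for a FeedIterator so the sample runs without the SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

var source = new InMemoryPagedSource<int>(Enumerable.Range(1, 7), pageSize: 3);
var seen = new List<int>();
await foreach (int item in source.ToAsyncItems())
{
    seen.Add(item);
}
Console.WriteLine(string.Join(",", seen)); // prints "1,2,3,4,5,6,7"

// Hypothetical shape of a page-by-page reader (mirrors HasMoreResults / ReadNextAsync).
public interface IPagedSource<T>
{
    bool HasMoreResults { get; }
    Task<IReadOnlyList<T>> ReadNextAsync(CancellationToken cancellationToken = default);
}

public sealed class InMemoryPagedSource<T> : IPagedSource<T>
{
    private readonly Queue<T[]> _pages;

    public InMemoryPagedSource(IEnumerable<T> items, int pageSize) =>
        _pages = new Queue<T[]>(items.Chunk(pageSize));

    public bool HasMoreResults => _pages.Count > 0;

    public Task<IReadOnlyList<T>> ReadNextAsync(CancellationToken cancellationToken = default) =>
        Task.FromResult<IReadOnlyList<T>>(_pages.Dequeue());
}

public static class PagedSourceExtensions
{
    // Only one page is held in memory at a time; the next page is requested
    // lazily, once the consumer has iterated past the current one.
    public static async IAsyncEnumerable<T> ToAsyncItems<T>(
        this IPagedSource<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        while (source.HasMoreResults)
        {
            IReadOnlyList<T> page = await source.ReadNextAsync(cancellationToken);
            foreach (T item in page)
            {
                yield return item;
            }
        }
    }
}
```

Because pages are fetched on demand, a consumer that stops early never pays for the remaining pages.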

GetItemsAsync()

Materialize all results from a FeedIterator into a collection.

var iterator = container.GetItemQueryIteratorObservable<User>("SELECT * FROM c");

// Get all items at once
var allUsers = await iterator.GetItemsAsync();

Console.WriteLine($"Retrieved {allUsers.Count()} users total");

// Process collection
var activeUsers = allUsers.Where(u => u.Status == "Active").ToList();

Use Cases:

  • Small result sets that fit in memory
  • Scenarios requiring full collection operations
  • Integration with synchronous processing logic

🚀 Advanced Usage

Custom Request Options

All observable methods support the full range of CosmosDB request options:

// Query with custom options
var queryOptions = new QueryRequestOptions
{
    PartitionKey = new PartitionKey("users"),
    MaxItemCount = 50,
    MaxConcurrency = 10,
    EnableScanInQuery = false,
    ConsistencyLevel = ConsistencyLevel.Session,
    IndexingDirective = IndexingDirective.Include,
    ResponseContinuationTokenLimitInKb = 1
};

var iterator = container.GetItemQueryIteratorObservable<User>(
    query: "SELECT * FROM c WHERE c.Status = 'Active'",
    requestOptions: queryOptions
);

// Item operations with custom options
var itemOptions = new ItemRequestOptions
{
    ConsistencyLevel = ConsistencyLevel.Strong,
    EnableContentResponseOnWrite = true,
    IfMatchEtag = existingItem.ETag,
    IfNoneMatchEtag = "*",  // Fail if item exists
    IndexingDirective = IndexingDirective.Exclude,
    PreTriggers = new List<string> { "validateItem" },
    PostTriggers = new List<string> { "updateMetadata" }
};

var response = await container.CreateItemObservableAsync(
    item: newItem,
    partitionKey: partitionKey,
    requestOptions: itemOptions
);

Partition Key Management

Proper partition key handling for optimal performance:

// Explicit partition key specification
var explicitPK = new PartitionKey("user-partition");
var response = await container.ReadItemObservableAsync<User>("user123", explicitPK);

// Hierarchical partition keys (Microsoft.Azure.Cosmos 3.33.0+)
var hierarchicalPK = new PartitionKeyBuilder()
    .Add("tenant-id")
    .Add("user-group")
    .Add("region")
    .Build();

var item = await container.ReadItemObservableAsync<User>("user123", hierarchicalPK);

// PartitionKey.None for items stored without a partition key value
var singlePartitionItem = await container.ReadItemObservableAsync<Config>(
    id: "app-config",
    partitionKey: PartitionKey.None
);

Async Enumeration Patterns

Modern async processing with observable operations:

// Streaming processing with bounded parallelism
var query = container.GetItemQueryIteratorObservable<User>("SELECT * FROM c");

var processingOptions = new ParallelOptions
{
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    CancellationToken = cancellationToken
};

// Parallel.ForEachAsync consumes the async stream directly and runs up to
// MaxDegreeOfParallelism handlers at a time
await Parallel.ForEachAsync(
    query.GetAsyncItems(),
    processingOptions,
    async (u, ct) => await ProcessUserAsync(u, ct)
);

// Batch processing with size control
// A FeedIterator can be enumerated only once, so create a new one here
var batchQuery = container.GetItemQueryIteratorObservable<User>("SELECT * FROM c");
const int batchSize = 100;
var users = new List<User>(batchSize);

await foreach (var user in batchQuery.GetAsyncItems())
{
    users.Add(user);
    
    if (users.Count >= batchSize)
    {
        await ProcessUserBatchAsync(users);
        users.Clear();
    }
}

// Process remaining users
if (users.Count > 0)
{
    await ProcessUserBatchAsync(users);
}
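The manual batching loop above can be factored into a reusable operator over IAsyncEnumerable. This is a minimal sketch under assumptions: the Batch extension and the Numbers generator are hypothetical helpers written for this example, and Numbers simply stands in for the stream returned by GetAsyncItems().

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

var batchSizes = new List<int>();
await foreach (IReadOnlyList<int> batch in Numbers(250).Batch(100))
{
    batchSizes.Add(batch.Count);
}
Console.WriteLine(string.Join(",", batchSizes)); // prints "100,100,50"

// Stand-in async stream producing 0..count-1.
static async IAsyncEnumerable<int> Numbers(int count)
{
    for (int i = 0; i < count; i++)
    {
        await Task.Yield();
        yield return i;
    }
}

public static class AsyncBatchExtensions
{
    // Groups an async stream into lists of at most 'size' items.
    // The final batch may be smaller than 'size'.
    public static async IAsyncEnumerable<IReadOnlyList<T>> Batch<T>(
        this IAsyncEnumerable<T> source,
        int size,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));

        var buffer = new List<T>(size);
        await foreach (T item in source.WithCancellation(cancellationToken))
        {
            buffer.Add(item);
            if (buffer.Count == size)
            {
                yield return buffer;
                // Allocate a fresh list so callers may keep a reference to the batch.
                buffer = new List<T>(size);
            }
        }

        if (buffer.Count > 0)
        {
            yield return buffer;
        }
    }
}
```

Unlike the loop that calls Clear() on a shared list, each yielded batch here is its own list, so a consumer that processes batches asynchronously or stores them does not see later items overwrite earlier ones.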

🔧 Troubleshooting

Common Issues

1. Missing Telemetry Data

Symptoms: No logs or activities from observable operations

Solutions:

// Ensure Diginsight observability is registered
builder.Services.AddObservability();

// Verify activity sources are configured
builder.Services.AddOpenTelemetry()
    .WithTracing(tracing => tracing.AddSource("Diginsight.Components.Azure"));

// Check logging configuration (appsettings.json, under Logging:LogLevel)
"Diginsight.Components.Azure": "Information"

2. High Memory Usage with Large Queries

Symptoms: Memory exhaustion when processing large result sets

Solutions:

// Use async enumeration instead of materializing all results
await foreach (var item in iterator.GetAsyncItems())
{
    // Process items individually
}

// Instead of:
var allItems = await iterator.GetItemsAsync(); // Loads everything into memory

3. Rate Limiting Not Handled

Symptoms: Frequent 429 (Too Many Requests) exceptions

Solutions:

// Implement proper retry logic
catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests)
{
    await Task.Delay(ex.RetryAfter ?? TimeSpan.FromSeconds(1));
    // Retry operation
}

// Or use CosmosDB SDK built-in retry
var clientOptions = new CosmosClientOptions
{
    MaxRetryAttemptsOnRateLimitedRequests = 3,
    MaxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromSeconds(10)
};

4. Query Cost Metrics Not Appearing

Symptoms: Missing diginsight.query_cost metrics

Solutions:

// Ensure QueryCostMetricRecorder is registered
builder.Services.AddCosmosDbQueryCostMetricRecorder();

// Verify OpenTelemetry meter is configured
builder.Services.AddOpenTelemetry()
    .WithMetrics(metrics => metrics.AddMeter("Diginsight.Components.Azure"));

// Use ReadNextObservableAsync() for automatic cost tracking
var response = await iterator.ReadNextObservableAsync();

Debugging

Enable Detailed Logging:

// appsettings.Development.json
{
  "Logging": {
    "LogLevel": {
      "Diginsight.Components.Azure": "Trace",
      "Diginsight.Components.Azure.Extensions.CosmosDbObservableExtensions": "Debug"
    }
  }
}

Activity Debugging:

// Add activity listeners for debugging
ActivitySource.AddActivityListener(new ActivityListener
{
    ShouldListenTo = source => source.Name.StartsWith("Diginsight"),
    Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
    ActivityStarted = activity => Console.WriteLine($"Started: {activity.OperationName}"),
    ActivityStopped = activity => Console.WriteLine($"Stopped: {activity.OperationName} ({activity.Duration.TotalMilliseconds}ms)")
});

📚 Reference

Extension Methods Summary

| Method | Return Type | Purpose |
| --- | --- | --- |
| ToFeedIteratorObservable<T>() | FeedIterator<T> | Convert IQueryable to observable FeedIterator |
| GetItemQueryIteratorObservable<T>() | FeedIterator<T> | Observable typed query iterator |
| GetItemQueryStreamIteratorObservable() | FeedIterator | Observable stream query iterator |
| GetItemLinqQueryableObservable<T>() | IOrderedQueryable<T> | Observable LINQ queryable |
| CreateItemObservableAsync<T>() | Task<ItemResponse<T>> | Observable item creation |
| ReadItemObservableAsync<T>() | Task<ItemResponse<T>> | Observable item reading |
| UpsertItemObservableAsync<T>() | Task<ItemResponse<T>> | Observable item upsert |
| ReplaceItemObservableAsync<T>() | Task<ItemResponse<T>> | Observable item replacement |
| DeleteItemObservableAsync<T>() | Task<ItemResponse<T>> | Observable item deletion |
| PatchItemObservableAsync<T>() | Task<ItemResponse<T>> | Observable item patching |
| ReadManyItemsObservableAsync<T>() | Task<FeedResponse<T>> | Observable batch reading |
| DeleteAllItemsByPartitionKeyStreamObservableAsync() | Task<ResponseMessage> | Observable partition deletion |
| ReadNextObservableAsync<T>() | Task<FeedResponse<T>> | Observable feed iteration |
| GetAsyncItems<T>() | IAsyncEnumerable<T> | Async enumerable conversion |
| GetItemsAsync<T>() | Task<IEnumerable<T>> | Full result materialization |

Activity Tags

Standard tags added to OpenTelemetry activities:

| Tag Name | Description | Example |
| --- | --- | --- |
| query | SQL query text or operation description | "SELECT * FROM c WHERE c.Status = @status" |
| container | CosmosDB container name | "users" |
| database | CosmosDB database name | "MyApplication" |
| endpoint | CosmosDB service endpoint | "https://myaccount.documents.azure.com:443/" |
| query_cost | Request Units consumed (when available) | 12.45 |
| operation_type | Type of operation performed | "read", "query", "create", etc. |

Log Messages

Standard log message patterns with emojis for easy identification:

| Operation | Emoji | Pattern |
| --- | --- | --- |
| Query | 🔍 | CosmosDB query for class '{Type}' in database {Endpoint}, container {Container} |
| Create | 📦 | CosmosDB create item for class '{Type}' in database {Endpoint}, container {Container} |
| Read | 🔍 | CosmosDB read item for id '{Id}' in database {Endpoint}, container {Container} |
| Upsert | 🔄 | CosmosDB upsert for class '{Type}' in database {Endpoint}, container {Container} |
| Replace | 🔄 | CosmosDB replace item for class '{Type}' with id '{Id}' |
| Delete | 🗑️ | CosmosDB delete item for class '{Type}' with id '{Id}' |
| Patch | ✂️ | CosmosDB patch item for class '{Type}' with id '{Id}' |
| Error | ❌ | Error {operation} in CosmosDB for type {Type}: {ErrorMessage} |

Entity Logging:

- Entity data is logged using the Stringify() extension method for structured representation
- Partition keys are logged with their full structure
- Patch operations include detailed operation lists

Query Information:

- Full query text is logged for debugging
- Query parameters are included when using QueryDefinition
- Continuation tokens and request options are logged when relevant

💡 Best Practices

Performance Considerations

1. Use Appropriate Method Variants

// For large result sets - use streaming
var streamIterator = container.GetItemQueryStreamIteratorObservable(query);

// For typed results with moderate size - use generic methods
var typedIterator = container.GetItemQueryIteratorObservable<User>(query);

// For small, complete result sets - use materialization
var iterator = container.GetItemQueryIteratorObservable<User>(query);
var allUsers = await iterator.GetItemsAsync();  // Only for small sets

2. Optimize Request Options

// Optimize for performance
var options = new QueryRequestOptions
{
    MaxItemCount = 1000,      // Larger pages = fewer round trips
    MaxConcurrency = 10,      // Parallel execution
    EnableScanInQuery = false, // Avoid scans when possible
    PartitionKey = partitionKey // Single partition queries
};

var iterator = container.GetItemQueryIteratorObservable<User>(query, requestOptions: options);

3. Use Async Enumeration for Memory Efficiency

// Memory efficient - processes items as they arrive
await foreach (var user in iterator.GetAsyncItems())
{
    await ProcessUserAsync(user);
}

// Memory intensive - loads all items first  
var allUsers = await iterator.GetItemsAsync();
foreach (var user in allUsers)
{
    await ProcessUserAsync(user);
}
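
The difference between the two variants can be illustrated without the SDK. The sketch below is not the library's implementation: `PagedEnumeration` and its `readNextPage` delegate are hypothetical stand-ins for a feed iterator that returns one page at a time (and `null` when exhausted). Streaming holds a single page in memory, while materialization accumulates every item before returning.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a paged feed; not the Diginsight implementation.
public static class PagedEnumeration
{
    // Streaming: yields items page by page, so only the current page is held in memory.
    public static async IAsyncEnumerable<T> StreamAsync<T>(
        Func<Task<IReadOnlyList<T>?>> readNextPage)
    {
        // A null page signals that the feed is exhausted.
        while (await readNextPage() is { } page)
        {
            foreach (var item in page)
            {
                yield return item;
            }
        }
    }

    // Materialization: drains the whole feed into a list before returning.
    public static async Task<List<T>> MaterializeAsync<T>(
        Func<Task<IReadOnlyList<T>?>> readNextPage)
    {
        var all = new List<T>();
        await foreach (var item in StreamAsync(readNextPage))
        {
            all.Add(item);
        }

        return all;
    }
}
```

With streaming, a consumer that stops early (for example with `break`) never requests the remaining pages, which also saves Request Units on large result sets.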

Error Handling Patterns

1. Specific Exception Handling

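// Requires: using System.Net; using Microsoft.Azure.Cosmos;
public async Task<User?> GetUserWithRetryAsync(string id, PartitionKey partitionKey)
{
    try
    {
        var response = await container.ReadItemObservableAsync<User>(id, partitionKey);
        return response.Resource;
    }
    catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotFound)
    {
        // Handle missing item (logged automatically)
        return null;
    }
    catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests)
    {
        // Handle rate limiting (logged automatically): wait, then retry.
        // Note: this retries without an upper bound; prefer a bounded retry helper in production.
        await Task.Delay(ex.RetryAfter ?? TimeSpan.FromSeconds(1));
        return await GetUserWithRetryAsync(id, partitionKey);
    }
}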

2. Retry Patterns with Observability

public async Task<T> ExecuteWithRetryAsync<T>(Func<Task<T>> operation, int maxRetries = 3)
{
    // Attempts 1 .. maxRetries - 1: wait and retry on rate limiting
    for (int attempt = 1; attempt < maxRetries; attempt++)
    {
        try
        {
            return await operation();
        }
        catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests)
        {
            // Rate limiting exception automatically logged by observable extension
            var delay = ex.RetryAfter ?? TimeSpan.FromSeconds(Math.Pow(2, attempt));
            await Task.Delay(delay);
        }
    }

    // Final attempt: any exception propagates to the caller
    return await operation();
}

// Usage
var user = await ExecuteWithRetryAsync(async () =>
{
    var response = await container.ReadItemObservableAsync<User>("user123", partitionKey);
    return response.Resource;
});
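
The delay used by the helper can be worked out in isolation. The sketch below extracts the same expression into a hypothetical `RetryDelay.For` function (the name is an assumption for illustration): the server-provided `RetryAfter` hint wins when present, otherwise the delay is 2^attempt seconds.

```csharp
using System;

// Hypothetical helper that isolates the delay expression used by ExecuteWithRetryAsync.
public static class RetryDelay
{
    // Prefer the server hint; otherwise fall back to exponential backoff (2^attempt seconds).
    public static TimeSpan For(TimeSpan? retryAfter, int attempt) =>
        retryAfter ?? TimeSpan.FromSeconds(Math.Pow(2, attempt));
}

// RetryDelay.For(null, 1)                           -> 2 seconds
// RetryDelay.For(null, 2)                           -> 4 seconds
// RetryDelay.For(TimeSpan.FromMilliseconds(250), 2) -> 250 milliseconds
```

With the default `maxRetries = 3`, a delay is applied after attempts 1 and 2 only, so the worst-case added wait without a server hint is 2 + 4 = 6 seconds.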

Observability Guidelines

1. Use Meaningful Operation Names

The observable extensions automatically use the method name as the activity name. For custom operations, consider wrapping calls:

public async Task<User> GetUserProfileAsync(string userId)
{
    using var activity = Observability.ActivitySource.StartMethodActivity(logger, () => new { userId });

    activity?.SetTag("user.id", userId);
    
    var response = await container.ReadItemObservableAsync<User>(
        userId, 
        new PartitionKey("users")
    );
    
    return response.Resource;
}

2. Configure Query Cost Tracking

// Enable query cost metrics for performance monitoring
services.AddCosmosDbQueryCostMetricRecorder(options =>
{
    options.AddQueryCallers = 1;           // Track immediate caller
    options.IgnoreQueryCallers = new[]     // Skip infrastructure methods
    {
        "*Repository.Get*",
        "CosmosDbObservableExtensions.*"
    };
});

3. Monitor Key Metrics

Track these metrics for CosmosDB health:

- Query Cost (diginsight.query_cost): Request Unit consumption
- Query Duration: Response time patterns
- Error Rates: By operation type and error code
- Throughput: Operations per second by container
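
For a quick local check that the query cost metric is being emitted, before wiring up a full OpenTelemetry exporter, a `System.Diagnostics.Metrics.MeterListener` can subscribe to the instrument by name. This is a sketch under assumptions: `QueryCostMonitor` is a hypothetical helper, the instrument name `diginsight.query_cost` is taken from the list above, and the measurement type is assumed to be `double` (register an additional callback for another numeric type if that assumption does not hold).

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Hypothetical helper (not part of Diginsight): forwards query cost measurements to a callback.
public static class QueryCostMonitor
{
    public static MeterListener Start(Action<double, IReadOnlyDictionary<string, object?>> onMeasurement)
    {
        var listener = new MeterListener();

        // Subscribe only to the instrument named in the metrics list above.
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Name == "diginsight.query_cost")
            {
                l.EnableMeasurementEvents(instrument);
            }
        };

        // Assumption: measurements are recorded as double (Request Units).
        listener.SetMeasurementEventCallback<double>((instrument, value, tags, state) =>
        {
            var tagMap = new Dictionary<string, object?>();
            foreach (var tag in tags)
            {
                tagMap[tag.Key] = tag.Value;
            }

            onMeasurement(value, tagMap);
        });

        listener.Start();
        return listener;
    }
}
```

The tag dictionary passed to the callback carries whatever dimensions the recorder attaches, so it can be used to confirm that database, container and caller tags are present before building dashboards on them.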


This documentation covers CosmosDbObservableExtensions v1.0+. For the latest updates and examples, see the Diginsight Components repository.
