Appearance
Advanced Topics
Job Filters
Job filters intercept the execution pipeline, providing cross-cutting concerns like logging, metrics, and auditing.
Implementing a Filter
csharp
using GrydJobs.Core.Abstractions;
public class LoggingJobFilter : IJobFilter
{
public int Order => 0; // Lower = executes first
private readonly ILogger<LoggingJobFilter> _logger;
public LoggingJobFilter(ILogger<LoggingJobFilter> logger)
=> _logger = logger;
public Task OnBeforeExecutionAsync(JobExecutionContext context, CancellationToken ct)
{
_logger.LogInformation("Starting job {JobType} (ID: {JobId})",
context.JobTypeName, context.JobId);
return Task.CompletedTask;
}
public Task OnAfterExecutionAsync(JobExecutionContext context, CancellationToken ct)
{
_logger.LogInformation("Completed job {JobType} in {Duration}ms",
context.JobTypeName, context.Duration?.TotalMilliseconds);
return Task.CompletedTask;
}
public Task OnErrorAsync(JobExecutionContext context, Exception exception, CancellationToken ct)
{
_logger.LogError(exception, "Failed job {JobType} (ID: {JobId})",
context.JobTypeName, context.JobId);
return Task.CompletedTask;
}
}Registering Filters
Filters are registered in DI and resolved automatically:
csharp
builder.Services.AddTransient<IJobFilter, LoggingJobFilter>();
builder.Services.AddTransient<IJobFilter, MetricsJobFilter>();Observability
Metrics
GrydJobs emits the following metrics via IMetricsCollector:
| Metric | Type | Description |
|---|---|---|
gryd_jobs_enqueued_total | Counter | Total jobs enqueued |
gryd_jobs_executed_total | Counter | Total jobs executed |
gryd_jobs_failed_total | Counter | Total jobs failed |
gryd_jobs_deadlettered_total | Counter | Total jobs dead-lettered |
gryd_jobs_retried_total | Counter | Total retries attempted |
gryd_jobs_cancelled_total | Counter | Total jobs cancelled |
gryd_jobs_execution_duration_ms | Histogram | Execution duration |
gryd_jobs_queue_depth | Gauge | Current queue depth |
gryd_jobs_active | Gauge | Currently processing jobs |
All metrics include tags: job.type, job.queue, tenant.id.
Distributed Tracing
GrydJobs creates traces via IActivitySourceProvider:
| Span | Kind | Description |
|---|---|---|
GrydJobs.Execute | Consumer | Job execution span |
GrydJobs.Enqueue | Producer | Job enqueue span |
GrydJobs.Schedule | Producer | Job schedule span |
Tags: job.type, job.id, job.queue, job.retry_count, job.status.
Testing
Unit Testing Jobs
csharp
[Fact]
public async Task SendEmailJob_SendsEmail()
{
var sender = Substitute.For<IEmailSender>();
var job = new SendEmailJob(sender);
var context = new JobExecutionContext
{
JobId = "test-123",
JobTypeName = nameof(SendEmailJob)
};
await job.ExecuteAsync(
new SendEmailArgs("user@test.com", "Test", "Body"),
context,
CancellationToken.None);
await sender.Received(1).SendAsync(
"user@test.com", "Test", "Body", Arg.Any<CancellationToken>());
}Testing with Mocked Scheduler
csharp
[Fact]
public async Task OrderService_EnqueuesConfirmationEmail()
{
var scheduler = Substitute.For<IJobScheduler>();
var service = new OrderService(scheduler);
await service.PlaceOrderAsync(new Order { Email = "test@test.com" });
await scheduler.Received(1).EnqueueAsync<SendEmailJob, SendEmailArgs>(
Arg.Is<SendEmailArgs>(a => a.To == "test@test.com"),
Arg.Any<CancellationToken>());
}In-Memory Provider for Testing
For integration tests, you can create an in-memory IJobScheduler:
csharp
public class InMemoryJobScheduler : IJobScheduler
{
public List<(Type JobType, object Args)> EnqueuedJobs { get; } = [];
public Task<string> EnqueueAsync<TJob, TArgs>(TArgs args, CancellationToken ct)
where TJob : IJob<TArgs> where TArgs : class
{
EnqueuedJobs.Add((typeof(TJob), args));
return Task.FromResult(Guid.NewGuid().ToString());
}
// ... implement other methods as needed
}Idempotency
For jobs that must not execute more than once (e.g., payment processing), implement idempotency guards:
csharp
public class ProcessPaymentJob : IJob<PaymentArgs>
{
private readonly IPaymentStore _store;
public async Task ExecuteAsync(
PaymentArgs args,
JobExecutionContext context,
CancellationToken ct)
{
// Check if already processed
if (await _store.IsProcessedAsync(args.PaymentId, ct))
{
return; // Idempotent — skip duplicate execution
}
await _store.ProcessAsync(args.PaymentId, ct);
await _store.MarkProcessedAsync(args.PaymentId, ct);
}
}