Advanced Topics

Job Filters

Job filters hook into the job execution pipeline, letting you handle cross-cutting concerns such as logging, metrics, and auditing in one place.

Implementing a Filter

csharp
using GrydJobs.Core.Abstractions;
using Microsoft.Extensions.Logging; // ILogger<T>

public class LoggingJobFilter : IJobFilter
{
    public int Order => 0; // Lower = executes first

    private readonly ILogger<LoggingJobFilter> _logger;

    public LoggingJobFilter(ILogger<LoggingJobFilter> logger)
        => _logger = logger;

    public Task OnBeforeExecutionAsync(JobExecutionContext context, CancellationToken ct)
    {
        _logger.LogInformation("Starting job {JobType} (ID: {JobId})",
            context.JobTypeName, context.JobId);
        return Task.CompletedTask;
    }

    public Task OnAfterExecutionAsync(JobExecutionContext context, CancellationToken ct)
    {
        _logger.LogInformation("Completed job {JobType} in {Duration}ms",
            context.JobTypeName, context.Duration?.TotalMilliseconds);
        return Task.CompletedTask;
    }

    public Task OnErrorAsync(JobExecutionContext context, Exception exception, CancellationToken ct)
    {
        _logger.LogError(exception, "Failed job {JobType} (ID: {JobId})",
            context.JobTypeName, context.JobId);
        return Task.CompletedTask;
    }
}

Registering Filters

Filters are registered in DI and resolved automatically:

csharp
builder.Services.AddTransient<IJobFilter, LoggingJobFilter>();
builder.Services.AddTransient<IJobFilter, MetricsJobFilter>();
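
To make the effect of `Order` concrete, the following sketch sorts a set of filters by `Order` and runs their before-hooks in sequence. `IOrderedFilterSketch`, `NamedFilter`, and `FilterOrdering` are hypothetical stand-ins written for this illustration. How GrydJobs itself resolves and invokes filters is not shown in this guide, and the only behavior assumed is the "lower executes first" rule from the `Order` comment in the filter above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the filter contract; only the members needed
// to illustrate ordering are modeled here.
public interface IOrderedFilterSketch
{
    int Order { get; }
    Task OnBeforeExecutionAsync(string jobId, CancellationToken ct);
}

// Appends its name to a shared log when its before-hook runs.
public sealed class NamedFilter : IOrderedFilterSketch
{
    private readonly string _name;
    private readonly List<string> _log;

    public NamedFilter(string name, int order, List<string> log)
    {
        _name = name;
        Order = order;
        _log = log;
    }

    public int Order { get; }

    public Task OnBeforeExecutionAsync(string jobId, CancellationToken ct)
    {
        _log.Add(_name);
        return Task.CompletedTask;
    }
}

public static class FilterOrdering
{
    // Runs the before-hooks in ascending Order (lower value runs first).
    public static async Task RunBeforeHooksAsync(
        IEnumerable<IOrderedFilterSketch> filters, string jobId, CancellationToken ct)
    {
        foreach (var filter in filters.OrderBy(f => f.Order))
        {
            await filter.OnBeforeExecutionAsync(jobId, ct);
        }
    }
}
```

Under this assumption, a filter registered with `Order => 0` runs before one with `Order => 10`, regardless of the order in which the two were added to the container.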

Observability

Metrics

GrydJobs emits the following metrics via IMetricsCollector:

| Metric | Type | Description |
| --- | --- | --- |
| gryd_jobs_enqueued_total | Counter | Total jobs enqueued |
| gryd_jobs_executed_total | Counter | Total jobs executed |
| gryd_jobs_failed_total | Counter | Total jobs failed |
| gryd_jobs_deadlettered_total | Counter | Total jobs dead-lettered |
| gryd_jobs_retried_total | Counter | Total retries attempted |
| gryd_jobs_cancelled_total | Counter | Total jobs cancelled |
| gryd_jobs_execution_duration_ms | Histogram | Execution duration (ms) |
| gryd_jobs_queue_depth | Gauge | Current queue depth |
| gryd_jobs_active | Gauge | Currently processing jobs |

All metrics include tags: job.type, job.queue, tenant.id.
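
The members of `IMetricsCollector` are not shown in this guide, so the sketch below uses .NET's built-in `System.Diagnostics.Metrics` API as a stand-in to illustrate what a tagged counter and histogram with these names look like. `JobMetricsSketch`, the meter name passed to it, and the `RecordExecution` method are all hypothetical; treat this as an illustration of the metric and tag shapes, not as the way GrydJobs records them internally.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Illustration only: built on System.Diagnostics.Metrics,
// not on GrydJobs' IMetricsCollector.
public sealed class JobMetricsSketch : IDisposable
{
    private readonly Meter _meter;
    private readonly Counter<long> _executed;
    private readonly Histogram<double> _duration;

    public JobMetricsSketch(string meterName)
    {
        _meter = new Meter(meterName);
        _executed = _meter.CreateCounter<long>("gryd_jobs_executed_total");
        _duration = _meter.CreateHistogram<double>(
            "gryd_jobs_execution_duration_ms", unit: "ms");
    }

    // Records one finished execution with the three tags listed above.
    public void RecordExecution(
        string jobType, string queue, string tenantId, TimeSpan duration)
    {
        var tags = new[]
        {
            new KeyValuePair<string, object?>("job.type", jobType),
            new KeyValuePair<string, object?>("job.queue", queue),
            new KeyValuePair<string, object?>("tenant.id", tenantId),
        };

        _executed.Add(1, tags);
        _duration.Record(duration.TotalMilliseconds, tags);
    }

    public void Dispose() => _meter.Dispose();
}
```

Because every measurement carries `job.type`, `job.queue`, and `tenant.id`, a dashboard can break down any of the metrics by job, queue, or tenant without extra instrumentation.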

Distributed Tracing

GrydJobs creates traces via IActivitySourceProvider:

| Span | Kind | Description |
| --- | --- | --- |
| GrydJobs.Execute | Consumer | Job execution span |
| GrydJobs.Enqueue | Producer | Job enqueue span |
| GrydJobs.Schedule | Producer | Job schedule span |

Tags: job.type, job.id, job.queue, job.retry_count, job.status.
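
As a rough illustration of what such a span looks like to a consumer, the sketch below creates a `GrydJobs.Execute` span with .NET's `System.Diagnostics.ActivitySource` and collects finished spans with an `ActivityListener`. The source name `GrydJobs.Sketch` and the `JobTracingSketch` helper are assumptions made for this example; the source name actually exposed through `IActivitySourceProvider` is not stated in this guide. The `job.status` tag is left out because its possible values are not documented here.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class JobTracingSketch
{
    // Assumed source name, for illustration only.
    public const string SourceName = "GrydJobs.Sketch";

    private static readonly ActivitySource Source = new(SourceName);

    // Starts a consumer span carrying the tags listed above.
    // Returns null when no listener is subscribed to the source.
    public static Activity? StartExecuteSpan(
        string jobType, string jobId, string queue, int retryCount)
    {
        var activity = Source.StartActivity("GrydJobs.Execute", ActivityKind.Consumer);
        activity?.SetTag("job.type", jobType);
        activity?.SetTag("job.id", jobId);
        activity?.SetTag("job.queue", queue);
        activity?.SetTag("job.retry_count", retryCount);
        return activity;
    }

    // Subscribes to the source and adds every stopped span to the sink.
    public static ActivityListener Collect(List<Activity> sink)
    {
        var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == SourceName,
            Sample = (ref ActivityCreationOptions<ActivityContext> options)
                => ActivitySamplingResult.AllData,
            ActivityStopped = sink.Add,
        };
        ActivitySource.AddActivityListener(listener);
        return listener;
    }
}
```

In an application you would normally let a tracing SDK such as OpenTelemetry subscribe to the source instead of writing a listener by hand; the manual listener is mainly useful in tests that assert on emitted spans.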

Testing

Unit Testing Jobs

csharp
// Requires: using NSubstitute; using Xunit;
[Fact]
public async Task SendEmailJob_SendsEmail()
{
    var sender = Substitute.For<IEmailSender>();
    var job = new SendEmailJob(sender);
    var context = new JobExecutionContext
    {
        JobId = "test-123",
        JobTypeName = nameof(SendEmailJob)
    };

    await job.ExecuteAsync(
        new SendEmailArgs("user@test.com", "Test", "Body"),
        context,
        CancellationToken.None);

    await sender.Received(1).SendAsync(
        "user@test.com", "Test", "Body", Arg.Any<CancellationToken>());
}

Testing with Mocked Scheduler

csharp
// Requires: using NSubstitute; using Xunit;
[Fact]
public async Task OrderService_EnqueuesConfirmationEmail()
{
    var scheduler = Substitute.For<IJobScheduler>();
    var service = new OrderService(scheduler);

    await service.PlaceOrderAsync(new Order { Email = "test@test.com" });

    await scheduler.Received(1).EnqueueAsync<SendEmailJob, SendEmailArgs>(
        Arg.Is<SendEmailArgs>(a => a.To == "test@test.com"),
        Arg.Any<CancellationToken>());
}

In-Memory Provider for Testing

For integration tests, you can create an in-memory IJobScheduler:

csharp
public class InMemoryJobScheduler : IJobScheduler
{
    public List<(Type JobType, object Args)> EnqueuedJobs { get; } = [];

    public Task<string> EnqueueAsync<TJob, TArgs>(TArgs args, CancellationToken ct)
        where TJob : IJob<TArgs> where TArgs : class
    {
        EnqueuedJobs.Add((typeof(TJob), args));
        return Task.FromResult(Guid.NewGuid().ToString());
    }
    // ... implement other methods as needed
}

Idempotency

For jobs whose side effects must not be applied more than once (e.g., payment processing), add an idempotency guard so that a retried or duplicated execution becomes a no-op:

csharp
public class ProcessPaymentJob : IJob<PaymentArgs>
{
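    private readonly IPaymentStore _store;

    // The store is injected; without this constructor _store would be null.
    public ProcessPaymentJob(IPaymentStore store)
        => _store = store;

    public async Task ExecuteAsync(
        PaymentArgs args,
        JobExecutionContext context,
        CancellationToken ct)
    {
        // Check if already processed
        if (await _store.IsProcessedAsync(args.PaymentId, ct))
        {
            return; // Idempotent: skip duplicate execution
        }

        // If the job fails between these two calls, a retry will call
        // ProcessAsync again, so ProcessAsync should itself be safe to repeat.
        await _store.ProcessAsync(args.PaymentId, ct);
        await _store.MarkProcessedAsync(args.PaymentId, ct);
    }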
}
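
Note that a guard of this shape checks and then acts in separate steps, so two executions that overlap in time could both pass the check. One common mitigation is to make the claim itself atomic, so that only the first caller proceeds. The sketch below shows the idea with an in-memory `ConcurrentDictionary`; `IPaymentClaimStore`, `TryClaimAsync`, `InMemoryPaymentClaimStore`, and `ClaimingPaymentProcessor` are hypothetical names introduced for this example and are not part of GrydJobs. A real store would typically rely on a unique constraint or a conditional insert in the database.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical store contract: claiming is one atomic operation.
public interface IPaymentClaimStore
{
    // Returns true only for the first caller that claims the payment ID.
    Task<bool> TryClaimAsync(string paymentId, CancellationToken ct);
}

// In-memory stand-in; ConcurrentDictionary.TryAdd is atomic per key.
public sealed class InMemoryPaymentClaimStore : IPaymentClaimStore
{
    private readonly ConcurrentDictionary<string, byte> _claimed = new();

    public Task<bool> TryClaimAsync(string paymentId, CancellationToken ct)
        => Task.FromResult(_claimed.TryAdd(paymentId, 0));
}

public sealed class ClaimingPaymentProcessor
{
    private readonly IPaymentClaimStore _claims;
    private int _chargeCount;

    public ClaimingPaymentProcessor(IPaymentClaimStore claims)
        => _claims = claims;

    // Number of times the (simulated) charge actually ran.
    public int ChargeCount => Volatile.Read(ref _chargeCount);

    public async Task ExecuteAsync(string paymentId, CancellationToken ct)
    {
        if (!await _claims.TryClaimAsync(paymentId, ct))
        {
            return; // Another execution already claimed this payment
        }

        // Placeholder for the real charge.
        Interlocked.Increment(ref _chargeCount);
    }
}
```

Claiming before processing trades one failure mode for another: if the process crashes after the claim but before the charge completes, the payment stays claimed and unprocessed, so a production design also needs a way to detect and recover stale claims.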

Released under the MIT License.