< Summary - Kestrun — Combined Coverage

Information
Class: Kestrun.Callback.CallbackWorker
Assembly: Kestrun
File(s): /home/runner/work/Kestrun/Kestrun/src/CSharp/Kestrun/Callback/CallbackWorker.cs
Tag: Kestrun/Kestrun@ca54e35c77799b76774b3805b6f075cdbc0c5fbe
Line coverage
78%
Covered lines: 54
Uncovered lines: 15
Coverable lines: 69
Total lines: 158
Line coverage: 78.2%
Branch coverage
75%
Covered branches: 15
Total branches: 20
Branch coverage: 75%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Coverage history

Coverage history 0 25 50 75 100 01/02/2026 - 00:16:25 Line coverage: 78.2% (54/69) Branch coverage: 75% (15/20) Total lines: 158 Tag: Kestrun/Kestrun@8405dc23b786b9d436fba0d65fb80baa4171e1d0 01/02/2026 - 00:16:25 Line coverage: 78.2% (54/69) Branch coverage: 75% (15/20) Total lines: 158 Tag: Kestrun/Kestrun@8405dc23b786b9d436fba0d65fb80baa4171e1d0

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
ExecuteAsync()75%4487.5%
ProcessOne()100%6676.92%
HandleFailure()100%66100%
<HandleFailure()0%14415.38%
EnqueueAgain()100%11100%

File(s)

/home/runner/work/Kestrun/Kestrun/src/CSharp/Kestrun/Callback/CallbackWorker.cs

#LineLine coverage
 1
 2using System.Threading.Channels;
 3
 4namespace Kestrun.Callback;
 5
 6/// <summary>
 7/// Background worker that processes callback requests from the queue.
 8/// </summary>
 9/// <remarks>
 10/// Initializes a new instance of the <see cref="CallbackWorker"/> class.
 11/// </remarks>
 12/// <param name="queue"> The in-memory callback queue.</param>
 13/// <param name="sender"> The callback sender.</param>
 14/// <param name="retry"> The callback retry policy.</param>
 15/// <param name="log"> The logger.</param>
 16/// <param name="store"> The optional callback store.</param>
 217public sealed class CallbackWorker(
 218    InMemoryCallbackQueue queue,
 219    ICallbackSender sender,
 220    ICallbackRetryPolicy retry,
 221    Serilog.ILogger log,
 222    ICallbackStore? store = null) : BackgroundService
 23{
 224    private readonly ChannelReader<CallbackRequest> _reader = queue.Channel.Reader;
 225    private readonly ChannelWriter<CallbackRequest> _writer = queue.Channel.Writer;
 226    private readonly ICallbackSender _sender = sender;
 227    private readonly ICallbackRetryPolicy _retry = retry;
 228    private readonly ICallbackStore? _store = store; // optional
 229    private readonly Serilog.ILogger _log = log;
 30
 31    /// <summary>
 32    /// Executes the background service.
 33    /// </summary>
 34    /// <param name="stoppingToken"> Cancellation token to stop the service.</param>
 35    /// <returns> A task that represents the background service execution.</returns>
 36    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
 37    {
 38        try
 39        {
 1040            await foreach (var req in _reader.ReadAllAsync(stoppingToken))
 41            {
 42                // Fire-and-limit concurrency via Task.Run? Better: use a SemaphoreSlim
 343                _ = ProcessOne(req, stoppingToken);
 44            }
 045        }
 246        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
 47        {
 48            // Graceful shutdown
 249            if (_log.IsEnabled(Serilog.Events.LogEventLevel.Information))
 50            {
 251                _log.Information("CallbackWorker is stopping due to cancellation.");
 52            }
 253        }
 254    }
 55
 56    /// <summary>
 57    /// Processes a single callback request.
 58    /// </summary>
 59    /// <param name="req"> The callback request to process.</param>
 60    /// <param name="ct">  The cancellation token.</param>
 61    /// <returns> A task that represents the asynchronous operation.</returns>
 62    private async Task ProcessOne(CallbackRequest req, CancellationToken ct)
 63    {
 64        try
 65        {
 366            if (_store is not null)
 67            {
 368                await _store.MarkInFlightAsync(req, ct);
 69            }
 70
 371            var result = await _sender.SendAsync(req, ct);
 72
 373            if (result.Success)
 74            {
 175                if (_store is not null)
 76                {
 177                    await _store.MarkSucceededAsync(req, result, ct);
 78                }
 79
 180                return;
 81            }
 82
 283            await HandleFailure(req, result, ct);
 284        }
 085        catch (Exception ex)
 86        {
 087            var result = new CallbackResult(false, null, ex.GetType().Name, ex.Message, DateTimeOffset.UtcNow);
 088            await HandleFailure(req, result, ct);
 89        }
 390    }
 91
 92    /// <summary>
 93    /// Handles a failed callback request.
 94    /// </summary>
 95    /// <param name="req"> The callback request that failed.</param>
 96    /// <param name="result"> The result of the callback attempt.</param>
 97    /// <param name="ct"> The cancellation token.</param>
 98    /// <returns> A task that represents the asynchronous operation.</returns>
 99    private async Task HandleFailure(CallbackRequest req, CallbackResult result, CancellationToken ct)
 100    {
 2101        var decision = _retry.Evaluate(req, result);
 102
 2103        if (decision.Kind == RetryDecisionKind.Retry)
 104        {
 1105            req.Attempt++;
 1106            req.NextAttemptAt = decision.NextAttemptAt;
 107
 1108            if (_store is not null)
 109            {
 1110                await _store.MarkRetryScheduledAsync(req, result, ct);
 111            }
 112
 113            // schedule re-enqueue (simple in-memory) or rely on durable poller
 1114            _ = Task.Delay(decision.Delay, ct).ContinueWith(async _ =>
 1115            {
 1116                // ignore exceptions if shutting down
 2117                try { await EnqueueAgain(req, ct); }
 0118                catch (OperationCanceledException)
 1119                {
 1120                    // expected during shutdown; no further action required
 0121                    if (_log.IsEnabled(Serilog.Events.LogEventLevel.Debug))
 1122                    {
 0123                        _log.Debug("Callback {CallbackId} re-enqueue skipped due to shutdown.",
 0124                            req.CallbackId);
 1125                    }
 0126                }
 0127                catch (Exception ex)
 1128                {
 0129                    if (_log.IsEnabled(Serilog.Events.LogEventLevel.Debug))
 1130                    {
 0131                        _log.Debug(ex,
 0132                            "Failed to re-enqueue callback {CallbackId} during retry scheduling.",
 0133                            req.CallbackId);
 1134                    }
 0135                }
 2136            }, TaskScheduler.Default);
 137
 1138            return;
 139        }
 140
 1141        if (_store is not null)
 142        {
 1143            await _store.MarkFailedPermanentAsync(req, result, ct);
 144        }
 145
 1146        _log.Warning("Callback failed permanently {CallbackId} after {Attempts} attempts. Last error: {Err}",
 1147            req.CallbackId, req.Attempt + 1, result.ErrorMessage);
 2148    }
 149
 150    /// <summary>
 151    /// Enqueues the callback request again for retry.
 152    /// </summary>
 153    /// <param name="req"> The callback request to enqueue again.</param>
 154    /// <param name="ct"> The cancellation token.</param>
 155    /// <returns> A task that represents the asynchronous operation.</returns>
 156    private async Task EnqueueAgain(CallbackRequest req, CancellationToken ct)
 1157        => await _writer.WriteAsync(req, ct).ConfigureAwait(false);
 158}