| | | 1 | | using System.Net.Http.Headers; |
| | | 2 | | using System.Net.Sockets; |
| | | 3 | | using System.Security.Authentication; |
| | | 4 | | |
| | | 5 | | namespace Kestrun.Callback; |
| | | 6 | | |
| | | 7 | | /// <summary> |
| | | 8 | | /// In-memory implementation of <see cref="ICallbackDispatcher"/>. |
| | | 9 | | /// Enqueues callback requests into an in-memory queue for processing. |
| | | 10 | | /// </summary> |
| | | 11 | | /// <remarks> |
| | | 12 | | /// Initializes a new instance of the <see cref="InMemoryCallbackDispatchWorker"/> class. |
| | | 13 | | /// </remarks> |
| | | 14 | | /// <param name="queue">The in-memory callback queue.</param> |
| | | 15 | | /// <param name="httpClientFactory">The HTTP client factory.</param> |
| | | 16 | | /// <param name="log">The logger instance.</param> |
| | 1 | 17 | | public sealed class InMemoryCallbackDispatchWorker( |
| | 1 | 18 | | InMemoryCallbackQueue queue, |
| | 1 | 19 | | IHttpClientFactory httpClientFactory, |
| | 1 | 20 | | Serilog.ILogger log) : BackgroundService |
| | | 21 | | { |
| | 1 | 22 | | private readonly InMemoryCallbackQueue _queue = queue; |
| | 1 | 23 | | private readonly IHttpClientFactory _httpClientFactory = httpClientFactory; |
| | 1 | 24 | | private readonly Serilog.ILogger _log = log; |
| | | 25 | | |
| | | 26 | | /// <summary> |
| | | 27 | | /// Executes the callback dispatching process. |
| | | 28 | | /// </summary> |
| | | 29 | | /// <param name="stoppingToken">The token to monitor for cancellation requests.</param> |
| | | 30 | | /// <returns>A task that represents the asynchronous operation.</returns> |
| | | 31 | | protected override async Task ExecuteAsync(CancellationToken stoppingToken) |
| | | 32 | | { |
| | 1 | 33 | | var httpClient = _httpClientFactory.CreateClient("kestrun-callbacks"); |
| | | 34 | | |
| | 2 | 35 | | while (!stoppingToken.IsCancellationRequested) |
| | | 36 | | { |
| | | 37 | | CallbackRequest req; |
| | | 38 | | try |
| | | 39 | | { |
| | 1 | 40 | | req = await _queue.Channel.Reader.ReadAsync(stoppingToken).ConfigureAwait(false); |
| | 1 | 41 | | } |
| | 0 | 42 | | catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) |
| | | 43 | | { |
| | 0 | 44 | | break; |
| | | 45 | | } |
| | | 46 | | |
| | | 47 | | try |
| | | 48 | | { |
| | 1 | 49 | | await DispatchOneAsync(httpClient, req, stoppingToken).ConfigureAwait(false); |
| | 1 | 50 | | } |
| | 0 | 51 | | catch (Exception ex) |
| | | 52 | | { |
| | 0 | 53 | | _log.Error(ex, "Callback dispatch failed. CallbackId={CallbackId} Url={Url}", |
| | 0 | 54 | | req.CallbackId, req.TargetUrl); |
| | | 55 | | |
| | | 56 | | // TODO: retry / dead-letter policy |
| | 0 | 57 | | } |
| | 1 | 58 | | } |
| | 1 | 59 | | } |
| | | 60 | | |
| | | 61 | | /// <summary> |
| | | 62 | | /// Dispatches a single callback request. |
| | | 63 | | /// </summary> |
| | | 64 | | /// <param name="httpClient">The HTTP client used to send the request.</param> |
| | | 65 | | /// <param name="req">The callback request to be dispatched.</param> |
| | | 66 | | /// <param name="token">The cancellation token to observe.</param> |
| | | 67 | | /// <returns>A task that represents the asynchronous operation.</returns> |
| | | 68 | | private async Task DispatchOneAsync(HttpClient httpClient, CallbackRequest req, CancellationToken token) |
| | | 69 | | { |
| | 1 | 70 | | using var msg = new HttpRequestMessage(new HttpMethod(req.HttpMethod), req.TargetUrl); |
| | | 71 | | |
| | 1 | 72 | | if (req.Body is not null) |
| | | 73 | | { |
| | 1 | 74 | | msg.Content = new ByteArrayContent(req.Body); |
| | | 75 | | // Set Content-Type explicitly (avoids overload confusion) |
| | 1 | 76 | | if (!string.IsNullOrWhiteSpace(req.ContentType)) |
| | | 77 | | { |
| | 1 | 78 | | msg.Content.Headers.ContentType = MediaTypeHeaderValue.Parse(req.ContentType); |
| | | 79 | | } |
| | | 80 | | } |
| | | 81 | | |
| | 4 | 82 | | foreach (var h in req.Headers) |
| | | 83 | | { |
| | | 84 | | // Content headers must go on Content, not on msg.Headers |
| | 1 | 85 | | if (!msg.Headers.TryAddWithoutValidation(h.Key, h.Value)) |
| | | 86 | | { |
| | 0 | 87 | | _ = (msg.Content?.Headers.TryAddWithoutValidation(h.Key, h.Value)); |
| | | 88 | | } |
| | | 89 | | } |
| | | 90 | | |
| | 1 | 91 | | _log.Information("Sending callback. CallbackId={CallbackId} Url={Url}", req.CallbackId, req.TargetUrl); |
| | | 92 | | |
| | 1 | 93 | | using var resp = await SendCallbackAsync(httpClient, msg, token).ConfigureAwait(false); |
| | | 94 | | |
| | 1 | 95 | | if ((int)resp.StatusCode >= 500) |
| | | 96 | | { |
| | 0 | 97 | | var body = await resp.Content.ReadAsStringAsync(token).ConfigureAwait(false); |
| | 0 | 98 | | _log.Warning("Callback got {StatusCode} (server error). CallbackId={CallbackId} Url={Url} BodySnippet={Body} |
| | 0 | 99 | | (int)resp.StatusCode, req.CallbackId, req.TargetUrl, Snip(body, 500)); |
| | | 100 | | |
| | | 101 | | // TODO: retry |
| | 0 | 102 | | return; |
| | | 103 | | } |
| | | 104 | | |
| | 1 | 105 | | if ((int)resp.StatusCode >= 400) |
| | | 106 | | { |
| | 0 | 107 | | var body = await resp.Content.ReadAsStringAsync(token).ConfigureAwait(false); |
| | 0 | 108 | | _log.Warning("Callback rejected {StatusCode}. CallbackId={CallbackId} Url={Url} BodySnippet={Body}", |
| | 0 | 109 | | (int)resp.StatusCode, req.CallbackId, req.TargetUrl, Snip(body, 500)); |
| | | 110 | | |
| | | 111 | | // TODO: dead-letter (usually) |
| | 0 | 112 | | return; |
| | | 113 | | } |
| | | 114 | | |
| | 1 | 115 | | _log.Information("Callback delivered. CallbackId={CallbackId} Status={StatusCode}", |
| | 1 | 116 | | req.CallbackId, (int)resp.StatusCode); |
| | 1 | 117 | | } |
| | | 118 | | |
| | | 119 | | private async Task<HttpResponseMessage> SendCallbackAsync( |
| | | 120 | | HttpClient httpClient, |
| | | 121 | | HttpRequestMessage request, |
| | | 122 | | CancellationToken token) |
| | | 123 | | { |
| | | 124 | | try |
| | | 125 | | { |
| | 1 | 126 | | return await httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token) |
| | 1 | 127 | | .ConfigureAwait(false); |
| | | 128 | | } |
| | 0 | 129 | | catch (TaskCanceledException ex) |
| | | 130 | | { |
| | 0 | 131 | | _log.Warning(ex, "Callback timed out. Url={Url}", request.RequestUri); |
| | 0 | 132 | | throw; |
| | | 133 | | } |
| | 0 | 134 | | catch (HttpRequestException ex) when (ex.InnerException is SocketException se) |
| | | 135 | | { |
| | 0 | 136 | | _log.Error(ex, "Callback DNS/connect failure. SocketError={SocketError} Url={Url}", |
| | 0 | 137 | | se.SocketErrorCode, request.RequestUri); |
| | 0 | 138 | | throw; |
| | | 139 | | } |
| | 0 | 140 | | catch (HttpRequestException ex) when (ex.InnerException is AuthenticationException or IOException) |
| | | 141 | | { |
| | 0 | 142 | | _log.Error(ex, "Callback TLS/SSL failure. Url={Url}", request.RequestUri); |
| | 0 | 143 | | throw; |
| | | 144 | | } |
| | 0 | 145 | | catch (HttpRequestException ex) |
| | | 146 | | { |
| | 0 | 147 | | _log.Error(ex, "Callback HTTP request failure. Url={Url}", request.RequestUri); |
| | 0 | 148 | | throw; |
| | | 149 | | } |
| | 1 | 150 | | } |
| | | 151 | | |
| | | 152 | | private static string Snip(string? s, int max) |
| | 0 | 153 | | => string.IsNullOrEmpty(s) ? "" : (s.Length <= max ? s : s[..max]); |
| | | 154 | | } |