Pipeline Behaviors¶
AlexaVoxCraft provides a comprehensive pipeline system for implementing cross-cutting concerns through request and response interceptors, enabling clean separation of business logic from infrastructure concerns.
🎯 Trivia Skill Examples: All code examples show implementing trivia game infrastructure including player data loading, game state persistence, performance monitoring, and error handling for quiz interactions.
Features¶
Request/Response Interceptors: Pre and post-processing pipeline
Observability: Built-in logging, tracing, and metrics
User Management: Authentication and session handling
Error Handling: Centralized exception processing and recovery
Performance Monitoring: Request timing and performance metrics
Auto-Registration: Automatic interceptor discovery and registration
Basic Usage¶
Request Interceptors¶
public class LoggingRequestInterceptor : IRequestInterceptor
{
private readonly ILogger<LoggingRequestInterceptor> _logger;
public LoggingRequestInterceptor(ILogger<LoggingRequestInterceptor> logger)
{
_logger = logger;
}
public Task Process(IHandlerInput input, CancellationToken cancellationToken = default)
{
var request = input.RequestEnvelope.Request;
var userId = input.RequestEnvelope.GetUserId();
_logger.LogInformation("Processing {RequestType} for user {UserId}",
request.GetType().Name,
userId);
if (request is IntentRequest intentRequest)
{
_logger.LogDebug("Intent: {IntentName} with slots: {@Slots}",
intentRequest.Intent.Name,
intentRequest.Intent.Slots);
}
return Task.CompletedTask;
}
}
Response Interceptors¶
public class MetricsResponseInterceptor : IResponseInterceptor
{
private readonly ILogger<MetricsResponseInterceptor> _logger;
private readonly Counter<int> _responseCounter;
public MetricsResponseInterceptor(ILogger<MetricsResponseInterceptor> logger)
{
_logger = logger;
_responseCounter = Metrics.CreateCounter<int>("alexa_responses_total", "Total responses sent");
}
public Task Process(IHandlerInput input, SkillResponse response, CancellationToken cancellationToken = default)
{
var requestType = input.RequestEnvelope.Request.GetType().Name;
var hasCards = response.Response?.Card != null;
var hasDirectives = response.Response?.Directives?.Any() == true;
_responseCounter.Add(1, new KeyValuePair<string, object>[]
{
new("request_type", requestType),
new("has_card", hasCards),
new("has_directives", hasDirectives)
});
_logger.LogDebug("Response sent for {RequestType}: cards={HasCards}, directives={HasDirectives}",
requestType, hasCards, hasDirectives);
return Task.CompletedTask;
}
}
Game Manager Interceptors¶
Request Interceptor for State Loading¶
From the trivia skill implementation:
public class GameServiceRequestInterceptor : IRequestInterceptor
{
private readonly IGameService _gameService;
private readonly ILogger<GameServiceRequestInterceptor> _logger;
private readonly ActivitySource _activitySource = new("TriviaSkill");
public GameServiceRequestInterceptor(IGameService gameService,
ILogger<GameServiceRequestInterceptor> logger)
{
_gameService = gameService ?? throw new ArgumentNullException(nameof(gameService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task Process(IHandlerInput input, CancellationToken cancellationToken = default)
{
var request = input.RequestEnvelope;
using var activity = DiagnosticsConfig.Source.StartActivityWithTags(
$"{nameof(GameServiceRequestInterceptor)}.{nameof(Process)}", [
new(DiagnosticsNames.RpcService, nameof(GameServiceRequestInterceptor)),
new(DiagnosticsNames.RpcSystem, DiagnosticsConfig.SystemName),
new(DiagnosticsNames.RequestType, request.Request.Type)
]);
if (request.Request is IntentRequest intent)
{
_logger.LogDebug("Received intent {intentType}", intent.Intent.Name);
activity?.SetTag(DiagnosticsNames.IntentType, intent.Intent.Name);
}
var userId = input.RequestEnvelope.GetUserId();
_logger.LogDebug("Loading player for {userId}", userId);
// Load current player and game state in parallel
var sessionAttributes = await input.AttributesManager.GetSessionAttributes(cancellationToken);
var loadCurrentPlayerTask = _gameService.LoadCurrentPlayer(userId, cancellationToken);
var loadCurrentGameTask = _gameService.LoadCurrentGame(sessionAttributes, cancellationToken);
var tasks = new[] { loadCurrentGameTask, loadCurrentPlayerTask };
try
{
await tasks.WhenAll();
activity?.SetStatus(ActivityStatusCode.Ok);
}
catch (AggregateException ae)
{
foreach (var innerException in ae.InnerExceptions)
{
_logger.LogError(innerException, innerException.Message);
}
activity?.AddException(ae);
activity?.SetStatus(ActivityStatusCode.Error, ae.Message);
}
catch (Exception ex)
{
_logger.LogError(ex, "Unexpected error occurred");
activity?.AddException(ex);
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
}
}
}
Response Interceptor for State Saving¶
public class GameServiceResponseInterceptor : IResponseInterceptor
{
private readonly IGameService _gameService;
private readonly ILogger<GameServiceResponseInterceptor> _logger;
public GameServiceResponseInterceptor(IGameService gameService,
ILogger<GameServiceResponseInterceptor> logger)
{
_gameService = gameService ?? throw new ArgumentNullException(nameof(gameService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task Process(IHandlerInput input, SkillResponse response, CancellationToken cancellationToken = default)
{
using var activity = DiagnosticsConfig.Source.StartActivityWithTags(
$"{nameof(GameServiceResponseInterceptor)}.{nameof(Process)}", [
new(DiagnosticsNames.RpcService, nameof(GameServiceResponseInterceptor)),
new(DiagnosticsNames.RpcSystem, DiagnosticsConfig.SystemName)
]);
try
{
// Save game state to session attributes
var sessionAttributes = await input.AttributesManager.GetSessionAttributes(cancellationToken);
await _gameService.SaveCurrentGame(sessionAttributes, cancellationToken);
await input.AttributesManager.SetSessionAttributes(sessionAttributes, cancellationToken);
// Save player data to persistent storage
await _gameService.SaveCurrentPlayer(cancellationToken);
_logger.LogDebug("Successfully saved game and player state");
activity?.SetStatus(ActivityStatusCode.Ok);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error saving game state");
activity?.AddException(ex);
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
}
}
}
Localization Interceptors¶
Request Interceptor for Localization¶
public class LocalizationRequestInterceptor : IRequestInterceptor
{
private readonly ILogger<LocalizationRequestInterceptor> _logger;
public LocalizationRequestInterceptor(ILogger<LocalizationRequestInterceptor> logger)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public Task Process(IHandlerInput input, CancellationToken cancellationToken = default)
{
var locale = input.RequestEnvelope.Request.Locale ?? "en-US";
// Set culture for string resources
var culture = new CultureInfo(locale);
CultureInfo.CurrentCulture = culture;
CultureInfo.CurrentUICulture = culture;
_logger.LogDebug("Set locale to {locale}", locale);
// Store locale in request context for later use
input.RequestEnvelope.Context ??= new Context();
input.RequestEnvelope.Context.Extensions ??= new Dictionary<string, object>();
input.RequestEnvelope.Context.Extensions["locale"] = locale;
return Task.CompletedTask;
}
}
Activity Source Decorators¶
Request Handler Decorator¶
public class ActivitySourceRequestHandlerDecorator<TRequest> : IRequestHandler<TRequest>
where TRequest : Request
{
private readonly IRequestHandler<TRequest> _decorated;
private readonly ActivitySource _activitySource;
public ActivitySourceRequestHandlerDecorator(IRequestHandler<TRequest> decorated)
{
_decorated = decorated;
_activitySource = new ActivitySource("TriviaSkill");
}
public Task<bool> CanHandle(IHandlerInput handlerInput, CancellationToken cancellationToken = default)
{
return _decorated.CanHandle(handlerInput, cancellationToken);
}
public async Task<SkillResponse> Handle(IHandlerInput handlerInput, CancellationToken cancellationToken = default)
{
using var activity = _activitySource.StartActivityWithTags($"{_decorated.GetType().Name}.Handle", [
new(DiagnosticsNames.RpcService, _decorated.GetType().Name),
new(DiagnosticsNames.RpcSystem, DiagnosticsConfig.SystemName),
new(DiagnosticsNames.RequestType, typeof(TRequest).Name),
new(DiagnosticsNames.UserId, handlerInput.RequestEnvelope.GetUserId())
]);
if (handlerInput.RequestEnvelope.Request is IntentRequest intent)
{
activity?.SetTag(DiagnosticsNames.IntentType, intent.Intent.Name);
}
try
{
var result = await _decorated.Handle(handlerInput, cancellationToken);
activity?.SetStatus(ActivityStatusCode.Ok);
return result;
}
catch (Exception ex)
{
activity?.AddException(ex);
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
}
}
// Service registration with Scrutor
services.Decorate<IRequestHandler<IntentRequest>, ActivitySourceRequestHandlerDecorator<IntentRequest>>();
services.Decorate<IRequestHandler<LaunchRequest>, ActivitySourceRequestHandlerDecorator<LaunchRequest>>();
services.Decorate<IRequestHandler<UserEventRequest>, ActivitySourceRequestHandlerDecorator<UserEventRequest>>();
services.Decorate<IRequestHandler<SessionEndedRequest>, ActivitySourceRequestHandlerDecorator<SessionEndedRequest>>();
Diagnostics Configuration¶
Activity Source Setup¶
public static class DiagnosticsConfig
{
public static readonly ActivitySource Source = new("TriviaSkill");
public static readonly string ServiceName = "TriviaSkill";
public static readonly string SystemName = "AlexaVoxCraft";
// Metrics
public static readonly Counter<int> CorrectCounter =
Metrics.CreateCounter<int>("trivia_correct_answers", "Number of correct answers");
public static readonly Counter<int> IncorrectCounter =
Metrics.CreateCounter<int>("trivia_incorrect_answers", "Number of incorrect answers");
public static readonly Histogram<double> RequestDuration =
Metrics.CreateHistogram<double>("alexa_request_duration", "Request processing duration");
}
public static class DiagnosticsNames
{
public const string RpcService = "rpc.service";
public const string RpcSystem = "rpc.system";
public const string RequestType = "alexa.request.type";
public const string IntentType = "alexa.intent.name";
public const string UserId = "alexa.user.id";
}
Extension Methods for Activities¶
public static class ActivityExtensions
{
public static Activity? StartActivityWithTags(this ActivitySource source, string name,
KeyValuePair<string, object?>[] tags)
{
var activity = source.StartActivity(name);
if (activity != null)
{
foreach (var tag in tags)
{
activity.SetTag(tag.Key, tag.Value);
}
}
return activity;
}
public static void AddException(this Activity? activity, Exception exception)
{
if (activity == null) return;
activity.SetTag("exception.type", exception.GetType().FullName);
activity.SetTag("exception.message", exception.Message);
activity.SetTag("exception.stacktrace", exception.StackTrace);
}
}
Performance Monitoring¶
Request Timing Interceptor¶
public class PerformanceRequestInterceptor : IRequestInterceptor
{
private readonly ILogger<PerformanceRequestInterceptor> _logger;
private readonly Histogram<double> _requestDuration;
public PerformanceRequestInterceptor(ILogger<PerformanceRequestInterceptor> logger)
{
_logger = logger;
_requestDuration = Metrics.CreateHistogram<double>("alexa_request_duration_seconds", "Request processing duration");
}
public Task Process(IHandlerInput input, CancellationToken cancellationToken = default)
{
var stopwatch = Stopwatch.StartNew();
input.RequestEnvelope.Context ??= new Context();
input.RequestEnvelope.Context.Extensions ??= new Dictionary<string, object>();
input.RequestEnvelope.Context.Extensions["performance_stopwatch"] = stopwatch;
return Task.CompletedTask;
}
}
public class PerformanceResponseInterceptor : IResponseInterceptor
{
private readonly ILogger<PerformanceResponseInterceptor> _logger;
private readonly Histogram<double> _requestDuration;
public PerformanceResponseInterceptor(ILogger<PerformanceResponseInterceptor> logger)
{
_logger = logger;
_requestDuration = Metrics.CreateHistogram<double>("alexa_request_duration_seconds", "Request processing duration");
}
public Task Process(IHandlerInput input, SkillResponse response, CancellationToken cancellationToken = default)
{
if (input.RequestEnvelope.Context?.Extensions?.TryGetValue("performance_stopwatch", out var stopwatchObj) == true &&
stopwatchObj is Stopwatch stopwatch)
{
stopwatch.Stop();
var duration = stopwatch.Elapsed.TotalSeconds;
var requestType = input.RequestEnvelope.Request.GetType().Name;
_requestDuration.Record(duration, new KeyValuePair<string, object>[]
{
new("request_type", requestType)
});
if (duration > 5.0) // Log slow requests
{
_logger.LogWarning("Slow request detected: {RequestType} took {Duration:F2}s",
requestType, duration);
}
else
{
_logger.LogDebug("Request {RequestType} completed in {Duration:F2}s",
requestType, duration);
}
}
return Task.CompletedTask;
}
}
Error Handling Pipeline¶
Global Exception Handler¶
public class GlobalExceptionHandler : IExceptionHandler
{
private readonly ILogger<GlobalExceptionHandler> _logger;
private readonly Counter<int> _errorCounter;
public GlobalExceptionHandler(ILogger<GlobalExceptionHandler> logger)
{
_logger = logger;
_errorCounter = Metrics.CreateCounter<int>("alexa_errors_total", "Total errors");
}
public Task<bool> CanHandle(IHandlerInput handlerInput, Exception ex, CancellationToken cancellationToken)
{
return Task.FromResult(true); // Handle all exceptions
}
public Task<SkillResponse> Handle(IHandlerInput handlerInput, Exception ex, CancellationToken cancellationToken)
{
var requestType = handlerInput.RequestEnvelope.Request.GetType().Name;
var userId = handlerInput.RequestEnvelope.GetUserId();
_errorCounter.Add(1, new KeyValuePair<string, object>[]
{
new("request_type", requestType),
new("exception_type", ex.GetType().Name)
});
_logger.LogError(ex, "Unhandled exception in {RequestType} for user {UserId}: {Message}",
requestType, userId, ex.Message);
var speechText = ex switch
{
TimeoutException => "Sorry, that took too long. Please try again.",
ArgumentException => "Sorry, there was a problem with your request. Please try again.",
InvalidOperationException => "Sorry, I can't do that right now. Please try again later.",
_ => "Sorry, something went wrong. Please try again."
};
return handlerInput.ResponseBuilder
.Speak(speechText)
.Reprompt("Is there anything else I can help you with?")
.GetResponse(cancellationToken);
}
}
Task Extension Utilities¶
Parallel Task Execution¶
public static class TaskExtensions
{
public static async Task WhenAll(this Task[] tasks)
{
try
{
await Task.WhenAll(tasks);
}
catch (Exception)
{
// Examine each task to see which ones faulted
var exceptions = new List<Exception>();
foreach (var task in tasks)
{
if (task.IsFaulted && task.Exception != null)
{
exceptions.AddRange(task.Exception.InnerExceptions);
}
}
if (exceptions.Count > 0)
{
throw new AggregateException(exceptions);
}
throw;
}
}
public static async Task<T[]> WhenAll<T>(this Task<T>[] tasks)
{
try
{
return await Task.WhenAll(tasks);
}
catch (Exception)
{
var exceptions = new List<Exception>();
var results = new List<T>();
foreach (var task in tasks)
{
if (task.IsFaulted && task.Exception != null)
{
exceptions.AddRange(task.Exception.InnerExceptions);
}
else if (task.IsCompletedSuccessfully)
{
results.Add(task.Result);
}
}
if (exceptions.Count > 0)
{
throw new AggregateException(exceptions);
}
return results.ToArray();
}
}
}
Registration and Configuration¶
Automatic Registration¶
// In your AlexaSkillFunction
protected override void Init(IHostBuilder builder)
{
builder.ConfigureServices((context, services) =>
{
// Auto-register interceptors from assembly
services.AddSkillMediator(context.Configuration, cfg =>
cfg.RegisterServicesFromAssemblyContaining<Program>());
// Manual interceptor registration (if needed)
services.AddScoped<IRequestInterceptor, GameServiceRequestInterceptor>();
services.AddScoped<IRequestInterceptor, LocalizationRequestInterceptor>();
services.AddScoped<IRequestInterceptor, PerformanceRequestInterceptor>();
services.AddScoped<IResponseInterceptor, GameServiceResponseInterceptor>();
services.AddScoped<IResponseInterceptor, MetricsResponseInterceptor>();
services.AddScoped<IResponseInterceptor, PerformanceResponseInterceptor>();
// Exception handlers
services.AddScoped<IExceptionHandler, GlobalExceptionHandler>();
// Decorator registration with Scrutor
services.Decorate<IRequestHandler<IntentRequest>, ActivitySourceRequestHandlerDecorator<IntentRequest>>();
services.Decorate<IRequestHandler<LaunchRequest>, ActivitySourceRequestHandlerDecorator<LaunchRequest>>();
});
}
Conditional Registration¶
// Environment-specific interceptors
if (context.HostingEnvironment.IsDevelopment())
{
services.AddScoped<IRequestInterceptor, DebugRequestInterceptor>();
services.AddScoped<IResponseInterceptor, DebugResponseInterceptor>();
}
if (context.Configuration.GetValue<bool>("EnableDetailedMetrics"))
{
services.AddScoped<IResponseInterceptor, DetailedMetricsInterceptor>();
}
Best Practices¶
1. Keep Interceptors Focused¶
Each interceptor should have a single responsibility:
// ✅ Good - Single responsibility
public class AuthenticationInterceptor : IRequestInterceptor { }
public class LoggingInterceptor : IRequestInterceptor { }
// ❌ Avoid - Multiple responsibilities
public class AuthenticationAndLoggingInterceptor : IRequestInterceptor { }
2. Handle Exceptions Gracefully¶
public async Task Process(IHandlerInput input, CancellationToken cancellationToken)
{
try
{
// Interceptor logic
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in {InterceptorName}", GetType().Name);
// Don't rethrow unless critical - let the request continue
}
}
3. Use Structured Logging¶
_logger.LogInformation("Processing {RequestType} for user {UserId} in {Duration}ms",
requestType,
userId,
stopwatch.ElapsedMilliseconds);
4. Implement Proper Cancellation¶
public async Task Process(IHandlerInput input, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
await _service.DoWorkAsync(cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
}
Examples¶
For complete pipeline behavior examples, see the Examples section with the trivia skill's comprehensive interceptor implementation.