In this post you’ll learn how to create a resilient webhook using AWS CDK. The architecture is designed for scalability, reliability, and fault tolerance.
AWS services we’ll use:
- SQS
- API Gateway
- EventBridge Pipes
- Step Functions
Let’s break down the architecture step-by-step.
We’ll start by integrating API Gateway directly with SQS. This lets us handle incoming webhook requests at scale and buffer them before they reach downstream services. Because SQS absorbs traffic spikes, our downstream services don’t need to scale instantly, which helps keep the system stable. Attaching a Dead Letter Queue (DLQ) means that messages that repeatedly fail processing aren’t lost: they’re kept in the DLQ, where they can be inspected for debugging or retried.
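To make the retry budget concrete, here is a toy, in-memory model of how SQS redrive treats a single message with maxReceiveCount: 3, the value used for webhookQueue. It deliberately ignores visibility timeouts, batching and the approximate nature of ApproximateReceiveCount; simulateRedrive is a hypothetical helper for illustration, not an AWS API.

```typescript
// Result of pushing one message through the toy model.
interface RedriveOutcome {
  deliveries: number;
  movedToDlq: boolean;
}

// Hypothetical helper: a simplified model of SQS redrive for a single message.
function simulateRedrive(
  maxReceiveCount: number,
  processSucceeds: (attempt: number) => boolean,
): RedriveOutcome {
  let receiveCount = 0;
  while (true) {
    // A receive that would push the count past maxReceiveCount moves the message to the DLQ instead
    if (receiveCount >= maxReceiveCount) {
      return { deliveries: receiveCount, movedToDlq: true };
    }
    receiveCount += 1; // every receive increments the message's receive count
    if (processSucceeds(receiveCount)) {
      // The consumer deletes the message after processing it successfully
      return { deliveries: receiveCount, movedToDlq: false };
    }
    // Otherwise the message isn't deleted and becomes visible again after the visibility timeout
  }
}

console.log(simulateRedrive(3, () => false)); // { deliveries: 3, movedToDlq: true }
console.log(simulateRedrive(3, (attempt) => attempt === 2)); // { deliveries: 2, movedToDlq: false }
```

In other words, a consumer gets three attempts at each message before SQS parks it in the DLQ.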
To let API Gateway send messages to our SQS queue, we need to define an IAM Role, which we’ll later attach to the API integration.
import { Duration } from 'aws-cdk-lib';
import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import {
  Model,
  PassthroughBehavior,
  RestApi,
} from 'aws-cdk-lib/aws-apigateway';
import {
  Effect,
  PolicyDocument,
  PolicyStatement,
  Role,
  ServicePrincipal,
} from 'aws-cdk-lib/aws-iam';
import { Queue } from 'aws-cdk-lib/aws-sqs';
....
/* ----------------- SQS ----------------- */
const dlq = new Queue(this, 'WebhookDLQ', {
  queueName: 'webhook-dlq',
  retentionPeriod: Duration.days(14),
});
const webhookQueue = new Queue(this, 'WebhookQueue', {
  deadLetterQueue: {
    queue: dlq,
    maxReceiveCount: 3,
  },
  queueName: 'webhook-queue',
  retentionPeriod: Duration.days(7),
});
// Role that allows API Gateway to send messages to the SQS queue
const webhookSubscriptionAPIRole = new Role(
  this,
  'WebhookSubscriptionApiRole',
  {
    assumedBy: new ServicePrincipal('apigateway.amazonaws.com'),
    inlinePolicies: {
      sqsSendMessage: new PolicyDocument({
        statements: [
          new PolicyStatement({
            actions: ['sqs:SendMessage'],
            effect: Effect.ALLOW,
            resources: [webhookQueue.queueArn],
          }),
        ],
      }),
    },
  },
);
const sqsIntegration = new apigateway.AwsIntegration({
  integrationHttpMethod: 'POST',
  options: {
    credentialsRole: webhookSubscriptionAPIRole,
    passthroughBehavior: PassthroughBehavior.NEVER,
    integrationResponses: [
      {
        responseTemplates: {
          'application/json': '{}',
        },
        statusCode: '200',
      },
    ],
    requestParameters: {
      'integration.request.header.Content-Type': `'application/x-www-form-urlencoded'`,
    },
    // Wrap the raw request body under a "body" key and form-encode it as an SQS SendMessage call
    requestTemplates: {
      'application/json':
        'Action=SendMessage&MessageBody={"body": $util.urlEncode($input.body)}',
    },
  },
  path: `${this.account}/${webhookQueue.queueName}`,
  region: this.region,
  service: 'sqs',
});
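The request template is easy to misread, so here is a rough local sketch of what it produces. It assumes encodeURIComponent is a close enough stand-in for API Gateway’s $util.urlEncode (they differ in details such as encoding spaces as %20 versus +, but both decode to the same text), and uses URLSearchParams to mimic how SQS decodes the form-encoded SendMessage request. Both function names are hypothetical.

```typescript
// Approximates the VTL request template (encodeURIComponent stands in for $util.urlEncode).
function renderSendMessageBody(rawRequestBody: string): string {
  return `Action=SendMessage&MessageBody={"body": ${encodeURIComponent(rawRequestBody)}}`;
}

// Approximates how SQS reads the form-encoded request: each value is percent-decoded.
function extractMessageBody(formBody: string): string {
  const messageBody = new URLSearchParams(formBody).get('MessageBody');
  if (messageBody === null) {
    throw new Error('MessageBody parameter missing');
  }
  return messageBody;
}

const request = '{"name":"Ada Lovelace","age":36,"email":"ada@example.com"}';
const sqsMessage = extractMessageBody(renderSendMessageBody(request));
console.log(sqsMessage); // {"body": {"name":"Ada Lovelace","age":36,"email":"ada@example.com"}}
```

This wrapping is why the validation Lambda later finds the original payload nested under a body key.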
API Gateway is the point of entry and our first layer of protection: it provides a secure, scalable endpoint that can validate incoming requests before they reach our SQS queue.
In a previous post I covered how to do this validation at the API Gateway level with Zod.
Here I’m going to use a schema that represents a user.
import { z } from 'zod';
export const userSchema = z.object({
  name: z.string(), // Name must be a string
  age: z.number().int().min(18), // User's age must be an integer of at least 18
  email: z.string().email(), // Must be a valid email
});
export type UserEventType = z.infer<typeof userSchema>;
Then I use the zod-to-json-schema library to generate the JSON Schema (myRequestJsonSchema) behind the API Gateway Model that validates the request body in our stack.
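The post doesn’t show how myRequestJsonSchema is built, so here is one possible way, assuming Zod 3 (zod-to-json-schema targets Zod 3; Zod 4 ships its own JSON Schema conversion). API Gateway models use JSON Schema draft 4 and CDK’s Model expects its own JsonSchema type, so this sketch drops the draft-07 $schema marker that zod-to-json-schema emits, sets CDK’s draft-4 constant instead, and casts the result. Treat the cast as a shortcut: it bypasses type checking of the generated keywords.

```typescript
import { JsonSchema, JsonSchemaVersion } from 'aws-cdk-lib/aws-apigateway';
import { z } from 'zod';
import { zodToJsonSchema } from 'zod-to-json-schema';

// Same shape as the userSchema defined above.
const userSchema = z.object({
  name: z.string(),
  age: z.number().int().min(18),
  email: z.string().email(),
});

// $refStrategy: 'none' inlines everything, so the model contains no $ref pointers.
const generated = {
  ...zodToJsonSchema(userSchema, { $refStrategy: 'none' }),
} as Record<string, unknown>;

// Remove the draft-07 marker; CDK renders the `schema` key below as "$schema".
delete generated.$schema;

export const myRequestJsonSchema = {
  ...generated,
  schema: JsonSchemaVersion.DRAFT4,
} as unknown as JsonSchema;
```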
/* ----------------- API Gateway ----------------- */
// We create an API Gateway
const myWebhookAPI = new RestApi(this, 'MyAPI', {
  restApiName: 'my-webhook-api',
});
// Add the POST method to the API Gateway with the SQS integration and the request validator
myWebhookAPI.root.addMethod('POST', sqsIntegration, {
  requestValidatorOptions: {
    requestValidatorName: 'webhook-request-validator',
    validateRequestBody: true,
  },
  requestModels: {
    'application/json': new Model(this, 'webhook-request-model', {
      restApi: myWebhookAPI,
      contentType: 'application/json',
      description: 'Validation model for the request body',
      modelName: 'myRequestJsonSchema',
      schema: myRequestJsonSchema,
    }),
  },
  methodResponses: [
    {
      statusCode: '200',
      responseModels: {
        'application/json': Model.EMPTY_MODEL,
      },
    },
    {
      statusCode: '400',
      responseModels: {
        'application/json': Model.ERROR_MODEL,
      },
    },
    {
      statusCode: '500',
      responseModels: {
        'application/json': Model.ERROR_MODEL,
      },
    },
  ],
});
myWebhookAPI.addGatewayResponse('ValidationError', {
  type: apigateway.ResponseType.BAD_REQUEST_BODY,
  statusCode: '400',
  templates: {
    'application/json': JSON.stringify({
      errors: '$context.error.validationErrorString',
      details: '$context.error.message',
    }),
  },
});
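Before deploying, it can be handy to check sample payloads locally against the same Zod schema. This only approximates what API Gateway does: the gateway validates against the generated draft-4 JSON Schema, not against Zod itself, so edge cases may differ. The helper name describeProblems is hypothetical.

```typescript
import { z } from 'zod';

// Same shape as the userSchema defined earlier in the post.
const userSchema = z.object({
  name: z.string(),
  age: z.number().int().min(18),
  email: z.string().email(),
});

// Hypothetical helper: lists the problems that should make the request fail validation.
function describeProblems(payload: unknown): string[] {
  const result = userSchema.safeParse(payload);
  if (result.success) {
    return [];
  }
  return result.error.issues.map((issue) => `${issue.path.join('.')}: ${issue.message}`);
}

console.log(describeProblems({ name: 'Ada', age: 36, email: 'ada@example.com' })); // []
console.log(describeProblems({ name: 'Bob', age: 17, email: 'not-an-email' }));
```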
Once our messages have passed through API Gateway and SQS, we’ll use a Step Functions state machine to process them.
The first step of the state machine makes sure the received message is valid: for example, by verifying an HMAC signature, checking Basic Authentication credentials, or applying other logic that confirms the message comes from a trusted source.
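A minimal sketch of the HMAC part of that check, using Node’s built-in crypto module. It assumes the sender signs the raw request body with HMAC-SHA256 and sends the hex digest in a header. The helper name isValidSignature is hypothetical, and so is the way the signature reaches the Lambda: the mapping template in this post forwards only the body, so it would need to be extended to pass the signature header along, and because the body is embedded as JSON, re-serializing it may not reproduce the exact bytes that were signed.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Hypothetical helper: checks a hex-encoded HMAC-SHA256 signature against the raw body.
export function isValidSignature(
  rawBody: string,
  signatureHex: string,
  secret: string,
): boolean {
  const expected = createHmac('sha256', secret).update(rawBody, 'utf8').digest();
  const received = Buffer.from(signatureHex, 'hex');
  // timingSafeEqual throws on length mismatch, so reject malformed signatures first
  if (received.length !== expected.length) {
    return false;
  }
  // Constant-time comparison avoids leaking how many bytes matched
  return timingSafeEqual(received, expected);
}

const secret = 'example-secret'; // placeholder; load the real secret from a secure store
const body = '{"name":"Ada","age":36,"email":"ada@example.com"}';
const signature = createHmac('sha256', secret).update(body, 'utf8').digest('hex');
console.log(isValidSignature(body, signature, secret)); // true
console.log(isValidSignature(body.replace('36', '37'), signature, secret)); // false
```

Inside the validation Lambda you would throw when this returns false, which fails the execution; with the pipe setup in this post, the message is then retried and eventually lands in the DLQ.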
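For this state machine I’ve chosen the Express type. EventBridge Pipes can invoke an Express workflow synchronously (REQUEST_RESPONSE), so if the execution fails at any point the pipe sees the failure, the message stays in the queue to be retried, and after three failed attempts SQS moves it to the DLQ. That doesn’t happen with a Standard state machine: Pipes can only start it asynchronously (FIRE_AND_FORGET), so the message counts as delivered as soon as the execution starts, whether or not it later succeeds.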
import { RemovalPolicy } from 'aws-cdk-lib';
import { Code, Runtime } from 'aws-cdk-lib/aws-lambda';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import { LogGroup, RetentionDays } from 'aws-cdk-lib/aws-logs';
import {
  DefinitionBody,
  LogLevel,
  Pass,
  StateMachine,
  StateMachineType,
} from 'aws-cdk-lib/aws-stepfunctions';
import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks';
....
/* ----------------- Step Function ----------------- */
const validateMessageLambda = new NodejsFunction(
  this,
  'ValidateMessageLambda',
  {
    // Placeholder handler: logs the first SQS record of the batch sent by the pipe
    code: Code.fromInline(`
      exports.handler = async (event) => {
        console.log('Validating message');
        console.log(JSON.parse(event[0].body));
        return true;
      };
    `),
    handler: 'index.handler',
    runtime: Runtime.NODEJS_22_X,
  },
);
const myStepFunction = new StateMachine(this, 'MyStepFunction', {
  stateMachineType: StateMachineType.EXPRESS,
  logs: {
    destination: new LogGroup(this, 'MyStepFunctionLogs', {
      logGroupName: '/aws/vendedlogs/states/my-step-function-logs',
      removalPolicy: RemovalPolicy.DESTROY,
      retention: RetentionDays.ONE_DAY,
    }),
    level: LogLevel.ERROR,
  },
  definitionBody: DefinitionBody.fromChainable(
    new LambdaInvoke(this, 'ValidateMessageTask', {
      lambdaFunction: validateMessageLambda,
      comment: 'Validate the message',
      stateName:
        'Validate Message (HMAC Signature, Basic Authentication, etc)',
    }).next(
      new Pass(this, 'ProcessMessageTask', {
        stateName: 'Process Message',
      }),
    ),
  ),
});
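The inline Lambda only logs the first record. EventBridge Pipes hands the state machine a batch of SQS records (an array), and each record’s body is the {"body": ...} envelope produced by the mapping template. A sketch of a typed handler that unwraps every record might look like this; the interfaces model only the fields used here, and the names PipeSqsRecord, WebhookEnvelope and parseBatch are hypothetical. To deploy it you would point NodejsFunction’s entry at a file like this instead of inlining code.

```typescript
// Only the SQS record fields this sketch reads (real records carry more, e.g. attributes).
interface PipeSqsRecord {
  messageId: string;
  body: string;
}

// Envelope created by the API Gateway mapping template: {"body": <original request>}.
interface WebhookEnvelope<T> {
  body: T;
}

// Hypothetical helper: unwraps every record in the batch instead of just event[0].
export function parseBatch<T>(
  records: PipeSqsRecord[],
): Array<{ messageId: string; payload: T }> {
  return records.map((record) => {
    const envelope = JSON.parse(record.body) as WebhookEnvelope<T>;
    return { messageId: record.messageId, payload: envelope.body };
  });
}

export const handler = async (event: PipeSqsRecord[]): Promise<boolean> => {
  for (const { messageId, payload } of parseBatch<Record<string, unknown>>(event)) {
    console.log('Validating message', messageId, payload);
  }
  return true;
};
```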
We’ll consume messages from SQS and send them to the Step Functions state machine through EventBridge Pipes. If processing fails, the message stays in the queue to be retried, and once it has been received three times (the maxReceiveCount we configured) SQS moves it to the DLQ for reprocessing or debugging.
You can use the L1 CfnPipe construct, or take a little more risk and use the experimental alpha constructs for EventBridge Pipes:
import {
  CloudwatchLogsLogDestination,
  DesiredState,
  LogLevel as PipesLogLevel,
  Pipe,
} from '@aws-cdk/aws-pipes-alpha';
import { SqsSource } from '@aws-cdk/aws-pipes-sources-alpha';
import {
  SfnStateMachine,
  StateMachineInvocationType,
} from '@aws-cdk/aws-pipes-targets-alpha';
....
/* ----------------- Pipe ----------------- */
const myIstateMachine = StateMachine.fromStateMachineArn(
  this,
  'myIStateMachine',
  myStepFunction.stateMachineArn,
);
new Pipe(this, 'MyPipe', {
  source: new SqsSource(webhookQueue),
  target: new SfnStateMachine(myIstateMachine, {
    invocationType: StateMachineInvocationType.REQUEST_RESPONSE,
  }),
  // Configure the pipe to send logs to CloudWatch Logs
  logLevel: PipesLogLevel.ERROR,
  logDestinations: [
    new CloudwatchLogsLogDestination(
      new LogGroup(this, 'PipeLogs', {
        logGroupName: '/aws/vendedlogs/states/pipe-logs',
        removalPolicy: RemovalPolicy.DESTROY,
        retention: RetentionDays.ONE_DAY,
      }),
    ),
  ],
  desiredState: DesiredState.RUNNING,
});
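For comparison, here is a rough sketch of the same SQS to Step Functions wiring with the stable L1 CfnPipe construct instead of the alpha modules. Unlike the alpha Pipe, which creates an execution role for you, the L1 construct needs one passed in, so the sketch creates a role that EventBridge Pipes can assume, grants it permission to consume from the queue and to call StartSyncExecution (which REQUEST_RESPONSE relies on), and leaves logging out. The function createCfnPipe, the construct IDs and the throwaway stack are hypothetical.

```typescript
import { App, Stack } from 'aws-cdk-lib';
import { Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';
import { IQueue, Queue } from 'aws-cdk-lib/aws-sqs';
import {
  DefinitionBody,
  IStateMachine,
  Pass,
  StateMachine,
  StateMachineType,
} from 'aws-cdk-lib/aws-stepfunctions';
import { Construct } from 'constructs';

// Hypothetical helper: wires an SQS queue to an Express state machine with the L1 construct.
export function createCfnPipe(
  scope: Construct,
  id: string,
  queue: IQueue,
  stateMachine: IStateMachine,
): CfnPipe {
  // The L1 pipe needs an explicit execution role that EventBridge Pipes can assume
  const pipeRole = new Role(scope, `${id}Role`, {
    assumedBy: new ServicePrincipal('pipes.amazonaws.com'),
  });
  queue.grantConsumeMessages(pipeRole); // receive, delete, read queue attributes
  stateMachine.grantStartSyncExecution(pipeRole); // synchronous Express invocation

  const pipe = new CfnPipe(scope, id, {
    roleArn: pipeRole.roleArn,
    source: queue.queueArn,
    target: stateMachine.stateMachineArn,
    targetParameters: {
      stepFunctionStateMachineParameters: { invocationType: 'REQUEST_RESPONSE' },
    },
  });
  // Create the role and its policy before the pipe so permissions exist at creation time
  pipe.node.addDependency(pipeRole);
  return pipe;
}

// Usage in a throwaway stack (all names are placeholders)
const app = new App();
const stack = new Stack(app, 'CfnPipeSketchStack');
const queue = new Queue(stack, 'Queue');
const stateMachine = new StateMachine(stack, 'StateMachine', {
  stateMachineType: StateMachineType.EXPRESS,
  definitionBody: DefinitionBody.fromChainable(new Pass(stack, 'Noop')),
});
createCfnPipe(stack, 'SqsToSfnPipe', queue, stateMachine);
```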