Creating Resilient Webhooks on AWS CDK

In this post, you’ll learn how to create a resilient webhook using AWS CDK. The architecture is designed for scalability, reliability, and fault tolerance.

AWS services we’ll use:

  • SQS
  • API Gateway
  • EventBridge Pipes
  • Step Functions

Let’s break down the architecture step-by-step.

We’ll start by integrating API Gateway directly with SQS. This lets us handle incoming webhook requests at scale and buffer them before they reach downstream services. Because SQS absorbs traffic spikes, our services don’t need to scale instantly, which helps keep the system stable. Attaching a Dead Letter Queue (DLQ) means messages that keep failing aren’t lost. Instead, they’re moved to the DLQ, where they can be inspected for debugging or retried.
To send messages to our SQS queue, API Gateway needs an IAM Role. We’ll attach this Role to the API integration later on.


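import { Duration } from 'aws-cdk-lib';
import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import { PassthroughBehavior } from 'aws-cdk-lib/aws-apigateway';
import {
  Effect,
  PolicyDocument,
  PolicyStatement,
  Role,
  ServicePrincipal,
} from 'aws-cdk-lib/aws-iam';
import { Queue } from 'aws-cdk-lib/aws-sqs';

// ... inside the stack constructor:
    /* ----------------- SQS ----------------- */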
    const dlq = new Queue(this, 'WebhookDLQ', {
      queueName: 'webhook-dlq',
      retentionPeriod: Duration.days(14),
    });

    const webhookQueue = new Queue(this, 'WebhookQueue', {
      deadLetterQueue: {
        queue: dlq,
        maxReceiveCount: 3,
      },
      queueName: 'webhook-queue',
      retentionPeriod: Duration.days(7),
    });

    // We create a Role for the API Gateway to be able to send messages to the SQS queue
    const webhookSubscriptionAPIRole = new Role(
      this,
      'WebhookSubscriptionApiRole',
      {
        assumedBy: new ServicePrincipal('apigateway.amazonaws.com'),
        inlinePolicies: {
          sqsSendMessage: new PolicyDocument({
            statements: [
              new PolicyStatement({
                actions: ['sqs:SendMessage'],
                effect: Effect.ALLOW,
                resources: [webhookQueue.queueArn],
              }),
            ],
          }),
        },
      },
    );

    const sqsIntegration = new apigateway.AwsIntegration({
      integrationHttpMethod: 'POST',
      options: {
        credentialsRole: webhookSubscriptionAPIRole,
        passthroughBehavior: PassthroughBehavior.NEVER,
        integrationResponses: [
          {
            responseTemplates: {
              'application/json': '{}',
            },
            statusCode: '200',
          },
        ],
        requestParameters: {
          'integration.request.header.Content-Type': `'application/x-www-form-urlencoded'`,
        },
        // We wrap the request body in a {"body": ...} envelope and send it to SQS as the MessageBody
        requestTemplates: {
          'application/json':
            'Action=SendMessage&MessageBody={"body": $util.urlEncode($input.body)}',
        },
      },
      path: `${this.account}/${webhookQueue.queueName}`,
      region: this.region,
      service: 'sqs',
    });
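To make the mapping template concrete, here’s a dependency-free sketch that builds the same kind of form-encoded SendMessage request in plain TypeScript and then decodes it again. The stack doesn’t use it, and the function names are hypothetical. The sketch encodes the whole MessageBody with URLSearchParams instead of calling $util.urlEncode on the request body only. After decoding, though, the message stored in SQS is the same {"body": ...} envelope.

```typescript
// Hypothetical helpers that mirror the API Gateway mapping template:
//   Action=SendMessage&MessageBody={"body": <request body>}
// They only illustrate the wire format; the CDK stack does not use them.

/** Builds the application/x-www-form-urlencoded body sent to the SQS Query API. */
export function buildSendMessageForm(rawRequestBody: string): string {
  // Wrap the raw JSON request body in a {"body": ...} envelope, like the template does.
  const messageBody = `{"body": ${rawRequestBody}}`;
  // URLSearchParams percent-encodes the values (spaces become "+").
  return new URLSearchParams({
    Action: 'SendMessage',
    MessageBody: messageBody,
  }).toString();
}

/** Decodes the form and parses the message body that ends up in the queue. */
export function readQueuedMessage(form: string): { body: unknown } {
  const messageBody = new URLSearchParams(form).get('MessageBody');
  if (messageBody === null) {
    throw new Error('MessageBody parameter is missing');
  }
  return JSON.parse(messageBody) as { body: unknown };
}

// Example: the envelope a consumer later finds in the SQS record's body string.
const form = buildSendMessageForm('{"name":"Ada","age":36,"email":"ada@example.com"}');
console.log(readQueuedMessage(form));
```

This envelope is why the validation Lambda later parses the record body and reads the original payload from its body field.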

API Gateway is the point of entry and our first layer of protection. It provides a secure, scalable endpoint that can validate incoming requests before they reach our SQS queue.
In a previous post, I talked about how to do this validation at the API Gateway level with Zod.

I’m going to use a schema that represents a User.

import { z } from 'zod';

export const userSchema = z.object({
  name: z.string(), // Name must be a string
  age: z.number().int().min(18), // User's age must be an integer of at least 18
  email: z.string().email(), // Must be a valid email
});

export type UserEventType = z.infer<typeof userSchema>;


Then I use the zod-to-json-schema library to generate the JSON Schema (myRequestJsonSchema) for the Model that API Gateway uses to validate requests in our stack.
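For reference, here’s a hand-written JSON Schema (draft 4, the version API Gateway models use) that is equivalent to userSchema. It should be roughly what zod-to-json-schema produces, except that this sketch leaves out the $schema header. Treat it as an illustration, not the library’s exact output. If you pass a plain object like this to CDK’s Model, you’ll likely need to cast it to the JsonSchema type, because CDK types the type field as the JsonSchemaType enum rather than as plain strings.

```typescript
// Hand-written JSON Schema (draft 4) equivalent of userSchema.
// A sketch for reference, not the literal output of zod-to-json-schema.
export const myRequestJsonSchema = {
  type: 'object',
  properties: {
    name: { type: 'string' }, // z.string()
    age: { type: 'integer', minimum: 18 }, // z.number().int().min(18): 18 or older
    email: { type: 'string', format: 'email' }, // z.string().email()
  },
  // Zod object keys are required unless marked .optional()
  required: ['name', 'age', 'email'],
} as const;

console.log(JSON.stringify(myRequestJsonSchema, null, 2));
```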


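import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import { Model, RestApi } from 'aws-cdk-lib/aws-apigateway';

// ... inside the stack constructor:
    /* ----------------- API Gateway ----------------- */

    // We create an API Gateway
    const myWebhookAPI = new RestApi(this, 'MyAPI', {
      restApiName: 'my-webhook-api',
    });

    // Add the POST method to the API Gateway with the SQS integration and the request validator.
    // myRequestJsonSchema is the JSON Schema generated from userSchema with zod-to-json-schema.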
    myWebhookAPI.root.addMethod('POST', sqsIntegration, {
      requestValidatorOptions: {
        requestValidatorName: 'webhook-request-validator',
        validateRequestBody: true,
      },
      requestModels: {
        'application/json': new Model(this, 'webhook-request-model', {
          restApi: myWebhookAPI,
          contentType: 'application/json',
          description: 'Validation model for the request body',
          modelName: 'myRequestJsonSchema',
          schema: myRequestJsonSchema,
        }),
      },
      methodResponses: [
        {
          statusCode: '200',
          responseModels: {
            'application/json': Model.EMPTY_MODEL,
          },
        },
        {
          statusCode: '400',
          responseModels: {
            'application/json': Model.ERROR_MODEL,
          },
        },
        {
          statusCode: '500',
          responseModels: {
            'application/json': Model.ERROR_MODEL,
          },
        },
      ],
    });

    myWebhookAPI.addGatewayResponse('ValidationError', {
      type: apigateway.ResponseType.BAD_REQUEST_BODY,
      statusCode: '400',
      templates: {
        'application/json': JSON.stringify({
          errors: '$context.error.validationErrorString',
          details: '$context.error.message',
        }),
      },
    });

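On the producer side, requests have to be sent as JSON. The integration only defines a mapping template for application/json, and with PassthroughBehavior.NEVER, API Gateway rejects other content types with a 415 Unsupported Media Type. Below is a minimal sketch of building such a request. The names buildWebhookRequest, UserEvent, and WebhookRequest are hypothetical. You’d pass the result to fetch along with your API’s invoke URL.

```typescript
// Hypothetical shape matching userSchema.
export interface UserEvent {
  name: string;
  age: number;
  email: string;
}

// Options for fetch(url, options), kept as a plain object type so it needs no DOM typings.
export interface WebhookRequest {
  method: 'POST';
  headers: Record<string, string>;
  body: string;
}

export function buildWebhookRequest(event: UserEvent): WebhookRequest {
  return {
    method: 'POST',
    // Must match the 'application/json' key used in requestTemplates and requestModels;
    // with PassthroughBehavior.NEVER other content types are rejected.
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(event),
  };
}

const request = buildWebhookRequest({ name: 'Ada', age: 36, email: 'ada@example.com' });
console.log(request.headers['Content-Type'], request.body);
```

A payload with age set to 17 should instead be rejected by the request model with a 400, using the ValidationError gateway response.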

Once our messages have passed through API Gateway and SQS, we’ll use a Step Functions state machine to process them.
As the first step in the state machine, we check that the message is valid and comes from a trusted source. For example, we might validate an HMAC signature, perform basic authentication, or run other checks.
For this state machine, I’ve chosen the Express type. The pipe invokes an Express state machine synchronously (REQUEST_RESPONSE). If the execution fails at any point, the message isn’t deleted from the queue, and after its retries it is delivered to the DLQ. This wouldn’t happen with a Standard state machine, because the pipe can only start it as a fire-and-forget execution. In that case, the pipe doesn’t wait for the execution to finish and treats the message as processed as soon as the execution has started.
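An HMAC check usually recomputes a signature over the raw request body with a shared secret, then compares it in constant time with the signature the sender supplied. Here’s a minimal sketch using Node’s built-in crypto module, assuming a hex-encoded HMAC-SHA256 signature. Real webhook providers differ in header names, encodings, and whether a timestamp is also signed, so check your provider’s documentation. Also note that the mapping template above forwards only the request body. For the state machine to see a signature header, you’d need to add it to the message as well.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

/** Computes a hex-encoded HMAC-SHA256 of the raw body (assumed signature format). */
export function signBody(rawBody: string, secret: string): string {
  return createHmac('sha256', secret).update(rawBody, 'utf8').digest('hex');
}

/** Returns true only if `signature` is the expected HMAC of `rawBody`. */
export function isValidSignature(rawBody: string, signature: string, secret: string): boolean {
  const expected = Buffer.from(signBody(rawBody, secret), 'hex');
  const received = Buffer.from(signature, 'hex');
  // timingSafeEqual throws when lengths differ, so compare lengths first.
  if (expected.length !== received.length) {
    return false;
  }
  // Constant-time comparison avoids leaking how many leading bytes matched.
  return timingSafeEqual(expected, received);
}

// Hypothetical secret; in practice load it from a secret store, never hard-code it.
const secret = 'example-shared-secret';
const body = '{"name":"Ada","age":36,"email":"ada@example.com"}';
const signature = signBody(body, secret);
console.log(isValidSignature(body, signature, secret)); // true
console.log(isValidSignature(body.replace('36', '37'), signature, secret)); // false
```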

State Machine Definition

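import { RemovalPolicy } from 'aws-cdk-lib';
import { Code, Function as LambdaFunction, Runtime } from 'aws-cdk-lib/aws-lambda';
import { LogGroup, RetentionDays } from 'aws-cdk-lib/aws-logs';
import {
  DefinitionBody,
  LogLevel,
  Pass,
  StateMachine,
  StateMachineType,
} from 'aws-cdk-lib/aws-stepfunctions';
import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks';

// ... inside the stack constructor:
    /* ----------------- Step Function ----------------- */
    // The handler is inline code, so a plain Lambda Function is enough (no esbuild bundling needed)
    const validateMessageLambda = new LambdaFunction(
      this,
      'ValidateMessageLambda',
      {
        code: Code.fromInline(`
        exports.handler = async (event) => {
            console.log('Validating message');
            // The pipe passes a batch (array) of SQS records; body is the raw message string
            console.log(JSON.parse(event[0].body));
            return true;
          };
        `),
        handler: 'index.handler',
        runtime: Runtime.NODEJS_22_X,
      },
    );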

    const myStepFunction = new StateMachine(this, 'MyStepFunction', {
      stateMachineType: StateMachineType.EXPRESS,
      logs: {
        destination: new LogGroup(this, 'MyStepFunctionLogs', {
          logGroupName: '/aws/vendedlogs/states/my-step-function-logs',
          removalPolicy: RemovalPolicy.DESTROY,
          retention: RetentionDays.ONE_DAY,
        }),
        level: LogLevel.ERROR,
      },
      definitionBody: DefinitionBody.fromChainable(
        new LambdaInvoke(this, 'ValidateMessageTask', {
          lambdaFunction: validateMessageLambda,
          comment: 'Validate the message',
          stateName:
            'Validate Message (HMAC Signature, Basic Authentication, etc)',
        }).next(
          new Pass(this, 'ProcessMessageTask', {
            stateName: 'Process Message',
          }),
        ),
      ),
    });

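The inline handler above only logs the first record. Here’s a slightly fuller, still dependency-free sketch of what the validation step could do. It walks every record in the batch the pipe passes in and unwraps the {"body": ...} envelope built by the API Gateway template. If a payload doesn’t look like a user, it throws, which fails the task and therefore the Express execution. The SqsRecord type declares only the one field used here. assertUser is a hypothetical, hand-rolled stand-in for running userSchema from Zod, and its email check is much cruder than Zod’s. Because this is TypeScript, you’d deploy it with NodejsFunction and an entry file rather than as inline code.

```typescript
// Minimal shape of an SQS record as delivered by the pipe; only `body` is used here.
interface SqsRecord {
  body: string; // raw SQS message body, e.g. '{"body": {...}}'
}

interface UserEvent {
  name: string;
  age: number;
  email: string;
}

// Hypothetical stand-in for userSchema.parse(); rules copied from the Zod schema.
function assertUser(value: unknown): asserts value is UserEvent {
  const v = value as { name?: unknown; age?: unknown; email?: unknown } | null;
  if (
    typeof v !== 'object' ||
    v === null ||
    typeof v.name !== 'string' ||
    typeof v.age !== 'number' ||
    !Number.isInteger(v.age) ||
    v.age < 18 ||
    typeof v.email !== 'string' ||
    !v.email.includes('@') // crude check, weaker than z.string().email()
  ) {
    throw new Error('Invalid user payload');
  }
}

// Throwing rejects the promise, which fails the LambdaInvoke task and the Express
// execution, so the pipe does not delete the message and SQS can retry it.
export const handler = async (event: SqsRecord[]): Promise<UserEvent[]> =>
  event.map((record) => {
    const envelope = JSON.parse(record.body) as { body?: unknown };
    const user: unknown = envelope.body;
    assertUser(user);
    return user;
  });
```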

We’ll consume messages from SQS and send them to the Step Functions state machine through EventBridge Pipes. If the execution fails, the message goes back to the queue to be retried. Once it exceeds maxReceiveCount, it is moved to the DLQ for reprocessing or debugging.
You can use the CfnPipe construct or, if you’re willing to take a little more risk, the alpha constructs for EventBridge Pipes:

import {
  CloudwatchLogsLogDestination,
  DesiredState,
  LogLevel as PipesLogLevel,
  Pipe,
} from '@aws-cdk/aws-pipes-alpha';
import { SqsSource } from '@aws-cdk/aws-pipes-sources-alpha';
import {
  SfnStateMachine,
  StateMachineInvocationType,
} from '@aws-cdk/aws-pipes-targets-alpha';

// ... inside the stack constructor:
/* ----------------- Pipe ----------------- */
    const myIstateMachine = StateMachine.fromStateMachineArn(
      this,
      'myIStateMachine',
      myStepFunction.stateMachineArn,
    );
    new Pipe(this, 'MyPipe', {
      source: new SqsSource(webhookQueue),
      target: new SfnStateMachine(myIstateMachine, {
        invocationType: StateMachineInvocationType.REQUEST_RESPONSE,
      }),
      // Configure the pipe to send logs to CloudWatch Logs
      logLevel: PipesLogLevel.ERROR,
      logDestinations: [
        new CloudwatchLogsLogDestination(
          new LogGroup(this, 'PipeLogs', {
            logGroupName: '/aws/vendedlogs/states/pipe-logs',
            removalPolicy: RemovalPolicy.DESTROY,
            retention: RetentionDays.ONE_DAY,
          }),
        ),
      ],
      desiredState: DesiredState.RUNNING,
    });

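To see how maxReceiveCount: 3 plays out, here’s a small in-memory simulation. It isn’t AWS code, and simulateRedrive and its result shape are hypothetical. Each time the pipe receives the message, SQS increments its receive count. If the state machine fails, the message becomes visible again after the visibility timeout and is received again. Once it has been received maxReceiveCount times without succeeding, SQS moves it to the DLQ.

```typescript
export interface RedriveResult {
  processed: boolean; // true if some attempt succeeded
  attempts: number; // how many times the message was received
  movedToDlq: boolean; // true once the receives are exhausted
}

// In-memory model of SQS redrive; `processOnce` stands in for the pipe + state machine.
export function simulateRedrive(
  processOnce: (attempt: number) => boolean,
  maxReceiveCount: number,
): RedriveResult {
  for (let attempt = 1; attempt <= maxReceiveCount; attempt++) {
    // Each receive increments the message's receive count.
    if (processOnce(attempt)) {
      // Success: the pipe deletes the message from the queue.
      return { processed: true, attempts: attempt, movedToDlq: false };
    }
    // Failure: the message reappears after the visibility timeout and is received again.
  }
  // The next receive would exceed maxReceiveCount, so SQS moves the message to the DLQ.
  return { processed: false, attempts: maxReceiveCount, movedToDlq: true };
}

// A transient failure that clears on the second try vs. a permanently bad message.
console.log(simulateRedrive((attempt) => attempt >= 2, 3));
console.log(simulateRedrive(() => false, 3));
```

Once messages land in the DLQ, you can inspect them. After fixing the cause, you can move them back to the source queue with SQS’s dead-letter queue redrive.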



Additional Documentation for an extra read




By stp2y
