La complejidad y requisitos de las aplicaciones de hoy en día nos imponen nuevos retos tratando de testearlas en nuestros entornos locales. Los mayores desafíos nos los encontramos cuando queremos probar nuestro software de manera end to end, sobre todo si la infraestructura se compone de servicios en la nube. Sin embargo, la mayor parte de las veces Docker entra a nuestro rescate, ofreciéndonos imágenes construidas que nos pueden ayudar a emular el comportamiento de estos servicios.

Específicamente, para trabajar con AWS podemos encontrar soluciones completas como Localstack. Este servicio emula gran parte de los servicios de AWS exponiendo endpoints contra los que podemos simular nuestra infraestructura, automatizarla con Terraform o realizar pruebas end to end en nuestro proceso de integración continua sin depender del proveedor cloud.

En este post hablaremos de una solución más ligera para emular el comportamiento de una cola AWS SQS totalmente compatible con un consumer construido en TypeScript dentro del contexto de una aplicación NestJS.

¿Qué es una cola SQS?

SQS, acrónimo de Simple Queue Service, es un servicio de AWS para gestionar colas de mensajes. Este tipo de servicios sirve generalmente para desacoplar nuestro software, procesar mensajes de manera asíncrona y escalar de manera independiente. El estilo de mensajería se basa en el patrón PubSub, en el cual un publisher deja mensajes en un topic sin conocer al destinatario. Posteriormente unos consumers leen de esas colas, procesan el mensaje y lo eliminan.

SQS es la solución de AWS, pero encontramos servicios de idéntico comportamiento en Azure con Azure Queue Storage o en Google Cloud Platform con Cloud PubSub.

Emulando la cola SQS

Para emular nuestra cola sin depender de AWS usaremos ElasticMQ. ElasticMQ es un sistema de colas ligero que almacena los mensajes temporalmente en memoria y sigue la semántica de AWS SQS. La gran ventaja es que podemos utilizar el SDK de AWS directamente contra este servicio.

Los mensajes en SQS se reciben sondeando la cola. Cuando se recibe un mensaje, se bloquea durante un período de tiempo específico (visibilityTimeout). Si el mensaje no se elimina durante ese tiempo, vuelve a estar disponible para su entrega. Por eso debemos cumplir con el principio de idempotencia: el consumer no debe verse afectado negativamente si procesa el mismo mensaje más de una vez.

A continuación, el docker-compose.yml con el servicio SQS usando la imagen alpine-sqs conectado al servicio Node que levanta nuestra API:

version: "3.4"
services:
  api:
    build:
      context: .
      dockerfile: Dockerfile.dev
    environment:
      AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
      AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
      AWS_SQS_REGION: eu-west-2
      AWS_SQS_ENDPOINT: "http://sqs:9324"
      AWS_SQS_QUEUE_NAME: service-queue
    ports:
      - "8080:8080"
    depends_on:
      - sqs
  sqs:
    image: roribio16/alpine-sqs
    ports:
      - "9324:9324"
      - "9325:9325"
    volumes:
      - ./infra/sqs/elasticmq.conf:/opt/config/elasticmq.conf

La imagen alpine-sqs expone dos puertos: 9324 como endpoint HTTP y 9325 como interfaz web para depurar los mensajes que llegan a la cola.

Incluiremos también un fichero de configuración mapeado en el volumen /opt/config/elasticmq.conf:

include classpath("application.conf")
 
node-address {
  protocol = http
  host = "*"
  port = 9324
  context-path = ""
}
 
rest-sqs {
  enabled = true
  bind-port = 9324
  bind-hostname = "0.0.0.0"
  sqs-limits = strict
}
 
queues {
  default {
    defaultVisibilityTimeout = 10 seconds
    delay = 5 seconds
    receiveMessageWait = 0 seconds
  }
  service-queue {
    defaultVisibilityTimeout = 10 seconds
    delay = 5 seconds
    receiveMessageWait = 0 seconds
  }
}

Para enviar un mensaje de prueba a la cola podemos usar el CLI de AWS apuntando al endpoint local:

aws --endpoint-url http://localhost:9324 sqs send-message \
  --queue-url http://localhost:9324/queue/service-queue \
  --message-body '{"event":"test"}'

Implementando el consumer en NestJS

Una vez tenemos la cola lista, necesitamos implementar un consumer para leer y procesar sus mensajes. NestJS ofrece soporte para AMQP/RabbitMQ, pero no incluye una interfaz nativa para colas SQS. Por ello hacemos uso directo del SDK de AWS para Node implementando una estrategia de short polling.

La siguiente clase abstracta contiene la funcionalidad necesaria para leer mensajes de una cola SQS:

import {
  Logger,
  OnApplicationBootstrap,
  OnApplicationShutdown,
} from '@nestjs/common'
import { AWSError, SQS } from 'aws-sdk'
import { PromiseResult } from 'aws-sdk/lib/request'
 
export default abstract class SQSQueue
  implements OnApplicationBootstrap, OnApplicationShutdown
{
  protected readonly logger = new Logger(SQSQueue.name, true)
  protected queueUrl: string
  protected polling = false
  protected timeoutRef: NodeJS.Timeout = null
  protected service: SQS
 
  constructor(
    protected readonly queueName: string,
    protected readonly region: string,
    protected readonly timeout = 100
  ) {
    this.service = new SQS({
      endpoint: process.env.AWS_SQS_ENDPOINT,
      region,
    })
  }
 
  async onApplicationBootstrap(): Promise<void> {
    this.logger.log(`Initiating queue consumer with name ${this.queueName}`)
    this.polling = true
    this.queueUrl = (
      await this.service
        .getQueueUrl({ QueueName: this.queueName })
        .promise()
    ).QueueUrl
    this.logger.log(`Reading messages from ${this.queueUrl}`)
    this.timeoutRef = setTimeout(async () => this.poll(), this.timeout)
  }
 
  onApplicationShutdown(): void {
    this.polling = false
    clearTimeout(this.timeoutRef)
  }
 
  public async poll(): Promise<void> {
    const result: SQS.ReceiveMessageResult = await this.receiveMessage()
    try {
      await this.handleSQSResponse(result)
    } catch (err) {
      Logger.error(err)
    }
    if (this.polling) {
      this.timeoutRef = setTimeout(() => this.poll(), this.timeout)
    }
  }
 
  private async handleSQSResponse(
    result: SQS.ReceiveMessageResult
  ): Promise<void> {
    if (!result.Messages || result.Messages.length === 0) return
    await Promise.all(result.Messages.map(this.handleMessage.bind(this)))
  }
 
  private async receiveMessage(): Promise<
    PromiseResult<SQS.ReceiveMessageResult, AWSError>
  > {
    return this.service
      .receiveMessage({ QueueUrl: this.queueUrl })
      .promise()
  }
 
  protected async handleMessage(message: SQS.Message): Promise<void> {
    await this.handle(message)
    await this.service
      .deleteMessage({
        QueueUrl: this.queueUrl,
        ReceiptHandle: message.ReceiptHandle,
      })
      .promise()
  }
 
  protected abstract handle(message: SQS.Message): Promise<void>
}

OnApplicationBootstrap y OnApplicationShutdown son interfaces de NestJS que nos permiten engancharnos a los eventos de arranque y apagado del servicio para iniciar y detener el polling. El método handleMessage se encarga de recibir el mensaje, llamar al manejador abstracto con la lógica de negocio y eliminar el mensaje de la cola para que no vuelva a ser entregado.

Para implementar nuestra lógica creamos una clase que extienda SQSQueue:

import { Injectable, Logger } from '@nestjs/common'
import { SQS } from 'aws-sdk'
import SQSQueue from './sqs/sqs.queue'
 
@Injectable()
export default class CallbackSQS extends SQSQueue {
  async handle(message: SQS.Message): Promise<void> {
    this.logger.log(
      `Handling message ${message.MessageId} with body: ${message.Body}`
    )
    // lógica de negocio aquí
  }
}

Dentro del contexto de una aplicación NestJS, CallbackSQS se registra como provider en el módulo correspondiente.

Conclusión

Como hemos visto, es sencillo emular comportamientos de servicios cloud apoyándonos en las imágenes Docker disponibles. Poder desarrollar en local sin depender de la nube nos da la flexibilidad y libertad para asegurar el comportamiento y la calidad de nuestro software antes de integrarlo con el proveedor real.