Development

NestJS and Kafka

The Apache Kafka message queue easily integrates into NestJS servers, as described in the respective documentation. This article gives you a short overview of default communication behavior, some of the possible features and configurations and some good practices.

Overview of Basic Kafka Features in NestJS

NestJS Integration

  • Summary: NestJS facilitates the integration of Kafka, offering a streamlined approach for both message production and consumption within applications.
  • Default Behavior: Through the use of decorators and modules, NestJS abstracts much of the complexity involved in setting up Kafka clients, enabling straightforward message handling capabilities.
  • Configuration: Kafka can be configured at various points within a NestJS application, including during bootstrap for global settings, within the AppModule for application-wide settings, and within feature modules for localized settings. This flexibility allows for detailed control over consumer groups, error handling, and message retry strategies.
  • Bootstrap Initialization: Initializing Kafka in the bootstrap function is essential for setting up the Kafka microservice within the application context. It allows the app to connect to Kafka as a microservice. Global Kafka configurations, such as consumer group settings and error handling strategies.
    const app = await NestFactory.create(AppModule, …)
    app.connectMicroservice(kafkaOptions)
    await app.startAllMicroservices()
    await app.listen(4000, '0.0.0.0')
  • AppModule Initialization: You often see an application-wide Kafka configuration in the main AppModule, potentially overriding or complementing bootstrap settings. This makes no sense and is completely redundant with the anyway mandatory bootstrap-initialistion. So don’t do it.
    @Module({
      imports: [KafkaModule.register(kafkaOptions)]
    })
    export class AppModule {}
  • Feature Module Kafka Client Injection: Necessary when using kafka service injection to produce messages (this.kafka.emit(topic, data)) or when needing explicit control over the Kafka client in a specific module. When you’re only consuming messages using @EventPattern or @MessagePattern, without the need to explicitly produce messages within a service, the direct injection of Kafka (ClientKafka) might not be necessary. The @EventPattern and @MessagePattern decorators can be used in controllers or providers to handle incoming Kafka messages without the need for direct client injection.
    @Module({
      imports: [ClientsModule.register([{ name: 'kafka', kafkaOptions }])],
      providers: [XyzService],
      controllers: [XyzController],
      exports: [XyzService]
    })
    export class XyzModule {}
  • Service Injection: Kafka injection in a service requires the above mentioned featre module Kafka client injection. Then in the cnstructor of class XyzService, you can use the following pattern to get access to kafka client functions, namele this.kafka.emit. The name in
    @Inject('kafka') is arbirrary and must match name: 'kafka' in ClientsModule.register.

    constructor(@Inject('kafka') private kafka: ClientKafka) {}

One-to-Many Broadcast

  • Summary: Kafka’s model allows for broadcasting messages to multiple consumers. All consumers subscribed to a topic will receive messages sent to that topic.
  • Default Behavior: By default, all messages sent to a topic are broadcasted to all consumers subscribed to that topic.
  • Configuration: Configuration is managed at the consumer level by subscribing to topics.
  • Example:
    @Injectable()
    export class MyService {
      @EventPattern('myTopic')
      async handleBroadcastMessage(@Payload() message: any) {
        // Process message
      }
    }
    

Error Handling and Retries

  • Summary: In NestJS, unhandled exceptions during Kafka message processing lead to retries, affecting the message’s processing within its topic or potentially the entire client group if only a single processing thread is available.
  • Default Behavior: Throwing an exception in an event handler indicates to Kafka to retry the message. This may block further processing of the topic or the entire client group if it operates with a single thread.
  • Configuration: To manage retries and error handling more granularly, disable auto-commit and control offset commits manually, or use specific exceptions like `KafkaRetriableException` for controlled retry behavior.
  • Example:
    @EventPattern('requestTopic') handleRequest(data) {
      throw new Error() // This leads to a retry
    }
    
  • Good Practice Retry Pattern: Implementing a manual retry mechanism by re-emitting the failed message back to its topic can serve as a pragmatic approach to ensure that processing attempts continue without indefinitely blocking the queue. This pattern, however, is best suited for scenarios where message order is not paramount.
    @EventPattern('requestTopic') handleRequest(data) {
      try {
        // Perform the required processing
      } catch (e) {
        this.kafka.emit('requestTopic', data) // re-add to the back of the queue
      }
    }
    

Auto-Commit vs. Manual-Commit

  • Summary: Kafka supports both auto-committing offsets and manual offset management.
  • Default Behavior: Auto-commit is enabled by default, committing offsets at a configured interval.
  • Configuration: To switch to manual commit, disable auto-commit and manually manage offset commits.
  • Example:
    // Disable auto-commit
    consumerConfig = {
      ...consumerConfig,
      allowAutoCommit: false
    }
    

Consumer Groups

  • Summary: Kafka distributes messages among consumers in the same group, ensuring a message is processed once per group.
  • Default Behavior: Consumers in the same group share the workload of message processing.
  • Configuration: Different consumer groups can be set up to receive messages independently.
  • Example:
    const consumerConfig = {
      groupId: 'myUniqueGroup' // Unique group for independent consumption
    }
    

Historical Messages

  • Summary: New consumers can catch up with all missed messages since their last offset or from the beginning of the log.
  • Default Behavior: Consumers start consuming from their last known offset.
  • Configuration: Set auto.offset.reset to earliest to consume from the beginning if no offset is stored.
  • Example:
    const consumerConfig = {
      ...consumerConfig,
      autoOffsetReset: 'earliest'
    }
    

Return Value in @EventPattern and @MessagePattern

  • Summary: Return values in message handlers don’t influence the message flow in one-way communication patterns.
  • Default Behavior: Return values are generally ignored unless in a request-reply pattern.
  • Configuration: Implement explicit messaging for request-reply patterns.
  • Example:
    @MessagePattern('requestTopic') handleRequest() {
      // Process and return response
      return {data: 'response'} // this makes no sense
    }
    

Bidirectional Communication Pattern

  • Summary: Kafka primarily supports asynchronous communication, but can be configured for request-reply patterns by emitting to a previously agreed response topic.
  • Default Behavior: Asynchronous message broadcasting to multiple consumers.
  • Configuration: Use reply-to topics and correlation IDs for request-reply communication.
  • Example: The response is received by all consumer groups registered to the topic given in message.replyTo.
    // Producer sending a request
    this.kafka.emit('requestTopic', {
      data: 'request',
      replyTo: 'responseTopic'
    })
    
    // Consumer processing and replying
    @EventPattern('requestTopic') processRequest(message) {
      this.kafka.emit(message.replyTo, {
        data: 'response'
      })
    }
    
  • Example: To limit the response to be sent only to the same group as the request has been sent, you may add the group name in the request parameters and add it to the response’s topic. Be aware, that this is your convention not a security feature. Kafka offers Access Control Lists (ACL) if you need real access restrictions.
    // Producer sending a request with group id
    this.kafka.emit('requestTopic', {
      data: 'request',
      replyTo: 'responseTopic',
      consumerGroup: 'senderGroup'
    })
    
    // Consumer processing and replying
    @EventPattern('requestTopic') processRequest(message) {
      this.kafka.emit(`${message.consumerGroup}-${message.replyTo}`, {
        data: 'response'
      })
    }
    

Data Retention and Scaling

  • Summary: Kafka allows configurable message retention, supporting scalability by adding more consumers.
  • Default Behavior: Messages are retained for a default period, with scalability limited by topic partitions.
  • Configuration: Adjust retention settings and partition counts to scale and maintain messages as needed.
  • Example:
    # Kafka CLI to adjust retention period
    kafka-configs.sh --alter \
                     --entity-type topics --entity-name myTopic \
                     --add-config retention.ms=172800000
    

Good Practice in Microservices

A well-adopted design pattern in microservices architecture involves assigning each microservice its own unique group ID, ideally derived from the service’s name. This approach significantly benefits the scalability and reliability aspects of microservices, especially when deployed in cloud environments where multiple replicas of the same service might be instantiated to handle increased load or ensure high availability.

By default, assigning a unique group ID to each microservice ensures that messages are processed just once by one of the service’s replicas. This behavior aligns with the typical requirements of distributed systems, where duplicate processing of messages is undesirable. Should the processing of a message fail, resulting in an exception, the default Kafka behavior ensures the message is retried until successfully processed by one of the clients. This mechanism usually matches the desired behavior, it follows the requirements of the twelve-factor app and can be implemented effortlessly.

However, it’s crucial to recognize that the message queue may become stuck if an unresolvable error occurs, preventing further message processing. Therefore, it’s important to differentiate between recoverable and unrecoverable errors in your code. Unrecoverable errors often stem from coding mistakes or incorrect configurations. In such scenarios, rigorous testing of the software becomes indispensable.

Identifying and handling unrecoverable errors properly ensures that the system can degrade gracefully or alert the necessary operations personnel to intervene manually. Implementing robust error handling and logging mechanisms can aid in quickly diagnosing and rectifying such issues, minimizing downtime and improving the overall resilience of the microservices architecture.

In summary, careful consideration of group ID assignment, coupled with effective error handling strategies, lays the foundation for a scalable, reliable, and maintainable microservices ecosystem. Rigorous testing plays a crucial role in ensuring that the system behaves as expected under various conditions, thereby safeguarding against potential failures that could lead to message processing stalls.

Pacta PagesServices and Pages by Pacta AG

Pacta.Cash

This is the easiest crypto wallet on the market. Manage your Ethers and Bitcoins securely without having to understand the technical details. You own the keys, all data is stored on your device. Trade without registration.

Pacta.Swiss

Company representation page of the Swiss Pacta Corporation Pacta Plc. This page is provided by Pacta Plc (in German: Pacta AG).