Ingest Kafka Stream in API

This template imports data from the Kafka queue into the Capillary system and then transforms it for further processing.

Use case

Scenario

An airline brand uses a Passenger Service System (PSS) to send post-flight passenger data to the Capillary platform. The data includes details like passenger information, flight completion status, and other relevant metadata. This data is sent in bulk every hour, reaching up to 1,000 requests per minute. Due to the high volume and frequency, the PSS streams this data into a Kafka queue.

The system needs a reliable and scalable way to:

  • Continuously consume data from the Kafka queue
  • Transform it into a format that Capillary’s platform can process
  • Forward the processed data for downstream use, such as awarding loyalty points or updating customer profiles

Solution

Use the "Ingest from Kafka Stream" template in Connect+ to build a Kafka consumer that retrieves and processes data efficiently.

This setup will:

  • Connect to the specified Kafka topic using OAuth-based credentials
  • Consume messages in real time as they arrive in the Kafka queue
  • Transform the incoming payloads using a JavaScript-based transformer to match Capillary's expected schema
  • Forward the transformed data to the appropriate platform APIs for processing (e.g., behavioral event ingestion, customer updates)
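The consume, transform, and forward flow described above can be sketched in a few lines. This is only an illustration of the shape of the pipeline, not the template's implementation: the input field names (`passengerId`, `flightNumber`, `status`), the output shape, and the event name `flightCompleted` are all hypothetical, and the Kafka consumer and the destination API are replaced by an in-memory list and a callback.

```python
import json
from typing import Callable, Dict, Iterable, List


def transform(raw: bytes) -> Dict:
    """Map one raw PSS message to a target shape (all field names are hypothetical)."""
    record = json.loads(raw)
    return {
        "eventName": "flightCompleted",        # hypothetical event name
        "customerId": record["passengerId"],   # hypothetical source field
        "attributes": {
            "flightNumber": record["flightNumber"],
            "status": record["status"],
        },
    }


def run_pipeline(messages: Iterable[bytes], forward: Callable[[Dict], None]) -> int:
    """Consume each message, transform it, forward it; return the number forwarded."""
    count = 0
    for raw in messages:
        forward(transform(raw))
        count += 1
    return count


# Usage: an in-memory list stands in for the Kafka topic,
# and list.append stands in for the call to the destination API.
incoming: List[bytes] = [
    json.dumps(
        {"passengerId": "P123", "flightNumber": "XY101", "status": "COMPLETED"}
    ).encode("utf-8")
]
sent: List[Dict] = []
processed = run_pipeline(incoming, sent.append)
```

In the template itself, these three stages correspond to the Connect-to-source-kafka, neo-Transformer, and Connect-to-destination blocks.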

Prerequisites

Before using the Ingest Kafka Stream in API template, make sure you have the following:

  • Name of the Kafka server
  • Kafka topic
  • Consumer group ID
  • Username and password to access the Kafka server
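As a rough illustration of how these prerequisites fit together, the sketch below assembles them into a Kafka client configuration. It is an assumption, not Connect+ behavior: the property names (`bootstrap.servers`, `group.id`, `security.protocol`, `sasl.mechanism`, `sasl.username`, `sasl.password`) follow the convention of librdkafka-based clients, the function `build_consumer_config` is hypothetical, and the topic and group names in the usage lines are made up. The accepted protocol and mechanism values are the ones listed for the Connect-to-source-kafka block.

```python
from typing import Dict, List, Optional, Tuple

VALID_PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}
VALID_MECHANISMS = {"GSSAPI", "PLAIN", "SCRAM-SHA-256"}


def build_consumer_config(
    brokers: str,
    topics: str,
    group_id: str,
    security_protocol: str = "PLAINTEXT",
    sasl_mechanism: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> Tuple[Dict[str, str], List[str]]:
    """Validate the inputs and return (client config, list of topics)."""
    if security_protocol not in VALID_PROTOCOLS:
        raise ValueError(f"Unsupported security protocol: {security_protocol}")

    config = {
        "bootstrap.servers": brokers,   # host:port, e.g. kafka-connectplus:9092
        "group.id": group_id,
        "security.protocol": security_protocol,
    }

    if security_protocol.startswith("SASL"):
        if sasl_mechanism not in VALID_MECHANISMS:
            raise ValueError(f"Unsupported SASL mechanism: {sasl_mechanism}")
        config["sasl.mechanism"] = sasl_mechanism
        # Assumption: only PLAIN and SCRAM take a username and password here;
        # GSSAPI (Kerberos) is configured differently and is not covered.
        if sasl_mechanism != "GSSAPI":
            if not username or not password:
                raise ValueError("Username and password are required")
            config["sasl.username"] = username
            config["sasl.password"] = password

    # Topics may be given as a comma-separated string.
    topic_list = [t.strip() for t in topics.split(",") if t.strip()]
    if not topic_list:
        raise ValueError("At least one topic is required")
    return config, topic_list


# Usage with made-up topic, group, and credential values.
config, topic_list = build_consumer_config(
    "kafka-connectplus:9092",
    "flights, passengers",
    "connectplus-group",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    username="user",
    password="secret",
)
```

Validating the values up front like this surfaces a missing credential or a misspelled protocol before any connection attempt is made.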

Configuring Ingest Kafka Stream in API

Below is a screenshot of the template, showing the various blocks.

To configure the Ingest Kafka Stream in API template, follow these steps:

  1. In the Connect-to-source-kafka block, enter the details of the Kafka source from which events are imported, including the server address, Kafka topic, username, and password. For information on configuring this block, refer to Connect-to-source-kafka.
  2. In the neo-Transformer block, enter the URL of the Neo dataflow created to transform the data as per your requirements. For information on configuring this block, refer to neo-Transformer.
  3. In the Connect-to-destination block, enter the details of the API endpoint. For information on configuring this block, refer to Connect-to-destination.

Connect-to-source-kafka

This block enables you to define the Kafka server details. Below is the screenshot showing the fields in the block.

The table below describes the fields in the block:

Field Name

Description

kafka brokers

Kafka server URL. The URL is a combination of the server address and the port number.

  • Example: kafka-connectplus:9092

kafka topics

Name of the predefined topic from which messages are consumed. The topic must already exist in your Kafka system. To consume from multiple topics, list them separated by commas.

Consumer Group Id

Specifies the predefined consumer group ID used for access control.

Security Protocol

Specify the security protocol you want to use to exchange data between the Kafka client and server. The supported protocols are:

  • PLAINTEXT: No data encryption or authentication.
  • SSL: Uses the SSL protocol for data encryption but no SASL authentication.
  • SASL_PLAINTEXT: Uses SASL protocol for authentication but does not encrypt data.
  • SASL_SSL: Uses the SASL protocol for authentication and SSL for encryption.

SASL Mechanism

Specify the authentication method used to connect to Kafka. The supported mechanisms are:

  • GSSAPI
  • PLAIN
  • SCRAM-SHA-256

Username and Password

Username and password to access the Kafka server.

Headers as Attribute

Headers from the Kafka message contain metadata such as user-id, priority, and source-system. These headers are extracted as data attributes and are available to all the steps in Connect+. The headers are used for the following:

  • Routing: Direct the messages based on headers
  • Filtering: Process only the relevant messages
  • Tracking: Use headers for observability and tracking
  • Simplified Logic: Apply business logic without needing to parse the main message
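To make the Headers as Attribute behavior more concrete, the sketch below shows one way header metadata could be turned into attributes and then used for filtering and routing without parsing the message body. It assumes headers arrive as a list of (key, bytes) pairs, which is how several Python Kafka clients expose them. The helper names and the queue names `priority-queue` and `default-queue` are hypothetical and do not reflect how Connect+ implements this internally.

```python
from typing import Dict, Iterable, Optional, Set, Tuple


def headers_to_attributes(
    headers: Optional[Iterable[Tuple[str, bytes]]]
) -> Dict[str, str]:
    """Decode Kafka headers, given as (key, bytes) pairs, into a plain dict."""
    attributes: Dict[str, str] = {}
    for key, value in headers or []:
        attributes[key] = value.decode("utf-8") if isinstance(value, bytes) else value
    return attributes


def should_process(attributes: Dict[str, str], allowed_sources: Set[str]) -> bool:
    """Filtering: keep only messages from known source systems."""
    return attributes.get("source-system") in allowed_sources


def route(attributes: Dict[str, str]) -> str:
    """Routing: pick a (hypothetical) destination from the priority header."""
    return "priority-queue" if attributes.get("priority") == "high" else "default-queue"


# Usage with the header names mentioned above.
headers = [("user-id", b"u-42"), ("priority", b"high"), ("source-system", b"pss")]
attributes = headers_to_attributes(headers)
keep = should_process(attributes, {"pss"})
destination = route(attributes)
```

Both decisions are made from the attributes alone, which is the point of the Simplified Logic item: the message payload is never opened.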

neo-Transformer

The neo-Transformer block allows you to call a Neo dataflow for data transformation. Below is the screenshot showing the fields in the block.

The table below describes the fields in the block.

Field Name

Description

Enter url

Select the Neo Dataflow from the drop-down menu. The drop-down lists Dataflows with the Connectplus tag.

Authorization

Authorization key for the API (if required).

Split Response

Splits the response when the API returns an array of objects.

  • True: Separates each array element in the API response into individual responses
  • False: Keeps the response as a single array without separation
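The effect of the Split Response setting can be illustrated with a small sketch. The function `split_response` is hypothetical and only mirrors the described behavior; it is not the block's actual code.

```python
from typing import Any, List


def split_response(response: Any, split: bool) -> List[Any]:
    """Return the list of responses passed on to the next step."""
    if split and isinstance(response, list):
        # True: each array element becomes an individual response.
        return list(response)
    # False (or a non-array response): pass the response on as a single unit.
    return [response]


# Usage: an API response containing an array of two objects.
api_response = [{"id": 1}, {"id": 2}]
when_true = split_response(api_response, split=True)
when_false = split_response(api_response, split=False)
```

With the setting enabled, the next step receives two separate responses; with it disabled, it receives one response containing the whole array.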