Kafka Connector built on top of Connect SDK
Author:
Fluent Commerce
Changed on:
30 Oct 2023
Key Points
- Users can configure the Kafka Connector on top of the Connect SDK.
- Some configuration is required.
 
Steps
Kafka Connector built on top of Connect SDK
We have extended our queue listener support beyond SQS to include Kafka.
Kafka and SQS can be used interchangeably as queue listeners, providing more options to our users.
The provided reference solutions demonstrate how to apply these changes to configuration files and use either queue listener. With these examples, it's easy to switch between SQS and Kafka, depending on your specific needs and requirements.
See Fluent Connector Configuration | Kafka Listener Configuration for a more detailed explanation of Kafka configuration.
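As a rough illustration of how small the switch is, the sketch below flips the `type` of one listener in a configuration held as a plain Python dictionary. The dictionary mirrors the `fluent-connect.listeners` layout used in the configuration example in this article. The helper name `switch_listener_type` is hypothetical and is not part of the Connect SDK.

```python
from copy import deepcopy

# Listener types that appear in this article's examples.
SUPPORTED_TYPES = {"sqs", "kafka"}


def switch_listener_type(config: dict, listener: str, new_type: str) -> dict:
    """Return a copy of `config` with one listener's `type` changed.

    Hypothetical helper for illustration only; the Connect SDK reads the
    type directly from its configuration file.
    """
    if new_type not in SUPPORTED_TYPES:
        raise ValueError(f"unsupported listener type: {new_type!r}")
    updated = deepcopy(config)  # leave the caller's configuration untouched
    listeners = updated["fluent-connect"]["listeners"]
    if listener not in listeners:
        raise KeyError(f"unknown listener: {listener!r}")
    listeners[listener]["type"] = new_type
    return updated


config = {
    "fluent-connect": {
        "listeners": {
            "notification": {"name": "KAFKA_NOTIFICATION", "type": "sqs", "retry": 5},
        }
    }
}

switched = switch_listener_type(config, "notification", "kafka")
print(switched["fluent-connect"]["listeners"]["notification"]["type"])  # kafka
```

In practice the same change is a one-line edit to the listener's `type` value in the configuration file.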
Prerequisites
- At least one running Kafka broker and a ZooKeeper container (Kafdrop is optional)
- A running LocalStack container
- Access to a Fluent account
- An API key/secret for communicating with the external network
 
Create a Docker Compose file `docker-compose.yml` with the content below to quickly set up a Kafka broker.
version: "3.9"
services:
  localstack:
    container_name: localstack
    image: localstack/localstack
    restart: unless-stopped
    ports:
      - "4510-4559:4510-4559"  # external service port range
      - "4566:4566"
    environment:
      - AWS_DEFAULT_REGION=us-east-1
      - SERVICES=sqs,secretsmanager
      - HOSTNAME=localstack
      - HOSTNAME_EXTERNAL=localstack
    volumes:
      - ./docker/localstack:/docker-entrypoint-initaws.d

  kafdrop:
    image: obsidiandynamics/kafdrop
    container_name: kafkadrop
    restart: "unless-stopped"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:29092"
      JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
    depends_on:
      - "kafka"

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    restart: unless-stopped

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
    restart: unless-stopped
    depends_on:
      - zookeeper
Configuration Changes
This is an example of using Kafka:
fluent-connect:
  connector-name: ${connectorName}
  listeners:
  # Internal listener:
    messages:
      name: "KAFKA_EVENTS"
      type: "kafka"
      retry: 5
      props:
        retryIntervalInSec: 10
    batch:
      name: "KAFKA_BATCH"
      retry: 1
      type: "kafka"
      props:
        retryIntervalInSec: 10
    notification:
      name: "KAFKA_NOTIFICATION"
      retry: 5
      type: "sqs"
      poll-wait: 1
      props:
        retryIntervalInSec: 10
kafka:
  props:
    bootstrap.servers: localhost:9092

| Property | Mandatory | Description | Default Value |
| --- | --- | --- | --- |
| | Yes | | |
| | | | |
| | No | The number of retries for a topic determines the number of corresponding retry topics that will be created. These retry topics should follow a common naming convention. For instance, if the "messages" listener has been configured with three retries, the Kafka container should include the following topics: `messages`, `messages-retry-1`, `messages-retry-2`, `messages-retry-3` | n/a |
| | No | The delay time between retry attempts. (The delay time should not be greater than the max poll interval of the current Kafka broker.) | n/a |
| | No | The connection configurations for the Kafka producer and consumer. (If this property is not set, the producer and consumer will connect to Kafka running locally.) | |
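The retry rows of the table above can be sketched in a few lines of Python. This is only an illustration: the `-retry-N` suffix is inferred from the "messages" example in the table, the function names are hypothetical, and the 300000 ms default is the stock Kafka consumer value for `max.poll.interval.ms`, which may differ in your environment.

```python
from typing import List


def retry_topic_names(topic: str, retries: int) -> List[str]:
    """List the main topic followed by one retry topic per configured retry.

    The "<topic>-retry-<n>" pattern is inferred from the example in the
    table; it is an assumption, not a documented SDK contract.
    """
    if retries < 0:
        raise ValueError("retries must not be negative")
    return [topic] + [f"{topic}-retry-{n}" for n in range(1, retries + 1)]


def retry_interval_is_safe(retry_interval_sec: int,
                           max_poll_interval_ms: int = 300_000) -> bool:
    """Check that the retry delay does not exceed the max poll interval.

    300_000 ms is the default Kafka consumer `max.poll.interval.ms`;
    pass the actual value if it has been overridden.
    """
    return retry_interval_sec * 1000 <= max_poll_interval_ms


print(retry_topic_names("messages", 3))
# ['messages', 'messages-retry-1', 'messages-retry-2', 'messages-retry-3']
print(retry_interval_is_safe(10))   # True
print(retry_interval_is_safe(600))  # False
```

A check like this can be useful when topics are created manually instead of relying on `KAFKA_AUTO_CREATE_TOPICS_ENABLE`.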
`kafka.props` example:
kafka:
  props:
    # Required connection configs for Kafka Local
    bootstrap.servers: localhost:9092

    # Best practice for higher availability in Apache Kafka clients prior to 3.0
    session.timeout.ms: 45000

    # Best practice for Kafka producer to prevent data loss
    acks: all

kafka:
  props:
    # Required connection configs for Confluent Cloud
    bootstrap.servers: pkc-6ojv2.us-west4.gcp.confluent.cloud:9092
    security.protocol: SASL_SSL
    sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
    sasl.mechanism: PLAIN
    client.dns.lookup: use_all_dns_ips

    # Best practice for higher availability in Apache Kafka clients prior to 3.0
    session.timeout.ms: 45000

    # Best practice for Kafka producer to prevent data loss
    acks: all
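To make the placeholders in the Confluent Cloud example concrete, the sketch below assembles the same properties in Python, filling in the API key and secret from environment variables. The function name `confluent_cloud_props` and the environment variable names are assumptions taken from the `{{ CLUSTER_API_KEY }}` and `{{ CLUSTER_API_SECRET }}` placeholders; how the Connect SDK itself resolves such placeholders is not covered here.

```python
import os


def confluent_cloud_props(bootstrap_servers: str, api_key: str, api_secret: str) -> dict:
    """Build the `kafka.props` entries from the Confluent Cloud example.

    Hypothetical helper: it only mirrors the YAML example in this article.
    """
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username='{api_key}' password='{api_secret}';"
    )
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.jaas.config": jaas,
        "sasl.mechanism": "PLAIN",
        "client.dns.lookup": "use_all_dns_ips",
        "session.timeout.ms": 45000,
        "acks": "all",
    }


props = confluent_cloud_props(
    bootstrap_servers="pkc-6ojv2.us-west4.gcp.confluent.cloud:9092",
    # Assumed variable names; fall back to visible placeholders if unset.
    api_key=os.environ.get("CLUSTER_API_KEY", "<api-key>"),
    api_secret=os.environ.get("CLUSTER_API_SECRET", "<api-secret>"),
)
for key, value in props.items():
    # Mask the JAAS line so the secret is not echoed to the console.
    print(key, "=", "***" if key == "sasl.jaas.config" else value)
```

Keeping the key and secret outside the configuration file avoids committing credentials to source control.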