Kafka Connector built on top of Connect SDK
Author: Fluent Commerce
Changed on: 30 Oct 2023
Key Points
- Users can configure a Kafka connector on top of the Connect SDK.
- Changes to the connector's listener configuration and Kafka connection properties are required.
Steps
Kafka Connector built on top of Connect SDK
We have extended our queue listener support beyond SQS to include Kafka.
Kafka and SQS can be used interchangeably as queue listeners, providing more options to our users.
The provided reference solutions demonstrate how to apply these changes to the configuration files and use either queue listener. With these examples, it is easy to switch between SQS and Kafka, depending on your specific needs and requirements.
See Fluent Connector Configuration | Kafka Listener Configuration for a more detailed explanation of the Kafka configuration.
Prerequisites
- A running Kafka broker and ZooKeeper container (Kafdrop is optional)
- A running localstack container
- Access to a Fluent account
- An API key and secret for communicating with an external network
Create a Docker Compose file
`docker-compose.yml`
version: "3.9"
services:
  localstack:
    container_name: localstack
    image: localstack/localstack
    restart: unless-stopped
    ports:
      - "4510-4559:4510-4559" # external service port range
      - "4566:4566"
    environment:
      - AWS_DEFAULT_REGION=us-east-1
      - SERVICES=sqs,secretsmanager
      - HOSTNAME=localstack
      - HOSTNAME_EXTERNAL=localstack
    volumes:
      - ./docker/localstack:/docker-entrypoint-initaws.d

  kafdrop:
    image: obsidiandynamics/kafdrop
    container_name: kafkadrop
    restart: "unless-stopped"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:29092"
      JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
    depends_on:
      - "kafka"

  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    restart: unless-stopped

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"
      KAFKA_RESTART_ATTEMPTS: "10"
      KAFKA_RESTART_DELAY: "5"
    restart: unless-stopped
    depends_on:
      - zookeeper
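Once the containers are started, it can help to confirm that the published ports are reachable before configuring the connector. The following is a minimal sketch, not part of the Connect SDK: it assumes the stack runs on `localhost` with the host ports from the compose file, and the helper names `port_open` and `check_stack` are made up for illustration.

```python
import socket

# Host ports published by the compose file; the labels are for display only.
PUBLISHED_PORTS = {
    "kafka (EXTERNAL listener)": 9092,
    "zookeeper": 2181,
    "kafdrop": 9000,
    "localstack": 4566,
}


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_stack(host: str = "localhost") -> dict:
    """Map each service label to whether its published port accepts connections."""
    return {name: port_open(host, port) for name, port in PUBLISHED_PORTS.items()}


if __name__ == "__main__":
    for name, ok in check_stack().items():
        print(f"{name}: {'reachable' if ok else 'not reachable'}")
```

An open port only shows that something is listening; it does not prove that the broker has finished starting or that topics exist.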
Configuration Changes
This is an example of using Kafka:
fluent-connect:
  connector-name: ${connectorName}
  listeners:
    # Internal listener:
    messages:
      name: "KAFKA_EVENTS"
      type: "kafka"
      retry: 5
      props:
        retryIntervalInSec: 10
    batch:
      name: "KAFKA_BATCH"
      retry: 1
      type: "kafka"
      props:
        retryIntervalInSec: 10
    notification:
      name: "KAFKA_NOTIFICATION"
      retry: 5
      type: "sqs"
      poll-wait: 1
      props:
        retryIntervalInSec: 10
kafka:
  props:
    bootstrap.servers: localhost:9092
| Property | Mandatory | Description | Default Value |
| | Yes | | |
| | | | |
| `retry` | No | The number of retries for a topic determines the number of corresponding retry topics that will be created. These retry topics should follow a common naming convention. For instance, if the "messages" listener has been configured with three retries, the Kafka container should include the following topics: messages, messages-retry-1, messages-retry-2, messages-retry-3 | n/a |
| `props.retryIntervalInSec` | No | The delay time between retry attempts. (The delay time should not be greater than the max poll interval of the current Kafka broker.) | n/a |
| `kafka.props` | No | The connection configurations for the Kafka producer and consumer. (If this property is not set, the producer and consumer connect to a local Kafka instance.) | |
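The retry rules in the table can be sketched in a few lines. This is an illustration only, not code from the Connect SDK: the topic pattern is taken from the "messages" example in the table, the function names are hypothetical, and the interval check assumes the limit meant is the Kafka consumer setting `max.poll.interval.ms`, whose Apache Kafka default is 300000 ms.

```python
def retry_topics(listener: str, retries: int) -> list[str]:
    """Return the main topic plus one retry topic per configured retry."""
    if retries < 0:
        raise ValueError("retries must not be negative")
    return [listener] + [f"{listener}-retry-{i}" for i in range(1, retries + 1)]


# Assumption: the limit is the consumer's max.poll.interval.ms
# (Apache Kafka default: 300000 ms, i.e. 5 minutes).
DEFAULT_MAX_POLL_INTERVAL_MS = 300_000


def retry_interval_ok(
    retry_interval_sec: int,
    max_poll_interval_ms: int = DEFAULT_MAX_POLL_INTERVAL_MS,
) -> bool:
    """Check that the retry delay does not exceed the max poll interval."""
    return retry_interval_sec * 1000 <= max_poll_interval_ms


if __name__ == "__main__":
    print(retry_topics("messages", 3))
    # ['messages', 'messages-retry-1', 'messages-retry-2', 'messages-retry-3']
    print(retry_interval_ok(10))
    # True
```

A list like this is useful when topics have to be created up front, for example on a cluster where automatic topic creation is disabled.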
`kafka.props`
Integrate with a local Kafka:
kafka:
  props:
    # Required connection configs for Kafka Local
    bootstrap.servers: localhost:9092

    # Best practice for higher availability in Apache Kafka clients prior to 3.0
    session.timeout.ms: 45000

    # Best practice for Kafka producer to prevent data loss
    acks: all
Integrate with external Kafka (e.g. Confluent Cloud):
kafka:
  props:
    # Required connection configs for Confluent Cloud
    bootstrap.servers: pkc-6ojv2.us-west4.gcp.confluent.cloud:9092
    security.protocol: SASL_SSL
    sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
    sasl.mechanism: PLAIN
    client.dns.lookup: use_all_dns_ips

    # Best practice for higher availability in Apache Kafka clients prior to 3.0
    session.timeout.ms: 45000

    # Best practice for Kafka producer to prevent data loss
    acks: all
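The `{{ CLUSTER_API_KEY }}` and `{{ CLUSTER_API_SECRET }}` placeholders have to be replaced with real credentials before the configuration is used. As a rough sketch of how the same properties could be assembled without hardcoding secrets, the example below builds them from environment variables. The function name `confluent_props` and the choice of environment variables are assumptions made for illustration; how the Connect SDK itself resolves secrets is not covered here.

```python
import os

# Same JAAS line as in the configuration, with the credentials as parameters.
JAAS_TEMPLATE = (
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
    "username='{key}' password='{secret}';"
)


def confluent_props(bootstrap_servers: str, api_key: str, api_secret: str) -> dict:
    """Build the kafka.props entries for a SASL_SSL cluster (hypothetical helper)."""
    if not api_key or not api_secret:
        raise ValueError("API key and secret must both be provided")
    return {
        "bootstrap.servers": bootstrap_servers,
        "security.protocol": "SASL_SSL",
        "sasl.jaas.config": JAAS_TEMPLATE.format(key=api_key, secret=api_secret),
        "sasl.mechanism": "PLAIN",
        "client.dns.lookup": "use_all_dns_ips",
        "session.timeout.ms": 45000,
        "acks": "all",
    }


if __name__ == "__main__":
    # Assumed variable names; adjust to your own secret management.
    key = os.environ.get("CLUSTER_API_KEY")
    secret = os.environ.get("CLUSTER_API_SECRET")
    if key and secret:
        props = confluent_props("pkc-6ojv2.us-west4.gcp.confluent.cloud:9092", key, secret)
        # Print only the property names so the secret is not echoed.
        print(sorted(props))
    else:
        print("Set CLUSTER_API_KEY and CLUSTER_API_SECRET first")
```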