Fluent Commerce Logo
Docs

Fluent Connector Configuration

Essential knowledge

Author:

Fluent Commerce

Changed on:

20 Nov 2023

Overview

The Connect SDK expects a minimum of 4 configuration files:
  • application-connector.yml - Project and Connector specific configuration.
  • application.yml - Core Connect SDK configuration. These are mostly default configurations and core settings.
  • bootstrap.yml - Server/Container configuration. This file includes settings for all different environments and - follows the spring profile standards.
  • logback-spring.xml - Log configuration also supports different configurations based on spring profiles.
It is possible to have dedicated configuration files for each environment, often used to override a common setting for a particular environment. To create such a file, follow Spring Boot's format having the additional desired spring profile name at the end of the application.yml file, for example, application-dev.yml.

Key points

  • Credential / Secrets Configuration
  • Configuration Service
  • Listener Configuration
  • Job Scheduler Configuration
  • Cache Configuration
  • Product Availability
  • API Key Configuration
  • Spring actuators

Credential / Secrets Configuration

For the Connect SDK to interact with external systems, it requires access to their credentials, which must be stored in a secured location. The SDK provides a way for new credential storage services to be added, which is covered in the customisation guide. The example below uses a credential service from the connect-sdk-core-aws module, and to configure your credentials on your development environment, follow the Credential / Secret Setup Guide.
1fluent-connect:
2  credential:
3    credential-manager: "aws-secrets-manager"
PropertyRequiredDescriptionDefault
credential-manageryesID of the credential service used by the SDK.n/a
Basic SDK Secret Configuration
  • Active accounts This is used to tell the SDK what region the account is on Fluent and what retailers the Connect SDK will consider when processing messages.
Available regions:
  • sydney
  • dublin
  • singapore
  • north_america
Secret name: `fc/connect/{connector-name}/api/fluent/activeAccounts`Sample Configuration:
1{
2"accounts": [
3    {
4    "name": "CNCTDEV",
5    "region":"sydney",
6    "retailers": [
7        1,
8        34,
9        67,
10        100
11    ]
12    }
13]
14}
  • Fluent Credentials
These are the credentials the SDK uses to make requests to Fluent while pushing or pulling information from Fluent. When using listeners in the SDK, this configuration is mandatory. Having as many entries as necessary to cover all the retailers using the SDK is possible.Secret name: `fc/connect/{connector-name}/{your-fluent-account}/api/fluent-account/1`Sample Configuration:
1{
2    "retailer": "1",
3    "userName": "<FLUENT-RETAILER-USERNAME>",
4    "password": "<FLUENT-RETAILER-PASSWORD>"
5}
Custom Secrets 
It is possible to store any secret at the secret manager and have the SDK use them. All secret names must adhere to the SDK naming convention; otherwise, they will be discarded and not used. All secrets must be prefixed with: `fc/connect/{connector-name}/`. An example of a custom secret is `fc/connect/{connector-name}/my-secret`.

Configuration Service

The SDK uses a configuration service to read runtime configuration/settings. The default implementation uses Fluent settings as the storage location, but adding a new service to manage the runtime configuration/settings is possible.
1fluent-connect:
2  configuration:
3    settings-manager: "fluent-settings"
PropertyRequiredDescriptionDefault
settings-manageryesID of the configuration service used by the SDK. The default setting uses Fluent Settings as the storage location.`fluent-settings`
General Configuration 
Connect SDK provides the following configuration.
PropertyRequiredDescriptionDefault
connector-nameyesThis is the connector's name and this property is used to both name the connector and build the path of other configuration keys. Keep characters limited to 'a-z' and '-'.n/a
event-log-managernoId of the service that will log SDK events. There are two pre-defined options:`fluent-eventlog`: writes events to the standard log output.`disabled`: does nothing when an event is requested to be tracked`fluent-eventlog`
enable-listenersnoEnable or disable all SDK listenerstrue
disable-listenersnoA list of handlers will be disabled. The routes that use these disabled handlers will be disabled as well
Example:
`disable-handlers: EventFailureSummaryJobHandler`
empty
1connector-name: <my-connector-name>
2event-log-manager: "fluent-eventlog"
The table below describes the default configuration keys provided by the SDK. These are used when fetching configuration (fluent settings). Note that altering these may affect the behavior of the SDK.
PropertyDescriptionDefault
base-path-keyControls the base key path of all configurations of the SDK. Nothing outside the path is read by the SDK.
When using the SDK to fetch a configuration, it is not required to specify the full path of the fluent setting. The SDK will automatically infer the full path based on this property. For example, requesting a configuration named “my-key” will result in the SDK retrieving “`fc.connect.<my-connector-name>.my-key`setting from Fluent.
`fc.connect.<my-connector-name>`
account-mapping-keyMaps a Fluent account-retailer to an external system. The identification of the external system must be unique, and it is possible to be a composite value.`fc.connect.<my-connector-name>.account-mapping`
general-keyAny general settings that don't quite fit in any category.`fc.connect.<my-connector-name>.general`
log-level-keyGlobal level setting key to controlling the SDK's log level (DEBUG, INFO, WARN, ERROR). Changes to this property will take effect on the first configuration refresh in the SDK.`fc.connect.<my-connector-name>.log-level`
unit-conversion-keyUsed for unit conversion configuration, for example, Kilogram to Gram.`fc.connect.<my-connector-name>.unit-conversion`
Configuration Samples
1{
2    "fluent": {
3        "accountId": "FCTDEV",
4        "retailerId": "100"
5    },
6    "externalAccount": {
7        "projectKey": "my-project-qa",
8        "uid": "123"
9    }
10}
1{
2    "productCatalogue": {
3        "entityRef": "PC:MASTER:100",
4        "entitySubType": "MASTER"
5    }
6}
1{
2    "conversionSettings": [
3        {
4            "fromUnit": "cm",
5            "toUnit": "mm",
6            "conversionFactor": 10
7        },
8        {
9            "fromUnit": "kg",
10            "toUnit": "gm",
11            "conversionFactor": 1000
12        }
13    ]
14}
1{
2    "fluent": {
3        "PROCESSING": "Initial",
4        "AWAITING_WAVE": "Initial",
5        "ASSIGNED": "picking",
6        "PARTIALLY_FULFILLED": "picking",
7        "FULFILLED": "picking",
8        "AWAITING_COURIER_COLLECTION": "readyToShip",
9        "AWAITING_CUSTOMER_COLLECTION": "readyToShip",
10        "COMPLETE": "shipped",
11        "ESCALATED": "Initial",
12        "REJECTED": "backorder",
13        "CANCELLED": "Cancelled"
14    }
15}

Listener Configuration

Listeners allow connectors to subscribe to queues/topics which is enabled by default. To disable all listeners, including the SDK internal listeners, set the following property `enable-listeners: false` at application-connector.yml. It is possible to have any number of listeners running, and each must have its own configuration, like the sample below.
1listeners:
2    # listener configuration block 
3    listener-id:
4      name: "ENV_VARIABLE"
5      fluent-account: FLUENT_ACCOUNT
6      type: "sqs"
7      core-pool-size: 5       
8      pool-size: 20
9      prefix: "my-prefix"
10      shutdown-timeout: 300
11      visibility-timeout: 120
12      poll-wait: 20
13      dlq-suffix: "-dlq"
14      retry: 5 
PropertyRequiredDescriptionDefault value
listener-idyesActual id of the listener. This must match the id defined at the class when using @ListenerInfo.n/a
nameyesName of the environment variable that will contain the real queue name.Empty String
typeyesType of listener. This is based on a list of supported listeners of the Connect SDK.
Current options are: `sqs` or `kafka`.
Notes: The required SDK library must be present to use different listener types.
NOT_DEFINED
fluent-accountrequired for custom queues (aka external queues)Fluent account name. This is required and restricts the processing of messages to services and configuration for said account.n/a
core-pool-sizenoMinimal Number of threads available to process messages from the queue. Determines the minimal number of threads available to process messages10
pool-sizenoNumber of threads available to process messages from the queue. Determines how many messages can be processed in parallel200
prefixnoPrefix given to the threads processing messages.default
shutdown-timeoutnoWait time in seconds to allow in-flight threads to gracefully finish processing before forcing them to quit300
visibility-timeoutnoQueue visibility timeout in seconds - Messages read from the queue and kept invisible to other listeners while the node that took the message processes it. If the message is not deleted before the timeout the message is made available again for processing - retry in case of errors.120
poll-waitnoListener wait time in seconds to read as many items from the queue as possible.10
dlq-suffixnoDead letter queue suffix. DLQ is a generated name from the queue name with the suffix appended to the end.`-dlq`
retrynoQueue retry policy. How often is the message put for retry on the queue before it goes to DLQ.
Restrictions:
SQS requires at least one retry.Kafka may have no retries or up to 5 retries.
2
max-allowed-pull-sizenoMax number of allowed messages to pull from the queue.10
propsnoAdditional listener configuration. Connect SDK will read all configuration set under props and have them stored as a Map.
Known properties:`retryIntervalInSec` - Used to specify the retry interval for Kafka
n/a
Kafka Listener Configuration
Kafka is a distributed message broker that can be utilised as a messaging queue for SDK listeners. The SDK provides a seamless experience for listeners to subscribe to different message brokers through configuration if the correct SDK library is available.To configure the Kafka brokers with the SDK, use an environment variable `KAFKA_BROKERS` to specify the location of the brokers. The default value is set to `localhost:9092`.Every listener requires at least two topics, one as the main topic to receive the incoming messages and another as a dead letter queue for failed messages. If the retry policy is configured, then additional topics are required to the amount of the retry specified. The SDK expects the following naming convention for the topics, which the Kafka administrator must create, as the SDK will not automatically create them.
Topic typesListener Configuration sampleExpected topic names
Main topicname = notificationnotification
DLQ topicdlq-suffix = -dlqnotification-dlq
Retry topicsretry = 2notification-retry-1notification-retry-2
Key different between SQS and Kafka
  • Both listeners work with a thread pool, but the main difference is that SQS constantly pulls messages from the queue and processes them if threads are idle in the pool. On the other hand, Kafka works with batches, reads X messages from the topic, and is only capable of reading more messages once the first batch has completed processing.
  • Kafka requires additional topics to support its retry policy
  • Messages processed from SQS are removed from the queue, whereas in Kafka, they remain there after processing. Note that Kafka is not an event store, and selectively reprocessing old messages is not possible.
Retry mechanism:
Kafka doesn’t have a retry mechanism inbuilt like AWS SQS, and to achieve a similar behavior, SDK implements its own retry policy by using additional topics. The diagram below illustrates how messages can be forwarded to the different topics of a listener.No alt providedEach Kafka listener configured with a retry policy will work with a pair of listeners. The first listener is dedicated to processing messages from the main topic, while the second listener will subscribe to all retry topics. The main topic listener will never stop processing messages. The retry listener will regularly pause the subscriptions to allow the retry interval time to elapse and resume when the time is due to process the messages waiting for a retry.

Job Scheduler Configuration

Connect SDK doesn't have an internal scheduler, and it depends on external systems to trigger an HTTP request for a job to be executed as described below.
1POST https://<domain>/api/v1/fluent-connect/scheduler/add/<fluent-account>/<fluent-retailer>/<job-name>
However, it is possible to extend the SDK and add different types of schedulers. The module `connect-sdk-core-aws` provides the `message-queue` mode, for example. The configuration below tells the SDK which job scheduler service to use.
1job-scheduler:
2    job-scheduler-type: "message-queue"
All job requests require three parameters:
  • fluent-account: Fluent account name
  • fluent-retailer: Fluent retailer id
  • job-name: The name of the handler that will execute the job
1POST https://localhost:8080/api/v1/fluent-connect/scheduler/add/cnctdev/1/batch-inventory-sync
The execution of a job is performed asynchronously, and the HTTP request returns 200 OK if the Connect SDK managed queue the execution correctly. All jobs in the Connect SDK share the same pre- and post-execution steps:
  • A job cannot have multiple instances running simultaneously
  • Retrieves the job configuration from Fluent settings.
  • Calculate a date range (from-to datetime) the job may use during its execution.
  • Execute the actual job.
  • Persist the calculated date range as part of the job configuration for the next execution.
  • Connect SDK runs at UTC time.
There are two kinds of configuration for a job:
  • Runtime configuration - More frequently updated properties that control the job behavior.
  • Built time configuration - Properties that limit or give a purpose to the job that end user should modify.
Runtime configuration 
  • Kept at Fluent as settings
  • The configuration key of a job is determined dynamically by taking the configuration `job-scheduler.config-prefix` and appending the job name at the end. The SDK `job-scheduler.config-prefix` default template is: `fc.connect.<my-sample-connector>.batch.<job-name>`
Every job gets a pre-calculated date range, and the SDK provides this to any job handler as part of the job context. This date range allows the job to work with data in delta mode (if required), only fetching/processing items that have changed since the last execution of the job.Below is an example of a configuration with no previous execution and some properties in `props`. The date `previousEndDate` is not mandatory. Whenever there is no valid date set at `prev`’s current datetime (in UTC). The calculated date range used by the job is also saved at lastRun.param.start and `lastRun.param.end` - note the correlation of `lastRun.param.end` and `previousEndDate`.
1{
2  "previousEndDate": "2021-09-22T03:00:00Z",
3  "props": {
4    "fluentNetworkRef": "BASE_67"
5  },
6  "lastRun": {
7    "param": {
8      "start": "1970-01-01T00:00:00Z",
9      "end": "2021-09-22T03:00:00Z",
10      "props": {
11        "fluentNetworkRef": "BASE_67"
12      }
13    },
14    "jobStart": "2021-09-22T03:00:00Z",
15    "jobEnd": "2021-09-22T03:05:00Z",
16    "status": "SUCCESSFUL"
17  }
18}
The next day the job runs again, the current value of `previousEndDate` is used as the start date `lastRun.param.start` and a new date is set for both `lastRun.param.end` and `previousEndDate`.
1{
2  "previousEndDate": "2021-09-22T02:00:00Z",
3  "props": {
4    "fluentNetworkRef": "BASE_67"
5  },
6  "lastRun": {
7    "param": {
8      "start": "2021-09-22T03:00:00Z",
9      "end": "2021-09-23T02:00:00Z",
10      "props": {
11        "fluentNetworkRef": "BASE_67"
12      }
13    },
14    "jobStart": "2021-09-23T02:00:00Z",
15    "jobEnd": "2021-09-23T02:05:00Z",
16    "status": "SUCCESSFUL"
17  }
18}
It is also possible for a user to override the normal job operation described above by providing a custom date range. These user-specified dates are only used once and will not be taken into account for subsequent executions.
1{
2  "customDateRange": {
3    "start": "2021-04-22T00:00:00Z",
4    "end": "2021-04-23T00:00:00Z"
5  },
6  "previousEndDate": "2021-09-22T02:00:00Z",
7  "props": {
8    "fluentNetworkRef": "BASE_67"
9  },
10  "lastRun": {
11    "param": {
12      "start": "2021-09-22T03:00:00Z",
13      "end": "2021-09-23T02:00:00Z",
14      "props": {
15        "fluentNetworkRef": "BASE_67"
16      }
17    },
18    "jobStart": "2021-09-23T02:00:00Z",
19    "jobEnd": "2021-09-23T02:05:00Z",
20    "status": "SUCCESSFUL"
21  }
22}
After the job runs with the custom dates, the custom range is removed and `previousEndDate` remains as it previously was so the job resumes its normal operation.
1{
2  "previousEndDate": "2021-09-22T02:00:00Z",
3  "props": {
4    "fluentNetworkRef": "BASE_67"
5  },
6  "lastRun": {
7    "param": {
8      "start": "2022-04-22T00:00:00Z",
9      "end": "2022-04-23T00:00:00Z",
10      "props": {
11        "fluentNetworkRef": "BASE_67"
12      }
13    },
14    "jobStart": "2021-09-24T02:00:00Z",
15    "jobEnd": "2021-09-24T02:05:00Z",
16    "status": "SUCCESSFUL"
17  }
18}
While the job is running, a temporary property called `executionStart` is added to indicate the job is running, and it is removed when the job is completed.
1{
2  "previousEndDate": "2021-09-22T02:00:00Z",
3  "executionStart" : "2021-09-24T02:00:00Z",
4  "props": {
5    "fluentNetworkRef": "BASE_67"
6  },
7  "lastRun": {
8    "param": {
9      "start": "2022-04-22T00:00:00Z",
10      "end": "2022-04-23T00:00:00Z",
11      "props": {
12        "fluentNetworkRef": "BASE_67"
13      }
14    },
15    "jobStart": "2021-09-24T02:00:00Z",
16    "jobEnd": "2021-09-24T02:05:00Z",
17    "status": "SUCCESSFUL"
18  }
19}
PropertyDescription
customDateRangeOptional date range a user can set to override the jobs current `previousEndDate`. This is only used once and gets removed after a successful execution.
customDateRange.startCustom user date range start datetime in UTC.
customDateRange.endCustom user date range end datetime in UTC.
previousEndDateUsed to keep track of the last execution date. This drives how the SDK calculates the next date range execution. Whatever the value is set here, this is used as the next start date. If this is missing or invalid, epoch is as the start date. The end date is always the current time the job is executed.
propsOptional job settings.
lastRunKeeps a copy of the last execution parameters, date range used as well as job execution times.
lastRun.param.startLast execution date range start in UTC.
lastRun.param.endLast execution date range end in UTC.
lastRun.param.propsLast execution props.
lastRun.jobStartLast execution date time the job start running in UTC.
lastRun.jobsEndLast execution date time the job finished running in UTC.
lastRun.statusLast execution status of how the job finished: FAILED / SUCCESSFUL.
Kafka Limitations
When using a Kafka topic to queue job requests from a scheduler, it's important to consider that Kafka works differently from SQS. When the SDK reads messages from the job topic using Kafka, it will take the available job requests and start processing them. If new job requests arrive while the first set is still being processed, then these will remain queued until the SDK completes the first set and is able to take the queued jobs. A queued job time to live is heavily coupled with how frequently the SDK pulls the topic and the lowest frequency allowed is 1 minute. It is not possible to guarantee that the first job requests will complete processing before the second set of job requests expires. When job requests expire in the topic, they get discarded and are moved to the DLQ. This means the schedule job is not executed and it will have to wait until the scheduler issues another request for its execution.
Build configuration 
The first level of a job configuration is at the class where the job is written through the `HandlerInfo` annotation.
1@HandlerInfo(
2        name = "InventorySyncJob",
3        route = "batch-inventory-sync",
4        description = "Sync inventory positions from Fluent to Commercetools",
5        props = {@HandlerProp(key = "page-size", value = "500")}
6)
To override an existing job configuration, it is possible to use the YAML configuration files. The first route below simply changes the properties by giving the job a larger page size. The second route uses the same job class identified by the handler name `InventorySyncJob` and creates a new route also setting new properties. With the configuration below, it is possible to execute the same job through different routes with different properties.
1  routes:
2    job:
3      - route: "batch-inventory-sync"
4        handler-name: "InventorySyncJob"
5        props:
6          page-size: 1000
7      - route: "custom-inventory-sync"
8        handler-name: "InventorySyncJob"
9        props:
10          page-size: 200          
11          another-property: "some value"

Cache Configuration

ConnectSDK uses spring cache to store some data in memory and the table below lists the standard caches from SDK.
Cache NameDescriptionDefault Expiry in Seconds
fluent-api-clientFluent API Client cache300
account-mappingFluent account mapping cache. This is often used to map a Fluent account to external systems or vice versa300
account-mappingsFluent account mapping cache. This is often used to map a Fluent account to external systems or vice versa300
account-configurationConfiguration (Fluent settings) cache300
fluent-auth-token-contextHandler context cache from a Fluent Auth token300
api-key-contextHandler context cache from a SDK API Key300
It is possible to both create new cache or update the settings of existing ones through the YAML configuration file.
1fluent-connect:
2  cache:
3    caffeine:
4      - name: "fluent-api-client"
5        expiryInSeconds: 600
6      - name: YourNewCache
7        expiryInSeconds: 600
In order to make use of the cache, just add a new entry for your cache and add the following annotation to your method.
1@Cacheable(cacheManager = cacheManager = CacheConfiguration.SDK_CACHE_MANAGER, cacheNames = "YourNewCache")

Product Availability

Feature Configuration 
PropertyDescriptionDefault
fulfilment-option.enabledEnables the product availability endpoint. If set to false the endpoint is not available and the request will result in a 404 response.true
fulfilment-option.logging.enabledLogs extra information on the request, response, and execution time of each enrichment step. Not recommended for production environments.false

API Key Configuration

API Keys are configured directly at the Secret Manager just like any other credential used by the SDK. API keys are specific for a Fluent account and therefore must be configured using the following secret key template: `fc/connect/{connector-name}/{fluent-account-name}/api/fluent-account/api-keys`. Note that `connector-name` and `fluent-account-name` must be updated with the correct values before using the key above.A token has the following properties:
  • API Key size: must contain at least 16 characters.
  • Duplicate keys: Must be unique. Duplicate tokens are deemed invalid by the SDK and disregarded.
  • Expiry: May or may not have an expiry. Tokens without an expiry will always be valid. The expiry date-time must be in UTC format as shown below.
  • Caching: keys are cached by the SDK. Modifying an API key configuration may not have an immediate effect.
  • Multi-retailer: It's possible to have any number of retailers with any number of keys for each retailer.
For more information on how to configure your credentials in your development environment follow the Credential / Secret Setup Guide.Example of a token configuration:
1{
2  "api-keys": [
3    {
4      "retailer": "1",
5      "username": "fluent-username-1",
6      "password": "ZMCZZZ",
7      "keys": [
8        {
9          "key": "1283712983721983",
10          "expiry": "2020-12-30T20:28:07.00Z"
11        },
12        {
13          "key": "3333332222222222"
14        }
15      ]
16    },
17    {
18      "retailer": "34",
19      "username": "fluent-username-34",
20      "password": "ZMCFFF",
21      "keys": [
22        {
23          "key": "2222333344445555"
24        }
25      ]
26    }
27  ]
28}
Use the command below to test a token, but remember to first update the URL and token. Successful configurations will return the details of the user configured for the specified token, otherwise a 401 unauthorized response.
1curl -X 'GET' \
2'http://localhost:8080/api/v1/fluent-connect/api-key' \
3-H 'Authorization: ApiKey 1283712983721983'
4-H 'fluent.account: CNCTDEV'

Spring actuators

The SDK is built with Spring Framework, and it bundles a couple of custom actuators on the top of the default ones. The list of enabled actuators in the SDK can be found at bootstrap.yml file. To view the list of actuators active, see http://localhost:8080/actuator.To learn more about actuators, please see Spring Boot Actuator Web API Documentation.This is the default SDK settings for actuators.
1management:
2  endpoints:
3    web:
4      exposure:
5        include: health, info, caches, scheduledtasks, sdkhandlers, sdkroutes
Fluent Commerce

Fluent Commerce