Customization Guidelines for Connect SDK
Author:
Fluent Commerce
Changed on:
10 July 2024
Overview
The SDK's handling mechanism involves loading and prioritizing handlers during server startup. It provides transparency with special spring actuators for listing active routes and handlers. Message handlers follow error-handling protocols, while job handlers process data polled from external systems. Configuration handlers react to changes, and the SDK supports various ways of receiving messages from external systems.
Key points
- Handler Prioritization: Customize execution order by assigning priorities during startup.
- Message Handling Protocols: Follow specific rules for retries, dead letter queue forwarding, and managing multiple items within a message.
- Implementation Guidelines: Adhere to guidelines, use annotations like @HandlerInfo and @Component, and configure routes. Extensibility is seen in job handlers, configuration handlers, and product availability enrichments.
- Receiving External Messages: Support diverse methods, including HTTP requests with authentication filters and external listeners like SQS and Kafka. Custom controllers enhance authentication through the SDK's security module.
Handlers
When the server starts up, the SDK will load all handlers found in the spring context and bind them to the correct service they belong to. As part of this binding process, the SDK determines which handler has precedence over others of the same name. In other words, it is possible to override existing handlers by assigning a higher priority to the custom handler. By default, all handlers will have a zero priority, and the SDK understands the highest priority as the higher number. For example, priority = 100 takes precedence over priority = 1.
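The precedence rule above can be sketched in plain Java. This is only an illustration of "highest priority number wins per handler name", not the SDK's binding code: `HandlerPrioritySketch`, `HandlerDef`, the `com.example` class names, and the tie-breaking rule (first discovered wins) are all assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a stand-in for the binding step, not the SDK's actual code.
final class HandlerPrioritySketch {

    // Hypothetical description of a discovered handler (priority defaults to 0 in the SDK).
    record HandlerDef(String name, int priority, String className) {}

    // For each handler name, keep the definition with the highest priority number.
    // Assumption: on a tie, the handler discovered first is kept.
    static Map<String, HandlerDef> resolveActive(List<HandlerDef> discovered) {
        Map<String, HandlerDef> active = new LinkedHashMap<>();
        for (HandlerDef def : discovered) {
            active.merge(def.name(), def,
                    (current, candidate) -> candidate.priority() > current.priority() ? candidate : current);
        }
        return active;
    }

    public static void main(String[] args) {
        List<HandlerDef> discovered = List.of(
                new HandlerDef("CustomCategoryUpsert", 0, "com.example.DefaultCategoryUpsertHandler"),
                new HandlerDef("CustomCategoryUpsert", 100, "com.example.MyCategoryUpsertHandler"));
        // The priority = 100 definition overrides the priority = 0 one.
        System.out.println(resolveActive(discovered).get("CustomCategoryUpsert").className());
    }
}
```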
@HandlerInfo(name = "CustomCategoryUpsert", priority = 100)
The SDK provides a couple of special Spring actuators that can list all active routes and handlers available to the application.
- routes - http://localhost:8080/actuator/sdkroutes
Below is the default list of routes of the SDK. Each section contains a list of `active` and `inactive` routes. Inactive routes are routes that have been disabled because another route overrides them. There is currently no way to disable a route explicitly.
{
  "configuration": {
    "active": [],
    "inactive": []
  },
  "job": {
    "active": [
      {
        "route": "event-failure-summary-monitor",
        "handler": "EventFailureSummaryJobHandler",
        "priority": 0,
        "props": {
          "page-size": "100",
          "event-status": "FAILED",
          "event-type": "ORCHESTRATION_AUDIT"
        },
        "className": "com.fluentcommerce.connect.core.job.handler.EventFailureSummaryJobHandler",
        "description": "Grabs failed events from Fluent and builds a summary"
      }
    ],
    "inactive": []
  },
  "message": {
    "active": [],
    "inactive": []
  },
  "notification": {
    "active": [
      {
        "route": "notification",
        "handler": "DefaultNotificationHandler",
        "priority": 0,
        "props": {},
        "className": "com.fluentcommerce.connect.core.handler.DefaultNotificationHandler",
        "description": "Handles notifications and publishes to their designated route"
      }
    ],
    "inactive": []
  }
}
- handlers - http://localhost:8080/actuator/sdkhandlers
Similar to routes, below are the default handlers of the SDK. The output also contains `active` and `inactive` lists, with the same meaning as for routes.
{
  "configuration": {
    "active": [
      {
        "name": "log-level-update",
        "route": "",
        "priority": 0,
        "props": {},
        "className": "com.fluentcommerce.connect.core.configuration.handler.LogLevelConfigurationHandler"
      }
    ],
    "inactive": []
  },
  "job": {
    "active": [
      {
        "name": "EventFailureSummaryJobHandler",
        "route": "event-failure-summary-monitor",
        "priority": 0,
        "props": {
          "page-size": "100",
          "event-status": "FAILED",
          "event-type": "ORCHESTRATION_AUDIT"
        },
        "className": "com.fluentcommerce.connect.core.job.handler.EventFailureSummaryJobHandler"
      }
    ],
    "inactive": []
  },
  "message": {
    "active": [],
    "inactive": []
  },
  "notification": {
    "active": [
      {
        "name": "DefaultNotificationHandler",
        "route": "notification",
        "priority": 0,
        "props": {},
        "className": "com.fluentcommerce.connect.core.handler.DefaultNotificationHandler"
      }
    ],
    "inactive": []
  }
}
Message Handlers
Message handlers are dedicated to processing messages pushed to the message queue of the SDK.
Error Handling
- For errors that a retry will not fix, exit the method with an `UnprocessableMessageException`, and the message will be forwarded to the DLQ (dead letter queue).
- For errors that should have the message retried, exit the method with a `HandlerRetryException`. Note that each listener (SQS vs. Kafka, for example) has a different retry implementation.
- If no exception is thrown, the message is removed from the queue, and it is assumed that the processing was done and no further action is required.
- For messages that have more than one item to be processed (e.g., a message has a list of products to be processed), either approach is viable:
  - Any failure will stop the processing, and when the message is retried, all items are reprocessed.
  - Failure to process one item will not abort the processing of the rest. Failed items can be individually sent to the SDK as new messages for retry.
- Messages that need to perform multiple actions may require different approaches based on what kind of data is being processed.
  - In this scenario, each operation should be idempotent, so if the message is retried or replayed, the outcome will not change.
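The second multi-item strategy (keep going after a failed item, then retry only the failures) can be sketched as follows. This is a minimal, SDK-independent illustration: `PartialFailureSketch` and `processAll` are hypothetical names, and the "re-publish" step is simulated with a list where a real handler would send each failed item to the SDK as a new message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class PartialFailureSketch {

    // Processes every item; a failure on one item does not abort the rest.
    // Returns the items that failed so the caller can re-publish them individually.
    static <T> List<T> processAll(List<T> items, Consumer<T> processor) {
        List<T> failed = new ArrayList<>();
        for (T item : items) {
            try {
                processor.accept(item);
            } catch (RuntimeException e) {
                failed.add(item);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> failed = processAll(List.of("sku-1", "sku-2", "sku-3"), sku -> {
            if (sku.equals("sku-2")) {
                throw new IllegalStateException("simulated failure");
            }
        });
        // Stand-in for sending each failed item to the SDK as a new message for retry.
        List<String> republished = new ArrayList<>(failed);
        System.out.println("Re-published for retry: " + republished);
    }
}
```

Because a re-published item may be processed more than once, the per-item operation should be idempotent, as noted above.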
 
Sample code and configuration
A message handler must implement `MessageHandler` and be annotated with `@HandlerInfo` and `@Component`. The `@Component` annotation makes the class a singleton Spring bean, and because it implements `MessageHandler`, the SDK can locate it and bind it to the Message Routing Service. The `@HandlerInfo` annotation sets up the default route configuration.
@Slf4j
@Component
@HandlerInfo(
        name = "ProductUpsert",
        route = "commercetools.connect.product.upsert",
        description = "Updates products in Fluent.",
        props = {@HandlerProp(key = "query", value = "ct-product.graphql")}
)
public class ProductUpsertHandler implements MessageHandler {

    @Override
    public void processMessage(final MessageHandlerContext context) throws UnprocessableMessageException, HandlerException {
        final Optional<MyPayload> payload = context.getMessagePayload(MyPayload.class);
        if (!payload.isPresent()) {
            //a missing payload cannot be fixed by a retry, so the message goes to the DLQ
            throw new UnprocessableMessageException("Unable to get message payload");
        }
        //logic to process a message
        final Event productEvent = processProduct(payload.get());
        //send product to Fluent
        context.ofFluentContext().sendEvent(productEvent);
    }
}
Every message handler must have a `route` configuration. You can override a handler's default `route` configuration or add route configuration for a handler. Some handlers may work with a pair of `route-mapping` and `route` entries.
- route: Contains the build time configuration of a handler - always required.
  a. route: Name of the route.
  b. handler-name: ID of the handler, for example, `ProductUpsert`.
  c. props: Additional and optional configuration the handler can use when processing a message.
- route-mapping: Used to determine a route to a handler when receiving external messages. This is usually used when receiving messages from external systems.
  a. route: Name of the route - it must match the name defined above on the route.
  b. props: Any configuration necessary to drive the logic to determine a route based on the information received in a message.
When a message is routed to a message handler, the SDK will look at the message name and match it against a route name. Whenever a match is found, it takes the handler from that route and finds the class/bean implementing it to process the message.
In the route-mapping example below, `props` has a property called `inclusion-filter`, a list of message types supported by the route `commercetools.connect.product.upsert`. If the message received has the type `ResourceUpdated`, then the message can be routed to `commercetools.connect.product.upsert`. Receiving an external message and applying the logic to determine a route is always done by the listener. There is a further example of a listener in the SQS Queue Listener topic.
# Route mappings are used to determine a route to a handler when receiving external messages
route-mapping:
  - route: "commercetools.connect.product.upsert"
    props:
      # any number of properties that can be used to determine if this route is adequate for the message received
      name: "product"
      inclusion-filter:
        - "ResourceUpdated"
        - "ResourceCreated"

# Handler routes
routes:
  message:
    - route: "commercetools.connect.product.upsert"
      handler-name: "ProductUpsert"
      props:
        # any number of properties can drive or assist the handler to process a message
        query: "ct-product.graphql"
Job Handlers
When it is impossible to receive input/requests from external systems to the SDK (Fluent or another system), jobs can poll these systems to retrieve and process data. Note that full extracts should be avoided in favor of delta extracts. See Fluent Connector Configuration for more details on configuring and running a job.
All jobs in the Connect SDK share the same pre- and post-execution steps:
- A job cannot have multiple instances running simultaneously.
- Retrieve the job configuration from Fluent settings.
- Calculate a date range (from-to datetime) the job may use during its execution.
- Execute the actual job.
- Persist the calculated date range as part of the job configuration for the subsequent execution.
- The Connect SDK runs in UTC.
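The shared pre- and post-execution steps listed above can be sketched without the SDK as a small wrapper around the job body. Everything here is an assumption for illustration: `JobRunSketch` is a hypothetical class, the in-memory set and map stand in for the SDK's locking and for the job configuration kept in Fluent settings, the one-day default window is invented, and persisting the range only after a successful run is a design choice of this sketch rather than documented SDK behavior.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

final class JobRunSketch {
    // Names of jobs currently running; stands in for whatever locking the SDK uses.
    private final Set<String> running = ConcurrentHashMap.newKeySet();
    // End of the last range per job; stands in for the persisted job configuration.
    private final Map<String, LocalDateTime> lastRunEnd = new ConcurrentHashMap<>();

    // Returns false if the job was skipped because another instance is already running.
    boolean execute(String jobName, LocalDateTime now, BiConsumer<LocalDateTime, LocalDateTime> job) {
        if (!running.add(jobName)) {
            return false;
        }
        try {
            // Calculate the from-to range: continue from the previous run (assumed default: last 24 hours).
            LocalDateTime from = lastRunEnd.getOrDefault(jobName, now.minusDays(1));
            job.accept(from, now);
            // Persist the range end for the subsequent execution (only reached if the job did not throw).
            lastRunEnd.put(jobName, now);
            return true;
        } finally {
            running.remove(jobName);
        }
    }

    public static void main(String[] args) {
        JobRunSketch runner = new JobRunSketch();
        LocalDateTime now = LocalDateTime.now(ZoneOffset.UTC);
        runner.execute("batch-inventory-sync", now,
                (from, to) -> System.out.println("Delta extract from " + from + " to " + to));
    }
}
```

Carrying the previous range end into the next run is what makes delta extracts possible: each execution only looks at data changed since the last one.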
@Slf4j
@Component
@HandlerInfo(
        name = "BatchInventorySyncJob",
        route = "batch-inventory-sync",
        description = "Sync inventory details from Fluent to commerceTools",
        props = {@HandlerProp(key = "page-size", value = "500")}
)
public class InventoryJobHandler implements JobHandler {

    @Override
    public void run(final JobHandlerContext jobHandlerContext) throws JobExecutionException {
        final JobProperties settings = jobHandlerContext.getJobSettings();
        final LocalDateTime start = settings.getStart();
        final LocalDateTime end = settings.getEnd();

        //get a runtime configuration property
        final String myProp = settings.getProp("my-property", StringUtils.EMPTY);

        //get a build time (route) configuration property
        final int pageSize = jobHandlerContext.getRouteSettings().getProp("page-size", 100);

        //your job logic next...

        //poll external system
        //either add each item into the queue for async processing
            //jobHandlerContext.getPublishService().publishMessage(sdkMessage);
        //or process each received item here
    }
}
Every job needs to have a route, which is used to find the class that executes the job given a job execution request. The execution request will include the name of the job `fc.connect.batch-inventory-sync`, along with the Fluent account and retailer it should run against. Like the message handler, the route can have optional `props` for any additional property the job requires. Note that properties set at the route level are build-time configurations.
routes:
  job:
    - route: "fc.connect.batch-inventory-sync"
      handler-name: "BatchInventorySyncJob"
      props:
        page-size: 500
Configuration Handlers
Configuration handlers can be used when there is a need to react to configuration changes. The example below modifies the application log level based on the configured log level. Note that there is a validation step where one can determine if this handler should be executed. In this example, it is simply comparing the current log level with the new log level. When they are different, it returns `true`, and the SDK will execute this configuration handler.
@Slf4j
@Component
@HandlerInfo(name = "log-level-update", route="")
public class LogLevelConfigurationHandler implements ConfigurationHandler {
    private static final String DEFAULT_LEVEL = "INFO";
    private final FluentConnectConfiguration connectConfiguration;

    private Level currentLevel;

    @Autowired
    public LogLevelConfigurationHandler(final FluentConnectConfiguration connectConfiguration) {
        this.connectConfiguration = connectConfiguration;
    }

    @Override
    public boolean validate(@NotNull final Configuration configuration) {
        final var newLevel = getLogLevel(configuration);

        final var shouldExecute = currentLevel != null && newLevel.levelInt != currentLevel.levelInt;
        currentLevel = newLevel;
        return shouldExecute;
    }

    @Override
    public void run(final @NotNull Configuration configuration) {
        final LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        final Logger logger = loggerContext.getLogger("com.fluentcommerce.connect");
        if (logger != null) {
            logger.setLevel(getLogLevel(configuration));
            log.info("Log level for com.fluentcommerce.connect set to [{}].", logger.getLevel());

        } else {
            log.error("Unable to detect logger context: com.fluentcommerce.connect.");
        }
    }

    private Level getLogLevel(@NotNull final Configuration configuration) {
        final String key = connectConfiguration.getConfiguration().getLogLevelKey();
        final ConfigurationTuple newValue = configuration.getProperties().getOrDefault(key, ConfigurationTuple.ofReadOnly(key, DEFAULT_LEVEL));

        return StringUtils.isNotBlank(newValue.getValue()) ? Level.toLevel(newValue.getValue()) : Level.INFO;
    }
}
Receiving messages from external systems
There are several ways to receive messages or requests from external systems.
HTTP Requests
Authenticating HTTP Requests
The SDK provides a library bundling spring security with helpful authentication filters. This library provides a default security configuration and doesn’t limit or restrict additional custom configuration of spring security should it be required.
<dependency>
    <groupId>com.fluentcommerce.connect</groupId>
    <artifactId>connect-sdk-core-web-security</artifactId>
</dependency>
By adding the library above and the configuration below, the SDK will only allow unauthenticated traffic to the URLs specified under `publicEndpoints`. Any other URL must be authenticated before the controllers are allowed to process the request. By default, no authentication filters are active, and it is up to the developer to choose which ones to enable - it is possible to have multiple.
security:
    web:
      publicEndpoints:
        - "/actuator/health"
        - "/api/docs"
        - "/swagger-ui.html"
        - "/api/v1/fluent-connect/webhook"
      customAuthFilters:
        - "com.fluentcommerce.connect.core.web.security.filters.ApiKeyAuthenticationFilter"
        - "com.fluentcommerce.connect.core.web.security.filters.FluentOauthTokenAuthenticationFilter"
Note that the Fluent webhook API must remain public, without authentication.
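**Available SDK Filters:**
| Filter name | Class name | Required headers |
| --- | --- | --- |
| API Key authentication | `com.fluentcommerce.connect.core.web.security.filters.ApiKeyAuthenticationFilter` | `fluent.account`, `Authorization: ApiKey <the_key>` |
| Fluent OAuth token authentication | `com.fluentcommerce.connect.core.web.security.filters.FluentOauthTokenAuthenticationFilter` | |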
To customize Spring security to your needs, create a component implementing `HttpSecurityBuildHook` interface. The SDK will call your component before finalizing the Spring security configuration, allowing the opportunity to modify or add additional configuration.
The authentication filter structure mostly follows Spring Security standards, with an additional annotation. The API key filter listed above is used here to explain how to create your own.
@SdkAuthenticationFilterInfo(provider = "api-key-provider", converter = "api-key-converter")
public class ApiKeyAuthenticationFilter extends SdkAuthenticationFilter {

    public ApiKeyAuthenticationFilter(
        @NotNull final AuthenticationManager authenticationManager,
        @NotNull final AuthenticationConverter authenticationConverter) {

        super(authenticationManager, authenticationConverter);
    }
}
An SDK authentication filter shouldn't be a Spring bean; otherwise, it will be automatically available to the Spring context, and it may clash with how the SDK expects filters to work. The annotation `@SdkAuthenticationFilterInfo` both tells the SDK that the filter exists and specifies the Spring beans that will be used as the authentication provider and converter for this filter. When the application starts, the SDK creates an instance of every filter listed under `customAuthFilters` and uses the annotation information to drive its configuration.
A converter should always implement `AuthenticationConverter`, as shown below. The converter extracts the credential or security token from the incoming request and returns it as an `Authentication` POJO that will be used to authenticate the request. This filter expects the Fluent account in the HTTP header `fluent.account` and the API key in the `Authorization` header, in the form `Authorization: ApiKey <the_key>`.
@Slf4j
@Component("api-key-converter")
public class ApiKeyDetailAuthenticationConverter implements AuthenticationConverter {
    protected static final String FLUENT_ACCOUNT_HEADER = "fluent.account";
    private static final String SECURITY_TOKEN_GROUP = "securityToken";

    private static final String BEARER_TOKEN_PATTERN = "ApiKey\\s(?<securityToken>.*)";
    @Value("${fluent-connect.mock.accountId:}")
    protected String fluentAccount;

    @Override
    public Authentication convert(final HttpServletRequest request) {
        if (log.isTraceEnabled()) {
            log.trace("Authentication filter running for request URL: {}", request.getRequestURL().toString());
        }

        try {
            final var authApiKey = resolveSecurityTokenFromHttpRequest(request);
            return new ApiKeyDetailAuthentication(authApiKey, getAccountFromRequest(request));
        } catch (final BadCredentialsException e) {
            //returning null allows the spring security to try the next security filter
            return null;
        }
    }
    ...
}
Once the Authorization detail is defined in the context, the provider is called to validate if the credentials extracted can be authenticated and return the Authentication if successful or null if not.
@Slf4j
@Component("api-key-provider")
public class ApiKeyDetailAuthenticationProvider implements AuthenticationProvider {
    private final APIKeyService apiKeyService;
    private final ContextBuilder contextBuilder;

    public ApiKeyDetailAuthenticationProvider(final APIKeyService apiKeyService, final ContextBuilder contextBuilder) {
        this.apiKeyService = apiKeyService;
        this.contextBuilder = contextBuilder;
    }

    @Override
    public Authentication authenticate(final Authentication authentication) throws AuthenticationException {
        final var authApiKey = (String) authentication.getCredentials();
        final var accountId = (String) authentication.getPrincipal();

        final var keyDetails = validateApiKeyDetails(authApiKey, accountId);
        ((ApiKeyDetailAuthentication) authentication).init(keyDetails, contextBuilder);
        authentication.setAuthenticated(true);
        return authentication;
    }
    ...
}
Note that the authentication provider will not raise any exceptions if the authentication fails. This is important to allow the other filters to execute and have a chance to authenticate the request. If none of the filters configured can authenticate the request, Spring will automatically deny the request.
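The "try each filter, deny if none succeeds" behavior described above can be illustrated without Spring. This sketch is not the SDK's or Spring Security's implementation: `AuthChainSketch`, its methods, and the in-memory key set are hypothetical. Only the header format and the regular expression shape are taken from the converter shown earlier.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class AuthChainSketch {
    // Same pattern shape as the converter above: "ApiKey <the_key>".
    private static final Pattern API_KEY = Pattern.compile("ApiKey\\s(?<securityToken>.*)");

    // Converter step: extract the key from the Authorization header, or empty if this filter does not apply.
    static Optional<String> apiKeyFrom(Map<String, String> headers) {
        String header = headers.get("Authorization");
        if (header == null) {
            return Optional.empty();
        }
        Matcher matcher = API_KEY.matcher(header);
        return matcher.matches() ? Optional.of(matcher.group("securityToken")) : Optional.empty();
    }

    // Tries each filter in order; a filter that cannot authenticate simply lets the next one try.
    // If no filter authenticates the request, it is denied.
    static boolean isAuthenticated(Map<String, String> headers, List<Predicate<Map<String, String>>> filters) {
        for (Predicate<Map<String, String>> filter : filters) {
            if (filter.test(headers)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Set<String> validKeys = Set.of("abc123"); // hypothetical key store
        Predicate<Map<String, String>> apiKeyFilter =
                headers -> apiKeyFrom(headers).filter(validKeys::contains).isPresent();
        Predicate<Map<String, String>> otherFilter = headers -> false; // placeholder for a second filter

        Map<String, String> request = Map.of("fluent.account", "DEV_ACCOUNT", "Authorization", "ApiKey abc123");
        System.out.println(isAuthenticated(request, List.of(otherFilter, apiKeyFilter))); // authenticated by the API key filter
        System.out.println(isAuthenticated(Map.of(), List.of(otherFilter, apiKeyFilter))); // no filter matched, denied
    }
}
```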
Creating a new controller/endpoint
The example below illustrates how a custom controller can be created and utilize the security module to handle the security/authentication and get an SDK context based on the authenticated account/user/token. `SdkSecurityService` is an SDK service that can look into the authenticated application security context and create the corresponding SDK context with the right account/retailer and credentials.
@Slf4j
@RestController
@RequestMapping("/sample")
public class SampleController {
    /**
     * SDK service that has access to the spring security context and returns a SDK context based on the authenticated/authorised credentials
     */
    private final SdkSecurityService securityService;

    @Autowired
    public SampleController(final SdkSecurityService securityService) {
        this.securityService = securityService;
    }

    @RequestMapping(method = RequestMethod.POST, consumes = "application/json")
    public ResponseEntity<String> receive(@RequestBody final SampleRequest sampleRequest) {
        return securityService.getAuthenticatedContext()
            .map(context -> processRequest(context, sampleRequest))
            .orElseThrow(() -> new BadCredentialsException("Unable to authenticate request"));
    }

    private ResponseEntity<String> processRequest(final HandlerContext handlerContext, final SampleRequest sampleRequest) {
        //TODO your logic / call a service
        return ResponseEntity.ok().build();
    }

    @Value
    public static class SampleRequest {
        String id;
        String name;
        //...
    }
}
External Listeners (SQS, Kafka, etc)
The SDK has its own internal listeners, and although it is possible to extend them, this topic only covers external listeners. An external listener receives messages sent from external systems to the SDK, for example, orders published by an e-commerce system. To create an external listener subscribing to a topic or queue, there is a specific class to extend, as the example below illustrates. When defining such a class, it is important to specify the kind of payload this listener works with, both in the generic type parameter and in the `@ListenerInfo` annotation. The ID of the listener is not the topic/queue name but the configuration ID of the listener.
@Slf4j
@Component
@ListenerInfo(id = "my_queue_id", messageClass = SampleQueueListener.MyMessagePayload.class)
public class SampleQueueListener extends ExternalListener<SampleQueueListener.MyMessagePayload> {

  @Override
  public Optional<ConnectSDKMessage> receiveMessage(final ExternalMessage message) throws UnprocessableMessageException {
      final MyMessagePayload messagePayload = getMessageContent(message);

      final Optional<AccountReference> accountReference = getFluentAccountReference(Collections.singletonMap("website-id", messagePayload.websiteId));
      if (accountReference.isPresent()) {
          return Optional.ofNullable(getConnectSDKMessage(messagePayload, accountReference.get()));
      }

      log.error("Unable to retrieve a Fluent account for the message received [{}].", messagePayload.getId());
      throw new UnprocessableMessageException("Unable to retrieve a Fluent account for the message received.");
  }

  /**
   * Extract a subset of the payload, pre-process and enrich it, convert into a different format or simply use it as is.
   */
  private ConnectSDKMessage getConnectSDKMessage(final MyMessagePayload messagePayload,
                                                 final AccountReference accountReference) throws UnprocessableMessageException {
      //The example below is a pass-through
      try {
          return ConnectSDKMessage.builder()
                  .id(UUID.randomUUID().toString())
                  .name(getMessageRoute(messagePayload))
                  .accountId(accountReference.getAccountId())
                  .retailerId(accountReference.getRetailerId())
                  .payload(ConnectSDKMessage.toJson(messagePayload))
                  .build();
      } catch (JsonProcessingException e) {
          throw new UnprocessableMessageException(e);
      }
  }

  /**
   * Apply your logic to determine the route name
   */
  private String getMessageRoute(final MyMessagePayload messagePayload) {
      return "my-route";
  }

  @Value
  public static class MyMessagePayload {
      String id;
      String websiteId;
      //...
  }
}
Each listener must have its own configuration section; the snippet below only covers the essentials. See Fluent Connector Configuration for all configuration options and detailed information about each property.
my_queue_id:
    name: "SQS_CONNECTOR"
    fluent-account: "DEV_ACCOUNT"
    type: sqs
Overriding Default Implementations
Secrets Manager
The SDK's credential service can be extended with a new secret manager by implementing the `SecretManagerService` interface and configuring the SDK to use the new service. The service must be annotated with `@CredentialManagerInfo`, and its ID must be unique.
Service implementation sample
/**
 * Implementation of a secrets manager service using AWS Secrets Manager
 */
@Slf4j
@Service
@CredentialManagerInfo(id = "aws-secrets-manager")
public class AWSCredentialService implements SecretManagerService {
    public static final String VERSION_STAGE = "AWSCURRENT";
    private final AWSSecretsManager secretsManager;

    @Autowired
    public AWSCredentialService(final AWSSecretsManager secretsManager) {
        this.secretsManager = secretsManager;
    }

    @Override
    public Optional<String> getSecret(@NotNull final String key) {
        try {
            final GetSecretValueResult secretValue = secretsManager.getSecretValue(new GetSecretValueRequest().withSecretId(key).withVersionStage(VERSION_STAGE));
            return Optional.ofNullable(secretValue.getSecretString());
        } catch (final ResourceNotFoundException e) {
            log.debug("Unable to fetch secret [{}]: [{}]", key, e.getMessage());
        }
        return Optional.empty();
    }
}
SDK Configuration
fluent-connect:
  credential:
    credential-manager: "aws-secrets-manager"
Job Scheduler
The main scheduling service of the SDK is "DefaultJobSchedulerService", and it relies on two key service implementations:
- JobSchedulerService: A specialized scheduler service that can schedule and execute a job. The actual execution is delegated to `JobExecutionService`.
- JobExecutionService: A service that executes a job and handles the pre- and post-execution steps.
Before a job can be executed, it is necessary to determine a job handler based on the job name. For this, use `JobRouterService` to obtain a JobHandler.
To add a new job scheduler to the SDK, implement `JobSchedulerService` and `JobExecutionService`, and configure the SDK to use the new scheduler. The `JobSchedulerService` implementation must also use the `JobSchedulerInfo` annotation to identify the scheduler service.
  job-scheduler:
    job-scheduler-type: "new-scheduler"
Product Availability
The default implementation is `DefaultFulfilmentOptionService`, registered as a Spring bean named `defaultFulfilmentOptionService`. This service automatically picks up any enrichment steps available and executes them in the order of their priority, where the highest priority number takes precedence over the lesser ones.
Below is an empty example of an enrichment step that could add location information to a product availability response—for example, more details of the location of a click-and-collect fulfilment option.
@Component
@Slf4j
@FulfilmentOptionTransformationStep(id = "pickup-location-enrichment", priority = 1)
public class PickupLocationTransformation implements FulfilmentOptionTransformationChain<FulfilmentRequest, FulfilmentResponse> {

    @Override
    public FulfilmentResponse transform(final FulfilmentRequest request, final FulfilmentResponse response, final FluentContext context) {
        //your enrichment logic here
        //an empty step returns the response unchanged
        return response;
    }
}
It is up to each enrichment step to decide whether it can use a cache or always perform a live request to an external system to obtain the enrichment details. This will always be specific to the nature of the enrichment step and how volatile the data is.
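The priority ordering of enrichment steps can be pictured with a small SDK-independent sketch. It reads "highest priority number takes precedence" as "runs first", which is an interpretation rather than confirmed SDK behavior. `EnrichmentChainSketch`, `Step`, and the `stock-level-enrichment` id are hypothetical, and the response is modelled as a plain list of strings so the execution order is visible.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.UnaryOperator;

final class EnrichmentChainSketch {
    // Hypothetical step: an id, a priority, and a transformation of the response.
    record Step(String id, int priority, UnaryOperator<List<String>> transform) {}

    // Demo helper: a step that records its own id in the response.
    static Step appending(String id, int priority) {
        return new Step(id, priority, response -> {
            List<String> enriched = new ArrayList<>(response);
            enriched.add(id);
            return enriched;
        });
    }

    // Runs all steps, highest priority number first, feeding each step the previous step's output.
    static List<String> run(List<Step> steps, List<String> response) {
        List<Step> ordered = new ArrayList<>(steps);
        ordered.sort(Comparator.comparingInt(Step::priority).reversed());
        List<String> current = response;
        for (Step step : ordered) {
            current = step.transform().apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<Step> steps = List.of(
                appending("pickup-location-enrichment", 1),
                appending("stock-level-enrichment", 10));
        System.out.println(run(steps, List.of()));
    }
}
```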
Should you need to modify how the GraphQL query is built in the default implementation, extend `DefaultFulfilmentOptionService`, keep the Spring bean name as `defaultFulfilmentOptionService`, and add the `@Primary` annotation to the new service. This will cause the SDK to use the new custom service instead of the out-of-the-box one.
If the standard request or response payload doesn't fit the requirements, ...
Please note this is a simplified way to run batch operations and not an actual scheduler. All the module above provides is a way to receive a web request to run a batch operation. When the SDK gets the request, it adds it to a batch queue, which is then picked up by one of the running SDK nodes (containers), and that node executes the batch operation. The SDK prevents the same job from running simultaneously, but it is possible to run different jobs simultaneously. This module relies on external systems to trigger the SDK to run jobs whenever necessary; it can only react to external inputs.
If Kafka is used instead of SQS, it will further limit the job scheduler's capabilities. With SQS, the Connect SDK can accept new job requests while executing other jobs. This is not possible with Kafka, as it can only accept new requests once the job currently in execution is complete. This can introduce problems: if the current job runs for an extended period, the SDK may consider the queued job request stale and discard it.
Fluent Configuration Service
The SDK has two types of configurations: build time and run time. Build time configuration is managed by `ApplicationConfigurationService`, and this is restricted to YAML files. Run time is through `ConfigurationService`, which uses a configuration to indicate which service is active. By default, the SDK provides `FluentConfigurationService` (with service id `fluent-settings`) that uses Fluent settings as the storage/persistence location. Implementing a new configuration service to store settings elsewhere is possible by implementing the class SpecialisedConfigurationService and modifying the setting below to use the new service.
  configuration:
    settingsManager: "custom-configuration-service"
Handlers
- Handlers should be kept small and have a single purpose.
- Handlers may receive a message that contains one or more items to be processed. When receiving multiple items, choosing the error recovery strategies is important. For example, what is the accepted behavior if 1 in 10 items fails - would all 10 be reprocessed, or would a single new message be generated for reprocessing and the original message completed successfully?
- It is also important to be aware of the message size limitations of your selected listener. This may impact the design of how a handler processes a message.
- When a handler exits with a `HandlerRetryException`, the SDK will automatically let the message be retried if supported by the listener selected.
  - Any other kind of exception will cause the message to move to the DLQ (dead letter queue).
- See more about the retry mechanism and DLQ in Fluent Connector Configuration | Listener Configuration
 
Retry example:
@Slf4j
@Component
@HandlerInfo(
    name = "retryExampleHandler",
    route = "retryExample",
    description = "Retry example",
    props = {@HandlerProp(key = "some-key",value = "a-value")}
)
public class RetryExampleHandler implements MessageHandler {
    @Override
    public void processMessage(@NotNull final MessageHandlerContext context) {
        try {
          service.doSomeLogic();
        } catch (SomeCustomException ex) {
          throw new HandlerRetryException(ex);  //Let the message be retried when getting SomeCustomException
        }
        //If a handler throws any other kind of exception, the message will be moved to the DLQ

    }
}
There are two ways of overriding an existing handler or a route: code or configuration.
Let's assume the handler below as the base example to describe the override options.
@Slf4j
@Component
@HandlerInfo(
    name = "defaultNotificationHandler",
    route = "notification",
    description = "Handles notifications and publishes to their designated route",
    props = {@HandlerProp(key = "some-key",value = "a-value")}
)
public class DefaultNotificationHandler implements NotificationHandler {}
Overriding by code
Only pick this option if you wish to override the default logic of a handler. Opt for overriding by configuration if the intent is to re-route or change the route's properties. With this option, it is possible to override it in 2 different ways:
- re-routing to a new handler - In this option, both the old (`defaultNotificationHandler`) and new (`customNotificationHandler`) handlers are available to the SDK as valid handlers. The new handler, rather than the old one, processes messages destined for the route `notification`.
@Slf4j
@Component
@HandlerInfo(
    name = "customNotificationHandler",
    route = "notification",
    priority = 100,
    description = "A custom handler for notifications"
)
public class CustomNotificationHandler implements NotificationHandler {}
- override the handler - This option will disable the old handler (`defaultNotificationHandler`) and make the new one take its place. Messages destined for the route `notification` will be processed by the new handler.
@Slf4j
@Component
@HandlerInfo(
    name = "defaultNotificationHandler",
    route = "notification",
    priority = 100,
    description = "A custom handler for notifications"
)
public class CustomNotificationHandler implements NotificationHandler {}
Overriding by configuration
Use this option to re-route a message to a different handler, create a new route using an existing handler or modify the properties of an existing handler. Please note that when the properties `props` are added, they will replace the code-level properties; it will not merge the properties.
  routes:
    notification:
      # Scenario 1: Adding a new configuration to add a new route and re-use an existing handler. Props is not specified and defaults are used
      - route: "custom-notification"
        handler-name: "defaultNotificationHandler"

      # Scenario 2: Modifying the configuration of an existing route to override the properties
      - route: "notification"
        handler-name: "defaultNotificationHandler"
        props:
          some-key: "some-other-value"
          another-key: "yet-another-value"

      # Scenario 3: Modify an existing route to use a different existing handler
      - route: "notification"
        handler-name: "anotherNotificationHandler"
        props:
          some-key: "some-other-value"
          another-key: "yet-another-value"
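The replace-not-merge rule for `props` stated above is easy to get wrong, so here is a minimal sketch of the intended outcome. `RoutePropsSketch` and `effectiveProps` are hypothetical names used only to illustrate the rule; they are not part of the SDK.

```java
import java.util.Map;

final class RoutePropsSketch {

    // Effective props for a route: if the configuration defines props, they replace the
    // code-level props entirely; if it defines none, the code-level defaults are used.
    static Map<String, String> effectiveProps(Map<String, String> codeLevel, Map<String, String> configured) {
        return configured == null ? codeLevel : configured;
    }

    public static void main(String[] args) {
        Map<String, String> codeLevel = Map.of("some-key", "a-value");
        Map<String, String> configured = Map.of("another-key", "yet-another-value");

        // Configured props win outright: "some-key" is no longer present.
        System.out.println(effectiveProps(codeLevel, configured));
        // No props in the configuration: the @HandlerProp defaults apply.
        System.out.println(effectiveProps(codeLevel, null));
    }
}
```

In practice this means that a route overriding only one property must still repeat every code-level property it wants to keep.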