Customization Guidelines for Connect SDK
Author:
Fluent Commerce
Changed on:
10 July 2024
Overview
The SDK's handling mechanism involves loading and prioritizing handlers during server startup. It provides transparency with special spring actuators for listing active routes and handlers. Message handlers follow error-handling protocols, while job handlers process data polled from external systems. Configuration handlers react to changes, and the SDK supports various ways of receiving messages from external systems.
Key points
- Handler Prioritization: Customize execution order by assigning priorities during startup.
- Message Handling Protocols: Follow specific rules for retries, dead letter queue forwarding, and managing multiple items within a message.
- Implementation Guidelines: Adhere to guidelines, use annotations like @HandlerInfo and @Component, and configure routes. Extensibility is seen in job handlers, configuration handlers, and product availability enrichments.
- Receiving External Messages: Support diverse methods, including HTTP requests with authentication filters and external listeners like SQS and Kafka. Custom controllers enhance authentication through the SDK's security module.
Handlers
When the server starts up, the SDK loads all handlers found in the Spring context and binds each one to the service it belongs to. As part of this binding process, the SDK determines which handler takes precedence over others of the same name, so an existing handler can be overridden by giving the custom handler a higher priority. By default, every handler has a priority of zero, and a larger number means a higher priority. For example, priority = 100 takes precedence over priority = 1.
```java
@HandlerInfo(name = "CustomCategoryUpsert", priority = 100)
```
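The precedence rule can be illustrated with a small standalone sketch. It does not use any SDK classes: `HandlerDef` and `PriorityResolver` are hypothetical names, and the only assumption is the rule stated above, that handlers sharing a name compete and the largest priority wins.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

// Hypothetical stand-in for a registered handler; not an SDK type.
record HandlerDef(String name, int priority, String className) {}

final class PriorityResolver {

    // Keeps, for every handler name, the definition with the largest priority.
    static Map<String, HandlerDef> resolve(final List<HandlerDef> handlers) {
        return handlers.stream().collect(Collectors.toMap(
                HandlerDef::name,
                handler -> handler,
                BinaryOperator.maxBy(Comparator.comparingInt(HandlerDef::priority))));
    }

    public static void main(final String[] args) {
        final Map<String, HandlerDef> active = resolve(List.of(
                new HandlerDef("CategoryUpsert", 0, "DefaultCategoryUpsertHandler"),
                new HandlerDef("CategoryUpsert", 100, "CustomCategoryUpsertHandler")));
        // The custom handler replaces the default one because 100 > 0.
        System.out.println(active.get("CategoryUpsert").className());
    }
}
```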
The SDK provides a couple of special Spring actuators that list all active routes and handlers available to the application.
- routes - http://localhost:8080/actuator/sdkroutes
Below is the default list of routes of the SDK. Each section contains a list of `active` and `inactive` routes.
```json
{
  "configuration": {
    "active": [],
    "inactive": []
  },
  "job": {
    "active": [
      {
        "route": "event-failure-summary-monitor",
        "handler": "EventFailureSummaryJobHandler",
        "priority": 0,
        "props": {
          "page-size": "100",
          "event-status": "FAILED",
          "event-type": "ORCHESTRATION_AUDIT"
        },
        "className": "com.fluentcommerce.connect.core.job.handler.EventFailureSummaryJobHandler",
        "description": "Grabs failed events from Fluent and builds a summary"
      }
    ],
    "inactive": []
  },
  "message": {
    "active": [],
    "inactive": []
  },
  "notification": {
    "active": [
      {
        "route": "notification",
        "handler": "DefaultNotificationHandler",
        "priority": 0,
        "props": {},
        "className": "com.fluentcommerce.connect.core.handler.DefaultNotificationHandler",
        "description": "Handles notifications and publishes to their designated route"
      }
    ],
    "inactive": []
  }
}
```
- handlers - http://localhost:8080/actuator/sdkhandlers
Similar to routes, below are the default handlers of the SDK. Each section again contains an `active` and an `inactive` list, with the same meaning.
```json
{
  "configuration": {
    "active": [
      {
        "name": "log-level-update",
        "route": "",
        "priority": 0,
        "props": {},
        "className": "com.fluentcommerce.connect.core.configuration.handler.LogLevelConfigurationHandler"
      }
    ],
    "inactive": []
  },
  "job": {
    "active": [
      {
        "name": "EventFailureSummaryJobHandler",
        "route": "event-failure-summary-monitor",
        "priority": 0,
        "props": {
          "page-size": "100",
          "event-status": "FAILED",
          "event-type": "ORCHESTRATION_AUDIT"
        },
        "className": "com.fluentcommerce.connect.core.job.handler.EventFailureSummaryJobHandler"
      }
    ],
    "inactive": []
  },
  "message": {
    "active": [],
    "inactive": []
  },
  "notification": {
    "active": [
      {
        "name": "DefaultNotificationHandler",
        "route": "notification",
        "priority": 0,
        "props": {},
        "className": "com.fluentcommerce.connect.core.handler.DefaultNotificationHandler"
      }
    ],
    "inactive": []
  }
}
```
Message Handlers
Message handlers are dedicated to processing messages pushed to the message queue of the SDK.
Error Handling
- For errors where a retry won't make a difference, exit the method with an `UnprocessableMessageException`, and the message will be forwarded to the DLQ (dead letter queue).
- For errors where the message should be retried, exit the method with a `HandlerRetryException`. Note that each listener (SQS vs. Kafka, for example) has a different retry implementation.
- If no exception is thrown, the message is removed from the queue, and it is assumed that the processing was done and no further action is required.
- For messages that have more than one item to be processed (e.g., a message has a list of products to be processed), either approach is viable:
  - Any failure stops the processing, and when the message is retried, all items are reprocessed.
  - Failure to process one item does not abort the processing of the rest. Failed items can be individually sent to the SDK as new messages for retry.
- Messages that need to perform multiple actions may require different approaches based on what kind of data is being processed.
  - In this scenario, each operation should be idempotent, so if the message is retried or replayed, the outcome will not change.
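The rules above boil down to a mapping from how a handler exits to what happens to the message. The sketch below is a standalone illustration, not SDK code: the two exception classes are local stand-ins that only borrow the names used above, and `OutcomeSketch`, `Handler` and `Outcome` are hypothetical.

```java
// Local stand-ins that mirror the exception names used above; not the SDK classes.
class UnprocessableMessageException extends Exception {
    UnprocessableMessageException(final String message) { super(message); }
}

class HandlerRetryException extends Exception {
    HandlerRetryException(final String message) { super(message); }
}

final class OutcomeSketch {

    enum Outcome { ACKNOWLEDGED, RETRY, DEAD_LETTER }

    // Minimal handler contract for this sketch.
    interface Handler {
        void process(String payload) throws Exception;
    }

    // Translates the way the handler exits into the fate of the message.
    static Outcome dispatch(final Handler handler, final String payload) {
        try {
            handler.process(payload);
            return Outcome.ACKNOWLEDGED;   // no exception: the message leaves the queue
        } catch (final HandlerRetryException e) {
            return Outcome.RETRY;          // retried in a listener-specific way
        } catch (final Exception e) {
            return Outcome.DEAD_LETTER;    // includes UnprocessableMessageException
        }
    }

    public static void main(final String[] args) {
        System.out.println(dispatch(p -> { }, "ok"));
        System.out.println(dispatch(p -> { throw new HandlerRetryException("try later"); }, "busy"));
        System.out.println(dispatch(p -> { throw new UnprocessableMessageException("bad"); }, "bad"));
    }
}
```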
Sample code and configuration
A message handler must implement `MessageHandler` and be annotated with `@HandlerInfo` and `@Component`.
```java
@Slf4j
@Component
@HandlerInfo(
    name = "ProductUpsert",
    route = "commercetools.connect.product.upsert",
    description = "Updates products in Fluent.",
    props = {@HandlerProp(key = "query", value = "ct-product.graphql")}
)
public class ProductUpsertHandler implements MessageHandler {

    @Override
    public void processMessage(final MessageHandlerContext context) throws UnprocessableMessageException, HandlerException {
        final Optional<MyPayload> payload = context.getMessagePayload(MyPayload.class);
        if (payload.isPresent()) {
            // logic to process a message (processProduct is a helper not shown here)
            final Event productEvent = processProduct(payload);
            // send product to Fluent
            context.ofFluentContext().sendEvent(productEvent);
            // processing succeeded, so exit without an exception
            return;
        }
        // a retry cannot fix a missing payload, so the message goes to the DLQ
        throw new UnprocessableMessageException("Unable to get message payload");
    }
}
```
Every message handler must have a `route`. Two configuration entries take part in routing, `route` and `route-mapping`, and only `route` is always required:
- route: Contains the build time configuration of a handler - always required.
  a. route: Name of the route.
  b. handler-name: ID of the handler, for example, `ProductUpsert`.
  c. props: Additional and optional configuration the handler can use when processing a message.
- route-mapping: Used to determine a route to a handler when receiving external messages. This is usually used when receiving messages from external systems.
  a. route: Name of the route - it must match the name defined above on the route.
  b. props: Any configuration necessary to drive the logic to determine a route based on the information received in a message.
When a message is routed to a message handler, the SDK will look at the message name and match it against a route name. Whenever a match is found, it takes the handler from that route and finds the class/bean implementing it to process the message.
In the route-mapping example below, `props` defines an `inclusion-filter` for the route `commercetools.connect.product.upsert`, so a message such as `ResourceUpdated` maps to `commercetools.connect.product.upsert`.
```yaml
# Route mappings are used to determine a route to a handler when receiving external messages
route-mapping:
  - route: "commercetools.connect.product.upsert"
    props:
      # any number of properties that can be used to determine if this route is adequate for the message received
      name: "product"
      inclusion-filter:
        - "ResourceUpdated"
        - "ResourceCreated"

# Handler routes
routes:
  message:
    - route: "commercetools.connect.product.upsert"
      handler-name: "ProductUpsert"
      props:
        # any number of properties can drive or assist the handler to process a message
        query: "ct-product.graphql"
```
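The two lookups described above (external message to route, then route to handler) can be sketched in plain Java. This is only an illustration under assumptions: `RouteResolver` and `RouteMapping` are hypothetical names, and treating `name` as the resource type and `inclusion-filter` as a list of accepted event types is a guess at how the properties could be interpreted, not the SDK's actual matching logic.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class RouteResolver {

    // Hypothetical in-memory view of one route-mapping entry.
    record RouteMapping(String route, String name, List<String> inclusionFilter) {}

    private final List<RouteMapping> mappings;
    private final Map<String, String> handlerByRoute; // route -> handler-name

    RouteResolver(final List<RouteMapping> mappings, final Map<String, String> handlerByRoute) {
        this.mappings = mappings;
        this.handlerByRoute = handlerByRoute;
    }

    // Step 1: pick a route for an external message from its resource name and event type.
    Optional<String> routeFor(final String resourceName, final String eventType) {
        return mappings.stream()
                .filter(mapping -> mapping.name().equals(resourceName))
                .filter(mapping -> mapping.inclusionFilter().contains(eventType))
                .map(RouteMapping::route)
                .findFirst();
    }

    // Step 2: match the route name against the configured routes to find the handler.
    Optional<String> handlerFor(final String route) {
        return Optional.ofNullable(handlerByRoute.get(route));
    }

    public static void main(final String[] args) {
        final RouteResolver resolver = new RouteResolver(
                List.of(new RouteMapping("commercetools.connect.product.upsert", "product",
                        List.of("ResourceUpdated", "ResourceCreated"))),
                Map.of("commercetools.connect.product.upsert", "ProductUpsert"));

        System.out.println(resolver.routeFor("product", "ResourceUpdated").flatMap(resolver::handlerFor));
        System.out.println(resolver.routeFor("product", "ResourceDeleted").flatMap(resolver::handlerFor));
    }
}
```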
Job Handlers
When it is impossible to receive input/requests from external systems to the SDK (Fluent or another system), jobs can poll these systems to retrieve and process data. Note that full extracts should be avoided in favor of delta extracts. See Fluent Connector Configuration for more details on configuring and running a job.
All jobs in the Connect SDK share the same pre- and post-execution steps:
- A job cannot have multiple instances running simultaneously.
- Retrieve the job configuration from Fluent settings.
- Calculate a date range (from-to datetime) the job may use during its execution.
- Execute the actual job.
- Persist the calculated date range as part of the job configuration for the subsequent execution.
- The Connect SDK runs in UTC.
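The date-range steps above can be sketched as follows. This is a minimal illustration and not the SDK implementation: `JobWindowSketch` and `Window` are hypothetical names, the persisted job configuration is reduced to a single field, and it is an assumption that the range is persisted only after the job completes without an exception.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.function.Consumer;

final class JobWindowSketch {

    // Date range handed to a single job execution.
    record Window(Instant from, Instant to) {}

    // Stand-in for the value persisted in the job configuration.
    private Instant lastRunEnd;
    private final Clock clock;

    JobWindowSketch(final Instant lastRunEnd, final Clock clock) {
        this.lastRunEnd = lastRunEnd;
        this.clock = clock;
    }

    // The next range starts where the previous one ended and finishes "now".
    Window nextWindow() {
        final Instant from = lastRunEnd != null ? lastRunEnd : Instant.EPOCH;
        return new Window(from, clock.instant());
    }

    // Calculates the range, runs the job, then persists the range end.
    Window execute(final Consumer<Window> job) {
        final Window window = nextWindow();
        job.accept(window);          // an exception here skips the persist step below
        lastRunEnd = window.to();
        return window;
    }

    public static void main(final String[] args) {
        final JobWindowSketch sketch = new JobWindowSketch(
                Instant.parse("2024-07-10T09:00:00Z"),
                Clock.fixed(Instant.parse("2024-07-10T10:00:00Z"), ZoneOffset.UTC));
        final Window window = sketch.execute(w -> System.out.println("polling changes in " + w));
        System.out.println("next run starts from " + window.to());
    }
}
```

Because a failed run does not move the stored end of the range, the next execution covers the same period again, which is one reason delta extracts combined with idempotent processing work well together.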
```java
@Slf4j
@Component
@HandlerInfo(
    name = "BatchInventorySyncJob",
    route = "batch-inventory-sync",
    description = "Sync inventory details from Fluent to commerceTools",
    props = {@HandlerProp(key = "page-size", value = "500")}
)
public class InventoryJobHandler implements JobHandler {

    @Override
    public void run(final JobHandlerContext jobHandlerContext) throws JobExecutionException {
        final JobProperties settings = jobHandlerContext.getJobSettings();
        // date range calculated by the SDK for this execution
        final LocalDateTime start = settings.getStart();
        final LocalDateTime end = settings.getEnd();

        // get a runtime configuration property
        final String myProp = settings.getProp("my-property", StringUtils.EMPTY);

        // get a build time (route) configuration property
        final int pageSize = jobHandlerContext.getRouteSettings().getProp("page-size", 100);

        // your job logic next...

        // poll external system
        // either add each item into the queue for async processing
        // jobHandlerContext.getPublishService().publishMessage(sdkMessage);
        // or process each received item here
    }
}
```
Every job needs to have a route, which is used to find the class that executes the job for a given job execution request. The execution request will include the name of the job, for example `fc.connect.batch-inventory-sync`.
```yaml
routes:
  job:
    - route: "fc.connect.batch-inventory-sync"
      handler-name: "BatchInventorySyncJob"
      props:
        page-size: 500
```
Configuration Handlers
Configuration handlers can be used when there is a need to react to configuration changes. The example below modifies the application log level based on the configured log level. Note that there is a validation step where one can determine if this handler should be executed. In this example, it simply compares the current log level with the new log level. When they are different, it returns `true`.
```java
@Slf4j
@Component
@HandlerInfo(name = "log-level-update", route = "")
public class LogLevelConfigurationHandler implements ConfigurationHandler {
    private static final String DEFAULT_LEVEL = "INFO";
    private final FluentConnectConfiguration connectConfiguration;

    private Level currentLevel;

    @Autowired
    public LogLevelConfigurationHandler(final FluentConnectConfiguration connectConfiguration) {
        this.connectConfiguration = connectConfiguration;
    }

    @Override
    public boolean validate(@NotNull final Configuration configuration) {
        final var newLevel = getLogLevel(configuration);

        // run only when a previous level is known and the level has changed
        final var shouldExecute = currentLevel != null && newLevel.levelInt != currentLevel.levelInt;
        currentLevel = newLevel;
        return shouldExecute;
    }

    @Override
    public void run(final @NotNull Configuration configuration) {
        final LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
        final Logger logger = loggerContext.getLogger("com.fluentcommerce.connect");
        if (logger != null) {
            logger.setLevel(getLogLevel(configuration));
            log.info("Log level for com.fluentcommerce.connect set to [{}].", logger.getLevel());
        } else {
            log.error("Unable to detect logger context: com.fluentcommerce.connect.");
        }
    }

    // reads the configured log level, falling back to INFO when it is missing or blank
    private Level getLogLevel(@NotNull final Configuration configuration) {
        final String key = connectConfiguration.getConfiguration().getLogLevelKey();
        final ConfigurationTuple newValue = configuration.getProperties().getOrDefault(key, ConfigurationTuple.ofReadOnly(key, DEFAULT_LEVEL));

        return StringUtils.isNotBlank(newValue.getValue()) ? Level.toLevel(newValue.getValue()) : Level.INFO;
    }
}
```
Receiving messages from external systems
There are several ways to receive messages or requests from external systems.
HTTP Requests
Authenticating HTTP Requests
The SDK provides a library bundling Spring Security with helpful authentication filters. This library provides a default security configuration and doesn’t limit or restrict additional custom configuration of Spring Security should it be required.
```xml
<dependency>
    <groupId>com.fluentcommerce.connect</groupId>
    <artifactId>connect-sdk-core-web-security</artifactId>
</dependency>
```
By adding the library above and the configuration below, the SDK will only allow unauthenticated traffic to the URLs specified under `publicEndpoints`. Every other request must be authenticated by one of the filters listed under `customAuthFilters`.
```yaml
security:
  web:
    publicEndpoints:
      - "/actuator/health"
      - "/api/docs"
      - "/swagger-ui.html"
      - "/api/v1/fluent-connect/webhook"
    customAuthFilters:
      - "com.fluentcommerce.connect.core.web.security.filters.ApiKeyAuthenticationFilter"
      - "com.fluentcommerce.connect.core.web.security.filters.FluentOauthTokenAuthenticationFilter"
```
Note: the Fluent webhook API must remain public, without authentication.
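**Available SDK Filters:**

| Filter name | Class name | Required headers |
| --- | --- | --- |
| API Key authentication | `com.fluentcommerce.connect.core.web.security.filters.ApiKeyAuthenticationFilter` | `fluent.account`, `Authorization ApiKey <the_key>` |
| Fluent OAuth token authentication | `com.fluentcommerce.connect.core.web.security.filters.FluentOauthTokenAuthenticationFilter` | |
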
To customize Spring Security to your needs, create a component implementing `HttpSecurityBuildHook`.
The authentication filter structure mostly follows Spring Security standards, with an additional annotation. The API key filter listed above illustrates how to create your own:
```java
@SdkAuthenticationFilterInfo(provider = "api-key-provider", converter = "api-key-converter")
public class ApiKeyAuthenticationFilter extends SdkAuthenticationFilter {

    public ApiKeyAuthenticationFilter(
            @NotNull final AuthenticationManager authenticationManager,
            @NotNull final AuthenticationConverter authenticationConverter) {

        super(authenticationManager, authenticationConverter);
    }
}
```
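An SDK authentication filter shouldn’t be a Spring bean; otherwise, it will be automatically available to the Spring context, and it may clash with how the SDK expects filters to work. The `@SdkAuthenticationFilterInfo` annotation on an `SdkAuthenticationFilter` names the provider and converter beans to use, and the filter is enabled by listing its class under `customAuthFilters`.
A converter should always implement `AuthenticationConverter` and return an `Authentication` built from the request. The API key converter below, for example, reads the `fluent.account` header and an authorization header of the form `Authorization ApiKey <the_key>`.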
```java
@Slf4j
@Component("api-key-converter")
public class ApiKeyDetailAuthenticationConverter implements AuthenticationConverter {
    protected static final String FLUENT_ACCOUNT_HEADER = "fluent.account";
    private static final String SECURITY_TOKEN_GROUP = "securityToken";

    private static final String BEARER_TOKEN_PATTERN = "ApiKey\\s(?<securityToken>.*)";
    @Value("${fluent-connect.mock.accountId:}")
    protected String fluentAccount;

    @Override
    public Authentication convert(final HttpServletRequest request) {
        if (log.isTraceEnabled()) {
            log.trace("Authentication filter running for request URL: {}", request.getRequestURL().toString());
        }

        try {
            final var authApiKey = resolveSecurityTokenFromHttpRequest(request);
            return new ApiKeyDetailAuthentication(authApiKey, getAccountFromRequest(request));
        } catch (final BadCredentialsException e) {
            // returning null allows Spring Security to try the next security filter
            return null;
        }
    }
    ...
}
```
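The converter above elides how the token is taken out of the request. As a rough idea of what a pattern such as `ApiKey\s(?<securityToken>.*)` does, the standalone sketch below applies the same regular expression to a header value. `ApiKeyHeaderParser` is a hypothetical class, and treating a blank token as absent is an assumption of this sketch rather than documented SDK behaviour.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ApiKeyHeaderParser {
    private static final String SECURITY_TOKEN_GROUP = "securityToken";
    private static final Pattern API_KEY_PATTERN = Pattern.compile("ApiKey\\s(?<securityToken>.*)");

    // Returns the key from a header value such as "ApiKey <the_key>", if present.
    static Optional<String> parse(final String authorizationHeader) {
        if (authorizationHeader == null) {
            return Optional.empty();
        }
        final Matcher matcher = API_KEY_PATTERN.matcher(authorizationHeader);
        if (!matcher.matches()) {
            return Optional.empty();   // a different scheme, e.g. "Bearer ..."
        }
        final String token = matcher.group(SECURITY_TOKEN_GROUP).trim();
        return token.isEmpty() ? Optional.empty() : Optional.of(token);
    }

    public static void main(final String[] args) {
        System.out.println(parse("ApiKey my-secret-key"));
        System.out.println(parse("Bearer some-token"));
    }
}
```

Returning an empty result instead of throwing mirrors the converter's approach of returning null so that the next filter gets a chance to authenticate the request.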
Once the Authorization detail is defined in the context, the provider is called to validate whether the extracted credentials can be authenticated, returning the Authentication if successful or null if not.
```java
@Slf4j
@Component("api-key-provider")
public class ApiKeyDetailAuthenticationProvider implements AuthenticationProvider {
    private final APIKeyService apiKeyService;
    private final ContextBuilder contextBuilder;

    public ApiKeyDetailAuthenticationProvider(final APIKeyService apiKeyService, final ContextBuilder contextBuilder) {
        this.apiKeyService = apiKeyService;
        this.contextBuilder = contextBuilder;
    }

    @Override
    public Authentication authenticate(final Authentication authentication) throws AuthenticationException {
        final var authApiKey = (String) authentication.getCredentials();
        final var accountId = (String) authentication.getPrincipal();

        final var keyDetails = validateApiKeyDetails(authApiKey, accountId);
        ((ApiKeyDetailAuthentication) authentication).init(keyDetails, contextBuilder);
        authentication.setAuthenticated(true);
        return authentication;
    }
    ...
}
```
Note that the authentication provider will not raise any exceptions if the authentication fails. This is important to allow the other filters to execute and have a chance to authenticate the request. If none of the filters configured can authenticate the request, Spring will automatically deny the request.
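The fall-through behaviour described above can be pictured with a small standalone sketch. It is not built on Spring Security: `FilterChainSketch` and `AuthFilter` are hypothetical, requests are reduced to a path and a header map, and public endpoints are matched by exact path for simplicity.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

final class FilterChainSketch {

    // A filter returns the authenticated principal, or empty to let the next filter try.
    interface AuthFilter extends Function<Map<String, String>, Optional<String>> {}

    private final List<String> publicEndpoints;
    private final List<AuthFilter> filters;

    FilterChainSketch(final List<String> publicEndpoints, final List<AuthFilter> filters) {
        this.publicEndpoints = publicEndpoints;
        this.filters = filters;
    }

    // Public endpoints pass through; anything else needs at least one filter to succeed.
    boolean isAllowed(final String path, final Map<String, String> headers) {
        if (publicEndpoints.contains(path)) {
            return true;
        }
        return filters.stream()
                .map(filter -> filter.apply(headers))
                .anyMatch(Optional::isPresent);
    }

    public static void main(final String[] args) {
        final AuthFilter apiKey = headers -> Optional.ofNullable(headers.get("Authorization"))
                .filter(value -> value.startsWith("ApiKey "))
                .map(value -> "api-key-client");
        final AuthFilter oauth = headers -> Optional.ofNullable(headers.get("Authorization"))
                .filter(value -> value.startsWith("Bearer "))
                .map(value -> "oauth-client");
        final FilterChainSketch chain = new FilterChainSketch(
                List.of("/actuator/health"), List.of(apiKey, oauth));

        System.out.println(chain.isAllowed("/actuator/health", Map.of()));
        System.out.println(chain.isAllowed("/sample", Map.of("Authorization", "Bearer token")));
        System.out.println(chain.isAllowed("/sample", Map.of()));
    }
}
```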
Creating a new controller/endpoint
The example below illustrates how a custom controller can be created and use the security module to handle security/authentication and, through `SdkSecurityService`, get an SDK context based on the authenticated account/user/token.
```java
@Slf4j
@RestController
@RequestMapping("/sample")
public class SampleController {
    /**
     * SDK service that has access to the Spring Security context and returns an SDK context based on the authenticated/authorised credentials
     */
    private final SdkSecurityService securityService;

    @Autowired
    public SampleController(final SdkSecurityService securityService) {
        this.securityService = securityService;
    }

    @RequestMapping(method = RequestMethod.POST, consumes = "application/json")
    public ResponseEntity<String> receive(@RequestBody final SampleRequest sampleRequest) {
        return securityService.getAuthenticatedContext()
                .map(context -> processRequest(context, sampleRequest))
                .orElseThrow(() -> new BadCredentialsException("Unable to authenticate request"));
    }

    private ResponseEntity<String> processRequest(final HandlerContext handlerContext, final SampleRequest sampleRequest) {
        //TODO your logic / call a service
        return ResponseEntity.ok().build();
    }

    @Value
    public static class SampleRequest {
        String id;
        String name;
        //...
    }
}
```
External Listeners (SQS, Kafka, etc)
The SDK has its own internal listeners, and although it is possible to extend them, this topic only covers external listeners. An external listener receives messages from external systems into the SDK, for example, listening to orders published by an e-commerce system. To create an external listener subscribing to a topic or queue, there is a specific class to extend, as the example below illustrates. When defining such a class, it is important to specify what kind of payload this listener will work with, both as the generic type and in the `@ListenerInfo` annotation.
```java
@Slf4j
@Component
@ListenerInfo(id = "my_queue_id", messageClass = SampleQueueListener.MyMessagePayload.class)
public class SampleQueueListener extends ExternalListener<SampleQueueListener.MyMessagePayload> {

    @Override
    public Optional<ConnectSDKMessage> receiveMessage(final ExternalMessage message) throws UnprocessableMessageException {
        final MyMessagePayload messagePayload = getMessageContent(message);

        final Optional<AccountReference> accountReference = getFluentAccountReference(Collections.singletonMap("website-id", messagePayload.websiteId));
        if (accountReference.isPresent()) {
            return Optional.ofNullable(getConnectSDKMessage(messagePayload, accountReference.get()));
        }

        log.error("Unable to retrieve a Fluent account for the message received [{}].", messagePayload.getId());
        throw new UnprocessableMessageException("Unable to retrieve a Fluent account for the message received.");
    }

    /**
     * Extract a subset of the payload, pre-process and enrich it, convert into a different format or simply use it as is.
     */
    private ConnectSDKMessage getConnectSDKMessage(final MyMessagePayload messagePayload,
                                                   final AccountReference accountReference) throws UnprocessableMessageException {
        // The example below is a pass-through
        try {
            return ConnectSDKMessage.builder()
                    .id(UUID.randomUUID().toString())
                    .name(getMessageRoute(messagePayload))
                    .accountId(accountReference.getAccountId())
                    .retailerId(accountReference.getRetailerId())
                    .payload(ConnectSDKMessage.toJson(messagePayload))
                    .build();
        } catch (JsonProcessingException e) {
            throw new UnprocessableMessageException(e);
        }
    }

    /**
     * Apply your logic to determine the route name
     */
    private String getMessageRoute(final MyMessagePayload messagePayload) {
        return "my-route";
    }

    @Value
    public static class MyMessagePayload {
        String id;
        String websiteId;
        //...
    }
}
```
Each listener must have its configuration section; the snippet below only covers the essentials. See Fluent Connector Configuration for all configuration options and detailed information about each property.
```yaml
my_queue_id:
  name: "SQS_CONNECTOR"
  fluent-account: "DEV_ACCOUNT"
  type: sqs
```
Overriding Default Implementations
Secrets Manager
The SDK's credential service can be extended with a new secret manager by implementing the `SecretManagerService` interface and configuring the new service with the SDK. This service must be annotated with `@CredentialManagerInfo`.
Service implementation sample
```java
/**
 * Implementation of a secrets manager service using AWS Secrets Manager
 */
@Slf4j
@Service
@CredentialManagerInfo(id = "aws-secrets-manager")
public class AWSCredentialService implements SecretManagerService {
    public static final String VERSION_STAGE = "AWSCURRENT";
    private final AWSSecretsManager secretsManager;

    @Autowired
    public AWSCredentialService(final AWSSecretsManager secretsManager) {
        this.secretsManager = secretsManager;
    }

    @Override
    public Optional<String> getSecret(@NotNull final String key) {
        try {
            final GetSecretValueResult secretValue = secretsManager.getSecretValue(new GetSecretValueRequest().withSecretId(key).withVersionStage(VERSION_STAGE));
            return Optional.ofNullable(secretValue.getSecretString());
        } catch (final ResourceNotFoundException e) {
            log.debug("Unable to fetch secret [{}]: [{}]", key, e.getMessage());
        }
        // a missing secret is reported as empty rather than as an error
        return Optional.empty();
    }
}
```
SDK Configuration
```yaml
fluent-connect:
  credential:
    credential-manager: "aws-secrets-manager"
```
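For local development or tests, the same contract can be backed by something much simpler than AWS. The sketch below is self-contained and therefore declares its own `SecretSource` interface as a stand-in for the single `getSecret` method shown above; both `SecretSource` and `MapSecretSource` are hypothetical names, and wiring such a class into the SDK would still require the real interface and annotation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Stand-in mirroring the single method used in the sample above; not the SDK interface.
interface SecretSource {
    Optional<String> getSecret(String key);
}

// Resolves secrets from a map, for example a copy of System.getenv().
final class MapSecretSource implements SecretSource {
    private final Map<String, String> values;

    MapSecretSource(final Map<String, String> values) {
        this.values = new HashMap<>(values);   // defensive copy
    }

    @Override
    public Optional<String> getSecret(final String key) {
        // Unknown keys yield an empty Optional instead of an exception.
        return Optional.ofNullable(values.get(key));
    }

    public static void main(final String[] args) {
        final SecretSource secrets = new MapSecretSource(System.getenv());
        System.out.println(secrets.getSecret("PATH").isPresent());
        System.out.println(secrets.getSecret("NOT_A_REAL_SECRET").isPresent());
    }
}
```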
Job Scheduler
The main scheduling service of the SDK is `DefaultJobSchedulerService`, and it relies on two key service implementations:
- JobSchedulerService: A specialized scheduler service that can schedule and execute a job. The actual execution is delegated to `JobExecutionService`.
- JobExecutionService: A service that executes a job and handles the pre- and post-execution steps.
Before a job can be executed, it is necessary to determine a job handler based on the job name. For this, use `JobRouterService`.
To add a new job scheduler to the SDK, one must implement `JobSchedulerService` and `JobExecutionService`. The `JobSchedulerService` implementation must be annotated with `JobSchedulerInfo`, and the new scheduler is then selected in the configuration:
```yaml
job-scheduler:
  job-scheduler-type: "new-scheduler"
```
Product Availability
The default implementation is provided by `DefaultFulfilmentOptionService`, registered as a Spring bean named `defaultFulfilmentOptionService`.
Below is an empty example of an enrichment step that could add location information to a product availability response, for example, more details about the location of a click-and-collect fulfilment option.
```java
@Component
@Slf4j
@FulfilmentOptionTransformationStep(id = "pickup-location-enrichment", priority = 1)
public class PickupLocationTransformation implements FulfilmentOptionTransformationChain<FulfilmentRequest, FulfilmentResponse> {

    @Override
    public FulfilmentResponse transform(final FulfilmentRequest request, final FulfilmentResponse response, final FluentContext context) {
        // your enrichment logic here
        // return the (enriched) response so the method compiles and the result is passed on
        return response;
    }
}
```
It is up to each enrichment step to decide whether it can use a cache or always perform a live request to an external system to obtain the enrichment details. This will always be specific to the nature of the enrichment step and how volatile the data is.
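For data that changes rarely, such as store location details, an enrichment step could keep results for a limited time. The sketch below shows one possible time-to-live cache in plain Java; `TtlCache` is a hypothetical helper and not part of the SDK, and the right expiry depends entirely on how volatile the data is.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class TtlCache<K, V> {

    // A cached value together with the moment it stops being valid.
    private record Entry<T>(T value, Instant expiresAt) {}

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final Duration ttl;
    private final Clock clock;

    TtlCache(final Duration ttl, final Clock clock) {
        this.ttl = ttl;
        this.clock = clock;
    }

    // Returns the cached value while it is fresh, otherwise calls the loader and stores the result.
    V get(final K key, final Function<K, V> loader) {
        final Instant now = clock.instant();
        final Entry<V> cached = entries.get(key);
        if (cached != null && now.isBefore(cached.expiresAt())) {
            return cached.value();
        }
        final V fresh = loader.apply(key);   // live request to the external system
        entries.put(key, new Entry<>(fresh, now.plus(ttl)));
        return fresh;
    }

    public static void main(final String[] args) {
        final TtlCache<String, String> locations = new TtlCache<>(Duration.ofMinutes(10), Clock.systemUTC());
        System.out.println(locations.get("store-1", id -> "address of " + id));
        System.out.println(locations.get("store-1", id -> "not called while the entry is fresh"));
    }
}
```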
Should you need to modify how the GraphQL query is built in the default implementation, extend `DefaultFulfilmentOptionService` and have the subclass replace the `defaultFulfilmentOptionService` bean, for example by marking it `@Primary`.
If the standard request or response payload doesn't fit the requirements,
Please note this is a simplified way to run batch operations and not an actual scheduler. All the module above provides is a way to receive a web request to run a batch operation. When the SDK gets the request, it adds it to a batch queue; one of the running SDK nodes (containers) then picks it up and executes the batch operation. The SDK prevents the same job from running simultaneously, but it is possible to run different jobs simultaneously. This module relies on external systems to trigger the SDK to run jobs whenever necessary; it can only react to external inputs.
If Kafka is used instead of SQS, the job scheduler's capabilities are further limited. With SQS, the Connect SDK can accept new job requests while executing other jobs. This is not possible with Kafka, which can only accept new requests once the job currently in execution is complete. This can introduce problems: if the current job runs for an extended period, the SDK may consider the queued job request stale and discard it.
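The rule that the same job never runs twice at once, while different jobs may overlap, can be sketched for a single process as below. `JobRunGuard` is a hypothetical class; across several nodes (containers) the set of running jobs would have to live in shared state, which this sketch does not attempt.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class JobRunGuard {

    // Names of the jobs currently executing in this process.
    private final Set<String> running = ConcurrentHashMap.newKeySet();

    // Runs the job unless one with the same name is already in progress; returns whether it ran.
    boolean runExclusively(final String jobName, final Runnable job) {
        if (!running.add(jobName)) {
            return false;              // same job already running: reject this request
        }
        try {
            job.run();
            return true;
        } finally {
            running.remove(jobName);   // always release, even if the job fails
        }
    }

    public static void main(final String[] args) {
        final JobRunGuard guard = new JobRunGuard();
        guard.runExclusively("batch-inventory-sync", () -> {
            // While this job runs, a second request for it is rejected...
            System.out.println(guard.runExclusively("batch-inventory-sync", () -> { }));
            // ...but a different job is free to run.
            System.out.println(guard.runExclusively("event-failure-summary-monitor", () -> { }));
        });
    }
}
```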
Fluent Configuration Service
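The SDK has two types of configurations: build time and run time. Build time configuration is managed by `ApplicationConfigurationService`, while run time configuration is handled through `ConfigurationService`, whose default implementation is `FluentConfigurationService` (`fluent-settings`). A custom implementation can be selected in the configuration: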
```yaml
configuration:
  settingsManager: "custom-configuration-service"
```
Handlers
- Handlers should be kept small and have a single purpose.
- Handlers may receive a message that contains one or more items to be processed. When receiving multiple items, choosing the error recovery strategies is important. For example, what is the accepted behavior if 1 in 10 items fails - would all 10 be reprocessed, or would a single new message be generated for reprocessing and the original message completed successfully?
- It is also important to be aware of the message size limitations of your selected listener. This may impact the design of how a handler processes a message.
- When a handler exits with a `HandlerRetryException`, the SDK will automatically let the message be retried if supported by the listener selected.
- Any other kind of exception will cause the message to move to the DLQ (dead letter queue).
- See more about the retry mechanism and DLQ in Fluent Connector Configuration | Listener Configuration
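The second recovery strategy mentioned above (complete the original message and retry only the items that failed) could look like the sketch below. `PartialFailureSketch` is a hypothetical helper that only collects the failed items; publishing them back to the SDK as new messages is left out because that API is not shown in this guide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class PartialFailureSketch {

    // Processes every item; a failure does not abort the batch and the failed items are returned.
    static <T> List<T> processAll(final List<T> items, final Consumer<T> processor) {
        final List<T> failed = new ArrayList<>();
        for (final T item : items) {
            try {
                processor.accept(item);
            } catch (final RuntimeException e) {
                failed.add(item);      // candidate for an individual retry message
            }
        }
        return failed;
    }

    public static void main(final String[] args) {
        final List<String> failed = processAll(List.of("sku-1", "broken", "sku-3"), sku -> {
            if (sku.equals("broken")) {
                throw new IllegalArgumentException("cannot process " + sku);
            }
        });
        // Only the failed item would be re-published; the original message completes normally.
        System.out.println(failed);
    }
}
```

With this approach, the processor should be idempotent, since an item may be handled again when its retry message arrives.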
Retry example:
```java
@Slf4j
@Component
@HandlerInfo(
    name = "retryExampleHandler",
    route = "retryExample",
    description = "Retry example",
    props = {@HandlerProp(key = "some-key", value = "a-value")}
)
public class RetryExampleHandler implements MessageHandler {
    @Override
    public void processMessage(@NotNull final MessageHandlerContext context) throws UnprocessableMessageException, HandlerException {
        try {
            // `service` stands for any collaborator injected into the handler
            service.doSomeLogic();
        } catch (final SomeCustomException ex) {
            // let the message be retried when getting SomeCustomException
            throw new HandlerRetryException(ex);
        }
        // if a handler throws any other kind of exception, the message will be moved to the DLQ
    }
}
```
There are two ways of overriding an existing handler or a route: code or configuration.
Let's assume the handler below as the base example to describe the override options.
```java
@Slf4j
@Component
@HandlerInfo(
    name = "defaultNotificationHandler",
    route = "notification",
    description = "Handles notifications and publishes to their designated route",
    props = {@HandlerProp(key = "some-key", value = "a-value")}
)
public class DefaultNotificationHandler implements NotificationHandler {}
```
Overriding by code
Only pick this option if you wish to override the default logic of a handler. Opt for overriding by configuration if the intent is to re-route or change the route's properties. With this option, it is possible to override it in two different ways:
- re-routing to a new handler - In this option, both the old (`defaultNotificationHandler`) and the new (`customNotificationHandler`) handlers are available to the SDK as valid handlers. The new handler is used instead of the old one to process messages destined for the `notification` route.
```java
@Slf4j
@Component
@HandlerInfo(
    name = "customNotificationHandler",
    route = "notification",
    priority = 100,
    description = "A custom handler for notifications"
)
public class CustomNotificationHandler implements NotificationHandler {}
```
- override the handler - This option will disable the old handler (`defaultNotificationHandler`) and make the new one take its place. Messages destined for the `notification` route will be processed by the new handler.
```java
@Slf4j
@Component
@HandlerInfo(
    name = "defaultNotificationHandler",
    route = "notification",
    priority = 100,
    description = "A custom handler for notifications"
)
public class CustomNotificationHandler implements NotificationHandler {}
```
Overriding by configuration
Use this option to re-route a message to a different handler, create a new route using an existing handler, or modify the properties of an existing handler. Please note that when the properties (`props`) are not specified, the handler's defaults are used.
```yaml
routes:
  notification:
    # Scenario 1: Adding a new configuration to add a new route and re-use an existing handler. Props is not specified and defaults are used
    - route: "custom-notification"
      handler-name: "defaultNotificationHandler"

    # Scenario 2: Modifying the configuration of an existing route to override the properties
    - route: "notification"
      handler-name: "defaultNotificationHandler"
      props:
        some-key: "some-other-value"
        another-key: "yet-another-value"

    # Scenario 3: Modify an existing route to use a different existing handler
    - route: "notification"
      handler-name: "anotherNotificationHandler"
      props:
        some-key: "some-other-value"
        another-key: "yet-another-value"
```