commercetools Connector Extend (Customisation) Guidelines
Changed on:
31 Jan 2024
Overview
A Fluent Connector project typically has the structure as defined below :
com.fluentcommerce.connect.custom
Any customizations/extensions can either be done under the custom folder or as a separate module (jar) altogether.
Regardless of which way one chooses to extend it, the key message here is that any custom code should be under the
`com.fluentcommerce.connect`
`com.fluentcommerce.connect`
During the server start-up, the commercetools-connector will load all handlers found in the spring context and bind them to the correct service they belong to. As part of this binding process, the commercetools-connector also determines which handler has precedence over others of the same name. In other words, it is possible to override existing handlers by assigning a higher priority to the custom handler. By default, all handlers will have a priority of zero, and the commercetools-connector understands the highest priority as the higher number. For example, priority = 100 takes precedence over priority = 1.
`@HandlerInfo(name = "CustomCategoryUpsert", priority = 100)`
Detailed Technical Description
Message Handlers
Message handlers are dedicated to processing messages pushed to the message queue of the commercetools-connector.
Error Handling
- For errors that a retry won't make a difference, exit the method with a and the message will be forwarded to the DLQ (dead letter queue).
`UnprocessableMessageException,`
- For errors that should have the message retried, exit the method with a . Note that each listener (SQS vs Kafka, for example) has a different implementation of retry.
`HandlerException`
- If no exception is thrown, the message is removed from the queue, and it is assumed that the processing was done and no further action is required.
- For messages that have more than one item to be processed (e.g., a message has a list of products to be processed), either approach is viable.
- Any failure will stop the processing, and when the message is retried, all items are reprocessed.
- Failure to process one item will not abort the processing of the rest of the items. Failed items can be individually sent to the commercetools-connector as new messages for retry.
- Messages that need to perform more than one action may require different approaches based on what kind of data is being processed.
- In this scenario, each operation should be idempotent, so if the message is retried or replayed, the outcome will not change.
Sample code and configuration
A message handler has to extend
`MessageHandler`
`@HandlerInfo`
`@Component`
bind it to the Message Routing Service.
1@Slf4j
2@Component
3@HandlerInfo(name = "ProductUpsert")
4public class ProductUpsertHandler extends MessageHandler {
5
6 @Override
7 public void processMessage(final MessageHandlerContext context) throws UnprocessableMessageException, HandlerException {
8 final Optional<MyPayload> payload = context.getMessagePayload(MyPayload.class);
9 if (payload.isPresent()) {
10 //logic to process a message
11 final Event productEvent = processProduct(payload);
12 //send product to Fluent
13 context.ofFluentContext().sendEvent(productEvent);
14 }
15 throw new UnprocessableMessageException("Unable to get message payload");
16 }
17}
Language: java
Name: Example
Description:
[Warning: empty required content area]Every message handler must have a
`route`
`route-mapping`
`route`
route: Contains the build time configuration of a handler - always required.
- name: Name of the route.
- handler: ID of the handler, for example, .
`ProductUpsert`
- props: Additional and optional configuration the handler can use when processing a message.
route-mapping: Used to determine a route to a handler when receiving external messages. This is usually used when receiving messages from external systems.
- route: Name of the route - it must match the name defined above on the route name.
- props: Any configuration necessary to drive the logic to determine a route based on the information received in a message.
When a message is routed to a message handler, the commercetools-connector will look at the message name and match it against a route name. Whenever a match is found, it takes the handler from that route and finds the class/bean implementing it to have the message processed by it.
In the route-mapping example below,
`props`
`inclusion-filter,`
`commercetools.connect.product.upsert`
`ResourceUpdated`
`commercetools.connect.product.upsert`
1 # Route mappings are used to determine a route to a handler when receiving external messages
2 route-mapping:
3 - route: "commercetools.connect.product.upsert"
4 props:
5 # any number of properties that can be used to determine if this route is adequate for the message received
6 name: "product"
7 inclusion-filter:
8 - "ResourceUpdated"
9 - "ResourceCreated"
10
11 # Handler routes
12 routes:
13 - name: "commercetools.connect.product.upsert"
14 handler: "ProductUpsert"
15 props:
16 # any number of properties can drive or assit the handler to process a message
17 query: "ct-product.graphql"
Language: java
Name: Example
Description:
[Warning: empty required content area]Job Handlers
When it is not possible to receive input/requests from systems (whether it’s Fluent OMS or other systems), jobs can be used to poll these systems, retrieve data, and have it processed. Note that full extracts should be avoided in favour of delta extracts. See commercetools Connector Configuration for more details on how to configure and run a job.
All jobs in the commercetools-connector share the same pre-and post-execution steps:
- A job cannot have multiple instances running simultaneously.
- Retrieves the job configuration from Fluent settings.
- Calculates a date range (from-to datetime) the job may use during its execution.
- Executes the actual job.
- Persists the calculated date range as part of the job configuration so that it can be utilized on the next execution.
- commercetools-connector runs at UTC time.
1@Slf4j
2@Component
3@HandlerInfo(name = "BatchInventorySyncJob")
4public class InventoryJobHandler extends JobHandler {
5
6 @Override
7 public void run(final JobHandlerContext jobHandlerContext) throws JobExecutionException {
8 final JobProperties settings = jobHandlerContext.getJobSettings();
9 final LocalDateTime start = settings.getStart();
10 final LocalDateTime end = settings.getEnd();
11
12 //get a runtime configuration propperty
13 final String myProp = settings.getProp("my-property", StringUtils.EMPTY);
14
15 //get a build time (route) configuration property
16 final int pageSize = jobHandlerContext.getRouteSettings().getProp("page-size", 100);
17
18 //your job logic next...
19
20 //poll external system
21 //either add each item into the queue for async processing
22 //jobHandlerContext.getPublishService().publishMessage(commercetools-connectorMessage);
23 //or process each received item here
24 }
25}
Language: java
Name: Example
Description:
[Warning: empty required content area]Every job needs to have a route, and this is used to find the class that executes the job from a job execution request. The execution request will contain the Fluent account and retailer it should run on and the name of the job
`fc.connect.batch-inventory-sync`
1routes:
2 - name: "fc.connect.batch-inventory-sync"
3 handler: "BatchInventorySyncJob"
4 props:
5 page-size: 500
Language: java
Name: Example
Description:
[Warning: empty required content area]Configuration Handlers
Configuration handlers can be used when there is a need to react to configuration changes. The example below modifies the application log level based on the configured log level. Note that there is a validation step where one can determine if this handler should be executed or not, and in this example, it is just simply comparing the current log level with the new log level. When they are different, it returns
`true,`
1@Slf4j
2@Component
3@HandlerInfo(name = "log-level-update")
4public class LogLevelConfigurationHandler extends ConfigurationHandler {
5 private static final String DEFAULT_LEVEL = "INFO";
6 private final FluentConnectConfiguration connectConfiguration;
7
8 private Level currentLevel;
9
10 @Autowired
11 public LogLevelConfigurationHandler(final FluentConnectConfiguration connectConfiguration) {
12 this.connectConfiguration = connectConfiguration;
13 }
14
15 @Override
16 public boolean validate(@NotNull final Configuration configuration) {
17 final var newLevel = getLogLevel(configuration);
18
19 final var shouldExecute = currentLevel != null && newLevel.levelInt != currentLevel.levelInt;
20 currentLevel = newLevel;
21 return shouldExecute;
22 }
23
24 @Override
25 public void run(final @NotNull Configuration configuration) {
26 final LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
27 final Logger logger = loggerContext.getLogger("com.fluentcommerce.connect");
28 if (logger != null) {
29 logger.setLevel(getLogLevel(configuration));
30 log.info("Log level for com.fluentcommerce.connect set to [{}].", logger.getLevel());
31
32 } else {
33 log.error("Unable to detect logger context: com.fluentcommerce.connect.");
34 }
35 }
36
37 private Level getLogLevel(@NotNull final Configuration configuration) {
38 final String key = connectConfiguration.getConfiguration().getLogLevelKey();
39 final ConfigurationTuple newValue = configuration.getProperties().getOrDefault(key, ConfigurationTuple.ofReadOnly(key, DEFAULT_LEVEL));
40
41 return StringUtils.isNotBlank(newValue.getValue()) ? Level.toLevel(newValue.getValue()) : Level.INFO;
42 }
43}
Language: java
Name: Example
Description:
[Warning: empty required content area]Receiving messages from external systems
There are a couple of different ways to receive messages or requests from external systems.
HTTP Requests
The commercetools-connector comes with spring web-enabled, and opening up a new endpoint is as easy as creating a new rest controller. Please note that security is not enabled by default on the commercetools-connector, but it is possible to add the spring security library. If one does decide to add web security, check if your setup impacts the existing APIs. There is an open API page listing all existing endpoints - /api/docs.
1@Slf4j
2@RestController
3@RequestMapping("/sample")
4public class SampleController {
5
6 private final MyService myService;
7
8 @Autowired
9 public SampleController(final MyService myService) {
10 this.myService = myService;
11 }
12
13 @RequestMapping(method = RequestMethod.POST, consumes = "application/json")
14 public ResponseEntity<String> receive(@RequestBody final SampleRequest sampleRequest) {
15 try {
16 myService.process(sampleRequest);
17 } catch (final Exception e) {
18 log.error("Error receiving request", e);
19 return ResponseEntity.unprocessableEntity().build();
20 }
21 return ResponseEntity.ok().build();
22 }
23
24 @Value
25 public static class SampleRequest{
26 String id;
27 String name;
28 //...
29 }
30}
Language: java
Name: Example
Description:
[Warning: empty required content area]External Listeners (SQS, Kafka, etc)
The commercetools-connector has its internal listeners, and although it is possible to extend it, this topic will only cover external listeners. An external listener receives messages from external systems to the commercetools-connector, for example, listing orders published by an e-commerce system. To create an external listener that subscribes to either a topic or queue, there is a specific class to extend, as the example below illustrates. When defining such a class, it is important to specify what kind of payload this listener will be working with at both the Generic type and the
`@ListenerInfo`
1@Slf4j
2@Component
3@ListenerInfo(id = "my_queue_id", messageClass = SampleQueueListener.MyMessagePayload.class)
4public class SampleQueueListener extends ExternalListener<SampleQueueListener.MyMessagePayload> {
5
6 @Override
7 public Optional<Connectcommercetools-connectorMessage> receiveMessage(final ExternalMessage message) throws UnprocessableMessageException {
8 final MyMessagePayload messagePayload = getMessageContent(message);
9
10 final Optional<AccountReference> accountReference = getFluentAccountReference(Collections.singletonMap("website-id", messagePayload.websiteId));
11 if (accountReference.isPresent()) {
12 return Optional.ofNullable(getConnectcommercetools-connectorMessage(messagePayload, accountReference.get()));
13 }
14
15 log.error("Unable to retrieve a Fluent account for the message received [{}].", messagePayload.getId());
16 throw new UnprocessableMessageException("Unable to retrieve a Fluent account for the message received.");
17 }
18
19 /**
20 * Extract a subset of the payload, pre-process and enrich it, convert into a different format or simply use it as is.
21 */
22 private Connectcommercetools-connectorMessage getConnectcommercetools-connectorMessage(final MyMessagePayload messagePayload,
23 final AccountReference accountReference) throws UnprocessableMessageException {
24 //The example below is a pass-through
25 try {
26 return Connectcommercetools-connectorMessage.builder()
27 .id(UUID.randomUUID())
28 .name(getMessageRoute(messagePayload))
29 .accountId(accountReference.getAccountId())
30 .retailerId(accountReference.getRetailerId())
31 .payload(Connectcommercetools-connectorMessage.toJson(messagePayload))
32 .build();
33 } catch (JsonProcessingException e) {
34 throw new UnprocessableMessageException(e);
35 }
36 }
37
38 /**
39 * Apply your logic to determine the route name
40 */
41 private String getMessageRoute(final MyMessagePayload messagePayload) {
42 return "my-route";
43 }
44
45 @Value
46 public static class MyMessagePayload {
47 String id;
48 String websiteId;
49 //...
50 }
51}
Language: java
Name: Example
Description:
[Warning: empty required content area]Each listener must have its own configuration section and the snippet below only covers the essentials. See Commercetools Connector Configuration for all configuration options and detailed information about each property.
1my_queue_id:
2 name: "SQS_CONNECTOR"
3 fluent-account: "DEV_ACCOUNT"
4 type: sqs
Language: java
Name: Example
Description:
[Warning: empty required content area]