Changed on:
31 Jan 2024
A Fluent Connector project typically has the structure shown below:
com.fluentcommerce.connect.custom
Any customizations/extensions can either be done under the custom folder or as a separate module (jar) altogether.
Whichever way one chooses to extend it, the key point is that any custom code should be under the `com.fluentcommerce.connect` package.
`com.fluentcommerce.connect`
`@HandlerInfo(name = "CustomCategoryUpsert", priority = 100)`
Message handlers are dedicated to processing messages pushed to the message queue of the commercetools-connector.
A handler's `processMessage` method can throw `UnprocessableMessageException` or `HandlerException`.
A message handler has to extend `MessageHandler` and be annotated with `@HandlerInfo` and `@Component`.
    @Slf4j
    @Component
    @HandlerInfo(name = "ProductUpsert")
    public class ProductUpsertHandler extends MessageHandler {

        @Override
        public void processMessage(final MessageHandlerContext context) throws UnprocessableMessageException, HandlerException {
            final Optional<MyPayload> payload = context.getMessagePayload(MyPayload.class);
            // fail only when the payload is missing, not after a successful send
            if (!payload.isPresent()) {
                throw new UnprocessableMessageException("Unable to get message payload");
            }
            // logic to process a message
            final Event productEvent = processProduct(payload);
            // send product to Fluent
            context.ofFluentContext().sendEvent(productEvent);
        }
    }
Every message handler must have a `route` and may also have a `route-mapping` that points to that `route`.
route: Contains the build-time configuration of a handler; always required. In the example below, the route's handler is `ProductUpsert`.
route-mapping: Determines which route, and therefore which handler, applies to a message received from an external system.
When a message is routed to a message handler, the commercetools-connector matches the message name against the route names. When a match is found, it takes the handler name from that route and looks up the class/bean implementing it, which then processes the message.
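The lookup described above can be pictured with a minimal sketch. This is not the connector's implementation: `RouteResolutionSketch`, its nested `Handler` interface and the two maps are hypothetical stand-ins for the `routes` configuration and for the beans annotated with `@HandlerInfo`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the route lookup; not the connector's real code. */
final class RouteResolutionSketch {

    /** Hypothetical stand-in for a message handler bean. */
    interface Handler {
        String process(String payload);
    }

    // route name -> handler name, mirroring the `routes` configuration
    private final Map<String, String> routes = new HashMap<>();
    // handler name -> handler bean, mirroring beans annotated with @HandlerInfo
    private final Map<String, Handler> handlers = new HashMap<>();

    void addRoute(final String routeName, final String handlerName) {
        routes.put(routeName, handlerName);
    }

    void addHandler(final String handlerName, final Handler handler) {
        handlers.put(handlerName, handler);
    }

    /** Matches the message name against the route names, then finds the handler bean. */
    Optional<Handler> resolve(final String messageName) {
        // an unknown route or an unregistered handler both yield an empty Optional
        return Optional.ofNullable(routes.get(messageName)).map(handlers::get);
    }

    public static void main(final String[] args) {
        final RouteResolutionSketch resolver = new RouteResolutionSketch();
        resolver.addRoute("commercetools.connect.product.upsert", "ProductUpsert");
        resolver.addHandler("ProductUpsert", payload -> "processed " + payload);

        resolver.resolve("commercetools.connect.product.upsert")
                .ifPresent(handler -> System.out.println(handler.process("{\"id\":\"1\"}")));
        System.out.println(resolver.resolve("unknown.route").isPresent());
    }
}
```

Keeping the route name and the handler name as separate keys is what lets several routes share one handler while carrying different `props`.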
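In the route-mapping example below, `props` carries an `inclusion-filter`, so a message such as `ResourceUpdated` is mapped to the `commercetools.connect.product.upsert` route and is then processed by the handler configured for `commercetools.connect.product.upsert`.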
    # Route mappings are used to determine a route to a handler when receiving external messages
    route-mapping:
      - route: "commercetools.connect.product.upsert"
        props:
          # any number of properties that can be used to determine if this route is adequate for the message received
          name: "product"
          inclusion-filter:
            - "ResourceUpdated"
            - "ResourceCreated"

    # Handler routes
    routes:
      - name: "commercetools.connect.product.upsert"
        handler: "ProductUpsert"
        props:
          # any number of properties can drive or assist the handler to process a message
          query: "ct-product.graphql"
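To make the role of the route-mapping `props` concrete, here is a minimal sketch of how a mapping could be selected for an incoming external message. The matching rule is an assumption made for illustration (the `name` prop must equal the resource name and the `inclusion-filter` must contain the notification type); `RouteMappingSketch`, `Mapping` and `findRoute` are hypothetical names and not part of the connector.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of route-mapping selection; the matching rule is an assumption. */
final class RouteMappingSketch {

    /** One `route-mapping` entry: the target route plus the props used for matching. */
    static final class Mapping {
        final String route;
        final String name;
        final List<String> inclusionFilter;

        Mapping(final String route, final String name, final List<String> inclusionFilter) {
            this.route = route;
            this.name = name;
            this.inclusionFilter = inclusionFilter;
        }

        /** Assumed rule: same resource name and a notification type listed in the filter. */
        boolean accepts(final String resourceName, final String notificationType) {
            return name.equals(resourceName) && inclusionFilter.contains(notificationType);
        }
    }

    /** Returns the route of the first mapping that accepts the message, if any. */
    static Optional<String> findRoute(final List<Mapping> mappings,
                                      final String resourceName,
                                      final String notificationType) {
        return mappings.stream()
                .filter(mapping -> mapping.accepts(resourceName, notificationType))
                .map(mapping -> mapping.route)
                .findFirst();
    }

    public static void main(final String[] args) {
        final List<Mapping> mappings = Arrays.asList(new Mapping(
                "commercetools.connect.product.upsert",
                "product",
                Arrays.asList("ResourceUpdated", "ResourceCreated")));

        System.out.println(findRoute(mappings, "product", "ResourceUpdated"));
        System.out.println(findRoute(mappings, "product", "ResourceDeleted"));
    }
}
```

The route name returned here is the same key that the `routes` section uses to name the handler.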
When it is not possible to receive input/requests from systems (whether it’s Fluent OMS or other systems), jobs can be used to poll these systems, retrieve data, and have it processed. Note that full extracts should be avoided in favour of delta extracts. See commercetools Connector Configuration for more details on how to configure and run a job.
All jobs in the commercetools-connector share the same pre- and post-execution steps:
    @Slf4j
    @Component
    @HandlerInfo(name = "BatchInventorySyncJob")
    public class InventoryJobHandler extends JobHandler {

        @Override
        public void run(final JobHandlerContext jobHandlerContext) throws JobExecutionException {
            final JobProperties settings = jobHandlerContext.getJobSettings();
            final LocalDateTime start = settings.getStart();
            final LocalDateTime end = settings.getEnd();

            // get a runtime configuration property
            final String myProp = settings.getProp("my-property", StringUtils.EMPTY);

            // get a build time (route) configuration property
            final int pageSize = jobHandlerContext.getRouteSettings().getProp("page-size", 100);

            // your job logic next...

            // poll external system
            // either add each item into the queue for async processing
            // jobHandlerContext.getPublishService().publishMessage(message);
            // or process each received item here
        }
    }
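The advice to prefer delta extracts over full extracts can be illustrated with a small, self-contained sketch. It assumes that the job's `start` and `end` values describe a half-open window `[start, end)` and that each polled item exposes a last-modified timestamp; both are assumptions, and `DeltaExtractSketch`, `Item`, `delta` and `pages` are hypothetical names.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a delta extract; the window semantics are an assumption. */
final class DeltaExtractSketch {

    /** Hypothetical item returned by the polled system. */
    static final class Item {
        final String id;
        final LocalDateTime lastModified;

        Item(final String id, final LocalDateTime lastModified) {
            this.id = id;
            this.lastModified = lastModified;
        }
    }

    /** Keeps only the items modified within [start, end). */
    static List<Item> delta(final List<Item> all, final LocalDateTime start, final LocalDateTime end) {
        final List<Item> changed = new ArrayList<>();
        for (final Item item : all) {
            if (!item.lastModified.isBefore(start) && item.lastModified.isBefore(end)) {
                changed.add(item);
            }
        }
        return changed;
    }

    /** Splits the items into pages of at most pageSize entries. */
    static List<List<Item>> pages(final List<Item> items, final int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        final List<List<Item>> pages = new ArrayList<>();
        for (int from = 0; from < items.size(); from += pageSize) {
            final int to = Math.min(from + pageSize, items.size());
            pages.add(new ArrayList<>(items.subList(from, to)));
        }
        return pages;
    }
}
```

In a real job the filtering would normally be pushed to the external system's query rather than done in memory; the in-memory version only shows the window and paging arithmetic.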
Every job needs to have a route, which is used to find the class that executes the job from a job execution request. The execution request contains the Fluent account and retailer the job should run on and the name of the job, for example `fc.connect.batch-inventory-sync`.
    routes:
      - name: "fc.connect.batch-inventory-sync"
        handler: "BatchInventorySyncJob"
        props:
          page-size: 500
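The job example reads `page-size` with a fallback of `100`, while the route above sets it to `500`. A minimal sketch of that "configured value, else default" lookup follows. `RoutePropsSketch` is a hypothetical class and only imitates the shape of the `getProp(key, default)` calls seen in the job example; how the connector actually stores and converts props is not shown in this document.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a props lookup with defaults; not the connector's settings class. */
final class RoutePropsSketch {

    private final Map<String, Object> props;

    RoutePropsSketch(final Map<String, Object> props) {
        this.props = new HashMap<>(props);
    }

    /** Returns the configured integer, or the default when missing or not numeric. */
    int getProp(final String key, final int defaultValue) {
        final Object value = props.get(key);
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        if (value instanceof String) {
            try {
                return Integer.parseInt(((String) value).trim());
            } catch (final NumberFormatException e) {
                return defaultValue;
            }
        }
        return defaultValue;
    }

    /** Returns the configured value as text, or the default when missing. */
    String getProp(final String key, final String defaultValue) {
        final Object value = props.get(key);
        return value == null ? defaultValue : value.toString();
    }

    public static void main(final String[] args) {
        final Map<String, Object> configured = new HashMap<>();
        configured.put("page-size", 500);
        final RoutePropsSketch settings = new RoutePropsSketch(configured);

        System.out.println(settings.getProp("page-size", 100));
        System.out.println(settings.getProp("my-property", ""));
    }
}
```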
Configuration handlers can be used when there is a need to react to configuration changes. The example below modifies the application log level based on the configured log level. Note that there is a validation step where one can determine whether this handler should be executed; in this example, it simply compares the current log level with the new log level. When a previous level is known and the two differ, it returns `true` and the handler runs.
    @Slf4j
    @Component
    @HandlerInfo(name = "log-level-update")
    public class LogLevelConfigurationHandler extends ConfigurationHandler {
        private static final String DEFAULT_LEVEL = "INFO";
        private final FluentConnectConfiguration connectConfiguration;

        private Level currentLevel;

        @Autowired
        public LogLevelConfigurationHandler(final FluentConnectConfiguration connectConfiguration) {
            this.connectConfiguration = connectConfiguration;
        }

        @Override
        public boolean validate(@NotNull final Configuration configuration) {
            final var newLevel = getLogLevel(configuration);

            final var shouldExecute = currentLevel != null && newLevel.levelInt != currentLevel.levelInt;
            currentLevel = newLevel;
            return shouldExecute;
        }

        @Override
        public void run(@NotNull final Configuration configuration) {
            final LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
            final Logger logger = loggerContext.getLogger("com.fluentcommerce.connect");
            if (logger != null) {
                logger.setLevel(getLogLevel(configuration));
                log.info("Log level for com.fluentcommerce.connect set to [{}].", logger.getLevel());
            } else {
                log.error("Unable to detect logger context: com.fluentcommerce.connect.");
            }
        }

        private Level getLogLevel(@NotNull final Configuration configuration) {
            final String key = connectConfiguration.getConfiguration().getLogLevelKey();
            final ConfigurationTuple newValue = configuration.getProperties().getOrDefault(key, ConfigurationTuple.ofReadOnly(key, DEFAULT_LEVEL));

            return StringUtils.isNotBlank(newValue.getValue()) ? Level.toLevel(newValue.getValue()) : Level.INFO;
        }
    }
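The validate-then-run flow can be traced with a stripped-down sketch. It assumes the caller invokes `validate` on every configuration refresh and calls `run` only when `validate` returns `true`; that calling convention, and the names `ConfigurationDispatchSketch`, `ConfigHandler` and `onConfigurationChange`, are hypothetical. Log levels are plain strings here to keep the example free of logging dependencies.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of validate-then-run dispatch; not the connector's real flow. */
final class ConfigurationDispatchSketch {

    /** Hypothetical, simplified handler contract. */
    interface ConfigHandler {
        boolean validate(String newLevel);

        void run(String newLevel);
    }

    /** Mirrors the comparison used in the log level example, with strings for levels. */
    static final class LogLevelHandler implements ConfigHandler {
        private String currentLevel;
        final List<String> applied = new ArrayList<>();

        @Override
        public boolean validate(final String newLevel) {
            // executes only when a previous level is known and the level has changed
            final boolean shouldExecute = currentLevel != null && !currentLevel.equals(newLevel);
            currentLevel = newLevel;
            return shouldExecute;
        }

        @Override
        public void run(final String newLevel) {
            applied.add(newLevel);
        }
    }

    /** Assumed calling convention: run is invoked only after a successful validate. */
    static void onConfigurationChange(final ConfigHandler handler, final String newLevel) {
        if (handler.validate(newLevel)) {
            handler.run(newLevel);
        }
    }
}
```

One consequence of this comparison is that the very first refresh only records the level and never triggers `run`, because there is no previous level to compare against.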
There are a couple of different ways to receive messages or requests from external systems.
The commercetools-connector comes with Spring Web enabled, and opening up a new endpoint is as easy as creating a new REST controller. Please note that security is not enabled by default on the commercetools-connector, but it is possible to add the Spring Security library. If you do decide to add web security, check whether your setup impacts the existing APIs. There is an OpenAPI page listing all existing endpoints: /api/docs.
    @Slf4j
    @RestController
    @RequestMapping("/sample")
    public class SampleController {

        private final MyService myService;

        @Autowired
        public SampleController(final MyService myService) {
            this.myService = myService;
        }

        @RequestMapping(method = RequestMethod.POST, consumes = "application/json")
        public ResponseEntity<String> receive(@RequestBody final SampleRequest sampleRequest) {
            try {
                myService.process(sampleRequest);
            } catch (final Exception e) {
                log.error("Error receiving request", e);
                return ResponseEntity.unprocessableEntity().build();
            }
            return ResponseEntity.ok().build();
        }

        @Value
        public static class SampleRequest {
            String id;
            String name;
            //...
        }
    }
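For a quick check of such an endpoint, a caller could build the request with the JDK's `java.net.http` API (Java 11 or later). The sketch below only constructs the request. The base URL is a placeholder chosen by the caller, and `SampleRequestClientSketch`, `toJson` and `buildRequest` are hypothetical names; the `/sample` path and the `id` and `name` fields come from the controller example.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical client-side sketch for calling the sample endpoint. */
final class SampleRequestClientSketch {

    /** Escapes backslashes and double quotes so the value is safe inside a JSON string. */
    private static String escape(final String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Builds the JSON body with the two fields of SampleRequest. */
    static String toJson(final String id, final String name) {
        return "{\"id\":\"" + escape(id) + "\",\"name\":\"" + escape(name) + "\"}";
    }

    /** Builds a POST request for the /sample endpoint under the given base URL. */
    static HttpRequest buildRequest(final String baseUrl, final String id, final String name) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/sample"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(toJson(id, name)))
                .build();
    }
}
```

Sending it is a matter of `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. Going by the controller above, a successful call returns 200 and a failure inside `myService.process` returns 422.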
The commercetools-connector has its own internal listeners, and although it is possible to extend them, this topic only covers external listeners. An external listener receives messages sent by external systems to the commercetools-connector, for example, listening for orders published by an e-commerce system. To create an external listener that subscribes to either a topic or a queue, there is a specific class to extend, as the example below illustrates. When defining such a class, it is important to specify what kind of payload this listener will be working with, both in the generic type and in the `@ListenerInfo` annotation.
    @Slf4j
    @Component
    @ListenerInfo(id = "my_queue_id", messageClass = SampleQueueListener.MyMessagePayload.class)
    public class SampleQueueListener extends ExternalListener<SampleQueueListener.MyMessagePayload> {

        @Override
        public Optional<ConnectSDKMessage> receiveMessage(final ExternalMessage message) throws UnprocessableMessageException {
            final MyMessagePayload messagePayload = getMessageContent(message);

            final Optional<AccountReference> accountReference = getFluentAccountReference(Collections.singletonMap("website-id", messagePayload.websiteId));
            if (accountReference.isPresent()) {
                return Optional.ofNullable(getConnectSDKMessage(messagePayload, accountReference.get()));
            }

            log.error("Unable to retrieve a Fluent account for the message received [{}].", messagePayload.getId());
            throw new UnprocessableMessageException("Unable to retrieve a Fluent account for the message received.");
        }

        /**
         * Extract a subset of the payload, pre-process and enrich it, convert into a different format or simply use it as is.
         */
        private ConnectSDKMessage getConnectSDKMessage(final MyMessagePayload messagePayload,
                                                       final AccountReference accountReference) throws UnprocessableMessageException {
            // The example below is a pass-through
            try {
                return ConnectSDKMessage.builder()
                        .id(UUID.randomUUID())
                        .name(getMessageRoute(messagePayload))
                        .accountId(accountReference.getAccountId())
                        .retailerId(accountReference.getRetailerId())
                        .payload(ConnectSDKMessage.toJson(messagePayload))
                        .build();
            } catch (JsonProcessingException e) {
                throw new UnprocessableMessageException(e);
            }
        }

        /**
         * Apply your logic to determine the route name
         */
        private String getMessageRoute(final MyMessagePayload messagePayload) {
            return "my-route";
        }

        @Value
        public static class MyMessagePayload {
            String id;
            String websiteId;
            //...
        }
    }
Each listener must have its own configuration section; the snippet below covers only the essentials. See commercetools Connector Configuration for all configuration options and detailed information about each property.
    my_queue_id:
      name: "SQS_CONNECTOR"
      fluent-account: "DEV_ACCOUNT"
      type: sqs
Copyright © 2024 Fluent Retail Pty Ltd (trading as Fluent Commerce). All rights reserved. No materials on this docs.fluentcommerce.com site may be used in any way and/or for any purpose without prior written authorisation from Fluent Commerce. Current customers and partners shall use these materials strictly in accordance with the terms and conditions of their written agreements with Fluent Commerce or its affiliates.