Aggregate Inventory Sync
Changed on: 31 Jan 2024
Overview
The Aggregate Sync syncs products from Fluent to commercetools at an aggregate level. Fluent is the master source for the sync.
Detailed Technical Description
Once an aggregate is created or updated in Fluent, the same aggregate has to be created or updated in commercetools via the channel settings.
In commercetools, go to `Merchant Center > Project Settings > Channels` and add the channel details.

Map the aggregate commercetools (CT) channel key to the Fluent aggregate virtual catalogue in the Fluent setting named `fc.connect.commercetools.batch.batch-inventory-sync`, as shown below.
{
  "previousEndDate": "2022-10-20T04:12:08.812570167",
  "props": {
    "ctChannelKey": "ct-fluent-channel",
    "fluentAggregateVirtualCatalogueRef": "ATS:1"
  },
  "lastRun": {
    "param": {
      "start": "2022-10-19T04:12:08.491444877",
      "end": "2022-10-20T04:12:08.812570167",
      "props": {
        "ctChannelKey": "ct-fluent-channel",
        "fluentAggregateVirtualCatalogueRef": "ATS:1"
      }
    },
    "jobStart": "2022-10-20T04:12:08.716938923",
    "jobEnd": "2022-10-20T04:12:08.836448368",
    "status": "SUCCESSFUL"
  }
}
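In the setting above, `previousEndDate` equals the `end` of the last run, which suggests that each run picks up where the previous one stopped. The following is a minimal Java sketch of that idea, not the connector's actual code. The `SyncWindow` class and its methods are hypothetical names, and the assumption that the window boundaries feed the `updatedOn` range of the inventory query is ours.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical helper, not part of the Fluent Connect SDK.
final class SyncWindow {
    final LocalDateTime start;
    final LocalDateTime end;

    SyncWindow(LocalDateTime start, LocalDateTime end) {
        if (end.isBefore(start)) {
            throw new IllegalArgumentException("end must not be before start");
        }
        this.start = start;
        this.end = end;
    }

    // Assumption: the next run starts at the stored previousEndDate
    // and ends at the time the job is triggered.
    static SyncWindow next(String previousEndDate, LocalDateTime now) {
        return new SyncWindow(LocalDateTime.parse(previousEndDate), now);
    }

    // Length of the window, useful for logging or sanity checks.
    Duration length() {
        return Duration.between(start, end);
    }

    public static void main(String[] args) {
        SyncWindow window = SyncWindow.next("2022-10-20T04:12:08.812570167", LocalDateTime.now());
        System.out.println("from=" + window.start + " to=" + window.end);
    }
}
```

After a successful run, the stored `previousEndDate` would be replaced by the window's `end`, so that no update falls between two consecutive windows.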
The sync is performed by a batch job handler at a scheduled frequency, which partners can map and change to suit their requirements. The scheduler is Amazon EventBridge, and it is configured through a CloudFormation template. The implementation of the scheduler can vary depending on the approach. The out-of-the-box (OOTB) implementation uses EventBridge for scheduled triggers.

Event Queue: Holds the Fluent messages that are to be processed further.
Batch Queue: Handles bulk data sync. An external scheduler triggers this request with the relevant sync batch job properties. The request updates the aggregate product with its available quantity in commercetools.
Job Router: Holds the name of the job and passes it to the job handler.
Inventory Batch Handler: Receives the job name and data, fetches products from the Fluent internal queue, and updates commercetools.
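The routing step described above can be sketched as a lookup from job name to handler. This is only an illustration under stated assumptions: `JobRouter`, `register`, and `route` are hypothetical names rather than SDK classes, and the job name `batch-inventory-sync` is borrowed from the setting name purely as an example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical router; the SDK's own routing classes are not shown in this document.
final class JobRouter {
    // Maps a job name to the handler that processes the job's properties.
    private final Map<String, Consumer<Map<String, String>>> handlers = new HashMap<>();

    void register(String jobName, Consumer<Map<String, String>> handler) {
        handlers.put(jobName, handler);
    }

    // Looks up the handler by job name and passes the job properties to it.
    void route(String jobName, Map<String, String> props) {
        Consumer<Map<String, String>> handler = handlers.get(jobName);
        if (handler == null) {
            throw new IllegalArgumentException("No handler registered for job: " + jobName);
        }
        handler.accept(props);
    }

    public static void main(String[] args) {
        JobRouter router = new JobRouter();
        // Stand-in for the inventory batch handler.
        router.register("batch-inventory-sync",
                props -> System.out.println("Syncing channel " + props.get("ctChannelKey")));

        Map<String, String> props = new HashMap<>();
        props.put("ctChannelKey", "ct-fluent-channel");
        props.put("fluentAggregateVirtualCatalogueRef", "ATS:1");
        router.route("batch-inventory-sync", props);
    }
}
```

Keeping the router as a plain lookup means a new job type only needs a new registration, which matches the way handlers are named in the configuration.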
Example to extend the functionality
Partners can override existing handlers using `application-connector.yml` and a Handler class. To change the logic of the Upsert handler, first open `commercetools-modules/commercetools-connector/src/main/resources/application-connector.yml` and replace the “handler” value under the routes > name > handler node.
name: "commercetools.connect.inventory.upsert"
handler: "InventoryUpsertHandler"
props:
  payload: "inventory"
  query-inventories: "ct-inventories.graphql"
  query-channel: "ct-channel.graphql"
  mutation-create-inventory: "ct-createinventory.graphql"
  mutation-update-inventory: "ct-updateinventory.graphql"
New handlers can also be added by registering them in the same `application-connector.yml`. Create a new Handler that extends `MessageHandler` and ensure that its package is under the main SDK package `com.fluentcommerce.connect`, e.g. `com.fluentcommerce.connect.custom.commercetools.handler.message.<your-handler>.java`. For more details on how to extend, refer to the extend guidelines.
@Slf4j
@Component
@HandlerInfo(name = "InventoryUpsertHandler", description = "Upsert Inventory at commercetools")
public class FluentInventoryUpsertHandler extends BaseMessageHandler {
    private static final String AGGREGATE_INVENTORY = "inventory";
    // write your logic
}
Fluent Inventory Query
query GetAggregateVirtualPositions($after: String, $first: Int, $catalogueRef: String!, $status: [String!], $updatedOnFrom: DateTime, $updatedOnTo: DateTime) {
  virtualPositions(after: $after, first: $first, catalogue: {ref: $catalogueRef}, status: $status, updatedOn: {from: $updatedOnFrom, to: $updatedOnTo}) {
    virtualPositionEdge: edges {
      virtualPositionNode: node {
        quantity
        productRef
      }
      cursor
    }
    pageInfo {
      hasNextPage
    }
  }
}
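The query is cursor-paginated: each edge carries a `cursor`, and `pageInfo.hasNextPage` indicates whether more results remain. A caller would typically pass the last cursor of a page as `$after` for the next request. Below is a minimal Java sketch of such a loop, assuming a page-fetching function supplied by the caller. `VirtualPositionPager`, `Position`, `Page`, and `fetchAll` are hypothetical names, and the actual GraphQL call is replaced by a stub.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical pager; the GraphQL client call is abstracted behind a Function.
final class VirtualPositionPager {
    // Mirrors the fields selected by the query: productRef, quantity, cursor.
    static final class Position {
        final String productRef;
        final int quantity;
        final String cursor;

        Position(String productRef, int quantity, String cursor) {
            this.productRef = productRef;
            this.quantity = quantity;
            this.cursor = cursor;
        }
    }

    static final class Page {
        final List<Position> positions;
        final boolean hasNextPage;

        Page(List<Position> positions, boolean hasNextPage) {
            this.positions = positions;
            this.hasNextPage = hasNextPage;
        }
    }

    // fetchPage receives the "after" cursor (null for the first page) and returns one page.
    static List<Position> fetchAll(Function<String, Page> fetchPage) {
        List<Position> all = new ArrayList<>();
        String after = null;
        while (true) {
            Page page = fetchPage.apply(after);
            all.addAll(page.positions);
            // Stop when the server reports no further pages, or on an empty page.
            if (!page.hasNextPage || page.positions.isEmpty()) {
                break;
            }
            after = page.positions.get(page.positions.size() - 1).cursor;
        }
        return all;
    }

    public static void main(String[] args) {
        // Stub standing in for the real query: two pages of made-up data.
        Function<String, Page> stub = after -> after == null
                ? new Page(Arrays.asList(new Position("SKU-1", 5, "c1")), true)
                : new Page(Collections.singletonList(new Position("SKU-2", 7, "c2")), false);
        for (Position p : fetchAll(stub)) {
            System.out.println(p.productRef + " -> " + p.quantity);
        }
    }
}
```

The empty-page check is a defensive choice so that the loop cannot spin if a response reports `hasNextPage` without returning any edges.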