Author:
Fluent Commerce
Changed on:
9 Oct 2023
In this sample project, we will go through some basic features of the SDK by creating a batch job that can read resources (from a file path, S3 …) and then load to Fluent System.
These are the topics covered by this guide:
There is a script
`localstack-setup.sh`
It is best to do the following:
First, run this command to open a session with the localstack container
1docker exec -it localstack /bin/bash
Language: java
Name: Command
Description:
[Warning: empty required content area]Then use the commands below to create the secrets, but ensure to update the variables: $ACCOUNT, $RETAILER, $USERNAME, $PASSWORD and $REGION. Regions values are: sydney, dublin, singapore or north_america.
1docker exec -d localstack awslocal secretsmanager create-secret --name fc/connect/inventory-file-loader/api/fluent/activeAccounts --secret-string "{\"accounts\":[{\"name\":\"$ACCOUNT\", \"region\":\"$REGION\", \"retailers\":[$RETAILER]}]}" ;
2docker exec -d localstack awslocal secretsmanager create-secret --name fc/connect/inventory-file-loader/$ACCOUNT/api/fluent-account/$RETAILER --secret-string "{\"retailer\":\"1\", \"userName\":\"$USERNAME\", \"password\":\"$PASSWORD\"}";
3
4docker cp $SAMPLE_FILE localstack:/$SAMPLE_FILE
5docker exec -d localstack awslocal s3api create-bucket --bucket $BUCKET
6docker exec -d localstack awslocal s3api put-object --bucket $BUCKET --key $KEY --body /$SAMPLE_FILE
Language: java
Name: Commands
Description:
[Warning: empty required content area]Use ctrl + D to exit the localstack session.
Spring Batch is a Java framework for writing and executing multi-step, chunk-oriented batch jobs. It provides reusable functions essential in processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management.
Architecturally, Spring Batch is divided into three main components:
Scenario:
Define Inventory Job Handler:
1@Slf4j
2@Component
3@HandlerInfo(
4 name = "InventoryJobHandler",
5 route = "inventory-job",
6 description = "Sync Inventory from external inventory source file to Fluent platform"
7)
8public class InventoryJobHandler implements JobHandler {
9 public InventoryJobHandler(final BatchJobBuilder<InventoryLocation> batchJobBuilder,
10 final AmazonS3 amazonS3, final ApplicationContext applicationContext) {
11 this.batchJobBuilder = batchJobBuilder;
12 this.amazonS3 = amazonS3;
13 this.resourcePatternResolver = new PathMatchingSimpleStorageResourcePatternResolver(amazonS3, applicationContext);
14 }
15
16 public void run(@NotNull final JobHandlerContext context) throws JobExecutionException {
17 ...
18 }
19}
Language: java
Name: Example
Description:
[Warning: empty required content area]The code uses the
`@HandlerInfo`
`@HandlerProp`
Define a job setting on Fluent OMS
1{
2 "props":{
3 "location-pattern":"s3://sample-bucket/samples/inventory-*.csv"
4 "archive-folder":"s3://sample-bucket/archive/"
5 "fields":"locationRef,skuRef,qty"
6 "delimiter":","
7 "skip-line":"1"
8 "encoding":"UTF-8"
9 "chunk-size":"5000"
10 "skip-limit":"1"
11 "concurrency-limit":"5"
12 }
13}
Language: java
Name: Example
Description:
[Warning: empty required content area]`chunk-size`
`100`
`chunk-size`
`10`
`chunk-size`
`chunk-size`
Resolve resources:
1Resource[] resources;
2try {
3 resources = this.resourcePatternResolver
4 .getResources(settings.getProp(LOCATION_PATTERN, StringUtils.EMPTY));
5 Arrays.sort(resources, Comparator.comparing(Resource::getFilename));
6} catch (IOException ex) {
7 throw new RuntimeException(ex);
8}
Language: java
Name: Resolve resources:
Description:
[Warning: empty required content area]This is an example of S3 resources resolver. The order of resources in a
`Reader`
`resources`
`Arrays.sort`
`Comparator`
Example:
Resource[] resources = resourcePatternResolver.getResources("file:/foo/bar/**);
Resource[] resources = resourcePatternResolver.getResources("s3:/foo/bar/**);
Resource[] resources = resourcePatternResolver.getResources("http://foo.com/bar/**);
Note: As Fluent's Batch API functions asynchronously, it cannot guarantee the sequence of resource processing, whereby resource 2 may execute before resource 1 is persisted. To ensure data integrity, it is recommended to ensure resource data is unique or to consider processing only one resource at a time.
Define the Reader:
1final var reader = new CsvItemReader<>(fields, delimiter, encoding, skipLine, InventoryLocation.class);
Language: java
Name: Define the Reader:
Description:
[Warning: empty required content area]`CsvItemReader`
`FlatFileItemReader`
Define the Writer:
1final var writer = new FluentInventoryItemWriter(context, response.getId());
Language: java
Name: Define the Writer:
Description:
[Warning: empty required content area]`FluentInventoryItemWriter`
`ItemWriter`
Define the process step:
1final var loadStep = batchJobBuilder.buildProcessStep(LOAD_STEP, resources, reader, writer, chunkSize, skipLimit, concurrencyLimit
2);
Language: java
Name: Define the process step:
Description:
[Warning: empty required content area]Define a Tasklet to move/archive files to another location on Amazon S3:
1public class S3ArchiveTasklet implements Tasklet {
2
3 private final AmazonS3 amazonS3;
4 private final Resource[] resources;
5 private final String archiveFolder;
6
7 public S3ArchiveTasklet(final AmazonS3 amazonS3, @NotNull Resource[] resources, @NotNull String archiveFolder) {
8 this.amazonS3 = amazonS3;
9 this.resources = resources;
10 this.archiveFolder = archiveFolder;
11 }
12
13 public RepeatStatus execute(final @NotNull StepContribution stepContribution,
14 final @NotNull ChunkContext chunkContext) {
15 final var archiveFolderURI = new AmazonS3URI(archiveFolder);
16
17 for (Resource resource : resources) {
18 SimpleStorageResource s3Resource = (SimpleStorageResource) resource;
19 final var sourceURI = new AmazonS3URI(s3Resource.getS3Uri());
20 final var destinationKey = archiveFolderURI.getKey() + sourceURI.getKey();
21
22 amazonS3.copyObject(sourceURI.getBucket(), resource.getFilename(), archiveFolderURI.getBucket(), destinationKey);
23 amazonS3.deleteObject(sourceURI.getBucket(), resource.getFilename());
24 }
25
26 return RepeatStatus.FINISHED;
27 }
28}
Language: java
Name: Define a Tasklet to move/archive files to another location on Amazon S3:
Description:
[Warning: empty required content area]Define the archiveTasklet:
1final var archiveTasklet = new S3ArchiveTasklet(amazonS3, resources, archiveFolder);
Language: java
Name: Define the archiveTasklet:
Description:
[Warning: empty required content area]Define the archiveStep:
1final var archiveStep = batchJobBuilder.buildCustomStep(ARCHIVE_STEP, archiveTasklet);
Language: java
Name: Define the archiveStep:
Description:
[Warning: empty required content area]Launch the Batch Job:
1batchJobBuilder.launchJob(UUID.randomUUID().toString(), JOB_NAME, loadStep, archiveStep);
Language: java
Name: Launch the Batch Job:
Description:
[Warning: empty required content area]Parameters:
1curl -X PUT http://localhost:8080/api/v1/fluent-connect/scheduler/add/<ACCOUNT>/<RETAILER>/inventory-job
Language: java
Name: Code
Description:
[Warning: empty required content area]1curl -X PUT http://localhost:8080/api/v1/fluent-connect/scheduler/add/cnctdev/34/inventory-job
Language: java
Name: Example of the CURL with the replaced values:
Description:
[Warning: empty required content area]Copyright © 2024 Fluent Retail Pty Ltd (trading as Fluent Commerce). All rights reserved. No materials on this docs.fluentcommerce.com site may be used in any way and/or for any purpose without prior written authorisation from Fluent Commerce. Current customers and partners shall use these materials strictly in accordance with the terms and conditions of their written agreements with Fluent Commerce or its affiliates.