9 Oct 2023
In this sample project, we will go through some basic features of the SDK by creating a batch job that reads resources (from a file path, S3 …) and then loads them into the Fluent system.
These are the topics covered by this guide:
There is a script
It is best to do the following:
First, run this command to open a shell session in the localstack container:
docker exec -it localstack /bin/bash
Then use the commands below to create the secrets and upload the sample file. Make sure to set these variables first: $ACCOUNT, $RETAILER, $USERNAME, $PASSWORD and $REGION for the secrets, and $SAMPLE_FILE, $BUCKET and $KEY for the S3 upload. Valid region values are sydney, dublin, singapore and north_america.
docker exec -d localstack awslocal secretsmanager create-secret --name fc/connect/inventory-file-loader/api/fluent/activeAccounts --secret-string "{\"accounts\":[{\"name\":\"$ACCOUNT\", \"region\":\"$REGION\", \"retailers\":[$RETAILER]}]}" ;
docker exec -d localstack awslocal secretsmanager create-secret --name fc/connect/inventory-file-loader/$ACCOUNT/api/fluent-account/$RETAILER --secret-string "{\"retailer\":\"1\", \"userName\":\"$USERNAME\", \"password\":\"$PASSWORD\"}";
docker cp $SAMPLE_FILE localstack:/$SAMPLE_FILE
docker exec -d localstack awslocal s3api create-bucket --bucket $BUCKET
docker exec -d localstack awslocal s3api put-object --bucket $BUCKET --key $KEY --body /$SAMPLE_FILE
Press Ctrl+D to exit the localstack session.
Spring Batch is a Java framework for writing and executing multi-step, chunk-oriented batch jobs. It provides reusable functions that are essential for processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management.
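The plain-Java sketch below shows what "chunk-oriented" and "skip" mean in practice. It follows the loop Spring Batch runs inside a step: read items one at a time, process each one, write them out in chunks of a fixed size, and tolerate a limited number of failing items. The class ChunkLoopSketch and its run method are hypothetical. The sketch leaves out Spring Batch's transaction handling and does not reproduce its exact skip semantics.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical plain-Java model of a chunk-oriented step with a skip limit.
// Not Spring Batch's implementation: no transactions, retries or restart state.
final class ChunkLoopSketch {

    static final class SkipLimitExceededException extends RuntimeException {
        SkipLimitExceededException(int skipLimit, Throwable cause) {
            super("Skip limit of " + skipLimit + " exceeded", cause);
        }
    }

    // Reads every item, processes it, and writes full chunks of chunkSize items.
    // Items whose processing throws are skipped; one skip more than skipLimit fails the step.
    // Returns the number of skipped items.
    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int chunkSize, int skipLimit) {
        List<O> chunk = new ArrayList<>(chunkSize);
        int skipped = 0;
        while (reader.hasNext()) {
            I item = reader.next();
            try {
                chunk.add(processor.apply(item));
            } catch (RuntimeException ex) {
                if (++skipped > skipLimit) {
                    throw new SkipLimitExceededException(skipLimit, ex);
                }
                continue; // skip the bad item and keep reading
            }
            if (chunk.size() == chunkSize) {
                writer.accept(new ArrayList<>(chunk)); // write one full chunk
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) {
            writer.accept(new ArrayList<>(chunk)); // flush the last, partial chunk
        }
        return skipped;
    }

    public static void main(String[] args) {
        List<List<Integer>> written = new ArrayList<>();
        int skipped = run(List.of("1", "2", "x", "3", "4", "5").iterator(),
                Integer::parseInt, written::add, 2, 1);
        System.out.println(written + " skipped=" + skipped); // [[1, 2], [3, 4], [5]] skipped=1
    }
}
```

With a chunk size of 2 and a skip limit of 1, the unparseable "x" is skipped and the rest is written as three chunks. A second bad item would exceed the limit and fail the step. In this guide, the equivalent knobs are the chunk-size and skip-limit settings passed to buildProcessStep.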
Architecturally, Spring Batch is divided into three main components:
Define the inventory job handler:
    name = "InventoryJobHandler",
    route = "inventory-job",
    description = "Sync Inventory from external inventory source file to Fluent platform"
public class InventoryJobHandler implements JobHandler {

    private final BatchJobBuilder<InventoryLocation> batchJobBuilder;
    private final AmazonS3 amazonS3;
    private final ResourcePatternResolver resourcePatternResolver;

    public InventoryJobHandler(final BatchJobBuilder<InventoryLocation> batchJobBuilder,
                               final AmazonS3 amazonS3, final ApplicationContext applicationContext) {
        this.batchJobBuilder = batchJobBuilder;
        this.amazonS3 = amazonS3;
        // Resolves s3:// patterns itself and delegates other locations (e.g. file:) to the application context
        this.resourcePatternResolver = new PathMatchingSimpleStorageResourcePatternResolver(amazonS3, applicationContext);
    }

    @Override
    public void run(@NotNull final JobHandlerContext context) throws JobExecutionException {
        ...
    }
}
The code uses the
Define the job settings in Fluent OMS:
{
    "props": {
        "location-pattern": "s3://sample-bucket/samples/inventory-*.csv",
        "archive-folder": "s3://sample-bucket/archive/",
        "fields": "locationRef,skuRef,qty",
        "delimiter": ",",
        "skip-line": "1",
        "encoding": "UTF-8",
        "chunk-size": "5000",
        "skip-limit": "1",
        "concurrency-limit": "5"
    }
}
`chunk-size`
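All values in the props object above are strings, so a handler has to parse and validate them before use. The sketch below assumes the props are available as a Map<String, String>; the SDK's real accessor, such as the settings.getProp(LOCATION_PATTERN, StringUtils.EMPTY) call in the resource-resolution code, may differ. The class InventoryJobSettings and its default values are hypothetical and are not documented SDK defaults.

```java
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical typed view of the job setting "props". Default values are
// illustrative assumptions, not defaults documented for the SDK.
final class InventoryJobSettings {
    final String locationPattern;
    final String archiveFolder;
    final List<String> fields;
    final String delimiter;
    final int skipLine;
    final Charset encoding;
    final int chunkSize;
    final int skipLimit;
    final int concurrencyLimit;

    InventoryJobSettings(Map<String, String> props) {
        this.locationPattern = required(props, "location-pattern");
        this.archiveFolder = required(props, "archive-folder");
        // "locationRef,skuRef,qty" -> [locationRef, skuRef, qty]
        this.fields = Arrays.stream(required(props, "fields").split(","))
                .map(String::trim)
                .collect(Collectors.toList());
        this.delimiter = props.getOrDefault("delimiter", ",");
        this.skipLine = intProp(props, "skip-line", 0);
        this.encoding = Charset.forName(props.getOrDefault("encoding", "UTF-8"));
        this.chunkSize = intProp(props, "chunk-size", 1000);
        this.skipLimit = intProp(props, "skip-limit", 0);
        this.concurrencyLimit = intProp(props, "concurrency-limit", 1);
    }

    private static String required(Map<String, String> props, String key) {
        String value = props.get(key);
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException("Missing job setting prop: " + key);
        }
        return value;
    }

    private static int intProp(Map<String, String> props, String key, int defaultValue) {
        String value = props.get(key);
        return value == null ? defaultValue : Integer.parseInt(value.trim()); // props arrive as strings
    }

    public static void main(String[] args) {
        InventoryJobSettings settings = new InventoryJobSettings(Map.of(
                "location-pattern", "s3://sample-bucket/samples/inventory-*.csv",
                "archive-folder", "s3://sample-bucket/archive/",
                "fields", "locationRef,skuRef,qty",
                "chunk-size", "5000"));
        System.out.println(settings.fields + " chunkSize=" + settings.chunkSize);
        // [locationRef, skuRef, qty] chunkSize=5000
    }
}
```

Failing fast on a missing location-pattern or archive-folder is a design choice. It surfaces a misconfigured setting before any file is read.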
Resolve resources:
Resource[] resources;
try {
    resources = this.resourcePatternResolver
            .getResources(settings.getProp(LOCATION_PATTERN, StringUtils.EMPTY));
    // Sort by filename so resources are handed to the job in a predictable order
    Arrays.sort(resources, Comparator.comparing(Resource::getFilename));
} catch (IOException ex) {
    throw new RuntimeException(ex);
}
This is an example of an S3 resource resolver. The order of resources in a
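// Three alternative patterns (use one; they are not meant to compile together)
Resource[] resources = resourcePatternResolver.getResources("file:/foo/bar/**"); // local file system
Resource[] resources = resourcePatternResolver.getResources("s3://foo/bar/**");  // bucket "foo", keys under "bar/"
Resource[] resources = resourcePatternResolver.getResources("**");               // no prefix: the application context's default resource loader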
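Conceptually, a pattern such as s3://sample-bucket/samples/inventory-*.csv does two things: it selects the object keys that match the wildcard, and the Arrays.sort call then orders them by name. The sketch below reproduces that with java.nio's glob matcher on plain key strings. Glob syntax is close to, but not identical to, the Ant-style patterns Spring's resolvers use, and the class KeyPatternSketch is hypothetical.

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration: select keys matching a wildcard, then sort them by name.
// Uses java.nio glob syntax, where "*" does not cross "/" boundaries.
final class KeyPatternSketch {
    static List<String> select(List<String> keys, String glob) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return keys.stream()
                .filter(key -> matcher.matches(Path.of(key))) // whole key must match the pattern
                .sorted()                                      // lexicographic, like sorting by filename
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> keys = List.of(
                "samples/inventory-2.csv",
                "samples/inventory-1.csv",
                "samples/other.csv",
                "archive/samples/inventory-0.csv");
        System.out.println(select(keys, "samples/inventory-*.csv"));
        // [samples/inventory-1.csv, samples/inventory-2.csv]
    }
}
```

Name sorting is lexicographic, so inventory-10.csv sorts before inventory-2.csv. Zero-padded or timestamped file names keep the order meaningful.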
Note: Because Fluent's Batch API processes data asynchronously, the order in which resources are processed is not guaranteed; for example, resource 2 may be processed before resource 1 has been persisted. To preserve data integrity, make sure the data in each resource is unique (for example, the same location and SKU do not appear in more than one resource), or process only one resource at a time.
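One way to act on the uniqueness recommendation is to check the files before loading them. The sketch below reports (locationRef, skuRef) pairs that appear in more than one resource, because those are the rows whose final value could depend on processing order. It assumes header lines are already removed and that every row follows the "locationRef,skuRef,qty" field order from the job setting. The class DuplicateKeyCheck is hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical pre-check: find inventory keys present in more than one resource.
// Assumes well-formed rows in "locationRef,skuRef,qty" order with headers removed.
final class DuplicateKeyCheck {
    static Set<String> keysInMultipleResources(Map<String, List<String>> rowsByResource) {
        Map<String, String> firstSeenIn = new HashMap<>();
        Set<String> duplicates = new TreeSet<>();
        for (Map.Entry<String, List<String>> entry : rowsByResource.entrySet()) {
            for (String row : entry.getValue()) {
                String[] cols = row.split(",", -1);
                String key = cols[0].trim() + "|" + cols[1].trim(); // locationRef|skuRef
                String previous = firstSeenIn.putIfAbsent(key, entry.getKey());
                if (previous != null && !previous.equals(entry.getKey())) {
                    duplicates.add(key); // same key in a different resource
                }
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        Map<String, List<String>> rows = new LinkedHashMap<>();
        rows.put("inventory-1.csv", List.of("LOC1,SKU1,10", "LOC1,SKU2,5"));
        rows.put("inventory-2.csv", List.of("LOC1,SKU1,7", "LOC2,SKU1,3"));
        System.out.println(keysInMultipleResources(rows)); // [LOC1|SKU1]
    }
}
```

If the check finds duplicates, the safer option from the note applies: merge the files or process them one at a time.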
Define the Reader:
final var reader = new CsvItemReader<>(fields, delimiter, encoding, skipLine, InventoryLocation.class);
`CsvItemReader`
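The reader arguments map directly onto the job setting: skipLine drops header lines, delimiter splits each line, and fields names the resulting columns. The sketch below shows that mapping in plain Java. It is not the SDK's CsvItemReader: the class CsvSketch is hypothetical, it ignores quoting and escaping, and it binds to a Map rather than to InventoryLocation.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical illustration of fields/delimiter/skipLine; no CSV quoting support.
final class CsvSketch {
    static List<Map<String, String>> read(String csv, List<String> fields, String delimiter, int skipLine) {
        List<Map<String, String>> rows = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(new StringReader(csv))) {
            String line;
            int lineNo = 0;
            while ((line = in.readLine()) != null) {
                if (lineNo++ < skipLine || line.isBlank()) {
                    continue; // header lines and empty lines are not records
                }
                // Pattern.quote treats delimiters such as "|" literally
                String[] cols = line.split(Pattern.quote(delimiter), -1);
                if (cols.length != fields.size()) {
                    throw new IllegalArgumentException("Line " + lineNo + ": expected "
                            + fields.size() + " columns but got " + cols.length);
                }
                Map<String, String> row = new LinkedHashMap<>();
                for (int i = 0; i < fields.size(); i++) {
                    row.put(fields.get(i), cols[i].trim()); // name each column after its field
                }
                rows.add(row);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex); // StringReader does not fail in practice
        }
        return rows;
    }

    public static void main(String[] args) {
        String csv = "location,sku,quantity\nLOC1,SKU1,10\nLOC2,SKU9,0\n";
        System.out.println(read(csv, List.of("locationRef", "skuRef", "qty"), ",", 1));
        // [{locationRef=LOC1, skuRef=SKU1, qty=10}, {locationRef=LOC2, skuRef=SKU9, qty=0}]
    }
}
```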
Define the Writer:
final var writer = new FluentInventoryItemWriter(context, response.getId());
`FluentInventoryItemWriter`
Define the process step:
final var loadStep = batchJobBuilder.buildProcessStep(LOAD_STEP, resources, reader, writer, chunkSize, skipLimit, concurrencyLimit);
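buildProcessStep also receives concurrencyLimit, which suggests that several resources can be processed in parallel up to that limit. The sketch below models the idea with a fixed-size thread pool, so no more than concurrencyLimit resources are ever in flight. It is an assumption about the setting's intent, not a description of how the SDK applies it, and the class ConcurrencyLimitSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of a concurrency limit: at most concurrencyLimit resources
// are processed at the same time, using a fixed-size thread pool.
final class ConcurrencyLimitSketch {
    static void processAll(List<String> resources, Consumer<String> processResource, int concurrencyLimit) {
        ExecutorService pool = Executors.newFixedThreadPool(concurrencyLimit); // bounded worker count
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (String resource : resources) {
                futures.add(pool.submit(() -> processResource.accept(resource)));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for every resource; rethrows the first failure
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while processing resources", ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException("Resource processing failed", ex.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        processAll(List.of("inventory-1.csv", "inventory-2.csv", "inventory-3.csv", "inventory-4.csv"),
                name -> {
                    maxActive.accumulateAndGet(active.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(50); // stand-in for reading and writing one resource
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    active.decrementAndGet();
                }, 2);
        System.out.println("max concurrent resources: " + maxActive.get()); // never more than 2
    }
}
```

Parallelism across resources is also why the earlier note about processing order matters. With a limit above 1, files can finish in any order.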
Define a Tasklet to move/archive files to another location on Amazon S3:
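import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3URI;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.core.io.Resource;
// SimpleStorageResource comes from Spring Cloud AWS and @NotNull from the project's annotation
// library; import them from the packages that match the versions your project uses.

public class S3ArchiveTasklet implements Tasklet {

    private final AmazonS3 amazonS3;
    private final Resource[] resources;
    private final String archiveFolder;

    public S3ArchiveTasklet(final AmazonS3 amazonS3, @NotNull Resource[] resources, @NotNull String archiveFolder) {
        this.amazonS3 = amazonS3;
        this.resources = resources;
        this.archiveFolder = archiveFolder;
    }

    @Override
    public RepeatStatus execute(final @NotNull StepContribution stepContribution,
                                final @NotNull ChunkContext chunkContext) {
        final var archiveFolderURI = new AmazonS3URI(archiveFolder);

        for (Resource resource : resources) {
            // Only S3 resources can be archived this way; the cast fails for file: resources
            SimpleStorageResource s3Resource = (SimpleStorageResource) resource;
            final var sourceURI = new AmazonS3URI(s3Resource.getS3Uri());
            // e.g. "archive/" + "samples/inventory-1.csv" -> "archive/samples/inventory-1.csv"
            final var destinationKey = archiveFolderURI.getKey() + sourceURI.getKey();

            // S3 has no rename: copy the object to the archive location, then delete the original
            amazonS3.copyObject(sourceURI.getBucket(), sourceURI.getKey(), archiveFolderURI.getBucket(), destinationKey);
            amazonS3.deleteObject(sourceURI.getBucket(), sourceURI.getKey());
        }

        return RepeatStatus.FINISHED;
    }
}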
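The tasklet builds each destination key by concatenating the archive folder's key with the source object's key. The dependency-free sketch below shows that mapping. It parses s3://bucket/key strings with java.net.URI instead of AmazonS3URI. The trailing-slash guard is an addition that the tasklet does not have: without it, an archive-folder written without a final "/" would yield keys like "archivesamples/...". The class ArchiveKeySketch is hypothetical, and keys containing spaces would need to be percent-encoded before URI.create accepts them.

```java
import java.net.URI;

// Hypothetical sketch of the archive key mapping used by S3ArchiveTasklet:
// destination key = archive folder key + source key.
final class ArchiveKeySketch {
    static String bucket(String s3Uri) {
        return URI.create(s3Uri).getAuthority(); // "s3://sample-bucket/..." -> "sample-bucket"
    }

    static String key(String s3Uri) {
        String path = URI.create(s3Uri).getPath(); // "/archive/" -> "archive/"
        return path.startsWith("/") ? path.substring(1) : path;
    }

    static String destinationKey(String archiveFolder, String sourceUri) {
        String prefix = key(archiveFolder);
        // Guard added for illustration; the tasklet concatenates the keys as-is
        if (!prefix.isEmpty() && !prefix.endsWith("/")) {
            prefix = prefix + "/";
        }
        return prefix + key(sourceUri);
    }

    public static void main(String[] args) {
        String archive = "s3://sample-bucket/archive/";
        String source = "s3://sample-bucket/samples/inventory-1.csv";
        System.out.println(bucket(archive) + " " + destinationKey(archive, source));
        // sample-bucket archive/samples/inventory-1.csv
    }
}
```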
Define the archiveTasklet:
final var archiveTasklet = new S3ArchiveTasklet(amazonS3, resources, archiveFolder);
Define the archiveStep:
final var archiveStep = batchJobBuilder.buildCustomStep(ARCHIVE_STEP, archiveTasklet);
Launch the Batch Job:
batchJobBuilder.launchJob(UUID.randomUUID().toString(), JOB_NAME, loadStep, archiveStep);
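launchJob receives loadStep before archiveStep, which presumably means the steps run in sequence. In a plain sequential Spring Batch flow, a failed step fails the job and later steps do not run, so files would only be archived after a successful load. The sketch below models that behavior. The assumption that launchJob chains its steps this way is unconfirmed, and the class SequentialJobSketch and its Step interface are hypothetical (they are not Spring Batch's Step).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a sequential job: steps run in insertion order and
// the first failure stops the job, so later steps (e.g. archiving) never run.
final class SequentialJobSketch {
    interface Step {
        void execute() throws Exception;
    }

    // Returns the names of the steps that completed successfully.
    static List<String> launch(Map<String, Step> stepsInOrder) {
        List<String> completed = new ArrayList<>();
        for (Map.Entry<String, Step> step : stepsInOrder.entrySet()) {
            try {
                step.getValue().execute();
                completed.add(step.getKey());
            } catch (Exception ex) {
                System.err.println("Step " + step.getKey() + " failed: " + ex.getMessage());
                break; // job failed: do not run the remaining steps
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        Map<String, Step> steps = new LinkedHashMap<>(); // keeps insertion order
        steps.put("load", () -> { throw new IllegalStateException("Batch API unavailable"); });
        steps.put("archive", () -> System.out.println("archiving files"));
        System.out.println("completed: " + launch(steps)); // completed: []
    }
}
```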
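Add the job to the scheduler by calling the endpoint below. The parameters are `<ACCOUNT>`, the Fluent account, and `<RETAILER>`, the retailer ID:
curl -X PUT http://localhost:8080/api/v1/fluent-connect/scheduler/add/<ACCOUNT>/<RETAILER>/inventory-job
For example:
curl -X PUT http://localhost:8080/api/v1/fluent-connect/scheduler/add/cnctdev/34/inventory-job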
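For callers written in Java, the same PUT request can be sent with the JDK 11+ HTTP client. The path is copied from the curl commands, and its last segment matches the route declared on InventoryJobHandler. The base URL http://localhost:8080 assumes the connector runs locally as in the curl example. The class SchedulerClientSketch is hypothetical, and account and retailer values are assumed to be plain identifiers that need no URL encoding.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical Java equivalent of:
// curl -X PUT http://localhost:8080/api/v1/fluent-connect/scheduler/add/<ACCOUNT>/<RETAILER>/inventory-job
final class SchedulerClientSketch {
    static URI schedulerUri(String baseUrl, String account, String retailer, String route) {
        return URI.create(baseUrl + "/api/v1/fluent-connect/scheduler/add/"
                + account + "/" + retailer + "/" + route);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        URI uri = schedulerUri("http://localhost:8080", "cnctdev", "34", "inventory-job");
        HttpRequest request = HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody()) // curl -X PUT without a body
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```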
