Inventory File Loader
Author: Fluent Commerce
Changed on: 9 Oct 2023
Key Points
- The SDK allows you to import data into Fluent OMS via batch jobs.
- The data can be stored in an Amazon S3 bucket and imported into Fluent OMS via API.
- The inventory job handler requires various settings that need to be defined in the account's settings list (location-pattern, archive-folder, etc.).
Steps
Overview
In this sample project, we will go through some basic features of the SDK by creating a batch job that can read resources (from a file path, S3, etc.) and then load them into the Fluent platform.
These are the topics covered by this guide:
- Spring Batch overview
- Create a new Batch Job Handler
Prerequisites
- The localstack container is running
- You have access to a Fluent account
There is a script, `localstack-setup.sh`, that can be used for this setup. However, it is best to do the following:
First, run this command to open a session with the localstack container:
docker exec -it localstack /bin/bash
Language: bash
Name: Command
Description:
Then use the commands below to create the secrets, but be sure to update the variables: $ACCOUNT, $RETAILER, $USERNAME, $PASSWORD and $REGION. Region values are: sydney, dublin, singapore or north_america.
docker exec -d localstack awslocal secretsmanager create-secret --name fc/connect/inventory-file-loader/api/fluent/activeAccounts --secret-string "{\"accounts\":[{\"name\":\"$ACCOUNT\", \"region\":\"$REGION\", \"retailers\":[$RETAILER]}]}" ;
docker exec -d localstack awslocal secretsmanager create-secret --name fc/connect/inventory-file-loader/$ACCOUNT/api/fluent-account/$RETAILER --secret-string "{\"retailer\":\"1\", \"userName\":\"$USERNAME\", \"password\":\"$PASSWORD\"}";

docker cp $SAMPLE_FILE localstack:/$SAMPLE_FILE
docker exec -d localstack awslocal s3api create-bucket --bucket $BUCKET
docker exec -d localstack awslocal s3api put-object --bucket $BUCKET --key $KEY --body /$SAMPLE_FILE
Language: bash
Name: Commands
Description:
Use Ctrl + D to exit the localstack session.
Spring Batch overview
Spring Batch is a Java framework for writing and executing multi-step, chunk-oriented batch jobs. It provides reusable functions essential in processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management.
Architecturally, Spring Batch is divided into three main components:
- Job: Represents the batch job and contains one or more steps.
- Step: Represents an individual unit of work within a job, typically defined as a single chunk-oriented task, such as reading data from a database, transforming the data, and writing the results to a file.
- Processing: These components define the reading, processing, and writing of individual data items within a step.
- Chunk-oriented processing: a step-based approach where data is read, processed, and written in small chunks.
- Tasklet-based processing: a single-step approach where a single task is executed. Tasklets are typically used for operations such as cleaning up resources or initializing a database. Tasklet-based processing is best suited for scenarios where a single, well-defined task needs to be executed and is not necessarily concerned with large amounts of data.
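To make the two step styles concrete, here is a minimal, illustrative sketch using Spring Batch 4's `StepBuilderFactory` (the `stepBuilderFactory`, `reader`, and `writer` variables are assumed to exist; this is plain Spring Batch, not the SDK's `BatchJobBuilder` shown later):
// Chunk-oriented step: items are read and written in transactions of 100 items.
final Step chunkStep = stepBuilderFactory.get("loadStep")
        .<InventoryLocation, InventoryLocation>chunk(100)
        .reader(reader)
        .writer(writer)
        .build();

// Tasklet step: a single well-defined task, executed once.
final Step taskletStep = stepBuilderFactory.get("cleanupStep")
        .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED)
        .build();
Language: java
Name: Example (illustrative)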
Create a new Batch Job Handler
Scenario:
- The e-commerce system generates inventory files in CSV format on an hourly basis, which are stored in the Amazon S3 bucket "sample-bucket". The object keys for these files are in the format "samples/inventory-YYYYMMDDHHMMSS.csv".
- A connector has been implemented to facilitate data transfer from Amazon S3 to Fluent via the Batch API. This process consists of the following steps:
- Downloading the CSV files from Amazon S3 using a reader component.
- Parsing the data contained within the CSV files.
- Loading/importing the parsed data to Fluent via the Batch API using a writer component.
- Moving the processed files from their original location to an archive location using an ArchiveTasklet. A sample input file is shown below.
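For reference, here is a hypothetical inventory file for this scenario (the header line is removed by the skip-line setting described below; the filename and values are illustrative only):
locationRef,skuRef,qty
DC_1,SKU-001,100
STORE_5,SKU-002,25
Language: csv
Name: Example (hypothetical inventory-20230101120000.csv)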
Define Inventory Job Handler:
@Slf4j
@Component
@HandlerInfo(
    name = "InventoryJobHandler",
    route = "inventory-job",
    description = "Sync Inventory from external inventory source file to Fluent platform"
)
public class InventoryJobHandler implements JobHandler {
    public InventoryJobHandler(final BatchJobBuilder<InventoryLocation> batchJobBuilder,
                               final AmazonS3 amazonS3, final ApplicationContext applicationContext) {
        this.batchJobBuilder = batchJobBuilder;
        this.amazonS3 = amazonS3;
        this.resourcePatternResolver = new PathMatchingSimpleStorageResourcePatternResolver(amazonS3, applicationContext);
    }

    public void run(@NotNull final JobHandlerContext context) throws JobExecutionException {
        ...
    }
}
Language: java
Name: Example
Description:
The code uses the `@HandlerInfo` annotation to describe the handler (name, route, description) and the `@HandlerProp` annotation to declare its configurable properties.
Define a job setting on Fluent OMS
- Key: fc.connect.inventory-file-loader.batch.inventory-job
- Value:
{
  "props": {
    "location-pattern": "s3://sample-bucket/samples/inventory-*.csv",
    "archive-folder": "s3://sample-bucket/archive/",
    "fields": "locationRef,skuRef,qty",
    "delimiter": ",",
    "skip-line": "1",
    "encoding": "UTF-8",
    "chunk-size": "5000",
    "skip-limit": "1",
    "concurrency-limit": "5"
  }
}
Language: json
Name: Example
Description:
- location-pattern: The location pattern of the CSV files in S3 (e.g. "s3://sample-bucket/samples/inventory-*.csv").
- archive-folder: The archive folder in S3 where the processed CSV files will be moved to (e.g. "s3://sample-bucket/archive/").
- fields: The fields in the CSV file that should be processed (e.g. "locationRef,skuRef,qty").
- delimiter: The delimiter used in the CSV file (e.g. ",").
- skip-line: The number of lines to skip at the beginning of the CSV file (e.g. 1).
- encoding: The encoding used in the CSV file (e.g. "UTF-8").
- chunk-size: The chunk size for processing the data in the CSV file (e.g. 5000). If your `chunk-size` is set to `100`, then the reader will read 100 records at a time, and the writer will write those 100 records in a single transaction. If the next 100 records are read, they will also be written in a single transaction. This process continues until all records have been processed. If you do not specify a `chunk-size`, Spring Batch will use the default value of `10`, meaning that 10 records will be processed in a single transaction. It's important to note that specifying a `chunk-size` that is too large can cause performance issues and affect the overall efficiency of your batch process. On the other hand, specifying a `chunk-size` that is too small can cause too many transactions to occur, leading to increased overhead and decreased performance. The optimal `chunk-size` depends on the specifics of your application and should be chosen carefully.
- skip-limit: The maximum number of errors to skip during processing (e.g. 500).
- concurrency-limit: The concurrency limit for processing the data in the CSV file (e.g. 5).
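The sketch below illustrates how these settings might be read inside the handler. The constant names and default values are assumptions; only the `settings.getProp(...)` accessor is taken from the resource-resolution example that follows:
// Assumed prop keys mirroring the settings above.
private static final String LOCATION_PATTERN = "location-pattern";
private static final String CHUNK_SIZE = "chunk-size";

// Read props with fallback defaults (the defaults here are assumptions).
final String locationPattern = settings.getProp(LOCATION_PATTERN, StringUtils.EMPTY);
final int chunkSize = Integer.parseInt(settings.getProp(CHUNK_SIZE, "10"));
Language: java
Name: Example (illustrative)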
Resolve resources:
Resource[] resources;
try {
    resources = this.resourcePatternResolver
        .getResources(settings.getProp(LOCATION_PATTERN, StringUtils.EMPTY));
    Arrays.sort(resources, Comparator.comparing(Resource::getFilename));
} catch (IOException ex) {
    throw new RuntimeException(ex);
}
Language: java
Name: Resolve resources
Description:
This is an example of an S3 resource resolver. The order of the resources in a `Reader` follows the order of the `resources` array. In this example, the resources are sorted by filename using `Arrays.sort` with a `Comparator`.
Partners can resolve any kind of resource provided by Spring, or a custom one (refer to 6. Resources).
Example:
Resource[] resources = resourcePatternResolver.getResources("file:/foo/bar/**");
Resource[] resources = resourcePatternResolver.getResources("s3://foo/bar/**");
Resource[] resources = resourcePatternResolver.getResources("http://foo.com/bar/**");
Note: As Fluent's Batch API functions asynchronously, it cannot guarantee the sequence of resource processing, whereby resource 2 may execute before resource 1 is persisted. To ensure data integrity, it is recommended to ensure resource data is unique or to consider processing only one resource at a time.
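One way to follow the one-resource-at-a-time recommendation, sketched here under the assumption that `launchJob` (covered later in this guide) blocks until the job completes, is to launch a separate job per resource in sorted order:
// Illustrative only: launch one job per resource, in filename order.
// Assumes launchJob does not return until the job has finished.
for (final Resource resource : resources) {
    final var step = batchJobBuilder.buildProcessStep(LOAD_STEP,
            new Resource[]{resource}, reader, writer, chunkSize, skipLimit, concurrencyLimit);
    batchJobBuilder.launchJob(UUID.randomUUID().toString(), JOB_NAME, step);
}
Language: java
Name: Example (illustrative)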
Define the Reader:
final var reader = new CsvItemReader<>(fields, delimiter, encoding, skipLine, InventoryLocation.class);
Language: java
Name: Define the Reader
Description:
`CsvItemReader` is based on Spring Batch's `FlatFileItemReader` and maps each delimited line of the file to an `InventoryLocation` instance.
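For comparison, a roughly equivalent reader built directly with Spring Batch's `FlatFileItemReaderBuilder` might look like the sketch below (it assumes `InventoryLocation` exposes `locationRef`, `skuRef`, and `qty` bean properties; binding the reader to the resolved resources is omitted, as the SDK wires resources in when the step is built):
// Plain Spring Batch counterpart of the CsvItemReader configuration above.
final FlatFileItemReader<InventoryLocation> flatFileReader =
        new FlatFileItemReaderBuilder<InventoryLocation>()
                .name("inventoryReader")
                .encoding("UTF-8")
                .linesToSkip(1) // skip the CSV header line
                .delimited()
                .delimiter(",")
                .names(new String[]{"locationRef", "skuRef", "qty"})
                .targetType(InventoryLocation.class)
                .build();
Language: java
Name: Example (illustrative)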
Define the Writer:
final var writer = new FluentInventoryItemWriter(context, response.getId());
Language: java
Name: Define the Writer
Description:
`FluentInventoryItemWriter` implements Spring Batch's `ItemWriter` interface and sends each chunk of parsed records to Fluent via the Batch API.
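A hypothetical outline of such a writer, shown only for orientation (the `sendInventoryBatch` method is a placeholder, not the SDK's actual API):
// Sketch of a chunk writer that forwards each chunk to the Fluent Batch API.
public class FluentInventoryItemWriter implements ItemWriter<InventoryLocation> {

    private final JobHandlerContext context;
    private final String batchId;

    public FluentInventoryItemWriter(final JobHandlerContext context, final String batchId) {
        this.context = context;
        this.batchId = batchId;
    }

    @Override
    public void write(final List<? extends InventoryLocation> items) {
        // Each chunk becomes one Batch API request.
        sendInventoryBatch(context, batchId, items);
    }

    // Placeholder: in the real connector this would call the Fluent Batch API.
    private void sendInventoryBatch(final JobHandlerContext context, final String batchId,
                                    final List<? extends InventoryLocation> items) {
        // e.g. submit the items to the batch identified by batchId.
    }
}
Language: java
Name: Example (hypothetical)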
Define the process step:
final var loadStep = batchJobBuilder.buildProcessStep(LOAD_STEP, resources, reader, writer, chunkSize, skipLimit, concurrencyLimit);
Language: java
Name: Define the process step
Description:
Define a Tasklet to move/archive files to another location on Amazon S3:
public class S3ArchiveTasklet implements Tasklet {

    private final AmazonS3 amazonS3;
    private final Resource[] resources;
    private final String archiveFolder;

    public S3ArchiveTasklet(final AmazonS3 amazonS3, @NotNull Resource[] resources, @NotNull String archiveFolder) {
        this.amazonS3 = amazonS3;
        this.resources = resources;
        this.archiveFolder = archiveFolder;
    }

    public RepeatStatus execute(final @NotNull StepContribution stepContribution,
                                final @NotNull ChunkContext chunkContext) {
        final var archiveFolderURI = new AmazonS3URI(archiveFolder);

        for (Resource resource : resources) {
            SimpleStorageResource s3Resource = (SimpleStorageResource) resource;
            final var sourceURI = new AmazonS3URI(s3Resource.getS3Uri());
            final var destinationKey = archiveFolderURI.getKey() + sourceURI.getKey();

            // Copy each object into the archive folder, then delete the original.
            amazonS3.copyObject(sourceURI.getBucket(), resource.getFilename(), archiveFolderURI.getBucket(), destinationKey);
            amazonS3.deleteObject(sourceURI.getBucket(), resource.getFilename());
        }

        return RepeatStatus.FINISHED;
    }
}
Language: java
Name: Define a Tasklet to move/archive files to another location on Amazon S3
Description:
Define the archiveTasklet:
final var archiveTasklet = new S3ArchiveTasklet(amazonS3, resources, archiveFolder);
Language: java
Name: Define the archiveTasklet
Description:
Define the archiveStep:
final var archiveStep = batchJobBuilder.buildCustomStep(ARCHIVE_STEP, archiveTasklet);
Language: java
Name: Define the archiveStep
Description:
Launch the Batch Job:
batchJobBuilder.launchJob(UUID.randomUUID().toString(), JOB_NAME, loadStep, archiveStep);
Language: java
Name: Launch the Batch Job
Description:
Parameters:
- id: the ID of the batch job. By default, Spring Batch uses this to detect whether a job instance with the same parameters has already been executed or completed.
- name: the name of the batch job.
- steps: accepts a variable number of steps and executes them in order. Examples:
- batchJobBuilder.launchJob(id, name, prepareStep, loadStep, archiveStep): executes 3 steps in order: prepareStep > loadStep > archiveStep
- batchJobBuilder.launchJob(id, name, loadStep, archiveStep): executes 2 steps in order: loadStep > archiveStep
- batchJobBuilder.launchJob(id, name, loadStep): executes only 1 step
Trigger Job
curl -X PUT http://localhost:8080/api/v1/fluent-connect/scheduler/add/<ACCOUNT>/<RETAILER>/inventory-job
Language: bash
Name: Command
Description:
curl -X PUT http://localhost:8080/api/v1/fluent-connect/scheduler/add/cnctdev/34/inventory-job
Language: bash
Name: Example of the cURL with the replaced values