Weather Updates
Author:
Fluent Commerce
Changed on:
20 Nov 2023
Key Points
- In this guide, we will go through some basic features of the SDK by creating a connector from scratch that can receive or retrieve weather updates and push them to a Fluent setting.
- If you wish to bypass this guide and look at the result, feel free to download the sources.
Steps
Overview
These are the topics covered by this guide:
- Creating a new connector project
- Adding a new SQS Queue listener
- How to map an external system to a Fluent retailer
- How to process a message received
- How to determine the correct message route
- Receiving and processing messages from the SQS listener
- How to read a property from Fluent
- Code generation from GraphQL files
- How to upsert (create or update) a setting in Fluent
- Adding a job and executing it
- Opening an HTTP endpoint that can receive inputs and queue them to be processed
Security is not covered here, especially when opening a new endpoint. The determination of the Fluent account has also been simplified for this demo.
Prerequisites
- localstack container is running
- You have access to a Fluent account
There is a script, `localstack-setup.sh`. It is best to do the following:
First, run this command to open a session with the localstack container:
docker exec -it localstack /bin/bash
Then use the commands below to create the secrets, making sure to update the variables `$ACCOUNT`, `$RETAILER`, `$USERNAME`, `$PASSWORD` and `$REGION`. Valid region values are `sydney`, `dublin`, `singapore` or `north_america`.
awslocal secretsmanager create-secret --name fc/connect/weather-update-demo/api/fluent/activeAccounts --secret-string "{\"accounts\":[{\"name\":\"$ACCOUNT\", \"region\": \"$REGION\", \"retailers\":[$RETAILER]}]}" ;
awslocal secretsmanager create-secret --name fc/connect/weather-update-demo/$ACCOUNT/api/fluent-account/$RETAILER --secret-string "{\"retailer\":\"1\", \"userName\":\"$USERNAME\", \"password\":\"$PASSWORD\"}";
Use ctrl + D to exit the localstack session.
Creating the project
Follow the SDK guide "Creating a new connector with the SDK" to create a new project.
Setting up the IDE
Open the Maven project in your preferred IDE and create `docker-compose-localstack.yml` with the following content:
version: "3.9"
services:
  localstack:
    container_name: localstack
    image: localstack/localstack
    restart: unless-stopped
    ports:
      - "4510-4559:4510-4559" # external service port range
      - "4566:4566"
    environment:
      - AWS_DEFAULT_REGION=us-east-1
      - SERVICES=sqs,secretsmanager
      - HOSTNAME=localstack
      - HOSTNAME_EXTERNAL=localstack
    volumes:
      - ./docker/localstack:/docker-entrypoint-initaws.d
With the IDE open and localstack running, follow the SDK setup steps.
The essential steps are now complete, so let's start working on the weather demo project. Add the variable `SQS_DEMO` to the "Run Configuration" environment variables, as shown in the screenshot.
SQS_DEMO=demo
All initial required steps have been configured, and it is now possible to start the application.
Adding a new queue listener
It's time to start coding.
Queue configuration
The first step in creating a listener is creating a configuration, as shown below. See the SDK configuration steps for further details. There are two files in the screenshot below, and these are two different versions of the `application.yaml`: `-dev` and `-connector`.
These are the key properties to create:
- `demo-weather`: the listener ID, which must be unique. It is important to keep the id `demo-weather`, as it is referenced elsewhere in this sample.
- `name`: contains the name of the environment variable that returns the actual queue name. Because it is an environment variable, a container can receive the queue name as a parameter at deployment time, which allows different queue names in different deployment environments.
- `type`: has to be `sqs`, as this project uses `connect-sdk-core-aws` as an example.
- `fluent-account`: the SDK requires that a queue is bound to a Fluent account.
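The indirection behind the `name` property can be pictured with the small sketch below. It is not SDK code: `QueueNameResolver` is a hypothetical helper, and the queue name `weather-prod` is invented purely for illustration. It only shows the idea that the configuration stores the name of an environment variable, and the variable's value supplies the real queue name.

```java
import java.util.Map;
import java.util.Optional;

// Illustrative only: QueueNameResolver is a hypothetical helper, not an SDK class.
final class QueueNameResolver {

    /**
     * Looks up the queue name stored in the environment variable named by the
     * listener's `name` property. Returns empty when the variable is unset or blank.
     */
    static Optional<String> resolve(String envVarName, Map<String, String> environment) {
        return Optional.ofNullable(environment.get(envVarName))
                .map(String::trim)
                .filter(value -> !value.isEmpty());
    }

    public static void main(String[] args) {
        // The same listener configuration can point at different queues per environment.
        System.out.println(resolve("SQS_DEMO", Map.of("SQS_DEMO", "demo")));         // local run
        System.out.println(resolve("SQS_DEMO", Map.of("SQS_DEMO", "weather-prod"))); // invented deployment value
        System.out.println(resolve("SQS_DEMO", System.getenv()));                    // real process environment
    }
}
```

Passing the environment in as a map keeps the lookup easy to test without changing the real process environment.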
Queue listener
With the listener configuration out of the way, let's create some Java classes. Create a new package `custom` under `com.fluentcommerce.connect`.
Queue Message
WeatherData will be the only data type (DTO) we will use in the demo.
package com.fluentcommerce.connect.custom;

/**
 * Weather update message used throughout this demo.
 *
 * @param id name of the event/message
 * @param code weather code identification
 * @param temperature the weather temperature
 * @param description weather description
 */
public record WeatherData(String id, String code, String temperature, String description) {}
Queue listener
The class below, `DemoQueueListener`, uses fixed values for `SAMPLE_FIXED_ID` and `EXTERNAL_ID`.
In the code example below, the method `getMessageRoute` compares the message `id` with the `id` property of each route mapping. The matching route becomes the `name` of the `ConnectSDKMessage`.
package com.fluentcommerce.connect.custom;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fluentcommerce.connect.core.config.data.RouterMappingSettings;
import com.fluentcommerce.connect.core.configuration.data.AccountReference;
import com.fluentcommerce.connect.core.exception.UnprocessableMessageException;
import com.fluentcommerce.connect.core.listener.data.ConnectSDKMessage;
import com.fluentcommerce.connect.core.listener.data.ExternalMessage;
import com.fluentcommerce.connect.core.listener.external.BaseExternalListener;
import com.fluentcommerce.connect.core.listener.external.ListenerInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.Optional;
import java.util.UUID;

@Slf4j
@Component
@ListenerInfo(id = "demo-weather", messageClass = WeatherData.class)
public class DemoQueueListener extends BaseExternalListener<WeatherData> {

    // These values identify your external system and are used to find a Fluent account-retailer during the account map lookup.
    // The queue is fixed to a Fluent account (as defined in application-dev.yml), but the retailer is flexible.
    private static final String SAMPLE_FIXED_ID = "fluent-ct-dev-2";
    private static final String EXTERNAL_ID = "projectKey";

    @Override
    public @NotNull Optional<ConnectSDKMessage> receiveMessage(@NotNull final ExternalMessage message) throws UnprocessableMessageException {
        final WeatherData weatherData = getMessageContent(message);
        final Optional<AccountReference> accountReference = getFluentAccountReference(Collections.singletonMap(EXTERNAL_ID, SAMPLE_FIXED_ID));
        final Optional<String> messageRoute = getMessageRoute(weatherData);

        if (accountReference.isPresent() && messageRoute.isPresent()) {
            try {
                // The route found for the message becomes the name of the SDK message.
                return Optional.of(ConnectSDKMessage.builder()
                        .id(UUID.randomUUID())
                        .name(messageRoute.get())
                        .accountId(accountReference.get().accountId())
                        .retailerId(accountReference.get().retailerId())
                        .payload(ConnectSDKMessage.toJson(weatherData))
                        .build());
            } catch (final JsonProcessingException e) {
                throw new UnprocessableMessageException(e);
            }
        } else {
            log.warn("Unable to determine account [{}] or route [{}].", accountReference, messageRoute);
        }
        return Optional.empty();
    }

    /**
     * Finds a message route by matching the message id against the "id" property
     * of the route mappings defined in application-connector.yml.
     */
    private Optional<String> getMessageRoute(final WeatherData weatherData) {
        return getRouteMappings().stream()
                .filter(route -> weatherData.id().equalsIgnoreCase(route.getProp("id", StringUtils.EMPTY)))
                .findFirst().map(RouterMappingSettings::getRoute);
    }
}
For the Fluent account mapping to work, we need to configure a new setting on the retailer used during the localstack credential setup process. The setting key has to follow the pattern `fc.connect.<connector-name>.account-mapping`, where `connector-name` is the connector name defined in `application-connector.yml`.
Configuration Template:
Setting name in the Retailer context of the Fluent settings: `fc.connect.<connector-name>.account-mapping`
{
  "fluent": {
    "accountId": "<FLUENT_ACCOUNT>",
    "retailerId": "<RETAILER_ID>"
  },
  "externalAccount": {
    "<EXTERNAL_KEY>": "<EXTERNAL_VALUE>"
  }
}
For our demo, set the external account to match the constants used by `DemoQueueListener`: the key `projectKey` (`EXTERNAL_ID`) with the value `fluent-ct-dev-2` (`SAMPLE_FIXED_ID`).
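As a rough mental model of the account lookup, the sketch below matches the external identifiers sent by the listener against the `externalAccount` part of the setting. It is a plain-Java illustration under stated assumptions: `AccountMappingSketch` and its records are hypothetical stand-ins rather than SDK types, and the connector name `weather-update-demo` is inferred from the secret paths used earlier in this guide.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative only: these records are hypothetical stand-ins, not SDK types.
final class AccountMappingSketch {

    record FluentAccount(String accountId, String retailerId) {}

    /** One account-mapping setting value: a Fluent account plus the external identifiers it answers to. */
    record AccountMapping(FluentAccount fluent, Map<String, String> externalAccount) {}

    /** Builds the setting key from the pattern fc.connect.<connector-name>.account-mapping. */
    static String settingKey(String connectorName) {
        return "fc.connect." + connectorName + ".account-mapping";
    }

    /** Returns the first mapping whose externalAccount contains every requested key/value pair. */
    static Optional<FluentAccount> find(List<AccountMapping> mappings, Map<String, String> external) {
        return mappings.stream()
                .filter(m -> m.externalAccount().entrySet().containsAll(external.entrySet()))
                .findFirst()
                .map(AccountMapping::fluent);
    }

    public static void main(String[] args) {
        var mappings = List.of(new AccountMapping(
                new FluentAccount("<FLUENT_ACCOUNT>", "<RETAILER_ID>"),
                Map.of("projectKey", "fluent-ct-dev-2")));
        System.out.println(settingKey("weather-update-demo")); // connector name is an assumption
        System.out.println(find(mappings, Map.of("projectKey", "fluent-ct-dev-2")));
        System.out.println(find(mappings, Map.of("projectKey", "unknown")));
    }
}
```

If the key or value in the setting differs from what the listener sends, the lookup returns nothing, which corresponds to the "Unable to determine account" warning logged by the listener.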
The last step is to configure a route mapping. This feature lets your listener choose which route to use for an incoming message based on the mapping defined. It is useful when your listener receives different types of messages that must be processed differently. For example, your listener might receive both weather updates and weather warning updates. By setting up a routing table (also known as a route mapping) that forwards these two message types to two different handlers, you can give each handler a very specific task, which helps the maintenance, evolution and reusability of your handlers.
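The routing-table idea can be sketched in plain Java as follows. This is only an illustration of the lookup, not the SDK's implementation: `RouteMapping` is a hypothetical stand-in for the SDK's route mapping settings, and the weather-warning route and id are invented to show a second message type.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative only: RouteMapping is a hypothetical stand-in for the SDK's route mapping settings.
final class RouteTableSketch {

    record RouteMapping(String route, Map<String, String> props) {}

    /** Picks the first route whose "id" property equals the message id, ignoring case. */
    static Optional<String> routeFor(String messageId, List<RouteMapping> table) {
        return table.stream()
                .filter(mapping -> messageId.equalsIgnoreCase(mapping.props().getOrDefault("id", "")))
                .findFirst()
                .map(RouteMapping::route);
    }

    public static void main(String[] args) {
        // The second entry is invented for illustration.
        var table = List.of(
                new RouteMapping("fc.connect.sample-weather-update", Map.of("id", "weather-update-message")),
                new RouteMapping("fc.connect.sample-weather-warning", Map.of("id", "weather-warning-message")));
        System.out.println(routeFor("weather-update-message", table));
        System.out.println(routeFor("WEATHER-WARNING-MESSAGE", table));
        System.out.println(routeFor("unknown", table));
    }
}
```

A message whose id matches no entry yields an empty result, so the listener can log it and skip it instead of guessing a handler.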
Queue Message Handler
When working with message handlers, the route to a handler is specified by the `HandlerInfo` annotation: `route = fc.connect.sample-weather-update`.
package com.fluentcommerce.connect.custom;

import com.fluentcommerce.connect.core.exception.HandlerRetryException;
import com.fluentcommerce.connect.core.exception.UnprocessableMessageException;
import com.fluentcommerce.connect.core.handler.HandlerInfo;
import com.fluentcommerce.connect.core.handler.context.MessageHandlerContext;
import com.fluentcommerce.connect.core.listener.router.MessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Component;

import java.util.Optional;

// The route must match the message name set by the listener and by the HTTP endpoint.
@Slf4j
@Component
@HandlerInfo(name = "location-weather-update", route = "fc.connect.sample-weather-update")
public class WeatherDataMessageHandler implements MessageHandler {
    @Override
    public void processMessage(@NotNull final MessageHandlerContext context) throws UnprocessableMessageException, HandlerRetryException {
        UpsertSettingUtils.upsertSetting(getPayload(context, WeatherData.class), context);
    }

    // Extracts the typed payload or fails the message when it cannot be read.
    protected <T> T getPayload(final MessageHandlerContext context, final Class<T> messageClass) {
        final Optional<T> payload = context.getMessagePayload(messageClass);
        if (payload.isPresent()) {
            return payload.get();
        }
        throw new UnprocessableMessageException("Unable to get message payload");
    }
}
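To picture how a route name connects a queued message to its handler, here is a minimal in-memory dispatcher. It is a conceptual sketch only: `HandlerRegistrySketch` is hypothetical, and the SDK discovers handlers through the `HandlerInfo` annotation rather than through explicit registration as done here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative only: a hypothetical in-memory dispatcher, not how the SDK is implemented.
final class HandlerRegistrySketch {

    private final Map<String, Consumer<String>> handlersByRoute = new HashMap<>();

    /** Registers a handler under a route; a second handler on the same route is rejected. */
    void register(String route, Consumer<String> handler) {
        if (handlersByRoute.putIfAbsent(route, handler) != null) {
            throw new IllegalStateException("Route already has a handler: " + route);
        }
    }

    /** Delivers the payload to the handler for the route; returns false when no handler matches. */
    boolean dispatch(String route, String payload) {
        Consumer<String> handler = handlersByRoute.get(route);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        var registry = new HandlerRegistrySketch();
        registry.register("fc.connect.sample-weather-update",
                payload -> System.out.println("Handling weather update: " + payload));
        registry.dispatch("fc.connect.sample-weather-update", "{\"code\":\"300\"}");
        System.out.println(registry.dispatch("fc.connect.unknown-route", "{}"));
    }
}
```

The point of the sketch is that a message is only processed when its name equals a handler's route exactly, so a mismatch between the two leaves the message unhandled.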
Utility class
The `UpsertSettingUtils` class is used by both the message handler and the job in our example to connect to Fluent and upsert a setting. The setting name is hardcoded below as `FLUENT_SETTING_KEY = "fc.connect.weather-update-demo.current-weather"`.
package com.fluentcommerce.connect.custom;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fluentcommerce.connect.core.exception.UnprocessableMessageException;
import com.fluentcommerce.connect.core.handler.context.HandlerContext;
import com.fluentcommerce.connect.core.utils.ConversionUtils;
import com.fluentcommerce.graphql.queries.settings.CreateSettingMutation;
import com.fluentcommerce.graphql.queries.settings.UpdateSettingMutation;
import com.fluentcommerce.graphql.type.CreateSettingInput;
import com.fluentcommerce.graphql.type.UpdateSettingInput;
import com.fluentcommerce.util.core.SettingUtils;
import com.fluentretail.api.exception.FluentApiException;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;

import java.util.Collections;
import java.util.Map;

public class UpsertSettingUtils {
    private static final String FLUENT_SETTING_KEY = "fc.connect.weather-update-demo.current-weather";

    private UpsertSettingUtils() {
    }

    // Updates the setting when it already exists in Fluent, otherwise creates it.
    public static void upsertSetting(final WeatherData weatherData, final HandlerContext context) {
        final var settings = SettingUtils.getSettings(context.ofFluentContext(), Collections.singletonMap(FLUENT_SETTING_KEY, FLUENT_SETTING_KEY));
        if (settings.containsKey(FLUENT_SETTING_KEY) && StringUtils.isNotBlank(settings.get(FLUENT_SETTING_KEY).getId())) {
            update(settings.get(FLUENT_SETTING_KEY).getId(), weatherData, context);
        } else {
            create(weatherData, context);
        }
    }

    private static void create(final WeatherData weatherData, final HandlerContext context) {
        final CreateSettingMutation mutation;
        try {
            mutation = CreateSettingMutation.builder()
                    .input(CreateSettingInput.builder()
                            .name(FLUENT_SETTING_KEY)
                            .valueType("JSON")
                            .lobValue(convertObject(ConversionUtils.getObjectMapper().writeValueAsString(weatherData)))
                            .context("RETAILER")
                            .contextId(Integer.parseInt(context.getAccountReference().retailerId()))
                            .build())
                    .build();
        } catch (JsonProcessingException e) {
            throw new UnprocessableMessageException(e);
        }
        final CreateSettingMutation.Data result = context.ofFluentContext().executeMutation(mutation, CreateSettingMutation.Data.class);
        if (result == null || result.createSetting() == null || StringUtils.isBlank(result.createSetting().id())) {
            throw new FluentApiException(0, "Fluent mutation failed.");
        }
    }

    private static void update(final String id, final WeatherData weatherData, final HandlerContext context) {
        final UpdateSettingMutation mutation;
        try {
            mutation = UpdateSettingMutation.builder()
                    .input(UpdateSettingInput.builder()
                            .id(id)
                            .name(FLUENT_SETTING_KEY)
                            .valueType("JSON")
                            .lobValue(convertObject(ConversionUtils.getObjectMapper().writeValueAsString(weatherData)))
                            .context("RETAILER")
                            .contextId(Integer.parseInt(context.getAccountReference().retailerId()))
                            .build())
                    .build();
        } catch (JsonProcessingException e) {
            throw new UnprocessableMessageException(e);
        }
        final UpdateSettingMutation.Data result = context.ofFluentContext().executeMutation(mutation, UpdateSettingMutation.Data.class);
        if (result == null || result.updateSetting() == null) {
            throw new FluentApiException(0, "Fluent mutation failed.");
        }
    }

    // Converts the JSON string into a Map so that it can be sent as the setting's lobValue.
    private static Map convertObject(@NotNull final String value) throws JsonProcessingException {
        return ConversionUtils.getObjectMapper().readValue(value, Map.class);
    }
}
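The create-or-update decision in `upsertSetting` can be tried out without a Fluent account using the in-memory sketch below. It is a simplified illustration under stated assumptions: `UpsertSketch` and its `Setting` record are hypothetical, a `HashMap` stands in for the Fluent settings, and no GraphQL call is made.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Illustrative only: an in-memory stand-in for Fluent settings, not the SDK or the GraphQL API.
final class UpsertSketch {

    record Setting(String id, String name, String value) {}

    private final Map<String, Setting> settingsByName = new HashMap<>();

    /** Updates the setting when one with a non-blank id already exists, otherwise creates it. */
    Setting upsert(String name, String value) {
        Setting existing = settingsByName.get(name);
        Setting result;
        if (existing != null && !existing.id().isBlank()) {
            result = new Setting(existing.id(), name, value); // update: keep the existing id
        } else {
            result = new Setting(UUID.randomUUID().toString(), name, value); // create: assign a new id
        }
        settingsByName.put(name, result);
        return result;
    }

    public static void main(String[] args) {
        var store = new UpsertSketch();
        String key = "fc.connect.weather-update-demo.current-weather";
        Setting created = store.upsert(key, "{\"description\":\"Sunny Day\"}");
        Setting updated = store.upsert(key, "{\"description\":\"Drizzle\"}");
        System.out.println(created.id().equals(updated.id()));
        System.out.println(updated.value());
    }
}
```

As in the utility class, the existing id is what distinguishes an update from a create, so repeated weather updates overwrite one setting instead of creating duplicates.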
The `UpsertSettingUtils` class makes GraphQL mutation calls to Fluent, so let's create some GraphQL files and have the SDK generate their stubs. You should be able to find a `graphql` directory in the project.
GraphQL
- CreateSetting.graphql
mutation CreateSetting($input: CreateSettingInput!) {
    createSetting(input: $input) {
        id
        name
    }
}
- UpdateSetting.graphql
mutation updateSetting($input: UpdateSettingInput!) {
    updateSetting(input: $input) {
        id
    }
}
Before running the Maven compile again, first uncomment the plugin called `apollo-client-maven-plugin` in `pom.xml`.
Testing the queue
It is now possible to start the application and test the queue. When the application starts, the SDK auto-creates all necessary queues and immediately starts listening for new messages. To publish a message to the demo connector, run the command below. It simulates sending a weather update to the connector, which then updates Fluent with that weather update.
docker exec -d localstack awslocal --endpoint-url=http://localhost:4566 sqs send-message --queue-url http://localstack:4566/000000000000/demo --message-body "{ \"id\": \"weather-update-message\", \"code\": \"300\", \"temperature\": \"22\", \"description\": \"Sunny Day\"}" ;
To validate that it works, check the logs of the demo connector and search the Fluent settings for `fc.connect.weather-update-demo.current-weather`.
Adding a job handler
The next step is to add a job handler that pretends to retrieve weather updates from an external system. In this example, it returns a random update from a predefined list.
package com.fluentcommerce.connect.custom;

import com.fluentcommerce.connect.core.exception.JobExecutionException;
import com.fluentcommerce.connect.core.handler.HandlerInfo;
import com.fluentcommerce.connect.core.handler.context.JobHandlerContext;
import com.fluentcommerce.connect.core.job.extend.JobHandler;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Random;

@Slf4j
@Component
@HandlerInfo(name = "weather-update-job", route = "weather-update-job")
public class WeatherUpdateJobHandler implements JobHandler {
    @Override
    public void run(@NotNull final JobHandlerContext context) throws JobExecutionException {
        UpsertSettingUtils.upsertSetting(getWeatherReport(), context);
    }

    // Simulates an external weather service by picking a random report from the list below.
    private WeatherData getWeatherReport() {
        final var random = new Random();
        return weatherCodeMap.get(random.nextInt(weatherCodeMap.size()));
    }

    private static final List<WeatherData> weatherCodeMap = List.of(
            new WeatherData("weather-update", "201", "12", "Thunderstorm with rain"),
            new WeatherData("weather-update", "301", "12", "Drizzle"),
            new WeatherData("weather-update", "500", "19", "Light Rain"),
            new WeatherData("weather-update", "502", "24", "Heavy Rain"),
            new WeatherData("weather-update", "521", "23", "Shower rain"),
            new WeatherData("weather-update", "600", "14", "Light snow"),
            new WeatherData("weather-update", "601", "15", "Snow"),
            new WeatherData("weather-update", "602", "16", "Heavy Snow"),
            new WeatherData("weather-update", "711", "15", "Smoke"),
            new WeatherData("weather-update", "741", "21", "Fog"),
            new WeatherData("weather-update", "800", "28", "Clear sky"),
            new WeatherData("weather-update", "801", "25", "Few clouds"),
            new WeatherData("weather-update", "802", "23", "Scattered clouds"),
            new WeatherData("weather-update", "803", "21", "Broken clouds"),
            new WeatherData("weather-update", "804", "20", "Overcast clouds"));
}
The SDK doesn't have an internal scheduler, so it relies on external systems to trigger job/batch execution. In AWS deployments, for example, we recommend EventBridge, which calls a given URL on a schedule. To simulate that behavior and run the job, run the command below, making sure to first update the URL with the account and retailer configured in `localstack-setup.sh`.
curl -X PUT http://localhost:8080/api/v1/fluent-connect/scheduler/add/<ACCOUNT>/<RETAILER>/weather-update-job
Example of the curl command with the replaced values:
curl -X PUT http://localhost:8080/api/v1/fluent-connect/scheduler/add/cnctdev/34/weather-update-job
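Any external system that can issue an HTTP request can play the scheduler's role. As a hedged illustration, the sketch below builds the same PUT request with the JDK's `java.net.http` API. `SchedulerTrigger` is a hypothetical helper and not part of the SDK, and the account and retailer values are the example values from the curl command.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Illustrative only: SchedulerTrigger is a hypothetical helper, not part of the SDK.
final class SchedulerTrigger {

    /** Builds the same PUT request as the curl command, without sending it. */
    static HttpRequest buildRequest(String baseUrl, String account, String retailer, String job) {
        URI uri = URI.create(String.format(
                "%s/api/v1/fluent-connect/scheduler/add/%s/%s/%s", baseUrl, account, retailer, job));
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest("http://localhost:8080", "cnctdev", "34", "weather-update-job");
        System.out.println(request.method() + " " + request.uri());
        // To actually trigger the job (the connector must be running), send it with:
        // java.net.http.HttpClient.newHttpClient()
        //         .send(request, java.net.http.HttpResponse.BodyHandlers.discarding());
    }
}
```

Building the request separately from sending it makes it possible to check the URL before pointing a real scheduler at the connector.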
Adding a new HTTP endpoint
Another way to receive updates from external systems is to open an HTTP endpoint. The example below receives the same payload we used in the queue example.
This is a straightforward example without authentication or authorization, with hardcoded values for the Fluent account and message route. We recommend securing any endpoint exposed to the web with authentication and authorization.
package com.fluentcommerce.connect.custom;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fluentcommerce.connect.core.configuration.data.AccountReference;
import com.fluentcommerce.connect.core.listener.data.ConnectSDKMessage;
import com.fluentcommerce.connect.core.listener.publisher.ListenerPublisherService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

// WeatherData lives in this same package, so it needs no import.
@RestController
@RequestMapping("/api/v1/fluent-connect/weather")
public class WeatherUpdateController {

    private final ListenerPublisherService publishService;

    // Hardcoded demo values read from application-connector.yml.
    @Value("${account}")
    private String account;
    @Value("${retailer}")
    private String retailer;

    @Autowired
    public WeatherUpdateController(final ListenerPublisherService publishService) {
        this.publishService = publishService;
    }

    @RequestMapping(value = "/update", method = RequestMethod.POST, consumes = "application/json", produces = "application/json")
    public ResponseEntity<?> updateWeather(@RequestBody WeatherData weatherData) {
        final var accountReference = AccountReference.of(account, retailer);

        try {
            // Queue the payload for processing by the handler registered on this route.
            publishService.publishMessage(ConnectSDKMessage.builder()
                    .id(UUID.randomUUID().toString())
                    .name("fc.connect.sample-weather-update")
                    .accountId(accountReference.accountId())
                    .retailerId(accountReference.retailerId())
                    .payload(ConnectSDKMessage.toJson(weatherData))
                    .build());
        } catch (final JsonProcessingException e) {
            return ResponseEntity.internalServerError().build();
        }
        return ResponseEntity.ok().build();
    }
}
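The pattern used by the controller, accepting the input quickly and leaving the real work to a queue consumer, can be illustrated with the in-memory sketch below. It is not SDK code: `AcceptThenQueueSketch` is hypothetical, a `LinkedBlockingQueue` stands in for the SDK publisher and SQS, and the blank-payload check is an added assumption that the controller above does not perform.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: a hypothetical in-memory queue, standing in for the SDK publisher and SQS.
final class AcceptThenQueueSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Called on the request thread: checks the input, enqueues it, and returns an HTTP-like status. */
    int accept(String jsonPayload) {
        if (jsonPayload == null || jsonPayload.isBlank()) {
            return 400; // reject obviously bad input before it reaches the queue
        }
        queue.add(jsonPayload);
        return 200; // accepted for later processing, not yet processed
    }

    /** Called by a worker: removes everything queued so far and returns it for processing. */
    List<String> drain() {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) {
        var endpoint = new AcceptThenQueueSketch();
        System.out.println(endpoint.accept("{\"id\":\"weather-update-message\",\"code\":\"300\"}"));
        System.out.println(endpoint.accept(" "));
        System.out.println(endpoint.drain());
    }
}
```

A success status here only means the message was queued. Whether the setting was actually upserted shows up later, in the handler's logs.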
The controller above expects the following configuration to exist in `application-connector.yml`, using the account and retailer configured in `localstack-setup.sh`:
# Fluent account setup, only here for the demo
account: "CNCTDEV"
retailer: 34
To test the HTTP endpoint, run the command below:
curl -X POST http://localhost:8080/api/v1/fluent-connect/weather/update -H "Content-Type:application/json" -d "{ \"id\": \"weather-update-message\", \"code\": \"300\", \"temperature\": \"22\", \"description\": \"Sunny Day\"}"