Data transformation with Spring Cloud Stream
Spring Cloud Stream 2.0 comes with a more consistent and simplified message transformation support. In this blog post we look into how we can convert a message in XML format received on a channel to JSON and send it to another channel to be consumed by further message handlers.
As an example we model a money transfer use case where we want to transfer an amount in one currency to another currency.
<transfer>
<amount>451</amount>
<fromCurrency>EUR</fromCurrency>
<toCurrency>CHF</toCurrency>
<dateTime>2018-03-21T10:15:30</dateTime>
<paymentMethod>CREDIT_CARD</paymentMethod>
</transfer>
The consumer of these messages calculates the fee related to the transfer
messages and prepares the response message in JSON format which could be the format which the other consumer down the line expects.
{
"amount":451,
"fromCurrency":"EUR",
"toCurrency":"CHF",
"dateTime":"2018-03-21T10:15:30",
"paymentMethod":"CREDIT_CARD",
"transferFee": {
"amount":5.5924,
"currency":"EUR"
}
}
Built-in converters in Spring Cloud Stream
Spring Cloud Stream 2.0 has the following built-in message converters:
- ApplicationJsonMessageMarshallingConverter
- TupleJsonMessageConverter (deprecated as of 2.0)
- ByteArrayMessageConverter
- ObjectStringMessageConverter
- JavaSerializationMessageConverter (deprecated as of 2.0)
- KryoMessageConverter (deprecated as of 2.0)
- JsonUnmarshallingConverter (deprecated as of 2.0)
In order to consume a message in XML format we need to define a custom message converter which should implement the org.springframework.messaging.converter.MessageConverter
interface.
Here we leverage the AbstractMessageConverter
which provides partial implementation of the conversion methods.
public class TransferMessageConverter extends AbstractMessageConverter {
private final ObjectMapper objectMapper;
public TransferMessageConverter() {
super(MimeType.valueOf("application/xml"));
objectMapper = BankDataObjectMapperFactory.objectMapper();
}
@Override
protected boolean supports(Class<?> clazz) {
return clazz.equals(Transfer.class);
}
@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
try {
return objectMapper.readValue((byte[]) message.getPayload(), Transfer.class);
} catch (IOException e) {
return null;
}
}
}
Next we need to configure it as a @Bean
and annotate it with @StreamMessageConverter
which makes sure that the custom message converter will be added on the top of the message converters defined in the CompositeMessageConverterFactory
@EnableBinding(Bank.class)
public class BankDataInfrastructureConfiguration {
@Bean
@StreamMessageConverter
public MessageConverter transferMessageConverter() {
return new TransferMessageConverter();
}
}
Creating Spring Integration channels
Next we create two channels:
transfers
from where we accept thetransfer
messages in XML formattransfers-with-fee
were we put te convertedtransfer
messages in JSON format
Using Spring Cloud Stream we only need to declare these channels in an interface, the @EnableBinding
annotation will trigger the creation of these channels.
public interface Bank {
String TRANSFERS = "transfers";
String TRANSFERS_WITH_FEE = "transfers-with-fee";
@Input(Bank.TRANSFERS)
SubscribableChannel transfers();
@Output(Bank.TRANSFERS_WITH_FEE)
MessageChannel transfersWithFee();
}
The transfers
channel is using a subtype of MessageChannel
which supports subscribing to it. It is good practice to customize the channel names with a String constant in the interface, by default the channel name is the method name.
Configuring a Spring Cloud Stream application
Next, we need to tell to Spring Cloud Stream how to fetch messages sent to the transfers
channel. Spring Cloud Stream introduced the abstraction called Binder
which makes it super easy to connect destinations
to message brokers like RabbitMQ or Apache Kafka.
By default here the destination
is the name after the spring.cloud.stream.bindings
however we can customize it with destination
setting as we did in the following configuration. Using RabbitMQ as a binder this will result in creating money-transfers
exchange.
We need to override the default content-type
(which is application/json
) to application/xml
.
Not required for our example to work, but we set transfers.qroup
to queue
which makes sure that we get a durable queue named money-transfers.queue
instead of an auto-delete, non-durable, exclusive queue named like money-transfers.anonymous.3KHTW8pCRGu7qv9s6Pznxg
spring:
application:
name: transfer-fee-calculator
cloud:
stream:
bindings:
transfers:
destiantion: money-transfers
content-type: application/xml
group: queue
transfers-with-fee:
destiantion: money-transfers-with-fee
content-type: application/json
producer:
required-groups: queue
By setting required-groups
we make sure that the queue named money-transfers-with-fee.queue
is created and bound to the money-transfers-with-fee
exchange.
It is important to note that another consumer processing the messages from money-transfers-with-fee
destination does not need to use the same channel names, only the destination name should match.
Adding a Transformer to a Spring Cloud Stream application
Next we need to define the transformer. Here we are using @ServiceActivator
from Spring Integration. Note, there is currently an issue with @Transformer
annotation when using a custom converter.
@Component
@Slf4j
public class TransferFeeCalculator {
@ServiceActivator(inputChannel = Bank.TRANSFERS, outputChannel = Bank.TRANSFERS_WITH_FEE)
public TransferWithFee transform(Transfer transfer) {
log.info("received transfer message: " + transfer);
TransferWithFee transferWithFee = TransferWithFee.builder()
.amount(transfer.getAmount())
.fromCurrency(transfer.getFromCurrency())
.toCurrency(transfer.getToCurrency())
.dateTime(transfer.getDateTime())
.paymentMethod(transfer.getPaymentMethod())
.transferFee(calculateTransferFee(
transfer.getAmount(), transfer.getFromCurrency(), transfer.getPaymentMethod()))
.build();
return transferWithFee;
}
}
Testing a Spring Cloud Stream application
Including the following dependency we can leverage the TestSupportBinder
which is a test Binder
implementation allowing to inspect what messages are sent and received by the application.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
The TestSupportBinderAutoConfiguration
registers it with high priority so adding it as a test dependency will replace all binders with the TestBindingSupport
binder.
The TestSupportBinder
provides a MessageCollector
which maintains a map between output channels and messages received. Injecting this into the test we can run assertions on the enqueued messages.
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = RANDOM_PORT)
public class TransferFeeCalculatorTests {
@Autowired
private Bank bank;
@Autowired
private MessageCollector collector;
@SpyBean
private TransferFeeCalculator transferFeeCalculator;
@Test
public void testTransformer() {
SubscribableChannel transfers = bank.transfers();
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/xml");
String msg = "" +
"<transfer>" +
" <amount>451</amount>" +
" <fromCurrency>EUR</fromCurrency>" +
" <toCurrency>CHF</toCurrency>" +
" <dateTime>2018-03-21T10:15:30</dateTime>" +
" <paymentMethod>CREDIT_CARD</paymentMethod>" +
"</transfer>";
transfers.send(new GenericMessage<>(msg.getBytes(StandardCharsets.UTF_8), headers));
BlockingQueue<Message<?>> messages = this.collector.forChannel(bank.transfersWithFee());
assertThat(messages, receivesPayloadThat(is(
"{" +
"\"amount\":451," +
"\"fromCurrency\":\"EUR\"," +
"\"toCurrency\":\"CHF\"," +
"\"dateTime\":\"2018-03-21T10:15:30\"," +
"\"paymentMethod\":\"CREDIT_CARD\"," +
"\"transferFee\":{" +
"\"amount\":5.5924,\"" +
"currency\":\"EUR\"}" +
"}"
)));
verify(this.transferFeeCalculator, times(1)).transform(any(Transfer.class));
}
}
Conclusion
Registering a custom message converter in Spring Cloud Stream 2.0 is much more simplified and consistent using only the MessageConverter
abstraction. You can find the source code for this blog post on my github profile https://github.com/altfatterz/data-transformation-spring-cloud-stream
If you want to know more about the message conversion, head to Oleg Zhurakousky's (@z_oleg) excellent blog post https://spring.io/blog/2018/02/26/spring-cloud-stream-2-0-content-type-negotiation-and-transformation