Data transformation with Spring Cloud Stream

April 4, 2018

Spring Cloud Stream 2.0 comes with more consistent and simplified support for message transformation. In this blog post we look at how to convert a message received in XML format on one channel to JSON and send it to another channel, where further message handlers can consume it.

As an example we model a money transfer use case where we want to transfer an amount from one currency to another. An incoming transfer message looks like this:

<transfer>
    <amount>451</amount>
    <fromCurrency>EUR</fromCurrency>
    <toCurrency>CHF</toCurrency>
    <dateTime>2018-03-21T10:15:30</dateTime>
    <paymentMethod>CREDIT_CARD</paymentMethod>
</transfer>

The consumer of these messages calculates the fee for each transfer and prepares a response message in JSON, which could be the format that a consumer further down the line expects.

{ 
  "amount":451,
  "fromCurrency":"EUR",
  "toCurrency":"CHF",
  "dateTime":"2018-03-21T10:15:30",
  "paymentMethod":"CREDIT_CARD",
  "transferFee": {
    "amount":5.5924,
    "currency":"EUR"
  }
}

Built-in converters in Spring Cloud Stream

Spring Cloud Stream 2.0 has the following built-in message converters:

In order to consume a message in XML format we need to define a custom message converter, which must implement the org.springframework.messaging.converter.MessageConverter interface. Here we extend AbstractMessageConverter, which provides a partial implementation of the conversion methods.

public class TransferMessageConverter extends AbstractMessageConverter {

    private final ObjectMapper objectMapper;

    public TransferMessageConverter() {
        super(MimeType.valueOf("application/xml"));
        objectMapper = BankDataObjectMapperFactory.objectMapper();
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return clazz.equals(Transfer.class);
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
        try {
            return objectMapper.readValue((byte[]) message.getPayload(), Transfer.class);
        } catch (IOException e) {
            return null;
        }
    }
}
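
The converter above delegates the XML parsing to an ObjectMapper obtained from BankDataObjectMapperFactory, which is not shown in this post. As a rough illustration of what convertFromInternal has to achieve, the following sketch maps the same XML payload onto a small value class using only the JDK's DOM parser. The names SimpleTransfer and TransferXmlSketch are hypothetical stand-ins and not classes from the sample project.

```java
import java.io.ByteArrayInputStream;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

// Hypothetical stand-in for the Transfer class used in this post.
final class SimpleTransfer {
    final BigDecimal amount;
    final String fromCurrency;
    final String toCurrency;
    final LocalDateTime dateTime;
    final String paymentMethod;

    SimpleTransfer(BigDecimal amount, String fromCurrency, String toCurrency,
                   LocalDateTime dateTime, String paymentMethod) {
        this.amount = amount;
        this.fromCurrency = fromCurrency;
        this.toCurrency = toCurrency;
        this.dateTime = dateTime;
        this.paymentMethod = paymentMethod;
    }
}

final class TransferXmlSketch {

    // Parses a <transfer> payload. Returns null when the payload cannot be read,
    // similar to how the converter above signals a failed conversion.
    static SimpleTransfer parse(byte[] payload) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(payload));
            Element root = doc.getDocumentElement();
            return new SimpleTransfer(
                    new BigDecimal(text(root, "amount")),
                    text(root, "fromCurrency"),
                    text(root, "toCurrency"),
                    LocalDateTime.parse(text(root, "dateTime")),
                    text(root, "paymentMethod"));
        } catch (Exception e) {
            // Malformed XML, a missing element or an unparsable value ends up here.
            return null;
        }
    }

    // Reads the trimmed text content of the first child element with the given tag name.
    private static String text(Element parent, String tag) {
        return parent.getElementsByTagName(tag).item(0).getTextContent().trim();
    }
}
```

Calling TransferXmlSketch.parse with the bytes of the example message from the beginning of the post yields a SimpleTransfer whose amount is 451 and whose fromCurrency is EUR.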

Next we need to register it as a @Bean and annotate it with @StreamMessageConverter, which makes sure that the custom message converter is added on top of the message converters defined in the CompositeMessageConverterFactory.

@EnableBinding(Bank.class)
public class BankDataInfrastructureConfiguration {

    @Bean
    @StreamMessageConverter
    public MessageConverter transferMessageConverter() {
        return new TransferMessageConverter();
    }
}
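
To see why the position of the custom converter matters, here is a minimal sketch of how a composite converter can pick a delegate: it asks each converter in order and takes the first non-null result, so converters registered on top get the first chance. SketchConverter and SketchCompositeConverter are simplified, hypothetical stand-ins written for this illustration. They are not Spring's classes and leave out headers, MIME type parsing and conversion hints.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a message converter: returns null when it cannot convert.
interface SketchConverter {
    Object fromMessage(String contentType, byte[] payload, Class<?> targetClass);
}

// Tries each delegate in order and returns the first non-null result.
final class SketchCompositeConverter {

    private final List<SketchConverter> delegates = new ArrayList<>();

    SketchCompositeConverter(List<SketchConverter> custom, List<SketchConverter> builtIn) {
        delegates.addAll(custom);   // custom converters are consulted first
        delegates.addAll(builtIn);  // built-in converters act as a fallback
    }

    Object fromMessage(String contentType, byte[] payload, Class<?> targetClass) {
        for (SketchConverter converter : delegates) {
            Object result = converter.fromMessage(contentType, payload, targetClass);
            if (result != null) {
                return result;
            }
        }
        return null; // no delegate was able to convert the message
    }
}
```

In this sketch a converter that only handles application/xml returns null for every other content type, which lets the built-in converters handle the rest.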

Creating Spring Integration channels

Next we create two channels:

  1. transfers, from which we accept the transfer messages in XML format
  2. transfers-with-fee, where we put the converted transfer messages in JSON format

With Spring Cloud Stream we only need to declare these channels in an interface; the @EnableBinding annotation triggers their creation.

public interface Bank {

    String TRANSFERS = "transfers";

    String TRANSFERS_WITH_FEE = "transfers-with-fee";

    @Input(Bank.TRANSFERS)
    SubscribableChannel transfers();

    @Output(Bank.TRANSFERS_WITH_FEE)
    MessageChannel transfersWithFee();
}

The transfers channel uses SubscribableChannel, a subtype of MessageChannel that supports subscribing to it. It is good practice to set the channel names with String constants in the interface; by default the channel name is the method name.

Configuring a Spring Cloud Stream application

Next, we need to tell Spring Cloud Stream how to fetch messages sent to the transfers channel. Spring Cloud Stream introduced an abstraction called Binder, which makes it easy to connect destinations to message brokers like RabbitMQ or Apache Kafka. By default the destination is the binding name, that is, the key under spring.cloud.stream.bindings; however, we can customize it with the destination setting, as in the following configuration. With RabbitMQ as the binder this results in an exchange named money-transfers. We also need to override the default content-type (application/json) with application/xml.

It is not required for our example to work, but we set transfers.group to queue, which makes sure that we get a durable queue named money-transfers.queue instead of an auto-delete, non-durable, exclusive queue with a name like money-transfers.anonymous.3KHTW8pCRGu7qv9s6Pznxg.

spring:
  application:
    name: transfer-fee-calculator

  cloud:
    stream:
      bindings:
        transfers:
          destination: money-transfers
          content-type: application/xml
          group: queue
        transfers-with-fee:
          destination: money-transfers-with-fee
          content-type: application/json
          producer:
            required-groups: queue

By setting required-groups we make sure that the queue named money-transfers-with-fee.queue is created and bound to the money-transfers-with-fee exchange.

It is important to note that another consumer processing the messages from the money-transfers-with-fee destination does not need to use the same channel names; only the destination name has to match.

Adding a Transformer to a Spring Cloud Stream application

Next we need to define the transformer. Here we use @ServiceActivator from Spring Integration. Note that at the time of writing there is an issue with the @Transformer annotation when using a custom converter.

@Component
@Slf4j
public class TransferFeeCalculator {

    @ServiceActivator(inputChannel = Bank.TRANSFERS, outputChannel = Bank.TRANSFERS_WITH_FEE)
    public TransferWithFee transform(Transfer transfer) {
        log.info("received transfer message: {}", transfer);

        TransferWithFee transferWithFee = TransferWithFee.builder()
                .amount(transfer.getAmount())
                .fromCurrency(transfer.getFromCurrency())
                .toCurrency(transfer.getToCurrency())
                .dateTime(transfer.getDateTime())
                .paymentMethod(transfer.getPaymentMethod())
                .transferFee(calculateTransferFee(
                        transfer.getAmount(), transfer.getFromCurrency(), transfer.getPaymentMethod()))
                .build();

        return transferWithFee;
    }
}
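
The calculateTransferFee method is not shown in this post. The following is a minimal sketch of what it could look like, assuming a flat percentage fee per payment method that is charged in the source currency. The 1.24% rate for CREDIT_CARD is an assumption chosen only because it reproduces the fee from the example message (451 × 0.0124 = 5.5924); SketchFee and TransferFeeSketch are hypothetical names.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the transferFee part of the JSON response.
final class SketchFee {
    final BigDecimal amount;
    final String currency;

    SketchFee(BigDecimal amount, String currency) {
        this.amount = amount;
        this.currency = currency;
    }
}

final class TransferFeeSketch {

    // Assumed fee rates per payment method; only CREDIT_CARD appears in the post.
    private static final Map<String, BigDecimal> RATES = new HashMap<>();

    static {
        RATES.put("CREDIT_CARD", new BigDecimal("0.0124"));
    }

    // Charges a percentage of the transferred amount in the source currency.
    static SketchFee calculateTransferFee(BigDecimal amount, String fromCurrency, String paymentMethod) {
        BigDecimal rate = RATES.get(paymentMethod);
        if (rate == null) {
            throw new IllegalArgumentException("No fee rate for payment method: " + paymentMethod);
        }
        return new SketchFee(amount.multiply(rate), fromCurrency);
    }
}
```

BigDecimal is used instead of double so that the fee is computed exactly, which matters when the result is later compared as a string in a test.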

Testing a Spring Cloud Stream application

With the following on the test classpath we can leverage the TestSupportBinder, a test Binder implementation that allows us to inspect what messages are sent and received by the application.

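<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>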

The TestSupportBinderAutoConfiguration registers it with high priority, so adding it as a test dependency replaces all other binders with the TestSupportBinder. The TestSupportBinder provides a MessageCollector, which maintains a map from output channels to the messages they received. By injecting it into the test we can run assertions on the enqueued messages.

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = RANDOM_PORT)
public class TransferFeeCalculatorTests {

    @Autowired
    private Bank bank;

    @Autowired
    private MessageCollector collector;

    @SpyBean
    private TransferFeeCalculator transferFeeCalculator;

    @Test
    public void testTransformer() {
        SubscribableChannel transfers = bank.transfers();

        Map<String, Object> headers = new HashMap<>();
        headers.put("contentType", "application/xml");

        String msg = "" +
                "<transfer>" +
                "   <amount>451</amount>" +
                "   <fromCurrency>EUR</fromCurrency>" +
                "   <toCurrency>CHF</toCurrency>" +
                "   <dateTime>2018-03-21T10:15:30</dateTime>" +
                "   <paymentMethod>CREDIT_CARD</paymentMethod>" +
                "</transfer>";

        transfers.send(new GenericMessage<>(msg.getBytes(StandardCharsets.UTF_8), headers));

        BlockingQueue<Message<?>> messages = this.collector.forChannel(bank.transfersWithFee());

        assertThat(messages, receivesPayloadThat(is(
                "{" +
                        "\"amount\":451," +
                        "\"fromCurrency\":\"EUR\"," +
                        "\"toCurrency\":\"CHF\"," +
                        "\"dateTime\":\"2018-03-21T10:15:30\"," +
                        "\"paymentMethod\":\"CREDIT_CARD\"," +
                        "\"transferFee\":{" +
                        "\"amount\":5.5924,\"" +
                        "currency\":\"EUR\"}" +
                        "}"
        )));

        verify(this.transferFeeCalculator, times(1)).transform(any(Transfer.class));
    }
}
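
Since the collector hands back a plain java.util.concurrent.BlockingQueue, a test can also take messages off the queue itself and wait a bounded time for them. The helper below is a small sketch of that idea using only the JDK; CollectorQueueSketch is a hypothetical name and the generic queue stands in for the queue returned by the collector.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class CollectorQueueSketch {

    // Waits up to timeoutMillis for the next element and returns null
    // when nothing arrives in time or the waiting thread is interrupted.
    static <T> T awaitNext(BlockingQueue<T> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }
}
```

A null result after the timeout tells the test that no message reached the output channel, which gives a clearer failure than blocking indefinitely.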

Conclusion

Registering a custom message converter in Spring Cloud Stream 2.0 is simpler and more consistent, since it relies only on the MessageConverter abstraction. You can find the source code for this blog post on my GitHub profile: https://github.com/altfatterz/data-transformation-spring-cloud-stream

If you want to know more about message conversion, head to Oleg Zhurakousky's (@z_oleg) excellent blog post: https://spring.io/blog/2018/02/26/spring-cloud-stream-2-0-content-type-negotiation-and-transformation

About the author: Zoltan Altfatter

Zoltan Altfatter is a software craftsman at mimacom. He has been in software engineering for over 10 years. He is passionate about the JVM and the Spring ecosystem.