Trigger a Spring Batch job with a JMS message

September 29, 2017

Creating a batch-driven solution is pretty easy with Spring Batch. There is an excellent getting started guide on spring.io website, which explains the basics and provides example code how to start a batch job on application startup. However, batch jobs are typically executed as a result of an event. In this blog post, we are going to look how to trigger a Spring Batch job with a JMS message.

Trigger a batch job on startup

Let's review first what additional support gives Spring Boot on top of Spring Batch. As usual, there is auto-configuration support by which  JobLauncherCommandLineRunner will be created and all jobs in the context will be executed on startup. The executed batch jobs can be limited with the spring.batch.job.names property.

Let's see this in practice. Head to this https://github.com/altfatterz/spring-batch-with-jms GitHub repository, and look into the trigger-batch-job-upon-startup module. The example is using H2 database in server mode in order to examine the infrastructure tables created by Spring Batch. Installing H2 is pretty easy, on Mac you would do:

brew install h2
brew services start h2

Then you can connect to the H2 console at http://localhost:8082/.

Our example batch job is very simple containing only a single step which prints out a logging message.

@Slf4j
@Configuration
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .tasklet((contribution, chunkContext) -> {
                    log.info("step executed");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    @Bean
    public Job job(Step step1) throws Exception {
        return jobBuilderFactory.get("job1")
                .incrementer(new RunIdIncrementer())
                .start(step1)
                .build();
    }
}

If you execute the TriggerBatchJobUponStartApp  in the logs you should see that the job is executed:

o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job1]] launched with the following parameters: [{run.id=1}]
o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
c.e.t.BatchConfiguration : step executed
o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job1]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]

and if you look into H2 console again you will see that Spring Batch created the following tables:

BATCH_JOB_EXECUTION
BATCH_JOB_EXECUTION_CONTEXT
BATCH_JOB_EXECUTION_PARAMS
BATCH_JOB_INSTANCE
BATCH_STEP_EXECUTION
BATCH_STEP_EXECUTION_CONTEXT

Now the RunIdIncremeter in the above code snippet needs a bit of explanation. Spring Batch has the rule that a JobInstance can only be run once to completion. This means that for each combination of identifying job parameters, you can only have one JobExecution that results in COMPLETE. The RunIdIncrementer will append an additional, unique parameter (run.id) to the list of parameters so that the resulting combination would be unique, giving you a new JobInstance each time you ran the job with the same combination of identifying parameters.

Trigger a batch job with a JMS message

Let's look how to trigger the batch job with a JMS message. First, you need to set the spring.batch.job.enabled property to false otherwise Spring Boot will start the batch jobs on application start. In this example, we leverage Spring Integration to consume a JMS message and we use ActiveMQ as a JMS provider. Installing ActiveMQ is pretty easy, if you are on Mac you would do:

brew install activemq
brew services start activemq

You can verify that is up and running by logging in into its administration interface at http://localhost:8161/admin/ with admin/admin credentials. In this example, we are going to use the http://hawt.io/ console instead which provides a better user experience when sending and analyzing JMS messages. Unfortunately, there is no homebrew support for it so you need to download the jar artifact and you can start the service with:

java -jar hawtio-app-1.5.4.jar --port 9999

The console will be available at http://localhost:9999/hawtio. You need to click on Connect then Local and the hit the Start agent for Apache ActiveMQ. The agent URL will be provided where you will find the ActiveMQ tab from where you can access the exposed resources like queues, topics, client connectors.

In this example, we are just interested in to trigger a Spring Batch job, no requirement to provide the result of batch job execution. So we can consume the JMS message with a Channel Adapter, no need for a Gateway.

There are two JMS-based inbound Channel Adapters. The first uses Spring's JmsTemplate to receive messages based on a polling period. The second is message-driven and relies upon a MessageListenerContainer. We will look into both.

Inbound channel adapter

Let's consume the following JMS messages with a JmsTemplate and a poller.

<notification>
   <email>example@gmail.com</email>
   <status>ORDER_DISPATCHED</status>
</notification>

Processing the message will trigger a batch job. In order to keep things simple, the batch job will contain only one single step which will send an email to the address extracted from the message.

The integration flow is the following:

@Bean
public IntegrationFlow myJmsTriggeredFlow() {
    return IntegrationFlows.from(jmsDestinationPollingSource,
            c -> c.poller(Pollers.fixedRate(5000, 2000)))
            .transform(toJobLaunchRequest())
            .handle(jobLaunchingGateway)
            .handle(logger())
            .get();
}

With the help of JMSDestinationPollingSource, we receive the JMS message then we transform it into a JobLaunchRequest object and then with the help of JobLaunchingGateway the batch job is launched. In the JMSDestinationPollingSource bean definition, we set the message converter, configure the queue name from where we fetch the messages.

@Bean
public JmsDestinationPollingSource jmsDestinationPollingSource(JmsTemplate jmsTemplate) {
    jmsTemplate.setMessageConverter(messageConverter());
    JmsDestinationPollingSource jmsDestinationPollingSource = new JmsDestinationPollingSource(jmsTemplate);
    jmsDestinationPollingSource.setDestinationName("notifications");
    return jmsDestinationPollingSource;
}

The message transformation is pretty simple, we extract the field values and set them as job parameters:

public JobLaunchRequest toRequest(Message<Notification> message) {
    Notification notification = message.getPayload();
    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder()
            .addString("run.id", UUID.randomUUID().toString())
            .addString("email", notification.getEmail())
            .addString("status", notification.getStatus());
    return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}

If you would like to try it yourself check out the inbound-channel-adapter-example module, make sure H2 and ActiveMQ are started and execute the InboudChannelAdapterExample providing also the –spring.mail.username=<username> and –spring.mail.password=<password> program arguments. With the hawtio console put a notification message on the 'notification' queue (which was created by the application automatically) in order to verify that the batch job is triggered.

Message-driven channel adapter

Another way of consuming JMS messages is with a MessageListenerContainer. In this example, we use the SimpleMessageListenerContainer which is enough since we do not need XA support only native JMS transaction support. Setting the 'sessionTransacted' property to true we make sure that if during message processing an error occurs it will lead to a rollback with the message getting redelivered or put to a dead-letter queue.

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setConnectionFactory(connectionFactory);
    simpleMessageListenerContainer.setDestinationName("trades");
    simpleMessageListenerContainer.setSessionTransacted(true);
    return simpleMessageListenerContainer;
}

In this example, we consume a 'trade' message in the following format:

<trade>
  <stock>AMZN</stock>
  <quantity>100</quantity>
  <action>BUY</action>
</trade>

The JMS message is converted into Spring Integration Message by the JmsMessageDrivenEndpoint and put on a MessageChannel.

@Bean
public JmsMessageDrivenEndpoint jmsMessageDrivenEndpoint() {
    ChannelPublishingJmsMessageListener cpjml = new ChannelPublishingJmsMessageListener();
    cpjml.setRequestChannel(inputChannel());
    cpjml.setMessageConverter(messageConverter());
    JmsMessageDrivenEndpoint jmde = new JmsMessageDrivenEndpoint(messageListenerContainer(), cpjml);
    return jmsMessageDrivenEndpoint;
}

Then with a ServiceActivator, the JMS message is converted into a JobLaunchRequest and put on an output channel.

@ServiceActivator(inputChannel = "inputChannel", outputChannel = "outputChannel")
public JobLaunchRequest process(Trade trade) {
    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder()
            .addString("run.id", UUID.randomUUID().toString())
            .addString("stock", trade.getStock())
            .addString("action", trade.getAction().toString())
            .addLong("quantity", Long.valueOf(trade.getQuantity()));
    return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}

And finally, the JobLaunchingGateway receives the JobLaunchRequest messages from the output channel and launches the spring batch job.

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlows.from("outputChannel")
            .handle(jobLaunchingGateway)
            .handle(logger())
            .get();
}

Conclusion

We have seen that is pretty simple to trigger a Spring Batch job with a JMS messages. It basically boils down to convert the received JMS message into a JobLaunchRequest and leverage the JobLaunchingGateway to launch the batch job. It is worth to note that consuming JMS messages with the JMSTemplate in combination with a poller approach is recommended only for low message volume, for high message volume you should consider using the message-driven approach.

The example code can be found here. If you are interested send me a pull request with an example of triggering a batch job by adding a new entry into a trigger table.

About the author: Zoltan Altfatter

Zoltan Altfatter is a software craftsman at mimacom. He has been in software engineering for over 10 years. He is passionate about the JVM and the Spring ecosystem.

Comments
Join us