Trigger a Spring Batch job with a JMS message
Creating a batch-driven solution is pretty easy with Spring Batch. There is an excellent getting started guide on spring.io website, which explains the basics and provides example code how to start a batch job on application startup. However, batch jobs are typically executed as a result of an event. In this blog post, we are going to look how to trigger a Spring Batch job with a JMS message.
Trigger a batch job on startup
Let's review first what additional support gives Spring Boot on top of Spring Batch. As usual, there is auto-configuration support by which JobLauncherCommandLineRunner will be created and all jobs in the context will be executed on startup. The executed batch jobs can be limited with the spring.batch.job.names
property.
Let's see this in practice. Head to this https://github.com/altfatterz/spring-batch-with-jms GitHub repository, and look into the trigger-batch-job-upon-startup
module. The example is using H2 database in server mode in order to examine the infrastructure tables created by Spring Batch. Installing H2 is pretty easy, on Mac you would do:
brew install h2
brew services start h2
Then you can connect to the H2 console at http://localhost:8082/.
Our example batch job is very simple containing only a single step which prints out a logging message.
@Slf4j
@Configuration
public class BatchConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> {
log.info("step executed");
return RepeatStatus.FINISHED;
}).build();
}
@Bean
public Job job(Step step1) throws Exception {
return jobBuilderFactory.get("job1")
.incrementer(new RunIdIncrementer())
.start(step1)
.build();
}
}
If you execute the TriggerBatchJobUponStartApp
in the logs you should see that the job is executed:
o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job1]] launched with the following parameters: [{run.id=1}]
o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
c.e.t.BatchConfiguration : step executed
o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job1]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
and if you look into H2 console again you will see that Spring Batch created the following tables:
BATCH_JOB_EXECUTION
BATCH_JOB_EXECUTION_CONTEXT
BATCH_JOB_EXECUTION_PARAMS
BATCH_JOB_INSTANCE
BATCH_STEP_EXECUTION
BATCH_STEP_EXECUTION_CONTEXT
Now the RunIdIncremeter
in the above code snippet needs a bit of explanation. Spring Batch has the rule that a JobInstance
can only be run once to completion. This means that for each combination of identifying job parameters, you can only have one JobExecution
that results in COMPLETE. The RunIdIncrementer
will append an additional, unique parameter (run.id) to the list of parameters so that the resulting combination would be unique, giving you a new JobInstance
each time you ran the job with the same combination of identifying parameters.
Trigger a batch job with a JMS message
Let's look how to trigger the batch job with a JMS message. First, you need to set the spring.batch.job.enabled
property to false otherwise Spring Boot will start the batch jobs on application start. In this example, we leverage Spring Integration to consume a JMS message and we use ActiveMQ as a JMS provider. Installing ActiveMQ is pretty easy, if you are on Mac you would do:
brew install activemq
brew services start activemq
You can verify that is up and running by logging in into its administration interface at http://localhost:8161/admin/ with admin/admin credentials. In this example, we are going to use the http://hawt.io/ console instead which provides a better user experience when sending and analyzing JMS messages. Unfortunately, there is no homebrew support for it so you need to download the jar artifact and you can start the service with:
java -jar hawtio-app-1.5.4.jar --port 9999
The console will be available at http://localhost:9999/hawtio. You need to click on Connect then Local and the hit the Start agent for Apache ActiveMQ. The agent URL will be provided where you will find the ActiveMQ tab from where you can access the exposed resources like queues, topics, client connectors.
In this example, we are just interested in to trigger a Spring Batch job, no requirement to provide the result of batch job execution. So we can consume the JMS message with a Channel Adapter, no need for a Gateway.
There are two JMS-based inbound Channel Adapters. The first uses Spring's JmsTemplate
to receive messages based on a polling period. The second is message-driven and relies upon a MessageListenerContainer
. We will look into both.
Inbound channel adapter
Let's consume the following JMS messages with a JmsTemplate
and a poller.
<notification>
<email>example@gmail.com</email>
<status>ORDER_DISPATCHED</status>
</notification>
Processing the message will trigger a batch job. In order to keep things simple, the batch job will contain only one single step which will send an email to the address extracted from the message.
The integration flow is the following:
@Bean
public IntegrationFlow myJmsTriggeredFlow() {
return IntegrationFlows.from(jmsDestinationPollingSource,
c -> c.poller(Pollers.fixedRate(5000, 2000)))
.transform(toJobLaunchRequest())
.handle(jobLaunchingGateway)
.handle(logger())
.get();
}
With the help of JMSDestinationPollingSource
, we receive the JMS message then we transform it into a JobLaunchRequest
object and then with the help of JobLaunchingGateway
the batch job is launched. In the JMSDestinationPollingSource
bean definition, we set the message converter, configure the queue name from where we fetch the messages.
@Bean
public JmsDestinationPollingSource jmsDestinationPollingSource(JmsTemplate jmsTemplate) {
jmsTemplate.setMessageConverter(messageConverter());
JmsDestinationPollingSource jmsDestinationPollingSource = new JmsDestinationPollingSource(jmsTemplate);
jmsDestinationPollingSource.setDestinationName("notifications");
return jmsDestinationPollingSource;
}
The message transformation is pretty simple, we extract the field values and set them as job parameters:
public JobLaunchRequest toRequest(Message<Notification> message) {
Notification notification = message.getPayload();
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder()
.addString("run.id", UUID.randomUUID().toString())
.addString("email", notification.getEmail())
.addString("status", notification.getStatus());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
If you would like to try it yourself check out the inbound-channel-adapter-example
module, make sure H2 and ActiveMQ are started and execute the InboudChannelAdapterExample
providing also the –spring.mail.username=<username>
and –spring.mail.password=<password>
program arguments. With the hawtio console put a notification message on the 'notification' queue (which was created by the application automatically) in order to verify that the batch job is triggered.
Message-driven channel adapter
Another way of consuming JMS messages is with a MessageListenerContainer
. In this example, we use the SimpleMessageListenerContainer
which is enough since we do not need XA support only native JMS transaction support. Setting the 'sessionTransacted' property to true we make sure that if during message processing an error occurs it will lead to a rollback with the message getting redelivered or put to a dead-letter queue.
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setConnectionFactory(connectionFactory);
simpleMessageListenerContainer.setDestinationName("trades");
simpleMessageListenerContainer.setSessionTransacted(true);
return simpleMessageListenerContainer;
}
In this example, we consume a 'trade' message in the following format:
<trade>
<stock>AMZN</stock>
<quantity>100</quantity>
<action>BUY</action>
</trade>
The JMS message is converted into Spring Integration Message by the JmsMessageDrivenEndpoint
and put on a MessageChannel
.
@Bean
public JmsMessageDrivenEndpoint jmsMessageDrivenEndpoint() {
ChannelPublishingJmsMessageListener cpjml = new ChannelPublishingJmsMessageListener();
cpjml.setRequestChannel(inputChannel());
cpjml.setMessageConverter(messageConverter());
JmsMessageDrivenEndpoint jmde = new JmsMessageDrivenEndpoint(messageListenerContainer(), cpjml);
return jmsMessageDrivenEndpoint;
}
Then with a ServiceActivator
, the JMS message is converted into a JobLaunchRequest
and put on an output channel.
@ServiceActivator(inputChannel = "inputChannel", outputChannel = "outputChannel")
public JobLaunchRequest process(Trade trade) {
JobParametersBuilder jobParametersBuilder = new JobParametersBuilder()
.addString("run.id", UUID.randomUUID().toString())
.addString("stock", trade.getStock())
.addString("action", trade.getAction().toString())
.addLong("quantity", Long.valueOf(trade.getQuantity()));
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
And finally, the JobLaunchingGateway
receives the JobLaunchRequest
messages from the output channel and launches the spring batch job.
@Bean
public IntegrationFlow myFlow() {
return IntegrationFlows.from("outputChannel")
.handle(jobLaunchingGateway)
.handle(logger())
.get();
}
Conclusion
We have seen that is pretty simple to trigger a Spring Batch job with a JMS messages. It basically boils down to convert the received JMS message into a JobLaunchRequest
and leverage the JobLaunchingGateway
to launch the batch job. It is worth to note that consuming JMS messages with the JMSTemplate
in combination with a poller approach is recommended only for low message volume, for high message volume you should consider using the message-driven approach.
The example code can be found here. If you are interested send me a pull request with an example of triggering a batch job by adding a new entry into a trigger table.