Components and Usage of Spring Data Elasticsearch (Part 2)

January 25, 2017

In the previous post, we discussed how to read data in chunks from a relational database with a Spring Batch job run by Spring Boot. This post explains how to import that data into Elasticsearch.

Spring Boot provides out-of-the-box support for Elasticsearch, which saves a lot of configuration time, so we are going to take full advantage of that. However, in a real-world application you will need the Elasticsearch Java Client, which requires additional instantiation and configuration. When no configuration is provided, the Spring Boot integration for Elasticsearch automatically creates an embedded Elasticsearch instance, which is very useful for testing.

This approach is very convenient, but it also means that the embedded Elasticsearch only lives as long as the Spring Boot process, so it shuts down as soon as the import finishes. That leads to one big disadvantage: it is not possible to persistently configure Elasticsearch plugins. Fortunately, you can configure the Elasticsearch instance that the Spring Boot app will connect to with the following parameters:

spring.data.elasticsearch.cluster-name=application
spring.data.elasticsearch.cluster-nodes=localhost:9300

You may have to change the cluster-name parameter to match the name of your cluster. The port is the TCP transport port of your Elasticsearch server (default 9300); if this port is already taken, Elasticsearch picks the next free port by incrementing the port number.

Now that the Elasticsearch configuration is completed we can start discussing the different possibilities of accessing Elasticsearch data:

  1. Autowiring the Client from Elasticsearch and using it like in any other application
  2. Writing an ElasticsearchCrudRepository, similar to the CrudRepository from JPA
  3. Using the ElasticsearchTemplate for less magic and more possibilities

We will start with the most straightforward way, the Client interface of Elasticsearch. Luckily, Spring Boot already creates and provides a bean of this type. It is enough to add a constructor parameter of type Client and mark the constructor with @Autowired. For more information about the usage of the Elasticsearch Java Client, have a look at the official API documentation.
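
As a rough illustration (the service class, the query and the lastName field are made up for this example and are not part of the importer), injecting and using the Client could look like this:

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class EmployeeSearchService {

    private final Client client;

    @Autowired
    public EmployeeSearchService(Client client) {
        // Spring Boot provides the Client bean, so it can simply be injected
        this.client = client;
    }

    public SearchResponse findByLastName(String lastName) {
        // Query the employees index directly through the Elasticsearch Java Client
        return this.client.prepareSearch("employees")
                .setQuery(QueryBuilders.matchQuery("lastName", lastName))
                .get();
    }
}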

The second way, analogous to reading the data with JPA, is to use the CrudRepository for Elasticsearch. For that it is necessary to annotate the domain model object with @Document(indexName = "employees") and its ID attribute with @Id. Notice that the imported @Document annotation has to come from spring-data-elasticsearch, not from another spring-data package.
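
A minimal sketch of such a domain object (the firstName and lastName fields are only examples; empNo is the attribute we will later use as the document id, and the optional type attribute matches the employee type used further below):

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;

@Document(indexName = "employees", type = "employee")
public class Employee {

    @Id
    private Integer empNo;      // used as the Elasticsearch document id
    private String firstName;
    private String lastName;

    // getters and setters omitted
}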

With this precondition in place, the only thing left is to extend the ElasticsearchCrudRepository<DomainObject, IdType>. In our case the DomainObject is Employee and the IdType is an Integer, so the new ElasticsearchEmployeeRepository will look like the following:

public interface ElasticsearchEmployeeRepository extends ElasticsearchCrudRepository<Employee, Integer> {
}

Now we have a new repository which provides several default methods: count(), findAll(), findOne(ID), delete(ID), deleteAll(), exists(ID), save(DomainObject) and save(Iterable<DomainObject>). The last method can be used to execute a bulk request. The benefit of a bulk request is that Elasticsearch indexes multiple documents in a single round trip, so the overhead of indexing the data is significantly reduced.

This CRUD repository can be used to write the imported data directly to Elasticsearch, but to do that we need an implementation of the ItemWriter interface. This interface defines a single processing method, write, which receives a list of items that must be written to Elasticsearch. The implementation looks like this:

@Component
public class EmployeesItemWriter implements ItemWriter<Employee> {
    private ElasticsearchEmployeeRepository elasticsearchEmployeeRepository;
 
    @Autowired
    public EmployeesItemWriter(ElasticsearchEmployeeRepository elasticsearchEmployeeRepository) {
        this.elasticsearchEmployeeRepository = elasticsearchEmployeeRepository;
    }
 
    @Override
    public void write(List<? extends Employee> items) throws Exception {
        this.elasticsearchEmployeeRepository.save(items);
    }
}

In order to use the newly created ItemWriter, you must include it in the BatchConfiguration class by modifying the step to look like the following:

@Bean
public Step importEmployeeStep(StepBuilderFactory stepBuilderFactory,
                               ItemReader<Employee> employeeItemReader,                                
                               ItemWriter<Employee> employeeItemWriter) {
    return stepBuilderFactory.get("importEmployeeStep")
                .<Employee, Employee>chunk(1000)
                .reader(employeeItemReader)
                .writer(employeeItemWriter)
                .build();
}

Now you can rerun the importer and the data should be imported into Elasticsearch. Since we specified the index name employees and the document type employee, we will find all employees when we search against the following endpoint: http://localhost:9200/employees/employee/_search
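
For a quick check you can query that endpoint, for example with cURL:

curl -XGET 'http://localhost:9200/employees/employee/_search?pretty'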

Customers often require that updated data from the database is re-imported without any Elasticsearch downtime. One solution is to do a complete re-import of the data. Unfortunately, it is not always possible to simply drop and recreate the whole index, because that would leave the system without available data for a couple of seconds, depending on the amount of data. One way to solve this problem is to create the index under a different name and add an alias for the real name. After the new index is created, the alias can be relinked to the new index, so the downtime is reduced to the time it takes Elasticsearch to change a simple link.
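
Conceptually, this relinking is a single _aliases request against the REST API; the index names below are only placeholders:

curl -XPOST 'http://localhost:9200/_aliases' -d '{
  "actions": [
    { "remove": { "index": "employees-1", "alias": "employees" } },
    { "add":    { "index": "employees-2", "alias": "employees" } }
  ]
}'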

To change the name on every import, we can use the run.id that is created by the RunIdIncrementer. To access it, we add the @JobScope annotation to our writer. Without that annotation our component is not bound to the job context; with @JobScope it is instantiated once per job execution.

Then we can inject the job parameter into our writer with the @Value annotation. This can look like the following:

@Component
@JobScope
public class EmployeesItemWriter implements ItemWriter<Employee> {
 
    private ElasticsearchTemplate elasticsearchTemplate;
    private Long runId;
 
    @Autowired
    public EmployeesItemWriter(ElasticsearchTemplate elasticsearchTemplate,
                               @Value("#{jobParameters['run.id']}") Long runId) {
        this.elasticsearchTemplate = elasticsearchTemplate;
        this.runId = runId;
    }
 
    // ...
}

Unfortunately, it is now not possible to give the repository or the domain object a dynamic index name. This means that our CRUD repository is limited to importing into a predefined index, but we need to be able to create indexes dynamically and then simply relink their aliases. To solve this issue, we can either create an importing alias which is the same for every import, or we can use the ElasticsearchTemplate, which is also part of the spring-data-elasticsearch integration. The ElasticsearchTemplate is similar to the JdbcTemplate for relational databases. Fortunately, it also provides a method for sending a bulk request, so our write method can look like the following:

@Override
public void write(List<? extends Employee> items) throws Exception {
    List<IndexQuery> indexQueries = items.stream()
            .map(item -> new IndexQueryBuilder().withObject(item).withId(String.valueOf(item.getEmpNo())))
            .map(builder -> builder.withType("employee"))
            .map(builder -> builder.withIndexName("employees-" + runId))
            .map(IndexQueryBuilder::build)
            .collect(Collectors.toList());
 
    this.elasticsearchTemplate.bulkIndex(indexQueries);
}

Now we are importing the employees into a new employees-[run.id] index, where [run.id] is the number of the job execution. So, if you want to figure out which job produced a given index, you just need to match the [run.id] in the index name with the [run.id] stored in the batch job tables.
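
To see which indices the imports have produced, you can for example list them with the _cat API:

curl 'http://localhost:9200/_cat/indices/employees-*?v'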

Since this takes time and effort for the regular user, you might want to make things easier by creating an alias. To do that, you first need to remove the old index which was created by our first import. You can send a DELETE request to http://localhost:9200/employees/ with a REST client of your choice, for example with cURL:

curl -XDELETE http://localhost:9200/employees/

Now we need to create a new step which is only executed if the first step was successful. This step differs from the previous one because it does not read or write a list of items; instead it executes a single task, so we can extend AbstractBatchlet to create a single processor.

Next, we need to create the new alias. This can be done with an AliasQuery, which is executed through the ElasticsearchTemplate. The AliasQuery is used for adding and removing index aliases; all you need to do is instantiate it with the index name and the alias name and then execute it with the ElasticsearchTemplate:

AliasQuery aliasQuery = new AliasQuery();
aliasQuery.setAliasName("employees");
aliasQuery.setIndexName("employees-" + runId);
elasticsearchTemplate.addAlias(aliasQuery);

This code generates a new alias, but it does not remove the existing ones. That is a problem, because Elasticsearch serves queries against the alias from all indices it points to. So it is necessary to remove all old aliases before creating the new alias. For this we can use a wildcard AliasQuery:

AliasQuery removeAliasQuery = new AliasQuery();
removeAliasQuery.setAliasName("employees");
removeAliasQuery.setIndexName("employees-*");
elasticsearchTemplate.removeAlias(removeAliasQuery);

Since this expression would also remove our new alias, it has to be executed before the new alias is created. Unfortunately, this means that we have to execute two requests instead of one, and if the second one fails for some reason, there will be no alias left to roll back to. With the native Elasticsearch client it is possible to do both actions in one atomic request, but for now the solution above should be enough. The resulting batchlet can look like the following:

@Component
@JobScope
public class CreateIndexBatchletStep extends AbstractBatchlet {
 
    private ElasticsearchTemplate elasticsearchTemplate;
    private Long runId;
 
    @Autowired
    public CreateIndexBatchletStep(ElasticsearchTemplate elasticsearchTemplate,
                                   @Value("#{jobParameters['run.id']}") Long runId) {
        this.elasticsearchTemplate = elasticsearchTemplate;
        this.runId = runId;
    }
 
    @Override
    public String process() throws Exception {
        try {
            AliasQuery removeAliasQuery = new AliasQuery();
            removeAliasQuery.setAliasName("employees");
            removeAliasQuery.setIndexName("employees-*");
            elasticsearchTemplate.removeAlias(removeAliasQuery);
        } catch (AliasesNotFoundException exception) {
            // Ignore
        }
 
        AliasQuery aliasQuery = new AliasQuery();
        aliasQuery.setAliasName("employees");
        aliasQuery.setIndexName("employees-" + runId);
        elasticsearchTemplate.addAlias(aliasQuery);
        return COMPLETED.toString();
    }
}
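
As mentioned above, the native client can combine the remove and the add action in one atomic request, so there is never a moment without an alias. The following is only a rough sketch, assuming the underlying Client is obtained from the ElasticsearchTemplate; note that the wildcard remove fails if no alias exists yet, just like in the batchlet above:

// Swap the alias in a single atomic _aliases request via the native Client
Client client = elasticsearchTemplate.getClient();
client.admin().indices().prepareAliases()
        .removeAlias("employees-*", "employees")
        .addAlias("employees-" + runId, "employees")
        .get();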

Of course, we also need to add the new step in our BatchConfiguration. This can look like the following:

@Bean
public Step renameAliasStep(StepBuilderFactory stepBuilderFactory,
                            CreateIndexBatchletStep createIndexBatchletStep) {
    return stepBuilderFactory.get("renameAliasStep")
                .tasklet(new BatchletAdapter(createIndexBatchletStep))
                .build();
}

Lastly, we have to integrate this step into our job. For this we need to abort the job if the first step fails and then add our second step. The result is a slightly more complex job configuration:

@Bean
public Job importJob(JobBuilderFactory jobBuilderFactory,
                     Step importEmployeeStep, Step renameAliasStep) {
    return jobBuilderFactory.get("importJob")
                    .incrementer(new RunIdIncrementer())
                    .start(importEmployeeStep).on(ExitStatus.FAILED.getExitCode()).fail()
                    .next(renameAliasStep)
                    .build()
                    .build();
}

When we now execute our importer, it creates a new index named employees-[num], imports all employees into this index and afterwards points the employees alias to it, replacing the existing alias if there is one. Our client application can read data from the employees endpoint without any noticeable downtime. Finally, you can find this implementation in a GitHub repository to compare it with your own; some improvements are also available in this repository.

About the author: Valentin Zickner

He has been working at mimacom as a Software Engineer since 2016. He has a lot of experience with cloud technologies and specializes in Spring, Elasticsearch and Flowable.
