Import Relational Database Data to Elasticsearch with Spring Batch (Part 1)

January 5, 2017

Recently I had a customer project that required transferring large amounts of data from a relational database to the NoSQL database Elasticsearch in order to take advantage of its famously fast search capabilities. Elasticsearch is part of the ELK stack that is released and maintained by Elastic.co. The abbreviation ELK stands for Elasticsearch, Logstash, and Kibana.

The easiest way to transfer data from a traditional relational database into Elasticsearch is by using the "L" in the ELK stack: Logstash. Unfortunately, Logstash has some limitations, and one of them is directly related to reading records from a relational database: although a database entry and an Elasticsearch entry may seem very similar, it is not possible to map a single database entry to a single Elasticsearch document. This difference originates from the fact that Elasticsearch does not use the notion of "relations" between its "records"; instead it stores its data as flat documents, and flat documents have no relations to each other. In addition, the customer database has several million entries, which made the situation even more complicated. So, instead of using Logstash, we decided to write our own importer that uses batch processing and bulk writes into Elasticsearch.

I have already implemented similar applications for other projects by following this workflow:

  1. Import data from a REST endpoint
  2. Convert the data to a search-optimized record
  3. Write the records to Elasticsearch

Since this project is going to be similar, but without the limitations of those earlier setups, I decided to start the development from scratch. That way I can take advantage of the new Spring Boot and Spring Data features. Since Spring Data supports MongoDB out of the box, it should be easy to adapt these concepts to other NoSQL databases like Elasticsearch, although, depending on the specific data store, some optimizations may only be possible for one of them.

The following blog series will describe how to create an import daemon that feeds Elasticsearch from an existing data source, starting with a simple table of a relational database and then improving the import for complex records from a relational database. The steps will be:

  1. Import Relational Database Data with Spring Batch
  2. Components and usage of Spring Data Elasticsearch

This first blog post is a step-by-step introduction to developing an importer application that reads from any data source and writes the data to the default output in batches.

Prerequisites

Before continuing you should have:

  1. A running Elasticsearch instance
  2. A relational database filled with test data (this series uses the MySQL employees sample database)
  3. Java and Maven installed

If you don't have a test database, you can set one up by importing the employees sample database:

mysql -u employees -p < employees.sql

In this command, -u sets the MySQL user to "employees" and -p makes the client prompt for the password. After a few seconds, you should see an employees database with several related tables.

From now on, for the rest of this blog series, you should always have a running Elasticsearch instance and a database whose contents will be imported into that instance.

Now that all preconditions are met, we can begin importing the data. We start by creating a new Spring Boot project that will run our batch processing with Spring Batch. I suggest using the project generator at http://start.spring.io to create the initial project setup. This is a great website that lets you choose the necessary dependencies for your project, and it also provides plugins for the most popular IDEs, like IntelliJ IDEA. For our Spring Batch project we need the following dependencies: Batch, JDBC, MySQL, and Elasticsearch.

Download the initial project and open it in the IDE of your choice. You can already try to run the project, although it will unfortunately fail because of two errors:

  1. The process is not able to connect to the database – we need to specify a data source
  2. We have a class not found exception, but the process is still continuing

Only a small amount of configuration is needed to specify the data source. Open application.properties and add the following configuration (it may look different depending on your database):

spring.datasource.url=jdbc:mysql://localhost:3306/employees
spring.datasource.username=employees
spring.datasource.password=employees

That takes care of the database exception, so the only thing left is the class not found exception, which indicates that a library needed by the embedded Elasticsearch node (JNA) is missing. To fix this, simply add a new Maven dependency:

<dependency>
    <groupId>net.java.dev.jna</groupId>
    <artifactId>jna</artifactId>
    <version>4.1.0</version>
</dependency>

After adding this dependency, we can start our new application with “mvn spring-boot:run” or our IDE integration and observe the following output:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.4.1.RELEASE)
  
2016-11-15 19:55:13.781  INFO 22292 --- [           main] com.example.ImporterApplication          : Starting ImporterApplication on ida with PID 22292 (/home/valentin/development/elasticsearch/import-database-batch/importer/target/classes started by valentin in /home/valentin/development/elasticsearch/import-database-batch/importer)
2016-11-15 19:55:13.791  INFO 22292 --- [           main] com.example.ImporterApplication          : No active profile set, falling back to default profiles: default
2016-11-15 19:55:14.073  INFO 22292 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@619713e5: startup date [Tue Nov 15 19:55:14 CET 2016]; root of context hierarchy
2016-11-15 19:55:17.995  INFO 22292 --- [           main] org.elasticsearch.node                   : [Red Ronin] version[2.4.0], pid[22292], build[ce9f0c7/2016-08-29T09:14:17Z]
2016-11-15 19:55:17.996  INFO 22292 --- [           main] org.elasticsearch.node                   : [Red Ronin] initializing ...
2016-11-15 19:55:18.003  INFO 22292 --- [           main] org.elasticsearch.plugins                : [Red Ronin] modules [], plugins [], sites []
2016-11-15 19:55:18.043  INFO 22292 --- [           main] org.elasticsearch.env                    : [Red Ronin] using [1] data paths, mounts [[/ (rootfs)]], net usable_space [3.9gb], net total_space [450gb], spins? [unknown], types [rootfs]
2016-11-15 19:55:18.043  INFO 22292 --- [           main] org.elasticsearch.env                    : [Red Ronin] heap size [2.5gb], compressed ordinary object pointers [true]
2016-11-15 19:55:20.367  INFO 22292 --- [           main] org.elasticsearch.node                   : [Red Ronin] initialized
2016-11-15 19:55:20.367  INFO 22292 --- [           main] org.elasticsearch.node                   : [Red Ronin] starting ...
2016-11-15 19:55:20.369  INFO 22292 --- [           main] org.elasticsearch.transport              : [Red Ronin] publish_address {local[1]}, bound_addresses {local[1]}
2016-11-15 19:55:20.402  INFO 22292 --- [           main] org.elasticsearch.discovery              : [Red Ronin] elasticsearch/qjqyDxwzRvesFwXl691tTw
2016-11-15 19:55:20.423  INFO 22292 --- [pdateTask][T#1]] org.elasticsearch.cluster.service        : [Red Ronin] new_master {Red Ronin}{qjqyDxwzRvesFwXl691tTw}{local}{local[1]}{local=true}, reason: local-disco-initial_connect(master)
2016-11-15 19:55:20.435  INFO 22292 --- [           main] org.elasticsearch.node                   : [Red Ronin] started
2016-11-15 19:55:20.548  INFO 22292 --- [pdateTask][T#1]] org.elasticsearch.gateway                : [Red Ronin] recovered [0] indices into cluster_state
2016-11-15 19:55:20.756  INFO 22292 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2016-11-15 19:55:20.776  INFO 22292 --- [           main] com.example.ImporterApplication          : Started ImporterApplication in 8.48 seconds (JVM running for 9.759)
2016-11-15 19:55:20.789  INFO 22292 --- [       Thread-1] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@619713e5: startup date [Tue Nov 15 19:55:14 CET 2016]; root of context hierarchy
2016-11-15 19:55:20.800  INFO 22292 --- [       Thread-1] s.b.a.d.e.ElasticsearchAutoConfiguration : Closing Elasticsearch client
2016-11-15 19:55:20.800  INFO 22292 --- [       Thread-1] org.elasticsearch.node                   : [Red Ronin] stopping ...
2016-11-15 19:55:20.806  INFO 22292 --- [       Thread-1] org.elasticsearch.node                   : [Red Ronin] stopped
2016-11-15 19:55:20.807  INFO 22292 --- [       Thread-1] org.elasticsearch.node                   : [Red Ronin] closing ...
2016-11-15 19:55:20.823  INFO 22292 --- [       Thread-1] org.elasticsearch.node                   : [Red Ronin] closed
2016-11-15 19:55:20.824  INFO 22292 --- [       Thread-1] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown

Description of the output:
First, you can see the normal startup of Spring Boot with the integrated Spring Boot logo. Then, as part of the Spring Boot startup, you can see an Elasticsearch node starting, in my case with the name "Red Ronin". After Elasticsearch has started, you can see that the ImporterApplication itself also starts and then terminates. That is fine for now, since we do not have any batch processing integrated yet. That is our next step.

Spring Batch is structured in Jobs that consist of Steps (you can read more about this at http://docs.spring.io/spring-batch/reference/html/spring-batch-intro.html). Jobs can be instantiated with JobParameters and then executed; the execution of a Job is called a JobExecution. During the job execution, a specified list of Steps is executed, and only after all steps have completed successfully is the job considered completed. Each step can be either a simple unstructured task, called a tasklet, in which case a method is executed once, or a chunk-oriented step with reading, processing, and writing. Our example uses the second option, so it is necessary to specify an ItemReader and at least one ItemProcessor and/or ItemWriter. Spring Batch will read items from the reader, pass them to the processor, and finally write them in chunks with the writer. The chunk size can also be specified.
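
To make the first option concrete: a tasklet step executes a single method exactly once. Here is a minimal sketch of such a step (the bean name and the printed message are made up for illustration; we will not use this step in the importer):

@Bean
public Step logTaskletStep(StepBuilderFactory stepBuilderFactory) {
    // A tasklet step: the lambda is executed exactly once per step execution.
    return stepBuilderFactory.get("logTaskletStep")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("Tasklet executed once");
                    return RepeatStatus.FINISHED; // org.springframework.batch.repeat.RepeatStatus
                })
                .build();
}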

To enable modules in Spring Boot you can put the corresponding @Enable... annotation on any configuration class, e.g. the main @SpringBootApplication class. In our case we want to enable batch processing, which is done with the @EnableBatchProcessing annotation. Since we also need to specify beans for our reader, processor, and writer, we create one more configuration class named BatchConfiguration. Spring Boot performs a component scan automatically and will find all @Component classes, including @Configuration, @Service, and @Controller, in our package structure. The BatchConfiguration class with our two annotations will look like this:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
}

Now Spring Boot will do some magic and, if we have a correctly configured job, it will automatically execute it. Now let’s create a new job inside our BatchConfiguration:

@Bean
public Job importJob(JobBuilderFactory jobBuilderFactory, Step importEmployeeStep) {
    return jobBuilderFactory.get("importJob")
                .incrementer(new RunIdIncrementer())
                .start(importEmployeeStep)
                .build();
}

This is a simple example of a job. It creates a job named "importJob" and adds the "importEmployeeStep" step to it. Then build() is called on the builder to create the actual job. One additional option is the incrementer. The incrementer is not mandatory, but we use it to make our life a bit easier: the JobParametersIncrementer interface allows us to programmatically increment the JobParameters with each subsequent run. In this case we use the RunIdIncrementer implementation, which adds a job parameter called "run.id" that is automatically incremented.
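
To illustrate what the incrementer does (this snippet is only a demonstration, not part of the importer code): every call to getNext() takes the previous JobParameters and returns a new set with run.id increased by one, so each launch is considered a new job instance.

// Demonstration of RunIdIncrementer, not part of the importer code.
RunIdIncrementer incrementer = new RunIdIncrementer();
JobParameters first = incrementer.getNext(new JobParameters()); // contains run.id=1
JobParameters second = incrementer.getNext(first);              // contains run.id=2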

Now we have a job bean, but its sole step is missing. Spring will search for a Step bean with the name "importEmployeeStep". A simple step can look like the following:

@Bean
public Step importEmployeeStep(StepBuilderFactory stepBuilderFactory, ItemReader<Employee> employeeItemReader) {
    return stepBuilderFactory.get("importEmployeeStep")
                .<Employee, Employee>chunk(20)
                .reader(employeeItemReader)
                .writer(employees -> employees.forEach(System.out::println))
                .build();
}

This is a simple step that uses the ItemReader<Employee> to read multiple employees, which are then sent in chunks to the writer. The writer receives a collection of items and executes System.out.println for each of them. The chunk size is set to 20, which means the writer lambda is executed for every 20 entries (and once more for any remaining entries at the end).
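
The inline lambda is enough for this example; if you prefer a dedicated bean, the same behavior could be expressed as an ItemWriter implementation. A sketch with a hypothetical class name:

@Component
public class ConsoleEmployeeWriter implements ItemWriter<Employee> {

    @Override
    public void write(List<? extends Employee> employees) throws Exception {
        // Called once per chunk, i.e. with up to 20 employees at a time.
        employees.forEach(System.out::println);
    }
}

Such a bean could then be passed to .writer(...) instead of the lambda shown above.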

So far, so good; only two classes still need to be created: the Employee class and the reader class.

The Employee class will hold all information between reading and writing to Elasticsearch. We can use the field names as provided by the database. An example class looks like this:

public class Employee {
    private int empNo;
    private Date birthDate;
    private String firstName;
    private String lastName;
    private Gender gender;
    private Date hireDate;
    private List<Department> departments;

    // generate getter + setter
    // generate toString method
}

The getters and setters are skipped, but you can use your IDE to generate them.
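
The Employee class refers to a Gender enum, and the reader below fills a list of Department objects; neither class is shown here in full. A minimal sketch, assuming the gender column of the employees sample database contains the values 'M' and 'F':

public enum Gender {
    M, F // matches the gender ENUM('M','F') column of the employees database
}

public class Department {
    private String deptNo;
    private String name;

    // generate getter + setter
    // generate toString method
}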

The reader class reads employees from the database. For that we create an EmployeesItemReader based on the JdbcCursorItemReader with the generic type Employee:

@Component
@JobScope
public class EmployeesItemReader extends JdbcCursorItemReader<Employee> {
 
    @Autowired
    public EmployeesItemReader(DataSource dataSource) {
        this.setSql("select * from employees " +
                "left join dept_emp on employees.emp_no = dept_emp.emp_no " +
                "left join departments on dept_emp.dept_no = departments.dept_no");
        this.setRowMapper(new EmployeeRowMapper());
        this.setDataSource(dataSource);
    }
  
    private static class EmployeeRowMapper implements RowMapper<Employee> {
        @Override
        public Employee mapRow(ResultSet resultSet, int i) throws SQLException {
            Employee employee = new Employee();
            employee.setEmpNo(resultSet.getInt("emp_no"));
            employee.setBirthDate(resultSet.getDate("birth_date"));
            employee.setFirstName(resultSet.getString("first_name"));
            employee.setLastName(resultSet.getString("last_name"));
            employee.setGender(Gender.valueOf(resultSet.getString("gender")));
            employee.setHireDate(resultSet.getDate("hire_date"));
 
            List<Department> departments = new ArrayList<>();
            Department department = new Department();
            department.setDeptNo(resultSet.getString("dept_no"));
            department.setName(resultSet.getString("dept_name"));
            departments.add(department);
            employee.setDepartments(departments);
            return employee;
        }
    }
}

This reader will return every employee from the database and finally null, which signals to Spring Batch that there is no more input. Employee::toString is executed for each employee, and then the application stops.
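
As a small preview of the "convert the data to a search-optimized record" step of the workflow above, an ItemProcessor could later be plugged in between the reader and the writer by adding .processor(...) to the step definition. A purely hypothetical sketch that upper-cases the last name:

@Component
public class EmployeeProcessor implements ItemProcessor<Employee, Employee> {

    @Override
    public Employee process(Employee employee) throws Exception {
        // Hypothetical transformation; returning null would skip the item entirely.
        employee.setLastName(employee.getLastName().toUpperCase());
        return employee;
    }
}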

To summarize, in this blog post we have created an importer that reads data from a relational database and writes it to the standard output. In the next post we will write the data into Elasticsearch instead. Spring Data provides multiple possibilities for that, which you will see in the next blog post.

About the author: Valentin Zickner

Valentin has been working at mimacom as a Software Engineer since 2016. He has a lot of experience with cloud technologies; in addition, he specializes in Spring, Elasticsearch, and Flowable.
