Spring microservices monitoring: /metrics endpoint and ELK, Part II: Improvements

June 29, 2017

Introduction

As we saw in the first part of this series, it is essential to monitor the health of our microservices, and to improve the tool we developed which helps us get metrics and forward them to an Elasticsearch instance. In this chapter we will pursue the question of how can we improve the processing of the result of the metrics endpoint.

Improvements

Polling Configuration

Justification: Since the metrics endpoints in our previous use case (a Spring Boot application) can be configured, it quickly became pretty obvious that our forwarder application requires this tool to be useful. Also, there is more than one endpoint that exposes valuable data, so let’s come up with this more generic solution.

Goal:
– Users should be able to configure N endpoints for each microservice to be polled at.

Each endpoint needs a separate index because each one is supposed to expose different type of data, therefore its structure can be different. One minor thing to be taken care of was the naming of such an index, because of the rules of Elasticsearch. So we will simply replace the slash with a dash, add a general index prefix, and then cache the result (exhibit 1). For example, if the URL of our endpoint is /metadata/metrics, and we have an index prefix metricsforwarder the name of its index will be metricsforwarder–metadata-metrics.

As per the polling of the configured endpoints, we can see on the exhibit 1 how they were added to the Application.yml, and the extra cycle added to the poller class.

# Application.yml

# ...
metricpoller:
   endpoints: /admin/health, /admin/metrics, /admin/info
# ...
// MetricPollerService.java

@Component
 public class MetricPollerService {

...

public void pollInstances() {
     *//Get all the registered services* List<String> services = discoveryClient.getServices();
 
     for (String service : services) {
         *//Get all the instances of each service* List<ServiceInstance> instances = discoveryClient.getInstances(service);
 
         int count = 1;
         for (ServiceInstance instance : instances) {
             for(String endpoint : this.metricsEndpoints) {
                 *//Get the metrics and delegate the forwarding of the message* HashMap<String, Object> result = this.getMetrics(instance, endpoint);
                 try {
                     this.forwarder.cache(result, instance, endpoint);

...

     }

}
// ElasticsearchCachedForwarder.java

@Component
 public class ElasticsearchCachedForwarder {

@Autowired
 public ElasticsearchCachedForwarder(RestClient esRestClient,
                                     @Value("${metricpoller.endpoints:/admin/metrics}") String\[\] metricsEndpoints,
                                     BulkManager bulkManager,
                                     IndexManager indexManager) {
     this.esRestClient = esRestClient;
     this.indices = new HashMap <>();
     *//Build the index name based on the endpoint, and then relate them.* for (String endpoint : metricsEndpoints) {
         this.indices.put(endpoint, indexManager.getIndexName(endpoint));
     }
     this.bulkManager = bulkManager;
     this.headers = new Header\[\]{this.bulkManager.getHeader()};
 }

...

}
// IndexManager.java

@Component
 public class IndexManager {
     private static String *INDEX_NAME*;
     private static String *INDEX\_NAME\_DATE_FORMAT*;
 
     @Autowired
     public IndexManager(@Value("${metricpoller.index.name:microsvcmetrics}") String indexName) {
         *INDEX_NAME* = indexName;
         *INDEX\_NAME\_DATE_FORMAT* = indexNameDateFormat;
     }
 
     public String getIndexName(String endpoint) {
         return String.*format*("%s-%s", *INDEX_NAME*, cleanEndpointName(endpoint));
     }
 
     private String cleanEndpointName(String endpoint) {
         String result = endpoint.startsWith("/") ? endpoint.replaceFirst("/", "") : endpoint;
         return result.replace("/", "-");
     }
 
 }

Exhibit 1. Polling several endpoints.

Elasticsearch Index Rotation Configuration

Justification: We want to give more flexibility to the index where the data is going to be forwarded to. Furthermore, when dealing with logging, it is a good practice to rotate the logs.

Goals:
– Users should be able to configure the name of the index to store the data.
– This index has a configurable date suffix for daily rotation.
– The index name will be formed in the following format:
-

Since Kibana requires a date field in order to create its time series, we have to create mappings that will indicate which field it is. Furthermore, such mappings should be applied each time an index is created. So to keep this simple, a template will be used, for example the one shown in exhibit 2. Please keep in mind the following:
A) The name of the index must be matched by the template.
B) The Automatic Index Creation has to be enabled on the Elasticsearch instance.

PUT \_template/template\_microsvcmetrics
 {
   "template": "microsvcmetrics*",
   "settings": {
     "index": {
       "number\_of\_shards": 1,
       "number\_of\_replicas": 1
     }
   },
   "mappings": {
     "timestamped-metric": {
       "properties": {
         "timestamp.value": {
           "type": "date",
           "format": "yyyyMMdd'T'HHmmss.SSSZ||epoch_millis"
         }
       }
     }
   }
 }

Exhibit 2. Template for Elasticsearch.

The exhibit 3 shows the index rotation configuration. To create the date suffix for the index name and leave it on a standard configurable fashion, the Java class DateTimeFormatter was used in combination with configuration properties in the Application.yml, so the patterns for formatting and parsing can be used here.

# Application.yml - 
...
metricpoller:
     index:
     #This has to match with the index template
     name: microsvcmetrics 
     dateFormat: yyyy-MM-dd

...
// IndexManager.java -

@Component
 public class IndexManager {

    // ...

    public String getIndexName(String endpoint) {
         return String.format("%s-%s-%s", INDEX_NAME, cleanEndpointName(endpoint),    getCurrentLocalDateTimeStamp());
     }
 
     private String cleanEndpointName(String endpoint) {
         String result = endpoint.startsWith("/") ? endpoint.replaceFirst("/", "") :    endpoint;
         return result.replace("/", "-");
     }
 
     private String getCurrentLocalDateTimeStamp() {
         return     LocalDateTime.now().format(DateTimeFormatter.ofPattern(INDEX_NAME_DATE_FORMAT));
     }
}

Exhibit 3. Managing the index name.

Performance optimizing with bulk requests

Justification: This new feature aims to improve the performance of our application by reducing the overhead on our network by sending several requests instead of one request.

Goal
– The metrics are forwarded to Elasticsearch just once per microservices polling-round.

To achieve this enhancement, the following had to be taken care of:

A) To gracefully handle the cache of the results.- On the one hand, a configuration for auto-flush was added, including how many results can be cached before forwarding the data to Elasticsearch. On the other hand, the solution here implied a change on the architecture of the application, unfortunately adding some coupling between the poller and the forwarder workers, because the former has to inform the latter when a round was finalized to flush the data. In the flush method of the CachedForwarder, the request itself is being built using the cached data that we have, also adding the required Content-Type header (explained later). Furthermore, to minimize the loss of data when the application finalizes with cached data, a @PreDestroy annotation was added to the flush method on the Forwarder.

# Application.yml - 
# ...
metricpoller:
   bulk:
     cache:
       autoflush: false
       documents: 20

// MetricPollerService.java - 

@Component
 public class MetricPollerService {

    // ...

    public void pollInstances() {
     // ...
        try {
           this.forwarder.cache(result, instance, endpoint);
        } catch(IOException ioe){
                     LOG.error(MessageFormat.format("Error fetching endpoint {0} for service instance: {1} with url {2}", endpoint, instance.getServiceId(), buildInstanceUrl(instance, endpoint)), ioe);
                 }
        }   


     if(!this.autoflush){
         try {
             this.forwarder.flush();
         } catch(IOException ioe){
             LOG.error("Error on flushing the cache", ioe);
         }
     }
 }    
// ElasticsearchCachedForwarder.java -
public class ElasticsearchCachedForwarder {
    // ...
    public void cache(HashMap<String, Object> message, ServiceInstance instance, String endpoint) throws IOException {
         String jsonContent = MessageBuilder.buildMessageFromMetrics(message, endpoint, instance);
         this.bulkManager.addInstruction(this.indices.get(endpoint), jsonContent);
         if (this.bulkManager.isAutoflush() && this.bulkManager.isCacheFull()) {
             this.flush();
         }
     }
 
     @PreDestroy
     public void flush() throws IOException {
     String jsonContent = this.bulkManager.getBulkRequest();
     HttpEntity entity = new NStringEntity(jsonContent);
     esRestClient.performRequestAsync("POST", BulkManager.BULK_ENDPOINT, Collections.emptyMap(), entity, new ResponseListener() {
             @Override
             public void onSuccess(Response response) {
                 LOG.debug("Successfully submitted metrics");
             }
 
             @Override
             public void onFailure(Exception exception) {
                 LOG.error("Error submitting metrics",exception);
             }
         }, this.headers);
         this.bulkManager.clearCachedInstructions();
     }
}

Exhibit 4. Handling the cache.

B) To build the bulk request itself.- The _bulk endpoint accepts requests in the following format (taken from the official documentation), and the Content-Type header has to be set to application/x-ndjson:

action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
...
action_and_meta_data\n  
optional_source\n

How the bulk request is built, is shown in exhibit 6. A constant string with a configurable action - for the index - was added.

// BulkManager.java – 

@Component
public class BulkManager {
     private final int documentsToCache;
     private final boolean autoflush;
     private static final String BULK_ACTION_AND_METADATA = "{ \"index\" : { \"_index\" : \"%s\", \"_type\" : \"timestamped-metric\"} }";
     public static final String BULK_ENDPOINT = "_bulk";
     private final List<String> bulkInstructions;
     private BasicHeader header;

     @Autowired
     public BulkManager(@Value("${metricpoller.bulk.cache.documents:10}") int documentsToCache,
             @Value("${metricpoller.bulk.cache.autoflush:false}") Boolean autoflush) {
         this.documentsToCache = documentsToCache;
         this.autoflush = autoflush;
         this.bulkInstructions = new Vector<>();
         this.header = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/x-ndjson");
     }

     public int getDocumentsToCache() {
         return documentsToCache;
     }

     public boolean isAutoflush() {
         return autoflush;
     }

     public BasicHeader getHeader() {
         return header;
     }

     public void setHeader(BasicHeader header) {
         this.header = header;
     }

     public void addInstruction(String mapping, String jsonDocument){
         this.bulkInstructions.add(String.format(BULK_ACTION_AND_METADATA, mapping));
         this.bulkInstructions.add(jsonDocument);
     }

     public boolean isCacheFull(){
         //we are storing 2 instructions per document: one for the action & metadata, and the other one for the entity representing the document
         return this.bulkInstructions.size() >= this.getDocumentsToCache() * 2;
     }

     public String getBulkRequest() {
         StringBuilder bulkRequest = new StringBuilder();
         for (String instruction : this.bulkInstructions) {
             bulkRequest.append(instruction);
             bulkRequest.append('\n');
         }
         return bulkRequest.toString();
     }

     public void clearCachedInstructions() {
         this.bulkInstructions.clear();
     }
 }

Exhibit 5. Handling the bulk request.

Conclusion

In this part we have shown how to improve the metrics-forwarder application by adding configurable endpoints to poll at a configurable index name with a data postfix for rotation, and minimizing the requests sent to index the data.
There are more questions that can be addressed:

On a more personal note, the unit testing was funny to work with, because it was not easy to break down the captured packages, and then to transform them in order to assert them, but hey! That’s why we do this kind of stuff, keep learning and experimenting!

The full code for this project is available at https://github.com/mimacom/metric-forwarder

About the author: Enrique Llerena Domínguez

Passionate in code and in life. Likes football (both american and the real one ;) ). Goal oriented. Keep movin', keep movin'!

Comments
Join us