Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
systemsmanagement:Ardana:8:CentOS:7.5
openstack-monasca-persister-java
upgrade-to-influxdb-1.1.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File upgrade-to-influxdb-1.1.patch of Package openstack-monasca-persister-java
commit 9b36fbdd5dbc0dec6ede8e2ce0a1778555baf68f Author: Kamil Choroba <kamil.choroba@est.fujitsu.com> Date: Thu Feb 23 13:51:43 2017 +0100 Upgrade to influxdb 1.1 Added functionality to support influxdb 1.1. Change-Id: I993a8b905451c9fef61c011d65e0512014b0dbba diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java index 2d25152b4458..cbf0bc636f54 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java @@ -1,5 +1,6 @@ /* * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * Copyright (c) 2017 FUJITSU LIMITED * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,18 +20,20 @@ package monasca.persister.repository.influxdb; import java.util.Map; +import com.google.common.base.Joiner; + public class InfluxPoint { private final String measurement; private final Map<String, String> tags; - private final String time; + private final Long time; private final Map<String, Object> fields; private final String Precision = "ms"; public InfluxPoint( final String measurement, final Map<String, String> tags, - final String time, + final Long time, final Map<String, Object> fields) { this.measurement = measurement; @@ -47,7 +50,7 @@ public class InfluxPoint { return this.tags; } - public String getTime() { + public Long getTime() { return this.time; } @@ -59,4 +62,14 @@ public class InfluxPoint { return Precision; } + // Create influxdb line protocol string + public String toInflux() { + return new StringBuilder(this.measurement).append(",") + .append(Joiner.on(",").join(this.tags.entrySet().iterator())) + .append(" ") + .append(Joiner.on(",").join(this.fields.entrySet().iterator())) + .append(" ").append(this.time) + .toString(); + } + } diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java index 86f267e34ea9..bbd6de62b5dd 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java @@ -1,5 +1,6 @@ /* * (C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP + * Copyright (c) 2017 FUJITSU LIMITED * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,8 +28,6 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +47,6 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo { private final ObjectMapper objectMapper = new ObjectMapper(); - private final DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime(); - @Inject public InfluxV9AlarmRepo( final Environment env, @@ -200,9 +197,7 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo { valueMap.put("reason_data", "{}"); - DateTime dateTime = new DateTime(event.timestamp, DateTimeZone.UTC); - - String dateString = this.dateFormatter.print(dateTime); + Long dateTime = new DateTime(event.timestamp, DateTimeZone.UTC).getMillis(); Map<String, String> tags = new HashMap<>(); @@ -212,7 +207,7 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo { InfluxPoint influxPoint = - new InfluxPoint(ALARM_STATE_HISTORY_NAME, tags, dateString, valueMap); + new InfluxPoint(ALARM_STATE_HISTORY_NAME, tags, dateTime, valueMap); influxPointList.add(influxPoint); diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java index 9aeb16b45524..3410e262fb7d 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java @@ -1,6 +1,7 @@ /* * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * Copyright (c) 2017 FUJITSU LIMITED * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +26,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import scala.collection.mutable.StringBuilder; import io.dropwizard.setup.Environment; import monasca.persister.repository.RepoException; @@ -74,7 +76,7 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo { influxPoint = new InfluxPoint(definition.getName(), tagMap, - measurement.getISOFormattedTimeString(), + measurement.getTime(), buildValueMap(measurement)); influxPointList.add(influxPoint); @@ -89,24 +91,24 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo { private Map<String, Object> buildValueMap(Measurement measurement) { - Map<String, Object> valueMap = new HashMap<>(); - - valueMap.put("value", measurement.getValue()); - - String valueMetaJSONString = measurement.getValueMetaJSONString(); - - if (valueMetaJSONString == null || valueMetaJSONString.isEmpty()) { - - valueMap.put("value_meta", "{}"); - - } else { - - valueMap.put("value_meta", valueMetaJSONString); - + Map<String, Object> valueMap = new HashMap<String, Object>(); + Map<String, String> valueMeta = measurement.getValueMeta(); + + if (valueMeta != null && valueMeta.size() != 0) { + /* + * Refactor value meta strings like: + * "error: error(111, 'Connection refused'). * Connection failed after 3 ms" + * to "error: error(111, \"Connection refused\"). Connection failed after 3 ms" + * Otherwise persisting to influx will fail. + */ + for (Map.Entry<String, String> entry : valueMeta.entrySet()) { + valueMap.put(entry.getKey(), + new StringBuilder("\"").append(entry.getValue().replaceAll("'", "\\\\\"")).append("\"")); + } } + valueMap.put("value", measurement.getValue()); return valueMap; - } private Map<String, String> buildTagMap(Definition definition, Dimensions dimensions) { diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java index 273176e4ea7d..6ff7e15bb537 100644 --- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java +++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java @@ -1,5 +1,6 @@ /* * Copyright (c) 2014 Hewlett-Packard Development Company, L.P. + * Copyright (c) 2017 FUJITSU LIMITED * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,10 +22,6 @@ import monasca.persister.configuration.PersisterConfig; import monasca.persister.repository.RepoException; import com.google.inject.Inject; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; - import org.apache.commons.codec.binary.Base64; import org.apache.http.Header; import org.apache.http.HeaderElement; @@ -38,8 +35,8 @@ import org.apache.http.HttpStatus; import org.apache.http.client.entity.EntityBuilder; import org.apache.http.client.entity.GzipDecompressingEntity; import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; import org.apache.http.entity.ContentType; -import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; @@ -49,7 +46,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.HashMap; public class InfluxV9RepoWriter { @@ -67,17 +63,17 @@ public class InfluxV9RepoWriter { private final String baseAuthHeader; - private final ObjectMapper objectMapper = new ObjectMapper(); - @Inject public InfluxV9RepoWriter(final PersisterConfig config) { this.influxName = config.getInfluxDBConfiguration().getName(); - this.influxUrl = config.getInfluxDBConfiguration().getUrl() + "/write"; this.influxUser = config.getInfluxDBConfiguration().getUser(); this.influxPass = config.getInfluxDBConfiguration().getPassword(); this.influxCreds = this.influxUser + ":" + this.influxPass; this.influxRetentionPolicy = config.getInfluxDBConfiguration().getRetentionPolicy(); + this.influxUrl = new StringBuilder(config.getInfluxDBConfiguration().getUrl()). + append("/write?db=").append(this.influxName).append("&precision=ms"). + append("&rp=").append(this.influxRetentionPolicy).toString(); this.gzip = config.getInfluxDBConfiguration().getGzip(); this.baseAuthHeader = "Basic " + new String(Base64.encodeBase64(this.influxCreds.getBytes())); @@ -132,12 +128,7 @@ public class InfluxV9RepoWriter { request.addHeader("Content-Type", "application/json"); request.addHeader("Authorization", this.baseAuthHeader); - InfluxWrite - influxWrite = - new InfluxWrite(this.influxName, this.influxRetentionPolicy, influxPointArry, - new HashMap<String, String>()); - - String jsonBody = getJsonBody(influxWrite); + byte[] byte_array = getBinary(influxPointArry); if (this.gzip) { @@ -147,8 +138,8 @@ public class InfluxV9RepoWriter { requestEntity = EntityBuilder .create() - .setText(jsonBody) - .setContentType(ContentType.APPLICATION_JSON) + .setBinary(byte_array) + .setContentType(ContentType.DEFAULT_BINARY) .setContentEncoding("UTF-8") .gzipCompress() .build(); @@ -161,9 +152,9 @@ public class InfluxV9RepoWriter { logger.debug("[{}]: gzip set to false. sending non-gzip msg", id); - StringEntity stringEntity = new StringEntity(jsonBody, "UTF-8"); + ByteArrayEntity byteEntity = new ByteArrayEntity(byte_array); - request.setEntity(stringEntity); + request.setEntity(byteEntity); } @@ -224,19 +215,11 @@ public class InfluxV9RepoWriter { } } - private String getJsonBody(InfluxWrite influxWrite) throws RepoException { - - String json = null; - - try { - - json = this.objectMapper.writeValueAsString(influxWrite); - - } catch (JsonProcessingException e) { - - throw new RepoException("failed to serialize json", e); + private byte[] getBinary(InfluxPoint[] influxPointArry) { + StringBuilder binaryString = new StringBuilder(); + for (InfluxPoint influxPoint : influxPointArry) { + binaryString.append(influxPoint.toInflux()).append(System.lineSeparator()); } - - return json; + return binaryString.toString().getBytes(); } }
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor