diff --git a/lc-historian-service/src/main/java/lc/historian/service/HistorianService.java b/lc-historian-service/src/main/java/lc/historian/service/HistorianService.java index 67236bb33a1815478c0ec580db1c1a0b5aadcbfe..659d630e9cc46b1fe31f14a997146392ed684534 100644 --- a/lc-historian-service/src/main/java/lc/historian/service/HistorianService.java +++ b/lc-historian-service/src/main/java/lc/historian/service/HistorianService.java @@ -2,10 +2,25 @@ package lc.historian.service; import lc.esp.sdk.*; import lc.esp.sdk.telemetry.TelemetryFrame; +import lc.esp.sdk.telemetry.TelemetrySymbol; +import lc.mecha.json.JSONArray; +import lc.mecha.json.JSONObject; import lc.mecha.log.MechaLogger; import lc.mecha.log.MechaLoggerFactory; import lc.mecha.util.BasicallyDangerous; import lc.mecha.util.UniversalJob; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.BasicHttpEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Scanner; public class HistorianService extends BasicallyDangerous { private static final MechaLogger logger = MechaLoggerFactory.getLogger(HistorianService.class); @@ -29,20 +44,81 @@ public class HistorianService extends BasicallyDangerous { //noinspection InfiniteLoopStatement while (true) { ESPMessage msg = consumer.receive(); - publish(msg); - - - logger.info("Got message: {}", msg); } } } } } - private void publish(ESPMessage msg) { - TelemetryFrame tf = new TelemetryFrame(msg.getPayload()); + /* + [ + { + "metric": "sys.cpu.nice", + "timestamp": 1346846400, + "value": 18, + "tags": { + "host": "web01", + "dc": "lga" + } + }, + { + "metric": "sys.cpu.nice", + "timestamp": 1346846400, + "value": 9, + "tags": { + "host": "web02", + "dc": "lga" + } + } +] + */ + + private void publish(ESPMessage msg) throws IOException { + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + HttpPost httpPost = new HttpPost("http://db4.leigh-co.com:4242/api/put"); + + + TelemetryFrame tf = new TelemetryFrame(msg.getPayload()); + // opentsdb only supports seconds precision + long seconds = tf.getTime().getEpochSecond(); - logger.info("Built telemetry frame: {}", tf); + JSONArray arr = new JSONArray(); + + for (TelemetrySymbol symbol : tf.getSymbols()) { + JSONObject symJson = new JSONObject(); + symJson.put("metric", symbol.getName()); + symJson.put("timestamp", seconds); + symJson.put("value", symbol.getValue()); + JSONObject tagsJson = new JSONObject(); + symJson.put("tags", tagsJson); + for (Map.Entry tag : msg.getParameters().entrySet()) { + // Do we want to filter out the _tags here? + tagsJson.put(tag.getKey(), tag.getValue()); + } + arr.put(symJson); + } + + // OpenTSDB does not support chunked requests! + BasicHttpEntity requestEntity = new BasicHttpEntity(); + byte[] r = arr.toString().getBytes(StandardCharsets.UTF_8); + logger.info("Built TSDB message: {}", arr.toString()); + + requestEntity.setContent(new ByteArrayInputStream(r)); + requestEntity.setContentLength(r.length); + requestEntity.setChunked(false); + + httpPost.setEntity(requestEntity); + HttpResponse httpResponse = httpClient.execute(httpPost); + + // TSDB is very terse, it responds 204 on success + if (httpResponse.getStatusLine().getStatusCode() != 204) { + System.out.println(httpResponse.getStatusLine()); + Scanner sc = new Scanner(httpResponse.getEntity().getContent()); + while (sc.hasNext()) { + System.out.println(sc.nextLine()); + } + } + } } }