From 2b6ff860fa1db67dbf38fb52d0659c8fb80a84a1 Mon Sep 17 00:00:00 2001 From: aleigh Date: Mon, 23 May 2022 16:28:20 -0700 Subject: [PATCH] lc-telemetry-faketsdb: WIP --- .../lc/esp/sdk/telemetry/TelemetryFrame.java | 21 +++++-- .../lc/esp/sdk/telemetry/TelemetrySymbol.java | 7 +-- lc-mecha-http-server/build.gradle | 17 +++--- lc-mecha/build.gradle | 3 +- .../main/java/lc/mecha/test/LogTestApp.java | 14 +++++ lc-plc-esp32/src/emon.c | 2 +- lc-telemetry-faketsdb-service/build.gradle | 10 +++- .../faketsdb/service/FakeTSDBService.java | 8 +-- .../telemetry/faketsdb/service/TSDBRelay.java | 59 +++++++++++++++++-- .../historian/service/HistorianService.java | 7 --- 10 files changed, 109 insertions(+), 39 deletions(-) create mode 100644 lc-mecha/src/main/java/lc/mecha/test/LogTestApp.java diff --git a/lc-esp-sdk/src/main/java/lc/esp/sdk/telemetry/TelemetryFrame.java b/lc-esp-sdk/src/main/java/lc/esp/sdk/telemetry/TelemetryFrame.java index ff51cbce1..b944b76ba 100644 --- a/lc-esp-sdk/src/main/java/lc/esp/sdk/telemetry/TelemetryFrame.java +++ b/lc-esp-sdk/src/main/java/lc/esp/sdk/telemetry/TelemetryFrame.java @@ -27,7 +27,7 @@ public class TelemetryFrame { public static final String KEY_TIME = "time"; public static final String KEY_TIME_NS = "time_ns"; public static final String KEY_SYMBOLS = "symbols"; - public static final String KEY_TLM = "esp.tlm."; + public static final String PREFIX_TLM = "esp.tlm."; private final HashSet symbols = new HashSet<>(); private final HashMap tags = new HashMap<>(); private final Instant time; @@ -35,7 +35,7 @@ public class TelemetryFrame { /** * Create a TelemetryFrame object from a {@link ESPMessage}. *

- * All ESP message parameters that begin with "TLM." will be copied into the TelemetryFrame with the TLM. + * All ESP message parameters that begin with "esp.tlm." will be copied into the TelemetryFrame with the TLM. * removed. * * @param msg The message. @@ -60,8 +60,8 @@ public class TelemetryFrame { } for (Map.Entry param : msg.getParameters().entrySet()) { - if (param.getKey().startsWith(KEY_TLM)) { - String tagName = param.getKey().substring(KEY_TLM.length()); + if (param.getKey().startsWith(PREFIX_TLM)) { + String tagName = param.getKey().substring(PREFIX_TLM.length()); tags.put(tagName, param.getValue()); } } @@ -89,6 +89,19 @@ public class TelemetryFrame { return frameJson; } + /** + * Convert this TelemetryFrame to an {@link ESPMessage} in the standard format. This will copy all of the + * telemetry frame tags to the message parameters while prepending the keys with "esp.tlm.". + */ + public ESPMessage toMessage() { + ESPMessage msg = new ESPMessage(); + for (Map.Entry parameter : tags.entrySet()) { + msg.getParameters().put(PREFIX_TLM + parameter.getKey(), parameter.getValue()); + } + msg.setPayload(toJson()); + return msg; + } + public HashMap getTags() { return tags; } diff --git a/lc-esp-sdk/src/main/java/lc/esp/sdk/telemetry/TelemetrySymbol.java b/lc-esp-sdk/src/main/java/lc/esp/sdk/telemetry/TelemetrySymbol.java index 79f0dea5a..85ccc4542 100644 --- a/lc-esp-sdk/src/main/java/lc/esp/sdk/telemetry/TelemetrySymbol.java +++ b/lc-esp-sdk/src/main/java/lc/esp/sdk/telemetry/TelemetrySymbol.java @@ -23,7 +23,6 @@ public class TelemetrySymbol { public static final String KEY_NAME = "name"; public static final String KEY_VALUE = "value"; public static final String KEY_UOM = "uom"; - public static final String KEY_UOT = "uot"; public static final String KEY_LOS = "los"; public static final String KEY_PRIMARY = "primary"; private final String name; @@ -34,8 +33,8 @@ public class TelemetrySymbol { public TelemetrySymbol(JSONObject obj) { name = obj.getString(KEY_NAME); - value = obj.getString(KEY_VALUE); - uom = obj.optString(KEY_UOM); + value = (Serializable) obj.get(KEY_VALUE); + uom = obj.optString(KEY_UOM, null); isLos = obj.getBoolean(KEY_LOS); isPrimary = obj.getBoolean(KEY_PRIMARY); } @@ -48,7 +47,7 @@ public class TelemetrySymbol { this.isPrimary = isPrimary; } - public TelemetrySymbol(String name, Serializable value, String uom, String uot, boolean isLos, boolean isPrimary) { + public TelemetrySymbol(String name, Serializable value, String uom, boolean isLos, boolean isPrimary) { this.name = name; this.value = value; this.uom = uom; diff --git a/lc-mecha-http-server/build.gradle b/lc-mecha-http-server/build.gradle index 8ca3e499f..fd7aa2e92 100644 --- a/lc-mecha-http-server/build.gradle +++ b/lc-mecha-http-server/build.gradle @@ -10,17 +10,16 @@ repositories { sourceCompatibility = JavaVersion.VERSION_17 targetCompatibility = JavaVersion.VERSION_17 dependencies { + api project(':lc-mecha-http-client') + api project(':lc-mecha-fabric') api group: 'javax.activation', name: 'javax.activation-api', version: '1.2.0' - api 'org.eclipse.jetty:jetty-server:11.0.6' - api 'org.eclipse.jetty:jetty-servlet:11.0.6' - api 'org.eclipse.jetty:jetty-annotations:11.0.6' - api 'org.eclipse.jetty.websocket:websocket-jetty-server:11.0.6' - api 'org.eclipse.jetty.websocket:websocket-jetty-client:11.0.6' - api 'commons-fileupload:commons-fileupload:1.3.3' + api 'org.eclipse.jetty:jetty-server:11.0.9' + api 'org.eclipse.jetty:jetty-servlet:11.0.9' + api 'org.eclipse.jetty:jetty-annotations:11.0.9' + api 'org.eclipse.jetty.websocket:websocket-jetty-server:11.0.9' + api 'org.eclipse.jetty.websocket:websocket-jetty-client:11.0.9' + api 'commons-fileupload:commons-fileupload:1.4' api 'javax.websocket:javax.websocket-api:1.1' api 'javax.servlet:javax.servlet-api:4.0.1' - api project(':lc-mecha') - api project(':lc-mecha-http-client') - api project(':lc-mecha-fabric') testImplementation group: 'junit', name: 'junit', version: '4.12' } \ No newline at end of file diff --git a/lc-mecha/build.gradle b/lc-mecha/build.gradle index 4f86feca8..73d5fd66d 100644 --- a/lc-mecha/build.gradle +++ b/lc-mecha/build.gradle @@ -13,7 +13,8 @@ sourceCompatibility = JavaVersion.VERSION_11 targetCompatibility = JavaVersion.VERSION_11 dependencies { - api 'org.slf4j:slf4j-reload4j:1.7.36' + api 'org.slf4j:slf4j-api:1.8.0-beta4' + api 'org.slf4j:slf4j-simple:1.8.0-beta4' // https://mvnrepository.com/artifact/commons-lang/commons-lang api group: 'commons-lang', name: 'commons-lang', version: '2.6' // https://mvnrepository.com/artifact/commons-io/commons-io diff --git a/lc-mecha/src/main/java/lc/mecha/test/LogTestApp.java b/lc-mecha/src/main/java/lc/mecha/test/LogTestApp.java new file mode 100644 index 000000000..8e18c2b71 --- /dev/null +++ b/lc-mecha/src/main/java/lc/mecha/test/LogTestApp.java @@ -0,0 +1,14 @@ +package lc.mecha.test; + +import lc.mecha.log.MechaLogger; +import lc.mecha.log.MechaLoggerFactory; + +public class LogTestApp { + private static final MechaLogger logger = MechaLoggerFactory.getLogger(LogTestApp.class); + + public static void main(String[] args) { + logger.info("Info"); + logger.warn("Warning"); + logger.error("Error"); + } +} diff --git a/lc-plc-esp32/src/emon.c b/lc-plc-esp32/src/emon.c index dac83f774..f62cf2264 100644 --- a/lc-plc-esp32/src/emon.c +++ b/lc-plc-esp32/src/emon.c @@ -164,7 +164,7 @@ void emonFtask() cJSON_AddStringToObject(tags, "esp.tlm.site", "us-iqh0"); cJSON_AddStringToObject(tags, "esp.tlm.container", "16veb"); - + // espFsend() will delete these JSON objects espFsend("LC/DEV/ENV/MON/TLM", frame, tags); vTaskDelay((DELAY_TIME_BETWEEN_READINGS_MS) / portTICK_RATE_MS); diff --git a/lc-telemetry-faketsdb-service/build.gradle b/lc-telemetry-faketsdb-service/build.gradle index 8ca48d917..633a0febe 100644 --- a/lc-telemetry-faketsdb-service/build.gradle +++ b/lc-telemetry-faketsdb-service/build.gradle @@ -6,11 +6,15 @@ plugins { group 'leighco' version '1.0' +sourceCompatibility = JavaVersion.VERSION_17 +targetCompatibility = JavaVersion.VERSION_17 + repositories { mavenCentral() } dependencies { + implementation project (':lc-esp-sdk') implementation project (':lc-mecha-http-server') testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0' @@ -18,4 +22,8 @@ dependencies { test { useJUnitPlatform() -} \ No newline at end of file +} + +application { + mainClass = 'lc.telemetry.faketsdb.service.FakeTSDBService' +} diff --git a/lc-telemetry-faketsdb-service/src/main/java/lc/telemetry/faketsdb/service/FakeTSDBService.java b/lc-telemetry-faketsdb-service/src/main/java/lc/telemetry/faketsdb/service/FakeTSDBService.java index 0407ebd76..7a87579c6 100644 --- a/lc-telemetry-faketsdb-service/src/main/java/lc/telemetry/faketsdb/service/FakeTSDBService.java +++ b/lc-telemetry-faketsdb-service/src/main/java/lc/telemetry/faketsdb/service/FakeTSDBService.java @@ -1,17 +1,12 @@ package lc.telemetry.faketsdb.service; -import lc.mecha.fabric.HandlerStatus; import lc.mecha.fabric.LiteralMessageSubscription; -import lc.mecha.http.server.PrefixedHandler; import lc.mecha.http.server.WebPipeline; import lc.mecha.http.server.WebServer; -import lc.mecha.http.server.WebTransaction; -import lc.mecha.json.JSONObject; import lc.mecha.log.MechaLogger; import lc.mecha.log.MechaLoggerFactory; import lc.mecha.util.BasicallyDangerous; -import java.nio.charset.StandardCharsets; import java.util.HashSet; /** @@ -19,6 +14,7 @@ import java.util.HashSet; * TCollectors. Instead of storing the data it is reformatted and sent into ESP. * * @author Alex Leigh + * @since mk1 (HORIZON BRAVE) */ public class FakeTSDBService extends BasicallyDangerous { private static final MechaLogger logger = MechaLoggerFactory.getLogger(FakeTSDBService.class); @@ -33,7 +29,7 @@ public class FakeTSDBService extends BasicallyDangerous { // 4242 is the default OpenTSDB port WebServer srv = new WebServer(4242, 128); HashSet prefixes = new HashSet<>(); - prefixes.add("/api/put"); + prefixes.add("/api"); srv.getWebPipeline().getServicePipeline().getSubscriptionBase().subscribe( new LiteralMessageSubscription<>(new TSDBRelay(prefixes), WebPipeline.KEY_SERVICE)); diff --git a/lc-telemetry-faketsdb-service/src/main/java/lc/telemetry/faketsdb/service/TSDBRelay.java b/lc-telemetry-faketsdb-service/src/main/java/lc/telemetry/faketsdb/service/TSDBRelay.java index 16c25ebc3..784dd7a3c 100644 --- a/lc-telemetry-faketsdb-service/src/main/java/lc/telemetry/faketsdb/service/TSDBRelay.java +++ b/lc-telemetry-faketsdb-service/src/main/java/lc/telemetry/faketsdb/service/TSDBRelay.java @@ -1,35 +1,82 @@ package lc.telemetry.faketsdb.service; +import lc.esp.sdk.*; +import lc.esp.sdk.telemetry.TelemetryFrame; +import lc.esp.sdk.telemetry.TelemetrySymbol; import lc.mecha.fabric.HandlerStatus; import lc.mecha.http.server.PrefixedHandler; import lc.mecha.http.server.WebTransaction; +import lc.mecha.json.JSONArray; import lc.mecha.json.JSONObject; import lc.mecha.log.MechaLogger; import lc.mecha.log.MechaLoggerFactory; +import org.apache.commons.io.IOUtils; +import javax.jms.JMSException; +import java.io.Serializable; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.Set; /** * /api/put handler to receive OpenTSDB messages and relay them into ESP. * * @author Alex Leigh - * @since mk1 + * @since mk1 (HORIZON BRAVE) */ public class TSDBRelay extends PrefixedHandler { private static final MechaLogger logger = MechaLoggerFactory.getLogger(TSDBRelay.class); - public TSDBRelay(Set pathPrefixes) { + public static final String KEY_TIMESTAMP = "timestamp"; + public static final String KEY_METRIC = "metric"; + public static final String KEY_TAGS = "tags"; + public static final String KEY_VALUE = "value"; + private final ESPProducer producer; + + public TSDBRelay(Set pathPrefixes) throws JMSException { super(pathPrefixes); + ESPClient esp = new ESPClient(); + esp.start(); + ESPAddress telemDest = new ESPAddress("lc", "dev", + "env", "mon", ESPAddressClass.TOPIC, + ESPMessageClass.TELEMETRY); + ESPSession session = esp.createSession(); + producer = session.createProducer(telemDest); } @Override public HandlerStatus handlePrefixedWebRequest(WebTransaction request) throws Throwable { - request.httpServletResponse.getOutputStream().write("Hello World".getBytes(StandardCharsets.UTF_8)); + try { + String msgStr = IOUtils.toString(request.httpServletRequest.getInputStream()); + JSONArray tsbMetrics = new JSONArray(msgStr); + // System.out.println("Received opentsdb request: " + tsbMetrics); + + // For reach metric we are going to make a frame with a single symbol because our code is too + // ignorant to sort by like tags + for (int i = 0; i < tsbMetrics.length(); i++) { + JSONObject tsbMetric = tsbMetrics.getJSONObject(i); + TelemetryFrame frame = new TelemetryFrame(Instant.ofEpochSecond(tsbMetric.getLong(KEY_TIMESTAMP))); + JSONObject tsdbTags = tsbMetric.getJSONObject(KEY_TAGS); + for (String tagName : tsdbTags.keySet()) { + String tagValue = tsdbTags.getString(tagName); + frame.getTags().put(tagName, tagValue); + } + frame.getSymbols().add(new TelemetrySymbol(tsbMetric.getString(KEY_METRIC), + (Serializable) tsbMetric.get(KEY_VALUE), + null, false, true)); + + logger.info("Built frame: {}", frame); - JSONObject msg = new JSONObject(request.httpServletRequest.getInputStream()); + // FIXME: Not threadsafe? + ESPMessage telemMsg = frame.toMessage(); + producer.send(telemMsg); + } - logger.info("Received opentsdb request: {}", msg); + request.httpServletResponse.getOutputStream().write("OK".getBytes(StandardCharsets.UTF_8)); - return HandlerStatus.BREAK; + return HandlerStatus.BREAK; + } catch (Exception e) { + e.printStackTrace(); + throw e; + } } } diff --git a/lc-telemetry-historian-service/src/main/java/lc/telemetry/historian/service/HistorianService.java b/lc-telemetry-historian-service/src/main/java/lc/telemetry/historian/service/HistorianService.java index 00c44cf89..f25e47f5a 100644 --- a/lc-telemetry-historian-service/src/main/java/lc/telemetry/historian/service/HistorianService.java +++ b/lc-telemetry-historian-service/src/main/java/lc/telemetry/historian/service/HistorianService.java @@ -21,7 +21,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.util.Locale; import java.util.Map; import java.util.Scanner; @@ -85,13 +84,7 @@ public class HistorianService extends BasicallyDangerous { for (TelemetrySymbol symbol : tf.getSymbols()) { JSONObject symJson = new JSONObject(); - // We build the metric name for openTSDB as a combination of the ESP destination and the - // symbol name provided by the caller. This results in something that we hope is globally - // unique. We drop the service class here because we know it is telemetry. - - String espAddr = msg.getDestination().toOpenWireBase().toLowerCase(Locale.ROOT); StringAccumulatorV2 sa = new StringAccumulatorV2("."); - // sa.push(espAddr); sa.push(symbol.getName()); // TODO: We need to enforce formatting here String uom = symbol.getUom(); -- GitLab