From 35feb728d0a2ba1f8bd98f77a29373e51b22b4b3 Mon Sep 17 00:00:00 2001 From: aleigh Date: Wed, 1 Jun 2022 11:07:55 -0700 Subject: [PATCH] lc-esp-sdk: Introduced multi-destination producers. Deprecated monodestination producers. Introduced request/response native interface. Moved JMS message generation to ESPMessage class to centralize behavior. lc-esp-cli-sdk: Updated to support the new Requestor interface --- .../main/java/lc/esp/cli/sdk/CLIClient.java | 38 ++++---- .../src/main/java/lc/esp/sdk/ESPMessage.java | 18 ++++ .../src/main/java/lc/esp/sdk/ESPProducer.java | 32 +++---- .../main/java/lc/esp/sdk/ESPRequestor.java | 91 +++++++++++++++++++ .../src/main/java/lc/esp/sdk/ESPSession.java | 9 ++ .../main/java/lc/ircbot/svc/BotListener.java | 7 +- settings.gradle | 3 +- 7 files changed, 154 insertions(+), 44 deletions(-) create mode 100644 lc-esp-sdk/src/main/java/lc/esp/sdk/ESPRequestor.java diff --git a/lc-esp-cli-sdk/src/main/java/lc/esp/cli/sdk/CLIClient.java b/lc-esp-cli-sdk/src/main/java/lc/esp/cli/sdk/CLIClient.java index d7306b8cc..aa28cba81 100644 --- a/lc-esp-cli-sdk/src/main/java/lc/esp/cli/sdk/CLIClient.java +++ b/lc-esp-cli-sdk/src/main/java/lc/esp/cli/sdk/CLIClient.java @@ -1,12 +1,13 @@ package lc.esp.cli.sdk; import lc.esp.sdk.*; +import lc.mecha.lang.FutureResult; import lc.mecha.log.MechaLogger; import lc.mecha.log.MechaLoggerFactory; import lc.mecha.util.BasicallyDangerous; +import javax.jms.JMSException; import java.util.List; -import java.util.UUID; /** * This class implements a simple client for sending requests to a {@link CLIService}. @@ -16,35 +17,28 @@ import java.util.UUID; */ public class CLIClient extends BasicallyDangerous { private static final MechaLogger logger = MechaLoggerFactory.getLogger(CLIClient.class); - private final ESPClient client; - private final ESPAddress address; + private final ESPSession session; + private final ESPRequestor requestor; - public CLIClient(ESPClient client, ESPAddress address) { - this.client = client; - this.address = address; + public CLIClient(ESPClient client) throws JMSException { + this.session = client.createSession(); + this.requestor = session.createRequestor(); } - public CLIResponse exec(List cmd) throws Exception { - ESPAddress src = new ESPAddress("lc", "client", "cli-client", - UUID.randomUUID().toString(), ESPAddressClass.QUEUE, ESPMessageClass.COMMAND); - + public CLIResponse exec(ESPAddress destination, List cmd) throws Exception { logger.info("Received exec(). [cmd: {}]", cmd); - try (ESPSession session = client.createSession()) { - ESPConsumer consumer = session.createConsumer(src); - ESPProducer producer = session.createProducer(address); - CLIRequest req = new CLIRequest(cmd); - ESPMessage reqMsg = req.toMessage(); - reqMsg.setReplyTo(src); - producer.send(reqMsg); - logger.info("Waiting for reply..."); - return new CLIResponse(consumer.receive()); - // We don't close the producer/consumer on the premise that the session did that - } + CLIRequest req = new CLIRequest(cmd); + ESPMessage reqMsg = req.toMessage(); + FutureResult future = requestor.request(destination, reqMsg); + logger.info("Waiting for reply..."); + return new CLIResponse((ESPMessage) future.get()); + // We don't close the producer/consumer on the premise that the session did that + } @Override public void runDangerously() throws Exception { - client.start(); + requestor.runDangerously(); } } diff --git a/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPMessage.java b/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPMessage.java index 22a73b090..0553d1995 100644 --- a/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPMessage.java +++ b/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPMessage.java @@ -91,6 +91,24 @@ public class ESPMessage { } + public Message toMessage(Session session) throws JMSException { + TextMessage jms = session.createTextMessage(getPayload().toString()); + + for (Map.Entry param : getParameters().entrySet()) { + jms.setObjectProperty(param.getKey(), param.getValue()); + } + + String correlationId = getCorrelationId(); + if (correlationId != null) jms.setJMSCorrelationID(correlationId); + + ESPAddress replyTo = getReplyTo(); + if (replyTo != null) { + jms.setJMSReplyTo(replyTo.toDestination(session)); + } + + return jms; + } + public Serializable getParameter(String name) { return parameters.get(name); } diff --git a/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPProducer.java b/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPProducer.java index e3e22927f..f14141287 100644 --- a/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPProducer.java +++ b/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPProducer.java @@ -4,41 +4,39 @@ import lc.mecha.log.MechaLogger; import lc.mecha.log.MechaLoggerFactory; import javax.jms.JMSException; +import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; -import javax.jms.TextMessage; -import java.io.Serializable; -import java.util.Map; public class ESPProducer implements AutoCloseable { private static final MechaLogger logger = MechaLoggerFactory.getLogger(ESPProducer.class); private final MessageProducer producer; private final Session session; + @Deprecated public ESPProducer(Session session, ESPAddress address) throws JMSException { this.session = session; producer = session.createProducer(address.toDestination(session)); } + public ESPProducer(Session session) throws JMSException { + this.session = session; + producer = session.createProducer(null); + } + + @Deprecated public void send(ESPMessage msg) throws JMSException { logger.debug("Sending message... {}", msg); - TextMessage jms = session.createTextMessage(msg.getPayload().toString()); - - for (Map.Entry param : msg.getParameters().entrySet()) { - jms.setObjectProperty(param.getKey(), param.getValue()); - } - - String correlationId = msg.getCorrelationId(); - if (correlationId != null) jms.setJMSCorrelationID(correlationId); - - ESPAddress replyTo = msg.getReplyTo(); - if (replyTo != null) { - jms.setJMSReplyTo(replyTo.toDestination(session)); - } - + Message jms = msg.toMessage(session); producer.send(jms); } + public void send(ESPAddress destination, ESPMessage msg) throws JMSException { + logger.debug("Sending message... {}", msg); + Message jms = msg.toMessage(session); + producer.send(destination.toDestination(session), jms); + } + @Override public void close() throws Exception { // We leave it to ESPSession to close the session diff --git a/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPRequestor.java b/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPRequestor.java new file mode 100644 index 000000000..2d33eff28 --- /dev/null +++ b/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPRequestor.java @@ -0,0 +1,91 @@ +package lc.esp.sdk; + +import lc.mecha.lang.FutureResult; +import lc.mecha.log.MechaLogger; +import lc.mecha.log.MechaLoggerFactory; +import lc.mecha.util.BasicallyDangerous; + +import javax.jms.*; +import java.util.HashMap; +import java.util.UUID; + +/** + * This class implements a requestor which is capable of sending messages to JMS and waiting for a response. + * + * @author Alex Leigh + * @since 18.1 + */ +public class ESPRequestor extends BasicallyDangerous implements AutoCloseable { + private static final MechaLogger logger = MechaLoggerFactory.getLogger(ESPRequestor.class); + private final MessageProducer producer; + private final MessageConsumer consumer; + private final Session session; + private final ESPAddress srcAddress; + // FIXME: This will leak futures. We need a timeout map. + private final HashMap> futures = new HashMap<>(); + + protected ESPRequestor(Session session) throws JMSException { + this.session = session; + srcAddress = new ESPAddress("lc", "client", "session", + UUID.randomUUID().toString(), ESPAddressClass.QUEUE, ESPMessageClass.COMMAND); + producer = session.createProducer(null); + consumer = session.createConsumer(srcAddress.toDestination(session)); + } + + /** + * Asynchronously send a message to ESP. A {@link FutureResult} will be returned. If a reply to this message + * is received by the client the FutureResult will be fulfilled. + */ + public FutureResult request(ESPAddress destination, ESPMessage espMessage) + throws JMSException { + + FutureResult future = new FutureResult<>(); + String correlationId = UUID.randomUUID().toString(); + espMessage.setCorrelationId(correlationId); + espMessage.setReplyTo(srcAddress); + Message jms = espMessage.toMessage(session); + futures.put(correlationId, future); + + synchronized (this) { + producer.send(destination.toDestination(session), jms); + } + + return future; + } + + @Override + public void close() { + try { + producer.close(); + } catch (Exception ignored) { + // NOP + } + + try { + consumer.close(); + } catch (Exception ignored) { + // NOP + } + } + + @Override + public void runDangerously() throws Exception { + //noinspection InfiniteLoopStatement + while (true) { + Message msg = consumer.receive(); + String correlationId = msg.getJMSCorrelationID(); + if (correlationId == null) { + logger.warn("Received message with no correlation ID. {}", msg); + break; + } + + FutureResult future = futures.get(correlationId); + if (future == null) { + logger.warn("Received message, but no future. [correlationId: {}]", correlationId); + break; + } + + future.set(new ESPMessage(msg)); + } + } +} diff --git a/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPSession.java b/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPSession.java index b48f0a560..922a8d4bc 100644 --- a/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPSession.java +++ b/lc-esp-sdk/src/main/java/lc/esp/sdk/ESPSession.java @@ -15,10 +15,19 @@ public class ESPSession implements AutoCloseable { return new ESPConsumer(destination, session); } + @Deprecated public ESPProducer createProducer(ESPAddress destination) throws JMSException { return new ESPProducer(session, destination); } + public ESPProducer createProducer() throws JMSException { + return new ESPProducer(session); + } + + public ESPRequestor createRequestor() throws JMSException { + return new ESPRequestor(session); + } + @Override public void close() throws Exception { session.close(); diff --git a/lc-ircbot-svc/src/main/java/lc/ircbot/svc/BotListener.java b/lc-ircbot-svc/src/main/java/lc/ircbot/svc/BotListener.java index abe1c7672..391b15df4 100644 --- a/lc-ircbot-svc/src/main/java/lc/ircbot/svc/BotListener.java +++ b/lc-ircbot-svc/src/main/java/lc/ircbot/svc/BotListener.java @@ -20,20 +20,21 @@ public class BotListener extends ListenerAdapter { private final CLIClient cli; public BotListener() throws Exception { - cli = new CLIClient(new ESPClient(), new ESPAddress("queue://lc.global.loa.evelyn.cmd")); + ESPClient client = new ESPClient(); + client.start(); + cli = new CLIClient(client); new Thread(cli).start(); } @Override public void onGenericMessage(GenericMessageEvent event) { String baseMsg = event.getMessage().replaceAll("\\P{Print}", ""); - try { if (baseMsg.startsWith("evelyn:")) { String msg = baseMsg.substring("evelyn:".length()).trim(); String[] msgArr = msg.split(" "); // FIXME: We must have a timeout here or we will hang/leak this thread - CLIResponse res = cli.exec(Arrays.asList(msgArr)); + CLIResponse res = cli.exec(new ESPAddress("queue://lc.global.loa.evelyn.cmd"), Arrays.asList(msgArr)); event.respond(res.getOut()); } } catch (Exception e) { diff --git a/settings.gradle b/settings.gradle index aa56b3ba3..811eded35 100644 --- a/settings.gradle +++ b/settings.gradle @@ -64,5 +64,4 @@ include 'lc-gdn-ipbeacon-svc' include 'lc-ircbot-svc' include 'lc-mecha-calc' include 'lc-esp-cli-sdk' -include 'lc-evelyn-svc' - +include 'lc-evelyn-svc' \ No newline at end of file -- GitLab