Skip to content

Commits on Source 2

plugins {
id 'java'
}
group = 'com.leigh-co'
version = '22.0-SNAPSHOT'
repositories {
mavenCentral()
}
dependencies {
implementation 'org.apache.commons:commons-collections4:4.4'
implementation project(':lc-mecha')
testImplementation platform('org.junit:junit-bom:5.10.0')
testImplementation 'org.junit.jupiter:junit-jupiter'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
test {
useJUnitPlatform()
}
\ No newline at end of file
package lc.lesson.cqrs;
import lc.mecha.json.JSONObject;
import org.apache.commons.collections4.MultiValuedMap;
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;
public interface CQRS {
/**
* Dispatches an {@link EventType} for a specific member by creating and submitting
* {@link ProcessorTask} instances for all registered subscribers.
* <p>
* This method looks up all {@link Processor}s that have been subscribed to the
* specified {@code eventType}, creates a new {@link ProcessorTask} for each one
* using {@link Processor#createTask(String)}, and submits these tasks to an internal
* {@link ExecutorService} for asynchronous execution.
* </p>
*
* <h3>Behavior:</h3>
* <ul>
* <li>If no processors are registered for the event type, the method returns immediately.</li>
* <li>Each matching {@link Processor} is invoked asynchronously via its
* {@link Processor#createTask(String)} method, and the resulting task's
* {@link ProcessorTask#run()} method is executed on a background thread.</li>
* <li>The order of execution is determined by the executor; this implementation uses
* a single-threaded executor to preserve submission order.</li>
* <li>Assertions will fail at runtime if {@code memberId} or {@code eventType} is {@code null}
* (when assertions are enabled).</li>
* </ul>
*
* <h3>Usage Example:</h3>
* <pre>{@code
* // Dispatch a USER_CREATED event for member "123"
* orchestrator.write("123", EventType.USER_CREATED);
* }</pre>
*
* @param memberId The unique identifier of the member the event relates to. Must not be {@code null}.
* @param eventType The type of event being dispatched. Must not be {@code null}.
* @see Processor
* @see ProcessorTask
*/
void write(String memberId, EventType eventType);
/**
* Registers a {@link ProcessorTask} as a subscriber for the specified
* {@link EventType}s and {@link InsightType}s.
* <p>
* This method binds a single {@code task} to multiple event and insight types so
* that it will be invoked whenever matching events or insights are processed.
* The provided collections must be non-null, even if empty.
* </p>
*
* <h3>Usage Example:</h3>
* <pre>{@code
* // Subscribe a task to two event types and one insight type
* orchestrator.subscribe(
* myTask,
* List.of(EventType.USER_CREATED, EventType.USER_UPDATED),
* List.of(InsightType.USER_PROFILE)
* );
* }</pre>
*
* <h3>Contract:</h3>
* <ul>
* <li>{@code task} must not be {@code null}.</li>
* <li>{@code eventTypes} and {@code insightTypes} must not be {@code null}, but may be empty.</li>
* <li>The task is added to the internal {@link MultiValuedMap} for each type provided.</li>
* <li>Assertions will fail at runtime if any argument is {@code null} (when assertions are enabled).</li>
* </ul>
*
* @param task The {@link ProcessorTask} to register. Must not be {@code null}.
* @param eventTypes The event types to subscribe to. Must not be {@code null}.
* @param insightTypes The insight types to subscribe to. Must not be {@code null}.
* @see #write(String, EventType)
* @see #read(String, InsightType, boolean, boolean)
*/
JSONObject read(String memberId, InsightType insightType, boolean sync, boolean createIfNotExist) throws SQLException;
}
package lc.lesson.cqrs;
import lc.mecha.json.JSONObject;
import org.apache.commons.collections4.MultiValuedMap;
import org.apache.commons.collections4.multimap.ArrayListValuedHashMap;
import java.sql.SQLException;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CQRSOrchestrator implements CQRS {
private final MultiValuedMap<EventType, Processor> processors = new ArrayListValuedHashMap<>();
private final MultiValuedMap<InsightType, Processor> insightProcessors = new ArrayListValuedHashMap<>();
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final InsightStore store;
public CQRSOrchestrator(InsightStore store) {
this.store = store;
}
/**
* {@inheritDoc}
*/
@Override
public void write(String memberId, EventType eventType) {
assert memberId != null;
assert eventType != null;
// Fetch subscribers for this event type
Collection<Processor> tasks = processors.get(eventType);
if (tasks == null || tasks.isEmpty()) {
return; // No subscribers
}
for (Processor taskTemplate : tasks) {
ProcessorTask taskInstance = taskTemplate.createTask(memberId);
executor.submit(taskInstance::run);
}
}
/**
* {@inheritDoc}
*/
@Override
public JSONObject read(String memberId, InsightType insightType, boolean sync, boolean createIfNotExist) throws SQLException {
assert memberId != null;
assert insightType != null;
// Try reading from store first
JSONObject result = store.read(memberId, insightType);
if (result != null) {
return result;
}
// If not found and not allowed to create, return null
if (!createIfNotExist) {
return null;
}
// Find processors that can generate this insight
Collection<Processor> tasks = insightProcessors.get(insightType);
if (tasks == null || tasks.isEmpty()) {
return null; // No processor available to create this insight
}
if (sync) {
// Submit all tasks and wait for them in parallel
var futures = tasks.stream()
.map(taskTemplate -> {
ProcessorTask taskInstance = taskTemplate.createTask(memberId);
return executor.submit(taskInstance::run);
})
.toList();
// Wait for all to complete
for (var future : futures) {
try {
future.get();
} catch (Exception e) {
e.printStackTrace(); // Replace with logging
}
}
// After all tasks have completed, re-read the generated insight
return store.read(memberId, insightType);
} else {
// Run asynchronously and return null (caller must check later)
for (Processor taskTemplate : tasks) {
ProcessorTask taskInstance = taskTemplate.createTask(memberId);
executor.submit(taskInstance::run);
}
return null;
}
}
public void subscribe(Processor task, Collection<EventType> eventTypes, Collection<InsightType> insightTypes) {
assert (task != null);
assert (eventTypes != null);
assert (insightTypes != null);
for (EventType eventType : eventTypes) {
processors.put(eventType, task);
}
for (InsightType insightType : insightTypes) {
insightProcessors.put(insightType, task);
}
}
}
package lc.lesson.cqrs;
public enum EventType {
ASSESSMENT_CREATED, EVENT_A
}
package lc.lesson.cqrs;
import lc.mecha.json.JSONObject;
import java.sql.SQLException;
import java.util.Date;
public interface InsightStore {
JSONObject read(String memberId, InsightType insightType) throws SQLException;
void write(String memberId, InsightType insightType, JSONObject payload, Date expires) throws SQLException;
}
package lc.lesson.cqrs;
public enum InsightType {
PARENTING, INSIGHT_A
}
package lc.lesson.cqrs;
/**
* Represents a processing unit that can handle events and generate insights for a given member.
* <p>
* A {@code Processor} acts as a template for creating {@link ProcessorTask} instances. Each task
* performs the actual work of handling a specific event or generating a specific insight for a
* particular member. The orchestrator submits these tasks to an {@link java.util.concurrent.ExecutorService}
* for execution.
* </p>
* <p>
* Processors are typically registered with a {@link CQRSOrchestrator} via the
* {@link CQRSOrchestrator#subscribe(Processor, java.util.Collection, java.util.Collection)} method,
* specifying which {@link EventType}(s) they handle and which {@link InsightType}(s) they can generate.
* When an event occurs or an insight is requested, the orchestrator will create task instances from
* the registered processors and execute them either synchronously or asynchronously.
* </p>
* <p>
* Typical usage:
* <pre>{@code
* Processor myProcessor = ...;
* orchestrator.subscribe(myProcessor, List.of(eventType1, eventType2), List.of(insightType1));
* orchestrator.write(memberId, eventType1); // triggers ProcessorTask instances
* }</pre>
* </p>
*/
public interface Processor {
/**
* Creates a new task instance for the specified member ID.
* <p>
* Each call should return a fresh {@link ProcessorTask} instance that encapsulates the logic
* for handling the event or generating the insight for the given member.
* </p>
*
* @param memberId the ID of the member for whom the task should operate
* @return a new {@link ProcessorTask} instance ready to be executed
*/
ProcessorTask createTask(String memberId);
}
package lc.lesson.cqrs;
import java.util.Objects;
/**
* Represents a unit of work that processes an event or generates an insight for a specific member.
* <p>
* A {@code ProcessorTask} is created by a {@link Processor} via the
* {@link Processor#createTask(String)} method. Each task encapsulates the logic for handling a
* specific {@link EventType} or generating a specific {@link InsightType} for a particular
* member identified by {@code memberId}.
* </p>
* <p>
* Tasks are executed by the {@link CQRSOrchestrator}, either asynchronously or synchronously,
* depending on how the orchestrator is invoked. Implementations of this class should provide the
* {@link #run()} method to perform the actual processing logic.
* </p>
* <p>
* Typical usage:
* <pre>{@code
* ProcessorTask task = myProcessor.createTask(memberId);
* new Thread(task).start(); // or orchestrator will execute it
* }</pre>
* </p>
*/
public abstract class ProcessorTask implements Runnable {
private final String memberId;
private final EventType eventType;
/**
* Constructs a new ProcessorTask for the specified member and event type.
*
* @param memberId the ID of the member for whom this task is created; must not be null
* @param eventType the type of event this task handles; must not be null
*/
protected ProcessorTask(String memberId, EventType eventType) {
assert (memberId != null);
assert (eventType != null);
this.memberId = memberId;
this.eventType = eventType;
}
/**
* Returns the ID of the member for whom this task operates.
*
* @return the member ID
*/
public String getMemberId() {
return memberId;
}
/**
* Returns the type of event this task handles.
*
* @return the event type
*/
public EventType getEventType() {
return eventType;
}
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
ProcessorTask that = (ProcessorTask) o;
return Objects.equals(memberId, that.memberId) && eventType == that.eventType;
}
@Override
public int hashCode() {
return Objects.hash(memberId, eventType);
}
@Override
public String toString() {
return "ProcessorTask{" +
"memberId='" + memberId + '\'' +
", eventType=" + eventType +
'}';
}
}
package lc.lesson.cqrs;
import lc.mecha.json.JSONObject;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* {@code RAMInsightStore} is an in-memory implementation of {@link InsightStore}.
*
* <p>Insights are stored in RAM using a {@link ConcurrentHashMap}, keyed by a composite
* of {@code memberId} and {@code insightType}. Expired entries are ignored during reads.</p>
*
* <p>This implementation does not persist data and is intended for testing or
* ephemeral storage scenarios.</p>
*
* @author Alex Leigh
*/
public class RAMInsightStore implements InsightStore {
/**
* Composite key for storing insights per member and type
*/
private static record InsightKey(String memberId, InsightType insightType) {
}
/**
* Wrapper to hold payload and expiration timestamp
*/
private static class InsightValue {
final JSONObject payload;
final Date expires;
InsightValue(JSONObject payload, Date expires) {
this.payload = payload;
this.expires = expires;
}
}
private final Map<InsightKey, InsightValue> store = new ConcurrentHashMap<>();
@Override
public JSONObject read(String memberId, InsightType insightType) {
InsightKey key = new InsightKey(memberId, insightType);
InsightValue value = store.get(key);
if (value == null) return null;
// Treat null expiration as "never expires"
if (value.expires != null && value.expires.before(new Date())) {
// Expired, remove entry
store.remove(key);
return null;
}
return value.payload;
}
@Override
public void write(String memberId, InsightType insightType, JSONObject payload, Date expires) {
if (payload == null) throw new IllegalArgumentException("payload cannot be null");
// Use far-future default if expires is null
Date effectiveExpires = (expires != null) ? expires : new Date(Long.MAX_VALUE);
InsightKey key = new InsightKey(memberId, insightType);
InsightValue value = new InsightValue(payload, effectiveExpires);
store.put(key, value);
}
}
package lc.lesson.cqrs;
import lc.mecha.json.JSONObject;
import javax.sql.DataSource;
import java.sql.*;
import java.util.Date;
/**
* {@code SQLInsightStore} is an implementation of the {@link InsightStore} interface
* that persists and retrieves "insight" records from a relational database
* using JDBC. This class enforces that {@code payload} and {@code expires_at}
* are never null in the database.
*
* <p>The {@code insights} table is expected to enforce a unique constraint on
* {@code (member_id, insight_type)}. {@code expires_at} is always populated
* with a timestamp; if no explicit expiration is provided, it defaults to
* a far-future timestamp (effectively "never expires").</p>
*
* <code>CREATE TABLE insights (
* member_id TEXT NOT NULL,
* insight_type TEXT NOT NULL,
* payload TEXT NOT NULL,
* expires_at TIMESTAMP NOT NULL,
* PRIMARY KEY (member_id, insight_type)
* );</code>
*
* @author Alex Leigh
*/
public class SQLInsightStore implements InsightStore {
private final DataSource dataSource;
/**
* Constructs a new {@code SQLInsightStore} backed by the given {@link DataSource}.
*
* @param dataSource a {@link DataSource} for obtaining database connections
*/
public SQLInsightStore(DataSource dataSource) {
this.dataSource = dataSource;
}
/**
* Reads an insight record for a specific member and insight type from the database.
* <p>
* This method only returns a record if the {@code member_id} and {@code insight_type}
* match and {@code expires_at} is in the future.
*
* @param memberId the unique identifier of the member
* @param insightType the type of insight (as an {@link Enum})
* @return a {@link JSONObject} representing the stored payload,
* or {@code null} if no matching or valid record exists
* @throws SQLException if a database access error occurs
*/
@Override
public JSONObject read(String memberId, InsightType insightType) throws SQLException {
final String sql = """
SELECT payload
FROM insights
WHERE member_id = ? AND insight_type = ?
AND (expires_at IS NULL OR expires_at > CURRENT_TIMESTAMP)
""";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, memberId);
ps.setString(2, insightType.name()); // Assuming enum InsightType
try (ResultSet rs = ps.executeQuery()) {
if (rs.next()) {
String payloadJson = rs.getString("payload");
return new JSONObject(payloadJson);
}
}
}
return null; // No match found
}
/**
* Writes or updates an insight record for a member and insight type.
* <p>
* Both {@code payload} and {@code expires_at} are required and never null.
* If {@code expires} is not specified, it defaults to a far-future timestamp
* (effectively meaning "never expires").
*
* @param memberId the unique identifier of the member
* @param insightType the type of insight
* @param payload the insight data as a {@link JSONObject}, not null
* @param expires the expiration timestamp, or {@code null} to set a far-future expiry
* @throws SQLException if a database access error occurs
*/
@Override
public void write(String memberId, InsightType insightType, JSONObject payload, Date expires) throws SQLException {
final String sql = """
INSERT INTO insights (member_id, insight_type, payload, expires_at)
VALUES (?, ?, ?, ?)
ON CONFLICT (member_id, insight_type)
DO UPDATE SET payload = EXCLUDED.payload,
expires_at = EXCLUDED.expires_at
""";
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, memberId);
ps.setString(2, insightType.name());
ps.setString(3, payload.toString());
if (expires != null) {
ps.setTimestamp(4, new Timestamp(expires.getTime()));
} else {
ps.setNull(4, Types.TIMESTAMP);
}
ps.executeUpdate();
}
}
}
package lc.lesson.cqrs;
import lc.mecha.json.JSONObject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.jupiter.api.Assertions.*;
class CQRSOrchestratorTest {
private RAMInsightStore store;
private CQRSOrchestrator orchestrator;
@BeforeEach
void setUp() {
store = new RAMInsightStore();
orchestrator = new CQRSOrchestrator(store);
}
@Test
void testWriteWithoutSubscribers() {
// No processors subscribed yet, should not fail
orchestrator.write("member1", EventType.EVENT_A);
}
@Test
void testWriteWithSubscribers() throws InterruptedException {
AtomicBoolean taskRan = new AtomicBoolean(false);
Processor processor = memberId -> new ProcessorTask(memberId, EventType.EVENT_A) {
@Override
public void run() {
taskRan.set(true);
}
};
orchestrator.subscribe(processor, List.of(EventType.EVENT_A), List.of());
orchestrator.write("member1", EventType.EVENT_A);
// Give some time for async executor to run
Thread.sleep(100);
assertTrue(taskRan.get(), "Processor task should have run");
}
@Test
void testReadInsightNotExistCreateFalse() throws SQLException {
JSONObject result = orchestrator.read("member1", InsightType.INSIGHT_A, true, false);
assertNull(result, "Should return null if insight does not exist and createIfNotExist is false");
}
@Test
void testReadInsightSyncCreateTrue() throws SQLException {
Processor processor = new Processor() {
@Override
public ProcessorTask createTask(String memberId) {
return new ProcessorTask(memberId, EventType.EVENT_A) {
@Override
public void run() {
store.write(memberId, InsightType.INSIGHT_A, new JSONObject("{\"data\":42}"), null);
}
};
}
};
orchestrator.subscribe(processor, List.of(), List.of(InsightType.INSIGHT_A));
JSONObject result = orchestrator.read("member1", InsightType.INSIGHT_A, true, true);
assertNotNull(result, "Insight should have been created by processor");
assertEquals(42, result.getInt("data"));
}
@Test
void testReadInsightAsyncCreateTrue() throws InterruptedException, SQLException {
AtomicBoolean taskRan = new AtomicBoolean(false);
Processor processor = memberId -> new ProcessorTask(memberId, EventType.EVENT_A) {
@Override
public void run() {
store.write(memberId, InsightType.INSIGHT_A, new JSONObject("{\"data\":99}"), null);
taskRan.set(true);
}
};
orchestrator.subscribe(processor, List.of(), List.of(InsightType.INSIGHT_A));
JSONObject result = orchestrator.read("member1", InsightType.INSIGHT_A, false, true);
// Async call should return null immediately
assertNull(result, "Async read returns null immediately");
// Wait for async task to complete
Thread.sleep(100);
JSONObject stored = store.read("member1", InsightType.INSIGHT_A);
assertNotNull(stored);
assertEquals(99, stored.getInt("data"));
assertTrue(taskRan.get(), "Async processor should have run");
}
@Test
void testSubscribeNullArguments() {
Processor processor = new Processor() {
@Override
public ProcessorTask createTask(String memberId) {
return new ProcessorTask(memberId, EventType.EVENT_A) {
@Override
public void run() {
}
};
}
};
assertThrows(AssertionError.class, () -> orchestrator.subscribe(null, List.of(EventType.EVENT_A), List.of(InsightType.INSIGHT_A)));
assertThrows(AssertionError.class, () -> orchestrator.subscribe(processor, null, List.of(InsightType.INSIGHT_A)));
assertThrows(AssertionError.class, () -> orchestrator.subscribe(processor, List.of(EventType.EVENT_A), null));
}
}
raksasa @ 61d7ec61
Subproject commit ee2ab23600c4cf7cc855d2b63bb5f467d5394df0
Subproject commit 61d7ec6198a9e88aa1e467d0b7ce60259ee77eef
......@@ -51,3 +51,4 @@ include 'raksasa:java:cw-strata-db-migration'
include 'raksasa:java:cw-strata-db'
include 'raksasa:java:cw-strata-openapi-spec'
include 'raksasa:java:cw-strata-orcs-svc'
include 'lc-java-example'
\ No newline at end of file