diff --git a/java/lc-esp-eo-engine/src/main/java/lc/esp/eo/EOTransaction.java b/java/lc-esp-eo-engine/src/main/java/lc/esp/eo/EOTransaction.java index fc0b8c7df9be54cf43cf77fe8b70915a6f94889d..9e7d1cd870c9db400b46f1ff9d144100afdcd9a9 100644 --- a/java/lc-esp-eo-engine/src/main/java/lc/esp/eo/EOTransaction.java +++ b/java/lc-esp-eo-engine/src/main/java/lc/esp/eo/EOTransaction.java @@ -1,6 +1,7 @@ package lc.esp.eo; import com.mongodb.BasicDBObject; +import com.mongodb.MongoGridFSException; import com.mongodb.TransactionOptions; import com.mongodb.WriteConcern; import com.mongodb.client.ClientSession; @@ -44,6 +45,8 @@ public class EOTransaction implements AutoCloseable { private final static String KEY_CONTENT_TYPE = "contentType"; private final static String COLLECTION_OBJ = "eo_obj"; private final static String COLLECTION_ASSOC = "eo_assoc"; + private final static String COLLECTION_CHUNKS = "chunks"; + private final static String COLLECTION_FILES = "files"; private HashSet pendingFiles; public EOTransaction(MongoClient client, String databaseName) { @@ -62,21 +65,18 @@ public class EOTransaction implements AutoCloseable { * errors. */ public void initialize() { - String objFullPath = databaseName + "." + COLLECTION_OBJ; - String assocFullpath = databaseName + "." + COLLECTION_ASSOC; - - logger.info("Initializing database. [obj: {}] [assoc: {}]", objFullPath, assocFullpath); + shardByKey(COLLECTION_OBJ, KEY_ID); + shardByKey(COLLECTION_ASSOC, KEY_SRCOBJ); + shardByKey(COLLECTION_FILES, KEY_ID); + shardByKey(COLLECTION_CHUNKS, KEY_ID); + } + private void shardByKey(String collectionName, String keyName) { + String path = databaseName + "." + collectionName; MongoDatabase adminDb = client.getDatabase("admin"); - - Document cmd = new Document("shardCollection", objFullPath).append("key", new Document("_id", "hashed")); + Document cmd = new Document("shardCollection", path).append("key", new Document(keyName, "hashed")); Document response = adminDb.runCommand(cmd); - logger.info("Response: {}", response); - - cmd = new Document("shardCollection", assocFullpath).append("key", new Document(KEY_SRCOBJ, "hashed")); - response = adminDb.runCommand(cmd); - logger.info("Response: {}", response); - + logger.info("shardBykey: {}", response); // TODO: Wait for `db.collection.getShardDistribution()` } @@ -141,7 +141,7 @@ public class EOTransaction implements AutoCloseable { * whatever reason, the EOFS directory objects (created on the transaction) will be correct, but the file will * actually exist in GridFs, in an unknown condition; either fully or partially intact. */ - public void fileCreate(String filename, InputStream file, String contentType) { + public String fileCreate(String filename, InputStream file, String contentType) { // TODO: Implement directory structure if (filename.contains("/")) throw new IllegalArgumentException("Filename provided may not contain file separator."); @@ -150,11 +150,12 @@ public class EOTransaction implements AutoCloseable { .chunkSizeBytes(1048576) .metadata(new Document(KEY_CONTENT_TYPE, contentType)); GridFSBucket gridFSBucket = GridFSBuckets.create(db); - ObjectId fileId = gridFSBucket.uploadFromStream(session, filename, file, options); + ObjectId fileId = gridFSBucket.uploadFromStream(filename, file, options); synchronized (this) { if (pendingFiles == null) pendingFiles = new HashSet<>(); pendingFiles.add(fileId); } + return fileId.toString(); } public void fileRead(OutputStream os, String filename) throws IOException { @@ -182,7 +183,11 @@ public class EOTransaction implements AutoCloseable { if (pendingFiles != null) { GridFSBucket gridFSBucket = GridFSBuckets.create(db); for (ObjectId pending : pendingFiles) { - gridFSBucket.delete(pending); + try { + gridFSBucket.delete(pending); + } catch (MongoGridFSException e) { + logger.warn("Failed to delete file created adjacent to pending transaction: {}", e); + } } } session.startTransaction(); @@ -190,6 +195,7 @@ public class EOTransaction implements AutoCloseable { @Override public void close() { + rollback(); session.close(); client.close(); } diff --git a/java/lc-esp-eo-engine/src/test/java/lc/esp/eo/EOTransactionTest.java b/java/lc-esp-eo-engine/src/test/java/lc/esp/eo/EOTransactionTest.java index 6a832d455fefd151e9329769be80982c5c472dfc..5d933a23c760029743146413c8612c97d5b395ed 100644 --- a/java/lc-esp-eo-engine/src/test/java/lc/esp/eo/EOTransactionTest.java +++ b/java/lc-esp-eo-engine/src/test/java/lc/esp/eo/EOTransactionTest.java @@ -43,10 +43,10 @@ public class EOTransactionTest { try { // We basically just assume this will always be around in the build environment try (FileInputStream fis = new FileInputStream("build.gradle")) { - txn.fileCreate("test", fis, "text/plain"); - + String fileId = txn.fileCreate("test", fis, "text/plain"); + logger.info("Created file: {}", fileId); ByteArrayOutputStream bos = new ByteArrayOutputStream(); - txn.fileRead(bos, "/test"); + txn.fileRead(bos, "test"); bos.close(); logger.info("Read file: {}", bos.size()); assert (bos.size() > 0);