diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/AcquireLockOptions.java b/src/main/java/com/amazonaws/services/dynamodbv2/AcquireLockOptions.java index 852ce92..234ffee 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/AcquireLockOptions.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/AcquireLockOptions.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -52,6 +53,7 @@ public class AcquireLockOptions { * optionally back-off and retry and to acquire the lock. */ private final boolean shouldSkipBlockingWait; + private final ThreadFactory threadFactory; /** * A builder for setting up an AcquireLockOptions object. This allows clients to configure @@ -77,8 +79,18 @@ public static class AcquireLockOptionsBuilder { private Optional sessionMonitorCallback; private boolean isSessionMonitorSet = false; private boolean shouldSkipBlockingWait; + private ThreadFactory threadFactory; - AcquireLockOptionsBuilder(final String partitionKey) { + /** + * Produces a new ThreadFactory that creates threads with the default settings. + * + * @return a new ThreadFactory that creates threads with the default settings + */ + private static ThreadFactory threadFactory() { + return Thread::new; + } + + AcquireLockOptionsBuilder(final String partitionKey, final ThreadFactory threadFactory) { this.partitionKey = partitionKey; this.additionalAttributes = new HashMap<>(); this.sortKey = Optional.empty(); @@ -90,6 +102,7 @@ public static class AcquireLockOptionsBuilder { this.shouldSkipBlockingWait = false; this.acquireReleasedLocksConsistently = false; this.reentrant = false; + this.threadFactory = threadFactory == null ? threadFactory() : threadFactory; } /** @@ -327,13 +340,13 @@ public AcquireLockOptions build() { final Optional sessionMonitor; if (this.isSessionMonitorSet) { Objects.requireNonNull(this.timeUnit, "timeUnit must not be null if sessionMonitor is non-null"); - sessionMonitor = Optional.of(new SessionMonitor(this.timeUnit.toMillis(this.safeTimeWithoutHeartbeat), this.sessionMonitorCallback)); + sessionMonitor = Optional.of(new SessionMonitor(this.timeUnit.toMillis(this.safeTimeWithoutHeartbeat), this.sessionMonitorCallback, this.threadFactory)); } else { sessionMonitor = Optional.empty(); } return new AcquireLockOptions(this.partitionKey, this.sortKey, this.data, this.replaceData, this.deleteLockOnRelease, this.acquireOnlyIfLockAlreadyExists, this.refreshPeriod, this.additionalTimeToWaitForLock, this.timeUnit, this.additionalAttributes, sessionMonitor, - this.updateExistingLockRecord, this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant); + this.updateExistingLockRecord, this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant, this.threadFactory); } @Override @@ -342,7 +355,7 @@ public String toString() { + this.replaceData + ", deleteLockOnRelease=" + this.deleteLockOnRelease + ", refreshPeriod=" + this.refreshPeriod + ", additionalTimeToWaitForLock=" + this.additionalTimeToWaitForLock + ", timeUnit=" + this.timeUnit + ", additionalAttributes=" + this.additionalAttributes + ", safeTimeWithoutHeartbeat=" + this.safeTimeWithoutHeartbeat + ", sessionMonitorCallback=" + this.sessionMonitorCallback + ", acquireReleasedLocksConsistently=" - + this.acquireReleasedLocksConsistently + ", reentrant=" + this.reentrant+ ")"; + + this.acquireReleasedLocksConsistently + ", reentrant=" + this.reentrant + ", threadFactory= " + this.threadFactory + ")"; } } @@ -354,13 +367,26 @@ public String toString() { * @return a builder for an AquireLockOptions object */ public static AcquireLockOptionsBuilder builder(final String partitionKey) { - return new AcquireLockOptionsBuilder(partitionKey); + return new AcquireLockOptionsBuilder(partitionKey, null); + } + + /** + * Creates a new version of AcquireLockOptionsBuilder using the partition key as well as a ThreadFactory, which + * will be used for spawning related threads. + * + * @param partitionKey The partition key under which the lock will be acquired. + * @param threadFactory The factory to create related threads. + * @return + */ + public static AcquireLockOptionsBuilder builder(final String partitionKey, final ThreadFactory threadFactory) { + return new AcquireLockOptionsBuilder(partitionKey, threadFactory); } private AcquireLockOptions(final String partitionKey, final Optional sortKey, final Optional data, final Boolean replaceData, final Boolean deleteLockOnRelease, final Boolean acquireOnlyIfLockAlreadyExists, final Long refreshPeriod, final Long additionalTimeToWaitForLock, final TimeUnit timeUnit, final Map additionalAttributes, final Optional sessionMonitor, - final Boolean updateExistingLockRecord, final Boolean shouldSkipBlockingWait, final Boolean acquireReleasedLocksConsistently, Boolean reentrant) { + final Boolean updateExistingLockRecord, final Boolean shouldSkipBlockingWait, final Boolean acquireReleasedLocksConsistently, Boolean reentrant, + final ThreadFactory threadFactory) { this.partitionKey = partitionKey; this.sortKey = sortKey; this.data = data; @@ -376,6 +402,7 @@ private AcquireLockOptions(final String partitionKey, final Optional sor this.shouldSkipBlockingWait = shouldSkipBlockingWait; this.acquireReleasedLocksConsistently = acquireReleasedLocksConsistently; this.reentrant = reentrant; + this.threadFactory = threadFactory; } String getPartitionKey() { @@ -428,6 +455,10 @@ Map getAdditionalAttributes() { return this.additionalAttributes; } + ThreadFactory getThreadFactory() { + return this.threadFactory; + } + /** * Constructs a SessionMonitor object for LockItem instantiation * @@ -460,7 +491,8 @@ public boolean equals(final Object other) { && Objects.equals(this.updateExistingLockRecord, otherOptions.updateExistingLockRecord) && Objects.equals(this.shouldSkipBlockingWait, otherOptions.shouldSkipBlockingWait) && Objects.equals(this.acquireReleasedLocksConsistently, otherOptions.acquireReleasedLocksConsistently) - && Objects.equals(this.reentrant, otherOptions.reentrant); + && Objects.equals(this.reentrant, otherOptions.reentrant) + && Objects.equals(this.threadFactory, otherOptions.threadFactory); } @Override @@ -468,7 +500,7 @@ public int hashCode() { return Objects.hash(this.partitionKey, this.sortKey, this.data, this.replaceData, this.deleteLockOnRelease, this.acquireOnlyIfLockAlreadyExists, this.refreshPeriod, this.additionalTimeToWaitForLock, this.timeUnit, this.additionalAttributes, this.sessionMonitor, this.updateExistingLockRecord, - this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant); + this.shouldSkipBlockingWait, this.acquireReleasedLocksConsistently, this.reentrant, this.threadFactory); } diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java b/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java index 4d7d0f8..9fc6f60 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClient.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -237,6 +238,7 @@ public class AmazonDynamoDBLockClient implements Runnable, Closeable { private final ConcurrentHashMap sessionMonitors; private final Optional backgroundThread; private final Function namedThreadCreator; + private final ReentrantLock releaseLocksReentrantLock; private volatile boolean shuttingDown = false; /* These are the keys that are stored in the DynamoDB table to keep track of the locks */ @@ -281,6 +283,7 @@ public AmazonDynamoDBLockClient(final AmazonDynamoDBLockClientOptions amazonDyna this.sortKeyName = amazonDynamoDBLockClientOptions.getSortKeyName(); this.namedThreadCreator = amazonDynamoDBLockClientOptions.getNamedThreadCreator(); this.holdLockOnServiceUnavailable = amazonDynamoDBLockClientOptions.getHoldLockOnServiceUnavailable(); + this.releaseLocksReentrantLock = new ReentrantLock(); if (amazonDynamoDBLockClientOptions.getCreateHeartbeatBackgroundThread()) { if (this.leaseDurationInMilliseconds < 2 * this.heartbeatPeriodInMilliseconds) { @@ -853,7 +856,9 @@ public boolean releaseLock(final ReleaseLockOptions options) { return false; } - synchronized (lockItem) { + lockItem.getReentrantLock().lock(); + try + { try { // Always remove the heartbeat for the lock. The // caller's intention is to release the lock. Stopping the @@ -931,6 +936,12 @@ public boolean releaseLock(final ReleaseLockOptions options) { // get exceptions from this method. this.removeKillSessionMonitor(lockItem.getUniqueIdentifier()); } + finally + { + lockItem.getReentrantLock() + .unlock(); + } + return true; } @@ -952,11 +963,15 @@ private Map getKeys(String partitionKey, Optional locks = new HashMap<>(this.locks); - synchronized (locks) { + this.releaseLocksReentrantLock.lock(); + try { for (final Entry lockEntry : locks.entrySet()) { this.releaseLock(lockEntry.getValue()); // TODO catch exceptions and report failure separately } } + finally { + this.releaseLocksReentrantLock.unlock(); + } } /** @@ -1163,7 +1178,8 @@ public void sendHeartbeat(final SendHeartbeatOptions options) { throw new LockNotGrantedException("Cannot send heartbeat because lock is not granted"); } - synchronized (lockItem) { + lockItem.getReentrantLock().lock(); + try { //Set up condition for UpdateItem. Basically any changes require: //1. I own the lock //2. I know the current version number @@ -1235,6 +1251,10 @@ public void sendHeartbeat(final SendHeartbeatOptions options) { } } } + finally { + lockItem.getReentrantLock() + .unlock(); + } } /** diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientOptions.java b/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientOptions.java index 38c69f5..3340d4d 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientOptions.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/AmazonDynamoDBLockClientOptions.java @@ -51,7 +51,6 @@ public class AmazonDynamoDBLockClientOptions { private final Function namedThreadCreator; private final Boolean holdLockOnServiceUnavailable; - /** * A builder for setting up an AmazonDynamoDBLockClientOptions object. By default, it is setup to have a partition key name of * "key," a lease duration of 20 seconds, and a default heartbeat period of 5 seconds. These defaults can be overriden. @@ -76,6 +75,14 @@ public static class AmazonDynamoDBLockClientOptionsBuilder { namedThreadCreator()); } + AmazonDynamoDBLockClientOptionsBuilder(final DynamoDbClient dynamoDBClient, final String tableName, final Function namedThreadCreator) { + this(dynamoDBClient, tableName, + /* By default, tries to set ownerName to the localhost */ + generateOwnerNameFromLocalhost(), + namedThreadCreator); + } + + private static final String generateOwnerNameFromLocalhost() { try { return Inet4Address.getLocalHost().getHostName() + UUID.randomUUID().toString(); @@ -233,6 +240,20 @@ public static AmazonDynamoDBLockClientOptionsBuilder builder(final DynamoDbClien return new AmazonDynamoDBLockClientOptionsBuilder(dynamoDBClient, tableName); } + /** + * Creates an AmazonDynamoDBLockClientOptions builder object, which can be + * used to create an AmazonDynamoDBLockClient. The only required parameters + * are the client and the table name. + * + * @param dynamoDBClient The client for talking to DynamoDB. + * @param tableName The table containing the lock client. + * @param namedThreadCreator A function that takes in a thread name and outputs a ThreadFactory that creates threads with the given name. + * @return A builder which can be used for creating a lock client. + */ + public static AmazonDynamoDBLockClientOptionsBuilder builder(final DynamoDbClient dynamoDBClient, final String tableName, final Function namedThreadCreator) { + return new AmazonDynamoDBLockClientOptionsBuilder(dynamoDBClient, tableName, namedThreadCreator); + } + private AmazonDynamoDBLockClientOptions(final DynamoDbClient dynamoDBClient, final String tableName, final String partitionKeyName, final Optional sortKeyName, final String ownerName, final Long leaseDuration, final Long heartbeatPeriod, final TimeUnit timeUnit, final Boolean createHeartbeatBackgroundThread, final Function namedThreadCreator, final Boolean holdLockOnServiceUnavailable) { diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/LockItem.java b/src/main/java/com/amazonaws/services/dynamodbv2/LockItem.java index 91108ba..77ca255 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/LockItem.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/LockItem.java @@ -26,6 +26,8 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + import software.amazon.awssdk.services.dynamodb.model.AttributeValue; /** @@ -37,6 +39,7 @@ public class LockItem implements Closeable { private final AmazonDynamoDBLockClient client; private final String partitionKey; private final Optional sortKey; + private final ReentrantLock reentrantLock; private Optional data; private final String ownerName; @@ -87,6 +90,7 @@ public class LockItem implements Closeable { this.ownerName = ownerName; this.deleteLockItemOnClose = deleteLockItemOnClose; + this.reentrantLock = new ReentrantLock(); this.leaseDuration = new AtomicLong(leaseDuration); this.lookupTime = new AtomicLong(lastUpdatedTimeInMilliseconds); this.recordVersionNumber = new StringBuffer(recordVersionNumber); @@ -105,6 +109,15 @@ public String getPartitionKey() { return this.partitionKey; } + /** + * Returns the {@link ReentrantLock} for this {@link LockItem}. + * + * @return The reentrant lock for this lock item. + */ + public ReentrantLock getReentrantLock() { + return this.reentrantLock; + } + /** * Returns the sort key associated with the lock, if there is one. * diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/SessionMonitor.java b/src/main/java/com/amazonaws/services/dynamodbv2/SessionMonitor.java index 13df7ff..fa4960d 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/SessionMonitor.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/SessionMonitor.java @@ -15,6 +15,7 @@ package com.amazonaws.services.dynamodbv2; import java.util.Optional; +import java.util.concurrent.ThreadFactory; import com.amazonaws.services.dynamodbv2.util.LockClientUtils; @@ -29,6 +30,7 @@ final class SessionMonitor { private final long safeTimeWithoutHeartbeatMillis; private final Optional callback; + private final ThreadFactory threadFactory; /** * Constructs a SessionMonitor object. @@ -38,10 +40,15 @@ final class SessionMonitor { * "danger zone" * @param callback the callback to run when the lock's lease enters the danger * zone + * @param threadFactory the factory to create the thread that will run the callback */ - public SessionMonitor(final long safeTimeWithoutHeartbeatMillis, final Optional callback) { + public SessionMonitor( + final long safeTimeWithoutHeartbeatMillis, + final Optional callback, + final ThreadFactory threadFactory) { this.safeTimeWithoutHeartbeatMillis = safeTimeWithoutHeartbeatMillis; this.callback = callback; + this.threadFactory = threadFactory; } /** @@ -78,7 +85,7 @@ long millisecondsUntilLeaseEntersDangerZone(final long lastAbsoluteTimeUpdatedMi */ public void runCallback() { if (this.callback.isPresent()) { - final Thread t = new Thread(this.callback.get()); + final Thread t = this.threadFactory.newThread(this.callback.get()); t.setDaemon(true); t.start(); } diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/LockItemTest.java b/src/test/java/com/amazonaws/services/dynamodbv2/LockItemTest.java index ca1dc7a..b48e8dc 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/LockItemTest.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/LockItemTest.java @@ -69,7 +69,7 @@ public void equals_differentPartitionKey_returnFalse() { 1000, //last updated time in milliseconds "recordVersionNumber", false, //released - Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor + Optional.of(new SessionMonitor(1000, Optional.empty(), Thread::new)), //session monitor new HashMap<>()))); } @@ -84,7 +84,7 @@ public void equals_differentOwner_returnFalse() { 1000, //last updated time in milliseconds "recordVersionNumber", false, //released - Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor + Optional.of(new SessionMonitor(1000, Optional.empty(), Thread::new)), //session monitor new HashMap<>()))); } @@ -104,7 +104,7 @@ public void isExpired_whenIsReleasedTrue_returnTrue() { 1000, //last updated time in milliseconds "recordVersionNumber", true, //released - Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor + Optional.of(new SessionMonitor(1000, Optional.empty(), Thread::new)), //session monitor new HashMap<>()).isExpired()); } @@ -139,7 +139,7 @@ public void millisecondsUntilDangerZoneEntered_whenIsReleasedTrue_throwsIllegalS 1000, //last updated time in milliseconds "recordVersionNumber", true, //released - Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor + Optional.of(new SessionMonitor(1000, Optional.empty(), Thread::new)), //session monitor new HashMap<>()).millisecondsUntilDangerZoneEntered(); } @@ -191,7 +191,7 @@ static LockItem createLockItem(AmazonDynamoDBLockClient lockClient) { 1000, //last updated time in milliseconds "recordVersionNumber", false, //released - Optional.of(new SessionMonitor(1000, Optional.empty())), //session monitor + Optional.of(new SessionMonitor(1000, Optional.empty(), Thread::new)), //session monitor new HashMap<>()); //additional attributes } } diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/SessionMonitorTest.java b/src/test/java/com/amazonaws/services/dynamodbv2/SessionMonitorTest.java index 53dab75..134d46b 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/SessionMonitorTest.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/SessionMonitorTest.java @@ -36,33 +36,33 @@ public class SessionMonitorTest { @Test public void isLeaseEnteringDangerZone_whenThereAreZeroOrLessMillisUntilEnterDangerZone_returnTrue() { - SessionMonitor sut = PowerMockito.spy(new SessionMonitor(1000, Optional.empty())); + SessionMonitor sut = PowerMockito.spy(new SessionMonitor(1000, Optional.empty(), Thread::new)); when(sut.millisecondsUntilLeaseEntersDangerZone(25L)).thenReturn(0L); assertTrue(sut.isLeaseEnteringDangerZone(25L)); } @Test public void isLeaseEnteringDangerZone_whenThereAreMoreThanZeroMillisUntilEnterDangerZone_returnFalse() { - SessionMonitor sut = PowerMockito.spy(new SessionMonitor(1000, Optional.empty())); + SessionMonitor sut = PowerMockito.spy(new SessionMonitor(1000, Optional.empty(), Thread::new)); when(sut.millisecondsUntilLeaseEntersDangerZone(25L)).thenReturn(1L); assertFalse(sut.isLeaseEnteringDangerZone(25L)); } @Test public void runCallback_whenNotPresent_doesNothing() { - SessionMonitor sut = new SessionMonitor(1000, Optional.empty()); + SessionMonitor sut = new SessionMonitor(1000, Optional.empty(), Thread::new); sut.runCallback(); } @Test public void hasCallback_whenCallbackNull_returnFalse() { - SessionMonitor sut = new SessionMonitor(1000, null); + SessionMonitor sut = new SessionMonitor(1000, null, Thread::new); assertFalse(sut.hasCallback()); } @Test public void hasCallback_whenCallbackNotNull_returnTrue() { - SessionMonitor sut = new SessionMonitor(1000, Optional.empty()); + SessionMonitor sut = new SessionMonitor(1000, Optional.empty(), Thread::new); assertTrue(sut.hasCallback()); } }