Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,44 @@ For XDC locks requiring strong consistency, opt for a multi-site Aerospike clust
#### Notes

A lock exists only within the scope of a Client represented by `CLIENT_ID`.

---

## Configuring Lock Timing

By default, every `LockBase` uses the library's built-in timing constants:

| Parameter | Default | Description |
|-----------|---------|-------------|
| `lockTtl` | 90 s | How long the lock is held before the storage layer expires it automatically |
| `waitForLock` | 90 s | Maximum time a blocking `acquireLock` call waits for a contended lock |
| `sleepBetweenRetries` | 1000 ms | Sleep interval between successive acquisition attempts |

To customise these values supply a `LockConfiguration` to the `LockBase` builder using standard `Duration` values. Any value left unset falls back to the library default.

```java
// Tight SLO service: short TTL, short wait, fast retry
LockConfiguration config = LockConfiguration.builder()
.lockTtl(Duration.ofSeconds(30)) // hold locks for 30 s
.waitForLock(Duration.ofSeconds(10)) // wait at most 10 s for a contended lock
.sleepBetweenRetries(Duration.ofMillis(500)) // retry every 500 ms
.build();

DistributedLockManager lockManager = DistributedLockManager.builder()
.clientId("CLIENT_ID")
.farmId("FA1")
.lockBase(LockBase.builder()
.mode(LockMode.EXCLUSIVE)
.lockConfiguration(config) // <-- inject custom config
.lockStore(AerospikeStore.builder()
.aerospikeClient(aerospikeClient)
.namespace("NAMESPACE")
.setSuffix("distributed_lock")
.build())
.build())
.build();
lockManager.initialize();
```

> **Backward compatibility**: omitting `lockConfiguration(...)` from the builder is fully supported
> and produces identical behaviour to all previous library versions.
12 changes: 8 additions & 4 deletions src/main/java/com/phonepe/dlm/DistributedLockManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.phonepe.dlm.exception.ErrorCode;
import com.phonepe.dlm.lock.Lock;
import com.phonepe.dlm.lock.base.LockBase;
import com.phonepe.dlm.lock.base.LockConfiguration;
import com.phonepe.dlm.lock.level.LockLevel;

import java.time.Duration;
Expand All @@ -42,7 +43,8 @@ public void initialize() {
* This method attempts to acquire the lock immediately and throws exception if lock is unavailable
* It does not wait if the lock is currently held by another thread.
* <p>
* The lock will be acquired for default time period {@link LockBase#DEFAULT_LOCK_TTL_SECONDS}
* The lock will be held for the TTL configured in {@link LockBase}'s {@link LockConfiguration},
* defaulting to {@link LockConfiguration#DEFAULT_LOCK_TTL} seconds.
*
* @param lock The lock to be acquired.
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is already acquired
Expand All @@ -68,8 +70,9 @@ public void tryAcquireLock(final Lock lock, final Duration duration) {
* it will wait until the lock becomes available.
* It blocks the thread until the lock is acquired.
* <p>
* By default, timeout is {@link LockBase#DEFAULT_WAIT_FOR_LOCK_IN_SECONDS}
* The lock will be acquired for default time period {@link LockBase#DEFAULT_LOCK_TTL_SECONDS}
* By default, timeout is {@link LockConfiguration#DEFAULT_WAIT_FOR_LOCK} seconds and
* TTL is {@link LockConfiguration#DEFAULT_LOCK_TTL} seconds. Override both via
* {@link LockConfiguration} on the {@link LockBase}.
*
* @param lock The lock to be acquired.
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout
Expand All @@ -83,7 +86,8 @@ public void acquireLock(final Lock lock) {
* it will wait until the lock becomes available.
* It blocks the thread until the lock is acquired.
* <p>
* By default, timeout is {@link LockBase#DEFAULT_WAIT_FOR_LOCK_IN_SECONDS}
* By default, timeout is {@link LockConfiguration#DEFAULT_WAIT_FOR_LOCK} seconds. Override
* via {@link LockConfiguration} on the {@link LockBase}.
*
* @param lock The lock to be acquired.
* @param duration The lock duration in seconds for which lock will be held
Expand Down
15 changes: 10 additions & 5 deletions src/main/java/com/phonepe/dlm/lock/ILockable.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package com.phonepe.dlm.lock;

import com.phonepe.dlm.exception.ErrorCode;
import com.phonepe.dlm.lock.base.LockBase;

import com.phonepe.dlm.lock.base.LockConfiguration;

import java.time.Duration;

Expand All @@ -29,7 +30,9 @@ public interface ILockable {
* This method attempts to acquire the lock immediately and throws exception if lock is unavailable
* It does not wait if the lock is currently held by another thread.
* <p>
* The lock will be acquired for default time period {@link LockBase#DEFAULT_LOCK_TTL_SECONDS}
* The lock will be acquired for the TTL configured via
* {@link LockConfiguration} (field: {@code lockTtl}), defaulting to
* {@link LockConfiguration#DEFAULT_LOCK_TTL} seconds.
*
* @param lock The lock to be acquired.
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is already acquired
Expand All @@ -51,8 +54,9 @@ public interface ILockable {
* it will wait until the lock becomes available.
* It blocks the thread until the lock is acquired.
* <p>
* By default, timeout is {@link LockBase#DEFAULT_WAIT_FOR_LOCK_IN_SECONDS}
* The lock will be acquired for default time period {@link LockBase#DEFAULT_LOCK_TTL_SECONDS}
* By default, timeout is {@link com.phonepe.dlm.lock.base.LockConfiguration#DEFAULT_WAIT_FOR_LOCK} seconds,
* and TTL is {@link com.phonepe.dlm.lock.base.LockConfiguration#DEFAULT_LOCK_TTL} seconds.
* Override both via {@link com.phonepe.dlm.lock.base.LockConfiguration}.
*
* @param lock The lock to be acquired.
* @throws DLMException with {@link ErrorCode#LOCK_UNAVAILABLE} if lock is not available even after the timeout
Expand All @@ -64,7 +68,8 @@ public interface ILockable {
* it will wait until the lock becomes available.
* It blocks the thread until the lock is acquired.
* <p>
* By default, timeout is {@link LockBase#DEFAULT_WAIT_FOR_LOCK_IN_SECONDS}
* By default, timeout is {@link com.phonepe.dlm.lock.base.LockConfiguration#DEFAULT_WAIT_FOR_LOCK} seconds.
* Override via {@link com.phonepe.dlm.lock.base.LockConfiguration}.
*
* @param lock The lock to be acquired.
* @param duration The lock duration in seconds for which lock will be held
Expand Down
80 changes: 67 additions & 13 deletions src/main/java/com/phonepe/dlm/lock/base/LockBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,69 @@
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Core implementation of the distributed locking contract defined by {@link ILockable}.
*
* <p>{@code LockBase} orchestrates lock acquisition and release by delegating storage operations
* to the configured {@link ILockStore}. Timing behaviour — TTL, wait timeout, and sleep between retries —
* is governed by the {@link LockConfiguration} supplied at construction time. When no configuration
* is provided the library falls back to {@link LockConfiguration}'s built-in defaults
* ({@link LockConfiguration#DEFAULT_LOCK_TTL},
* {@link LockConfiguration#DEFAULT_WAIT_FOR_LOCK},
* {@link LockConfiguration#DEFAULT_SLEEP_BETWEEN_RETRIES}), preserving full backward
* compatibility.
*
* <h2>Typical construction</h2>
* <pre>{@code
* // Backward-compatible: omitting lockConfiguration uses library defaults.
* LockBase lockBase = LockBase.builder()
* .mode(LockMode.EXCLUSIVE)
* .lockStore(aerospikeStore)
* .build();
*
* // Custom timing for a service with tighter SLOs.
* LockBase lockBase = LockBase.builder()
* .mode(LockMode.EXCLUSIVE)
* .lockStore(aerospikeStore)
* .lockConfiguration(LockConfiguration.builder()
* .lockTtl(Duration.ofSeconds(30))
* .waitForLock(Duration.ofSeconds(10))
* .sleepBetweenRetries(Duration.ofMillis(500))
* .build())
* .build();
* }</pre>
*
* <p>This class is thread-safe provided the supplied {@link ILockStore} is also thread-safe.
*/
@Slf4j
@AllArgsConstructor
@Builder
@Getter
@Builder
@AllArgsConstructor
public class LockBase implements ILockable {
public static final Duration DEFAULT_LOCK_TTL_SECONDS = Duration.ofSeconds(90);
public static final Duration DEFAULT_WAIT_FOR_LOCK_IN_SECONDS = Duration.ofSeconds(90);
public static final int WAIT_TIME_FOR_NEXT_RETRY = 1000; // 1 second

/**
* The storage backend used to persist and remove lock records.
*/
private final ILockStore lockStore;
private final LockMode mode; // Not implemented now, but can be leveraged in the future.

/**
* The locking mode (e.g. {@link LockMode#EXCLUSIVE}).
* Not actively enforced today but reserved for future multi-mode support.
*/
private final LockMode mode;

/**
* Timing configuration for this lock base instance.
* <p>
* When not set via the builder, defaults to {@link LockConfiguration#builder() build()},
* which applies the library-standard defaults (90 s TTL, 90 s wait, 1 000 ms retry).
*/
@Builder.Default
private final LockConfiguration lockConfiguration = LockConfiguration.builder().build();

@Override
public void tryAcquireLock(final Lock lock) {
tryAcquireLock(lock, DEFAULT_LOCK_TTL_SECONDS);
tryAcquireLock(lock, lockConfiguration.getLockTtl());
}

@Override
Expand All @@ -55,12 +103,12 @@ public void tryAcquireLock(final Lock lock, final Duration duration) {

@Override
public void acquireLock(final Lock lock) {
acquireLock(lock, DEFAULT_LOCK_TTL_SECONDS, DEFAULT_WAIT_FOR_LOCK_IN_SECONDS);
acquireLock(lock, lockConfiguration.getLockTtl(), lockConfiguration.getWaitForLock());
}

@Override
public void acquireLock(final Lock lock, final Duration duration) {
acquireLock(lock, duration, DEFAULT_WAIT_FOR_LOCK_IN_SECONDS);
acquireLock(lock, duration, lockConfiguration.getWaitForLock());
}

@Override
Expand All @@ -77,15 +125,14 @@ public void acquireLock(final Lock lock, final Duration duration, final Duration
throw e;
}
if (e.getErrorCode() == ErrorCode.LOCK_UNAVAILABLE) {
sleep();
sleep(lockConfiguration.getSleepBetweenRetries());
continue;
}
throw e;
}
} while (!success.get());
}


@Override
public boolean releaseLock(final Lock lock) {
if (lock.getAcquiredStatus().get()) {
Expand All @@ -101,9 +148,16 @@ private void writeToStore(final Lock lock, final Duration ttlSeconds) {
lock.getAcquiredStatus().compareAndSet(false, true);
}

private static void sleep() {
/**
* Sleeps for the configured sleepBetweenRetries before the next acquisition attempt.
*
* @param sleepBetweenRetries the duration to sleep
* @throws DLMException wrapping {@link InterruptedException} if the thread is interrupted
* while sleeping, with the interrupt status restored on the current thread
*/
private static void sleep(final Duration sleepBetweenRetries) {
try {
Thread.sleep(WAIT_TIME_FOR_NEXT_RETRY);
Thread.sleep(sleepBetweenRetries.toMillis());
} catch (InterruptedException e) {
log.error("Error sleeping the thread", e);
Thread.currentThread().interrupt();
Expand Down
116 changes: 116 additions & 0 deletions src/main/java/com/phonepe/dlm/lock/base/LockConfiguration.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/**
* Copyright (c) 2024 Original Author(s), PhonePe India Pvt. Ltd.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.phonepe.dlm.lock.base;

import lombok.Builder;
import lombok.Getter;

import java.time.Duration;

/**
* Immutable configuration encapsulating all timing parameters that govern lock-acquisition
* behaviour in the Distributed Lock Manager.
*
* <p>All parameters are expressed as {@link Duration} values. Any parameter left unset in the
* builder falls back to its corresponding library default via {@code @Builder.Default}, preserving
* full backward compatibility for callers that do not supply a {@code LockConfiguration}.
*
* <h2>Default values</h2>
* <ul>
* <li><b>lockTtl</b> ({@link #DEFAULT_LOCK_TTL}) — how long the lock is held before the
* storage layer expires it automatically.</li>
* <li><b>waitForLock</b> ({@link #DEFAULT_WAIT_FOR_LOCK}) — maximum time a caller blocks
* waiting for a contended lock.</li>
* <li><b>sleepBetweenRetries</b> ({@link #DEFAULT_SLEEP_BETWEEN_RETRIES}) — polling interval between
* successive acquisition attempts when a lock is unavailable.</li>
* </ul>
*
* <h2>Usage</h2>
* <pre>{@code
* // Use library defaults — identical behaviour to omitting lockConfiguration from the builder.
* LockConfiguration defaults = LockConfiguration.builder().build();
*
* // Custom configuration for a service with tighter SLOs.
* LockConfiguration custom = LockConfiguration.builder()
* .lockTtl(Duration.ofSeconds(30))
* .waitForLock(Duration.ofSeconds(10))
* .sleepBetweenRetries(Duration.ofMillis(500))
* .build();
*
* DistributedLockManager lockManager = DistributedLockManager.builder()
* .clientId("MY_SERVICE")
* .farmId("MHX")
* .lockBase(LockBase.builder()
* .mode(LockMode.EXCLUSIVE)
* .lockConfiguration(custom)
* .lockStore(aerospikeStore)
* .build())
* .build();
* }</pre>
*
* <p>Instances of this class are <em>immutable</em> and therefore safe for concurrent use without
* external synchronisation.
*/
@Getter
@Builder
public final class LockConfiguration {

/**
* Default lock time-to-live: 90 seconds.
* <p>
* The lock is held for this duration before the storage layer expires it automatically,
* protecting against deadlocks caused by holders that crash or fail to release the lock.
*/
public static final Duration DEFAULT_LOCK_TTL = Duration.ofSeconds(90);

/**
* Default maximum wait time for lock acquisition: 90 seconds.
* <p>
* When a caller invokes a blocking {@code acquireLock} variant without specifying a timeout,
* the library retries for at most this duration before throwing a
* {@link com.phonepe.dlm.exception.DLMException} with
* {@link com.phonepe.dlm.exception.ErrorCode#LOCK_UNAVAILABLE}.
*/
public static final Duration DEFAULT_WAIT_FOR_LOCK = Duration.ofSeconds(90);

/**
* Default polling interval between successive lock-acquisition retries: 1 second.
* <p>
* When a lock is unavailable, the library sleeps for this duration before the next attempt.
* Tuning this value trades off CPU/network overhead against acquisition latency under
* contention.
*/
public static final Duration DEFAULT_SLEEP_BETWEEN_RETRIES = Duration.ofMillis(1_000);

/**
* The duration for which a successfully acquired lock is held before automatic expiry.
*/
@Builder.Default
private final Duration lockTtl = DEFAULT_LOCK_TTL;

/**
* The maximum duration a blocking {@code acquireLock} call will wait for a contended lock.
*/
@Builder.Default
private final Duration waitForLock = DEFAULT_WAIT_FOR_LOCK;

/**
* The sleep duration between successive acquisition attempts when a lock is unavailable.
*/
@Builder.Default
private final Duration sleepBetweenRetries = DEFAULT_SLEEP_BETWEEN_RETRIES;
}
Loading
Loading