Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public class ContainerSet implements Iterable<Container<?>> {

private static final Logger LOG = LoggerFactory.getLogger(ContainerSet.class);

/**
* Max attempts to acquire {@link Container#writeLock()} while verifying this set's id → container mapping
* is same (e.g. another thread may {@link #updateContainer} / DiskBalancer swap the instance).
*/
private static final int MAX_CONTAINER_MAP_SWAP_RETRIES = 10;

private final ConcurrentSkipListMap<Long, Container<?>> containerMap = new
ConcurrentSkipListMap<>();
private final ConcurrentSkipListSet<Long> missingContainerSet =
Expand Down Expand Up @@ -279,6 +285,49 @@ public Container<?> getContainer(long containerId) {
return containerMap.get(containerId);
}

/**
* Returns the max retry for a container map swap while acquiring container lock.
* @return max retry count
*/
public static int maxContainerMapSwapRetries() {
return MAX_CONTAINER_MAP_SWAP_RETRIES;
}

/**
* Locks the container mapped to {@code containerId} for write, and verifies that the instance locked is still
* the one stored in this set. If the mapping is swapped or the container no longer exists, unlocks and retries up to
* {@link #maxContainerMapSwapRetries()} times, then returns {@code null}.
*
* @return the locked container, or {@code null} if none mapped, or mapping could not be stabilized
*/
@Nullable
public Container<?> acquireContainerLock(long containerId) {
for (int retry = 0; retry < MAX_CONTAINER_MAP_SWAP_RETRIES; retry++) {
Container<?> candidate = getContainer(containerId);
if (candidate == null) {
LOG.info("Container {} no longer present in ContainerSet, skipping.", containerId);
return null;
}
candidate.writeLock();
Container<?> current = getContainer(containerId);
if (current == null) {
candidate.writeUnlock();
LOG.info("Container {} no longer exists in ContainerSet while acquiring lock.", containerId);
return null;
}
if (current != candidate) {
candidate.writeUnlock();
if (LOG.isDebugEnabled()) {
LOG.debug("Container {} mapping changed during lock acquisition (attempt {}); retrying.",
containerId, retry);
}
continue;
}
return candidate;
}
return null;
}

/**
* Removes container from both memory and database. This should be used when the containerData on disk has been
* removed completely from the node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,19 +512,19 @@ public BackgroundTaskResult call() {
return BackgroundTaskResult.EmptyTaskResult.newResult();
}

// Double check container state before acquiring lock to start move process.
// Container state may have changed after selection.
State containerState = container.getContainerData().getState();
if (!movableContainerStates.contains(containerState)) {
LOG.warn("Container {} is in {} state, skipping move process.", containerId, containerState);
postCall(false, startTime);
return BackgroundTaskResult.EmptyTaskResult.newResult();
}

// hold read lock on the container first, to avoid other threads to update the container state,
// such as block deletion.
container.readLock();
try {
// Double check container state after acquiring lock to start move process.
// Container state may have changed after selection.
State containerState = container.getContainerData().getState();
if (!movableContainerStates.contains(containerState)) {
LOG.warn("Container {} is in {} state, skipping move process.", containerId, containerState);
moveSucceeded = false;
return BackgroundTaskResult.EmptyTaskResult.newResult();
}

// Step 1: Copy container to new Volume's tmp Dir
diskBalancerTmpDir = getDiskBalancerTmpDir(destVolume)
.resolve(String.valueOf(containerId));
Expand Down Expand Up @@ -580,6 +580,10 @@ public BackgroundTaskResult call() {
// old caller can still hold the old Container object.
ozoneContainer.getContainerSet().updateContainer(newContainer);
destVolume.incrementUsedSpace(containerSize);

// Test injector: ContainerSet now references newContainer while this thread still holds
// readLock on the old replica.
pauseInjector();
// Mark old container as DELETED and persist state.
// markContainerForDelete require writeLock, so release readLock first
container.readUnlock();
Expand All @@ -597,13 +601,7 @@ public BackgroundTaskResult call() {
metrics.incrSuccessBytes(containerSize);
totalBalancedBytes.addAndGet(containerSize);
} catch (IOException e) {
if (injector != null) {
try {
injector.pause();
} catch (IOException ex) {
// do nothing
}
}
pauseInjector();
moveSucceeded = false;
LOG.warn("Failed to move container {}", containerId, e);
if (diskBalancerTmpDir != null) {
Expand Down Expand Up @@ -861,6 +859,17 @@ public static void setInjector(FaultInjector instance) {
injector = instance;
}

// call FaultInjector#pause when an injector is registered; ignore IOException.
private static void pauseInjector() {
if (injector != null) {
try {
injector.pause();
} catch (IOException ex) {
// do nothing
}
}
}

@VisibleForTesting
public void setReplicaDeletionDelay(long durationMills) {
this.replicaDeletionDelay = durationMills;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2293,24 +2293,31 @@ private boolean logBlocksFoundOnDisk(Container container) throws IOException {

private void deleteInternal(Container container, boolean force)
throws StorageContainerException {
final long containerId = container.getContainerData().getContainerID();
long startTime = clock.millis();
container.writeLock();
Container<?> containerLocked = containerSet.acquireContainerLock(containerId);
if (containerLocked == null) {
LOG.info("Exceeded {} retries to lock container {}; Now DN will resend for delete with " +
"the current container replica", ContainerSet.maxContainerMapSwapRetries(),
containerId);
return;
}
try {
final ContainerData data = container.getContainerData();
if (container.getContainerData().getVolume().isFailed()) {
final ContainerData data = containerLocked.getContainerData();
if (containerLocked.getContainerData().getVolume().isFailed()) {
// if the volume in which the container resides fails
// don't attempt to delete/move it. When a volume fails,
// failedVolumeListener will pick it up and clear the container
// from the container set.
LOG.info("Delete container issued on containerID {} which is in a " +
"failed volume. Skipping", container.getContainerData()
"failed volume. Skipping", containerLocked.getContainerData()
.getContainerID());
return;
}
// If force is false, we check container state.
if (!force) {
// Check if container is open
if (container.getContainerData().isOpen()) {
if (containerLocked.getContainerData().isOpen()) {
throw new StorageContainerException(
"Deletion of Open Container is not allowed.",
DELETE_ON_OPEN_CONTAINER);
Expand All @@ -2319,24 +2326,24 @@ private void deleteInternal(Container container, boolean force)
// If the container is not empty, it should not be deleted unless the
// container is being forcefully deleted (which happens when
// container is unhealthy or over-replicated).
if (container.hasBlocks()) {
if (containerLocked.hasBlocks()) {
metrics.incContainerDeleteFailedNonEmpty();
LOG.error("Received container deletion command for non-empty {}: {}", data, data.getStatistics());
// blocks table for future debugging.
// List blocks
logBlocksIfNonZero(container);
logBlocksIfNonZero(containerLocked);
// Log chunks
logBlocksFoundOnDisk(container);
logBlocksFoundOnDisk(containerLocked);
throw new StorageContainerException("Non-force deletion of " +
"non-empty container is not allowed.",
DELETE_ON_NON_EMPTY_CONTAINER);
}
} else {
metrics.incContainersForceDelete();
}
if (container.getContainerData() instanceof KeyValueContainerData) {
if (containerLocked.getContainerData() instanceof KeyValueContainerData) {
KeyValueContainerData keyValueContainerData =
(KeyValueContainerData) container.getContainerData();
(KeyValueContainerData) containerLocked.getContainerData();
HddsVolume hddsVolume = keyValueContainerData.getVolume();

// Steps to delete
Expand All @@ -2350,21 +2357,20 @@ private void deleteInternal(Container container, boolean force)
if (waitTime > maxDeleteLockWaitMs) {
LOG.warn("An attempt to delete container {} took {} ms acquiring locks and pre-checks. " +
"The delete has been skipped and should be retried automatically by SCM.",
container.getContainerData().getContainerID(), waitTime);
containerLocked.getContainerData().getContainerID(), waitTime);
return;
}
container.markContainerForDelete();
long containerId = container.getContainerData().getContainerID();
containerLocked.markContainerForDelete();
containerSet.removeContainer(containerId);
ContainerLogger.logDeleted(container.getContainerData(), force);
ContainerLogger.logDeleted(containerLocked.getContainerData(), force);
KeyValueContainerUtil.removeContainer(keyValueContainerData, conf);
} catch (IOException ioe) {
LOG.error("Failed to move container under " + hddsVolume
.getDeletedContainerDir());
String errorMsg =
"Failed to move container" + container.getContainerData()
"Failed to move container" + containerLocked.getContainerData()
.getContainerID();
triggerVolumeScanAndThrowException(container, errorMsg,
triggerVolumeScanAndThrowException(containerLocked, errorMsg,
CONTAINER_INTERNAL_ERROR);
}
}
Expand All @@ -2374,20 +2380,20 @@ private void deleteInternal(Container container, boolean force)
// All other IO Exceptions should be treated as if the container is not
// empty as a defensive check.
LOG.error("Could not determine if the container {} is empty",
container.getContainerData().getContainerID(), e);
containerLocked.getContainerData().getContainerID(), e);
String errorMsg =
"Failed to read container dir" + container.getContainerData()
"Failed to read container dir" + containerLocked.getContainerData()
.getContainerID();
triggerVolumeScanAndThrowException(container, errorMsg,
triggerVolumeScanAndThrowException(containerLocked, errorMsg,
CONTAINER_INTERNAL_ERROR);
} finally {
container.writeUnlock();
containerLocked.writeUnlock();
}
// Avoid holding write locks for disk operations
sendICR(container);
long bytesUsed = container.getContainerData().getBytesUsed();
HddsVolume volume = container.getContainerData().getVolume();
container.delete();
sendICR(containerLocked);
long bytesUsed = containerLocked.getContainerData().getBytesUsed();
HddsVolume volume = containerLocked.getContainerData().getVolume();
containerLocked.delete();
volume.decrementUsedSpace(bytesUsed);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.BlockDeletingServiceMetrics;
import org.apache.hadoop.ozone.container.common.impl.BlockDeletingService;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.DBHandle;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
Expand All @@ -67,7 +68,7 @@ public class BlockDeletingTask implements BackgroundTask {

private final BlockDeletingServiceMetrics metrics;
private final int priority;
private final KeyValueContainerData containerData;
private KeyValueContainerData containerData;
private long blocksToDelete;
private final OzoneContainer ozoneContainer;
private final ConfigurationSource conf;
Expand Down Expand Up @@ -139,24 +140,36 @@ public BackgroundTaskResult call() throws Exception {

private ContainerBackgroundTaskResult handleDeleteTask() throws Exception {
ContainerBackgroundTaskResult crr;
final Container container = ozoneContainer.getContainerSet()
.getContainer(containerData.getContainerID());
container.writeLock();
File dataDir = new File(containerData.getChunksPath());
long startTime = Time.monotonicNow();
// Scan container's db and get list of under deletion blocks
try (DBHandle meta = BlockUtils.getDB(containerData, conf)) {
if (containerData.hasSchema(SCHEMA_V1)) {
crr = deleteViaSchema1(meta, container, dataDir, startTime);
} else if (containerData.hasSchema(SCHEMA_V2)) {
crr = deleteViaSchema2(meta, container, dataDir, startTime);
} else if (containerData.hasSchema(SCHEMA_V3)) {
crr = deleteViaSchema3(meta, container, dataDir, startTime);
} else {
throw new UnsupportedOperationException(
"Only schema version 1,2,3 are supported.");
ContainerSet cs = ozoneContainer.getContainerSet();
final long containerId = containerData.getContainerID();

Container<?> container = cs.acquireContainerLock(containerId);
if (container == null) {
LOG.warn("Exceeded {} attempts locking live container {}; giving up.", ContainerSet.maxContainerMapSwapRetries(),
containerId);
return new ContainerBackgroundTaskResult();
}
try {
// Always use ContainerData from the locked live Container so paths / RocksDB locations match deleteViaSchema*.
containerData = (KeyValueContainerData) container.getContainerData();

File dataDir = new File(containerData.getChunksPath());
long startTime = Time.monotonicNow();
// Scan container's db and get list of under deletion blocks
try (DBHandle meta = BlockUtils.getDB(containerData, conf)) {
if (containerData.hasSchema(SCHEMA_V1)) {
crr = deleteViaSchema1(meta, container, dataDir, startTime);
} else if (containerData.hasSchema(SCHEMA_V2)) {
crr = deleteViaSchema2(meta, container, dataDir, startTime);
} else if (containerData.hasSchema(SCHEMA_V3)) {
crr = deleteViaSchema3(meta, container, dataDir, startTime);
} else {
throw new UnsupportedOperationException(
"Only schema version 1,2,3 are supported.");
}
return crr;

}
return crr;
} finally {
container.writeUnlock();
}
Expand Down
Loading