diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java index 7153db02110e..3ef0a3bc6c77 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/checksum/ReconcileContainerTask.java @@ -17,7 +17,10 @@ package org.apache.hadoop.ozone.container.checksum; +import java.util.Collection; +import java.util.Collections; import java.util.Objects; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask; import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; @@ -79,6 +82,19 @@ public String getMetricDescriptionSegment() { return METRIC_DESCRIPTION_SEGMENT; } + @Override + public Collection getVolumes() { + org.apache.hadoop.ozone.container.common.interfaces.Container + container = controller.getContainer(getContainerId()); + if (container != null) { + HddsVolume vol = (HddsVolume) container.getContainerData().getVolume(); + if (vol != null) { + return Collections.singletonList(vol); + } + } + return Collections.emptyList(); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 2f53178e9bf9..c068c928ca96 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -195,6 +195,9 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService, } nextHB = new AtomicLong(Time.monotonicNow()); + ReplicationConfig replicationConfig = + conf.getObject(ReplicationConfig.class); + ContainerImporter importer = new ContainerImporter(conf, container.getContainerSet(), container.getController(), @@ -205,15 +208,14 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService, importer, new SimpleContainerDownloader(conf, certClient)); ContainerReplicator pushReplicator = new PushReplicator(conf, - new OnDemandContainerReplicationSource(container.getController()), + new OnDemandContainerReplicationSource(container.getController(), + replicationConfig), new GrpcContainerUploader(conf, certClient, container.getController()) ); pullReplicatorWithMetrics = new MeasuredReplicator(pullReplicator, "pull"); pushReplicatorWithMetrics = new MeasuredReplicator(pushReplicator, "push"); - ReplicationConfig replicationConfig = - conf.getObject(ReplicationConfig.class); supervisor = ReplicationSupervisor.newBuilder() .stateContext(context) .datanodeConfig(dnConf) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java index 6a5de8bc349a..a7f011f42b34 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java @@ -54,7 +54,8 @@ public void handle(SCMCommand command, OzoneContainer 
container, ECReconstructionCommandInfo reconstructionCommandInfo = new ECReconstructionCommandInfo(ecContainersCommand); ECReconstructionCoordinatorTask task = new ECReconstructionCoordinatorTask( - coordinator, reconstructionCommandInfo); + coordinator, reconstructionCommandInfo, container.getController(), + context.getParent().getDatanodeDetails()); this.supervisor.addTask(task); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java index 135c6fdb0391..6b73fcbcd227 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java @@ -83,7 +83,8 @@ public void handle(SCMCommand command, OzoneContainer container, replicateCommand.getTargetDatanode() == null ? 
downloadReplicator : pushReplicator; - ReplicationTask task = new ReplicationTask(replicateCommand, replicator); + ReplicationTask task = new ReplicationTask(replicateCommand, replicator, + container.getController()); supervisor.addTask(task); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java index 310c46de5294..e71cd4e2f1ef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import org.apache.commons.io.FileUtils; @@ -107,6 +108,9 @@ public class HddsVolume extends StorageVolume { private final AtomicBoolean dbLoaded = new AtomicBoolean(false); private final AtomicBoolean dbLoadFailure = new AtomicBoolean(false); + private final AtomicInteger activeOutboundReplications = + new AtomicInteger(0); + /** * Builder for HddsVolume. 
*/ @@ -629,4 +633,16 @@ public void compactDb() { LOG.warn("compact rocksdb error in {}", dbFilePath, e); } } + + public int incActiveOutboundReplications() { + return activeOutboundReplications.incrementAndGet(); + } + + public int decActiveOutboundReplications() { + return activeOutboundReplications.decrementAndGet(); + } + + public int getActiveOutboundReplications() { + return activeOutboundReplications.get(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java index e9535c6afe80..bc74d25ebacb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinatorTask.java @@ -17,7 +17,15 @@ package org.apache.hadoop.ozone.container.ec.reconstruction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Objects; +import java.util.SortedMap; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask; import org.apache.hadoop.util.Time; import org.slf4j.Logger; @@ -32,18 +40,24 @@ public class ECReconstructionCoordinatorTask LoggerFactory.getLogger(ECReconstructionCoordinatorTask.class); private final ECReconstructionCoordinator reconstructionCoordinator; private final ECReconstructionCommandInfo reconstructionCommandInfo; + private final ContainerController containerController; + private final DatanodeDetails self; private final String 
debugString; public static final String METRIC_NAME = "ECReconstructions"; public static final String METRIC_DESCRIPTION_SEGMENT = "EC reconstructions"; public ECReconstructionCoordinatorTask( ECReconstructionCoordinator coordinator, - ECReconstructionCommandInfo reconstructionCommandInfo) { + ECReconstructionCommandInfo reconstructionCommandInfo, + ContainerController containerController, + DatanodeDetails self) { super(reconstructionCommandInfo.getContainerID(), reconstructionCommandInfo.getDeadline(), reconstructionCommandInfo.getTerm()); this.reconstructionCoordinator = coordinator; this.reconstructionCommandInfo = reconstructionCommandInfo; + this.containerController = containerController; + this.self = self; debugString = reconstructionCommandInfo.toString(); } @@ -117,4 +131,32 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(getContainerId()); } + + @Override + public Collection getVolumes() { + if (containerController == null || self == null) { + return Collections.emptyList(); + } + List volumes = new ArrayList<>(); + SortedMap sources = + reconstructionCommandInfo.getSourceNodeMap(); + boolean hasLocalSource = false; + for (DatanodeDetails dn : sources.values()) { + if (dn.equals(self)) { + hasLocalSource = true; + break; + } + } + if (hasLocalSource) { + org.apache.hadoop.ozone.container.common.interfaces.Container + container = containerController.getContainer(getContainerId()); + if (container != null) { + HddsVolume vol = (HddsVolume) container.getContainerData().getVolume(); + if (vol != null) { + volumes.add(vol); + } + } + } + return volumes; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java index 05932e6edf79..bde75000f2ae 100644 --- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java @@ -22,7 +22,9 @@ import java.time.Clock; import java.time.Instant; import java.time.ZoneId; +import java.util.Collection; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; /** * Abstract class to capture common variables and methods for different types @@ -90,6 +92,11 @@ public long getDeadline() { return deadlineMsSinceEpoch; } + /** + * Returns any volumes associated with this task. + */ + public abstract Collection getVolumes(); + /** * Abstract method which needs to be overridden by the sub classes to execute * the task. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java index 422dd370d1fd..2ff991907f6a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java @@ -17,12 +17,14 @@ package org.apache.hadoop.ozone.container.replication; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND; import java.io.IOException; import java.io.OutputStream; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.interfaces.Container; 
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 
@@ -34,10 +36,13 @@ public class OnDemandContainerReplicationSource
     implements ContainerReplicationSource {
 
   private final ContainerController controller;
+  private final ReplicationServer.ReplicationConfig config;
 
   public OnDemandContainerReplicationSource(
-      ContainerController controller) {
+      ContainerController controller,
+      ReplicationServer.ReplicationConfig config) {
     this.controller = controller;
+    this.config = config;
   }
 
   @Override
@@ -57,8 +62,30 @@ public void copyData(long containerId, OutputStream destination,
           " is not found.", CONTAINER_NOT_FOUND);
     }
 
-    controller.exportContainer(
-        container.getContainerType(), containerId, destination,
-        new TarContainerPacker(compression));
+    HddsVolume volume = (HddsVolume) container.getContainerData().getVolume();
+    if (volume != null) {
+      // Take the slot first and give it back on overshoot. Reading the
+      // counter and incrementing it in two steps would let concurrent callers
+      // all pass the check and exceed the limit together.
+      // NOTE(review): tasks dequeued by ReplicationSupervisor appear to hold a
+      // slot on this same counter already; confirm a push replication is not
+      // counted twice against the limit here.
+      final int limit = config.getVolumeOutboundLimit();
+      if (volume.incActiveOutboundReplications() > limit) {
+        volume.decActiveOutboundReplications();
+        throw new StorageContainerException("Volume " + volume.getStorageID()
+            + " has reached the maximum number of concurrent replication reads ("
+            + limit + ")", CONTAINER_INTERNAL_ERROR);
+      }
+    }
+    try {
+      controller.exportContainer(
+          container.getContainerType(), containerId, destination,
+          new TarContainerPacker(compression));
+    } finally {
+      if (volume != null) {
+        volume.decActiveOutboundReplications();
+      }
+    }
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index 34b5a799548d..d3bb14d8c505 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java @@ -63,6 +63,8 @@ public class ReplicationServer { private ContainerController controller; + private ReplicationConfig replicationConfig; + private int port; private final ContainerImporter importer; @@ -75,6 +77,7 @@ public ReplicationServer(ContainerController controller, this.secConf = secConf; this.caClient = caClient; this.controller = controller; + this.replicationConfig = replicationConfig; this.importer = importer; this.port = replicationConfig.getPort(); @@ -103,7 +106,8 @@ public ReplicationServer(ContainerController controller, public void init() { GrpcReplicationService grpcReplicationService = new GrpcReplicationService( - new OnDemandContainerReplicationSource(controller), importer); + new OnDemandContainerReplicationSource(controller, replicationConfig), + importer); NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port) .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE) .addService(ServerInterceptors.intercept( @@ -225,10 +229,27 @@ public static final class ReplicationConfig { ) private double outOfServiceFactor = OUTOFSERVICE_FACTOR_DEFAULT; + @Config(key = "hdds.datanode.replication.volume.outbound.limit", + type = ConfigType.INT, + defaultValue = "2", + tags = {DATANODE}, + description = "The maximum number of concurrent replication reads " + + "allowed per physical disk volume." 
+ ) + private int volumeOutboundLimit = 2; + public double getOutOfServiceFactor() { return outOfServiceFactor; } + public int getVolumeOutboundLimit() { + return volumeOutboundLimit; + } + + public void setVolumeOutboundLimit(int limit) { + this.volumeOutboundLimit = limit; + } + public int scaleOutOfServiceLimit(int original) { return (int) Math.ceil(original * outOfServiceFactor); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 8dee840db226..cbd5fbfc00ce 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -25,22 +25,29 @@ import java.time.Clock; import java.time.Instant; import java.time.ZoneId; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Objects; import java.util.OptionalLong; +import java.util.PriorityQueue; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.IntConsumer; import 
org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -51,6 +58,7 @@ import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status; import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; import org.apache.hadoop.util.Time; @@ -79,6 +87,7 @@ public final class ReplicationSupervisor { private final Map timeoutCounter = new ConcurrentHashMap<>(); private final Map skippedCounter = new ConcurrentHashMap<>(); private final Map queuedCounter = new ConcurrentHashMap<>(); + private final AtomicLong volumeOutboundConcurrencyWaitTotal; private final MetricsRegistry registry; private final Map opsLatencyMs = new ConcurrentHashMap<>(); @@ -116,6 +125,8 @@ public static class Builder { private DatanodeConfiguration datanodeConfig; private ExecutorService executor; private Clock clock; + private final AtomicLong volumeOutboundConcurrencyWaitTotal = + new AtomicLong(0); private IntConsumer executorThreadUpdater = threadCount -> { }; @@ -173,11 +184,14 @@ public ReplicationSupervisor build() { .setDaemon(true) .setNameFormat(threadNamePrefix + "ContainerReplicationThread-%d") .build(); + VolumeAwarePriorityQueue vaQueue = + new VolumeAwarePriorityQueue(replicationConfig, + volumeOutboundConcurrencyWaitTotal); ThreadPoolExecutor tpe = new ThreadPoolExecutor( replicationConfig.getReplicationMaxStreams(), replicationConfig.getReplicationMaxStreams(), 60, TimeUnit.SECONDS, - new PriorityBlockingQueue<>(), + vaQueue, threadFactory); executor = tpe; executorThreadUpdater = threadCount -> { @@ -192,7 +206,8 @@ public ReplicationSupervisor build() { } return new ReplicationSupervisor(context, executor, 
replicationConfig,
-          datanodeConfig, clock, executorThreadUpdater);
+          datanodeConfig, clock, executorThreadUpdater,
+          volumeOutboundConcurrencyWaitTotal);
     }
   }
 
@@ -206,7 +221,8 @@ public static Map getMetricsMap() {
   private ReplicationSupervisor(StateContext context, ExecutorService executor,
       ReplicationConfig replicationConfig,
       DatanodeConfiguration datanodeConfig,
-      Clock clock, IntConsumer executorThreadUpdater) {
+      Clock clock, IntConsumer executorThreadUpdater,
+      AtomicLong volumeOutboundConcurrencyWaitTotal) {
     this.inFlight = ConcurrentHashMap.newKeySet();
     this.context = context;
     this.executor = executor;
@@ -215,6 +231,7 @@ private ReplicationSupervisor(StateContext context, ExecutorService executor,
     maxQueueSize = datanodeConfig.getCommandQueueLimit();
     this.clock = clock;
     this.executorThreadUpdater = executorThreadUpdater;
+    this.volumeOutboundConcurrencyWaitTotal = volumeOutboundConcurrencyWaitTotal;
 
     // set initial state
     if (context != null) {
@@ -424,6 +441,18 @@ public void run() {
         LOG.warn("Failed {}", this, e);
         failureCounter.get(task.getMetricName()).incrementAndGet();
       } finally {
+        if (executor instanceof ThreadPoolExecutor) {
+          BlockingQueue<Runnable> queue =
+              ((ThreadPoolExecutor) executor).getQueue();
+          if (queue instanceof VolumeAwarePriorityQueue) {
+            // Give back exactly the slots the queue reserved when it handed
+            // out this runner. A runner started directly as a worker's first
+            // task never went through the queue, and getVolumes() may answer
+            // differently now than at dequeue time, so decrementing
+            // task.getVolumes() here could drive the counters negative.
+            ((VolumeAwarePriorityQueue) queue).release(this);
+          }
+        }
         queuedCounter.get(task.getMetricName()).decrementAndGet();
         opsLatencyMs.get(task.getMetricName()).add(Time.monotonicNow() - startTime);
         inFlight.remove(task);
@@ -536,6 +565,10 @@ public long getReplicationSkippedCount(String metricsName) {
     return counter != null ? counter.get() : 0;
   }
 
+  public long getVolumeOutboundConcurrencyWaitTotal() {
+    return volumeOutboundConcurrencyWaitTotal.get();
+  }
+
   public long getReplicationQueuedCount() {
     return getCount(queuedCounter);
   }
@@ -554,4 +587,251 @@ public long getReplicationRequestTotalTime(String metricsName) {
     MutableRate rate = opsLatencyMs.get(metricsName);
     return rate != null ? (long) Math.ceil(rate.lastStat().total()) : 0;
   }
+
+  /**
+   * Work queue for the replication executor that hands out tasks in
+   * {@code TASK_RUNNER_COMPARATOR} order while honouring the per-volume
+   * outbound replication limit. A task is dequeued only when each of its
+   * volumes is below the limit, and a slot is reserved on those volumes as it
+   * is handed out; a task with a volume at the limit is skipped and stays
+   * queued.
+   *
+   * <p>Elements are kept in an internal {@link PriorityQueue} guarded by
+   * {@code lock}. Only the operations overridden here act on that storage;
+   * the storage inherited from {@link LinkedBlockingQueue} stays empty.
+   */
+  private static final class VolumeAwarePriorityQueue
+      extends LinkedBlockingQueue<Runnable> {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Longest a consumer sleeps while tasks are queued but blocked. Not every
+     * change to a volume's counter is followed by a signal on this queue, so
+     * blocked tasks are re-checked periodically.
+     */
+    private static final long RESCAN_INTERVAL_NANOS =
+        TimeUnit.SECONDS.toNanos(1);
+
+    private final transient PriorityQueue<TaskRunner> queue;
+    private final transient Lock lock = new ReentrantLock();
+    private final transient Condition notEmpty = lock.newCondition();
+    private final transient ReplicationConfig replicationConfig;
+    private final transient AtomicLong volumeWaitCounter;
+    /** Volumes on which a slot is held for each runner handed out. */
+    private final transient Map<Runnable, Collection<HddsVolume>>
+        reservations = new java.util.IdentityHashMap<>();
+
+    private VolumeAwarePriorityQueue(ReplicationConfig config,
+        AtomicLong volumeWaitCounter) {
+      this.replicationConfig = config;
+      this.volumeWaitCounter = volumeWaitCounter;
+      queue = new PriorityQueue<>(TASK_RUNNER_COMPARATOR);
+    }
+
+    /** Queues {@code r}; anything that is not a TaskRunner is refused. */
+    @Override
+    public boolean offer(Runnable r) {
+      if (!(r instanceof TaskRunner)) {
+        return false;
+      }
+      TaskRunner taskRunner = (TaskRunner) r;
+      lock.lock();
+      try {
+        boolean added = queue.offer(taskRunner);
+        if (added) {
+          notEmpty.signal();
+        }
+        return added;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /** Blocks until a queued task fits under the limit, then returns it. */
+    @Override
+    public Runnable take() throws InterruptedException {
+      lock.lock();
+      try {
+        while (true) {
+          TaskRunner task = findRunnableTask();
+          if (task != null) {
+            return task;
+          }
+          if (queue.isEmpty()) {
+            notEmpty.await();
+          } else {
+            notEmpty.awaitNanos(RESCAN_INTERVAL_NANOS);
+          }
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /** Like {@link #take()}, but gives up and returns null on timeout. */
+    @Override
+    public Runnable poll(long timeout, TimeUnit unit)
+        throws InterruptedException {
+      long nanos = unit.toNanos(timeout);
+      lock.lock();
+      try {
+        while (true) {
+          TaskRunner task = findRunnableTask();
+          if (task != null) {
+            return task;
+          }
+          if (nanos <= 0) {
+            return null;
+          }
+          long slice = queue.isEmpty()
+              ? nanos : Math.min(nanos, RESCAN_INTERVAL_NANOS);
+          nanos -= slice - notEmpty.awaitNanos(slice);
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /** Non-blocking variant: returns null if nothing can run right now. */
+    @Override
+    public Runnable poll() {
+      lock.lock();
+      try {
+        return findRunnableTask();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /**
+     * Removes and returns the first task, in comparator order, whose volumes
+     * are all below the limit, reserving a slot on each of them. Returns null
+     * if no queued task qualifies. The caller must hold {@code lock}.
+     */
+    private TaskRunner findRunnableTask() {
+      final int limit = replicationConfig.getVolumeOutboundLimit();
+      // PriorityQueue.iterator() does not traverse in priority order, so
+      // candidates are taken with poll() and the blocked ones put back.
+      Collection<TaskRunner> blocked = new java.util.ArrayList<>();
+      TaskRunner chosen = null;
+      while (chosen == null && !queue.isEmpty()) {
+        TaskRunner candidate = queue.poll();
+        Collection<HddsVolume> volumes = candidate.task.getVolumes();
+        if (hasCapacity(volumes, limit)) {
+          for (HddsVolume vol : volumes) {
+            vol.incActiveOutboundReplications();
+          }
+          // Remember what was reserved so release() returns exactly this.
+          reservations.put(candidate, volumes);
+          chosen = candidate;
+        } else {
+          volumeWaitCounter.incrementAndGet();
+          blocked.add(candidate);
+        }
+      }
+      queue.addAll(blocked);
+      return chosen;
+    }
+
+    /** Returns true if every volume has fewer than {@code limit} active. */
+    private static boolean hasCapacity(Collection<HddsVolume> volumes,
+        int limit) {
+      for (HddsVolume vol : volumes) {
+        if (vol.getActiveOutboundReplications() >= limit) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Returns the slots that were reserved for {@code runner} when it was
+     * handed out and wakes waiting consumers. Counters are left untouched for
+     * a runner this queue never handed out.
+     */
+    void release(Runnable runner) {
+      lock.lock();
+      try {
+        Collection<HddsVolume> held = reservations.remove(runner);
+        if (held != null) {
+          for (HddsVolume vol : held) {
+            vol.decActiveOutboundReplications();
+          }
+        }
+        notEmpty.signalAll();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /**
+     * Signal the queue that a volume slot has become available.
+     */
+    public void signalVolumeAvailable() {
+      lock.lock();
+      try {
+        notEmpty.signalAll();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /** Removes {@code o} if that very instance is still queued. */
+    @Override
+    public boolean remove(Object o) {
+      lock.lock();
+      try {
+        Iterator<TaskRunner> it = queue.iterator();
+        while (it.hasNext()) {
+          if (it.next() == o) {
+            it.remove();
+            return true;
+          }
+        }
+        return false;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /** Moves all queued tasks into {@code sink}; reserves no slots. */
+    @Override
+    public int drainTo(Collection<? super Runnable> sink) {
+      lock.lock();
+      try {
+        int drained = queue.size();
+        sink.addAll(queue);
+        queue.clear();
+        return drained;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public int size() {
+      lock.lock();
+      try {
+        return queue.size();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return size() == 0;
+    }
+
+    @Override
+    public void clear() {
+      lock.lock();
+      try {
+        queue.clear();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
index 64854e1ea2c4..1164c36a145d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisorMetrics.java
@@ -85,7 +85,11 @@ public void getMetrics(MetricsCollector 
collector, boolean all) { supervisor.getReplicationSkippedCount()) .addGauge(Interns.info("maxReplicationStreams", "Maximum number of " + "concurrent replication tasks which can run simultaneously"), - supervisor.getMaxReplicationStreams()); + supervisor.getMaxReplicationStreams()) + .addGauge(Interns.info("volumeOutboundConcurrencyWaitTotal", + "Total number of times a replication task was delayed due to " + + "volume outbound limit"), + supervisor.getVolumeOutboundConcurrencyWaitTotal()); Map metricsMap = ReplicationSupervisor.getMetricsMap(); if (!metricsMap.isEmpty()) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java index a32e9b41ab1b..6a61f9988a35 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java @@ -17,9 +17,13 @@ package org.apache.hadoop.ozone.container.replication; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Objects; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; /** @@ -29,6 +33,7 @@ public class ReplicationTask extends AbstractReplicationTask { private final ReplicateContainerCommand cmd; private final ContainerReplicator replicator; + private final ContainerController containerController; private final String debugString; public static final String METRIC_NAME = "ContainerReplications"; public static final String METRIC_DESCRIPTION_SEGMENT = "container replications"; @@ -39,11 +44,13 @@ public class 
ReplicationTask extends AbstractReplicationTask { private long transferredBytes; public ReplicationTask(ReplicateContainerCommand cmd, - ContainerReplicator replicator) { + ContainerReplicator replicator, + ContainerController containerController) { super(cmd.getContainerID(), cmd.getDeadline(), cmd.getTerm()); setPriority(cmd.getPriority()); this.cmd = cmd; this.replicator = replicator; + this.containerController = containerController; if (cmd.getTargetDatanode() != null) { // Only push replication will have a target datanode set, and it must be // sent to the source datanode to be executed. It is possible the source @@ -54,6 +61,11 @@ public ReplicationTask(ReplicateContainerCommand cmd, debugString = cmd.toString(); } + public ReplicationTask(ReplicateContainerCommand cmd, + ContainerReplicator replicator) { + this(cmd, replicator, null); + } + /** * Intended to only be used in tests. */ @@ -63,7 +75,7 @@ protected ReplicationTask( ContainerReplicator replicator ) { this(ReplicateContainerCommand.fromSources(containerId, sources), - replicator); + replicator, null); } @Override @@ -133,4 +145,20 @@ DatanodeDetails getTarget() { public void runTask() { replicator.replicate(this); } + + @Override + public Collection getVolumes() { + if (cmd.getTargetDatanode() != null && containerController != null) { + // This is a push command, we are the source. 
+ org.apache.hadoop.ozone.container.common.interfaces.Container + container = containerController.getContainer(getContainerId()); + if (container != null) { + HddsVolume vol = (HddsVolume) container.getContainerData().getVolume(); + if (vol != null) { + return Collections.singletonList(vol); + } + } + } + return Collections.emptyList(); + } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java index 8b831fa06466..a94666b70a2d 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java @@ -191,14 +191,15 @@ public void testDownload() throws IOException { @Test public void testUpload() { ContainerReplicationSource source = - new OnDemandContainerReplicationSource(containerController); + new OnDemandContainerReplicationSource(containerController, + new ReplicationServer.ReplicationConfig()); GrpcContainerUploader uploader = new GrpcContainerUploader(conf, null, containerController); PushReplicator pushReplicator = new PushReplicator(conf, source, uploader); ReplicationTask task = - new ReplicationTask(toTarget(CONTAINER_ID, datanode), pushReplicator); + new ReplicationTask(toTarget(CONTAINER_ID, datanode), pushReplicator, null); pushReplicator.replicate(task); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java index 25a12be03b98..f47863a9bc4e 100644 --- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestMeasuredReplicator.java @@ -64,9 +64,9 @@ public void closeReplicator() throws Exception { @Test public void measureFailureSuccessAndBytes() { //WHEN - measuredReplicator.replicate(new ReplicationTask(forTest(1), replicator)); - measuredReplicator.replicate(new ReplicationTask(forTest(2), replicator)); - measuredReplicator.replicate(new ReplicationTask(forTest(3), replicator)); + measuredReplicator.replicate(new ReplicationTask(forTest(1), replicator, null)); + measuredReplicator.replicate(new ReplicationTask(forTest(2), replicator, null)); + measuredReplicator.replicate(new ReplicationTask(forTest(3), replicator, null)); //THEN //even containers should be failed @@ -84,9 +84,9 @@ public void measureFailureSuccessAndBytes() { public void testReplicationTime() throws Exception { //WHEN //will wait at least the 300ms - measuredReplicator.replicate(new ReplicationTask(forTest(101), replicator)); - measuredReplicator.replicate(new ReplicationTask(forTest(201), replicator)); - measuredReplicator.replicate(new ReplicationTask(forTest(300), replicator)); + measuredReplicator.replicate(new ReplicationTask(forTest(101), replicator, null)); + measuredReplicator.replicate(new ReplicationTask(forTest(201), replicator, null)); + measuredReplicator.replicate(new ReplicationTask(forTest(300), replicator, null)); //THEN //even containers should be failed @@ -104,7 +104,7 @@ public void testReplicationTime() throws Exception { public void testFailureTimeSuccessExcluded() { //WHEN //will wait at least the 15ms - measuredReplicator.replicate(new ReplicationTask(forTest(15), replicator)); + measuredReplicator.replicate(new ReplicationTask(forTest(15), replicator, null)); //THEN @@ -116,7 +116,7 @@ public void testFailureTimeSuccessExcluded() { public void 
testSuccessTimeFailureExcluded() { //WHEN //will wait at least the 10ms - measuredReplicator.replicate(new ReplicationTask(forTest(10), replicator)); + measuredReplicator.replicate(new ReplicationTask(forTest(10), replicator, null)); //THEN @@ -127,7 +127,7 @@ public void testSuccessTimeFailureExcluded() { @Test public void testReplicationQueueTimeMetrics() { final Instant queued = Instant.now().minus(1, ChronoUnit.SECONDS); - ReplicationTask task = new ReplicationTask(forTest(100), replicator) { + ReplicationTask task = new ReplicationTask(forTest(100), replicator, null) { @Override public Instant getQueued() { return queued; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestPushReplicator.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestPushReplicator.java index a4463410cea5..065894d57c39 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestPushReplicator.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestPushReplicator.java @@ -69,7 +69,7 @@ void uploadCompletesNormally(CopyContainerCompression compression) ContainerReplicator subject = createSubject(containerID, target, output, completion, compression); ReplicationTask task = new ReplicationTask(toTarget(containerID, target), - subject); + subject, null); // WHEN subject.replicate(task); @@ -90,7 +90,7 @@ void uploadFailsWithException() throws IOException { ContainerReplicator subject = createSubject(containerID, target, output, completion, NO_COMPRESSION); ReplicationTask task = new ReplicationTask(toTarget(containerID, target), - subject); + subject, null); // WHEN subject.replicate(task); @@ -112,7 +112,7 @@ void packFailsWithException() throws IOException { ContainerReplicator subject = createSubject(containerID, target, output, completion, NO_COMPRESSION); ReplicationTask task = new 
ReplicationTask(toTarget(containerID, target), - subject); + subject, null); // WHEN subject.replicate(task); diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index 9ceb0a99e9f0..903e9f9ab1ad 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -53,6 +53,7 @@ import java.time.Instant; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.SortedMap; @@ -494,13 +495,13 @@ public void testTaskBeyondDeadline(ContainerLayoutVersion layout) { ReplicateContainerCommand cmd = createCommand(1); cmd.setDeadline(clock.millis() + 10000); - ReplicationTask task1 = new ReplicationTask(cmd, replicatorRef.get()); + ReplicationTask task1 = new ReplicationTask(cmd, replicatorRef.get(), null); cmd = createCommand(2); cmd.setDeadline(clock.millis() + 20000); - ReplicationTask task2 = new ReplicationTask(cmd, replicatorRef.get()); + ReplicationTask task2 = new ReplicationTask(cmd, replicatorRef.get(), null); cmd = createCommand(3); // No deadline set - ReplicationTask task3 = new ReplicationTask(cmd, replicatorRef.get()); + ReplicationTask task3 = new ReplicationTask(cmd, replicatorRef.get(), null); // no deadline set clock.fastForward(15000); @@ -532,8 +533,8 @@ public void testDatanodeOutOfService(ContainerLayoutVersion layout) { pushCmd.setTerm(CURRENT_TERM); ReplicateContainerCommand pullCmd = createCommand(2); - supervisor.addTask(new ReplicationTask(pushCmd, replicatorRef.get())); - supervisor.addTask(new ReplicationTask(pullCmd, replicatorRef.get())); + 
supervisor.addTask(new ReplicationTask(pushCmd, replicatorRef.get(), null)); + supervisor.addTask(new ReplicationTask(pullCmd, replicatorRef.get(), null)); assertEquals(2, supervisor.getReplicationRequestCount()); assertEquals(1, supervisor.getReplicationSuccessCount()); @@ -597,14 +598,14 @@ public void testMultipleReplication(ContainerLayoutVersion layout, ReplicateContainerCommand cmd1 = createCommand(6L); cmd1.setDeadline(clock.millis() + 10000); - ReplicationTask task1 = new ReplicationTask(cmd1, replicatorRef.get()); + ReplicationTask task1 = new ReplicationTask(cmd1, replicatorRef.get(), null); clock.fastForward(15000); replicationSupervisor.addTask(task1); ReconstructECContainersCommand cmd2 = createReconstructionCmd(7L); cmd2.setDeadline(clock.millis() + 10000); ECReconstructionCoordinatorTask task2 = new ECReconstructionCoordinatorTask( - ecReplicatorRef.get(), new ECReconstructionCommandInfo(cmd2)); + ecReplicatorRef.get(), new ECReconstructionCommandInfo(cmd2), null, datanode); clock.fastForward(15000); ecReconstructionSupervisor.addTask(task2); ecReconstructionSupervisor.addTask(createECTask(8L)); @@ -872,6 +873,11 @@ public void runTask() { "Interrupted waiting for the completion latch to be released"); setStatus(DONE); } + + @Override + public Collection getVolumes() { + return emptyList(); + } } private static class OrderedTask extends AbstractReplicationTask { @@ -907,6 +913,11 @@ public void runTask() { setStatus(DONE); completeLatch.countDown(); } + + @Override + public Collection getVolumes() { + return emptyList(); + } } private ReplicationSupervisor supervisorWithReplicator( @@ -948,7 +959,7 @@ private ReplicationSupervisor supervisorWithECReconstruction() throws IOExceptio private ReplicationTask createTask(long containerId) { ReplicateContainerCommand cmd = createCommand(containerId); - return new ReplicationTask(cmd, replicatorRef.get()); + return new ReplicationTask(cmd, replicatorRef.get(), null); } private ReconcileContainerTask 
createReconciliationTask(long containerId) { @@ -962,13 +973,13 @@ private ReconcileContainerTask createReconciliationTask(long containerId) { private ECReconstructionCoordinatorTask createECTask(long containerId) { return new ECReconstructionCoordinatorTask(null, - createReconstructionCmdInfo(containerId)); + createReconstructionCmdInfo(containerId), null, null); } private ECReconstructionCoordinatorTask createECTaskWithCoordinator(long containerId) { ECReconstructionCommandInfo ecReconstructionCommandInfo = createReconstructionCmdInfo(containerId); return new ECReconstructionCoordinatorTask(ecReplicatorRef.get(), - ecReconstructionCommandInfo); + ecReconstructionCommandInfo, null, null); } private static ReplicateContainerCommand createCommand(long containerId) { @@ -1191,7 +1202,7 @@ private void scheduleTasks( for (int i = 0; i < 10; i++) { List sources = singletonList(datanodes.get(i % datanodes.size())); - rs.addTask(new ReplicationTask(fromSources(i, sources), noopReplicator)); + rs.addTask(new ReplicationTask(fromSources(i, sources), noopReplicator, null)); } } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java index 0bf886569c9d..a26c340a3d01 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java @@ -278,17 +278,40 @@ private int processMissingIndexes( List availableSourceNodes, List excludedNodes, List usedNodes) throws IOException { + List missingIndexes = replicaCount.unavailableIndexes(true); + if (missingIndexes.isEmpty()) { + return 0; + } + return processReconstruction(replicaCount, missingIndexes, sources, + availableSourceNodes, excludedNodes, usedNodes); + 
} + + /** + * Core logic to schedule an EC reconstruction command. + * + * @param replicaCount the current replica count of the container + * @param missingIndexes the indexes that need to be reconstructed + * @param sources available source replicas + * @param availableSourceNodes healthy source nodes + * @param excludedNodes nodes to be excluded from target selection + * @param usedNodes nodes already used for this container + * @return number of commands sent + * @throws IOException if an error occurs + */ + private int processReconstruction( + ECContainerReplicaCount replicaCount, + List missingIndexes, + Map> sources, + List availableSourceNodes, + List excludedNodes, + List usedNodes) throws IOException { ContainerInfo container = replicaCount.getContainer(); ECReplicationConfig repConfig = - (ECReplicationConfig)container.getReplicationConfig(); - List missingIndexes = replicaCount.unavailableIndexes(true); - LOG.debug("Processing missing indexes {} for container {}.", missingIndexes, - container.containerID()); + (ECReplicationConfig) container.getReplicationConfig(); + LOG.debug("Processing reconstruction of indexes {} for container {}.", + missingIndexes, container.containerID()); final int expectedTargetCount = missingIndexes.size(); boolean recoveryIsCritical = expectedTargetCount == repConfig.getParity(); - if (expectedTargetCount == 0) { - return 0; - } int commandsSent = 0; if (sources.size() >= repConfig.getData()) { @@ -297,9 +320,9 @@ private int processMissingIndexes( final boolean hasOverloaded = !excludedDueToLoad.isEmpty(); final List excludedOrOverloadedNodes = hasOverloaded ? 
new ArrayList<>(ImmutableSet.builder() - .addAll(excludedNodes) - .addAll(excludedDueToLoad) - .build()) + .addAll(excludedNodes) + .addAll(excludedDueToLoad) + .build()) : excludedNodes; // placement with overloaded nodes excluded @@ -350,12 +373,43 @@ private int processMissingIndexes( } if (0 < targetCount) { usedNodes.addAll(selectedDatanodes); - // TODO - what are we adding all the selected nodes to available - // sources? availableSourceNodes.addAll(selectedDatanodes); List sourceDatanodesWithIndex = new ArrayList<>(); - for (Pair src : sources.values()) { + + // If we have more than the required number of data blocks, we can + // prefer IN_SERVICE nodes to avoid overloading decommissioning nodes. + List> sortedSources = + sources.values().stream() + .sorted((p1, p2) -> { + boolean p1InService = + p1.getRight().getOperationalState() == IN_SERVICE; + boolean p2InService = + p2.getRight().getOperationalState() == IN_SERVICE; + if (p1InService && !p2InService) { + return -1; + } + if (!p1InService && p2InService) { + return 1; + } + return 0; + }) + .collect(Collectors.toList()); + + int inServiceCount = 0; + for (Pair src : sortedSources) { + if (src.getRight().getOperationalState() == IN_SERVICE) { + inServiceCount++; + } + } + + for (Pair src : sortedSources) { + // If we have enough in-service nodes to fulfill the reconstruction + // requirements, we skip any out-of-service nodes. + if (inServiceCount >= repConfig.getData() && + src.getRight().getOperationalState() != IN_SERVICE) { + continue; + } sourceDatanodesWithIndex.add( new ReconstructECContainersCommand .DatanodeDetailsAndReplicaIndex( @@ -368,11 +422,6 @@ private int processMissingIndexes( sourceDatanodesWithIndex, selectedDatanodes, integers2ByteString(missingIndexes), repConfig); - // This can throw a CommandTargetOverloadedException, but there is no - // point in retrying here. 
The sources we picked already have the - // overloaded nodes excluded, so we should not get an overloaded - // exception, but it could happen due to other threads adding work to - // the DNs. If it happens here, we just let the exception bubble up. replicationManager.sendThrottledReconstructionCommand( container, reconstructionCommand); for (int i = 0; i < missingIndexes.size(); i++) { @@ -383,7 +432,7 @@ private int processMissingIndexes( } if (targetCount != expectedTargetCount) { LOG.debug("Insufficient nodes were returned from the placement policy" + - " to fully reconstruct container {}. Requested {} received {}", + " to fully reconstruct container {}. Requested {} received {}", container.getContainerID(), expectedTargetCount, targetCount); if (hasOverloaded && recoveryIsCritical) { metrics.incrECPartialReconstructionCriticalTotal(); @@ -400,8 +449,6 @@ private int processMissingIndexes( + " {}. Available sources are: {}", container.containerID(), repConfig.getData(), sources.size(), sources); } - LOG.trace("Sent {} commands for container {}.", commandsSent, - container.containerID()); return commandsSent; } @@ -429,71 +476,89 @@ private int processDecommissioningIndexes( throws IOException { ContainerInfo container = replicaCount.getContainer(); Set decomIndexes = replicaCount.decommissioningOnlyIndexes(true); - int commandsSent = 0; - if (!decomIndexes.isEmpty()) { - LOG.debug("Processing decommissioning indexes {} for container {}.", - decomIndexes, container.containerID()); - final List selectedDatanodes = getTargetDatanodes( - container, decomIndexes.size(), usedNodes, excludedNodes); + if (decomIndexes.isEmpty()) { + return 0; + } - ContainerPlacementStatus placementStatusWithSelectedTargets = - validatePlacement(container, availableSourceNodes, selectedDatanodes); - if (!placementStatusWithSelectedTargets.isPolicySatisfied()) { - LOG.debug("Target nodes + existing nodes for EC container {}" + - " will not satisfy placement policy {}. Reason: {}. 
Selected" + - " nodes: {}. Available source nodes: {}. Resuming recovery " + - "regardless.", - container.containerID(), containerPlacement.getClass().getName(), - placementStatusWithSelectedTargets.misReplicatedReason(), - selectedDatanodes, availableSourceNodes); + if (replicationManager.getConfig().isEcDecommissionReconstructionEnabled()) { + for (Integer index : decomIndexes) { + Pair source = sources.get(index); + if (source != null && replicationManager.isNodeHighlyLoaded( + source.getLeft().getDatanodeDetails())) { + LOG.info("Source node {} is highly loaded, switching to " + + "reconstruction for decommissioning container {}", + source.getLeft().getDatanodeDetails(), container.containerID()); + metrics.incrEcReconstructionDecommissionTriggeredTotal(); + return processReconstruction(replicaCount, + new ArrayList<>(decomIndexes), sources, availableSourceNodes, + excludedNodes, usedNodes); + } } + } - usedNodes.addAll(selectedDatanodes); - Iterator iterator = selectedDatanodes.iterator(); - // In this case we need to do one to one copy. - CommandTargetOverloadedException overloadedException = null; - for (Integer decomIndex : decomIndexes) { - Pair source = sources.get(decomIndex); - if (source == null) { - LOG.warn("Cannot find source replica for decommissioning index " + - "{} in container {}", decomIndex, container.containerID()); - continue; - } - ContainerReplica sourceReplica = source.getLeft(); - if (!iterator.hasNext()) { - LOG.warn("Couldn't find enough targets. 
Available source" - + " nodes: {}, the target nodes: {}, excluded nodes: {}," - + " usedNodes: {}, and the decommission indexes: {}", - sources.values().stream() - .map(Pair::getLeft).collect(Collectors.toSet()), - selectedDatanodes, excludedNodes, usedNodes, decomIndexes); - break; - } - try { - createReplicateCommand( - container, iterator, sourceReplica, replicaCount); - commandsSent++; - } catch (CommandTargetOverloadedException e) { - LOG.debug("Unable to send Replicate command for container {}" + - " index {} because the source node {} is overloaded.", - container.getContainerID(), sourceReplica.getReplicaIndex(), - sourceReplica.getDatanodeDetails()); - overloadedException = e; - } + LOG.debug("Processing decommissioning indexes {} for container {}.", + decomIndexes, container.containerID()); + final List selectedDatanodes = getTargetDatanodes( + container, decomIndexes.size(), usedNodes, excludedNodes); + + ContainerPlacementStatus placementStatusWithSelectedTargets = + validatePlacement(container, availableSourceNodes, selectedDatanodes); + if (!placementStatusWithSelectedTargets.isPolicySatisfied()) { + LOG.debug("Target nodes + existing nodes for EC container {}" + + " will not satisfy placement policy {}. Reason: {}. Selected" + + " nodes: {}. Available source nodes: {}. Resuming recovery " + + "regardless.", + container.containerID(), containerPlacement.getClass().getName(), + placementStatusWithSelectedTargets.misReplicatedReason(), + selectedDatanodes, availableSourceNodes); + } + + usedNodes.addAll(selectedDatanodes); + Iterator iterator = selectedDatanodes.iterator(); + // In this case we need to do one to one copy. 
+ int commandsSent = 0; + CommandTargetOverloadedException overloadedException = null; + for (Integer decomIndex : decomIndexes) { + Pair source = sources.get(decomIndex); + if (source == null) { + LOG.warn("Cannot find source replica for decommissioning index " + + "{} in container {}", decomIndex, container.containerID()); + continue; + } + ContainerReplica sourceReplica = source.getLeft(); + if (!iterator.hasNext()) { + LOG.warn("Couldn't find enough targets. Available source" + + " nodes: {}, the target nodes: {}, excluded nodes: {}," + + " usedNodes: {}, and the decommission indexes: {}", + sources.values().stream() + .map(Pair::getLeft).collect(Collectors.toSet()), + selectedDatanodes, excludedNodes, usedNodes, decomIndexes); + break; } - if (overloadedException != null) { - throw overloadedException; + try { + createReplicateCommand( + container, iterator, sourceReplica, replicaCount); + commandsSent++; + } catch (CommandTargetOverloadedException e) { + LOG.debug("Unable to send Replicate command for container {}" + + " index {} because the source node {} is overloaded.", + container.getContainerID(), sourceReplica.getReplicaIndex(), + sourceReplica.getDatanodeDetails()); + overloadedException = e; } + } + if (overloadedException != null) { + throw overloadedException; + } - if (selectedDatanodes.size() != decomIndexes.size()) { - LOG.debug("Insufficient nodes were returned from the placement policy" + - " to fully replicate the decommission indexes for container {}." + - " Requested {} received {}", container.getContainerID(), - decomIndexes.size(), selectedDatanodes.size()); - metrics.incrEcPartialReplicationForOutOfServiceReplicasTotal(); - throw new InsufficientDatanodesException(decomIndexes.size(), - selectedDatanodes.size()); - } + if (selectedDatanodes.size() != decomIndexes.size()) { + LOG.debug("Insufficient nodes were returned from the placement policy" + + " to fully replicate the decommission indexes for container {}." 
+ + " Requested {} received {}", container.getContainerID(), + decomIndexes.size(), selectedDatanodes.size()); + metrics.incrEcPartialReplicationForOutOfServiceReplicasTotal(); + throw new InsufficientDatanodesException(decomIndexes.size(), + selectedDatanodes.size()); } LOG.trace("Sent {} commands for container {}.", commandsSent, container.containerID()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java index fff263fee341..6de6cc771883 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java @@ -54,6 +54,11 @@ protected boolean inflightOperationLimitReached(ReplicationManager rm, return false; } + @Override + protected boolean reconstructionLimitReached(ReplicationManager rm) { + return false; + } + @Override protected int sendDatanodeCommands( ReplicationManager replicationManager, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index 8cd8444d1d2f..48a7981365f1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -37,6 +37,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import 
java.util.concurrent.locks.ReentrantLock; @@ -159,6 +160,18 @@ public class ReplicationManager implements SCMService, ContainerReplicaPendingOp private final Map excludedNodes = new ConcurrentHashMap<>(); + /** + * Track the number of active EC reconstruction commands across the cluster. + */ + private final AtomicInteger inflightReconstructionCount = new AtomicInteger(0); + + /** + * Mapping from reconstruction command ID to the number of pending fragments + * for that command. Used to know when the whole command is finished. + */ + private final Map reconstructionCommandIdToPendingFragmentCount = + new ConcurrentHashMap<>(); + /** * SCMService related variables. * After leaving safe mode, replicationMonitor needs to wait for a while @@ -422,6 +435,54 @@ public long getInflightReplicationCount() { .getPendingOpCount(ContainerReplicaOp.PendingOpType.ADD); } + /** + * Returns the number of active EC reconstruction commands currently in + * progress across the cluster. + */ + public int getInflightReconstructionCount() { + return inflightReconstructionCount.get(); + } + + /** + * Returns the maximum number of inflight reconstruction commands allowed + * across the cluster at any given time. + * @return the maximum number of inflight reconstruction commands allowed + */ + public int getReconstructionInFlightLimit() { + return rmConf.getReconstructionGlobalLimit(); + } + + /** + * Returns true if the number of inflight reconstruction commands has reached + * the global limit. + * @return true if the limit is reached, false otherwise + */ + public boolean isReconstructionLimitReached() { + int limit = getReconstructionInFlightLimit(); + return limit > 0 && getInflightReconstructionCount() >= limit; + } + + /** + * Returns true if the given datanode's replication load (queued replication + * and reconstruction commands) exceeds the configured load factor threshold. 
+ * + * @param datanode the datanode to check + * @return true if the node is highly loaded, false otherwise + */ + public boolean isNodeHighlyLoaded(DatanodeDetails datanode) { + try { + int limit = getReplicationLimit(datanode); + if (limit <= 0) { + return true; + } + double loadFactor = (double) getQueuedReplicationCount(datanode) / limit; + return loadFactor >= rmConf.getEcDecommissionReconstructionLoadFactor(); + } catch (NodeNotFoundException e) { + LOG.warn("Node {} not found when checking load factor", datanode, e); + return true; + } + } + /** * Sends delete container command for the given container to the given * datanode. @@ -697,6 +758,8 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo, containerReplicaPendingOps.scheduleAddReplica(containerInfo.containerID(), targets.get(i), targetIndexes.byteAt(i), cmd, scmDeadlineEpochMs, requiredSize, clock.millis()); } + inflightReconstructionCount.incrementAndGet(); + reconstructionCommandIdToPendingFragmentCount.put(cmd.getId(), targetIndexes.size()); getMetrics().incrEcReconstructionCmdsSentTotal(); } else if (cmd.getType() == Type.replicateContainerCommand) { ReplicateContainerCommand rcc = (ReplicateContainerCommand) cmd; @@ -1074,6 +1137,19 @@ ReplicationQueue getQueue() { @Override public void opCompleted(ContainerReplicaOp op, ContainerID containerID, boolean timedOut) { + if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD + && op.getCommand() != null + && op.getCommand().getType() == Type.reconstructECContainersCommand) { + long cmdId = op.getCommand().getId(); + reconstructionCommandIdToPendingFragmentCount.compute(cmdId, (k, v) -> { + if (v == null || v <= 1) { + inflightReconstructionCount.decrementAndGet(); + return null; + } + return v - 1; + }); + } + if (!(timedOut && op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE)) { // We only care about expired delete ops. All others should be ignored. 
return; @@ -1291,6 +1367,38 @@ public static class ReplicationManagerConfiguration ) private int containerSampleLimit = 100; + @Config(key = "hdds.scm.replication.decommission.ec.reconstruction.enabled", + type = ConfigType.BOOLEAN, + defaultValue = "false", + reconfigurable = true, + tags = { SCM }, + description = "If true, SCM will switch from 1-1 replication to " + + "multi-source reconstruction for EC containers on decommissioning " + + "nodes when the node's load exceeds the threshold." + ) + private boolean ecDecommissionReconstructionEnabled = false; + + @Config(key = "hdds.scm.replication.decommission.ec.reconstruction.load.factor", + type = ConfigType.DOUBLE, + defaultValue = "0.9", + reconfigurable = true, + tags = { SCM }, + description = "The threshold factor (between 0 and 1) of a node's " + + "replication limit at which SCM switches to reconstruction for " + + "EC decommission. Default is 0.9." + ) + private double ecDecommissionReconstructionLoadFactor = 0.9; + + @Config(key = "hdds.scm.replication.reconstruction.global.limit", + type = ConfigType.INT, + defaultValue = "50", + reconfigurable = true, + tags = { SCM }, + description = "A cluster-wide limit to restrict the total number of " + + "active EC reconstruction commands across the cluster." 
+ ) + private int reconstructionGlobalLimit = 50; + @Config(key = "hdds.scm.replication.quasi.closed.stuck.best.origin.copies", type = ConfigType.INT, defaultValue = "3", @@ -1347,6 +1455,30 @@ public void setDatanodeReplicationLimit(int limit) { this.datanodeReplicationLimit = limit; } + public boolean isEcDecommissionReconstructionEnabled() { + return ecDecommissionReconstructionEnabled; + } + + public void setEcDecommissionReconstructionEnabled(boolean enabled) { + this.ecDecommissionReconstructionEnabled = enabled; + } + + public double getEcDecommissionReconstructionLoadFactor() { + return ecDecommissionReconstructionLoadFactor; + } + + public void setEcDecommissionReconstructionLoadFactor(double factor) { + this.ecDecommissionReconstructionLoadFactor = factor; + } + + public int getReconstructionGlobalLimit() { + return reconstructionGlobalLimit; + } + + public void setReconstructionGlobalLimit(int limit) { + this.reconstructionGlobalLimit = limit; + } + public void setMaintenanceRemainingRedundancy(int redundancy) { this.maintenanceRemainingRedundancy = redundancy; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java index 84c13231b192..cfac93f7e4bf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerMetrics.java @@ -224,6 +224,14 @@ public final class ReplicationManagerMetrics implements MetricsSource { + "to the pending commands on all source datanodes") private MutableCounterLong replicateContainerCmdsDeferredTotal; + @Metric("Number of times EC reconstruction was triggered for decommission " + + "due to source node load.") + private MutableCounterLong 
ecReconstructionDecommissionTriggeredTotal; + + @Metric("Number of times EC reconstruction was throttled due to global " + + "reconstruction limit.") + private MutableCounterLong ecReconstructionGlobalLimitReachedTotal; + public ReplicationManagerMetrics(ReplicationManager manager) { this.registry = new MetricsRegistry(METRICS_SOURCE_NAME); this.replicationManager = manager; @@ -289,6 +297,8 @@ public void getMetrics(MetricsCollector collector, boolean all) { partialReplicationTotal.snapshot(builder, all); ecPartialReplicationForMisReplicationTotal.snapshot(builder, all); partialReplicationForMisReplicationTotal.snapshot(builder, all); + ecReconstructionDecommissionTriggeredTotal.snapshot(builder, all); + ecReconstructionGlobalLimitReachedTotal.snapshot(builder, all); } public void unRegister() { @@ -572,4 +582,20 @@ public long getPartialReplicationForMisReplicationTotal() { return this.partialReplicationForMisReplicationTotal.value(); } + public void incrEcReconstructionDecommissionTriggeredTotal() { + this.ecReconstructionDecommissionTriggeredTotal.incr(); + } + + public long getEcReconstructionDecommissionTriggeredTotal() { + return this.ecReconstructionDecommissionTriggeredTotal.value(); + } + + public void incrEcReconstructionGlobalLimitReachedTotal() { + this.ecReconstructionGlobalLimitReachedTotal.incr(); + } + + public long getEcReconstructionGlobalLimitReachedTotal() { + return this.ecReconstructionGlobalLimitReachedTotal.value(); + } + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java index 8f291158902a..20b8d05946bb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java +++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java @@ -52,6 +52,11 @@ protected boolean inflightOperationLimitReached(ReplicationManager rm, return rm.getInflightReplicationCount() >= pendingOpLimit; } + @Override + protected boolean reconstructionLimitReached(ReplicationManager rm) { + return rm.isReconstructionLimitReached(); + } + @Override protected int sendDatanodeCommands( ReplicationManager replicationManager, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java index 6508b73c10e6..f172817293f6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java @@ -76,6 +76,13 @@ protected abstract void requeueHealthResult( protected abstract boolean inflightOperationLimitReached( ReplicationManager rm, long inflightLimit); + /** + * Check if the reconstruction operation limit is reached. + * @param rm The ReplicationManager instance + * @return True if the limit is reached, false otherwise. + */ + protected abstract boolean reconstructionLimitReached(ReplicationManager rm); + /** * Read messages from the ReplicationManager under replicated queue and, * form commands to correct replication. The commands are added @@ -105,6 +112,14 @@ public void processAll(ReplicationQueue queue) { .getMetrics().incrPendingReplicationLimitReachedTotal(); break; } + if (reconstructionLimitReached(replicationManager)) { + LOG.info("The maximum number of pending reconstruction commands ({}) " + + "are scheduled. 
Ending the iteration.", + replicationManager.getReconstructionInFlightLimit()); + replicationManager.getMetrics() + .incrEcReconstructionGlobalLimitReachedTotal(); + break; + } HealthResult healthResult = dequeueHealthResultFromQueue(queue); if (healthResult == null) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java index 5d2af561196b..8a00e486fc47 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java @@ -435,6 +435,56 @@ public void testUnderReplicationWithDecomNodesOverloaded() Lists.emptyList(), availableReplicas, 1, 0, policy)); } + @Test + public void testUnderReplicationWithDecomNodesSwitchToReconstruction() + throws IOException { + replicationManager.getConfig().setEcDecommissionReconstructionEnabled(true); + Set availableReplicas = ReplicationTestUtil + .createReplicas(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 2), + Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4), + Pair.of(IN_SERVICE, 5)); + + // Mock node 1 as highly loaded + DatanodeDetails decomNode = availableReplicas.stream() + .filter(r -> r.getReplicaIndex() == 1) + .findFirst().get().getDatanodeDetails(); + when(replicationManager.isNodeHighlyLoaded(decomNode)).thenReturn(true); + + ECUnderReplicationHandler ecURH = + new ECUnderReplicationHandler(policy, conf, replicationManager); + UnderReplicatedHealthResult result = + mock(UnderReplicatedHealthResult.class); + when(result.isUnrecoverable()).thenReturn(false); + when(result.getContainerInfo()).thenReturn(container); + + ecURH.processAndSendCommands(availableReplicas, ImmutableList.of(), + result, remainingMaintenanceRedundancy); + 
+ // We expect 1 reconstruction command for index 1, and 0 replicate commands + int replicateCommand = 0; + int reconstructCommand = 0; + for (Pair> dnCommand : commandsSent) { + if (dnCommand.getValue() instanceof ReplicateContainerCommand) { + replicateCommand++; + } else if (dnCommand.getValue() instanceof ReconstructECContainersCommand) { + reconstructCommand++; + ReconstructECContainersCommand reconCmd = + (ReconstructECContainersCommand) dnCommand.getValue(); + assertEquals(ECUnderReplicationHandler.integers2ByteString( + ImmutableList.of(1)), reconCmd.getMissingContainerIndexes()); + + // verify source offloading: decomNode should NOT be in the source list + // because we have 4 other IN_SERVICE nodes (DATA=3) + for (ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex src : + reconCmd.getSources()) { + assertNotEquals(decomNode, src.getDnDetails()); + } + } + } + assertEquals(0, replicateCommand); + assertEquals(1, reconstructCommand); + } + @Test public void testUnderReplicationWithDecomIndex12() throws IOException { Set availableReplicas = ReplicationTestUtil diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index 1d7d619efb0f..393d8b66f6e3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -48,7 +48,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; import com.google.protobuf.UnsafeByteOperations; import java.io.IOException; import java.time.Instant; @@ -1801,4 +1803,70 @@ private void 
mockReplicationCommandCounts( }); } + @Test + public void testInflightReconstructionLimit() throws IOException, NodeNotFoundException { + rmConf.setReconstructionGlobalLimit(2); + ReplicationManager rm = createReplicationManager(); + assertEquals(2, rm.getReconstructionInFlightLimit()); + assertEquals(0, rm.getInflightReconstructionCount()); + assertFalse(rm.isReconstructionLimitReached()); + + mockReplicationCommandCounts(dn -> 0, dn -> 0); + + ContainerInfo container = ReplicationTestUtil.createContainerInfo( + repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20); + + // Send one reconstruction command with 2 fragments + ReconstructECContainersCommand cmd1 = new ReconstructECContainersCommand( + 1L, Collections.emptyList(), + ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails()), + integers2ByteString(ImmutableList.of(1, 2)), (ECReplicationConfig) repConfig); + + rm.sendThrottledReconstructionCommand(container, cmd1); + assertEquals(1, rm.getInflightReconstructionCount()); + assertFalse(rm.isReconstructionLimitReached()); + + // Send another reconstruction command with 1 fragment + ReconstructECContainersCommand cmd2 = new ReconstructECContainersCommand( + 2L, Collections.emptyList(), + ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails()), + integers2ByteString(ImmutableList.of(3)), (ECReplicationConfig) repConfig); + rm.sendThrottledReconstructionCommand(container, cmd2); + assertEquals(2, rm.getInflightReconstructionCount()); + assertTrue(rm.isReconstructionLimitReached()); + + // Complete one fragment of cmd1 + ContainerReplicaOp op1 = new ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.ADD, + cmd1.getTargetDatanodes().get(0), 1, cmd1, Long.MAX_VALUE, 0); + rm.opCompleted(op1, container.containerID(), false); + // Still 2 because cmd1 is not fully finished + assertEquals(2, rm.getInflightReconstructionCount()); + + // Complete second fragment of cmd1 + ContainerReplicaOp op2 = new 
ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.ADD, + cmd1.getTargetDatanodes().get(1), 2, cmd1, Long.MAX_VALUE, 0); + rm.opCompleted(op2, container.containerID(), false); + // Now 1 + assertEquals(1, rm.getInflightReconstructionCount()); + assertFalse(rm.isReconstructionLimitReached()); + + // Complete cmd2 + ContainerReplicaOp op3 = new ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.ADD, + cmd2.getTargetDatanodes().get(0), 3, cmd2, Long.MAX_VALUE, 0); + rm.opCompleted(op3, container.containerID(), false); + assertEquals(0, rm.getInflightReconstructionCount()); + } + + private static ByteString integers2ByteString(List src) { + byte[] dst = new byte[src.size()]; + for (int i = 0; i < src.size(); i++) { + dst[i] = src.get(i).byteValue(); + } + return dst.length > 0 ? UnsafeByteOperations.unsafeWrap(dst) + : ByteString.EMPTY; + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java index de93f7b9194d..958da96239a8 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java @@ -131,4 +131,25 @@ public void testMessageNotProcessedIfGlobalLimitReached() throws IOException { assertEquals(1, rmMetrics.getPendingReplicationLimitReachedTotal()); } + @Test + public void testMessageNotProcessedIfReconstructionLimitReached() + throws IOException { + when(replicationManager.isReconstructionLimitReached()).thenReturn(true); + when(replicationManager.getReconstructionInFlightLimit()).thenReturn(10); + when(replicationManager.processUnderReplicatedContainer(any())).thenReturn(1); + + ContainerInfo container = ReplicationTestUtil + 
.createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig); + UnderReplicatedHealthResult result = new UnderReplicatedHealthResult( + container, 3, false, false, false); + queue.enqueue(result); + + underReplicatedProcessor.processAll(queue); + + // The message should not be processed and still be on the queue (re-queued) + assertEquals(1, queue.underReplicatedQueueSize()); + // We should not have processed anything in RM + verify(replicationManager, times(0)).processUnderReplicatedContainer(any()); + } + } diff --git a/hadoop-ozone/cli-debug/src/main/java/org/apache/hadoop/ozone/debug/datanode/container/ExportSubcommand.java b/hadoop-ozone/cli-debug/src/main/java/org/apache/hadoop/ozone/debug/datanode/container/ExportSubcommand.java index d06633f14408..78f449c7a2a5 100644 --- a/hadoop-ozone/cli-debug/src/main/java/org/apache/hadoop/ozone/debug/datanode/container/ExportSubcommand.java +++ b/hadoop-ozone/cli-debug/src/main/java/org/apache/hadoop/ozone/debug/datanode/container/ExportSubcommand.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.replication.ContainerReplicationSource; import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource; +import org.apache.hadoop.ozone.container.replication.ReplicationServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import picocli.CommandLine; @@ -66,7 +67,8 @@ public Void call() throws Exception { parent.loadContainersFromVolumes(); final ContainerReplicationSource replicationSource = - new OnDemandContainerReplicationSource(parent.getController()); + new OnDemandContainerReplicationSource(parent.getController(), + new ReplicationServer.ReplicationConfig()); for (int i = 0; i < containerCount; i++) { replicationSource.prepare(containerId); diff --git a/hadoop-ozone/vapor/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java 
b/hadoop-ozone/vapor/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java index 6e1be57d4201..b127ab1865a0 100644 --- a/hadoop-ozone/vapor/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java +++ b/hadoop-ozone/vapor/src/main/java/org/apache/hadoop/ozone/freon/ClosedContainerReplicator.java @@ -93,6 +93,8 @@ public class ClosedContainerReplicator extends BaseFreonGenerator implements private ContainerReplicator replicator; + private ContainerController controller; + private Timer timer; private WitnessedContainerMetadataStore witnessedContainerMetadataStore; @@ -148,7 +150,7 @@ public Void replicate() throws Exception { if (datanode.isEmpty() || datanodeUUIDs.contains(datanode)) { replicationTasks.add(new ReplicationTask( ReplicateContainerCommand.fromSources(container.getContainerID(), - datanodesWithContainer), replicator)); + datanodesWithContainer), replicator, controller)); } } @@ -239,7 +241,7 @@ private void initializeReplicationSupervisor( handlers.put(containerType, handler); } - ContainerController controller = + controller = new ContainerController(containerSet, handlers); ContainerImporter importer = new ContainerImporter(conf, containerSet,