Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ XdsResourceType<?> fromTypeUrl(String typeUrl) {
private class AdsStream implements EventHandler<DiscoveryResponse> {
private boolean responseReceived;
private boolean closed;
// Response nonce for the most recently received discovery responses of each resource type.
// Response nonce for the most recently received discovery responses of each resource type URL.
// Client initiated requests start response nonce with empty string.
// Nonce in each response is echoed back in the following ACK/NACK request. It is
// used for management server to identify which response the client is ACKing/NACking.
Expand All @@ -275,7 +275,7 @@ private class AdsStream implements EventHandler<DiscoveryResponse> {
// map; nonces are only discarded once the stream closes because xds_protocol says "the
// management server should not send a DiscoveryResponse for any DiscoveryRequest that has a
// stale nonce."
private final Map<XdsResourceType<?>, String> respNonces = new HashMap<>();
private final Map<String, String> respNonces = new HashMap<>();
private final StreamingCall<DiscoveryRequest, DiscoveryResponse> call;
private final MethodDescriptor<DiscoveryRequest, DiscoveryResponse> methodDescriptor =
AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod();
Expand Down Expand Up @@ -323,7 +323,7 @@ void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources) {
logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
respNonces.getOrDefault(type, ""), null);
respNonces.getOrDefault(type.typeUrl(), ""), null);
}

@Override
Expand All @@ -336,6 +336,7 @@ public void onRecvMessage(DiscoveryResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
respNonces.put(response.getTypeUrl(), response.getNonce());
XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(
Expand Down Expand Up @@ -371,7 +372,6 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
return;
}
responseReceived = true;
respNonces.put(type, nonce);
ProcessingTracker processingTracker = new ProcessingTracker(
() -> call.startRecvMessage(), syncContext);
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
Expand Down
5 changes: 4 additions & 1 deletion xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -2781,10 +2781,13 @@ public void edsCleanupNonceAfterUnsubscription() {
xdsClient.cancelXdsResourceWatch(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
verifySubscribedResourcesMetadataSizes(0, 0, 0, 0);
call.verifyRequest(EDS, Arrays.asList(), VERSION_1, "0000", NODE);
// The control plane can send an updated response for the empty subscription list, with a new
// nonce.
call.sendResponse(EDS, Arrays.asList(), VERSION_1, "0001");

// When re-subscribing, the version was forgotten but not the nonce
xdsClient.watchXdsResource(XdsEndpointResource.getInstance(), "A.1", edsResourceWatcher);
call.verifyRequest(EDS, "A.1", "", "0000", NODE, Mockito.timeout(2000));
call.verifyRequest(EDS, "A.1", "", "0001", NODE, Mockito.timeout(2000));
}

@Test
Expand Down
Loading