Skip to content

Commit 228fc8e

Browse files
committed
core: PickFirstLB should not return a subchannel during CONNECTING
This is a follow-up from noticing a breakage in gRPC-LB because it wasn't checking if the connectivity state changed even if the picker was identical. lb_serverStatusCodeConversion() has been misleading since 42e1829 ("xds: Do RLS fallback policy eagar start"). At that point, the subchannel it marked as READY was for the default target's policy, not the policy for wilderness. However, since old PF policy provided a subchannel when CONNECTING, everything was "fine", but RLS would mistakenly count toward target_picks. This demonstrates that RLS target_picks has been broken since it was introduced for PF, as PF relied on the caller to avoid the picker when it was CONNECTING. This may have been hard to notice in production, as the metrics become correct as soon as the connection is established, so as long as you use the channel for a while, the duplicate counting would become a small percentage of the overall amount.
1 parent a1a7363 commit 228fc8e

File tree

3 files changed

+13
-17
lines changed

3 files changed

+13
-17
lines changed

core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) {
8686

8787
// The channel state does not get updated when doing name resolving today, so for the moment
8888
// let LB report CONNECTION and call subchannel.requestConnection() immediately.
89-
updateBalancingState(
90-
CONNECTING, new FixedResultPicker(PickResult.withSubchannel(subchannel)));
89+
updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
9190
subchannel.requestConnection();
9291
} else {
9392
subchannel.updateAddresses(servers);

core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ public void refreshNameResolutionAfterSubchannelConnectionBroken() {
219219
inOrder.verify(mockSubchannel).start(stateListenerCaptor.capture());
220220
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
221221
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
222-
assertSame(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
222+
assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs).hasResult()).isFalse();
223223
inOrder.verify(mockSubchannel).requestConnection();
224224

225225
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
@@ -278,7 +278,7 @@ public void pickAfterResolvedAndChanged() throws Exception {
278278
assertThat(args.getAddresses()).isEqualTo(servers);
279279
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
280280
verify(mockSubchannel).requestConnection();
281-
assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
281+
assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs).hasResult()).isFalse();
282282

283283
loadBalancer.acceptResolvedAddresses(
284284
ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build());
@@ -300,7 +300,7 @@ public void pickAfterStateChangeAfterResolution() throws Exception {
300300
verify(mockSubchannel).start(stateListenerCaptor.capture());
301301
SubchannelStateListener stateListener = stateListenerCaptor.getValue();
302302
verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
303-
Subchannel subchannel = pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel();
303+
assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs).hasResult()).isFalse();
304304
reset(mockHelper);
305305
when(mockHelper.getSynchronizationContext()).thenReturn(syncContext);
306306

@@ -317,7 +317,7 @@ public void pickAfterStateChangeAfterResolution() throws Exception {
317317

318318
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
319319
inOrder.verify(mockHelper).updateBalancingState(eq(READY), pickerCaptor.capture());
320-
assertEquals(subchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
320+
assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs).getSubchannel());
321321

322322
verify(mockHelper, atLeast(0)).getSynchronizationContext(); // Don't care
323323
verifyNoMoreInteractions(mockHelper);
@@ -405,8 +405,7 @@ public void nameResolutionSuccessAfterError() throws Exception {
405405
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), pickerCaptor.capture());
406406
verify(mockSubchannel).requestConnection();
407407

408-
assertEquals(mockSubchannel, pickerCaptor.getValue().pickSubchannel(mockArgs)
409-
.getSubchannel());
408+
assertThat(pickerCaptor.getValue().pickSubchannel(mockArgs).hasResult()).isFalse();
410409

411410
assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs),
412411
pickerCaptor.getValue().pickSubchannel(mockArgs));

rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@
7272
import io.grpc.inprocess.InProcessServerBuilder;
7373
import io.grpc.internal.FakeClock;
7474
import io.grpc.internal.JsonParser;
75-
import io.grpc.internal.PickFirstLoadBalancerProvider;
7675
import io.grpc.internal.PickSubchannelArgsImpl;
7776
import io.grpc.internal.testing.StreamRecorder;
7877
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
@@ -212,12 +211,14 @@ public void lb_serverStatusCodeConversion() throws Exception {
212211
throw new RuntimeException(e);
213212
}
214213
});
214+
assertThat(subchannels.poll()).isNotNull(); // default target
215+
assertThat(subchannels.poll()).isNull();
216+
// Warm-up pick; will be queued
215217
InOrder inOrder = inOrder(helper);
216218
inOrder.verify(helper)
217219
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
218220
SubchannelPicker picker = pickerCaptor.getValue();
219221
PickSubchannelArgs fakeSearchMethodArgs = newPickSubchannelArgs(fakeSearchMethod);
220-
// Warm-up pick; will be queued
221222
PickResult res = picker.pickSubchannel(fakeSearchMethodArgs);
222223
assertThat(res.getStatus().isOk()).isTrue();
223224
assertThat(res.getSubchannel()).isNull();
@@ -230,8 +231,7 @@ public void lb_serverStatusCodeConversion() throws Exception {
230231
subchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
231232
res = picker.pickSubchannel(fakeSearchMethodArgs);
232233
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK);
233-
int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2;
234-
verifyLongCounterAdd("grpc.lb.rls.target_picks", expectedTimes, 1, "wilderness", "complete");
234+
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete");
235235

236236
// Check on conversion
237237
Throwable cause = new Throwable("cause");
@@ -284,8 +284,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception {
284284
res = picker.pickSubchannel(searchSubchannelArgs);
285285
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
286286
assertThat(res.getSubchannel()).isSameInstanceAs(searchSubchannel);
287-
int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2;
288-
verifyLongCounterAdd("grpc.lb.rls.target_picks", expectedTimes, 1, "wilderness", "complete");
287+
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete");
289288

290289
// rescue should be pending status although the overall channel state is READY
291290
res = picker.pickSubchannel(rescueSubchannelArgs);
@@ -431,7 +430,7 @@ public void lb_working_withDefaultTarget_noRlsResponse() throws Exception {
431430
inOrder.verify(helper).getMetricRecorder();
432431
inOrder.verify(helper).getChannelTarget();
433432
inOrder.verifyNoMoreInteractions();
434-
int times = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2;
433+
int times = 1;
435434
verifyLongCounterAdd("grpc.lb.rls.default_target_picks", times, 1,
436435
"defaultTarget", "complete");
437436

@@ -536,8 +535,7 @@ public void lb_working_withoutDefaultTarget() throws Exception {
536535
res = picker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
537536
assertThat(res.getStatus().isOk()).isFalse();
538537
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
539-
int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() ? 1 : 2;
540-
verifyLongCounterAdd("grpc.lb.rls.target_picks", expectedTimes, 1, "wilderness", "complete");
538+
verifyLongCounterAdd("grpc.lb.rls.target_picks", 1, 1, "wilderness", "complete");
541539

542540
res = picker.pickSubchannel(newPickSubchannelArgs(fakeRescueMethod));
543541
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();

0 commit comments

Comments
 (0)