Skip to content

Commit 508c141

Browse files
committed
otel: add tcp metrics
1 parent 024fdd0 commit 508c141

18 files changed

+319
-16
lines changed

core/src/main/java/io/grpc/internal/ClientTransportFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.grpc.ChannelCredentials;
2525
import io.grpc.ChannelLogger;
2626
import io.grpc.HttpConnectProxiedSocketAddress;
27+
import io.grpc.MetricRecorder;
2728
import java.io.Closeable;
2829
import java.net.SocketAddress;
2930
import java.util.Collection;
@@ -91,6 +92,7 @@ final class ClientTransportOptions {
9192
private Attributes eagAttributes = Attributes.EMPTY;
9293
@Nullable private String userAgent;
9394
@Nullable private HttpConnectProxiedSocketAddress connectProxiedSocketAddr;
95+
@Nullable private MetricRecorder metricRecorder;
9496

9597
public ChannelLogger getChannelLogger() {
9698
return channelLogger;
@@ -101,6 +103,16 @@ public ClientTransportOptions setChannelLogger(ChannelLogger channelLogger) {
101103
return this;
102104
}
103105

106+
@Nullable
107+
public MetricRecorder getMetricRecorder() {
108+
return metricRecorder;
109+
}
110+
111+
public ClientTransportOptions setMetricRecorder(@Nullable MetricRecorder metricRecorder) {
112+
this.metricRecorder = metricRecorder;
113+
return this;
114+
}
115+
104116
public String getAuthority() {
105117
return authority;
106118
}

core/src/main/java/io/grpc/internal/InternalServer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import io.grpc.InternalChannelz.SocketStats;
2020
import io.grpc.InternalInstrumented;
21+
import io.grpc.MetricRecorder;
2122
import java.io.IOException;
2223
import java.net.SocketAddress;
2324
import java.util.List;
@@ -71,4 +72,9 @@ public interface InternalServer {
7172
*/
7273
@Nullable List<InternalInstrumented<SocketStats>> getListenSocketStatsList();
7374

75+
/**
76+
* Sets the MetricRecorder for the server. This optional method allows setting
77+
* the MetricRecorder after construction but before start().
78+
*/
79+
default void setMetricRecorder(MetricRecorder metricRecorder) {}
7480
}

core/src/main/java/io/grpc/internal/InternalSubchannel.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
8080
private final InternalChannelz channelz;
8181
private final CallTracer callsTracer;
8282
private final ChannelTracer channelTracer;
83+
private final MetricRecorder metricRecorder;
8384
private final ChannelLogger channelLogger;
8485
private final boolean reconnectDisabled;
8586

@@ -191,6 +192,7 @@ protected void handleNotInUse() {
191192
this.scheduledExecutor = scheduledExecutor;
192193
this.connectingTimer = stopwatchSupplier.get();
193194
this.syncContext = syncContext;
195+
this.metricRecorder = metricRecorder;
194196
this.callback = callback;
195197
this.channelz = channelz;
196198
this.callsTracer = callsTracer;
@@ -265,6 +267,7 @@ private void startNewTransport() {
265267
.setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority)
266268
.setEagAttributes(currentEagAttributes)
267269
.setUserAgent(userAgent)
270+
.setMetricRecorder(metricRecorder)
268271
.setHttpConnectProxiedSocketAddress(proxiedAddr);
269272
TransportLogger transportLogger = new TransportLogger();
270273
// In case the transport logs in the constructor, use the subchannel logId

core/src/main/java/io/grpc/internal/ServerImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import io.grpc.InternalServerInterceptors;
4949
import io.grpc.InternalStatus;
5050
import io.grpc.Metadata;
51+
import io.grpc.MetricInstrumentRegistry;
52+
import io.grpc.MetricRecorder;
5153
import io.grpc.ServerCall;
5254
import io.grpc.ServerCallExecutorSupplier;
5355
import io.grpc.ServerCallHandler;
@@ -97,6 +99,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
9799

98100
private final InternalLogId logId;
99101
private final ObjectPool<? extends Executor> executorPool;
102+
private final MetricRecorder metricRecorder;
100103
/** Executor for application processing. Safe to read after {@link #start()}. */
101104
private Executor executor;
102105
private final HandlerRegistry registry;
@@ -143,6 +146,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
143146
InternalServer transportServer,
144147
Context rootContext) {
145148
this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
149+
this.metricRecorder =
150+
new MetricRecorderImpl(builder.metricSinks, MetricInstrumentRegistry.getDefaultRegistry());
151+
146152
this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
147153
this.fallbackRegistry =
148154
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
@@ -182,6 +188,7 @@ public ServerImpl start() throws IOException {
182188
// Start and wait for any ports to actually be bound.
183189

184190
ServerListenerImpl listener = new ServerListenerImpl();
191+
transportServer.setMetricRecorder(metricRecorder);
185192
transportServer.start(listener);
186193
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
187194
started = true;

core/src/main/java/io/grpc/internal/ServerImplBuilder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.grpc.HandlerRegistry;
3232
import io.grpc.InternalChannelz;
3333
import io.grpc.InternalConfiguratorRegistry;
34+
import io.grpc.MetricSink;
3435
import io.grpc.Server;
3536
import io.grpc.ServerBuilder;
3637
import io.grpc.ServerCallExecutorSupplier;
@@ -80,6 +81,7 @@ public static ServerBuilder<?> forPort(int port) {
8081
final List<ServerTransportFilter> transportFilters = new ArrayList<>();
8182
final List<ServerInterceptor> interceptors = new ArrayList<>();
8283
private final List<ServerStreamTracer.Factory> streamTracerFactories = new ArrayList<>();
84+
final List<MetricSink> metricSinks = new ArrayList<>();
8385
private final ClientTransportServersBuilder clientTransportServersBuilder;
8486
HandlerRegistry fallbackRegistry = DEFAULT_FALLBACK_REGISTRY;
8587
ObjectPool<? extends Executor> executorPool = DEFAULT_EXECUTOR_POOL;
@@ -157,6 +159,14 @@ public ServerImplBuilder intercept(ServerInterceptor interceptor) {
157159
return this;
158160
}
159161

162+
/**
163+
* Adds a MetricSink to the server.
164+
*/
165+
public ServerImplBuilder addMetricSink(MetricSink metricSink) {
166+
metricSinks.add(checkNotNull(metricSink, "metricSink"));
167+
return this;
168+
}
169+
160170
@Override
161171
public ServerImplBuilder addStreamTracerFactory(ServerStreamTracer.Factory factory) {
162172
streamTracerFactories.add(checkNotNull(factory, "factory"));

core/src/main/java/io/grpc/internal/ServerTransport.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,5 @@ public interface ServerTransport extends InternalInstrumented<SocketStats> {
4444
* outstanding tasks are cancelled when the transport terminates.
4545
*/
4646
ScheduledExecutorService getScheduledExecutorService();
47+
4748
}

netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,7 @@ public void run() {
856856
localSocketPicker,
857857
channelLogger,
858858
useGetForSafeMethods,
859+
options.getMetricRecorder(),
859860
Ticker.systemTicker());
860861
return transport;
861862
}

netty/src/main/java/io/grpc/netty/NettyClientHandler.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.grpc.InternalChannelz;
3131
import io.grpc.InternalStatus;
3232
import io.grpc.Metadata;
33+
import io.grpc.MetricRecorder;
3334
import io.grpc.Status;
3435
import io.grpc.StatusException;
3536
import io.grpc.internal.ClientStreamListener.RpcProgress;
@@ -123,6 +124,7 @@ class NettyClientHandler extends AbstractNettyHandler {
123124
private final Supplier<Stopwatch> stopwatchFactory;
124125
private final TransportTracer transportTracer;
125126
private final Attributes eagAttributes;
127+
private final TcpMetrics.Tracker tcpMetrics;
126128
private final String authority;
127129
private final InUseStateAggregator<Http2Stream> inUseState =
128130
new InUseStateAggregator<Http2Stream>() {
@@ -164,7 +166,8 @@ static NettyClientHandler newHandler(
164166
Attributes eagAttributes,
165167
String authority,
166168
ChannelLogger negotiationLogger,
167-
Ticker ticker) {
169+
Ticker ticker,
170+
MetricRecorder metricRecorder) {
168171
Preconditions.checkArgument(maxHeaderListSize > 0, "maxHeaderListSize must be positive");
169172
Http2HeadersDecoder headersDecoder = new GrpcHttp2ClientHeadersDecoder(maxHeaderListSize);
170173
Http2FrameReader frameReader = new DefaultHttp2FrameReader(headersDecoder);
@@ -194,7 +197,8 @@ static NettyClientHandler newHandler(
194197
eagAttributes,
195198
authority,
196199
negotiationLogger,
197-
ticker);
200+
ticker,
201+
metricRecorder);
198202
}
199203

200204
@VisibleForTesting
@@ -214,7 +218,8 @@ static NettyClientHandler newHandler(
214218
Attributes eagAttributes,
215219
String authority,
216220
ChannelLogger negotiationLogger,
217-
Ticker ticker) {
221+
Ticker ticker,
222+
MetricRecorder metricRecorder) {
218223
Preconditions.checkNotNull(connection, "connection");
219224
Preconditions.checkNotNull(frameReader, "frameReader");
220225
Preconditions.checkNotNull(lifecycleManager, "lifecycleManager");
@@ -269,7 +274,8 @@ static NettyClientHandler newHandler(
269274
pingCounter,
270275
ticker,
271276
maxHeaderListSize,
272-
softLimitHeaderListSize);
277+
softLimitHeaderListSize,
278+
metricRecorder);
273279
}
274280

275281
private NettyClientHandler(
@@ -288,7 +294,8 @@ private NettyClientHandler(
288294
PingLimiter pingLimiter,
289295
Ticker ticker,
290296
int maxHeaderListSize,
291-
int softLimitHeaderListSize) {
297+
int softLimitHeaderListSize,
298+
MetricRecorder metricRecorder) {
292299
super(
293300
/* channelUnused= */ null,
294301
decoder,
@@ -350,6 +357,7 @@ public void onStreamClosed(Http2Stream stream) {
350357
}
351358
}
352359
});
360+
this.tcpMetrics = new TcpMetrics.Tracker(metricRecorder, "client");
353361
}
354362

355363
/**
@@ -490,6 +498,12 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
490498
/**
491499
* Handler for the Channel shutting down.
492500
*/
501+
@Override
502+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
503+
tcpMetrics.channelActive();
504+
super.channelActive(ctx);
505+
}
506+
493507
@Override
494508
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
495509
try {

netty/src/main/java/io/grpc/netty/NettyClientTransport.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.grpc.InternalLogId;
3535
import io.grpc.Metadata;
3636
import io.grpc.MethodDescriptor;
37+
import io.grpc.MetricRecorder;
3738
import io.grpc.Status;
3839
import io.grpc.internal.ClientStream;
3940
import io.grpc.internal.ConnectionClientTransport;
@@ -108,6 +109,7 @@ class NettyClientTransport implements ConnectionClientTransport,
108109
private final ChannelLogger channelLogger;
109110
private final boolean useGetForSafeMethods;
110111
private final Ticker ticker;
112+
private final MetricRecorder metricRecorder;
111113

112114

113115
NettyClientTransport(
@@ -132,6 +134,7 @@ class NettyClientTransport implements ConnectionClientTransport,
132134
LocalSocketPicker localSocketPicker,
133135
ChannelLogger channelLogger,
134136
boolean useGetForSafeMethods,
137+
MetricRecorder metricRecorder,
135138
Ticker ticker) {
136139

137140
this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
@@ -159,6 +162,7 @@ class NettyClientTransport implements ConnectionClientTransport,
159162
this.logId = InternalLogId.allocate(getClass(), remoteAddress.toString());
160163
this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
161164
this.useGetForSafeMethods = useGetForSafeMethods;
165+
this.metricRecorder = metricRecorder;
162166
this.ticker = Preconditions.checkNotNull(ticker, "ticker");
163167
}
164168

@@ -251,7 +255,8 @@ public Runnable start(Listener transportListener) {
251255
eagAttributes,
252256
authorityString,
253257
channelLogger,
254-
ticker);
258+
ticker,
259+
metricRecorder);
255260

256261
ChannelHandler negotiationHandler = negotiator.newHandler(handler);
257262

netty/src/main/java/io/grpc/netty/NettyServer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.grpc.InternalInstrumented;
3232
import io.grpc.InternalLogId;
3333
import io.grpc.InternalWithLogId;
34+
import io.grpc.MetricRecorder;
3435
import io.grpc.ServerStreamTracer;
3536
import io.grpc.internal.InternalServer;
3637
import io.grpc.internal.ObjectPool;
@@ -93,6 +94,7 @@ class NettyServer implements InternalServer, InternalWithLogId {
9394
private final int maxMessageSize;
9495
private final int maxHeaderListSize;
9596
private final int softLimitHeaderListSize;
97+
private MetricRecorder metricRecorder;
9698
private final long keepAliveTimeInNanos;
9799
private final long keepAliveTimeoutInNanos;
98100
private final long maxConnectionIdleInNanos;
@@ -272,7 +274,8 @@ public void initChannel(Channel ch) {
272274
permitKeepAliveTimeInNanos,
273275
maxRstCount,
274276
maxRstPeriodNanos,
275-
eagAttributes);
277+
eagAttributes,
278+
metricRecorder);
276279
ServerTransportListener transportListener;
277280
// This is to order callbacks on the listener, not to guard access to channel.
278281
synchronized (NettyServer.this) {

0 commit comments

Comments
 (0)