Skip to content

Commit 4847411

Browse files
committed
[#5065] fixed cannot immediately pull service instances when watched registry-center service instance changed problem (#5066)
1 parent d4a3fb1 commit 4847411

File tree

5 files changed

+109
-7
lines changed

5 files changed

+109
-7
lines changed

clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/DiscoveryEvents.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,15 @@
2020
import java.util.List;
2121

2222
import org.apache.servicecomb.service.center.client.model.MicroserviceInstance;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import com.fasterxml.jackson.databind.JsonNode;
27+
import com.fasterxml.jackson.databind.ObjectMapper;
2328

2429
public abstract class DiscoveryEvents {
30+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
31+
2532
public static class InstanceChangedEvent extends DiscoveryEvents {
2633
private final String appName;
2734

@@ -52,6 +59,45 @@ public List<MicroserviceInstance> getInstances() {
5259
* internal events to ask for a immediate instance pull
5360
*/
5461
public static class PullInstanceEvent extends DiscoveryEvents {
62+
private static final Logger LOGGER = LoggerFactory.getLogger(PullInstanceEvent.class);
63+
64+
private final String appId;
65+
66+
private final String serviceName;
67+
68+
public PullInstanceEvent(String message) {
69+
JsonNode messageNode = parseJsonString(message);
70+
this.appId = getContextFromNode(messageNode, "appId");
71+
this.serviceName = getContextFromNode(messageNode, "serviceName");
72+
}
73+
74+
public String getAppId() {
75+
return appId;
76+
}
77+
78+
public String getServiceName() {
79+
return serviceName;
80+
}
81+
82+
private JsonNode parseJsonString(String message) {
83+
try {
84+
return OBJECT_MAPPER.readTree(message);
85+
} catch (Exception e) {
86+
LOGGER.error("parse message [{}] failed!", message, e);
87+
return null;
88+
}
89+
}
5590

91+
private String getContextFromNode(JsonNode messageNode, String itemKey) {
92+
if (messageNode == null) {
93+
return "";
94+
}
95+
try {
96+
return messageNode.get("key").get(itemKey).asText();
97+
} catch (Exception e) {
98+
LOGGER.error("get [{}] context from node [{}] failed!", itemKey, e);
99+
return "";
100+
}
101+
}
56102
}
57103
}

clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,7 @@ public RegisteredMicroserviceInstanceResponse registerMicroserviceInstance(Micro
226226

227227
@Override
228228
public FindMicroserviceInstancesResponse findMicroserviceInstance(String consumerId, String appId, String serviceName,
229-
String versionRule,
230-
String revision) {
229+
String versionRule, String revision) {
231230
try {
232231
Map<String, String> headers = new HashMap<>();
233232
headers.put("X-ConsumerId", consumerId);

clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterDiscovery.java

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@
2424
import java.util.Map;
2525
import java.util.Objects;
2626
import java.util.Random;
27+
import java.util.Timer;
28+
import java.util.TimerTask;
2729
import java.util.concurrent.ConcurrentHashMap;
2830

31+
import org.apache.commons.lang3.StringUtils;
2932
import org.apache.servicecomb.http.client.task.AbstractTask;
3033
import org.apache.servicecomb.http.client.task.Task;
3134
import org.apache.servicecomb.service.center.client.DiscoveryEvents.InstanceChangedEvent;
@@ -104,6 +107,8 @@ public static class SubscriptionValue {
104107

105108
private final Random random = new Random();
106109

110+
private Timer timer;
111+
107112
public ServiceCenterDiscovery(ServiceCenterClient serviceCenterClient, EventBus eventBus) {
108113
super("service-center-discovery-task");
109114
this.serviceCenterClient = serviceCenterClient;
@@ -153,7 +158,60 @@ public void onPullInstanceEvent(PullInstanceEvent event) {
153158
return;
154159
}
155160
pullInstanceTaskOnceInProgress = true;
156-
startTask(new PullInstanceOnceTask());
161+
if (StringUtils.isEmpty(event.getAppId()) || StringUtils.isEmpty(event.getServiceName())) {
162+
// If the application or service name cannot be resolved, pulled all services.
163+
startTask(new PullInstanceOnceTask());
164+
return;
165+
}
166+
try {
167+
String appId = event.getAppId();
168+
String serviceName = event.getServiceName();
169+
if (!refreshTargetServiceSuccess(appId, serviceName)) {
170+
int positive = random.nextInt(300);
171+
int sign = random.nextBoolean() ? 1 : -1;
172+
long delayTime = 2000L + sign * positive;
173+
if (timer == null) {
174+
timer = new Timer("event-retry-pull-task");
175+
}
176+
timer.schedule(new PullTargetServiceTask(appId, serviceName), delayTime);
177+
}
178+
} finally {
179+
pullInstanceTaskOnceInProgress = false;
180+
}
181+
}
182+
183+
class PullTargetServiceTask extends TimerTask {
184+
private final String appId;
185+
186+
private final String serviceName;
187+
188+
public PullTargetServiceTask(String appId, String serviceName) {
189+
this.appId = appId;
190+
this.serviceName = serviceName;
191+
}
192+
193+
@Override
194+
public void run() {
195+
refreshTargetServiceSuccess(appId, serviceName);
196+
}
197+
}
198+
199+
private boolean refreshTargetServiceSuccess(String appId, String serviceName) {
200+
SubscriptionKey currentKey = new SubscriptionKey(appId, serviceName);
201+
if (instancesCache.get(currentKey) == null) {
202+
// No pull during the service startup phase.
203+
return true;
204+
}
205+
if (LOGGER.isDebugEnabled()) {
206+
LOGGER.debug("pull [{}#{}] instances from service center", appId, serviceName);
207+
}
208+
String originRev = instancesCache.get(currentKey).revision;
209+
pullInstance(currentKey, instancesCache.get(currentKey), true);
210+
String currentRev = instancesCache.get(currentKey).revision;
211+
if (LOGGER.isDebugEnabled()) {
212+
LOGGER.debug("current revision: [{}], origin revision: [{}]", currentRev, originRev);
213+
}
214+
return !originRev.equals(currentRev);
157215
}
158216

159217
private List<SubscriptionKey> pullInstance(SubscriptionKey k, SubscriptionValue v, boolean sendChangedEvent) {
@@ -265,7 +323,7 @@ private static String instanceToString(List<MicroserviceInstance> instances) {
265323
sb.append(endpoint.length() > 64 ? endpoint.substring(0, 64) : endpoint);
266324
sb.append("|");
267325
}
268-
sb.append(instance.getServiceName());
326+
sb.append(instance.getStatus());
269327
sb.append("|");
270328
}
271329
sb.append("#");

clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterOperation.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,7 @@ public interface ServiceCenterOperation {
114114
* @throws OperationException If some problems happened to contact service center or non http 200 returned.n
115115
*/
116116
FindMicroserviceInstancesResponse findMicroserviceInstance(String consumerId, String appId, String serviceName,
117-
String versionRule,
118-
String revision);
117+
String versionRule, String revision);
119118

120119
/**
121120
* Delete a microservice instance

clients/service-center-client/src/main/java/org/apache/servicecomb/service/center/client/ServiceCenterWatch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ private void backOff() {
164164
@Override
165165
public void onMessage(String s) {
166166
LOGGER.info("web socket receive message [{}], start query instance", s);
167-
this.eventBus.post(new PullInstanceEvent());
167+
this.eventBus.post(new PullInstanceEvent(s));
168168
}
169169

170170
@Override

0 commit comments

Comments
 (0)