Dubbo:服务暴露过程

服务暴露触发

关于服务暴露是怎么被触发的在之前的博客 dubbo与spring 中有简单介绍过。

大体步骤如下:

  1. 在xml中进行服务配置。
  2. 根据spring的schema扩展注册的DubboBeanDefinitionParser,将dubbo相关配置解析成对应的 BeanDefinition。
  3. BeanDefinition 转换为具体的 Bean 是在spring容器完成refresh()的过程中
    3.1 设置delay且不为-1。ServiceBean实现了InitializingBean,在钩子方法afterPropertiesSet()中触发 export,也就是 bean 在实例化后且在调用<init-method>之前调用。
    3.2 未设置delay或值为-1 。ServiceBean实现了ApplicationListener可以监听spring的相关的事件,在spring容器refresh()完后,在onApplicationEvent()完成事件的处理,进而触发 export。

服务暴露过程

服务暴露的过程中比较重要的是在ServiceConfig.doExportUrlFor1Protocol()方法内,会生成具体服务对应的Provider URL后,同时如果存在注册中心还会在URL上加上registry,通过dubbo扩展机制获取ProxyFactory的实现来得到Invoker,然后调用Protocol进行服务暴露。

dubbo中ProxyFactory扩展实现类有 Javassist 和 JDK 两种方式,默认使用 Javassist 是因为通过字节码操作生成相应的 wrapper类(第一次使用后会将实例化对象缓存起来),避免了每次调用使用Java反射导致的性能低下问题。

doExportUrlFor1Protocol()
1
2
3
4
5
6
7
8
9
10
// @OvO@ 默认proxyFactory是JavassistProxyFactory
// @OvO@ 在服务暴露的url外又增加了一层registry,因而目标实现类protocol是RegistryProtocol
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// @OvO@ Protocol$Adaptive自适应扩展类执行export()之前,会通过getExtension()获取扩展类实现,
// 这里的protocol就是根据URL外面加了一层registry来确定扩展实现类是RegistryProtocol,
// createExtension()中还会在外面再包一层wrapper装饰类(ProtocolFilterWrapper/ProtocolListenerWrapper)
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);

Invoker导出为Exporter分为两种情况:一种是registry协议类型,另一种是其他类型。

这里值的注意的是protocol实际上是自适应类Protocol$Adaptive,因而在运行时是根据URL来动态指定扩展点的实现类,在服务暴露过程中由于URL加上了registry,所以 Protocol 实现类是RegistryProtocol

同时,Protocol$Adaptive在通过ExtensionLoader.getExtension()获取具体实现类时,会加上一些装饰类的功能(eg: ProtocolFilterWrapperProtocolListenerWrapper),这两种装饰类只会对非RegistryProtocol起作用。

RegistryProtocol进行服务暴露的过程中,会首先调用doLocalExport(),通过默认的DubboProtocol.export()对 Provider URL 进行服务暴露。然后再根据具体的 Registry 进行服务注册和监听器注册,并订阅override数据。

RegistryProtocol
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// @OvO@ 在本地暴露
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
// @OvO@ 获取需要暴露的providerUrl
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
// @OvO@ 注册服务到注册中心
registry.register(registedProviderUrl);
// 订阅override数据
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
// @OvO@ 从原始包括registry的URL中提取providerUrl(根据export键)
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// @OvO@ 根据providerUrl继续export,默认是DubboProtocol暴露服务
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}

等到真正的DubboProtocol进行服务暴露时,会启动一个服务监听server,通过调用ExchangerTransporter层的扩展实现完成的。

DubboProtocol
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispaching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// @OvO@ Exchanger和Transporter
openServer(url);
return exporter;
}