Dubbo:扩展机制

Java的SPI机制

SPI(service provider interface)机制是通过定义服务接口标准,让不同的厂商去实现,java通过java.util.ServiceLoader类查找接口对应的服务实现。

SPI约定

服务提供者提供了服务接口的一种实现后,要在类路径下的META-INF/services/目录里创建一个以服务接口命名的文件,文件内容是实现该服务接口的具体实现类名(如果有多个实现类则换行保存,SPI会一次性实例化所有实现)。

使用:ServiceLoader<XX> loader = ServiceLoader.load(XX.class);

dubbo的扩展机制

dubbo的扩展机制和java的SPI机制相似,但是又增加了如下功能:

  • 根据关键字获取特定的扩展实现,依赖注解@SPI@Adaptive
  • 对扩展点增加了 IOC 和 AOP 的功能。

约定:在扩展类的 jar 包内 ,放置扩展点配置文件 META-INF/dubbo/接口全限定名 ,内容为: 配置名=扩展实现类全限定名 ,多个实现类用换行符分隔。

通过 ExtensionLoader 获取扩展点实例的方法:eg Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();


核心步骤:

  • 5 loadFile()META-INF/dubbo/META-INF/dubbo/internalMETA-INF/services等路径下加载所有的扩展点
  • 7 生成扩展点自适应类的源码
  • 9 注入依赖的扩展点

扩展点定义

@SPI注解:被该注解标记的接口表示是一个可扩展的接口,注解的值表示默认的扩展点实现类对应的key(具体是在根据URL配置动态获取实现类时,如果URL配置缺失则使用@SPI的值作为默认值)。

@Adaptive注解:

  • 标记在扩展接口的方法上,ExtensionLoader.getAdaptiveExtension()获取自适应类是通过生成java源码并编译成class加载完成的,没有被标注的方法生成的实现都会抛出UnsupportedOperationException异常。
  • 标记在扩展接口的实现类上,获取自适应类不再通过生成java源码实现,而是直接把标记了@Adaptive注解的类作为扩展接口实现类。(只能有一个实现类标记@Adaptive,目前只有AdaptiveCompilerAdaptiveExtensionFactory类上标注了该注解)

扩展点特性

扩展点自适应(Adaptive)

上述通过ExtensionLoadergetAdaptiveExtension()方法得到的是扩展点的自适应实现(命名方式为”类名$Adaptive”),这个自适应实现类在运行时根据url(Dubbo使用URL对象传递配置信息)来动态的创建具体实现类实例,然后再进行调用。

运行时创建的具体扩展实现类实例是通过ExtensionLoader.getExtensionLoader(XX.class).getExtension(extName);方法完成的。

生成自适应类并编译

自适应类是通过动态拼接而成的java代码,然后再通过编译器扩展(默认是javassist)编译该java代码。

ExtensionLoader
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
private Class<?> createAdaptiveExtensionClass() {
// 生成自适应类源码
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
// 编译器扩展
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
private String createAdaptiveExtensionClassCode() {
StringBuilder codeBuidler = new StringBuilder();
Method[] methods = type.getMethods();
boolean hasAdaptiveAnnotation = false;
for (Method m : methods) {
if (m.isAnnotationPresent(Adaptive.class)) {
hasAdaptiveAnnotation = true;
break;
}
}
// 完全没有Adaptive方法,则不需要生成Adaptive类
if (!hasAdaptiveAnnotation)
throw new IllegalStateException("No adaptive method on extension " + type.getName() + ", refuse to create the adaptive class!");
codeBuidler.append("package " + type.getPackage().getName() + ";");
codeBuidler.append("\nimport " + ExtensionLoader.class.getName() + ";");
codeBuidler.append("\npublic class " + type.getSimpleName() + "$Adaptive" + " implements " + type.getCanonicalName() + " {");
for (Method method : methods) {
Class<?> rt = method.getReturnType();
Class<?>[] pts = method.getParameterTypes();
Class<?>[] ets = method.getExceptionTypes();
Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class);
StringBuilder code = new StringBuilder(512);
if (adaptiveAnnotation == null) {
code.append("throw new UnsupportedOperationException(\"method ")
.append(method.toString()).append(" of interface ")
.append(type.getName()).append(" is not adaptive method!\");");
} else {
int urlTypeIndex = -1;
for (int i = 0; i < pts.length; ++i) {
if (pts[i].equals(URL.class)) {
urlTypeIndex = i;
break;
}
}
// 有类型为URL的参数
if (urlTypeIndex != -1) {
// Null Point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"url == null\");",
urlTypeIndex);
code.append(s);
s = String.format("\n%s url = arg%d;", URL.class.getName(), urlTypeIndex);
code.append(s);
}
// 参数没有URL类型
else {
String attribMethod = null;
// 找到参数的URL属性
LBL_PTS:
for (int i = 0; i < pts.length; ++i) {
Method[] ms = pts[i].getMethods();
for (Method m : ms) {
String name = m.getName();
if ((name.startsWith("get") || name.length() > 3)
&& Modifier.isPublic(m.getModifiers())
&& !Modifier.isStatic(m.getModifiers())
&& m.getParameterTypes().length == 0
&& m.getReturnType() == URL.class) {
urlTypeIndex = i;
attribMethod = name;
break LBL_PTS;
}
}
}
if (attribMethod == null) {
throw new IllegalStateException("fail to create adative class for interface " + type.getName()
+ ": not found url parameter or url attribute in parameters of method " + method.getName());
}
// Null point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"%s argument == null\");",
urlTypeIndex, pts[urlTypeIndex].getName());
code.append(s);
s = String.format("\nif (arg%d.%s() == null) throw new IllegalArgumentException(\"%s argument %s() == null\");",
urlTypeIndex, attribMethod, pts[urlTypeIndex].getName(), attribMethod);
code.append(s);
s = String.format("%s url = arg%d.%s();", URL.class.getName(), urlTypeIndex, attribMethod);
code.append(s);
}
// @OvO@ 获取方法上Adaptive注解的值
String[] value = adaptiveAnnotation.value();
// 没有设置Key,则使用“扩展点接口名的点分隔”作为Key
if (value.length == 0) {
char[] charArray = type.getSimpleName().toCharArray();
StringBuilder sb = new StringBuilder(128);
for (int i = 0; i < charArray.length; i++) {
if (Character.isUpperCase(charArray[i])) {
if (i != 0) {
sb.append(".");
}
sb.append(Character.toLowerCase(charArray[i]));
} else {
sb.append(charArray[i]);
}
}
value = new String[]{sb.toString()};
}
boolean hasInvocation = false;
for (int i = 0; i < pts.length; ++i) {
if (pts[i].getName().equals("com.alibaba.dubbo.rpc.Invocation")) {
// Null Point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"invocation == null\");", i);
code.append(s);
s = String.format("\nString methodName = arg%d.getMethodName();", i);
code.append(s);
hasInvocation = true;
break;
}
}
String defaultExtName = cachedDefaultName;
String getNameCode = null;
// @OvO@ 确定Adaptive实现: 通过@Adaptive注解的value(多个value则依次尝试)作为URL参数的key来获取相应值(默认值是@SPI注解的value)
for (int i = value.length - 1; i >= 0; --i) {
if (i == value.length - 1) {
if (null != defaultExtName) {
if (!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("( url.getProtocol() == null ? \"%s\" : url.getProtocol() )", defaultExtName);
} else {
if (!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\")", value[i]);
else
getNameCode = "url.getProtocol()";
}
} else {
if (!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\", %s)", value[i], getNameCode);
else
getNameCode = String.format("url.getProtocol() == null ? (%s) : url.getProtocol()", getNameCode);
}
}
code.append("\nString extName = ").append(getNameCode).append(";");
// check extName == null?
String s = String.format("\nif(extName == null) " +
"throw new IllegalStateException(\"Fail to get extension(%s) name from url(\" + url.toString() + \") use keys(%s)\");",
type.getName(), Arrays.toString(value));
code.append(s);
// @OvO@ 确定了Adaptive实现后,通过getExtension(extName)方法得到实例
s = String.format("\n%s extension = (%<s)%s.getExtensionLoader(%s.class).getExtension(extName);",
type.getName(), ExtensionLoader.class.getSimpleName(), type.getName());
code.append(s);
// return statement
if (!rt.equals(void.class)) {
code.append("\nreturn ");
}
s = String.format("extension.%s(", method.getName());
code.append(s);
for (int i = 0; i < pts.length; i++) {
if (i != 0)
code.append(", ");
code.append("arg").append(i);
}
code.append(");");
}
codeBuidler.append("\npublic " + rt.getCanonicalName() + " " + method.getName() + "(");
for (int i = 0; i < pts.length; i++) {
if (i > 0) {
codeBuidler.append(", ");
}
codeBuidler.append(pts[i].getCanonicalName());
codeBuidler.append(" ");
codeBuidler.append("arg" + i);
}
codeBuidler.append(")");
if (ets.length > 0) {
codeBuidler.append(" throws ");
for (int i = 0; i < ets.length; i++) {
if (i > 0) {
codeBuidler.append(", ");
}
codeBuidler.append(ets[i].getCanonicalName());
}
}
codeBuidler.append(" {");
codeBuidler.append(code.toString());
codeBuidler.append("\n}");
}
codeBuidler.append("\n}");
if (logger.isDebugEnabled()) {
logger.debug(codeBuidler.toString());
}
return codeBuidler.toString();
}

Protocol自适应类源码

Protocol$Adaptive
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}

扩展点装饰(AOP)

ExtensionLoader在加载扩展点时,如果加载到的扩展点有拷贝构造函数(也就是构造函数参数类型为扩展点类型),则判定为扩展点的 Wrapper 类(判定扩展点是否为 Wrapper 类是在loadFile()中完成的)。

在上述介绍中,dubbo在运行时通过URL来动态创建扩展点实现的实例,也就是ExtensionLoadergetExtension(String name)方法,真正创建实例是在createExtension()方法中并完成了Wrapper类的包装过程。

ExtensionLoader
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
return getDefaultExtension();
}
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
// @OvO@ 创建扩展实例,同时对实例进行包装Wrapper
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
// @OvO@ 创建扩展实例,进行相应的装饰wrapper,并注入装饰类依赖的扩展点
@SuppressWarnings("unchecked")
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}

扩展点注入(IOC)

在获取扩展点自适应类的实例(getAdaptiveExtension())和获取扩展点具体实现类的实例时(getExtension())都会通过injectExtension()完成依赖注入。

dubbo中默认采用AdaptiveExtensionFactory来获取所有的扩展,其实现还是通过遍历SpiExtensionFactory(dubbo的扩展机制得到的扩展点实例)和SpringExtensionFactory(spring容器维护的bean)完成的。

扩展点自动激活(Activate)

对于集合类扩展点,比如: Filter , InvokerListener , ExportListener , TelnetHandler , StatusChecker 等,可以同时加载多个实现,可以用自动激活根据条件来自动加载。

ExtensionLoader在通过loadFile()加载扩展配置时会缓存带有注解@Activate的类的相关信息,使用getActivateExtension(URL url, String[] values, String group)方法获取条件相匹配的扩展实现。