Dubbo(一):从dubbo-demo初探dubbo源码
Dubbo(二):架构与SPI
Dubbo(三):高并发下降级与限流
Dubbo(四):其他常用特性
Dubbo(五):自定义扩展
核心流程
首先看一下dubbo官网给出的dubbo层级图,dubbo的分层非常清晰:
大致注册流程:
在Provider端启动后,由spring加载xml或注解配置的dubbo bean,在ServiceBean(ServiceConfig)中执行export方法。根据Protocol(dubbo)
做对应的导出,创建URL
对象,ProxyFactory使用Javassist动态代理,包装成Invoker
对象。使用RegistryProtocol去导出,生成注册URL,ProtocolFilterWrapper添加Filter调用链,最后通过DubboProtocol去导出DubboExporter
并放入本地exporterMap中。然后分别经过Exchange层、Transport层
创建启动Netty
,使用ExchangeHandler监听Netty事件。再通过ZooKeeper
注册服务URL,并监听接收推送。最后将Exporter和URL存入本地list中
Interface -> Implement -> Proxy(ProxyFactory) -> Invoker -> Protocol -> Exporter
大致订阅流程:
在Consumer端启动后,由spring加载xml或注解配置的dubbo bean,在ReferenceBean(ReferenceConfig)中执行getObject方法触发init初始化。执行createProxy方法创建代理对象,因为非本地所以通过loadRegistries方法从ZooKeeper
拉取注册中心URLs。如果URL大于1条,说明有多个注册中心,执行每个Protocol.refer得到Invoker集合
,然后通过Cluster
使用join方法通过Directory
包装为一个Invoker(xxxClusterInvoker)
返回;否则只有1条直接通过RegistryProtocol执行refer方法获取一个Invoker
:{
具体为先通过group分组获取Cluster,创建RegistryDirectory
,使用Registry(ZookeeperRegistry)
将订阅URL注册到ZooKeeper并添加监听器,RegistryDirectory也进行订阅(providers、configurators、routers),在notify方法中处理则三个zk节点下的URLs,创建InvokerDelegate
包装Protocol.refer生成的Invoker,ProtocolFilterWrapper添加Filter调用链,最后通过DubboProtocol创建serviceKey(port+serviceName+version+group),创建DubboInvoker
并放入invokers集合,最后也是调用cluster.join包装返回}
。最后用proxyFactory生成代理,依旧使用Javassist动态代理Invoker
Interface -> Proxy(ProxyFactory) <- xxxClusterInvoker(Directory(Invokers、URLs、Routers)) <- DubboInvoker <- Protocol
大致调用流程:
1、生产者和消费者都是通过JavassistProxyFactory动态代理生成代理:return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker))
,因此在消费者实际调用的时候,就是执行InvokerInvocationHandler的invoke方法,根据是否异步来创建RpcInvocation
。之前会在MockClusterInvoker的invoke方法里判断mock处理,正常情况下继续调用Invoker.invoker。比如根据缺省容错方案(FailoverClusterInvoker),在循环重试次数retries下,通过Directory
获取invoker集合,再通过Router
路由选择相应invoker集合。通过LoadBalance
获得一个Invoker
,调用这个Invoker的invoke方法,处理Filter链,最后就是DubboInvoker
。执行doInvoke方法选择一个ExchangeClient
去远程调用(单向、同步、异步)得到Result(Future)
对象。具体是通过Exchange层通过Codec编码序列化后通过NettyClient发送request请求到服务提供者
Interface -> Proxy -> InvokerInvocationHandler -> xxxClusterInvoker(Directory、Router、LoadBalance) -> Filter链 -> DubboInvoker -> ExchangeClient -> Codec -> Serialization -> Transporter层(Netty)
2、服务提供者NettyClient接收请求后,Codec解码反序列化。从线程池取出一个线程去处理,通过Exchange层的HeaderExchangeHandler的received方法,将message封装为Request,通过CompletableFuture<Object> future = handler.reply(channel, msg)
调用到DubboProtocol中的ExchangeHandler,前面说到用来监听处理事件。在其reply方法中查找提供端请求对应的Invoker
,先拿到DubboExporter
,然后处理Filter链,最后使用AbstractProxyInvoker调用具体方法。得到Result
结果后,根据是否是AsyncRpcResult包装返回,在HeaderExchangeHandler封装为Response对象,通过ExchangeChannel
的send方法发回
Transporter层(Netty) -> Codec -> Serialization -> ExchangeHandler -> DubboExporter -> Protocol -> Filter链 -> Invoker -> Proxy -> Implement -> Result -> Codec -> Serialization -> Transporter层(NettyClient)
3、同样服务消费者在NettyClient接收Request后,Codec解码反序列化。由于之前的Result(Future)
对象一直卡在get(timeout)方法上不断循环获取,这时Response就能取得结果并返回
Transporter层(Netty) -> Codec -> Serialization -> Future.get -> … -> 调用者
自定义扩展
由于Dubbo的SPI机制,只要接口有@SPI注解的接口类就会去查找扩展点实现,会依次从三个路径文件中读取扩展点,比如放在优先级最高的META-INF\dubbo\internal下。通过ExtensionLoader加载扩展点,如果加载的扩展点有拷贝构造函数,则会判定为Wrapper自动包装在真正的扩展点外面,并返回Wrapper类的实例
Filter链
服务提供方和服务消费方调用过程拦截,Dubbo本身的大多功能均基于此扩展点实现,每次远程方法执行,该拦截都会被执行,可以在请求处理前或者处理后做一些通用的逻辑,不过需要注意对性能的影响
约定:
1、用户自定义filter默认在内置filter之后
2、特殊值default
,表示缺省扩展点插入的位置。比如:filter="xxx,default,yyy"
,表示xxx在缺省filter之前,yyy在缺省filter之后
3、特殊符号-
,表示剔除。比如:filter="-foo1"
,剔除添加缺省扩展点foo1。比如:filter="-default"
,剔除添加所有缺省扩展点
4、provider和service同时配置的filter时,累加所有filter,而不是覆盖。比如:<dubbo:provider filter="xxx,yyy"/>
和<dubbo:service filter="aaa,bbb" />
,则xxx,yyy,aaa,bbb均会生效。如果要覆盖,需配置:<dubbo:service filter="-xxx,-yyy,aaa,bbb" />
扩展方法:
1、实现接口Filter
2、扩展配置有两种方法,第一种依赖配置
<!-- 消费方调用过程拦截 -->
<dubbo:reference filter="xxx,yyy" />
<!-- 消费方调用过程缺省拦截器,将拦截所有reference -->
<dubbo:consumer filter="xxx,yyy"/>
<!-- 提供方调用过程拦截 -->
<dubbo:service filter="xxx,yyy" />
<!-- 提供方调用过程缺省拦截器,将拦截所有service -->
<dubbo:provider filter="xxx,yyy"/>
在META-INF/dubbo/org.apache.dubbo.rpc.Filter文本文件里配置:
xxx=com.xxx.XxxFilter
2、第二种方法依赖@Activate注解,dubbo将自动加载
dubbo提供了许多原生Filter,其入口是在ProtocolFilterWrapper,因为它是Protocol的包装类,所以会在加载的Extension的时候被自动包装进来(Dubbo的SPI机制),看一下源码:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 向注册中心registry的时候,不会进行filter调用链
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 向注册中心registry的时候,不会进行filter调用链
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 获得所有激活的Filter(已经排好序的)
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
// 复制引用,构建filter调用链
final Invoker<T> next = last;
// 这里只是构造一个最简化的Invoker作为调用链的载体Invoker
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
创建调用链过程非常简单,重点是如何获取filters集合的,查看ExtensionLoader代码:
public List<T> getActivateExtension(URL url, String key, String group) {
String value = url.getParameter(key);
return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
}
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<T>();
// 所有用户自己配置的filter信息(有些Filter是默认激活的,有些是配置激活的,这里这里的names指的配置激活的filter信息)
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
// 如果这些名称里不包括去除default的标志(-default),换言之就是加载Dubbo提供的默认Filter
if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
// 加载extension信息
getExtensionClasses();
for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
// name指的是SPI读取的配置文件的key
String name = entry.getKey();
Object activate = entry.getValue();
// 获取Activate注解
String[] activateGroup, activateValue;
if (activate instanceof Activate) {
activateGroup = ((Activate) activate).group();
activateValue = ((Activate) activate).value();
} else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
} else {
continue;
}
// 区分在Provider端生效还是在consumer端生效
if (isMatchGroup(group, activateGroup)) {
T ext = getExtension(name);
// 分别判断:
// 1.用户配置的filter列表中不包含当前ext
// 2.用户配置的filter列表中不包含当前ext的加-的key
// 3.如果用户的配置信息(url中)中有可以激活的配置key并且数据不为0,false,null,N/A,也就是说有正常的使用
if (!names.contains(name)
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)
&& isActive(activateValue, url)) {
exts.add(ext);
}
}
}
// 根据Activate注解上的order排序
Collections.sort(exts, ActivateComparator.COMPARATOR);
}
// 到此,Dubbo提供的原生的Filter已经被添加完毕了,下面处理用户自己扩展的Filter
List<T> usrs = new ArrayList<T>();
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
// 如果单个name不是以-开头并且所有的key里面并不包含-'name'(也就是说如果配置成了"dubbo,-dubbo"这种场景进不去条件)
if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
// 可以通过default关键字替换Dubbo原生的Filter链,主要用来控制调用链顺序
if (Constants.DEFAULT_KEY.equals(name)) {
if (!usrs.isEmpty()) {
exts.addAll(0, usrs);
usrs.clear();
}
} else {
// 加入用户自己定义的扩展Filter
T ext = getExtension(name);
usrs.add(ext);
}
}
}
if (!usrs.isEmpty()) {
exts.addAll(usrs);
}
return exts;
}
举几个原生Filter例子:
Cunsumer端:
1、ConsumerContextFilter:将客户端设置的隐式参数传递给服务端。在客户端调用Invoker.invoke方法时候,会去取当前状态记录器RpcContext中的attachments属性,然后设置到RpcInvocation对象中,在RpcInvocation传递到provider的时候会通过另外一个过滤器ContextFilter将RpcInvocation对象重新设置回RpcContext中供服务端逻辑重新获取隐式参数。因此RpcContext只能记录一次请求的状态信息,因为在第二次调用的时候参数已经被新的RpcInvocation覆盖掉 2、ActiveLimitFilter:当配置了actives并且值不为0的时候触发。限制同一个客户端对于一个服务端方法的并发调用量
3、FutureFilter:处理3种事件信息,上一篇文章有讲到事件通知,就是这里实现的
Provider端:
1、ContextFilter:与ConsumerContextFilter是结合使用,剔除核心信息后重新将invocation和attachments信息设置到RpcContext,这里设置以后provider的代码就可以获取到consumer端传递的一些隐式参数了
2、EchoFilter:回响测试主要用来检测服务是否正常(网络状态)
3、ExecuteLimitFilter:服务端接口限制限流的具体执行逻辑就是在ExecuteLimitFilter中,因为服务端不需要考虑重试等待逻辑,一旦当前执行的线程数量大于指定数量,就直接返回失败了
4、ExceptionFilter:对于异常的处理规则,除了一些异常外,都包装成RuntimeException然后抛出(避免异常在Client出不能反序列化问题)
其他
偶尔会用到的扩展,其实由于SPI,套路都一样,实现接口后在META-INF/dubbo/下对应文件里配置即可:
协议扩展:实现接口Protocol(export、refer)、Exporter(构造方法、unexport)、Invoker(构造方法、doInvoke),比如DubboProtocol、DubboExporter、DubboInvoker
引用监听扩展:当有服务引用时,触发该事件。实现接口InvokerListener(referred、destroyed)
暴露监听扩展:当有服务暴露时,触发该事件。实现接口ExporterListener(exported、unexported)
集群扩展:当有多个服务提供方时,将多个服务提供方组织成一个集群,并伪装成一个提供方。已有6种集群容错方案,实现接口Cluster(join)
路由扩展:从多个服务提者方中选择一个进行调用。实现接口RouterFactory(getRouter)、Router(route)
负载均衡扩展:从多个服务提者方中选择一个进行调用。实现LoadBalance接口(select)
序列化扩展:实现接口Serialization(serialize、deserialize)、ObjectInput、ObjectOutput
容器扩展:服务容器扩展,用于自定义加载内容。实现接口Container(start、stop),比如SpringContainer、JettyContainer、Log4jContainer
Telnet命令
有的时候所开发的项目属于底层服务,遇到问题的时候需要上层项目复杂的操作,这个时候可以到服务器上直接调用接口,查看返回结果来初步判断问题
首先连接:telnet localhost 20880
dubbo有一些内建的telnet命令(也可以扩展),常用的有:
ls命令:
ls: 显示服务列表
ls -l: 显示服务详细信息列表
ls XxxService: 显示服务的方法列表
ls -l XxxService: 显示服务的方法详细信息列表
ps命令:
ps: 显示服务端口列表
ps -l: 显示服务地址列表
ps 20880: 显示端口上的连接信息
ps -l 20880: 显示端口上的连接详细信息
invoke命令:
invoke xxxService.xxxMethod({"prop": "value"}): 调用服务的方法
invoke xxxService.xxxMethod({""name":"xx","version":"xx","class":"xx.xx.Xxx"}) 调用服务方法
invoke xxxMethod({"prop": "value"}): 调用服务的方法(自动查找包含此方法的服务)
trace命令:
trace XxxService: 跟踪 1 次服务任意方法的调用情况
trace XxxService 10: 跟踪 10 次服务任意方法的调用情况
trace XxxService xxxMethod: 跟踪 1 次服务方法的调用情况
trace XxxService xxxMethod 10: 跟踪 10 次服务方法的调用情况
其他还有:cd、pwd、count、select 、status、log 、shutdown、help、clear、exit