Dubbo(五):自定义扩展

Posted by ZhouJ000 on February 20, 2019

Dubbo(一):从dubbo-demo初探dubbo源码
Dubbo(二):架构与SPI
Dubbo(三):高并发下降级与限流
Dubbo(四):其他常用特性
Dubbo(五):自定义扩展

核心流程

首先看一下dubbo官网给出的dubbo层级图,dubbo的分层非常清晰: dubbo-framework

大致注册流程
在Provider端启动后,由spring加载xml或注解配置的dubbo bean,在ServiceBean(ServiceConfig)中执行export方法。根据Protocol(dubbo)做对应的导出,创建URL对象,ProxyFactory使用Javassist动态代理,包装成Invoker对象。使用RegistryProtocol去导出,生成注册URL,ProtocolFilterWrapper添加Filter调用链,最后通过DubboProtocol去导出DubboExporter并放入本地exporterMap中。然后分别经过Exchange层、Transport层创建启动Netty,使用ExchangeHandler监听Netty事件。再通过ZooKeeper注册服务URL,并监听接收推送。最后将Exporter和URL存入本地list中

Interface -> Implement -> Proxy(ProxyFactory) -> Invoker -> Protocol -> Exporter

大致订阅流程
在Consumer端启动后,由spring加载xml或注解配置的dubbo bean,在ReferenceBean(ReferenceConfig)中执行getObject方法触发init初始化。执行createProxy方法创建代理对象,因为非本地所以通过loadRegistries方法从ZooKeeper拉取注册中心URLs。如果URL大于1条,说明有多个注册中心,执行每个Protocol.refer得到Invoker集合,然后通过Cluster使用join方法通过Directory包装为一个Invoker(xxxClusterInvoker)返回;否则只有1条直接通过RegistryProtocol执行refer方法获取一个Invoker{具体为先通过group分组获取Cluster,创建RegistryDirectory,使用Registry(ZookeeperRegistry)将订阅URL注册到ZooKeeper并添加监听器,RegistryDirectory也进行订阅(providers、configurators、routers),在notify方法中处理则三个zk节点下的URLs,创建InvokerDelegate包装Protocol.refer生成的Invoker,ProtocolFilterWrapper添加Filter调用链,最后通过DubboProtocol创建serviceKey(port+serviceName+version+group),创建DubboInvoker并放入invokers集合,最后也是调用cluster.join包装返回}。最后用proxyFactory生成代理,依旧使用Javassist动态代理Invoker

Interface -> Proxy(ProxyFactory) <- xxxClusterInvoker(Directory(Invokers、URLs、Routers)) <- DubboInvoker <- Protocol

大致调用流程
1、生产者和消费者都是通过JavassistProxyFactory动态代理生成代理:return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)),因此在消费者实际调用的时候,就是执行InvokerInvocationHandler的invoke方法,根据是否异步来创建RpcInvocation。之前会在MockClusterInvoker的invoke方法里判断mock处理,正常情况下继续调用Invoker.invoker。比如根据缺省容错方案(FailoverClusterInvoker),在循环重试次数retries下,通过Directory获取invoker集合,再通过Router路由选择相应invoker集合。通过LoadBalance获得一个Invoker,调用这个Invoker的invoke方法,处理Filter链,最后就是DubboInvoker。执行doInvoke方法选择一个ExchangeClient去远程调用(单向、同步、异步)得到Result(Future)对象。具体是通过Exchange层通过Codec编码序列化后通过NettyClient发送request请求到服务提供者

Interface -> Proxy -> InvokerInvocationHandler -> xxxClusterInvoker(Directory、Router、LoadBalance) -> Filter链 -> DubboInvoker -> ExchangeClient -> Codec -> Serialization -> Transporter层(Netty)

2、服务提供者NettyClient接收请求后,Codec解码反序列化。从线程池取出一个线程去处理,通过Exchange层的HeaderExchangeHandler的received方法,将message封装为Request,通过CompletableFuture<Object> future = handler.reply(channel, msg)调用到DubboProtocol中的ExchangeHandler,前面说到用来监听处理事件。在其reply方法中查找提供端请求对应的Invoker,先拿到DubboExporter,然后处理Filter链,最后使用AbstractProxyInvoker调用具体方法。得到Result结果后,根据是否是AsyncRpcResult包装返回,在HeaderExchangeHandler封装为Response对象,通过ExchangeChannel的send方法发回

Transporter层(Netty) -> Codec -> Serialization -> ExchangeHandler -> DubboExporter -> Protocol -> Filter链 -> Invoker -> Proxy -> Implement -> Result -> Codec -> Serialization -> Transporter层(NettyClient)

3、同样服务消费者在NettyClient接收Request后,Codec解码反序列化。由于之前的Result(Future)对象一直卡在get(timeout)方法上不断循环获取,这时Response就能取得结果并返回

Transporter层(Netty) -> Codec -> Serialization -> Future.get -> … -> 调用者

自定义扩展

由于Dubbo的SPI机制,只要接口有@SPI注解的接口类就会去查找扩展点实现,会依次从三个路径文件中读取扩展点,比如放在优先级最高的META-INF\dubbo\internal下。通过ExtensionLoader加载扩展点,如果加载的扩展点有拷贝构造函数,则会判定为Wrapper自动包装在真正的扩展点外面,并返回Wrapper类的实例

Filter链

服务提供方和服务消费方调用过程拦截,Dubbo本身的大多功能均基于此扩展点实现,每次远程方法执行,该拦截都会被执行,可以在请求处理前或者处理后做一些通用的逻辑,不过需要注意对性能的影响

约定:
1、用户自定义filter默认在内置filter之后
2、特殊值default,表示缺省扩展点插入的位置。比如:filter="xxx,default,yyy",表示xxx在缺省filter之前,yyy在缺省filter之后
3、特殊符号-,表示剔除。比如:filter="-foo1",剔除添加缺省扩展点foo1。比如:filter="-default",剔除添加所有缺省扩展点
4、provider和service同时配置的filter时,累加所有filter,而不是覆盖。比如:<dubbo:provider filter="xxx,yyy"/><dubbo:service filter="aaa,bbb" />,则xxx,yyy,aaa,bbb均会生效。如果要覆盖,需配置:<dubbo:service filter="-xxx,-yyy,aaa,bbb" />

扩展方法:
1、实现接口Filter
2、扩展配置有两种方法,第一种依赖配置

<!-- 消费方调用过程拦截 -->
<dubbo:reference filter="xxx,yyy" />
<!-- 消费方调用过程缺省拦截器,将拦截所有reference -->
<dubbo:consumer filter="xxx,yyy"/>

<!-- 提供方调用过程拦截 -->
<dubbo:service filter="xxx,yyy" />
<!-- 提供方调用过程缺省拦截器,将拦截所有service -->
<dubbo:provider filter="xxx,yyy"/>

在META-INF/dubbo/org.apache.dubbo.rpc.Filter文本文件里配置:
xxx=com.xxx.XxxFilter

2、第二种方法依赖@Activate注解,dubbo将自动加载

dubbo提供了许多原生Filter,其入口是在ProtocolFilterWrapper,因为它是Protocol的包装类,所以会在加载的Extension的时候被自动包装进来(Dubbo的SPI机制),看一下源码:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
	// 向注册中心registry的时候,不会进行filter调用链
	if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
		return protocol.export(invoker);
	}
	return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
	// 向注册中心registry的时候,不会进行filter调用链
	if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
		return protocol.refer(type, url);
	}
	return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
	Invoker<T> last = invoker;
	// 获得所有激活的Filter(已经排好序的)
	List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
	if (!filters.isEmpty()) {
		for (int i = filters.size() - 1; i >= 0; i--) {
			final Filter filter = filters.get(i);
			// 复制引用,构建filter调用链
			final Invoker<T> next = last;
			// 这里只是构造一个最简化的Invoker作为调用链的载体Invoker
			last = new Invoker<T>() {

				@Override
				public Class<T> getInterface() {
					return invoker.getInterface();
				}

				@Override
				public URL getUrl() {
					return invoker.getUrl();
				}

				@Override
				public boolean isAvailable() {
					return invoker.isAvailable();
				}

				@Override
				public Result invoke(Invocation invocation) throws RpcException {
					return filter.invoke(next, invocation);
				}

				@Override
				public void destroy() {
					invoker.destroy();
				}

				@Override
				public String toString() {
					return invoker.toString();
				}
			};
		}
	}
	return last;
}

创建调用链过程非常简单,重点是如何获取filters集合的,查看ExtensionLoader代码:

public List<T> getActivateExtension(URL url, String key, String group) {
	String value = url.getParameter(key);
	return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
}

public List<T> getActivateExtension(URL url, String[] values, String group) {
	List<T> exts = new ArrayList<T>();
	// 所有用户自己配置的filter信息(有些Filter是默认激活的,有些是配置激活的,这里这里的names指的配置激活的filter信息)
	List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
	// 如果这些名称里不包括去除default的标志(-default),换言之就是加载Dubbo提供的默认Filter
	if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
		// 加载extension信息
		getExtensionClasses();
		for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
			// name指的是SPI读取的配置文件的key
			String name = entry.getKey();
			Object activate = entry.getValue();
			// 获取Activate注解
			String[] activateGroup, activateValue;
			if (activate instanceof Activate) {
				activateGroup = ((Activate) activate).group();
				activateValue = ((Activate) activate).value();
			} else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
				activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
				activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
			} else {
				continue;
			}
			// 区分在Provider端生效还是在consumer端生效
			if (isMatchGroup(group, activateGroup)) {
				T ext = getExtension(name);
				// 分别判断:
				// 1.用户配置的filter列表中不包含当前ext
				// 2.用户配置的filter列表中不包含当前ext的加-的key
				// 3.如果用户的配置信息(url中)中有可以激活的配置key并且数据不为0,false,null,N/A,也就是说有正常的使用
				if (!names.contains(name)
						&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)
						&& isActive(activateValue, url)) {
					exts.add(ext);
				}
			}
		}
		// 根据Activate注解上的order排序
		Collections.sort(exts, ActivateComparator.COMPARATOR);
	}
	// 到此,Dubbo提供的原生的Filter已经被添加完毕了,下面处理用户自己扩展的Filter
	List<T> usrs = new ArrayList<T>();
	for (int i = 0; i < names.size(); i++) {
		String name = names.get(i);
		// 如果单个name不是以-开头并且所有的key里面并不包含-'name'(也就是说如果配置成了"dubbo,-dubbo"这种场景进不去条件)
		if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)
				&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
			// 可以通过default关键字替换Dubbo原生的Filter链,主要用来控制调用链顺序
			if (Constants.DEFAULT_KEY.equals(name)) {
				if (!usrs.isEmpty()) {
					exts.addAll(0, usrs);
					usrs.clear();
				}
			} else {
				// 加入用户自己定义的扩展Filter
				T ext = getExtension(name);
				usrs.add(ext);
			}
		}
	}
	if (!usrs.isEmpty()) {
		exts.addAll(usrs);
	}
	return exts;
}

举几个原生Filter例子:
Cunsumer端
1、ConsumerContextFilter:将客户端设置的隐式参数传递给服务端。在客户端调用Invoker.invoke方法时候,会去取当前状态记录器RpcContext中的attachments属性,然后设置到RpcInvocation对象中,在RpcInvocation传递到provider的时候会通过另外一个过滤器ContextFilter将RpcInvocation对象重新设置回RpcContext中供服务端逻辑重新获取隐式参数。因此RpcContext只能记录一次请求的状态信息,因为在第二次调用的时候参数已经被新的RpcInvocation覆盖掉 2、ActiveLimitFilter:当配置了actives并且值不为0的时候触发。限制同一个客户端对于一个服务端方法的并发调用量
3、FutureFilter:处理3种事件信息,上一篇文章有讲到事件通知,就是这里实现的
Provider端
1、ContextFilter:与ConsumerContextFilter是结合使用,剔除核心信息后重新将invocation和attachments信息设置到RpcContext,这里设置以后provider的代码就可以获取到consumer端传递的一些隐式参数了
2、EchoFilter:回响测试主要用来检测服务是否正常(网络状态)
3、ExecuteLimitFilter:服务端接口限制限流的具体执行逻辑就是在ExecuteLimitFilter中,因为服务端不需要考虑重试等待逻辑,一旦当前执行的线程数量大于指定数量,就直接返回失败了
4、ExceptionFilter:对于异常的处理规则,除了一些异常外,都包装成RuntimeException然后抛出(避免异常在Client出不能反序列化问题)

其他

偶尔会用到的扩展,其实由于SPI,套路都一样,实现接口后在META-INF/dubbo/下对应文件里配置即可:

协议扩展:实现接口Protocol(export、refer)、Exporter(构造方法、unexport)、Invoker(构造方法、doInvoke),比如DubboProtocol、DubboExporter、DubboInvoker

引用监听扩展:当有服务引用时,触发该事件。实现接口InvokerListener(referred、destroyed)
暴露监听扩展:当有服务暴露时,触发该事件。实现接口ExporterListener(exported、unexported)

集群扩展:当有多个服务提供方时,将多个服务提供方组织成一个集群,并伪装成一个提供方。已有6种集群容错方案,实现接口Cluster(join)
路由扩展:从多个服务提者方中选择一个进行调用。实现接口RouterFactory(getRouter)、Router(route)
负载均衡扩展:从多个服务提者方中选择一个进行调用。实现LoadBalance接口(select)

序列化扩展:实现接口Serialization(serialize、deserialize)、ObjectInput、ObjectOutput

容器扩展:服务容器扩展,用于自定义加载内容。实现接口Container(start、stop),比如SpringContainer、JettyContainer、Log4jContainer

Telnet命令

有的时候所开发的项目属于底层服务,遇到问题的时候需要上层项目复杂的操作,这个时候可以到服务器上直接调用接口,查看返回结果来初步判断问题

首先连接:telnet localhost 20880

dubbo有一些内建的telnet命令(也可以扩展),常用的有:

ls命令:
ls: 				显示服务列表
ls -l: 				显示服务详细信息列表
ls XxxService: 		显示服务的方法列表
ls -l XxxService: 	显示服务的方法详细信息列表

ps命令:
ps: 			显示服务端口列表
ps -l: 			显示服务地址列表
ps 20880: 		显示端口上的连接信息
ps -l 20880: 	显示端口上的连接详细信息

invoke命令:
invoke xxxService.xxxMethod({"prop": "value"}): 	调用服务的方法
invoke xxxService.xxxMethod({""name":"xx","version":"xx","class":"xx.xx.Xxx"})  调用服务方法
invoke xxxMethod({"prop": "value"}): 				调用服务的方法(自动查找包含此方法的服务)

trace命令:
trace XxxService: 				跟踪 1 次服务任意方法的调用情况
trace XxxService 10: 			跟踪 10 次服务任意方法的调用情况
trace XxxService xxxMethod: 	跟踪 1 次服务方法的调用情况
trace XxxService xxxMethod 10: 	跟踪 10 次服务方法的调用情况

其他还有:cd、pwd、count、select 、status、log 、shutdown、help、clear、exit