- url --> Invoker,使用Protocol#refer()
- Invoker --> proxy(代理了远程通信等逻辑),使用ProxyFactory#getProxy()
引用服务的逻辑在ReferenceConfig#createProxy(),ReferenceConfig表示<dubbo:reference />
if (urls.size() == 1) { invoker = refprotocol.refer(interfaceClass, urls.get(0));} else { // 多个urls,循环产生Invoker,并最终用Cluster合并成一个Invoker,变成一个集群Invoker List> invokers = new ArrayList >(); URL registryURL = null; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // use last registry url // 如果有多个注册中心,用最后一个 } } if (registryURL != null) { // registry url is available // use AvailableCluster only when register's cluster is available // 对有注册中心的 Cluster 只用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // not a registry url invoker = cluster.join(new StaticDirectory(invokers)); }}
- 直连提供者:解析出的url形如 dubbo://service-host/com.foo.FooService?version=1.0.0,根据扩展自适配机制,使用DubboProtocol
- 注册中心:解析出的url形如 registry://registry-host/com.alibaba.dubbo.registry.RegistryService?refer=URL.encode("consumer://consumer-host/com.foo.FooService?version=1.0.0"),根据扩展自适配机制,首先使用RegistryProtocol,在refer()方法里向注册中心订阅提供者信息;订阅到信息之后,再根据提供者的url(比如 dubbo://service-host/com.foo.FooService?version=1.0.0)使用DubboProtocol
/** * * @param cluster 自适应扩展实现 * @param registry 注册中心对象 * @param type 服务接口类型 * @param url 注册中心url * @param* @return */private Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) { RegistryDirectory directory = new RegistryDirectory (type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map parameters = new HashMap (directory.getUrl().getParameters()); // 订阅url URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); // 向注册中心注册自己(服务消费者) if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } // 向注册中心订阅信息,包含提供者、配置、路由 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); Invoker invoker = cluster.join(directory); // 向本地注册表,注册消费者 ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory); return invoker;}
publicInvoker refer(Class serviceType, URL url) throws RpcException { // create rpc invoker. DubboInvoker invoker = new DubboInvoker (serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker;}/** * 连接远程服务提供者的客户端 * * @param url * @return */private ExchangeClient[] getClients(URL url) { // whether to share connection boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // 配置连接数为0,就使用共享连接, // if not configured, connection is shared, otherwise, one connection for one service if (connections == 0) { service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect) { clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients;}/** * Get shared connection */private ExchangeClient getSharedClient(URL url) { String key = url.getAddress(); ReferenceCountExchangeClient client = referenceClientMap.get(key); if (client != null) { if (!client.isClosed()) { // 计数+1 client.incrementAndGetCount(); return client; } else { referenceClientMap.remove(key); } } synchronized (key.intern()) { ExchangeClient exchangeClient = initClient(url); client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); referenceClientMap.put(key, client); ghostClientMap.remove(key); return client; }}/** * Create new connection */private ExchangeClient initClient(URL url) { // client type setting. // 配置的Transporter的扩展实现,模式为netty String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); // 设置编码器默认为DubboCountCodec,Dubbo SPI 自适应机制会根据这个key找到真正的实现 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); // enable heartbeat by default url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // BIO is not allowed since it has severe performance issue. 
// 是否是支持的Transporter的扩展实现 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: " + str + "," + " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " ")); } ExchangeClient client; try { // connection should be lazy if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { client = new LazyConnectExchangeClient(url, requestHandler); } else { client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client;}