微服务A,如何调用微服务B的所有节点?

springcloud微服务,其中A服务部署了一套,B服务部署了两套,A服务如何同时请求两个B服务。使用的是Feign进行服务间的调用,请写出详细的代码实现

阅读 2.6k
2 个回答
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.shared.Application;

import com.netflix.discovery.shared.InstanceInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.client.RestTemplate;

import java.util.List;

@Service
public class ServiceAClient {

    @Autowired
    private EurekaClient eurekaClient;

    @Autowired
    private RestTemplate restTemplate;

    public void callServiceB() {
        Application application = eurekaClient.getApplication("service-b");
        List<InstanceInfo> instances = application.getInstances();

        for (InstanceInfo instance : instances) {
            String url = "http://" + instance.getIPAddr() + ":" + instance.getPort() + "/api/endpoint";
            String response = restTemplate.getForObject(url, String.class);
            // Process the response
        }
    }
}

@Slf4j
@Component
public class FeignConfig implements ApplicationRunner {

    @Target({ElementType.METHOD, ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    private @interface AllInstanceRun {
    }

    private static final ThreadLocal<ServiceInstance> SERVICE_INSTANCE = new ThreadLocal<>();
    @Autowired
    private DiscoveryClient discoveryClient;
    @Autowired
    private ApplicationContext applicationContext;
    /**
     * 获取所有@FeignClient修饰的客户端,根据标记注解@AllInstanceRun判断是否需要进行包装
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        Arrays.stream(applicationContext.getBeanNamesForAnnotation(FeignClient.class))
                .map(name -> applicationContext.getBean(name))
                .forEach(bean -> {
                    if (!Proxy.isProxyClass(bean.getClass())) return;
                    InvocationHandler invocationHandler = Proxy.getInvocationHandler(bean);
                    ConfigurablePropertyAccessor accessor = PropertyAccessorFactory.forDirectFieldAccess(invocationHandler);
                    Map<Method, InvocationHandlerFactory.MethodHandler> dispatch = (Map<Method, InvocationHandlerFactory.MethodHandler>) accessor.getPropertyValue("dispatch");
                    if (dispatch == null) return;
                    dispatch.forEach((method, methodHandler) -> {
                        //先检查方法上是否有标记注解,没有再检查类上是否有标记注解
                        if (method.isAnnotationPresent(AllInstanceRun.class) || method.getDeclaringClass().isAnnotationPresent(AllInstanceRun.class)) {
                            ConfigurablePropertyAccessor methodHandlerAccessor = PropertyAccessorFactory.forDirectFieldAccess(methodHandler);
                            Client loadBalancerClient = (Client) accessor.getPropertyValue("client");
                            //替换负载均衡客户端为原始客户端
                            Client client = unwrapLoadBalancerClientAndProxyClient(loadBalancerClient);
                            methodHandlerAccessor.setPropertyValue("client", client);
                            //对MethodHandler进行代理对同一服务的多个实例进行依次调用
                            InvocationHandlerFactory.MethodHandler doubleExecuteWrapMethodHandler = doubleExecuteWrap(methodHandler);
                            dispatch.put(method, doubleExecuteWrapMethodHandler);
                        }
                    });
                });
    }

    /**
     * 对Client进行代理替换服务名称为实际的ip和端口
     */
    public Client unwrapLoadBalancerClientAndProxyClient(Client client) {
        if (client instanceof FeignBlockingLoadBalancerClient) {
            Client realClient = ((FeignBlockingLoadBalancerClient) client).getDelegate();
            return (Client) Proxy.newProxyInstance(
                    this.getClass().getClassLoader(),
                    new Class[]{Client.class},
                    (proxy, method, args) -> {
                        if ("execute".equals(method.getName())) {
                            ServiceInstance serviceInstance = SERVICE_INSTANCE.get();
                            Request request = (Request) args[0];
                            String realPath = request.url().replace(serviceInstance.getServiceId(), serviceInstance.getHost() + ":" + serviceInstance.getPort());
                            PropertyAccessorFactory.forDirectFieldAccess(request).setPropertyValue("url", realPath);
                        }
                        return method.invoke(realClient, args);
                    }
            );
        }
        return client;
    }

    /**
     * 对MethodHandler进行代理,对同一服务的多个实例进行依次调用
     */
    public InvocationHandlerFactory.MethodHandler doubleExecuteWrap(InvocationHandlerFactory.MethodHandler methodHandler) {
        return (InvocationHandlerFactory.MethodHandler) Proxy.newProxyInstance(
                this.getClass().getClassLoader(),
                new Class[]{InvocationHandlerFactory.MethodHandler.class},
                (proxy, method, args) -> {
                    if (!"invoke".equals(method.getName())) {
                        return method.invoke(methodHandler, args);
                    }
                    Request request = (Request) args[0];
                    final URI originalUri = URI.create(request.url());
                    String serviceId = originalUri.getHost();
                    List<ServiceInstance> serviceInstances = discoveryClient.getInstances(serviceId);
                    List<Object> responses = new ArrayList<>();
                    for (ServiceInstance serviceInstance : serviceInstances) {//循环调用同一服务的多个实例
                        SERVICE_INSTANCE.set(serviceInstance);
                        try {
                            responses.add(method.invoke(methodHandler, args));
                        } finally {
                            SERVICE_INSTANCE.remove();
                        }
                    }
                    //返回多个结果的第一个结果
                    return responses.stream().findFirst().orElse(null);
                }
        );
    }
}

1.在需要的Feign方法或接口上加上注解@AllInstanceRun便会调用接口对应服务的所有实例。
2.注解加在方法上只对此方法生效,加在类上对所有方法生效。
ps:此代码基于springCloud2021.0.7,负载均衡使用springCloud loadbalancer,如果是比较老的项目使用ribbon只需要调整此处

if (client instanceof FeignBlockingLoadBalancerClient) 

的处理逻辑,从负载均衡包装client拿到实际的client即可。
代码写完没有测试,遇到问题可以留言。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
宣传栏