手写一个网关服务,理解更透彻!

411次阅读  |  发布于2年以前

背景介绍

我们在工作中经常会需要处理http请求,通常都是基于SpringBoot应用直接接受外界的http请求,就如同下方的流程图所示:

但是随着后台应用的增加,可以调用的节点数目也慢慢变多,因此这个时候就需要有一个路由的角色可以帮助用户将请求转发到不同的机器节点上边。

其中扮演这个转发功能的角色我们通常可以称之为网关

在如今许多互联网公司都在推崇的微服务架构中,网关更是扮演着一个非常重要的角色。网关旨在为微服务架构提供一种简单而有效的统一的API路由管理方式。

在微服务架构中, 不同的微服务可以有不同的网络地址,各个微服务之间通过相互调用完成用户请求,客户端可能通过调用N个微服务的接口完成一个用户请求。

思考:微服务网关的上游会是什么?

下边我画了一张架构图,这是大部分互联网公司如今在建设微服务系统时候会采用的结构设计:

在流量抵达的最外层通常会选择使用LVS作为负载服务器,LVS是一种基于四层负载的高性能服务器,它的内部只会对外界的数据包进行分发处理,通常一台高性能的LVS机器就能支持百万的并发连接。为了保证LVS的高可用,通常LVS会部署多个节点,形成主从关系,且主从节点之间通过keepalived保持探活机制。

在LVS的下游会部署多套nginx环境,不同的nginx会处理不同业务部门的流量转发,nginx和LVS的不同点在于,nginx属于七层负载均衡,虽然说它的效率没有四层那么高,但是它可以支持根据不同请求来源的域名,api进行更详细的转发,实现下游的负载均衡,从而提升整体的吞吐量。

既然有了nginx,为什么还需要有gateway呢?

gateway同样也是具备有nginx的各种特性,虽然说其性能可能没有nginx那么高效,但是他所支持的功能非常强大。gateway常见功能有以下几点:

nginx是一款高性能的web服务器,但是如果希望实现上述的这些个性化功能需要开发相关功能的人员熟悉C语言和lua脚本,同时懂得这些模块的性能调优。而如今大部分互联网的微服务应用都会选择采用Java或者Go语言去编写,Gateway这块大部分定制开发也需要和这些采用Java或者Go语言编写的服务进行“交流”,所以两者之间的语言保持相同会比较合适。

另外从企业的角度来说,如今的市场中,招聘一个懂Java或者Go的程序员大概率要比招聘一个懂C和lua的程序员的成本更低。

当然,我也并不是说gateway一定需要部署在nginx之后,个性化功能一定不能写在nginx当中,例如下游系统是一些企业内部的管理系统,此时采用nginx就基本足够。但是当面对一个高流量访问的c端应用时,此时加入一个gateway会更加合适一些。这些具体的实现还是得结合实际业务场景来说。

手写实现一款简易版本的网关服务

其实网关的本质就是一个Web Servlet容器,然后在servlet的内部做了许多过滤规则和转发请求。在开始开发之前,我们需要先了解下servlet的一些知识点:

在Servlet的生命周期中,仅执行一次init()方法,它是在服务器装入Servlet 时执行的,可以配置服务器,以在启动服务器或客户机首次访问Servlet时装入 Servlet。无论有多少客户机访问Servlet,都不会重复执行init()

它是Servlet的核心,每当一个客户请求一个HttpServlet对象,该对象的 Service()方法就要调用,而且传递给这个方法一个“请求”(ServletRequest) 对象和一个“响应”(ServletResponse)对象作为参数。在HttpServlet中已存 在Service()方法。默认的服务功能是调用与HTTP请求的方法相应的do功能。

仅执行一次,在服务器端停止且卸载Servlet时执行该方法,有点类似于C++的 delete方法。一个Servlet在运行service()方法时可能会产生其他的线程,因 此需要确认在调用destroy()方法时,这些线程已经终止或完成。

小结

同个tomcat中,servlet默认以单例形式存在,但是在执行service函数的时候可能会有多个线程并发访问。

前边介绍了相关的背景知识,接下来我们直接开始上干货:

整套网关的结构分为了两个模块,一个是core(核心层),一个是starter(接入层)。

首先是GatewayCoreServlet部分:

package org.idea.qiyu.framework.gateway.core.core;
import org.idea.qiyu.framework.gateway.starter.registry.ApplicationChangeEvent;
import org.idea.qiyu.framework.gateway.starter.registry.ApplicationRegistry;
import org.idea.qiyu.framework.gateway.starter.registry.URL;
import org.idea.qiyu.framework.gateway.core.utils.GatewayLocalCache;
import org.jboss.netty.util.internal.ThreadLocalRandom;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationListener;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StreamUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import javax.servlet.ServletInputStream;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.stream.Collectors;
/**
 * 单例对象
 *
 * @Author linhao
 * @Date created in 8:55 上午 2022/4/13
 */
@WebServlet(name = "GatewayServlet", urlPatterns = "/*")
public class GatewayCoreServlet extends HttpServlet implements InitializingBean, ApplicationListener<ApplicationChangeEvent> {
    private RestTemplate restTemplate = new RestTemplate();


    @Resource
    private ApplicationRegistry applicationRegistry;
    private static GatewayLocalCache gatewayLocalCache = new GatewayLocalCache();
    @Override
    public void init() {
        List<URL> urls = applicationRegistry.getRegistryInfo();
        if (urls.size() == 0) {
            return;
        }
        Map<String, List<URL>> map = urls.stream().collect(Collectors.groupingBy(URL::getApplicationName));
        GatewayLocalCache.gatewayLocalCache = map;
    }
    /**
     * 网关的核型请求都会被路由到这里的service函数中,然后在这里进行http的转发
     * 请求进入到servlet内部之后会有线程安全问题,另外注意下:每个servlet在tomcat内部是单例的存在
     *
     * @param req
     * @param resp
     */
    @Override
    protected void service(HttpServletRequest req, HttpServletResponse resp) throws IOException {
        String targetURL = req.getRequestURI();
        if (StringUtils.isEmpty(targetURL)) {
            return;
        }
        if ("/favicon.ico".equals(targetURL)) {
            return;
        }
        Map<String, String> tempMap = MatchApplicationInfo.buildTempMapping();
        String applicationName = null;
        //匹配前缀
        for (String mappingStr : tempMap.keySet()) {
            if (targetURL.contains(mappingStr)) {
                applicationName = tempMap.get(mappingStr);
                break;
            }
        }
        if (applicationName == null) {
            return;
        }
        List<URL> urls = GatewayLocalCache.get(applicationName);
        if (urls == null || urls.size() == 0) {
            return;
        }
        int index = new ThreadLocalRandom().nextInt(urls.size());
        URL url = urls.get(index);
        String prefix = "http://" + url.getAddress() + ":" + url.getPort();
        targetURL = prefix + targetURL;
        String method = req.getMethod();
        HttpMethod httpMethod = HttpMethod.resolve(method);
        //1、封装请求头
        MultiValueMap<String, String> headers = createRequestHeaders(req);
        //2、封装请求体
        byte[] body = createRequestBody(req);
        //3、构造出RestTemplate能识别的RequestEntity
        RequestEntity requestEntity = null;
        try {
            requestEntity = new RequestEntity<byte[]>(body, headers, httpMethod, new URI(targetURL));
            //转发到下游服务
            ResponseEntity responseEntity = restTemplate.exchange(requestEntity, byte[].class);
            Object respB = responseEntity.getBody();
            if (respB != null) {
                byte[] respByte = (byte[]) respB;
                resp.setCharacterEncoding("UTF-8");
                resp.setHeader("content-type", "application/json;charset=UTF-8");
                resp.getWriter().print(new String(respByte,"UTF-8"));
                resp.getWriter().flush();
            }
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }
    private byte[] createRequestBody(HttpServletRequest request) throws IOException {
        ServletInputStream servletInputStream = request.getInputStream();
        return StreamUtils.copyToByteArray(servletInputStream);
    }
    private MultiValueMap<String, String> createRequestHeaders(HttpServletRequest request) {
        HttpHeaders headers = new HttpHeaders();
        List<String> headerNames = Collections.list(request.getHeaderNames());
        for (String headerName : headerNames) {
            String headerVal = request.getHeader(headerName);
            headers.put(headerName, Collections.singletonList(headerVal));
        }
        return headers;
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        applicationRegistry.subscribeURL();
    }

    //当有新节点部署好了之后,会通知到这里
    @Override
    public void onApplicationEvent(ApplicationChangeEvent applicationChangeEvent) {
        System.out.println(">>>>>>>>> [applicationChangeEvent] >>>>>>>>> ");
        this.init();
    }
}

上边的GatewayCoreServlet类是负责接收所有外界的http请求,然后将其转发到下游的具体服务中。

这个GatewayCoreServlet类里涉及到了一个叫做MatchApplicationInfo的对象,这个对象内部存储着不同的url映射不同微服务的规则,这里我简单用了一个map集合进行管理:

public class MatchApplicationInfo {


    public static Map<String,String> buildTempMapping(){
        Map<String,String> temp = new HashMap<>();
        temp.put("/api/user","user-web-application");
        return temp;
    }
}

GatewayLocalCache对象是一个本地缓存,其内部具体代码如下:

public class GatewayLocalCache {
    public static Map<String, List<URL>> gatewayLocalCache = new ConcurrentHashMap<>();

    public static void put(String applicationName, List<URL> url) {
        gatewayLocalCache.put(applicationName,url);
    }

    public static List<URL> get(String applicationName){
        return gatewayLocalCache.get(applicationName);
    }
}

这个Cache内部存储了一张Map,Map的key就是MatchApplicationInfo对象中存储的路由映射key,也就是“/api/user”。Map的value是一个URL集合,这里我解释下URL集合的设计概念。

每个微服务应用都会有它专门的ip+端口+应用名称+注册时间+权重+是否暴露服务给gateway的这几项属性,于是我将它们统一放在了一个URL对象当中方便管理。

package org.idea.qiyu.framework.gateway.starter.registry;
import java.util.Objects;
/**
 * @Author linhao
 * @Date created in 8:55 上午 2022/4/16
 */
public class URL {
    private String applicationName;
    private String address;
    private Integer port;
    private Long registryTime;
    private Integer status;
    private Integer weight;
    public Integer getWeight() {
        return weight;
    }
    public void setWeight(Integer weight) {
        this.weight = weight;
    }
    public Integer getStatus() {
        return status;
    }
    public void setStatus(Integer status) {
        this.status = status;
    }
    public String getApplicationName() {
        return applicationName;
    }
    public void setApplicationName(String applicationName) {
        this.applicationName = applicationName;
    }
    public String getAddress() {
        return address;
    }
    public void setAddress(String address) {
        this.address = address;
    }
    public Integer getPort() {
        return port;
    }
    public void setPort(Integer port) {
        this.port = port;
    }
    public Long getRegistryTime() {
        return registryTime;
    }
    public void setRegistryTime(Long registryTime) {
        this.registryTime = registryTime;
    }
    public String buildURLStr() {
        String urlStr = this.getAddress() + ":" + this.getApplicationName() + ":" + this.getPort() + ":" + this.getStatus() + ":" + this.getRegistryTime() + ":" + this.getWeight();
        return urlStr;
    }
    public static URL buildURL(String url) {
        if (url == null || url == "") {
            return null;
        }
        String[] urlArr = url.split(":");
        URL result = new URL();
        result.setAddress(urlArr[0]);
        result.setApplicationName(urlArr[1]);
        result.setPort(Integer.valueOf(urlArr[2]));
        result.setStatus(Integer.valueOf(urlArr[3]));
        result.setRegistryTime(Long.valueOf(urlArr[4]));
        result.setWeight(Integer.valueOf(urlArr[5]));
        return result;
    }
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        URL url = (URL) o;
        return url.getApplicationName().equals(this.applicationName)
                && url.getRegistryTime().equals(this.registryTime)
                && url.getStatus().equals(this.status)
                && url.getPort().equals(this.port)
                && url.getAddress().equals(this.address);
    }
    @Override
    public int hashCode() {
        return Objects.hash(applicationName, address, port, registryTime, status);
    }
}

GatewayCoreServlet如何知道下游会有哪些服务注册了呢?所以我们还需要有一个接入层给到各个下游服务使用,当具体的应用服务启动之后往注册中心zookeeper做上报,然后通知到GateWay那边,下边来看代码:

ZookeeperRegistry这个类负责将需要注册的应用给上报到gateway中:

package org.idea.qiyu.framework.gateway.starter.registry;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.idea.qiyu.framework.gateway.starter.registry.zookeeper.AbstractZookeeperClient;
import org.idea.qiyu.framework.gateway.starter.registry.zookeeper.CuratorZookeeperClient;
import org.idea.qiyu.framework.gateway.starter.GatewayProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
 * @Author linhao
 * @Date created in 8:35 上午 2022/4/16
 */
@Component
public class ZookeeperRegistry implements ApplicationRegistry{
    private static AbstractZookeeperClient abstractZookeeperClient;
    private static String ROOT = "/gateway";
    @Resource
    private GatewayProperties gatewayProperties;
    @Resource
    private ApplicationContext applicationContext;
    private AbstractZookeeperClient getClient(){
        if(abstractZookeeperClient==null){
            abstractZookeeperClient = new CuratorZookeeperClient(gatewayProperties.getRegistryAddress());
        }
        return abstractZookeeperClient;
    }
    @Override
    public void registry(URL url) {
        String nodeAddress = ROOT+"/"+url.getAddress()+"_"+url.getPort();
        if(getClient().existNode(nodeAddress)){
            getClient().deleteNode(nodeAddress);
        }
        getClient().createTemporaryData(nodeAddress,url.buildURLStr());
    }
    @Override
    public List<URL> getRegistryInfo() {
        List<String> childUrlList = getClient().getChildrenData(ROOT);
        List<URL> urls = new ArrayList<>(childUrlList.size());
        childUrlList.forEach(item ->{
            String data = getClient().getNodeData(ROOT+"/"+item);
            URL url = URL.buildURL(data);
            urls.add(url);
        });
        return urls;
    }


    @Override
    public void unRegistry(URL url) {
        getClient().deleteNode(ROOT+"/"+url.getApplicationName());
    }
    @Override
    public void subscribeURL() {
        System.out.println("订阅zk节点数据");
        getClient().watchChildNodeData(ROOT, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("节点发生了变化");
                applicationContext.publishEvent(new ApplicationChangeEvent(this,watchedEvent.getType().getIntValue()));
                subscribeURL();
            }
        });
    }
}

这里加入了一个节点监听的功能,当有新服务注册的时候会发布一个Spring事件,然后通知到GatewayServlet去做更新。

GatewayApplicationRegistryHandler是应用注册处理器,其内部会将启动好的springboot应用注册到zk上,内部代码如下所示:

package org.idea.qiyu.framework.gateway.starter;


import org.idea.qiyu.framework.gateway.starter.registry.ApplicationRegistry;
import org.idea.qiyu.framework.gateway.starter.registry.URL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;


import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;


/**
 * 将各个SpringBoot应用注册到zk上
 *
 * @Author linhao
 * @Date created in 10:24 上午 2022/4/16
 */
public class GatewayApplicationRegistryHandler implements ApplicationListener<WebServerInitializedEvent>, ApplicationContextAware {


    private static Logger logger = LoggerFactory.getLogger(GatewayApplicationRegistryHandler.class);


    @Value("${spring.application.name}")
    private String applicationName;


    private volatile ApplicationContext applicationContext;


    private volatile ApplicationRegistry applicationRegistry;


    @Override
    public void onApplicationEvent(WebServerInitializedEvent webServerInitializedEvent) {
        System.out.println("注册服务到zk上,并且通知gateway暴露服务 【start】");
        try {
            InetAddress localHost = Inet4Address.getLocalHost();
            String ip = localHost.getHostAddress();
            Integer port = webServerInitializedEvent.getWebServer().getPort();
            URL url = new URL();
            url.setAddress(ip);
            url.setApplicationName(applicationName);
            url.setPort(port);
            url.setRegistryTime(System.currentTimeMillis());
            url.setStatus(0);
            url.setWeight(100);
            applicationRegistry.registry(url);
        } catch (UnknownHostException e) {
            logger.error(e.getMessage(), e);
        }
        System.out.println("注册服务到zk上,并且通知gateway暴露服务 【end】");
    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
        this.applicationRegistry = applicationContext.getBean(ApplicationRegistry.class);
    }
}

GatewayApplicationRegistryHandler则是会在自动装配的时候生效:

package org.idea.qiyu.framework.gateway.starter;
import org.idea.qiyu.framework.gateway.starter.registry.ApplicationRegistry;
import org.idea.qiyu.framework.gateway.starter.registry.ZookeeperRegistry;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @Author linhao
 * @Date created in 9:40 上午 2022/4/16
 */
@Configuration
@EnableConfigurationProperties(GatewayProperties.class)
public class GatewayAutoConfiguration {
    @Bean
    public ApplicationRegistry applicationRegistry(){
        return new ZookeeperRegistry();
    }
    @Bean
    @ConditionalOnBean(ApplicationRegistry.class)
    public GatewayApplicationRegistryHandler gatewayApplicationRegistryHandler(){
        return new GatewayApplicationRegistryHandler();
    }
}

自动装配是采用了SpringBoot的spi去激活触发的:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.idea.qiyu.framework.gateway.starter.GatewayAutoConfiguration

代码截图:

那么上报的zk地址该怎么去配置呢?这里我设计了一个GatewayProperties对象,目前只会用于存储上报的zk地址:

@ConfigurationProperties(prefix = "gateway.core")
public class GatewayProperties {


    private String registryAddress;


    public String getRegistryAddress() {
        return registryAddress;
    }


    public void setRegistryAddress(String registryAddress) {
        this.registryAddress = registryAddress;
    }
}

服务接入方如何使用?

只需要在maven的依赖中引入相关的starter组件,然后配置好zk的地址。这样SpringBoot便可以在应用启动之后将自己的服务信息上报到zk即可:

<dependencies>
    <dependency>
        <groupId>org.idea.qiyu</groupId>
        <artifactId>qiyu-framework-gateway-starter</artifactId>
        <version>1.0.2-SNAPSHOT</version>
    </dependency>
</dependencies>

测试程序

首先是启动网关类:

然后启动两个不同端口的user-web-application应用,每个用户应用的内部都预先写好一些测试使用的controller:

最后通过网络发送http请求,查看请求是否被正确路由到不同的节点即可。

小结

现阶段只是手写实现了一个简单版本的网关服务,其实网关的核心设计并不复杂,还有更多的细节可以留给大家做更加深入的拓展,这里我也只是写了一个demo,希望可以给各位读者们带来一定的启发。

相关源代码:

https://gitee.com/IdeaHome_admin/qiyu-framework/tree/master/qiyu-framework-gateway

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8