1. 项目背景

  • 核心目的:为 AI 客户端的 MCP 服务协议使用诉求,提供便利性。后端服务接口可以在 MCP 网关进行注册,提供 MCP 协议类型接口。
  • 实现过程:以 MCP 协议 JSON-RPC2 通信结构,设计实现流式传输 Streamable HTTP 框架,完成 MCP 网关接口协议解析和消息处理。在消息处理过程中,则对注册进来的 http/rpc/… 接口进行调用。这部分会包括接口的描述性信息获取以及会话执行调用。更多细节会在后续的设计和编码中陆续体现

  1. 是网关服务的注册,这部分提供管理端由用户进行接口配置注册操作,注册时把接口 http/rpc 进行详细拆解,录入到数据库中。在服务启动时,可以把接口进行服务加载,放到请求池中,等待调用。
  2. mcp 协议的2个调用过程,一个 {gatewayId}/mcp/sse 建立连接,创建出 sessionId 进行服务关联。首次建立连接后,会返回一个 /gd10090/mcp/message?sessionId=c2db-f64 之后在请求返回的地址,进行调用。
  3. 请求 {gatewayId}/mcp/message 进行消息处理,完成 initialzetools/listtools/callresources/list,步骤调用。这些步骤需要从网关服务注册进来的接口中获取,而 tools/call 则是发起 http/rpc 接口调用。

  • trigger 是触发器层,我们把所有外部(http、rpc)调用我们,job调用,接收mq调用,都称之为触发动作。
  • case 是功能编排层,它的出现是为了减轻 trigger 层的压力。原本 trigger 要调用 domain 的领域层的各个方法完成逻辑串联,这部分拆分到 case 层来做处理,让 trigger 的 http 实现,只关心结果的封装和异常的打印。而这里的 case 层会有 mcp 的创建会话、消息处理的操作。后续还会随着功能开发增加其他 case 逻辑。
  • domain 是领域层,实现各个模块功能,包括;协议、鉴权、会话的处理。会话会有创建会话、查询会话和对会话的应答处理。
  • infrastructure 是基础设施层,所有资源和外部服务的链接,都会放到这里处理。这里还会实现 domain 领域层在 adapter 适配器内定义的接口,通过依赖倒置的方式实现。

2.MCP 协议

MCP通信协议(json-rpc2)

1. JSON-RPC 2.0

AI 大脑(客户端)要指挥外部工具(服务端)干活,两者必须说同一种语言,否则就是鸡同鸭讲。JSON-RPC 2.0 就是它们之间约定的“标准信件格式”。

JSON 结构:

  • 请求(我要你干活):必定包含 method(比如叫你去查天气)和 params(参数是北京)。
  • 应答(活干完了):必定包含 result(成功的结果)或者 error(失败的报错)。
  • 通知(顺便告诉你一声):只有 method 和参数,不需要对方回复。

3.如何定义session

1.定义session的结构与方法

用会话值对象SessionConfigVO.java

	//构造函数
    public SessionConfigVO(String sessionId, Sinks.Many<ServerSentEvent<String>> sink) {
        this.sessionId = sessionId;
        this.sink = sink;
        this.createTime = Instant.now();
        this.lastAccessedTime = Instant.now();
        this.active = true;
    }

流式响应Sinks.Many用来存消息 sink,并内置一些方法比如过期时间判断,时间更新,功能内聚

  • 新增了会话服务接口与实现
ISessionManagementService.java
SessionManagementService.java

包括怎样创建会话,删除,get会话,清理过期会话与 shutdown 会话

  • 其中,保存会话用private final Map<String, SessionConfigVO> activeSessions = new ConcurrentHashMap<>();

2.理解流式响应

Sinks

sink是这个 session 的“消息出口” - 会话一建好,先给客户端推一条“你接下来要往哪个地址发消息”的信息

    public SessionConfigVO createSession(String gatewayId) {
        log.info("创建会话 gatewayId:{}", gatewayId);
 
        String sessionId = UUID.randomUUID().toString();
 
        Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().multicast().onBackpressureBuffer();
 
        // 发送端点消息 - 告知客户端消息请求地址(客户端第二次会使用 messageEndpoint 进行请求会话)
        //塞进一个endpoint事件 :/api-gateway/{gatewayId}/mcp/sse?sessionId=xxx
        //封装成 SessionConfigVO 返回,这样就能根据 id 的到 endpoint
        String messageEndpoint = "/api-gateway/" + gatewayId + "/mcp/sse?sessionId=" + sessionId;
        sink.tryEmitNext(ServerSentEvent.<String>builder()
                .event("endpoint")
                .data(messageEndpoint)
                .build());
 
        SessionConfigVO sessionConfigVO = new SessionConfigVO(sessionId, sink);
 
        activeSessions.put(sessionId, sessionConfigVO);
 
        log.info("创建会话 gatewayId:{} sessionId:{},当前活跃会话数:{}", gatewayId, sessionId, activeSessions.size());
 
        return sessionConfigVO;
    }

到 endNode 节点后通过

return sink.asFlux()
                .mergeWith(
                ....

把 sink 转成一个 Flux

  • 这个 Flux 就是 SSE 响应流
  • 再拼一个每 60 秒发一次的 ping 心跳
  • 如果连接取消或终止,就 removeSession(sessionId) 清理会话

所以最终 Controller 返回的 Flux<ServerSentEvent<String>>,不是一次性结果,而是一条持续打开的 SSE 连接。

客户端刚连上时,最先收到的不是业务结果,而是一条 endpoint 事件。 这条事件是在 SessionManagementService.java:55 发出去的。

4.建立会话链

  - McpGatewayController.java
  - McpMessageService.java
  - RootNode.java
  - VerifyNode.java
  - SessionNode.java
  - EndNode.java
  1. 在 controller 层暴露接口 API,开始创建回回 MCPsession,做一个简单的校验后,交个 case 层McpMessageService。
  2. 之后从工厂类取出第一个节点也就是 rootNode(目前仅打印开始创建会话)
  3. 公共父类是AbstractMcpSessionSupport.java 。 它继承了一个策略路由器,所以每个节点本质上都在做两件事:当前做的是 doApplay 和get 下个节点
  4. 鉴权层目前是空的,session 层创建会话,塞入上下文
  5. 最后 end 层返回一个 sse 链接
  • AbstractMcpSessionSupport 是项目对外部策略路由框架的二次包装,当前偏薄,但不是完全没用。
  • doApply 和 get 已经由上层框架定义好了,所以这里不需要重复声明。
  • dynamicContext 就是这条节点链内部传递中间结果的容器,当前最主要的作用就是把 SessionNode 创建出来的 SessionConfigVO 传给 EndNode。

5.消息分发流程

建立持续会话连接后,在 controller 层创建一个“处理会话发来的 MCP 消息”

  • McpSchemaVO:协议消息数据 两个作用: 1.用来定义“消息格式” 2.把已经符合 MCP/JSON-RPC 结构的 JSON 字符串,解析成 Java 对象JSONRPCRequest或JSONRPCResponse。供后续代码使用

  • SessionMessageService.java

    • 在 conroller 层转为 java 对象后,开始根据枚举类找到真正的handler Bean,根据协议里的 method 方法去找不同的 Handler,如一下的 InitializeHandler
    • SessionMessageHandlerMethodEnum.负责对应关系
  • InitializeHandler.java

核心就是: “POST 过来一条 JSON-RPC 消息,系统怎么根据 method 找到对应 handler。”

6.创建MCP客户端

public class ApiTest {
 
    // 在这里直接填写你的智谱 API Key,仅用于本地测试。
    private static final String ZHIPU_API_KEY = "3b07934ad55f49ac948c2c66abcfec7c.lLYncQbI1iO49txx";
 
    public static void main(String[] args) throws Exception {
        OpenAiApi openAiApi = OpenAiApi.builder()
                .baseUrl("https://open.bigmodel.cn/api/paas/v4")
                .apiKey(ZHIPU_API_KEY)
                .completionsPath("/chat/completions")
                .embeddingsPath("/embeddings")
                .build();
 
        ChatModel chatModel = OpenAiChatModel.builder()
                .openAiApi(openAiApi)
                .defaultOptions(OpenAiChatOptions.builder()
                        .model("glm-4.5-air")
                        .toolCallbacks(new SyncMcpToolCallbackProvider(sseMcpClient02()).getToolCallbacks())
                        .build())
                .build();
 
        log.info("测试结果:{}", chatModel.call("你有哪些工具能力"));
    }

  • Deme项目的 ApiTest 作为 MCP 的 Client,指定 LLM 后, .toolCallbacks(new SyncMcpToolCallbackProvider(sseMcpClient02()).getToolCallbacks())将McpClient包装为 给 LLM用的工具回调集合 chatModel.call是通过 Client去问主项目tools/list,如果需要还要 tools/call

  • 关于 McpClient02 中的代码,

    • sseClientTransport代表创建一个 传输层对象,告诉 MCP客户端使用那种协议去传递信息,目前使用 http+sse,builder 建立服务端的地址。sseEndpoint 代表建立 SSE 链接的入口,这里对应主项目中的Controller 中的 Get 接口
      • 进入handleSseConnection方法中,server 端进入createSession(gatewayId),创建一个 SessionId,通过 SSE 推送一个时间 Endpoint,告诉Client 端,以后发送到 JSONRPC 请求发送至该目的地
    • 指定用 SSE 的方式去链接一个 MCPServer 端(sync 表示给我一个同步调用风格的 MCP 客户端)
    • initialized 表示“先按 MCP 协议和服务端握个手,拿到这个 MCP 服务端的初始化信息”
      • 发送method 为 initialize 的 JSONRPC 消息给刚刚的 sse 地址,实际上到了主项目的 post 接口中的handleMessage
      • 根据 sessionID 找会话,然后后序列化为McpSchemaVO定义的对象
      • 进入processHandlerMessage中根据private Map<String, IRequestHandler> requestHandlerMap; 取出 request 中为哪个请求来决定进入哪个 handler

7.Initialize

@Override
    public McpSchemaVO.JSONRPCResponse handle(String gatewayId, McpSchemaVO.JSONRPCRequest message) {
        log.info("消息处理服务-initialize gatewayId:{} request.params:{}", gatewayId, JSON.toJSONString(message.params()));
 
        // 1. 转换参数
        McpSchemaVO.InitializeRequest initializeRequest = McpSchemaVO.unmarshalFrom(message.params(), new TypeReference<>() {
        });
 
        // 2. 查询配置
        McpGatewayConfigVO mcpGatewayConfigVO = repository.queryMcpGatewayConfigByGatewayId(gatewayId);
 
        // 3. 组装信息
        McpSchemaVO.InitializeResult initializeResult = new McpSchemaVO.InitializeResult(initializeRequest.protocolVersion(),
                new McpSchemaVO.ServerCapabilities(new McpSchemaVO.ServerCapabilities.CompletionCapabilities(),
                        new HashMap<>(),
                        new McpSchemaVO.ServerCapabilities.LoggingCapabilities(),
                        new McpSchemaVO.ServerCapabilities.PromptCapabilities(true),
                        new McpSchemaVO.ServerCapabilities.ResourceCapabilities(false, true),
                        new McpSchemaVO.ServerCapabilities.ToolCapabilities(true)),
                new McpSchemaVO.Implementation(mcpGatewayConfigVO.getToolName(), mcpGatewayConfigVO.getToolVersion()),
                mcpGatewayConfigVO.getToolDesc()
        );
 
        // 4. 返回结果
        return new McpSchemaVO.JSONRPCResponse(McpSchemaVO.JSONRPC_VERSION, message.id(), initializeResult, null);
    }
  • 根据Client 端传来的网关 ID 和 Requst,转换为 Initiallize 的参数,根据网关 Id 查询仓储层数据库中配置的信息
  • 再把数据库里的:
  • toolName
  • toolVersion
  • toolDesc 组装进 Result 中返回给 Client 端

8.Tools/List

    public static void main(String[] args) throws Exception {
        OpenAiApi openAiApi = OpenAiApi.builder()
                .baseUrl("https://apis.itedus.cn")
                .apiKey("sk-j9brA5gtId49CpRG1d97E8F75dAb4066B7Cd73B0D9B481xx")
                .completionsPath("v1/chat/completions")
                .embeddingsPath("v1/embeddings")
                .build();
 
        ChatModel chatModel = OpenAiChatModel.builder()
                .openAiApi(openAiApi)
                .defaultOptions(OpenAiChatOptions.builder()
                        .model("gpt-4o")
                        .toolCallbacks(new SyncMcpToolCallbackProvider(sseMcpClient01()).getToolCallbacks())
                        .build())
                .build();
 
        log.info("测试结果:{}", chatModel.call("你有哪些工具能力"));
    }
  • toolCallbacks(new SyncMcpToolCallbackProvider(sseMcpClient01()).getToolCallbacks() 内部先调了 listTools(),ChatModel.call() 用的是前面已经拿到的工具定义,如果模型真用工具,才会在 ToolCallback.call() 里发 tools/call

  • 接下来完成 list 的 Handler,核心:把数据库里的工具定义和参数映射,拼成 MCP 协议要求的 tools 列表。

  • 查询数据库中的网关的基础配置与工具参数映射配置

  • 字段名 - 父路径 - MCP 路径 - 类型 - 描述 - 是否必填 - 排序

buildTools() j将“扁平的数据库记录”变成“嵌套的 JSON Schema”。

数据库中path 为:

  • xxxRequest01
  • xxxRequest01.city
  • xxxRequest01.company
  • xxxRequest01.company.name
  • xxxRequest01.company.type
  • xxxRequest02
  • xxxRequest02.employeeCount

但 MCP tools/list 需要的是这种树形结构:

  {
    "name": "JavaSDKMCPClient_getCompanyEmployee",
    "description": "获取公司雇员信息",
    "inputSchema": {
      "type": "object",
      "properties": {
        "xxxRequest01": {
          "type": "object",
          "properties": {
            "city": { "type": "string" },
            "company": {
              "type": "object",
              "properties": {
                "name": { "type": "string" },
                "type": { "type": "string" }
              }
            }
          }
        },
        "xxxRequest02": {
          "type": "object",
          "properties": {
            "employeeCount": { "type": "string" }
          }
        }
      }
    }
  }

 private List<McpSchemaVO.Tool> buildTools(McpGatewayConfigVO gatewayConfig, List<McpGatewayToolConfigVO> toolConfigs) {
        // 1. 通过 toolId 一组组转换为 Map 结构
        Map<Long, List<McpGatewayToolConfigVO>> toolsMap = toolConfigs.stream()
                .collect(Collectors.groupingBy(McpGatewayToolConfigVO::getToolId));
 
        List<McpSchemaVO.Tool> tools = new ArrayList<>();
 
        for (Map.Entry<Long, List<McpGatewayToolConfigVO>> entry : toolsMap.entrySet()) {
            Long toolId = entry.getKey();
            List<McpGatewayToolConfigVO> configs = entry.getValue();
 
            // 排序
            configs.sort((o1, o2) -> {
                int s1 = o1.getSortOrder() != null ? o1.getSortOrder() : 0;
                int s2 = o2.getSortOrder() != null ? o2.getSortOrder() : 0;
                return Integer.compare(s1, s2);
            });
 
            // 父子元素 Map parentPath -> List<Children>
            Map<String, List<McpGatewayToolConfigVO>> childrenMap = new HashMap<>();
 
            List<McpGatewayToolConfigVO> roots = new ArrayList<>();
 
            for (McpGatewayToolConfigVO config : configs) {
                if (config.getParentPath() == null) {
                    roots.add(config);
                } else {
                    childrenMap.computeIfAbsent(config.getParentPath(), k -> new ArrayList<>()).add(config);
                }
            }
 
            // 排序
            roots.sort((o1, o2) -> {
                int s1 = o1.getSortOrder() != null ? o1.getSortOrder() : 0;
                int s2 = o2.getSortOrder() != null ? o2.getSortOrder() : 0;
                return Integer.compare(s1, s2);
            });
 
            // 构建输入结构
            Map<String, Object> properties = new HashMap<>();
            List<String> required = new ArrayList<>();
			
		//把每个根节点递归构造成一个属性定义,放进 properties;如果这个根节点是必填的,就把它的名字放进 required 列表。
            for (McpGatewayToolConfigVO root : roots) {
                properties.put(root.getFieldName(), buildProperty(root, childrenMap));
                if (Integer.valueOf(1).equals(root.getIsRequired())) {
                    required.add(root.getFieldName());
                }
            }
 
            // 获取类型
            String type = roots.size() == 1 ? roots.get(0).getMcpType() : "object";
 
            // 构造函数
            McpSchemaVO.JsonSchema inputSchema = new McpSchemaVO.JsonSchema(
                    type,
                    properties,
                    required.isEmpty() ? null : required,
                    false,
                    null,
                    null
            );
 
            // 工具描述
            String name = "unknown-tool-" + toolId;
            String desc = "";
            if (gatewayConfig != null && Objects.equals(gatewayConfig.getToolId(), toolId)) {
                name = gatewayConfig.getToolName();
                desc = gatewayConfig.getToolDesc();
            }
 
            tools.add(new McpSchemaVO.Tool(name, desc, inputSchema));
        }
        return tools;
    }

数据库结构为

private Map<String, Object> buildProperty(McpGatewayToolConfigVO current, Map<String, List<McpGatewayToolConfigVO>> childrenMap) {
        Map<String, Object> property = new HashMap<>();
        property.put("type", current.getMcpType());
        if (current.getMcpDesc() != null) {
            property.put("description", current.getMcpDesc());
        }
 
        // 校验孩子元素
        List<McpGatewayToolConfigVO> children = childrenMap.get(current.getMcpPath());
        if (children != null && !children.isEmpty()) {
            Map<String, Object> props = new HashMap<>();
            List<String> reqs = new ArrayList<>();
 
            // 排序
            children.sort((o1, o2) -> {
                int s1 = o1.getSortOrder() != null ? o1.getSortOrder() : 0;
                int s2 = o2.getSortOrder() != null ? o2.getSortOrder() : 0;
                return Integer.compare(s1, s2);
            });
 
            for (McpGatewayToolConfigVO child : children) {
                // 注意,buildProperty 嵌套递归,一层层的寻找,是否还有孩子元素(children)
                props.put(child.getFieldName(), buildProperty(child, childrenMap));
                if (Integer.valueOf(1).equals(child.getIsRequired())) {
                    reqs.add(child.getFieldName());
                }
            }
 
            property.put("properties", props);
 
            if (!reqs.isEmpty()) {
                property.put("required", reqs);
            }
 
        }
 
        return property;
    }

9.Tools/Call

从“能列出工具”进入“真的去调用后端 HTTP 服务”

  1. @Tool 方式是“本地方法直接暴露成工具”
  • 工具来源:本地 Java 方法
    • 工具注册:@Tool + MethodToolCallbackProvider
    • tools/list:Spring AI 自动生成
    • tools/call:Spring AI 自动调用对应方法
    • 参数 schema:从方法参数/对象结构推导
    • 适合:自己写 MCP Server、工具少、开发快
  1. Gateway 方式是“把外部 HTTP 能力包装成工具”。
  • 工具来源:数据库配置 + 外部 HTTP 服务

  • 工具注册:自己查库组装

  • tools/list:自己查 mcp_protocol_mapping 拼 schema

  • tools/call:自己查 mcp_protocol_registry 再发 HTTP

  • 参数 schema:数据库配置驱动

  • 适合:平台、网关、多工具、多租户、动态接入

  • 新建mcp_protocol_registry表,添加具体Http 接口的映射关系,并新建 ProtocolConfigVO和对应的仓储查询接口

    • httpUrl
    • httpHeaders
    • httpMethod
    • timeout

新建一个领域层的ISessionPort接口,在基建层实现此接口真正的发送 HTTP 请求,将以上配置和参数传入 Port 层,拼接请求头,判断是post 还是 get,来进行不同 url 拼接和参数拼接,之后进入GenericHttpGateway.postOrget(…)

public class SessionPort implements ISessionPort {
    @Resource
    private GenericHttpGateway gateway;
 
    private final ObjectMapper objectMapper = new ObjectMapper();
 
    @Override
    public Object toolCall(McpGatewayProtocolConfigVO.HTTPConfig httpConfig, Object params) throws IOException {
        // 1. 构建请求头
        String httpHeadersJson = httpConfig.getHttpHeaders();
 
        Map<String, Object> headers = objectMapper.readValue(httpHeadersJson, Map.class);
 
        // 2. 判断请求方法
        String httpMethod = httpConfig.getHttpMethod().toLowerCase();
 
        // 3. 参数校验
        if (!(params instanceof Map<?, ?> arguments)) {
            throw new AppException(ResponseCode.ILLEGAL_PARAMETER.getCode(), ResponseCode.ILLEGAL_PARAMETER.getInfo());
        }
 
        switch (httpMethod) {
            // 1. 构建请求体
            case "post": {
                RequestBody requestBody = RequestBody.create(JSON.toJSONString(arguments.values().toArray()[0]),
                        MediaType.parse("application/json"));
 
                Call<ResponseBody> call = gateway.post(httpConfig.getHttpUrl(), headers, requestBody);
                ResponseBody responseBody = call.execute().body();
 
                assert responseBody != null;
 
                return responseBody.string();
            }
            // 2. 执行get请求
            case "get": {
                Map<String, Object> objMapRequest = new java.util.HashMap<>((Map<String, Object>) arguments.values().toArray()[0]);
 
                String url = httpConfig.getHttpUrl();
                // 替换路径参数
                Matcher matcher = Pattern.compile("\\{([^}]+)\\}").matcher(url);
                while (matcher.find()) {
                    String name = matcher.group(1);
                    if (objMapRequest.containsKey(name)) {
                        url = url.replace("{" + name + "}", String.valueOf(objMapRequest.get(name)));
                        objMapRequest.remove(name);
                    }
                }
 
                Call<ResponseBody> call = gateway.get(url, headers, objMapRequest);
 
                ResponseBody responseBody = call.execute().body();
 
                assert responseBody != null;
 
                return responseBody.string();
            }
        }
 
        throw new AppException(ResponseCode.METHOD_NOT_FOUND.getCode(), ResponseCode.METHOD_NOT_FOUND.getInfo());
    }
}

声明GenericHttpGateway的接口,让 Config 根据接口声明一个 HTTP 客户端的实例

public interface GenericHttpGateway {
    @POST
    Call<ResponseBody> post(
            @Url String url,
            @HeaderMap Map<String, Object> headers,
            @Body RequestBody body
    );
    @GET
    Call<ResponseBody> get(
            @Url String url,
            @HeaderMap Map<String, Object> headers,
            @QueryMap Map<String, Object> queryParams
    );
}

Spring 自己也有 HTTP 客户端方案,比如:

  • RestTemplate

  • WebClient

  • 这里使用第三方 HTTP 客户端

    • OkHttp:底层 HTTP 客户端
    • Retrofit:基于 OkHttp 的声明式接口封装
@Configuration
public class HTTPClientConfig {
    @Bean
    public OkHttpClient okHttpClient() {
        return new OkHttpClient.Builder()
                .connectionPool(new ConnectionPool(10, 5, TimeUnit.MINUTES))
                .retryOnConnectionFailure(true)
                .connectTimeout(100, TimeUnit.SECONDS)
                .readTimeout(300, TimeUnit.SECONDS)
                .writeTimeout(300, TimeUnit.SECONDS)
                .build();
    }
    @Bean
    public GenericHttpGateway genericHttpGateway(OkHttpClient okHttpClient) {
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("http://127.0.0.1/")
                .addConverterFactory(GsonConverterFactory.create())
                .client(okHttpClient)
                .build();
        return retrofit.create(GenericHttpGateway.class);
        //实际对象是 Retrofit 动态代理生成的实例
    }
 
}

10.网关协议表

  • 新增设计 mcp_gateway_tool、mcp_protocol_http 表,去掉原有的 mcp_protocol_registry 表。使用 protocol_id 进行衔接,protocol_type 区分协议类型,如果是 http 的,则可以指定到 mcp_protocol_type 表查询,如果将来还有其他的类型,则可以查询其他表数据。

mcp_protocol_registry 拆成 mcp_gateway_tool + mcp_protocol_http ToolsCallHandler.java 现在按 gatewayId+ toolName 去查协议配置,而不是只按 gatewayId