rpcx 官方技术博客

实现 JSONRPC 2.0 协议

2019.06.30

rpc实现方式有多种,一种是基于TCP/UDP的自定义的协议,还有一个基于http或者websocket的rpc实现方式。 基于http比较常用的是RESTful API的实现方式,还有一种是基于http的常用编码格式的通讯方式, 比如XML-RPC、JSON-RPC, grpc和twitchtv/twirp等。

虽然一般直接基于TCP的自定义协议的RPC框架可以取得更好的性能,但是基于http的rpc框架,比如RESTful API、JSON-RPC实现起来更简单,而且更容易的实现跨平台的服务提供和服务调用,所以也有很广泛的应用。

JSON-RPC 1.0于2005年发布了1.0的版本,经过数次演化,于2010年发布了最后一个2.0订正版本。

JSON-RPC所有的传输对象都是单一的类型,序列化成JSON格式。一个request就是对远程方法的一次调用,它必需包含三个属性:

  • method: 要调用的方法名
  • params: 对象或者数组,要传入的参数
  • id: 一个字符串、数字或者Null(不推荐)的ID,用来匹配request和response

一个response必需包括:

  • result: 返回结果,如果error不等于空,这个值必需为null
  • error: 一个错误编码。
  • id: 对应的request的id

Request和Response的例子:

1
2
--> {"jsonrpc": "2.0", "method": "subtract", "params": {"minuend": 42, "subtrahend": 23}, "id": 3}
<-- {"jsonrpc": "2.0", "result": 19, "id": 3}

Notification (没有Response):

1
--> {"jsonrpc": "2.0", "method": "update", "params": [1,2,3,4,5]}

标准rpc库中支持jsonrpc 1.0,而jsonrpc 2.0有一些第三方库支持:x/tools/internal/jsonrpc2sourcegraph/jsonrpc2powerman/rpc-codec/jsonrpc2

得益于JSONRPC 2.0的简单性和rpcx良好的可扩展性,我们可以很容易的在rpcx中支持jsonrpc 2.0, 同时依然支持原始的rpcx访问,在同一个端口

JSONRPC 2.0数据结构

依照前面的对JSONRPC 2.0介绍,你已经大致了解了jsonrpc 2.0的一些数据结构,重要的是Request、Response、ID以及Error:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type jsonrpcRequest struct {
	VersionTag VersionTag `json:"jsonrpc"`
	Method string `json:"method"`
	Params *json.RawMessage `json:"params,omitempty"`
	ID *ID `json:"id,omitempty"`
}

type jsonrpcRespone struct {
	VersionTag VersionTag `json:"jsonrpc"`
	Result *json.RawMessage `json:"result,omitempty"`
	Error *JSONRPCError `json:"error,omitempty"`
	ID *ID `json:"id,omitempty"`
}

type VersionTag struct{}

type ID struct {
	Name   string
	Number int64
}

type JSONRPCError struct {
	Code int64 `json:"code"`
	Message string `json:"message"`
	Data *json.RawMessage `json:"data"`
}

注意ID同时支持字符串类型和数值类型,所以序列化的时候需要先尝试字符串,然后再尝试数值类型:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (id *ID) String() string {
	if id == nil {
		return ""
	}
	if id.Name != "" {
		return strconv.Quote(id.Name)
	}
	return "#" + strconv.FormatInt(id.Number, 10)
}

func (id *ID) MarshalJSON() ([]byte, error) {
	if id.Name != "" {
		return json.Marshal(id.Name)
	}
	return json.Marshal(id.Number)
}

func (id *ID) UnmarshalJSON(data []byte) error {
	*id = ID{}
	if err := json.Unmarshal(data, &id.Number); err == nil {
		return nil
	}
	return json.Unmarshal(data, &id.Name)
}

因为我们要实现的是jsonrpc 2.0, 所以版本号固定的就是2.0:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (err *JSONRPCError) JSONRPCError() string {
	if err == nil {
		return ""
	}
	return err.Message
}

func (VersionTag) MarshalJSON() ([]byte, error) {
	return json.Marshal("2.0")
}

func (VersionTag) UnmarshalJSON(data []byte) error {
	version := ""
	if err := json.Unmarshal(data, &version); err != nil {
		return err
	}
	if version != "2.0" {
		return fmt.Errorf("Invalid RPC version %v", version)
	}
	return nil
}

如果Request支持Notification,那么ID不需要设置,为零值:

1
2
3
func (r *jsonrpcRequest) IsNotify() bool {
	return r.ID == nil
}

另外你可能需要定义一些特定的错误编码,比如-32001-32700-32600-32601等等,这些都可以在规范中找到定义。

在rpcx中集成JSONRPC 2.0

rpcx支持同一个端口支持多种协议,目前支持rpcx标准协议、http invoke调用协议,所以我们可以参考http invoke实现json rpc 2.0的协议。

这是通过cmux库实现的,它可以根据预读取的一些数据分析接收的数据是什么协议,然后标记这个连接是这种协议,你就可以使用这个封装的net.Listener进行处理,和正常的单协议的net.Listener处理一样。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
    m := cmux.New(ln)

	rpcxLn := m.Match(rpcxPrefixByteMatcher())

	if !s.DisableJSONRPC {
		jsonrpc2Ln := m.Match(cmux.HTTP1HeaderField("X-JSONRPC-2.0", "true"))
		go s.startJSONRPC2(jsonrpc2Ln)
	}

	if !s.DisableHTTPGateway {
		httpLn := m.Match(cmux.HTTP1Fast())
		go s.startHTTP1APIGateway(httpLn)
	}

	go m.Serve()

为了和http invoke进行区分,我们检查如果header头中包含X-JSONRPC-2.0: true的http请求,我们就认为这个连接的请求是for jsonrpc 2.0的请求。

1
2
3
4
5
6
func (s *Server) startJSONRPC2(ln net.Listener) {
	newServer := http.NewServeMux()
	newServer.HandleFunc("/", s.jsonrpcHandler)

	go http.Serve(ln, newServer)
}

我们使用jsonrpcHandler处理jsonrpc 2.0的请求, 它读取http request的body, 把它序列化成jsonrpc 2.0的Request, 然后再调用handleJSONRPCRequest将这个Request转换成rpcx的请求。

如果服务需要身份验证,它还会将Authorization header中传给Auth方法进行身份验证,这和标准的rpcx验证方法是一样的。

同时,我们还要将rpcx的Response转换成jsonrpc 2.0的Response返回。

CORS

因为jsonrpc 2.0是http调用,很多情况下浏览器会直接调用内部的rpcx服务。 浏览器调用后台服务很多情况下需要解决的就是跨域调用的问题,也就是CROS。

跨来源资源共享(CORS),亦译为跨域资源共享,是一份浏览器技术的规范,提供了 Web 服务从不同网域传来沙盒脚本的方法,以避开浏览器的同源策略,是 JSONP 模式的现代版。与 JSONP 不同,CORS 除了 GET 请求方法以外也支持其他的 HTTP 请求。用 CORS 可以让网页设计师用一般的 XMLHttpRequest,这种方式的错误处理比 JSONP 要来的好。另一方面,JSONP 可以在不支持 CORS 的老旧浏览器上运作。现代的浏览器都支持 CORS。

rpcx使用rs/cors实现跨域资源共享,原因在于这个库可以很好的和标准http库、其他go web框架进行集成,

startJSONRPC2修改如下,以支持CORS。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (s *Server) startJSONRPC2(ln net.Listener) {
	newServer := http.NewServeMux()
	newServer.HandleFunc("/", s.jsonrpcHandler)

	if s.corsOptions != nil {
		opt := cors.Options(*s.corsOptions)
		c := cors.New(opt)
		mux := c.Handler(newServer)
		go http.Serve(ln, mux)
	} else {
		go http.Serve(ln, newServer)
	}

}

对于开发者来说,如果想要支持跨域访问,只需在服务端设置CORSOptions, rpcx提供了一个通用的AllowAllCORSOptions:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// AllowAllCORSOptions returns a option that allows access.
func AllowAllCORSOptions() *CORSOptions {
	return &CORSOptions{
		AllowedOrigins: []string{"*"},
		AllowedMethods: []string{
			http.MethodHead,
			http.MethodGet,
			http.MethodPost,
			http.MethodPut,
			http.MethodPatch,
			http.MethodDelete,
		},
		AllowedHeaders:   []string{"*"},
		AllowCredentials: false,
	}
}