rpcx 官方技术博客

使用 Rust 开发 rpcx 服务

2019.08.10

当前,你可以使用GoJava开发原生的rpcx服务,也可以其它语言(C、C++、Python、C#、NodeJS等等)通过http或者JSON-RPC 2.0的方式访问rpcx服务。

现在, RPCX支持使用 Rust 编程语言实现原生的rpcx服务,也可以使用 Rust 通过原生的rpcx协议访问其它语言编写的rpcx服务。

目前 rpcx-rs发布了 0.1.2 版本,可以方便的将 Rust 函数暴露成rpcx服务,也可以使用Rust访问rpcx服务,支持 JSON 和 MessagePack 两种序列化方式。

rpcx-rs秉承着Go语言版本的信念,采用最简单的方式实现rpc服务,小到一年级的小学生,大到花甲之年的老太太,都能轻松的实现rpc服务。

后续的功能会持续增加,包括protobuf序列化的支持、服务治理各种功能(路由、失败处理、重试、熔断器、限流)、监控(metrics、trace)、注册中心(etcd、consul)等众多的功能。

那么,让我们看看如何开发一个rust实现的rpcx服务,以及如何调用,相应的Go语言的代码也一并列出。

我们的例子是一个乘法服务,客户端传入两个整数, 服务器端计算出两个数的,然后返回给客户端。

客户端和服务器端公用的数据

客户端和服务器端交互,一般不会使用字节数组,而是封装的数据对象,像Rust和Go语言中的struct、Java中的Class等,所以我们先定义好交流用的数据结构,这样服务器和客户端都可以使用了。

你要在你的Cargo.toml文件中引入rpcx库:

1
rpcx =  "0.1.2"

然后定义数据结构。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
use std::Error as StdError;

use rmp_serde as rmps; 
use serde::{Deserialize, Serialize};

use rpcx::*;

#[derive(RpcxParam, Default, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ArithAddArgs {
    #[serde(rename = "A")]
    pub a: u64,
    #[serde(rename = "B")]
    pub b: u64,
}
#[derive(RpcxParam, Default, Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ArithAddReply {
    #[serde(rename = "C")]
    pub c: u64,
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type Args struct {
	A int
	B int
}

type Reply struct {
	C int
}

type Arith int

func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
	reply.C = args.A * args.B
	fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C)
	return nil
}

我们使用serde实现 JSON 和 MessagePack 序列化格式的支持。你不必手工实现RpcxParam trait,通过 derive属性就可以自动为你的数据架构加上JSON 和 MessagePack 序列化格式的支持。同时你也需要在derive属性上加上DefaultSerialize, Deserialize,以便rpcx实现自动的序列化和初始化。

当然这一切都是通过属性完成的,你只需要定义数据结构即可。为了和Go默认的JSON属性相一致(GO默认序列化好的字段名称首字母是大写的),我们这里也加上了serde的属性,让serde进行 JSON序列化的时候使用指定的名称。

这里我们定义了传入参数ArithAddArgs和传出参数(ArithAddReply)。

服务端实现

服务器端实现了一个函数mul,函数的名称不重要,因为我们注册的时候会手工传入服务的路径名称和方法名称,也就是Go语言中实现的service_pathservice_method

函数的参数类型是ArithAddArgs,输出结果的类型是ArithAddReply

在main函数中我们新建了一个服务器,线程池中线程的数量默认是CPU的两倍,你也可以根据需要设置更大的线程数量。

然后我们注册了这个服务(函数),使用register_func!宏来进行注册,第一个参数是服务器实例,第二个参数是service_path、第三个参数是service_method,第四个参数是要注册的函数,第五个和第六个参数是函数的传入参数和传出参数的类型。

然后调用服务的start就可以提供服务了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
use mul_model::{ArithAddArgs, ArithAddReply};
use rpcx::*;

fn mul(args: ArithAddArgs) -> ArithAddReply {
    ArithAddReply { c: args.a * args.b }
}

fn main() {
    let mut rpc_server = Server::new("0.0.0.0:8972".to_owned(), 0);

    register_func!(rpc_server, "Arith", "Mul", mul, ArithAddArgs, ArithAddReply);

    rpc_server.start().unwrap();
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
	"context"
	"flag"
	"fmt"

	example "github.com/rpcx-ecosystem/rpcx-examples3"
	"github.com/smallnest/rpcx/server"
)

var (
	addr = flag.String("addr", "localhost:8972", "server address")
)

type Arith struct{}

// the second parameter is not a pointer
func (t *Arith) Mul(ctx context.Context, args example.Args, reply *example.Reply) error {
	reply.C = args.A * args.B
	fmt.Println("C=", reply.C)
	return nil
}

func main() {
	flag.Parse()

	s := server.NewServer()
	//s.Register(new(Arith), "")
	s.RegisterName("Arith", new(Arith), "")
	err := s.Serve("tcp", *addr)
	if err != nil {
		panic(err)
	}
}

客户端实现

客户端使用Client::new先连接一个服务器,你可以指定序列化格式,服务器也会使用你的这个序列化格式返回结果,然后指定要调用的service_pathservice_method, 使用call返回结果(Option类型,因为有的调用不需要返回结果,返回结果是Result类型,可能是成功的结果,也可能是Error); 使用acall异步调用,会返回一个future。

你可以和Go语言的版本交叉着互相调用,可以看到很容易的我们就实现了rpc服务和调用。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use std::HashMap;

use mul_model::*;
use rpcx::Client;
use rpcx::{Result, SerializeType};

pub fn main() {
    let mut c: Client = Client::new("127.0.0.1:8972");
    c.start().map_err(|err| println!("{}", err)).unwrap();
    c.opt.serialize_type = SerializeType::JSON;

    let mut a = 1;
    loop {
        let service_path = String::from("Arith");
        let service_method = String::from("Mul");
        let metadata = HashMap::new();
        let args = ArithAddArgs { a: a, b: 10 };
        a = a + 1;

        let reply: Option<Result<ArithAddReply>> =
            c.call(service_path, service_method, false, metadata, &args);
        if reply.is_none() {
            continue;
        }

        let result_reply = reply.unwrap();
        match result_reply {
            Ok(r) => println!("received: {:?}", r),
            Err(err) => println!("received err:{}", err),
        }
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
	"context"
	"flag"
	"log"

	"github.com/smallnest/rpcx/protocol"

	example "github.com/rpcx-ecosystem/rpcx-examples3"
	"github.com/smallnest/rpcx/client"
)

var (
	addr = flag.String("addr", "localhost:8972", "server address")
)

func main() {
	flag.Parse()

	d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
	opt := client.DefaultOption
	opt.SerializeType = protocol.JSON

	xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, opt)
	defer xclient.Close()

	args := example.Args{
		A: 10,
		B: 20,
	}

	reply := &example.Reply{}
	err := xclient.Call(context.Background(), "Mul", args, reply)
	if err != nil {
		log.Fatalf("failed to call: %v", err)
	}

	log.Printf("%d * %d = %d", args.A, args.B, reply.C)

}