前言
Unix domain socket(也称为 Unix 域套接字)是一种用于同一台主机上进程间通信(IPC)的机制。与常规网络套接字不同,Unix domain socket 不依赖于网络协议,并且只能用于在同一台机器上的进程之间通信, 这使得 Unix socket 比网络套接字更快和更有效。
protobuf(Google Protocol Buffers)是Google提供一个高效的协议数据交换格式工具库, 具有比JSON提高3到5倍的时间和空间效率。
在最近的一个项目中,使用了两种不同的语言来实现业务应用和运维工具:业务应用使用C语言实现,运维工具为了开发效率使用了golang来实现。 由于C语言没有原生的gRPC库实现,所以业务应用和运维工具直接使用unix domain socket进行交互,数据传输使用protobuf 进行序列化和反序列化。本文将以一个简单的通信为例介绍如何使用unix domain socket和protobuf 实现跨语言的进程通信。
工作模式
Unix domain socket 工作模式为C/S ( Client/Server, 客户端/服务端) 模式,客户端和服务端使用固定的socket文件进行通信。在本项目中,业务应用作为服务端,接收来自客户端的请求,客户端连接服务端发送请求,展示服务端返回的请求结果。
消息头格式
为了更好的做消息控制,约定了请求和响应使用固定的报文头。
struct sk_request_msg { char traceid[TRACEID_LEN]; uint32_t version; uint16_t op_id; uint16_t op_type; size_t len; char data[0]; }; struct sk_reply_msg { char errstr[ERRSTR_LEN]; uint32_t version; uint16_t op_id; uint16_t op_type; int errcode; uint32_t padding; size_t len; char data[0]; }; |
注: 在消息中,data 部分即是使用protobuf序列化过后的的数据内容,这里需要注意的是消息头使用64bit对齐,因此在回复消息中可以看到一个32bit的padding字段。
相应的golang 定义如下
type Request struct { TraceId [64]byte Version uint32 ID uint16 Type uint16 Len uint64 Data []byte } type Response struct { ErrMsg []byte Version uint32 ID uint16 Type uint16 ErrCode int32 Rsvd uint32 Len uint64 Data []byte } |
针对发送消息,需要对消息头也进行序列化,因此针对请求需要实现完整的序列化方法
// Marshal serialization for socket data func (r *Request) Marshal() ([]byte, error) { data := make([]byte, 0, RequestHDRLen+r.Len) wt := bytes.Buffer{} err := binary.Write(&wt, binary.LittleEndian, r.TraceId) if err != nil { return []byte{}, err } data = append(data, wt.Bytes()...)
buf := make([]byte, 4) binary.LittleEndian.PutUint32(buf, uint32(r.Version)) data = append(data, buf...)
buf1 := make([]byte, 2) binary.LittleEndian.PutUint16(buf1, uint16(r.ID)) data = append(data, buf1...)
buf2 := make([]byte, 2) binary.LittleEndian.PutUint16(buf2, uint16(r.Type)) data = append(data, buf2...)
buf3 := make([]byte, 8) binary.LittleEndian.PutUint64(buf3, uint64(r.Len)) data = append(data, buf3...)
wd := bytes.Buffer{} err = binary.Write(&wd, binary.LittleEndian, r.Data) if err != nil { return []byte{}, err }
data = append(data, wd.Bytes()...) return data, nil } |
针对回复消息要进行反序列化
// Unmarshal deserialization from binary func (r *Response) Unmarshal(d []byte) error { if len(d) < ResponseHDRLen { return ErrInvalPkt } r.ErrMsg = d[:64] r.Version = binary.LittleEndian.Uint32(d[64:]) r.ID = binary.LittleEndian.Uint16(d[68:]) r.Type = binary.LittleEndian.Uint16(d[70:]) r.ErrCode = int32(binary.LittleEndian.Uint32(d[72:])) r.Len = binary.LittleEndian.Uint64(d[80:]) r.Data = d[88 : r.Len+88]
return nil } |
注: 当前实现未对大端机型进行适配, 如要在大端机型上运行,需要按需进行适配。
Protobuf定义请求
syntax = "proto3"; // [START go_declaration] option go_package = "./proto"; // [END go_declaration] enum Magic { UNKNOWN = 0; CTYUN = 0x27102711; }; message Version { Magic magic = 1; string version = 2; } service TestService { rpc ShowVersion(Version) returns (Version); } |
使用proto工具生成对应的c和golang定义:
OBJS := $(shell find . -name '*.proto' | sort)
all: $(OBJS) # go install google.golang.org/protobuf/cmd/protoc-gen-go@latest for i in $(OBJS); \ do \ protoc-c --c_out=. $$i; \ protoc --go_out=../pkg $$i; \ done |
服务实现
在消息头里面定义了op_type 和op_id, 这两个字段主要用于路由不同的操作到对应的回调函数,这里仅列出回调接口内的关键代码实现
Version version = VERSION__INIT, *verp = NULL; char ver_str[VERSION_LEN] = {0}; uint8_t *request = (uint8_t *)conf; uint8_t *reply = NULL; size_t sz_version = 0;
verp = version__unpack(NULL, size, request); if (verp == NULL) { return ERROR_INVPKT; }
if (verp->magic != MAGIC__CTYUN) { return ERROR _INVAL; } version__free_unpacked(verp, NULL);
memset(ver_str, 0, VERSION_LEN); sprintf(ver_str, "version: %s (%s), build on %s", VERSION, COMMIT, BUILD_DATE); version.magic = MAGIC__CTYUN; version.version = ver_str;
reply = rte_zmalloc(NULL, version__get_packed_size(&version), RTE_CACHE_LINE_SIZE); if (reply == NULL) { return ERROR_NOMEM; } sz_version = version__pack(&version, reply); |
其中request 是客户端发送请求序列化过后的数据,因此在服务端首先使用自动生成的version__unpack接口进行反序列化,得到相应的消息结构,对消息的幻数进行校验,校验通过后,构造相应消息,然后使用version__pack序列化回复消息。
在路由函数调完这个操作回调后,将消息封装上回复消息头,然后通过socket发还给客户端。 其大体实现代码如下:
memset(&ctl_addr, 0, sizeof(struct sockaddr_un)); ctl_len = sizeof(ctl_addr);
// socket 侦听 ctl_fd = accept(srv_fd, (struct sockaddr*)&ctl_addr, &ctl_len); if (ctl_fd < 0) { return ERROR_IO; }
ret = sk_msg_recv(ctl_fd, &msg);
if (ret == ERROR_OK) { // 消息路由,调用对应回调 …
// 构造回复消息头 memset(&reply_hdr, 0, sizeof(reply_hdr)); reply_hdr.version = SK_VERSION; reply_hdr.id = msg->id; reply_hdr.type = msg->type; reply_hdr.errcode = ret; strncpy(reply_hdr.errstr, strerror(ret), ERRSTR_LEN - 1); reply_hdr.len = reply_data_len;
// 发送回复消息 ret = sk_msg_send(ctl_fd, &reply_hdr, reply_data, reply_data_len); } sk_msg_free(msg); |
客户端实现
在客户端使用golang 开发,调用过程为先将请求消息通过protobuf 进行序列化,然后封装好消息头,连接Unix域套接字接口发送消息,然后读取回复,解析消息头并反序列化具体的消息内容:
func (c *Client) Call(ctx context.Context, rid, rt uint16, req, resp proto.Message) error { d, err := pb.Marshal(req) if err != nil { return err }
request := Request{ Version: SKVersion, ID: rid, Type: rt, Len: uint64(len(d)), Data: d, TraceId:uuid.NewV4().Bytes(), }
data, err := request.Marshal() if err != nil { return err }
conn, err := net.DialTimeout("unix", "/var/run/ctl.sock", c.option.Timeout) if err != nil { return err } defer conn.Close() err = conn.SetDeadline(c.option.Deadline) if err != nil { return err }
_, err = conn.Write(data) if err != nil { return err }
buff, err := io.ReadAll(conn) if err != nil { return err }
var response Response err = response.Unmarshal(buff) if err != nil { return err }
if response.ErrCode != 0 { return errors.New(string(response.ErrMsg)) }
if rt == SKGet { err = pb.Unmarshal(response.Data, resp) if err != nil { return err } } return nil } |
命令行调用上面的call 接口
req := &pb.Version{ Magic: pb.Magic_CTYUN, } resp := &pb.Version{}
err := c.uc.Call(ctx, SKGetVersion, SKGet, req, resp) if err != nil { fmt.Println(err) os.Exit(1) }
fmt.Println(string(resp.Version)) |
运行代码进行测试,可以得到如下结果
结论
Unix 域套接字是一种用于同一台主机上高效的进程间通信(IPC)的机制,同时借助protobuf 消息序列化及反序列化,可以实现应用间的高效通信,而且实现了跨语言的变成开发,实现业务应用和运维工具在不同的团队进行独立维护。