欢迎大家来到IT世界,在知识的湖畔探索吧!
概述
因为gRPC 的异步调用代码写的比较绕,所以这篇文章主要用来记录一下 gRPC 的异步调用。
需要注意的是,gRPC 为了实现异步调用,使用的是 CompletionQueue 绑定进行 RPC 调用,实际写代码的时候会感觉到比较奇怪。相应的因为是异步的,所以会调用CompletionQueue::Next来等待回包操作。这里先留个印象,下面讲流程的时候会比较清晰。
编译安装
为了完成我们的demo,没有安装gRPC的同学可以直接使用下面的命令进行安装:
$ export MY_INSTALL_DIR=$HOME/.local $ mkdir -p $MY_INSTALL_DIR $ export PATH="$MY_INSTALL_DIR/bin:$PATH" # 工具安装 $ sudo apt install -y cmake #安装好cmke的同学省略 $ sudo apt install -y build-essential autoconf libtool pkg-config # 下载项目 $ git clone --recurse-submodules -b v1.45.0 --depth 1 --shallow-submodules https://github.com/grpc/grpc # 编译安装 $ cd grpc $ mkdir -p cmake/build $ pushd cmake/build $ cmake -DgRPC_INSTALL=ON -DgRPC_BUILD_TESTS=OFF -DCMAKE_INSTALL_PREFIX=$INSTALL_DIR ../.. $ make -j $ make install $ popd
欢迎大家来到IT世界,在知识的湖畔探索吧!
下面介绍的例子都使用官方的:https://github.com/grpc/grpc/blob/master/examples/cpp/helloworld/。
异步 Client
对于同步的 client 来说,由于调用远程方法时会阻塞当前线程,但是异步允许同时发送多个请求,并且不会阻塞。
那么我们在使用 gRPC的异步API的时候 需要做到以下几点才行:
- 由于要进行异步非阻塞的请求,那么在发送请求的时候肯定不能等待回包;
- 所有的回包都通过另一个线程异步处理,避免主流程阻塞;
- 回包的数据通过某种介质进行传递,gRPC 使用的是CompletionQueue 来进行传递。
欢迎大家来到IT世界,在知识的湖畔探索吧!
整体流程从上面图可以看得出分为以下几个:
- 启动client,启动一个旁路线程循环获取 CompletionQueue 数据;
- 发送异步调用给 server;
- 如果 server 回包了,会将数据放入到 CompletionQueue 里面;
- 异步线程获取 CompletionQueue 数据并返回;
我们下面来看一下例子。
创建
首先是创建客户端,然后异步线程处理 server 的回包,因为处理回包的时候会阻塞。
欢迎大家来到IT世界,在知识的湖畔探索吧! // 创建客户端 GreeterClient greeter(grpc::CreateChannel( "localhost:50051", grpc::InsecureChannelCredentials())); // 起新的线程,从队列中取出结果并处理 std::thread thread_ = std::thread(&GreeterClient::AsyncCompleteRpc, &greeter); for (int i = 0; i < 100; i++) { std::string user("world " + std::to_string(i)); // 发送rpc请求 greeter.SayHello(user); // The actual RPC call! } std::cout << "Press control-c to quit" << std::endl << std::endl; // 阻塞 thread_.join(); // blocks forever
异步请求
这里会通过调用 SayHello 来发送请求:
class GreeterClient { public: explicit GreeterClient(std::shared_ptr<Channel> channel) : stub_(Greeter::NewStub(channel)) {} ... void SayHello(const std::string& user) { HelloRequest request; request.set_name(user); // 用来存储 rpc 数据 AsyncClientCall* call = new AsyncClientCall; // 这里是调用 Async 方法创建 RPC 对象,但是不会理吗开始进行 RPC请求 call->response_reader = stub_->PrepareAsyncSayHello(&call->context, request, &cq_); // 初始化RPC调用 call->response_reader->StartCall(); // 进行RPC请求,然后 call 对象作为一个 tag 放进去,会包数据会放到 reply中 // 这里不会阻塞等待请求 call->response_reader->Finish(&call->reply, &call->status, (void*) call); } private: ... CompletionQueue cq_; };
因为回包逻辑不在这里,所以在调用完 Finish 之后不需要等待可以直接返回。在调用完这个方法之后就可以等待 server 回包了,server 会将回包数据塞入到 cq_ 里面。
异步处理回包
异步处理回包的逻辑就在我们一开始创建的 thread 里面会循环调用。
欢迎大家来到IT世界,在知识的湖畔探索吧! void AsyncCompleteRpc() { void* got_tag; bool ok = false; // 从队列里面取出回调信息,没有如果还没回包的话会阻塞 while (cq_.Next(&got_tag, &ok)) { AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag); GPR_ASSERT(ok); if (call->status.ok()) // 获取到回包的数据 std::cout << "Greeter received: " << call->reply.message() << std::endl; else std::cout << "RPC failed" << std::endl; // 销毁new的对象 delete call; } }
这里会一直循环调用 Next 方法处理 server 的 response,如果没有回包会一直阻塞,所以这里需要新起一个线程处理,避免阻塞主流程。
相关视频推荐
[linux]一个让性能飞起的解决方案,异步处理到底有哪些不一样
linux多线程之epoll原理剖析与reactor原理及应用
需要C/C++ Linux服务器架构师学习资料加qun获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享
异步 Server
一般我们在写 Server 端的时候如果是同步操作的话,在收到请求之后立即处理,然后给 client 回包,这个回包的过程中需要等待回包完毕整个 RPC 请求才算结束,这里就存在阻塞等待的过程,而使用异步请求就是用来避免阻塞,可以在单个线程里面处理更多的信息量。
但是在理解 gRPC 异步 API 的时候还是会感觉非常的别扭,导致一开始看不太明白实例代码的含义。
首先,gRPC让你像流水线操作一样,要先准备一个 CallData 对象作为一个容器,然后 gRPC 会通过 ServerCompletionQueue 将各种事件发送到 CallData 对象中,让这个对象根据自身的状态进行处理。
然后处理完毕当前的事件之后还需要手动再创建一个 CallData 对象,这个对象是为下个 Client 请求准备的,整个过程就像流水线一样。
上面这个异步的过程还有一个小状态机在里面,全部都由 CallData 对象进行扭转:
- CallData 对象刚创建的时候会从 CREATE 状态扭转为 PROCESS 状态,表示等待接收请求;
- 请求过来之后,首先会创建一个 CallData 对象,然后处理完后扭转为 FINISH 状态,等待给 Client 回包结束;
- 回包结束之后将 CallData 对象自身删除。
清楚了这个CallData 对象是用来做什么之后,下面我们来看一下整个 Server 的流程应该如下:
- Server 启动,注册,创建一个 CallData 对象,这个对象用来给下个 Client 请求准备的;
- 创建好的 CallData 对象会交给 gRPC 托管,有事件过来的时候会将事件放入到 CallData 对象对,然后以 ServerCompletionQueue 对象来进行通知;
- 等待 Client 请求过来…
- 有事件过来之后会从 ServerCompletionQueue 对象中解包出来数据,转为 CallData 对象调用 Proceed 方法,然后进行业务逻辑处理,并重新创建 CallData 对象,用来给下个 Client 请求准备的;
- 等待给 Client 的回包结束。。。
- 继续处理 ServerCompletionQueue 回过来的 event 事件,清理自身 CallData 对象。
由于这个图我也不知道怎么画了,算了摆烂,不画了,自己脑补ba~
下面看看代码。
启动注册
void Run() { // 启动注册监听 std::string server_address("0.0.0.0:50051"); ServerBuilder builder; builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); builder.RegisterService(&service_); cq_ = builder.AddCompletionQueue(); server_ = builder.BuildAndStart(); std::cout << "Server listening on " << server_address << std::endl; // 处理RPC HandleRpcs(); }
启动主流程
欢迎大家来到IT世界,在知识的湖畔探索吧! std::unique_ptr<ServerCompletionQueue> cq_; void HandleRpcs() { // 这里其实是异步逻辑的处理地方 new CallData(&service_, cq_.get()); void* tag; // uniquely identifies a request. bool ok; while (true) { // Next方法会阻塞,直到有下个请求过来,才会继续往下走 GPR_ASSERT(cq_->Next(&tag, &ok)); // 必须检查 Next 的返回值,这个返回值告诉我们是有事件到来 // 还是 cq_ 正在关闭。 GPR_ASSERT(ok); // 转成 CallData 调用 Proceed static_cast<CallData*>(tag)->Proceed(); } }
主流程这里会创建 CallData 对象,然后不断循环从 cq_ 对象里面获取事件,这个 cq_ 就是一个等待队列,没有事件过来的时候会一直阻塞。有事件过来之后会从 cq_ 里面取出 tag 转换成 CallData对象调用 Proceed 方法。
创建 CallData & 逻辑处理 & 完成
class ServerImpl final { ... private: class CallData { public: CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq) : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) { Proceed(); } void Proceed() { if (status_ == CREATE) { // 首次进入,改变状态到 PROCESS status_ = PROCESS; // 需要注意的是,这里将 this 作为 tag 塞入到请求中作为唯一识别请求 service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this); } else if (status_ == PROCESS) { // 在我们处理这个请求之前,创建一个新的 CallData 实例用于处理未来 // 的新请求。实例会在它的 FINISH 状态流程中释放自己占用的内存 new CallData(service_, cq_); // 实际的业务逻辑了 std::string prefix("Hello "); // 从request_中获取client请求数据,并设置回包数据 reply_.set_message(prefix + request_.name()); // 业务处理完毕,让 gRPC 运行时知道我们已经完成了,使用这个实例的内存 // 地址作为事件内唯一识别请求的 tag status_ = FINISH; responder_.Finish(reply_, Status::OK, this); } else { GPR_ASSERT(status_ == FINISH); // 已经到达 FINISH 状态,释放自身占用内存(CallData) delete this; } } private: Greeter::AsyncService* service_; // 生产-消费队列,用来异步服务消息通知 ServerCompletionQueue* cq_; // RPC 的上下文信息,用于例如压缩、鉴权以及发送元数据给客户等用途。 ServerContext ctx_; // 从客户端接受到了什么 HelloRequest request_; // 从客户端返回什么 HelloReply reply_; // 用于回复客户端的方法 ServerAsyncResponseWriter<HelloReply> responder_; // 状态机 enum CallStatus { CREATE, PROCESS, FINISH }; CallStatus status_; // The current serving state. }; ... };
从这里我们就可以和上面的 HandleRpcs 方法联系起来。
- 首先是 new CallData 会直接调用 Proceed 方法,这个时候会进入到 if 的第一个分支,然后将自身 this 写入到 cq_ 中,并将状态扭转为 PROCESS;
- 这个时候会继续会到 HandleRpcs 方法的 while 循环中等待事件;
- 当有 Client 发送请求过来时会进入到 Proceed 的 if 第二个分支进行业务逻辑的处理;
- 这里首先会 new CallData 给下一个请求使用;
- 然后从 request_ 获取到 Client 请求参数并进行处理;
- 回包数据写入到 reply_ 中,最后调用 Finish 结束;
- 这个时候会继续会到 HandleRpcs 方法的 while 循环中等待给 Client 的 response 回包结束;
- 收到回包结束之后会继续调用到 Proceed 方法的 if 第三个分支,删除当前对象。
总结
其实对比 go 的 grpc 的异步 API 来看,不得不说 cpp 的 API 设计很有问题,咋一看之下,根本不知道 new CallData 有什么用,为啥 new 了之后不做任何事情,也没有 delete 操作,不会内存溢出吗?然后进入到构造方法中才发现逻辑都在构造函数中,这样的写代码的方式我只在这里看到过。
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://itzsg.com/97729.html