松哥最近正在录制 项目视频~采用 Boot+Vue3 技术栈,里边会涉及到各种好玩的技术grpc,小伙伴们来和松哥一起做一个完成率超 90% 的项目,戳戳戳这里-->。
温馨提示:本文需要结合一起食用,否则可能看不懂。
前面一篇文章松哥和大家聊了 gRPC 的基本用法,今天我们再来稍微深入一点点,来看下 gRPC 中四种不同的通信模式。
gRPC 中四种不同的通信模式分别是:
一元 RPC
服务端流 RPC
客户端流 RPC
双向流 RPC
接下来松哥就通过四个完整的案例,来分别和向伙伴们演示这四种不同的通信模式。
1. 准备工作
关于 gRPC 的基础知识我们就不啰嗦了,咱们直接来看我今天的 proto 文件,如下:
这次我新建了一个名为 book.proto 的文件,这里主要定义了一些图书相关的方法,如下:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.javaboy.grpc.demo";
option java_outer_classname = "BookServiceProto";
import "google/protobuf/wrappers.proto";
package book;
service BookService {
rpc addBook(Book) returns (google.protobuf.StringValue);
rpc getBook(google.protobuf.StringValue) returns (Book);
rpc searchBooks(google.protobuf.StringValue) returns (stream Book);
rpc updateBooks(stream Book) returns (google.protobuf.StringValue);
rpc processBooks(stream google.protobuf.StringValue) returns (stream BookSet);
}
message Book {
string id = 1;
repeated string tags = 2;
string name = 3;
float price = 4;
string author = 5;
}
message BookSet {
string id = 1;
repeated Book bookList = 3;
}
这个文件中,有一些内容我们在中都讲过了,讲过的我就不再重复了,我说一些没有涉及到的东西:
由于我们在这个文件中,引用了 提供的 (..),所以这个文件上面我们首先用 导入相关的文件,导入之后,才可以使用。
在方法参数和返回值中出现的 ,就表示这个方法的参数或者返回值是流的形式(其实就是数据可以多次传输)。
中出现了一个没有的关键字 ,这个表示这个字段可以重复,可以简单理解为这就是我们 Java 中的数组。
好了,和相比,本文主要就是这几个地方不一样。
proto 文件写好之后,按照介绍的方法进行编译,生成对应的代码,这里就不再重复了。
2. 一元 RPC
一元 RPC 是一种比较简单的 RPC 模式,其实说白了我们和大家介绍的就是一种一元 RPC,也就是客户端发起一个请求,服务端给出一个响应,然后请求结束。
上面我们定义的五个方法中, 和 都算是一种一元 RPC。
2.1
先来看 方法,这个方法的逻辑很简单,我们提前在服务端准备一个 Map 用来保存 Book, 调用的时候,就把 book 对象存入到 Map 中,并且将 book 的 ID 返回,大家就这样一件事,来看看服务端的代码:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
private Map bookMap = new HashMap();
public BookServiceImpl() {
Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public void addBook(Book request, StreamObserver responseObserver) {
bookMap.put(request.getId(), request);
responseObserver.onNext(StringValue.newBuilder().setValue(request.getId()).build());
responseObserver.onCompleted();
}
}
看过的小伙伴,我觉得这段代码应该很好理解。
客户端调用方式如下:
public class BookServiceClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
addBook(stub);
}
private static void addBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
stub.addBook(Book.newBuilder().setPrice(99).setId("100").setName("java").setAuthor("javaboy").build(), new StreamObserver() {
@Override
public void onNext(StringValue stringValue) {
System.out.println("stringValue.getValue() = " + stringValue.getValue());
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
countDownLatch.countDown();
System.out.println("添加完毕");
}
});
countDownLatch.await();
}
}
这里我使用了 来实现线程等待,等服务端给出响应之后,客户端再结束。这里在回调的 方法中,我们就可以拿到服务端的返回值。
2.2
跟上面的 类似,先来看服务端代码,如下:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
private Map bookMap = new HashMap();
public BookServiceImpl() {
Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public void getBook(StringValue request, StreamObserver responseObserver) {
String id = request.getValue();
Book book = bookMap.get(id);
if (book != null) {
responseObserver.onNext(book);
responseObserver.onCompleted();
} else {
responseObserver.onCompleted();
}
}
}
这个 就是根据客户端传来的 id,从 Map 中查询到一个 Book 并返回。
客户端调用代码如下:
public class BookServiceClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
getBook(stub);
}
private static void getBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
stub.getBook(StringValue.newBuilder().setValue("2").build(), new StreamObserver() {
@Override
public void onNext(Book book) {
System.out.println("book = " + book);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
countDownLatch.countDown();
System.out.println("查询完毕");
}
});
countDownLatch.await();
}
}
小伙伴们大概也能看出来, 和 基本上操作套路是一模一样的。
3. 服务端流 RPC
前面的一元 RPC,客户端发起一个请求,服务端给出一个响应,请求就结束了。服务端流则是客户端发起一个请求,服务端给一个响应序列,这个响应序列组成一个流。
上面我们给出的 就是这样一个例子, 是传递图书的 tags 参数,然后在服务端查询哪些书的 tags 满足条件,将满足条件的书全部都返回去。
我们来看下服务端的代码:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
private Map bookMap = new HashMap();
public BookServiceImpl() {
Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public void searchBooks(StringValue request, StreamObserver responseObserver) {
Set keySet = bookMap.keySet();
String tags = request.getValue();
for (String key : keySet) {
Book book = bookMap.get(key);
int tagsCount = book.getTagsCount();
for (int i = 0; i < tagsCount; i++) {
String t = book.getTags(i);
if (t.equals(tags)) {
responseObserver.onNext(book);
break;
}
}
}
responseObserver.onCompleted();
}
}
小伙伴们看下,这段 Java 代码应该很好理解:
首先从 中提取客户端传来的 tags 参数。
遍历 ,查看每一本书的 tags 是否等于客户端传来的 tags,如果相等,说明添加匹配,则通过 .(book); 将这本书写回到客户端。
等所有操作都完成后,执行 .();,表示服务端的响应序列结束了,这样客户端也就知道请求结束了。
我们来看看客户端的代码,如下:
public class BookServiceClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
searchBook(stub);
}
private static void searchBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
stub.searchBooks(StringValue.newBuilder().setValue("明清小说").build(), new StreamObserver() {
@Override
public void onNext(Book book) {
System.out.println(book);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
countDownLatch.countDown();
System.out.println("查询完毕!");
}
});
countDownLatch.await();
}
}
客户端的代码好理解,搜索的关键字是 明清小说,每当服务端返回一次数据的时候,客户端回调的 方法就会被触发一次,当服务端之行了 .(); 之后,客户端的 方法也会被触发。
这个就是服务端流,客户端发起一个请求,服务端通过 可以多次写回数据。
4. 客户端流 RPC
客户端流则是客户端发起多个请求,服务端只给出一个响应。
上面的 就是一个客户端流的案例,客户端想要修改图书,可以发起多个请求修改多本书grpc,服务端则收集多次修改的结果,将之汇总然后一次性返回给客户端。
我们先来看看服务端的代码:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
private Map bookMap = new HashMap();
public BookServiceImpl() {
Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public StreamObserver updateBooks(StreamObserver responseObserver) {
StringBuilder sb = new StringBuilder("更新的图书 ID 为:");
return new StreamObserver() {
@Override
public void onNext(Book book) {
bookMap.put(book.getId(), book);
sb.append(book.getId())
.append(",");
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
responseObserver.onNext(StringValue.newBuilder().setValue(sb.toString()).build());
responseObserver.onCompleted();
}
};
}
}
客户端每发送一本书来,就会触发服务端的 方法,然后我们在这方法中进行图书的更新操作,并记录更新结果。最后,我们在 方法中,将更新结果汇总返回给客户端,基本上就是这样一个流程。
我们再来看看客户端的代码:
public class BookServiceClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
updateBook(stub);
}
private static void updateBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
StreamObserver request = stub.updateBooks(new StreamObserver() {
@Override
public void onNext(StringValue stringValue) {
System.out.println("stringValue.getValue() = " + stringValue.getValue());
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
System.out.println("更新完毕");
countDownLatch.countDown();
}
});
request.onNext(Book.newBuilder().setId("1").setName("a").setAuthor("b").build());
request.onNext(Book.newBuilder().setId("2").setName("c").setAuthor("d").build());
request.onCompleted();
countDownLatch.await();
}
}
在客户端这块, 方法会返回一个 对象,调用该对象的 方法就是给服务端传递数据了,可以传递多个数据,调用该对象的 方法就是告诉服务端数据传递结束了,此时也会触发服务端的 方法,服务端的 方法执行之后,进而触发了客户端的 方法。
5. 双向流 RPC
双向流其实就是 3、4 小节的合体。即客户端多次发送数据,服务端也多次响应数据。
我们先来看下服务端的代码:
public class BookServiceImpl extends BookServiceGrpc.BookServiceImplBase {
private Map bookMap = new HashMap();
private List books = new ArrayList();
public BookServiceImpl() {
Book b1 = Book.newBuilder().setId("1").setName("三国演义").setAuthor("罗贯中").setPrice(30).addTags("明清小说").addTags("通俗小说").build();
Book b2 = Book.newBuilder().setId("2").setName("西游记").setAuthor("吴承恩").setPrice(40).addTags("志怪小说").addTags("通俗小说").build();
Book b3 = Book.newBuilder().setId("3").setName("水浒传").setAuthor("施耐庵").setPrice(50).addTags("明清小说").addTags("通俗小说").build();
bookMap.put("1", b1);
bookMap.put("2", b2);
bookMap.put("3", b3);
}
@Override
public StreamObserver processBooks(StreamObserver responseObserver) {
return new StreamObserver() {
@Override
public void onNext(StringValue stringValue) {
Book b = Book.newBuilder().setId(stringValue.getValue()).build();
books.add(b);
if (books.size() == 3) {
BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();
responseObserver.onNext(bookSet);
books.clear();
}
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build();
responseObserver.onNext(bookSet);
books.clear();
responseObserver.onCompleted();
}
};
}
}
这段代码没有实际意义,单纯为了给小伙伴们演示双向流,我的操作逻辑是客户端传递多个 ID 到服务端,然后服务端根据这些 ID 构建对应的 Book 对象,然后三个三个一组,再返回给客户端。客户端每次发送一个请求,都会触发服务端的 方法,我们在这个方法中对请求分组返回。最后如果还有剩余的请求,我们在 () 方法中返回。
再来看看客户端的代码:
public class BookServiceClient {
public static void main(String[] args) throws InterruptedException {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
.usePlaintext()
.build();
BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel);
processBook(stub);
}
private static void processBook(BookServiceGrpc.BookServiceStub stub) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
StreamObserver request = stub.processBooks(new StreamObserver() {
@Override
public void onNext(BookSet bookSet) {
System.out.println("bookSet = " + bookSet);
System.out.println("=============");
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCompleted() {
System.out.println("处理完毕!");
countDownLatch.countDown();
}
});
request.onNext(StringValue.newBuilder().setValue("a").build());
request.onNext(StringValue.newBuilder().setValue("b").build());
request.onNext(StringValue.newBuilder().setValue("c").build());
request.onNext(StringValue.newBuilder().setValue("d").build());
request.onCompleted();
countDownLatch.await();
}
}
这个客户端的代码跟第四小节一模一样,不再赘述了。
好啦,这就是松哥和小伙伴们介绍的 gRPC 的四种不同的通信模式,文章中只给出了一些关键代码,如果小伙伴们没看明白,建议结合一起阅读就懂啦~
最后,又到了本周的送书环节啦~
《软件平台架构设计与技术管理之道》
驾驭复杂平台的深度思考 顶层架构设计的实践分享
领悟大道至简的技术哲学观 掌控IT管理与决策的智慧锦囊
内容简介
《软件平台架构设计与技术管理之道》以复杂平台顶层设计为线索,作者使用接地气的技术语言,轻松叙事的写作风格,提炼近20年软件平台技术工作中的心得体会,整理出如此多的锦囊,执笔成书与读者分享。书中内容引经据典,多处引用成语、谚语,大量使用比喻写法,期望带给读者原生态技术语言之感受,体现软件平台技术工作中的情愫和品味。
本书内容十分解耦,而且语言犀利,其中不乏以博弈和批判性思维来揭示工作中问题的实质,风格迥异、见解独到,内容处处引人入胜,让“无形的道”跃然纸上,是一本难得一见的技术书籍,为从事“IT平台建设与维护、软件系统设计与开发、项目与质量管理”等工作领域的读者们,带来一份职业能力发展的“点金术”。为给读者最佳的阅读感受,本书图文并茂,不仅有20余幅实战彩图,全景展示平台架构设计,更是绘制近70张主题简笔漫画,为读者朋友们奉上精美、飘逸之风。
全书分为两部分。第1部分包括5章50节,通过主题短文,在思想底蕴与思维认知、平台顶层架构与核心能力、技术管理与分析决策方面,给读者提供丰富的工作锦囊,综合提升读者的技术掌控力和布道力,精进方法论,使读者能快速成为一个通识全貌型人才,从容自如地驾驭中大型软件平台方方面面的技术工作。第2部分包括3章,提供20余幅技术方案图和架构设计的工作示意图,以及常用工作台账示例,帮助读者进一步精通图形化表达方法,提升技术设计呈现能力。
本书不仅适合工作于一线的技术总监、架构师和中高级技术人员阅读,对致力于IT咨询和布道师岗位的读者,以及扩展知识面、争取持续提升的IT项目管理人员、质量管理人员,同样可以从中受益,大获技能包,增强软实力,早日脱颖而出。
作者简介
由维昭,清华大学2002级软件工程专业,硕士研究生。
在中信集团旗下企业工作逾三十年,现就职于金宝信社保卡科技有限公司,任技术总监。
银行、互联网、电子商务、社会保险等领域系统建设经历丰富,具备复杂平台顶层架构设计以及海量并发性能实战能力;擅长联机交易及数据处理,掌握分布式架构、开源技术栈;技术开发、运维及安全工作有多年的管理经验。
小伙伴们随便留言吧,我会随机挑选6位幸运小伙伴,《软件平台架构设计与技术管理之道》包邮到家!
限时特惠:本站持续每日更新海量各大内部创业课程,一年会员仅需要98元,全站资源免费下载
点击查看详情
站长微信:Jiucxh