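1. Overview

When you define an interface in a gRPC service, the request parameter and the response parameter can each be declared as non-stream or stream. With a stream, the sender can write any number of messages and the receiver can read them as they arrive, until the stream is closed. Without a stream, exactly one message is sent, and nothing more can be written or read after it.

An rpc interface can therefore be defined in one of the following four ways: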

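| No. | Name | Request (client) | Response (server) |
| --- | --- | --- | --- |
| 1 | Simple RPC | non-stream | non-stream |
| 2 | Server-side streaming RPC | non-stream | stream |
| 3 | Client-side streaming RPC | stream | non-stream |
| 4 | Bidirectional streaming RPC | stream | stream |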
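To make the difference between the stream and non-stream sides concrete, here is a minimal sketch in plain Java. It does not use the gRPC library: the nested `StreamObserver` interface is a local stand-in with the same three callbacks as `io.grpc.stub.StreamObserver`, and `Recorder`, `singleMessage`, and the `IllegalStateException` are hypothetical names chosen for illustration, not what gRPC itself throws.

```java
import java.util.ArrayList;
import java.util.List;

final class StreamContractSketch {

    /** Local stand-in with the same three callbacks as io.grpc.stub.StreamObserver. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Records everything it receives so the call shape can be inspected. */
    static final class Recorder<T> implements StreamObserver<T> {
        final List<T> values = new ArrayList<>();
        boolean completed;
        Throwable error;

        @Override public void onNext(T value) { values.add(value); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onCompleted() { completed = true; }
    }

    /** Hypothetical wrapper enforcing the non-stream rule: at most one message. */
    static <T> StreamObserver<T> singleMessage(StreamObserver<T> delegate) {
        return new StreamObserver<T>() {
            private boolean sent;

            @Override public void onNext(T value) {
                if (sent) {
                    throw new IllegalStateException("non-stream side allows only one message");
                }
                sent = true;
                delegate.onNext(value);
            }
            @Override public void onError(Throwable t) { delegate.onError(t); }
            @Override public void onCompleted() { delegate.onCompleted(); }
        };
    }

    /** Stream side: any number of messages, then the stream is closed. */
    static List<String> streamReplies(String name, int count) {
        Recorder<String> recorder = new Recorder<String>();
        for (int i = 1; i <= count; i++) {
            recorder.onNext(i + "-hello-" + name);
        }
        recorder.onCompleted();
        return recorder.values;
    }

    /** Non-stream side: the second onNext is rejected by the wrapper. */
    static boolean secondMessageRejected() {
        StreamObserver<String> unary = singleMessage(new Recorder<String>());
        unary.onNext("hello-a");
        try {
            unary.onNext("hello-b");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(streamReplies("tom", 3));
        System.out.println(secondMessageRejected());
    }
}
```

In the table above, each "stream" cell corresponds to the `streamReplies` shape and each "non-stream" cell to the `singleMessage` shape.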

2. proto definition

Create a new file hello_stream.proto with the following content:

syntax = "proto3";

option java_multiple_files = false;
option java_package = "io.grpc.examples.hello.stream";
option java_outer_classname = "HelloStream";
option objc_class_prefix = "HLWS";

package hellostream;

// The greeting service definition.
service StreamingGreeter {
  // 1.Simple RPC
  rpc SayHello (HelloRequest) returns (HelloReply) {}

  // 2.Server-to-Client streaming RPC
  rpc SayHelloServerStream (HelloRequest) returns (stream HelloReply) {}
  
  // 3.Client-to-Server streaming RPC
  rpc SayHelloClientStream (stream HelloRequest) returns (HelloReply) {}

  // 4.Bidirectional streaming RPC
  rpc SayHelloBidirStream (stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

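3. Synchronous and asynchronous stubs

The proto file above defines four kinds of rpc interface. To call them, gRPC provides a synchronous (blocking) stub and an asynchronous stub. The synchronous stub supports only interface kinds 1 and 2; the asynchronous stub supports all four. In other words, clients for kinds 1 and 2 can be written either synchronously or asynchronously, while clients for kinds 3 and 4 can only be written asynchronously.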
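One way to see why kinds 1 and 2 fit a blocking style is that a single request goes in and the caller only has to wait for replies. The sketch below is plain Java without the gRPC library, and it is only an illustration of the idea, not how the generated blocking stub is implemented. The nested `StreamObserver` is a local stand-in for `io.grpc.stub.StreamObserver`, and `sayHelloServerStreamAsync` and `sayHelloServerStreamBlocking` are hypothetical helpers. It bridges an observer-based server stream into a blocking `Iterator` through a queue.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BlockingBridgeSketch {

    /** Local stand-in with the same three callbacks as io.grpc.stub.StreamObserver. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Marker placed on the queue when the stream has completed. */
    private static final Object END = new Object();

    /** Hypothetical async server-streaming call: replies arrive on another thread. */
    static void sayHelloServerStreamAsync(String name, StreamObserver<String> responseObserver) {
        Thread worker = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                responseObserver.onNext(i + "-hello-" + name);
            }
            responseObserver.onCompleted();
        });
        worker.setDaemon(true);
        worker.start();
    }

    /** Blocking view of the same call: hasNext() waits for the next reply or the end. */
    static Iterator<String> sayHelloServerStreamBlocking(String name) {
        BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        sayHelloServerStreamAsync(name, new StreamObserver<String>() {
            @Override public void onNext(String value) { queue.add(value); }
            @Override public void onError(Throwable t) { queue.add(t); }
            @Override public void onCompleted() { queue.add(END); }
        });
        return new Iterator<String>() {
            private Object next;

            @Override public boolean hasNext() {
                if (next == null) {
                    try {
                        next = queue.take();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("interrupted while waiting", e);
                    }
                }
                if (next instanceof Throwable) {
                    throw new IllegalStateException("stream failed", (Throwable) next);
                }
                return next != END;
            }

            @Override public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String value = (String) next;
                next = null;
                return value;
            }
        };
    }

    /** Drains the blocking iterator into a list. */
    static List<String> collect(Iterator<String> replies) {
        List<String> messages = new ArrayList<>();
        while (replies.hasNext()) {
            messages.add(replies.next());
        }
        return messages;
    }

    public static void main(String[] args) {
        System.out.println(collect(sayHelloServerStreamBlocking("tom")));
    }
}
```

Kinds 3 and 4 do not reduce to this shape as easily, because the client also has to push a sequence of requests while the call is open.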

4. Simple RPC

4.1. Client (synchronous)

StreamingGreeterClient.sayHelloBlock(String)

4.2. Client (asynchronous)

StreamingGreeterClient.sayHelloAsync(String)

4.3. Server

StreamingGreeterImpl.sayHello(HelloRequest, StreamObserver<HelloReply>)

5. Server-side streaming RPC

5.1. Client (synchronous)

StreamingGreeterClient.sayHelloServerStreamBlock(String)

5.2. Client (asynchronous)

StreamingGreeterClient.sayHelloServerStreamAsync(String)

5.3. Server

StreamingGreeterImpl.sayHelloServerStream(HelloRequest, StreamObserver<HelloReply>)

6. Client-side streaming RPC

6.1. Client (asynchronous)

StreamingGreeterClient.sayHelloClientStreamAsync(String...)

6.2. Server

StreamingGreeterImpl.sayHelloClientStream(StreamObserver<HelloReply>)
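The client-streaming flow can be sketched in-process, without the gRPC library: the server hands back a request observer, the client pushes several names and closes the request stream, and the server answers once. The nested `StreamObserver` is a local stand-in for `io.grpc.stub.StreamObserver`, messages are plain strings instead of `HelloRequest`/`HelloReply`, and `call` is a hypothetical helper. Everything here runs on one thread, so no waiting is needed; over a real channel the reply arrives asynchronously.

```java
import java.util.ArrayList;
import java.util.List;

final class ClientStreamSketch {

    /** Local stand-in with the same three callbacks as io.grpc.stub.StreamObserver. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Server side: buffers every request and replies once when the client completes. */
    static StreamObserver<String> sayHelloClientStream(StreamObserver<String> responseObserver) {
        return new StreamObserver<String>() {
            private final List<String> names = new ArrayList<>();

            @Override public void onNext(String name) { names.add(name); }

            @Override public void onError(Throwable t) {
                // The client aborted the request stream; this sketch sends no reply.
            }

            @Override public void onCompleted() {
                // The response is not a stream, so exactly one reply is sent.
                responseObserver.onNext("hello-" + names);
                responseObserver.onCompleted();
            }
        };
    }

    /** Client side: push all names, close the request stream, read the single reply. */
    static String call(String... names) {
        List<String> replies = new ArrayList<>();
        StreamObserver<String> requestObserver = sayHelloClientStream(new StreamObserver<String>() {
            @Override public void onNext(String reply) { replies.add(reply); }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { }
        });
        for (String name : names) {
            requestObserver.onNext(name);
        }
        requestObserver.onCompleted();
        return replies.isEmpty() ? null : replies.get(0);
    }

    public static void main(String[] args) {
        System.out.println(call("Tom", "Jerry")); // prints "hello-[Tom, Jerry]"
    }
}
```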

7. Bidirectional streaming RPC

7.1. Client (asynchronous)

StreamingGreeterClient.sayHelloBidirStreamAsync(String...)

7.2. Server

StreamingGreeterImpl.sayHelloBidirStream(StreamObserver<HelloReply>)
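For the bidirectional case, the following in-process sketch (plain Java, no gRPC library) records the order of events when the server replies to each request as soon as it arrives. The nested `StreamObserver` is a local stand-in for `io.grpc.stub.StreamObserver`, messages are plain strings, and `call` is a hypothetical helper. Because everything runs synchronously on one thread, replies interleave exactly with sends here; over a real channel only the order within each direction is preserved, and the interleaving between the two directions may differ.

```java
import java.util.ArrayList;
import java.util.List;

final class BidiStreamSketch {

    /** Local stand-in with the same three callbacks as io.grpc.stub.StreamObserver. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Server side: answers every request immediately, closes when the client closes. */
    static StreamObserver<String> sayHelloBidirStream(StreamObserver<String> responseObserver) {
        return new StreamObserver<String>() {
            @Override public void onNext(String name) {
                responseObserver.onNext("hello-" + name);
            }

            @Override public void onError(Throwable t) {
                // The client aborted the request stream; this sketch sends nothing further.
            }

            @Override public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }

    /** Client side: sends each name and logs what happens, in order. */
    static List<String> call(String... names) {
        List<String> events = new ArrayList<>();
        StreamObserver<String> requestObserver = sayHelloBidirStream(new StreamObserver<String>() {
            @Override public void onNext(String reply) { events.add("received " + reply); }
            @Override public void onError(Throwable t) { events.add("error " + t); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        for (String name : names) {
            events.add("sent " + name);
            requestObserver.onNext(name);
        }
        requestObserver.onCompleted();
        return events;
    }

    public static void main(String[] args) {
        for (String event : call("Tom", "Jerry")) {
            System.out.println(event);
        }
    }
}
```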

8. Complete client and server code

8.1. Client code

StreamingGreeterClient.java

package com.ai.grpc.service;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.examples.hello.stream.HelloStream;
import io.grpc.examples.hello.stream.HelloStream.HelloReply;
import io.grpc.examples.hello.stream.HelloStream.HelloRequest;
import io.grpc.examples.hello.stream.StreamingGreeterGrpc;
import io.grpc.examples.hello.stream.StreamingGreeterGrpc.StreamingGreeterBlockingStub;
import io.grpc.examples.hello.stream.StreamingGreeterGrpc.StreamingGreeterStub;
import io.grpc.stub.StreamObserver;

public class StreamingGreeterClient {
	private static final Logger LOG = LoggerFactory.getLogger(StreamingGreeterClient.class);

	// Synchronous (blocking) stub
	private StreamingGreeterBlockingStub blockStub;

	// Asynchronous stub
	private StreamingGreeterStub asyncStub;

	// Initialize the gRPC client connection
	public void initGrpcStub(String ip, int port) {

		// Open a channel to the server
		ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(ip, port).usePlaintext().build();

		// Create the synchronous stub
		blockStub = StreamingGreeterGrpc.newBlockingStub(managedChannel);

		// Create the asynchronous stub
		asyncStub = StreamingGreeterGrpc.newStub(managedChannel);

	}

	/**
	 * 1. Simple RPC, synchronous call
	 */
	public String sayHelloBlock(String name) {
		HelloRequest request = HelloRequest.newBuilder().setName(name).build();
		HelloReply reply = blockStub.sayHello(request);
		return reply.getMessage();
	}

	/**
	 * 1. Simple RPC, asynchronous call
	 */
	public String sayHelloAsync(String name) {
		HelloRequest request = HelloRequest.newBuilder().setName(name).build();
		BlockingQueue<HelloReply> replys = new ArrayBlockingQueue<HelloReply>(1);
		StreamObserver<HelloReply> responseObserver = new StreamObserver<HelloStream.HelloReply>() {

			@Override
			public void onNext(HelloReply reply) {
				LOG.info("onNext=" + reply);
				replys.offer(reply);
			}

			@Override
			public void onCompleted() {
				LOG.info("onCompleted");
			}

			@Override
			public void onError(Throwable t) {
				LOG.info("onError," + t);
				String error = t.getMessage();
				HelloReply reply = HelloReply.newBuilder().setMessage(error).build();
				replys.offer(reply);
			}
		};
		asyncStub.sayHello(request, responseObserver);

		try {
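			// Block until a result is available, then return it
			return replys.take().getMessage();
		} catch (InterruptedException e) {
			// Restore the interrupt flag so the caller can observe the interruption
			Thread.currentThread().interrupt();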
		}

		return null;
	}

	/**
	 * 2. Server-to-Client streaming RPC, synchronous call
	 */
	public List<String> sayHelloServerStreamBlock(String name) {
		HelloRequest request = HelloRequest.newBuilder().setName(name).build();
		Iterator<HelloReply> replys = blockStub.sayHelloServerStream(request);

		List<String> messages = new ArrayList<>();
		while (replys.hasNext()) {
			HelloReply reply = replys.next();
			messages.add(reply.getMessage());
		}

		return messages;
	}

	/**
	 * 2. Server-to-Client streaming RPC, asynchronous call
	 */
	public List<String> sayHelloServerStreamAsync(String name) {
		HelloRequest request = HelloRequest.newBuilder().setName(name).build();

		// Holds the returned messages
		List<String> messages = new ArrayList<>();
		// Used to detect that the stream returned by the server has ended
		BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);
		StreamObserver<HelloReply> responseObserver = new StreamObserver<HelloStream.HelloReply>() {

			@Override
			public void onNext(HelloReply reply) {
				LOG.info("onNext=" + reply);
				messages.add(reply.getMessage());
			}

			@Override
			public void onCompleted() {
				LOG.info("onCompleted");
				closeStream(queue);
			}

			@Override
			public void onError(Throwable t) {
				LOG.info("onError," + t);
				closeStream(queue);
			}
		};
		asyncStub.sayHelloServerStream(request, responseObserver);

		waiteStreamClose(queue);

		return messages;
	}

	/**
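	 * 3. Client-to-Server streaming RPC, asynchronous call<br/>
	 * Client-side streaming calls can only be made asynchronously
	 */
	public String sayHelloClientStreamAsync(String... names) {

		// Holds the reply; even a single reply has to be handed back through a container
		List<String> messages = new ArrayList<>();
		// Used to detect that the stream returned by the server has ended
		BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1);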
		StreamObserver<HelloReply> responseObserver = new StreamObserver<HelloReply>() {

			@Override
			public void onNext(HelloReply reply) {
				LOG.info("onNext=" + reply);
				messages.add(reply.getMessage());
			}

			@Override
			public void onCompleted() {
				LOG.info("onCompleted");
				closeStream(queue);
			}

			@Override
			public void onError(Throwable t) {
				LOG.info("onError," + t);
				closeStream(queue);
			}
		};
		// Start the asynchronous call
		StreamObserver<HelloRequest> requestObserver = asyncStub.sayHelloClientStream(responseObserver);

		// Send the stream of client messages
		for (String name : names) {
			HelloRequest request = HelloRequest.newBuilder().setName(name).build();
			requestObserver.onNext(request);
		}
		// Close the client stream
		requestObserver.onCompleted();

		waiteStreamClose(queue);

		// If the call ended with onError, no reply was received
		return messages.isEmpty() ? null : messages.get(0);
	}

	/**
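	 * 4. Bidirectional streaming RPC, asynchronous call<br/>
	 * Client-side streaming calls can only be made asynchronously
	 */
	public String sayHelloBidirStreamAsync(String... names) {

		// Holds the replies, which also have to be handed back through a container
		List<String> messages = new ArrayList<>();
		// Used to detect that the stream returned by the server has ended
		CountDownLatch downLatch = new CountDownLatch(1);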
		StreamObserver<HelloReply> responseObserver = new StreamObserver<HelloReply>() {

			@Override
			public void onNext(HelloReply reply) {
				LOG.info("onNext=" + reply);
				messages.add(reply.getMessage());
			}

			@Override
			public void onCompleted() {
				LOG.info("onCompleted");
				downLatch.countDown();
			}

			@Override
			public void onError(Throwable t) {
				LOG.info("onError," + t);
				downLatch.countDown();
			}
		};
		// Start the asynchronous call
		StreamObserver<HelloRequest> requestObserver = asyncStub.sayHelloBidirStream(responseObserver);

		// Send the stream of client messages
		for (String name : names) {
			HelloRequest request = HelloRequest.newBuilder().setName(name).build();
			requestObserver.onNext(request);
		}
		// Close the client stream
		requestObserver.onCompleted();

		try {
			// Wait until the server has finished responding
			downLatch.await();
		} catch (InterruptedException e) {
			// Restore the interrupt flag so the caller can observe the interruption
			Thread.currentThread().interrupt();
		}

		return messages.toString();
	}

	private void closeStream(BlockingQueue<Object> queue) {
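		// Signal that the stream returned by the server has ended
		queue.offer(new Object());
	}

	private void waiteStreamClose(BlockingQueue<Object> queue) {
		// Block until an object can be taken; the result can then be returned
		try {
			queue.take();
		} catch (InterruptedException e) {
			// Restore the interrupt flag so the caller can observe the interruption
			Thread.currentThread().interrupt();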
		}
	}

}
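The client above waits with `queue.take()` and `downLatch.await()`, both of which block indefinitely if neither `onCompleted` nor `onError` is ever delivered. A possible variation, sketched here in plain Java without the gRPC library, is an observer that collects replies, remembers an error, and lets the caller wait with a timeout. The nested `StreamObserver` is a local stand-in for `io.grpc.stub.StreamObserver`; `CollectingObserver`, `await`, and the two demo methods are hypothetical names, not part of the original client.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class AwaitCompletionSketch {

    /** Local stand-in with the same three callbacks as io.grpc.stub.StreamObserver. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Collects replies and lets a caller wait, with a timeout, for the stream to end. */
    static final class CollectingObserver<T> implements StreamObserver<T> {
        private final List<T> values = new CopyOnWriteArrayList<>();
        private final AtomicReference<Throwable> error = new AtomicReference<>();
        private final CountDownLatch done = new CountDownLatch(1);

        @Override public void onNext(T value) { values.add(value); }
        @Override public void onError(Throwable t) { error.set(t); done.countDown(); }
        @Override public void onCompleted() { done.countDown(); }

        /** Returns the replies, or throws if the stream failed or did not end in time. */
        List<T> await(long timeout, TimeUnit unit) {
            try {
                if (!done.await(timeout, unit)) {
                    throw new IllegalStateException("stream did not finish in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            Throwable t = error.get();
            if (t != null) {
                throw new IllegalStateException("stream failed", t);
            }
            return values;
        }
    }

    /** Replies arrive on another thread and the stream completes normally. */
    static List<String> demoSuccess() {
        CollectingObserver<String> observer = new CollectingObserver<>();
        Thread worker = new Thread(() -> {
            observer.onNext("hello-Tom");
            observer.onNext("hello-Jerry");
            observer.onCompleted();
        });
        worker.start();
        return observer.await(5, TimeUnit.SECONDS);
    }

    /** Nothing ever completes the stream, so the wait gives up after the timeout. */
    static boolean demoTimeout() {
        CollectingObserver<String> observer = new CollectingObserver<>();
        try {
            observer.await(50, TimeUnit.MILLISECONDS);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(demoSuccess());
        System.out.println(demoTimeout());
    }
}
```

Whether a timeout is appropriate, and how long it should be, depends on the service; the values used here are arbitrary.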

StreamingGreeterClientTest.java

package com.ai.grpc.service;

import java.util.List;

import org.junit.Test;

public class StreamingGreeterClientTest {

	private StreamingGreeterClient client = new StreamingGreeterClient();

	@Test
	public void testInitGrpcStub() {
		initGrpcStub();
	}

	private void initGrpcStub() {
		String ip = "localhost";
		int port = 50051;
		client.initGrpcStub(ip, port);
	}

	/**
	 * 1. Simple RPC, synchronous call
	 */
	@Test
	public void testSayHelloBlock() {
		initGrpcStub();

		String name = "yuwen";
		String message = client.sayHelloBlock(name);
		System.out.println(message);
	}

	/**
	 * 1. Simple RPC, asynchronous call
	 */
	@Test
	public void testSayHelloAsync() {
		initGrpcStub();

		String name = "tom";
		String message = client.sayHelloAsync(name);
		System.out.println(message);
	}

	/**
	 * 2. Server-to-Client streaming RPC, synchronous call
	 */
	@Test
	public void testSayHelloServerStreamBlock() {
		initGrpcStub();

		String name = "yuwen";
		List<String> messages = client.sayHelloServerStreamBlock(name);
		System.out.println(messages);
	}

	/**
	 * 2. Server-to-Client streaming RPC, asynchronous call
	 */
	@Test
	public void testSayHelloServerStreamAsync() {
		initGrpcStub();

		String name = "yuwen";
		List<String> messages = client.sayHelloServerStreamAsync(name);
		System.out.println(messages);
	}

	/**
	 * 3. Client-to-Server streaming RPC, asynchronous call
	 */
	@Test
	public void testSayHelloClientStreamAsync() {
		initGrpcStub();

		String name = "yuwen";
		String messages = client.sayHelloClientStreamAsync(name, "Tom", "cat", "Jerry", "mouse");
		System.out.println(messages);
	}

	/**
	 * 4. Bidirectional streaming RPC, asynchronous call
	 */
	@Test
	public void testSayHelloBidirStreamAsync() {
		initGrpcStub();

		String name = "yuwen";
		String messages = client.sayHelloBidirStreamAsync(name, "Tom", "cat", "Jerry", "mouse");
		System.out.println(messages);
	}
}

8.2. Server code

StreamingGreeterImpl.java:

package com.ai.grpc.service.impl;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.grpc.examples.hello.stream.HelloStream;
import io.grpc.examples.hello.stream.HelloStream.HelloReply;
import io.grpc.examples.hello.stream.HelloStream.HelloRequest;
import io.grpc.examples.hello.stream.StreamingGreeterGrpc;
import io.grpc.stub.StreamObserver;

public class StreamingGreeterImpl extends StreamingGreeterGrpc.StreamingGreeterImplBase {

	private static Logger LOG = LoggerFactory.getLogger(StreamingGreeterImpl.class);

	/**
	 * 1.Simple RPC
	 */
	@Override
	public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
		// Process the request
		LOG.info("request=" + request);
		String message = "hello-" + request.getName();

		// Return the reply
		HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
		// onNext may be called only once here; a second call results in an error
		responseObserver.onNext(reply);

		// Finish processing
		responseObserver.onCompleted();
	}

	/**
	 * 2. Server-to-Client streaming RPC, server implementation<br/>
	 * Although the parameters look almost the same as for the Simple RPC, the responseObserver here can return multiple response messages
	 */
	@Override
	public void sayHelloServerStream(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
		// Process the request
		LOG.info("request=" + request);

		// Return the replies: three response messages in total
		for (int i = 1; i <= 3; i++) {
			String message = i + "-hello-" + request.getName();
			HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
			responseObserver.onNext(reply);
			try {
				// Wait 3s before returning the next message
				TimeUnit.SECONDS.sleep(3);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		// Finish processing
		responseObserver.onCompleted();
	}

	/**
	 * 3.Client-to-Server streaming RPC
	 */
	@Override
	public StreamObserver<HelloRequest> sayHelloClientStream(StreamObserver<HelloReply> responseObserver) {
		StreamObserver<HelloRequest> requestObserver = new StreamObserver<HelloStream.HelloRequest>() {

			// Buffer the messages sent by the client
			List<String> messages = new ArrayList<>();

			@Override
			public void onNext(HelloRequest request) {
				LOG.info("onNext=" + request);
				messages.add(request.getName());
			}

			@Override
			public void onCompleted() {
				LOG.info("onCompleted");
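				// Process the buffered messages and return the reply
				String message = "hello-" + messages;
				HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
				// Because the response is not a stream, only one response message can be returned here
				responseObserver.onNext(reply);
				// Finish processing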
				responseObserver.onCompleted();
			}

			@Override
			public void onError(Throwable t) {
				LOG.info("onError," + t);
				// Finish processing
				responseObserver.onCompleted();
			}
		};

		return requestObserver;
	}

	/**
	 * 4.Bidirectional streaming RPC
	 */
	@Override
	public StreamObserver<HelloRequest> sayHelloBidirStream(StreamObserver<HelloReply> responseObserver) {
		StreamObserver<HelloRequest> requestObserver = new StreamObserver<HelloStream.HelloRequest>() {
			@Override
			public void onNext(HelloRequest request) {
				LOG.info("onNext=" + request);
				// Process the request and return the reply
				String message = "hello-" + request.getName();
				HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
				// Send the processed message back on the server's response stream
				responseObserver.onNext(reply);
			}

			@Override
			public void onCompleted() {
				LOG.info("onCompleted");
				// Finish processing
				responseObserver.onCompleted();
			}

			@Override
			public void onError(Throwable t) {
				LOG.info("onError," + t);
				// Finish processing
				responseObserver.onCompleted();
			}
		};

		return requestObserver;
	}

}

GRPCServer.java:

package com.ai.grpc;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ai.grpc.service.impl.StreamingGreeterImpl;

import io.grpc.Server;
import io.grpc.ServerBuilder;

/**
 * Static subscription mode: the collector acts as the server.
 * 
 * gRPC server bootstrap class; at startup it registers the service classes to expose.
 */
public class GRPCServer {
	private static Logger LOG = LoggerFactory.getLogger(GRPCServer.class);

	private Server server;

	private void start() throws IOException {
		// Port the server listens on
		int port = 50051;
		// Register the services to expose
		server = ServerBuilder.forPort(port).addService(new StreamingGreeterImpl()).build().start();
		LOG.info("Server started, listening on port={} ", port);

		Runtime.getRuntime().addShutdownHook(new Thread() {
			@Override
			public void run() {
				// Use stderr here because the logger may already have been reset while the JVM is shutting down
				System.err.println("*** shutting down gRPC server since JVM is shutting down");
				LOG.info("*** shutting down gRPC server since JVM is shutting down");
				try {
					GRPCServer.this.stop();
				} catch (InterruptedException e) {
					e.printStackTrace(System.err);
				}
				System.err.println("*** server shut down complete");
				LOG.info("*** server shut down complete");

			}
		});
	}

	private void stop() throws InterruptedException {
		if (server != null) {
			server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
		}
	}

	/**
	 * Await termination on the main thread, because the gRPC library uses daemon threads.
	 */
	private void blockUntilShutdown() throws InterruptedException {
		if (server != null) {
			server.awaitTermination();
		}
	}

	/**
	 * Main method that starts the server
	 */
	public static void main(String[] args) throws IOException, InterruptedException {
		final GRPCServer server = new GRPCServer();
		server.start();
		server.blockUntilShutdown();
	}
}

9. References

gRPC Java official guide

