开始前必读:基于grpc从零开始搭建一个准生产分布式应用(0) - quickStart
本章内容比较简单:基于 springGrpc 框架实现拦截器。下面的代码比较直观,不做过多解释,读者仔细阅读代码即可。
一、方案一:自定义拦截器(参数校验)
1.1、异常定义
/**
 * Static helpers for turning validation failures into gRPC errors.
 */
public final class ValidationExceptions {

    private ValidationExceptions() {
        // Utility class: not meant to be instantiated.
    }

    /**
     * Translates a {@link ValidationException} into a gRPC {@code StatusRuntimeException}.
     *
     * <p>The resulting status carries code {@code Code.INVALID_ARGUMENT}, the exception's
     * message, and one {@link Any}-packed {@link BadRequest} detail holding a single field
     * violation built from {@code ex.getField()} and {@code ex.getReason()}.
     *
     * @param ex the {@code ValidationException} to convert.
     * @return a gRPC {@code StatusRuntimeException} describing the violation
     */
    public static StatusRuntimeException asStatusRuntimeException(ValidationException ex) {
        BadRequest.FieldViolation violation = BadRequest.FieldViolation.newBuilder()
                .setField(ex.getField())
                .setDescription(ex.getReason())
                .build();

        BadRequest badRequest = BadRequest.newBuilder()
                .addFieldViolations(violation)
                .build();

        Status rpcStatus = Status.newBuilder()
                .setCode(Code.INVALID_ARGUMENT.getNumber())
                .setMessage(ex.getMessage())
                .addDetails(Any.pack(badRequest))
                .build();

        return StatusProto.toStatusRuntimeException(rpcStatus);
    }
}
1.2、服务端拦截器
public class ValidatingServerInterceptor implements ServerInterceptor {
private final ValidatorIndex index;
public ValidatingServerInterceptor(ValidatorIndex index) {
this.index = index;
}
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(call, headers)) {
// Implementations are free to block for extended periods of time. Implementations are not
// required to be thread-safe.
private boolean aborted = false;
public void onMessage(ReqT message) {
try {
index.validatorFor(message.getClass()).assertValid(message);
super.onMessage(message);
} catch (ValidationException ex) {
StatusRuntimeException status = ValidationExceptions.asStatusRuntimeException(ex);
aborted = true;
call.close(status.getStatus(), status.getTrailers());
}
}
public void onHalfClose() {
if (!aborted) {
super.onHalfClose();
}
}
};
}
}
1.3、客户端拦截器
public class ValidatingClientInterceptor implements ClientInterceptor {
private final ValidatorIndex index;
public ValidatingClientInterceptor(ValidatorIndex index) {
this.index = index;
}
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
public void sendMessage(ReqT message) {
try {
index.validatorFor(message.getClass()).assertValid(message);
super.sendMessage(message);
} catch (ValidationException ex) {
throw ValidationExceptions.asStatusRuntimeException(ex);
}
}
};
}
}
二、方案二:自定义拦截器(日志记录)
2.1、服务端拦截器
public class DelegateInterceptor implements ServerInterceptor {
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata, ServerCallHandler<ReqT, RespT> serverCallHandler) {
long inTime = System.currentTimeMillis();
String trackId = metadata.get(Metadata.Key.of(CONST.TRACKID_KEY, Metadata.ASCII_STRING_MARSHALLER));
if (StringUtils.isEmpty(trackId)){
trackId = String.valueOf(genLogId(System.nanoTime()));
}
StringBuilder delegateLog = new StringBuilder();
delegateLog.append("tid=").append(trackId)
.append(CONST.SPLIT_BLANK).append("appid=").append(TokenParser.appId())
.append(CONST.SPLIT_BLANK).append("ip=").append(serverCall.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR))
.append(CONST.SPLIT_BLANK).append("uri=").append(serverCall.getMethodDescriptor().getFullMethodName())
// .append(LogConst.SPLIT_BLANK).append("param=").append(serverCall.getAttributes()) //TODO 打印原始入参,暂时还没取出来
.append(CONST.SPLIT_BLANK).append("inTime=").append(inTime);
//保存请求时间和相关日志到请求线程中,供后面拦截器打印用
metadata.put(Metadata.Key.of(CONST.DELEGATE_LOG_KEY, Metadata.ASCII_STRING_MARSHALLER), delegateLog.toString());
metadata.put(Metadata.Key.of(CONST.DELEGATE_INTIME_KEY, Metadata.ASCII_STRING_MARSHALLER), String.valueOf(inTime));
log.info(delegateLog.toString());
//下面设置的值必须为原始值,不能自定义的变量,保持参数的纯净
DelegateCall<ReqT, RespT> serverCallDelegate = new DelegateCall<>(serverCall);
serverCallDelegate.setMetadata(metadata);
DelegateCallListener<ReqT, RespT> delegateCallListener = new DelegateCallListener<>(serverCallHandler.startCall(serverCallDelegate, metadata));
delegateCallListener.setServerCall(serverCall);
return delegateCallListener;
}
2.2、客户端拦截器
public class LogGrpcInterceptor implements ClientInterceptor {
private static final Logger log = LoggerFactory.getLogger(LogGrpcInterceptor.class);
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
log.info("Received call to {}", method.getFullMethodName());
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
public void sendMessage(ReqT message) {
log.debug("Request message: {}", message);
super.sendMessage(message);
}
public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
public void onMessage(RespT message) {
log.debug("Response message: {}", message);
super.onMessage(message);
}
public void onHeaders(Metadata headers) {
log.debug("gRPC headers: {}", headers);
super.onHeaders(headers);
}
public void onClose(Status status, Metadata trailers) {
log.info("Interaction ends with status: {}", status);
log.info("Trailers: {}", trailers);
super.onClose(status, trailers);
}
}, headers);
}
};
}
}
2.3、利用反射取得相关的值
这是一个通用方法。一些框架出于安全原因,会把部分原生对象设为受保护(非公开)的,使用者无法直接访问或扩展。下面这个方法通过反射拿到受保护对象的值。注意:如非必要,不建议使用。
/**
 * Reflection helper that reads the non-public {@code call} field of gRPC's
 * {@code io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl}.
 *
 * <p>This depends on gRPC internals (a nested class name and a field name), so it may stop
 * working when the gRPC version changes. If the class or field cannot be resolved, the
 * failure is logged once at class-initialization time and {@link #delegateCall} returns
 * {@code null} from then on.
 *
 * @param <T> the response message type of the observer
 */
public class ServerCallStreamObserverUtil<T> {
    private static final String OBSERVER_IMPL_NAME = "io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl";

    // Declared before the reflective lookups below because those lookups log on failure.
    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(ServerCallStreamObserverUtil.class.getName());

    private static final Class<?> targetClazz = findObserverImplClass();
    private static final Field targetField = findCallField(targetClazz);

    /** Finds the nested observer implementation class of {@code ServerCalls}, or returns null. */
    private static Class<?> findObserverImplClass() {
        for (Class<?> nested : ServerCalls.class.getDeclaredClasses()) {
            if (OBSERVER_IMPL_NAME.equals(nested.getName())) {
                return nested;
            }
        }
        LOGGER.warning("Nested class not found: " + OBSERVER_IMPL_NAME);
        return null;
    }

    /** Resolves the {@code call} field on {@code owner} and makes it accessible, or returns null. */
    private static Field findCallField(Class<?> owner) {
        if (owner == null) {
            return null;
        }
        try {
            Field field = owner.getDeclaredField("call");
            field.setAccessible(true);
            return field;
        } catch (NoSuchFieldException | SecurityException e) {
            LOGGER.log(java.util.logging.Level.WARNING,
                    "Cannot access field 'call' on " + owner.getName(), e);
            return null;
        }
    }

    /**
     * Extracts the {@code DelegateCall} held in the observer's {@code call} field.
     *
     * @param responseObserver the observer to read from
     * @return the {@code DelegateCall}, or {@code null} when the field could not be resolved,
     *         the observer is {@code null} or not the expected implementation, the field
     *         value is not a {@code DelegateCall}, or the reflective read fails
     */
    @SuppressWarnings("rawtypes") // the raw return type is kept so existing callers still compile
    public DelegateCall delegateCall(StreamObserver<T> responseObserver) {
        // isInstance(null) is false, so this also covers a null observer.
        if (targetField == null || !targetClazz.isInstance(responseObserver)) {
            return null;
        }
        try {
            Object call = targetField.get(responseObserver);
            return call instanceof DelegateCall ? (DelegateCall) call : null;
        } catch (IllegalAccessException e) {
            LOGGER.log(java.util.logging.Level.WARNING,
                    "Cannot read field 'call' from response observer", e);
            return null;
        }
    }
}