paint-brush
How to Implement Server-Side Interceptors for Logging GRPC Requestsby@dmitriiantonov90
1,516 reads
1,516 reads

How to Implement Server-Side Interceptors for Logging GRPC Requests

by Dmitrii AntonovNovember 21st, 2023
Read on Terminal Reader
Read this story w/o Javascript
tldt arrow

Too Long; Didn't Read

GRPC has an interceptor mechanism that can be used for logging, GRPC request verification, authentication, etc. To intercept requests on the server side, it is necessary to implement ServerInterceptor. And inherit two abstract classes, ServerCall and ServerCall.Listener. Our implementation uses the queue for storing sent requests.
featured image - How to Implement Server-Side Interceptors for Logging GRPC Requests
Dmitrii Antonov HackerNoon profile picture

Once, I had a requirement to implement server-side logging for GRPC requests. GRPC has an interceptor mechanism that can be used for logging, GRPC request verification, authentication, etc.


To intercept requests on the server side, it is necessary to implement ServerInterceptor. And inherit two abstract classes, ServerCall and ServerCall.Listener. First, let’s make our implementation of ServerCall.Listener.


There is ForwardingServerCallListener that gives calls to the next listener in the chain. We will extend this class and override the method onMessage.


We will use the ConcurrentLinkedQueue for storing received requests. This method might be invoked more than one time if using client-side streaming. Also, there are the onComplete and onCancel methods that will be invoked after completing the GRPC request or after the client’s cancelation. Our implementation uses the queue for storing sent requests.


class LoggingServerCallListener<ReqT>(
    private val delegate: ServerCall.Listener<ReqT>,
    private val input: ConcurrentLinkedQueue<ReqT>
) : ForwardingServerCallListener<ReqT>() {

    override fun delegate(): ServerCall.Listener<ReqT> {
        return delegate
    }

    override fun onMessage(message: ReqT) {
        super.onMessage(message)
        input.offer(message)
    }
}


Next, we need to implement the ServerCall class. There is the implementation SimpleForwardingServerCall that works as ForwardingServerCallListener for ServerCall.Listener.

We will use the same instance of ConcurrentLinkedQueue for getting received requests.


Also, we will use another instance of the queue for storing sent responses. We will implement the sendMessage method that invokes before a response is sent to a client. This method can be invoked more than once if the result is server-side streaming. We will also override the close method, which will be called after all data has been sent to the client or an error occurs.


The method ensures that there will be no further calls to onMessage or sendMessage.


import com.google.protobuf.GeneratedMessageV3
import com.google.protobuf.util.JsonFormat
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.Status
import org.slf4j.LoggerFactory
import java.util.concurrent.ConcurrentLinkedQueue

class LoggingServerCall<RequestType, ResponseType>(
    delegate: ServerCall<RequestType, ResponseType>,
    private val metadata: Metadata,
    private val input: ConcurrentLinkedQueue<RequestType>,
    private val output: ConcurrentLinkedQueue<ResponseType>
) : SimpleForwardingServerCall<RequestType, ResponseType>(delegate) {

    override fun sendMessage(message: ResponseType) {
        output.offer(message)
        super.sendMessage(message)
    }

    override fun close(status: Status, trailers: Metadata) {
        if (status.isOk) {
            val requestId = metadata.get(REQUEST_ID_METADATA_KEY)

            val printer = JsonFormat.printer()

            val request = input
                .filterIsInstance<GeneratedMessageV3>()
                .joinToString(separator = ", ", prefix = "[", postfix = "]", transform = printer::print)

            val response = output
                .filterIsInstance<GeneratedMessageV3>()
                .joinToString(separator = ", ", prefix = "[", postfix = "]", transform = printer::print)

            logger.info("The request with id: $requestId and request data $request returns $response")

        } else if (listOf(Status.INTERNAL.code, Status.NOT_FOUND.code, Status.INVALID_ARGUMENT.code).contains(status.code)) {
            val requestId = metadata.get(REQUEST_ID_METADATA_KEY)
            logger.error("An error occurred while processing the request with request-id $requestId", status.asRuntimeException())
        }

        super.close(status, trailers)
    }

    companion object {
        private val logger = LoggerFactory.getLogger(LoggingServerCall::class.java)
        private val REQUEST_ID_METADATA_KEY = Metadata.Key.of("x-request-id", Metadata.ASCII_STRING_MARSHALLER)
    }
}


Our implementation is very simple. We use JsonFormater for converting protobuf to JSON and joining to the string. Also, we take a request ID from the metadata for emulating real logging situations. Then we log the message into the system out. If an error occurs, we will log it.


After that, the onComplete/onCancel methods of the listener will be called.


Next, we must implement our ServerInterceptor which uses both our ServerCall and ServerCall.Listener.


class LoggingInterceptor : ServerInterceptor {
    override fun <ReqT : Any, RespT : Any?> interceptCall(
        call: ServerCall<ReqT, RespT>,
        headers: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> {
        val input = ConcurrentLinkedQueue<ReqT>()
        val output = ConcurrentLinkedQueue<RespT>()
        val loggingServerCall = LoggingServerCall(call, headers, input, output)
        return LoggingServerCallListener(next.startCall(loggingServerCall, headers), input)
    }
}


To start the processing request, we must call the startCall method of the ServerCallHandler class. After that, we must register our server interceptor.


fun main(args: Array<String>) {
	val server = ServerBuilder
        .forPort(8080)
		.addService(OurGrpcService())
		.intercept(LoggingInterceptor())
		.build()
	
	server.start()
}


In the second part, we consider how to work with client interceptors. I hope the article can help you with how to work with the server-side interceptors in GRPC.