1. 背景

在Hadoop的性能指标中,rpc是最核心的一类指标,它标志着Hadoop服务的性能。通过该指标能够判断服务此时是否正常。如下所示:

Untitled.png

同时,在配置文件中,还有很多rpc相关的重要配置,例如:

  • ipc.server.handler.queue.size
  • ipc.server.read.threadpool.size
  • dfs.namenode.handler.count

作为Hadoop开发运维人员,一定要对Hadoop RPC有深入的了解,否则根本不懂上面的配置和指标具体含义,也无法对集群进行优化。

2. Reactor模型

在多路复用模型之前,如果只使用non Blocking IO,用户空间需要不断发起read系统调用等待linux内核数据复制到缓冲区完成,导致CPU资源不足。

为了解决该问题,各操作系统都提供了select和epoll系统调用。它们会监听socket连接请求、读请求、写请求。一旦socket网络连接有数据达到,api就会返回该网络链接,后续通过该连接处理数据即可。用户空间只需要一个线程,就可以维护成千上万的连接,就是多路复用。

对于Selector,如果读写处理逻辑非常耗时,会阻塞住整个服务线程,导致acceptor无法处理连接请求,因此可以将读写请求相关连接注册到新的Selector中。AcceptorSelector和SubSelector分别就是主从复用器。

基于Selector的socket处理模型就是Reactor模型。Reactor主从多路复用模型如下所示:

Untitled 1.png

3. Hadoop Reactor RPC处理流程

对于Hadoop RPC来说,处理RPC请求的简要流程如下:

  1. Listener线程接受客户端请求。
  2. 由Reader线程池处理请求。
  3. 将请求封装成为Call对象放到BlockingQueue中。
  4. Handler线程池消费BlockingQueue,处理call请求。
  5. Responder线程返回处理结果。

Untitled 2.png

上面是Hadoop RPC最基本的处理流程,它基于Reactor模型进行实现。在Listener线程中通过Selector接受连接,该Selector只吃力accept请求;在Reader线程中将连接注册到ReaderSelector中,该Selector只处理read请求;在Responder线程中将连接注册到WriterSelector中,该Selector只处理write请求。如下所示:

Untitled 3.png

3.1 RPC Server启动线程

Server先创建固定的线程Responder、Listener和Handler:

/** Starts the service.  Must be called before any calls will be handled. */
  public synchronized void start() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
  }

Reader是Listener的内部类,在Listener的构造函数中可以看到:

readers = new Reader[readThreads];//readThreads个Reader进行处理
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }

3.2 Listener线程注册accept事件

Listener只处理accept事件:

public void run() {
      LOG.info(Thread.currentThread().getName() + ": starting");
      SERVER.set(Server.this);
      connectionManager.startIdleScan();
      while (running) {
        SelectionKey key = null;
        try {
          getSelector().select();
          Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
          while (iter.hasNext()) {
            key = iter.next();
            iter.remove();
            try {
              if (key.isValid()) {
                if (key.isAcceptable())  //一个新的socket连接请求是否被接受
                  doAccept(key);//执行ACCEPT对应的处理逻辑
              }
            } catch (IOException e) {
            //.....
            }
            key = null;
          }
        } catch (OutOfMemoryError e) {
          //......
        } catch (Exception e) {
          closeCurrentConnection(key, e);
        }
      }
      LOG.info("Stopping " + Thread.currentThread().getName()); 
        //....
        //关闭连接操作
      }
    }

注册后,将连接放到Reader对象所维护的一个连接队列pendingConnections ,待Reader线程处理:

/**
     * 执行接受新的socket的连接请求的逻辑
     */
     void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
      ServerSocketChannel server = (ServerSocketChannel) key.channel();
      SocketChannel channel;
      while ((channel = server.accept()) != null) {
        //非关键代码 略
        Reader reader = getReader(); //采用轮询方式在众多的reader中取出一个reader进行处理
        Connection c = connectionManager.register(channel);
        // If the connectionManager can't take it, close the connection.
        if (c == null) {
          if (channel.isOpen()) {
            IOUtils.cleanup(null, channel);
          }
          continue;
        }
        //将这个封装了对应的SocketChannel的Connection对象attatch到当前这个SelectionKey对象上
        //这样,如果这个SelectionKey对象对应的Channel有读写事件,就可以从这个SelectionKey上取出
        //Connection,获取到这个Channel的相关信息
        key.attach(c);  // so closeCurrentConnection can get the object
        //将当前的connection添加给reader的connection队列,reader将会依次从队列中取出连接进行处理
        reader.addConnection(c);
      }
    }

3.3 Reader线程注册read事件

Reader线程将pendingConnections 中的连接注册到readSelector中:

private synchronized void doRunLoop() {
        while (running) {
          SelectionKey key = null;
          try {
            // consume as many connections as currently queued to avoid
            // unbridled acceptance of connections that starves the select
            int size = pendingConnections.size();
            for (int i=size; i>0; i--) {
              Connection conn = pendingConnections.take();
              conn.channel.register(readSelector, SelectionKey.OP_READ, conn);//向Selector注册OP_READ
            }
            readSelector.select();
            Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
            while (iter.hasNext()) {
              key = iter.next();
              iter.remove();
              if (key.isValid()) {
                if (key.isReadable()) {
                  doRead(key);
                }
              }
              key = null;
            }
          } catch (InterruptedException e) {
            //....
          }
        }
      }

doRead方法最终会调用processRpcRequest,将请求封装成为Call对象放入到callQueue中:

private void processRpcRequest(RpcRequestHeaderProto header,
        DataInputStream dis) throws WrappedRpcServerException,
        InterruptedException {
      //获取RPC类型,目前主要有两种RPC类型有WritableRPC 和ProtobufRPC
      //老版本的Hadoop使用WritableRPC,新版本的Hadoop开始使用基于Protobuf协议的RPC,即ProtobufRPC
      //以ProtobufRpcEngine为例,对应的WrapperClass是ProtobufRpcEngine.RpcRequestWrapper
     //提取并实例化wrapper class,用来解析请求中的具体字段
        Class<? extends Writable> rpcRequestClass = 
          getRpcRequestWrapper(header.getRpcKind());
      if (rpcRequestClass == null) {
        //无法从header中解析出对应的RPCRequestClass,抛出异常
      }

      Writable rpcRequest;
      try { //Read the rpc request
        //可以将rpcRequestClass理解为当前基于具体某个序列化协议的解释器,解释器负责解释
        //和解析请求内容,封装为rpcRequest对象
        rpcRequest = ReflectionUtils.newInstance(rpcRequestClass, conf);
        rpcRequest.readFields(dis);
      } catch (Throwable t) { // includes runtime exception from newInstance
        //数据解析发生异常,则抛出异常
      }
      //略

      //根据请求中提取的callId、重试次数、当前的连接、RPC类型、发起请求的客户端ID等,创建对应的Call对象
      Call call = new Call(header.getCallId(), header.getRetryCount(),
          rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
          header.getClientId().toByteArray(), traceSpan);

      //将Call对象放入callQueue中,Handler线程将负责从callQueue中逐一取出请求并处理
      callQueue.put(call);              // queue the call; maybe blocked here
      incRpcCount();  // Increment the rpc count
    }

Handler线程消费callQueue,调用call方法进行实际的rpc请求处理,最后通过responder.doRespond方法将Call对象添加到当前这个Connection的responseQueue 中:

public void run() {
      SERVER.set(Server.this);
      ByteArrayOutputStream buf = 
        new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
      while (running) {
        try {
          //从callQueue中取出Call对象,Call对象封装了请求的所有信息,包括连接对象、序列号等等信息
          final Call call = callQueue.take(); // pop the queue; maybe blocked here
          //判断这个请求对应是SocketChannel是否是open状态,如果不是,可能客户端已经断开连接,没有响应的必要
          if (!call.connection.channel.isOpen()) {
            LOG.info(Thread.currentThread().getName() + ": skipped " + call);
            continue;
          }
          //略
          CurCall.set(call);
          try {
             //call方法是一个抽象方法,实际运行的时候会调用具体实现类的call
             value = call(call.rpcKind, call.connection.protocolName, 
                                   call.rpcRequest, call.timestamp);

          } catch (Throwable e) {
            //发生异常,根据异常的类型,设置异常的详细信息、返回码等等
          }
          //服务端调用结束,即服务端已经完成了客户端请求的相关操作,开始对响应进行设置,将响应发送给客户端
          CurCall.set(null);
          synchronized (call.connection.responseQueue) {
            //将error信息封装在call对象中,responder线程将会处理这个Call对象,向客户端返回响应
            setupResponse(buf, call, returnStatus, detailedErr, 
                value, errorClass, error);
            //将封装了Error信息或者成功调用的信息的Call对象交付给Responder线程进行处理
            responder.doRespond(call);
          }
        } catch (InterruptedException e) {
          //异常信息
        } finally {
          //略
      }
      LOG.debug(Thread.currentThread().getName() + ": exiting");
    }
  }

3.4 Responder线程注册write事件

Responder线程消费responseQueue,将连接注册到writeSelector,想客户端返回响应信息:

private boolean processResponse(LinkedList<Call> responseQueue,
                                    boolean inHandler) throws IOException {

      try {
        synchronized (responseQueue) {
          //先进先出,因此从respondeQueue中取出第一个Call对象进行处理
          call = responseQueue.removeFirst();//
          SocketChannel channel = call.connection.channel;
          //将call.rpcResponse中的数据写入到channel中
          int numBytes = channelWrite(channel, call.rpcResponse);
          if (!call.rpcResponse.hasRemaining()) {//数据已经写入完毕
            //数据已经写完,进行一个buffer的清理工作
          } else {
             //如果数据没有完成写操作,则把Call对象重新放进responseQueue中的第一个,下次会进行发送剩余数据
            call.connection.responseQueue.addFirst(call);
              //如果是inHandler,说明这个方法是Handler直接调用的,这时候数据没有发送完毕,需要将channel注册到writeSelector, 这样Responder.doRunLoop()中就可以检测到这个writeSelector上的writable的SocketChannel,然后把剩余数据发送给客户端
            if (inHandler) {
                // Wakeup the thread blocked on select, only then can the call 
                // to channel.register() complete.
                writeSelector.wakeup();
                //将channel注册到writeSelector,同时将这个Call对象attach到这个SelectionKey对象,这样Responder线程就可以通过select方法检测到channel上的写事件,同时从Call中提取需要写的数据以及SocketChannel,进而进行写操作
                channel.register(writeSelector, SelectionKey.OP_WRITE, call);
            }
          }
          error = false;              // everything went off well
        }
      }  
      return done;
    }

3.5 Rpc反射调用流程

在Handler中,会通过call执行rpc逻辑,对于ProtobufRpcEngine,它会通过反射进行执行:

/**
       * This is a server side method, which is invoked over RPC. On success
       * the return response has protobuf response payload. On failure, the
       * exception name and the stack trace are return in the resposne.
       * See {@link HadoopRpcResponseProto}
       * 
       * In this method there three types of exceptions possible and they are
       * returned in response as follows.
       * <ol>
       * <li> Exceptions encountered in this method that are returned 
       * as {@link RpcServerException} </li>
       * <li> Exceptions thrown by the service is wrapped in ServiceException. 
       * In that this method returns in response the exception thrown by the 
       * service.</li>
       * <li> Other exceptions thrown by the service. They are returned as
       * it is.</li>
       * </ol>
       */
      public Writable call(RPC.Server server, String protocol,
          Writable writableRequest, long receiveTime) throws Exception {
        RpcRequestWrapper request = (RpcRequestWrapper) writableRequest;
        RequestHeaderProto rpcRequest = request.requestHeader;
        //从请求中获取方法、协议名称
        String methodName = rpcRequest.getMethodName();
        //从请求中取出proto名称
        String protoName = rpcRequest.getDeclaringClassProtocolName();
        //从请求中取出版本号
        long clientVersion = rpcRequest.getClientProtocolVersion();
        if (server.verbose)
          LOG.info("Call: protocol=" + protocol + ", method=" + methodName);

        //向RPC.Server()获取它所管理的具体协议对应的实现,比如ResourceTracker协议对应的BlockingService
        ProtoClassProtoImpl protocolImpl = getProtocolImpl(server, protoName,
            clientVersion);
        //这个service是在ResourceTracker里面创建的一个匿名实例newReflectiveBlockingService()
        BlockingService service = (BlockingService) protocolImpl.protocolImpl;
        //通过远程调用的方法名称,获取方法描述信息
        MethodDescriptor methodDescriptor = service.getDescriptorForType()
            .findMethodByName(methodName);
        if (methodDescriptor == null) {
          //......throw exception
        }
        //获取请求的proto对应的java 类,如RegisterNodeManagerRequestProto,通过protobuf生成的proto的java类都是一个Message
        Message prototype = service.getRequestPrototype(methodDescriptor);

        //将实际请求的参数数据匹配到对应的请求类
        Message param = prototype.newBuilderForType()
            .mergeFrom(request.theRequestRead).build();

        Message result;
        long startTime = Time.now();
        int qTime = (int) (startTime - receiveTime);
        Exception exception = null;
        try {
          server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
          result = service.callBlockingMethod(methodDescriptor, null, param);
        } catch (ServiceException e) {
          //.....throw some exception....
        } finally {
          int processingTime = (int) (Time.now() - startTime);
          if (LOG.isDebugEnabled()) {
            String msg = "Served: " + methodName + " queueTime= " + qTime +
                " procesingTime= " + processingTime;
            if (exception != null) {
              msg += " exception= " + exception.getClass().getSimpleName();
            }
            LOG.debug(msg);
          }
          String detailedMetricsName = (exception == null) ?
              methodName :
              exception.getClass().getSimpleName();
          server.rpcMetrics.addRpcQueueTime(qTime);
          server.rpcMetrics.addRpcProcessingTime(processingTime);
          server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
              processingTime);
        }
        return new RpcResponseWrapper(result);
      }

4. Hadoop RPC常用配置

ipc.server.listen.queue.size=4096:Hadoop Server的socket监听的backlog上限。当一个请求未被处理时,进入backlog,当server处理请求较慢时,监听队列被填满,新来的请求会被拒绝。

ipc.client.fallback-to-simple-auth-allowed=true:使用kerberos访问非kerberos集群,系统自动转成简单认证。

ipc.client.connection.maxidletime=20000:若在20s内没有请求则连接主动关闭。

ipc.server.read.threadpool.size=24:reader线程池大小。

ipc.maximum.data.length=201326592:客户端请求或者响应数据不能超过192MB。

ipc.server.handler.queue.size=100:每个handler能够处理的call队列长度为100。

dfs.namenode.handler.count=1024:namenode服务处理客户端请求的handler线程数。

dfs.namenode.service.handler.count=256:namenode处理内部请求的handler线程数,包含DataNode,JournalNode等。

callQueue长度:callQueue是BlockingQueue类型。它的上限为ipc.server.handler.queue.size * handler,即每个handler能够处理的call请求*handler线程数。对于namenode来说,其处理客户端请求的callQueue的上限是1024 * 256 = 262144。

5. 总结

Hadoop通过Reactor模型实现了RPC Server。在Listener线程中通过Selector接受连接,该Selector只吃力accept请求;在Reader线程中将连接注册到ReaderSelector中,该Selector只处理read请求;在Responder线程中将连接注册到WriterSelector中,该Selector只处理write请求。

它在Handler线程中执行具体的rpc逻辑,通过call方法,找到协议的实现类,通过反射的方法执行。