1. Background

In HDFS, ACL entries are added and queried by default through the setfacl and getfacl commands. To inspect ACL information, one must log in to a machine and run hdfs commands, which is unfriendly to business users without shell access; at the same time, operators have no way to browse all HDFS ACLs, so administration is opaque.

To solve this, the Ranger component was introduced and ACL information is stored in Ranger. During authorization, HDFS fetches the relevant permissions from Ranger, and the NameNode authorizes the user against the permissions the user holds.

2. HDFS Authorization Framework

Hadoop defines an abstract class, INodeAttributeProvider. Its getAttributes method supplies the effective permission information for a path in the NameNode, while start and stop initialize and clean up the provider.

Inside this class, an interface AccessControlEnforcer is defined. Its checkPermission method matches the effective permissions against the requested access, performing the actual authorization:

public abstract class INodeAttributeProvider {

  public interface AccessControlEnforcer {
    // perform the authorization check
    public abstract void checkPermission(String fsOwner, String supergroup,
        UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs,
        INode[] inodes, byte[][] pathByNameArr, int snapshotId, String path,
        int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
        FsAction parentAccess, FsAction access, FsAction subAccess,
        boolean ignoreEmptyDir)
            throws AccessControlException;

  }
  // initialize the provider
  public abstract void start();
  // clean up the provider
  public abstract void stop();
  // get the attributes (including ACL information) for a path
  public abstract INodeAttributes getAttributes(String[] pathElements,
      INodeAttributes inode);

  public INodeAttributes getAttributes(String fullPath, INodeAttributes inode) {
    return getAttributes(getPathElements(fullPath), inode);
  }
}

3. Default HDFS Authorization

When the NameNode receives a client delete request, it eventually calls FSPermissionChecker.checkPermission to verify permissions:

static BlocksMapUpdateInfo delete(
      FSNamesystem fsn, FSPermissionChecker pc, String src, boolean recursive,
      boolean logRetryCache) throws IOException {
    FSDirectory fsd = fsn.getFSDirectory();

    if (FSDirectory.isExactReservedName(src)) {
      throw new InvalidPathException(src);
    }

    final INodesInPath iip = fsd.resolvePath(pc, src, DirOp.WRITE_LINK);
    // check permissions
    if (fsd.isPermissionEnabled()) {
      fsd.checkPermission(pc, iip, false, null, FsAction.WRITE, null,
                          FsAction.ALL, true);
    }
    if (fsd.isNonEmptyDirectory(iip)) {
      if (!recursive) {
        throw new PathIsNotEmptyDirectoryException(
            iip.getPath() + " is non empty");
      }
      DFSUtil.checkProtectedDescendants(fsd, iip);
    }

    return deleteInternal(fsn, iip, logRetryCache);
  }

As the code shows, FSPermissionChecker implements the AccessControlEnforcer interface. When authorizing, it performs two steps:

  1. Fetch the ACL attributes for the requested path.
  2. Check whether those permissions satisfy the request.

public class FSPermissionChecker implements AccessControlEnforcer {
  void checkPermission(INodesInPath inodesInPath, boolean doCheckOwner,
      FsAction ancestorAccess, FsAction parentAccess, FsAction access,
      FsAction subAccess, boolean ignoreEmptyDir)
      throws AccessControlException {
    
    // check if (parentAccess != null) && file exists, then check sb
    // If resolveLink, the check is performed on the link target.
    final int snapshotId = inodesInPath.getPathSnapshotId();
    final INode[] inodes = inodesInPath.getINodesArray();
    final INodeAttributes[] inodeAttrs = new INodeAttributes[inodes.length];
    final byte[][] components = inodesInPath.getPathComponents();
    for (int i = 0; i < inodes.length && inodes[i] != null; i++) {
      // fetch the attributes for this path component
      inodeAttrs[i] = getINodeAttrs(components, i, inodes[i], snapshotId);
    }

    String path = inodesInPath.getPath();
    int ancestorIndex = inodes.length - 2;
    // check the request against the permissions, using the enforcer configured via INodeAttributeProvider
    AccessControlEnforcer enforcer = getAccessControlEnforcer();
    enforcer.checkPermission(fsOwner, supergroup, callerUgi, inodeAttrs, inodes,
        components, snapshotId, path, ancestorIndex, doCheckOwner,
        ancestorAccess, parentAccess, access, subAccess, ignoreEmptyDir);
  }
}

The getINodeAttrs method obtains the path's ACL attributes from the INodeAttributeProvider:

private INodeAttributes getINodeAttrs(byte[][] pathByNameArr, int pathIdx,
      INode inode, int snapshotId) {
    INodeAttributes inodeAttrs = inode.getSnapshotINode(snapshotId);
    if (getAttributesProvider() != null) {
      String[] elements = new String[pathIdx + 1];
      for (int i = 0; i < elements.length; i++) {
        elements[i] = DFSUtil.bytes2String(pathByNameArr[i]);
      }
      // fetch the path's ACL attributes from the INodeAttributeProvider
      inodeAttrs = getAttributesProvider().getAttributes(elements, inodeAttrs);
    }
    return inodeAttrs;
  }

The concrete INodeAttributeProvider (an abstract class, as shown above) is selected via the dfs.namenode.inode.attributes.provider.class configuration; the default is DefaultINodeAttributesProvider, which, when fetching a file's ACL attributes, simply returns the ACL information already stored in the inode.

Once the ACL information is obtained, getAccessControlEnforcer is called to get the enforcer object. With DefaultINodeAttributesProvider, the AccessControlEnforcer is FSPermissionChecker itself, so FSPermissionChecker performs the check:

private AccessControlEnforcer getAccessControlEnforcer() {
    return (attributeProvider != null)
        ? attributeProvider.getExternalAccessControlEnforcer(this) : this;
  }

FSPermissionChecker.checkPermission then verifies the permissions, completing the authorization:

public void checkPermission(String fsOwner, String supergroup,
      UserGroupInformation callerUgi, INodeAttributes[] inodeAttrs,
      INode[] inodes, byte[][] components, int snapshotId, String path,
      int ancestorIndex, boolean doCheckOwner, FsAction ancestorAccess,
      FsAction parentAccess, FsAction access, FsAction subAccess,
      boolean ignoreEmptyDir)
      throws AccessControlException {
    for(; ancestorIndex >= 0 && inodes[ancestorIndex] == null;
        ancestorIndex--);

    try {
      checkTraverse(inodeAttrs, inodes, components, ancestorIndex);
    } catch (UnresolvedPathException | ParentNotDirectoryException ex) {
      // must tunnel these exceptions out to avoid breaking interface for
      // external enforcer
      throw new TraverseAccessControlException(ex);
    }

    final INodeAttributes last = inodeAttrs[inodeAttrs.length - 1];
    if (parentAccess != null && parentAccess.implies(FsAction.WRITE)
        && inodeAttrs.length > 1 && last != null) {
      checkStickyBit(inodeAttrs, components, inodeAttrs.length - 2);
    }
    if (ancestorAccess != null && inodeAttrs.length > 1) {
      check(inodeAttrs, components, ancestorIndex, ancestorAccess);
    }
    if (parentAccess != null && inodeAttrs.length > 1) {
      check(inodeAttrs, components, inodeAttrs.length - 2, parentAccess);
    }
    if (access != null) {
      check(inodeAttrs, components, inodeAttrs.length - 1, access);
    }
    if (subAccess != null) {
      INode rawLast = inodes[inodeAttrs.length - 1];
      checkSubAccess(components, inodeAttrs.length - 1, rawLast,
          snapshotId, subAccess, ignoreEmptyDir);
    }
    if (doCheckOwner) {
      checkOwner(inodeAttrs, components, inodeAttrs.length - 1);
    }
  }

4. Integrating HDFS with Ranger for Authorization

4.1 Ranger Basics

Ranger consists of three main components:

  • RangerAdmin: receives user/group information pushed by the UserSync process and stores it in a MySQL database. It exposes RESTful interfaces for creating, reading, updating, and deleting policies, and ships with a built-in web management UI.
  • AgentPlugin: embedded in each system's execution path; it periodically pulls policies from RangerAdmin, evaluates access decisions against them, and records access audits.
  • UserSync: periodically loads users from LDAP/Unix/File and reports them to RangerAdmin.

The architecture is shown below:

[Figure: Ranger architecture diagram]

An access permission describes the relationship between **"user, resource, and permission"**, where:

  • User: a User or Group identifying who accesses the resource;
  • Resource: expressed as a Resource, which differs per component, e.g. a File Path for HDFS or a Table for HBase;
  • Permission: expressed as an AllowACL and a DenyACL for allowed and denied access.

Ranger's access model can be described by the following expressions:

Service = List<Policy>

Policy = List<Resource> + AllowACL + DenyACL

AllowACL = List<AccessItem> allow + List<AccessItem> allowException

DenyACL = List<AccessItem> deny + List<AccessItem> denyException

AccessItem = List<User/Group> + List<AccessType>

From these expressions, a policy comprises four groups of AccessItems: allow, allowException, deny, and denyException, with priority denyException > deny > allowException > allow.

Decision delegation: when no policy can decide an access, it is normally treated as unauthorized and denied; Ranger also supports delegating the decision to the system's own access control layer, i.e. falling back to native HDFS authorization.
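The four-group priority above can be sketched with a minimal, self-contained evaluator. Note that the `AccessItem` record and `Decision` enum here are illustrative simplifications, not Ranger's actual classes:

```java
import java.util.List;
import java.util.Set;

// Minimal sketch of Ranger's four-group decision order; the class and
// field names are illustrative, not Ranger's real API.
public class PolicyEval {

    // One AccessItem: a set of users/groups granted a set of access types.
    record AccessItem(Set<String> users, Set<String> accessTypes) {
        boolean matches(String user, String accessType) {
            return users.contains(user) && accessTypes.contains(accessType);
        }
    }

    enum Decision { ALLOW, DENY, NOT_DETERMINED }

    // Priority: denyException > deny > allowException > allow.
    // A matching denyException cancels a matching deny; a matching
    // allowException cancels a matching allow. If nothing matches, the
    // decision is NOT_DETERMINED and can be delegated to HDFS's own check.
    static Decision evaluate(String user, String accessType,
                             List<AccessItem> allow, List<AccessItem> allowException,
                             List<AccessItem> deny, List<AccessItem> denyException) {
        boolean denied = deny.stream().anyMatch(i -> i.matches(user, accessType))
                && denyException.stream().noneMatch(i -> i.matches(user, accessType));
        if (denied) {
            return Decision.DENY;
        }
        boolean allowed = allow.stream().anyMatch(i -> i.matches(user, accessType))
                && allowException.stream().noneMatch(i -> i.matches(user, accessType));
        return allowed ? Decision.ALLOW : Decision.NOT_DETERMINED;
    }
}
```

A request is denied as soon as a deny item matches without a matching denyException; otherwise it is allowed only if an allow item matches without a matching allowException; anything else is left undetermined so the caller can fall back to HDFS's own authorization.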

4.2 Ranger Authorization Code Path

First, set the dfs.namenode.inode.attributes.provider.class configuration to org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer. In the Ranger project, the RangerHdfsAuthorizer class extends the INodeAttributeProvider abstract class and is responsible for supplying ACL attributes; its inner class RangerAccessControlEnforcer implements AccessControlEnforcer and performs the actual authorization:

public class RangerHdfsAuthorizer extends INodeAttributeProvider {
  class RangerAccessControlEnforcer implements AccessControlEnforcer {
  // omitted
  }
}
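Assuming a standard Hadoop layout, enabling the plugin means adding the property to hdfs-site.xml (the property name and class come from the text above):

```xml
<property>
  <name>dfs.namenode.inode.attributes.provider.class</name>
  <value>org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer</value>
</property>
```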

Looking directly at RangerHdfsAuthorizer's getAttributes method, it still just returns the ACL stored in the HDFS file system itself:

public INodeAttributes getAttributes(String fullPath, INodeAttributes inode) {

    INodeAttributes ret = inode; // return default attributes

    return ret;
}

Looking at how the Ranger provider is initialized: it builds a RangerHdfsPlugin object, which performs all communication with the Ranger server:

public void start() {

    RangerHdfsPlugin plugin = new RangerHdfsPlugin();
    plugin.init();
    rangerPlugin = plugin;
}

When RangerAccessControlEnforcer executes checkPermission, it builds a Ranger access request and calls RangerHdfsPlugin.isAccessAllowed to evaluate the ACL information:

RangerHdfsAccessRequest request = new RangerHdfsAccessRequest(inode, path, pathOwner, access, EXECUTE_ACCCESS_TYPE, user, groups, clusterName);
RangerAccessResult result = plugin.isAccessAllowed(request, null);

Note that during the actual check, the plugin first reads the Ranger ACLs from its local cache, and only contacts the Ranger server when they cannot be found there.

4.3 How RangerHdfsPlugin Periodically Refreshes Its Cache

RangerHdfsPlugin obtains Ranger ACL permissions via a local cache. Specifically, when the HDFS NameNode starts, it initializes a RangerHdfsPlugin object. RangerHdfsPlugin extends RangerBasePlugin, and RangerBasePlugin starts a PolicyRefresher thread that periodically pulls policies from the Ranger server into local memory and also writes them to a cache file on local disk:

public class RangerBasePlugin {
  public void init() {
    // omitted
    refresher = new PolicyRefresher(this, serviceType, appId, serviceName, admin, pollingIntervalMs, cacheDir);
    refresher.setDaemon(true);
    refresher.startRefresher();
    // omitted
  }
}

PolicyRefresher periodically runs its loadPolicy method, which uses the RangerAdminClient class to fetch the policies from the Ranger server and save them both in memory and in the local cache file:

public void run() {

    if (LOG.isDebugEnabled()) {
      LOG.debug("==> PolicyRefresher(serviceName=" + serviceName + ").run()");
    }

    while (true) {
      loadPolicy();
      try {
        Thread.sleep(pollingIntervalMs);
      } catch (InterruptedException excp) {
        LOG.info("PolicyRefresher(serviceName=" + serviceName + ").run(): interrupted! Exiting thread", excp);
        break;
      }
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("<== PolicyRefresher(serviceName=" + serviceName + ").run()");
    }
}
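The refresher's cache-file behavior can be sketched as follows. This is a simplified stand-in: the class name, `fetchFromServer`, and the file format are assumptions, not Ranger's real implementation:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Supplier;

// Simplified sketch of PolicyRefresher's cache behavior: on a successful
// pull the policies are persisted to disk, and when the server is
// unreachable the last cached copy on disk is served instead.
public class CachedPolicySource {
    private final Path cacheFile;
    private final Supplier<String> fetchFromServer; // may throw RuntimeException

    public CachedPolicySource(Path cacheFile, Supplier<String> fetchFromServer) {
        this.cacheFile = cacheFile;
        this.fetchFromServer = fetchFromServer;
    }

    public String loadPolicy() {
        try {
            String policies = fetchFromServer.get();  // pull from the admin server
            Files.writeString(cacheFile, policies);   // persist for restarts
            return policies;
        } catch (RuntimeException serverDown) {
            try {
                return Files.readString(cacheFile);   // serve the last cached copy
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Persisting the last good policy set is what lets the NameNode keep authorizing requests after a restart even while the Ranger admin service is down.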

4.4 Dependencies for Integrating HDFS with Ranger

Add the Ranger plugin dependency packages:

$ tree ranger-hdfs-plugin/
ranger-hdfs-plugin/
|-- conf
|   |-- ranger-hdfs-audit.xml
|   `-- ranger-hdfs-security.xml
|-- ranger-hdfs-plugin-impl
|   |-- commons-lang-2.6.jar
|   |-- eclipselink-2.5.2.jar
|   |-- httpmime-4.5.3.jar
|   |-- javax.persistence-2.1.0.jar
|   |-- noggit-0.6.jar
|   |-- ranger-hdfs-plugin-1.2.1-SNAPSHOT.jar
|   |-- ranger-plugins-audit-1.2.1-SNAPSHOT.jar
|   |-- ranger-plugins-common-1.2.1-SNAPSHOT.jar
|   |-- ranger-plugins-cred-1.2.1-SNAPSHOT.jar
|   `-- solr-solrj-5.5.4.jar
|-- ranger-hdfs-plugin-shim-1.2.1-SNAPSHOT.jar
`-- ranger-plugin-classloader-1.2.1-SNAPSHOT.jar

Then place ranger-hdfs-audit.xml and ranger-hdfs-security.xml into the Hadoop configuration directory.

5. Group-Based Authorization

FSPermissionChecker maintains the user's group information, which is used during authorization:

[Figure: user and group fields maintained by FSPermissionChecker]

Group information is initialized as follows. core-site.xml specifies the hadoop.security.group.mapping implementation class:

<property>
  <name>hadoop.security.group.mapping</name>
  <value>org.apache.hadoop.security.LdapGroupsMapping</value>
</property>
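When LdapGroupsMapping is used, it also needs LDAP connection settings in core-site.xml; for example (the hostnames and DNs below are placeholders):

```xml
<property>
  <name>hadoop.security.group.mapping.ldap.url</name>
  <value>ldap://ldap.example.com:389</value>
</property>
<property>
  <name>hadoop.security.group.mapping.ldap.bind.user</name>
  <value>cn=admin,dc=example,dc=com</value>
</property>
<property>
  <name>hadoop.security.group.mapping.ldap.base</name>
  <value>dc=example,dc=com</value>
</property>
```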

When FSPermissionChecker is constructed, it calls UserGroupInformation#getGroups, which in turn calls Groups#getGroups to obtain the group information:

protected FSPermissionChecker(String fsOwner, String supergroup,
      UserGroupInformation callerUgi,
      INodeAttributeProvider attributeProvider) {
    this.fsOwner = fsOwner;
    this.supergroup = supergroup;
    this.callerUgi = callerUgi;
    this.groups = callerUgi.getGroups();
    user = callerUgi.getShortUserName();
    isSuper = user.equals(fsOwner) || groups.contains(supergroup);
    this.attributeProvider = attributeProvider;
  }

When the Groups class is initialized, it reads this configuration from core-site.xml:

public Groups(Configuration conf, final Timer timer) {
    impl = ReflectionUtils.newInstance(
        conf.getClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
            JniBasedUnixGroupsMappingWithFallback.class,
            GroupMappingServiceProvider.class),
        conf);

When Groups is initialized, it also creates a GroupCacheLoader object, which extends Guava's CacheLoader and creates a thread pool for (re)loading group information:

ThreadPoolExecutor parentExecutor =  new ThreadPoolExecutor(
            reloadGroupsThreadCount,
            reloadGroupsThreadCount,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            threadFactory);
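The pattern can be sketched with a small self-contained cache. `GroupCache` and its lookup function are illustrative, not Hadoop's actual Groups implementation:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Simplified sketch of the Groups cache pattern: group lookups are cached
// per user, and refreshes run on a background executor so callers keep
// seeing the previous value while a reload is in flight.
public class GroupCache {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> lookup; // e.g. an LDAP query
    private final ExecutorService reloadPool = Executors.newFixedThreadPool(1, r -> {
        Thread t = new Thread(r, "group-reload");
        t.setDaemon(true); // do not block JVM shutdown, like the daemon refresher
        return t;
    });

    public GroupCache(Function<String, List<String>> lookup) {
        this.lookup = lookup;
    }

    // Synchronous load on first access (Groups#getGroups behaves similarly).
    public List<String> getGroups(String user) {
        return cache.computeIfAbsent(user, lookup);
    }

    // Asynchronous refresh on the background pool: the old entry stays
    // visible until the reload completes.
    public void refresh(String user) {
        reloadPool.submit(() -> cache.put(user, lookup.apply(user)));
    }
}
```

Serving the stale value during a reload keeps NameNode request handling from blocking on a slow LDAP server.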

The GroupCacheLoader#load method calls LdapGroupsMapping#getGroups to fetch the user's LDAP groups:

try {
        groups = fetchGroupList(user);
      } finally {
        if (scope != null) {
          scope.close();
        }
      }

LdapGroupsMapping#getGroups calls doGetGroups, which looks up the user's groups by username:

List<String> doGetGroups(String user, int goUpHierarchy)
      throws NamingException {
    DirContext c = getDirContext();

    // Search for the user. We'll only ever need to look at the first result
    NamingEnumeration<SearchResult> results = c.search(userbaseDN,
        userSearchFilter, new Object[]{user}, SEARCH_CONTROLS);