Using a SemanticAnalyzerHook to Filter Hive Queries That Lack Partition Conditions
2013-09-24 18:52:32
    Nearly 80% of the jobs on our Hadoop cluster are submitted through Hive. Hive is easy and convenient to write, and we also provide a Hive Web Client, so it is used very widely: BA, PM, PO and sales all run ad-hoc queries with it. But while Hive lowers the barrier to entry, it also means users often write unreasonable, very expensive statements that generate many MapReduce jobs and occupy a large number of slots. The typical case is querying a partitioned table without specifying a partition condition, so Hive cannot apply partition pruning and ends up reading the whole table, consuming large amounts of IO and compute resources.
        
    To mitigate this as far as possible, we can use Hive's hook mechanism and implement a few methods in a hook that pre-screen each statement. For now the hook does not block the statement outright; it only records problematic statements so they can be publicized as warnings.
        
    Concretely, we implement the HiveSemanticAnalyzerHook interface. Its preAnalyze and postAnalyze methods run before and after the compile function respectively, and we only need to implement preAnalyze: walk the ASTNode abstract syntax tree that is passed in, take the FROM table name from the left subtree and the WHERE condition keys from the right subtree. If the FROM table is a partitioned table, fetch all of its partition key names through the metastore client; as long as any partition key appears in the user's WHERE condition, the statement passes the check. Otherwise a warning is written to standard error, and the user name and the statement are recorded in the backend log. Every so often these bad cases are published to the hive-user mailing list, in the hope that seeing them helps everyone warn and learn from each other.

The compile function reflectively instantiates the hook classes named by hive.semantic.analyzer.hook in HiveConf; here that is DPSemanticAnalyzerHook, which extends AbstractSemanticAnalyzerHook.
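Roughly, that loading step follows the reflection pattern sketched below. This is only a simplified illustration, not Hive's actual source: the HookLoadingSketch class and its method names are invented for this example, while the conf key hive.semantic.analyzer.hook and the AbstractSemanticAnalyzerHook / HiveSemanticAnalyzerHookContext types are the real ones used later. In practice the hook takes effect once DPSemanticAnalyzerHook is on Hive's classpath and hive.semantic.analyzer.hook names it, for example in hive-site.xml or via --hiveconf.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;

// Simplified sketch of the hook-loading pattern, for illustration only.
public class HookLoadingSketch {

  // hive.semantic.analyzer.hook may list several classes, comma separated;
  // each one is instantiated by reflection and must have a no-arg constructor.
  static List<AbstractSemanticAnalyzerHook> loadHooks(HiveConf conf) throws Exception {
    List<AbstractSemanticAnalyzerHook> hooks = new ArrayList<AbstractSemanticAnalyzerHook>();
    String value = conf.get("hive.semantic.analyzer.hook");
    if (value == null || value.trim().isEmpty()) {
      return hooks;
    }
    for (String name : value.split(",")) {
      hooks.add((AbstractSemanticAnalyzerHook) Class.forName(name.trim()).newInstance());
    }
    return hooks;
  }

  // Each hook sees (and may rewrite) the AST before semantic analysis runs.
  static ASTNode runPreAnalyze(HiveConf conf, ASTNode tree) throws Exception {
    HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
    hookCtx.setConf(conf);
    for (AbstractSemanticAnalyzerHook hook : loadHooks(conf)) {
      tree = hook.preAnalyze(hookCtx, tree);
    }
    return tree;
  }
}

Our hook implementation itself follows.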
package org.apache.hadoop.hive.ql.parse;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.hive.shims.ShimLoader;

public class DPSemanticAnalyzerHook extends AbstractSemanticAnalyzerHook {
  private final static String NO_PARTITION_WARNING = "WARNING: HQL is not efficient, please specify a partition condition! HQL:%s ;USERNAME:%s";

  private final SessionState ss = SessionState.get();
  private final LogHelper console = SessionState.getConsole();
  private Hive hive = null;
  private String username;
  private String currentDatabase = "default";
  private String hql;
  private String whereHql;
  private String tableAlias;
  private String tableName;
  private String tableDatabaseName;
  private Boolean needCheckPartition = false;

  /**
   * Runs before compilation: extract the FROM table and, if it is a partitioned
   * table, check whether the WHERE clause mentions any of its partition keys.
   */
  @Override
  public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context, ASTNode ast)
      throws SemanticException {
    try {
      // Normalize the query text: lowercase it, flatten newlines, and keep
      // everything from the first "where" onwards for the later substring check.
      hql = ss.getCmd().toLowerCase();
      hql = StringUtils.replaceChars(hql, '\n', ' ');
      if (hql.contains("where")) {
        whereHql = hql.substring(hql.indexOf("where"));
      }
      username = ShimLoader.getHadoopShims().getUserName(context.getConf());

      if (ast.getToken().getType() == HiveParser.TOK_QUERY) {
        try {
          hive = context.getHive();
          currentDatabase = hive.getCurrentDatabase();
        } catch (HiveException e) {
          throw new SemanticException(e);
        }

        // The first child of TOK_QUERY is expected to be the FROM clause; walk it
        // to record the table name, database and alias.
        extractFromClause((ASTNode) ast.getChild(0));

        if (needCheckPartition && !StringUtils.isBlank(tableName)) {
          String dbname = StringUtils.isEmpty(tableDatabaseName) ? currentDatabase
              : tableDatabaseName;
          String tbname = tableName;
          // The table may be written as "db.table"; String.split() takes a regex,
          // so the dot has to be escaped or nothing is ever split.
          String[] parts = tableName.split("\\.");
          if (parts.length == 2) {
            dbname = parts[0];
            tbname = parts[1];
          }
          Table t = hive.getTable(dbname, tbname);
          if (t.isPartitioned()) {
            if (StringUtils.isBlank(whereHql)) {
              // Partitioned table queried with no WHERE clause at all: warn.
              console.printError(String.format(NO_PARTITION_WARNING, hql, username));
            } else {
              // Collect the table's partition key names (lowercased below) and
              // check whether the WHERE text mentions any of them.
              List<FieldSchema> partitionKeys = t.getPartitionKeys();
              List<String> partitionNames = new ArrayList<String>();
              for (int i = 0; i < partitionKeys.size(); i++) {
                partitionNames.add(partitionKeys.get(i).getName().toLowerCase());
              }

              if (!containsPartCond(partitionNames, whereHql, tableAlias)) {
                console.printError(String.format(NO_PARTITION_WARNING, hql, username));
              }
            }
          }
        }

      }
    } catch (Exception ex) {
      // Never fail the user's query because of a problem in the hook itself.
      ex.printStackTrace();
    }
    return ast;
  }

  // Naive substring check: does the WHERE text mention any partition key,
  // either bare or qualified with the table alias?
  private boolean containsPartCond(List<String> partitionKeys, String sql, String alias) {
    for (String pk : partitionKeys) {
      if (sql.contains(pk)) {
        return true;
      }
      if (!StringUtils.isEmpty(alias) && sql.contains(alias + "." + pk)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Handles the simple single-table FROM clause (no joins or subqueries):
   * records the database name, table name and alias, and flags the query for
   * the partition check.
   */
  private void extractFromClause(ASTNode ast) {
    if (HiveParser.TOK_FROM == ast.getToken().getType()) {
      ASTNode refNode = (ASTNode) ast.getChild(0);
      if (refNode.getToken().getType() == HiveParser.TOK_TABREF && ast.getChildCount() == 1) {
        ASTNode tabNameNode = (ASTNode) (refNode.getChild(0));
        int refNodeChildCount = refNode.getChildCount();
        if (tabNameNode.getToken().getType() == HiveParser.TOK_TABNAME) {
          if (tabNameNode.getChildCount() == 2) {
            tableDatabaseName = tabNameNode.getChild(0).getText().toLowerCase();
            tableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabNameNode.getChild(1))
                .toLowerCase();
          } else if (tabNameNode.getChildCount() == 1) {
            tableName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabNameNode.getChild(0))
                .toLowerCase();
          } else {
            return;
          }

          if (refNodeChildCount == 2) {
            tableAlias = BaseSemanticAnalyzer.unescapeIdentifier(refNode.getChild(1).getText())
                .toLowerCase();
          }
          needCheckPartition = true;
        }
      }
    }
  }

  /**
   * Nothing needs to happen after analysis for this check; the commented-out
   * code below only illustrates how the read entities could be inspected here.
   */
  @Override
  public void postAnalyze(HiveSemanticAnalyzerHookContext context,
      List<Task<? extends Serializable>> rootTasks) throws SemanticException {
    // LogHelper console = SessionState.getConsole();
    // Set<ReadEntity> readEntitys = context.getInputs();
    // console.printInfo("Total Read Entity Size:" + readEntitys.size());
    // for (ReadEntity readEntity : readEntitys) {
    // Partition p = readEntity.getPartition();
    // Table t = readEntity.getTable();
    // }
  }
}
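
The hook deliberately only warns for now. If we later decided to block offending statements outright, the two console.printError calls in preAnalyze could be replaced with a thrown SemanticException; note that the enclosing catch (Exception ex) would then have to let SemanticException propagate, because as written it swallows everything. A minimal sketch of that hypothetical change:

              // Hypothetical hard-block variant (not used above): abort compilation
              // instead of only printing a warning.
              if (!containsPartCond(partitionNames, whereHql, tableAlias)) {
                throw new SemanticException(String.format(NO_PARTITION_WARNING, hql, username));
              }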
