hive-sql执行流程分析

本文针对hive的sql解析进行分析

介绍

Driver是hive所有对外接口的入口,比如常用的Hive JDBC和Beeline,参考下图hive执行流程分析。
hive执行流程分析
那么hive是如何把命令翻译成mr任务执行的?下面结合部分代码对这个问题就行分析。

流程分析

  • Driver.run()
    run方法是入口,封装了一些响应编码和出错处理的逻辑

  • Driver.runInternal()

  1. runInternal方法加载了注册到driver里的hook,通过配置hive.exec.driver.run.hooks,注意多个用逗号隔开,然后同反射实例化所有hook,并调用hook的preDriverRun方法。
  2. 编译command,也就是生成QueryPlan,详见Driver.compile()
  3. 根据QueryPlan分task执行,task就是stage,详见Driver.execute()
  4. 调用步骤1的hook的postDriverRun方法
  • TaskRunner.runSequential()
    如果task的mr任务,那么用mr执行

  • Driver.compile
    首先hive通过配置hive.execution.engine来选择编译引擎,默认是mr,还支持tez和spark,下面那MR来说说。
    command转换成QueryPlan,这是sql解析的核心,hive通过antlr这个框架完成对sql的词法解析和语法解析,然后再进行语义解析,至此hive-sql的执行计划就生成了。这里稍微解释一下这几个术语的概念。
    词法解析
    一句话里哪些词是关键词,哪些词合法哪些词非法,判断是这些内容的操作可以理解为词法解析。对应的可以参考hive代码中的org.apahce.hadoop.hive.ql.parse.HiveLexer.g文件,这是一个antlr框架格式的文件,里面记录了Hive的关键词。
    语法解析
    有了关键词,还得确认词在句子中的位置,也就是语法,比如命令select * from table,hive得先规定好这个命令的格式才能进行解析。这个可以参考org.apahce.hadoop.hive.ql.parse.HiveParser.g文件,里面配置了hive所有支持的语法,最后通过TOKEN生成AST树来表明sql的语法。
    hive获取到antlr返回的AST树后,再进行语义解析
    语义解析
    这一块是hive解析sql的核心,hive通过不同的TOKEN来选择不同的语义分析器,参考类SemanticAnalyzerFactory,这里hive写的很清晰,可以利用这个分类来做自定义的sql分析。语义分析器的核心功能包括逻辑计划的生成,逻辑计划优化和物理计划的生成,物理计算优化等。参考代码SemanticAnalyzer.analyzeInternal(),一共有11个步骤,每个步骤里还有很多子步骤。出于这个方法的重要性,容我列一发代码:
    最重要的是第2步,第7步,第9步。
    第2步是将AST树转换成Operator树,也就是生成逻辑计划
    第7步是对逻辑计划进行优化
    第9步是经逻辑计划基于计算的引擎(MR,TEZ,SPARK)来生成物理计划

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    void analyzeInternal(ASTNode ast, PlannerContext plannerCtx) throws SemanticException {
    // 1. Generate Resolved Parse tree from syntax tree
    LOG.info("Starting Semantic Analysis");
    if (!genResolvedParseTree(ast, plannerCtx)) {
    return;
    }

    // 2. Gen OP Tree from resolved Parse Tree
    Operator sinkOp = genOPTree(ast, plannerCtx);

    // 3. Deduce Resultset Schema
    if (createVwDesc != null) {
    resultSchema = convertRowSchemaToViewSchema(opParseCtx.get(sinkOp).getRowResolver());
    } else {
    resultSchema = convertRowSchemaToResultSetSchema(opParseCtx.get(sinkOp).getRowResolver(),
    HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES));
    }

    // 4. Generate Parse Context for Optimizer & Physical compiler
    copyInfoToQueryProperties(queryProperties);
    ParseContext pCtx = new ParseContext(conf, opToPartPruner, opToPartList, topOps,
    new HashSet<JoinOperator>(joinContext.keySet()),
    new HashSet<SMBMapJoinOperator>(smbMapJoinContext.keySet()),
    loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
    listMapJoinOpsNoReducer, prunedPartitions, opToSamplePruner,
    globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToPartToSkewedPruner,
    viewAliasToInput, reduceSinkOperatorsAddedByEnforceBucketingSorting,
    analyzeRewrite, tableDesc, queryProperties);

    // 5. Take care of view creation
    if (createVwDesc != null) {
    saveViewDefinition();

    // validate the create view statement at this point, the createVwDesc gets
    // all the information for semanticcheck
    validateCreateView(createVwDesc);

    // Since we're only creating a view (not executing it), we don't need to
    // optimize or translate the plan (and in fact, those procedures can
    // interfere with the view creation). So skip the rest of this method.
    ctx.setResDir(null);
    ctx.setResFile(null);

    try {
    PlanUtils.addInputsForView(pCtx);
    } catch (HiveException e) {
    throw new SemanticException(e);
    }
    return;
    }

    // 6. Generate table access stats if required
    if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_TABLEKEYS) == true) {
    TableAccessAnalyzer tableAccessAnalyzer = new TableAccessAnalyzer(pCtx);
    setTableAccessInfo(tableAccessAnalyzer.analyzeTableAccess());
    }

    // 7. Perform Logical optimization
    if (LOG.isDebugEnabled()) {
    LOG.debug("Before logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
    }
    Optimizer optm = new Optimizer();
    optm.setPctx(pCtx);
    optm.initialize(conf);
    pCtx = optm.optimize();
    FetchTask origFetchTask = pCtx.getFetchTask();
    if (LOG.isDebugEnabled()) {
    LOG.debug("After logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
    }

    // 8. Generate column access stats if required - wait until column pruning
    // takes place during optimization
    boolean isColumnInfoNeedForAuth = SessionState.get().isAuthorizationModeV2()
    && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_ENABLED);
    if (isColumnInfoNeedForAuth
    || HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS) == true) {
    ColumnAccessAnalyzer columnAccessAnalyzer = new ColumnAccessAnalyzer(pCtx);
    setColumnAccessInfo(columnAccessAnalyzer.analyzeColumnAccess());
    }

    // 9. Optimize Physical op tree & Translate to target execution engine (MR,
    // TEZ..)
    if (!ctx.getExplainLogical()) {
    TaskCompiler compiler = TaskCompilerFactory.getCompiler(conf, pCtx);
    compiler.init(conf, console, db);
    compiler.compile(pCtx, rootTasks, inputs, outputs);
    fetchTask = pCtx.getFetchTask();
    }
    LOG.info("Completed plan generation");

    // 10. put accessed columns to readEntity
    if (HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_STATS_COLLECT_SCANCOLS)) {
    putAccessedColumnsToReadEntity(inputs, columnAccessInfo);
    }

    // 11. if desired check we're not going over partition scan limits
    if (!ctx.getExplain()) {
    enforceScanLimits(pCtx, origFetchTask);
    }

    return;
    }
  • Driver.execute

  1. 通过执行计划确定job数量,job如果在hadoop上执行,那么就是mr任务数量
  2. 通过TaskRunner执行task
    优先执行root task,只有当root task执行完成后,才能执行root task的child task。当你用hive的命令explain命令,比如explain select count(*) from table,hive会输出两个stage,并且注明哪个stage是root以及他们的依赖关系,对应到代码里,这个关系在SemanticAnalyzer产生。

  3. 判断是否支持并发执行,必须是mr任务才有资格,另外还有配置开关hive.exec.parallel,默认是关闭的

  • ExecDriver.execute
    根据不同的task执行计划,比如需要执行mr任务的task在ExecDriver中提交

总结

hive-sql执行流程和spark-sql执行流程相似,都采用antlr框架作为sql词法和语法解析,优化过程也都借用了calcite框架的基于代价的优化策略,只是最后执行引擎不同。对于hive来算,语义分析是最复杂的一部分,光SemanticAnalyzer这个类就有12k+的代码量,都能抵得上小型项目了。

参考

hive 1.2.1代码

ulysses wechat
订阅+