class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {

  // TODO: Move the planner and optimizer into here from SessionState.
  protected def planner = sparkSession.sessionState.planner

  def assertAnalyzed(): Unit = analyzed

  def assertSupported(): Unit = {
    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
      UnsupportedOperationChecker.checkForBatch(analyzed)
    }
  }

  lazy val sparkPlan: SparkPlan = {
    SparkSession.setActiveSession(sparkSession)
    // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
    //       but we will implement to choose the best plan.
    planner.plan(ReturnAnswer(optimizedPlan)).next()
  }

  // executedPlan should not be used to initialize any SparkPlan. It should be
  // only used for execution.
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

  /** Internal version of the RDD. Avoids copies and has no schema */
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
The logic here is laid out very clearly: start from the last line, lazy val toRdd: RDD[InternalRow] = executedPlan.execute(), and work backwards, and the whole pipeline becomes visible. Attentive readers will also notice that every step is a lazy val: nothing runs until execute() is called, which is an important design principle in Spark.
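To see these lazy phases from user code, here is a minimal sketch (assuming a local SparkSession; the people view is made up) that walks the same chain through the public Dataset.queryExecution handle:

import org.apache.spark.sql.SparkSession

// A minimal sketch, assuming a local SparkSession; the view name `people` is hypothetical.
val spark = SparkSession.builder().master("local[*]").appName("qe-demo").getOrCreate()
import spark.implicits._

val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
df.createOrReplaceTempView("people")

val query = spark.sql("SELECT id FROM people WHERE id > 1")
val qe = query.queryExecution          // nothing has run yet: every phase below is a lazy val
println(qe.logical)                    // unresolved logical plan produced by the parser
println(qe.analyzed)                   // resolved against the catalog by the Analyzer
println(qe.optimizedPlan)              // after the rule-based Optimizer
println(qe.sparkPlan)                  // first physical plan chosen by the planner
println(qe.executedPlan)               // after the preparation rules (exchanges, codegen, ...)
query.collect()                        // only an action finally triggers executedPlan.execute()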
  /**
   * Executes a SQL query using Spark, returning the result as a `DataFrame`.
   * The dialect that is used for SQL parsing can be configured with 'spark.sql.dialect'.
   *
   * @since 2.0.0
   */
  def sql(sqlText: String): DataFrame = {
    Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
  }
/**
 * Interface for a parser.
 */
trait ParserInterface {
  /**
   * Parse a string to a [[LogicalPlan]].
   */
  // Turns the SQL text into a logical plan
  def parsePlan(sqlText: String): LogicalPlan
  ...
override def visitQuerySpecification(
    ctx: QuerySpecificationContext): LogicalPlan = withOrigin(ctx) {
  val from = OneRowRelation().optional(ctx.fromClause) {
    visitFromClause(ctx.fromClause)
  }
  withQuerySpecification(ctx, from)
}
Parsing the FROM clause
override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
  val from = ctx.relation.asScala.foldLeft(null: LogicalPlan) { (left, relation) =>
    val right = plan(relation.relationPrimary)
    val join = right.optionalMap(left)(Join(_, _, Inner, None))
    withJoinRelations(join, relation)
  }
  if (ctx.pivotClause() != null) {
    if (!ctx.lateralView.isEmpty) {
      throw new ParseException("LATERAL cannot be used together with PIVOT in FROM clause", ctx)
    }
    withPivot(ctx.pivotClause, from)
  } else {
    ctx.lateralView.asScala.foldLeft(from)(withGenerate)
  }
}

/**
 * Add a query specification to a logical plan. The query specification is the core of the logical
 * plan, this is where sourcing (FROM clause), transforming (SELECT TRANSFORM/MAP/REDUCE),
 * projection (SELECT), aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place.
 *
 * Note that query hints are ignored (both by the parser and the builder).
 */
private def withQuerySpecification(
    ctx: QuerySpecificationContext,
    relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
  import ctx._

  def withHaving(ctx: BooleanExpressionContext, plan: LogicalPlan): LogicalPlan = {
    // Note that we add a cast to non-predicate expressions. If the expression itself is
    // already boolean, the optimizer will get rid of the unnecessary cast.
    val predicate = expression(ctx) match {
      case p: Predicate => p
      case e => Cast(e, BooleanType)
    }
    Filter(predicate, plan)
  }
  // Expressions, i.e. the items being selected.
  val expressions = Option(namedExpressionSeq).toSeq
    .flatMap(_.namedExpression.asScala)
    .map(typedVisit[Expression])
  // Create either a transform or a regular query.
  val specType = Option(kind).map(_.getType).getOrElse(SqlBaseParser.SELECT)
  specType match {
    case SqlBaseParser.MAP | SqlBaseParser.REDUCE | SqlBaseParser.TRANSFORM =>
      // Transform

      // Add where.
      val withFilter = relation.optionalMap(where)(filter)

    case SqlBaseParser.SELECT =>
      // Regular select

      // Add lateral views.
      val withLateralView = ctx.lateralView.asScala.foldLeft(relation)(withGenerate)

      // Add where.
      val withFilter = withLateralView.optionalMap(where)(filter)
      // Add aggregation or a project.
      val namedExpressions = expressions.map {
        case e: NamedExpression => e
        case e: Expression => UnresolvedAlias(e)
      }
      // What is ultimately returned is Project(namedExpressions, withFilter), which extends LogicalPlan
      def createProject() = if (namedExpressions.nonEmpty) {
        // The result returned for a plain SELECT statement
        Project(namedExpressions, withFilter)
      } else {
        withFilter
      }

      val withProject = if (aggregation == null && having != null) {
        if (conf.getConf(SQLConf.LEGACY_HAVING_WITHOUT_GROUP_BY_AS_WHERE)) {
          // If the legacy conf is set, treat HAVING without GROUP BY as WHERE.
          withHaving(having, createProject())
        } else {
          // According to SQL standard, HAVING without GROUP BY means global aggregate.
          withHaving(having, Aggregate(Nil, namedExpressions, withFilter))
        }
      } else if (aggregation != null) {
        val aggregate = withAggregation(aggregation, namedExpressions, withFilter)
        aggregate.optionalMap(having)(withHaving)
      } else {
        // When hitting this branch, `having` must be null.
        createProject()
      }
      // Distinct
      val withDistinct = if (setQuantifier() != null && setQuantifier().DISTINCT() != null) {
        Distinct(withProject)
      } else {
        withProject
      }

      // Window
      val withWindow = withDistinct.optionalMap(windows)(withWindows)

      // Hint
      hints.asScala.foldRight(withWindow)(withHints)
  }
}
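To see what this builder produces, here is a small sketch that feeds a simple statement to the standalone Catalyst parser (the table name is made up, and the printed shape is only indicative):

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// A sketch only: parse a statement without any catalog resolution.
val plan = CatalystSqlParser.parsePlan(
  "SELECT name, age FROM people WHERE age > 18")
println(plan)
// Expected shape (unresolved nodes are prefixed with '):
// 'Project ['name, 'age]
// +- 'Filter ('age > 18)
//    +- 'UnresolvedRelation `people`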
  // TODO: Move the planner and optimizer into here from SessionState.
  protected def planner = sparkSession.sessionState.planner

  def assertAnalyzed(): Unit = analyzed

  def assertSupported(): Unit = {
    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
      UnsupportedOperationChecker.checkForBatch(analyzed)
    }
  }

  // The physical plan (SparkPlan) obtained by converting the already-optimized logical plan
  lazy val sparkPlan: SparkPlan = {
    SparkSession.setActiveSession(sparkSession)
    // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
    //       but we will implement to choose the best plan.
    // Pick one plan out of the series produced by the planner
    planner.plan(ReturnAnswer(optimizedPlan)).next()
  }

  // executedPlan should not be used to initialize any SparkPlan. It should be
  // only used for execution.
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

  /** Internal version of the RDD. Avoids copies and has no schema */
  // The RDD finally produced by QueryExecution; it only runs once an action is triggered
  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
/**
 * An execution strategy for rules that indicates the maximum number of executions. If the
 * execution reaches fix point (i.e. converge) before maxIterations, it will stop.
 */
abstract class Strategy { def maxIterations: Int }

/** A strategy that only runs once. */
case object Once extends Strategy { val maxIterations = 1 }

/** A strategy that runs until fix point or maxIterations times, whichever comes first. */
case class FixedPoint(maxIterations: Int) extends Strategy

/** A batch of rules. */
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

/** Defines a sequence of rule batches, to be overridden by the implementation. */
// All the rules, grouped into batches
protected def batches: Seq[Batch]
/**
 * Defines a check function that checks for structural integrity of the plan after the execution
 * of each rule. For example, we can check whether a plan is still resolved after each rule in
 * `Optimizer`, so we can catch rules that return invalid plans. The check function returns
 * `false` if the given plan doesn't pass the structural integrity check.
 */
protected def isPlanIntegral(plan: TreeType): Boolean = true
/**
 * Executes the batches of rules defined by the subclass. The batches are executed serially
 * using the defined execution strategy. Within each batch, rules are also executed serially.
 */
// Executing the rules is conceptually simple: apply them to the plan batch by batch, in order,
// iterating within each batch as many times as the strategy allows.
def execute(plan: TreeType): TreeType = {
  // Used to compare the plan before and after the rules have run
  var curPlan = plan
  val queryExecutionMetrics = RuleExecutor.queryExecutionMeter

  batches.foreach { batch =>
    val batchStartPlan = curPlan
    var iteration = 1
    var lastPlan = curPlan
    var continue = true

    // Run until fix point (or the max number of iterations as specified in the strategy).
    while (continue) {
      curPlan = batch.rules.foldLeft(curPlan) {
        case (plan, rule) =>
          val startTime = System.nanoTime()
          // Apply the rule to obtain a new plan
          val result = rule(plan)
          val runTime = System.nanoTime() - startTime

          // Check whether the rule actually changed anything
          if (!result.fastEquals(plan)) {
            queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
            queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
            logTrace(
              s"""
                |=== Applying Rule ${rule.ruleName} ===
                |${sideBySide(plan.treeString, result.treeString).mkString("\n")}
              """.stripMargin)
          }
          queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
          queryExecutionMetrics.incNumExecution(rule.ruleName)

          // Run the structural integrity checker against the plan after each rule.
          if (!isPlanIntegral(result)) {
            val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
              "the structural integrity of the plan is broken."
            throw new TreeNodeException(result, message, null)
          }

          result
      }
      iteration += 1
      // The maximum number of iterations has been reached; stop optimizing
      if (iteration > batch.strategy.maxIterations) {
        // Only log if this is a rule that is supposed to run more than once.
        if (iteration != 2) {
          val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
          if (Utils.isTesting) {
            throw new TreeNodeException(curPlan, message, null)
          } else {
            logWarning(message)
          }
        }
        continue = false
      }
      // The plan no longer changes: a fix point has been reached, so stop optimizing
      if (curPlan.fastEquals(lastPlan)) {
        logTrace(
          s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
        continue = false
      }
      lastPlan = curPlan
    }

    // Did this batch of rules have any effect?
    if (!batchStartPlan.fastEquals(curPlan)) {
      logDebug(
        s"""
          |=== Result of Batch ${batch.name} ===
          |${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
        """.stripMargin)
    } else {
      logTrace(s"Batch ${batch.name} has no effect.")
    }
  }
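The batch/strategy loop boils down to a small amount of generic machinery. The following self-contained sketch is not Spark code, only an illustration of the same idea: run a list of rules repeatedly until the value stops changing or the iteration budget is used up.

// A self-contained illustration of the RuleExecutor idea, not the Spark implementation.
object MiniRuleExecutor {
  type Rule[T] = T => T

  sealed trait Strategy { def maxIterations: Int }
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(maxIterations: Int) extends Strategy

  case class Batch[T](name: String, strategy: Strategy, rules: Rule[T]*)

  def execute[T](plan: T, batches: Seq[Batch[T]]): T =
    batches.foldLeft(plan) { (start, batch) =>
      var cur = start
      var last = cur
      var iteration = 1
      var continue = true
      while (continue) {
        cur = batch.rules.foldLeft(cur)((p, rule) => rule(p))
        if (iteration >= batch.strategy.maxIterations) continue = false // budget exhausted
        else if (cur == last) continue = false                          // fixed point reached
        last = cur
        iteration += 1
      }
      cur
    }
}

// Usage: "optimize" arithmetic expressions represented as strings by dropping "+0" terms.
val dropPlusZero: MiniRuleExecutor.Rule[String] = _.replace("+0", "")
val optimized = MiniRuleExecutor.execute(
  "x+0+0+0",
  Seq(MiniRuleExecutor.Batch("Simplify", MiniRuleExecutor.FixedPoint(10), dropPlusZero)))
// optimized == "x"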
/** Name for this rule, automatically inferred based on class name. */
val ruleName: String = {
  val className = getClass.getName
  if (className endsWith "$") className.dropRight(1) else className
}
// If the unresolved relation is running directly on files, we just return the original
// UnresolvedRelation, the plan will get resolved later. Else we look up the table from catalog
// and change the default database name(in AnalysisContext) if it is a view.
// We usually look up a table from the default database if the table identifier has an empty
// database part, for a view the default database should be the currentDb when the view was
// created. When the case comes to resolving a nested view, the view may have different default
// database with that the referenced view has, so we need to use
// `AnalysisContext.defaultDatabase` to track the current default database.
// When the relation we resolve is a view, we fetch the view.desc(which is a CatalogTable), and
// then set the value of `CatalogTable.viewDefaultDatabase` to
// `AnalysisContext.defaultDatabase`, we look up the relations that the view references using
// the default database.
// For example:
// |- view1 (defaultDatabase = db1)
//   |- operator
//     |- table2 (defaultDatabase = db1)
//     |- view2 (defaultDatabase = db2)
//        |- view3 (defaultDatabase = db3)
//     |- view4 (defaultDatabase = db4)
// In this case, the view `view1` is a nested view, it directly references `table2`, `view2`
// and `view4`, the view `view2` references `view3`. On resolving the table, we look up the
// relations `table2`, `view2`, `view4` using the default database `db1`, and look up the
// relation `view3` using the default database `db2`.
//
// Note this is compatible with the views defined by older versions of Spark(before 2.2), which
// have empty defaultDatabase and all the relations in viewText have database part defined.
// Look the table up in the Catalog
def resolveRelation(plan: LogicalPlan): LogicalPlan = plan match {
  // Not the run-directly-on-files case, e.g. select * from parquet.`/path/to/query`
  case u: UnresolvedRelation if !isRunningDirectlyOnFiles(u.tableIdentifier) =>
    // The default database
    val defaultDatabase = AnalysisContext.get.defaultDatabase
    // Look it up in the Catalog
    val foundRelation = lookupTableFromCatalog(u, defaultDatabase)
    resolveRelation(foundRelation)
  // The view's child should be a logical plan parsed from the `desc.viewText`, the variable
  // `viewText` should be defined, or else we throw an error on the generation of the View
  // operator.
  case view @ View(desc, _, child) if !child.resolved =>
    // Resolve all the UnresolvedRelations and Views in the child.
    val newChild = AnalysisContext.withAnalysisContext(desc.viewDefaultDatabase) {
      if (AnalysisContext.get.nestedViewDepth > conf.maxNestedViewDepth) {
        view.failAnalysis(s"The depth of view ${view.desc.identifier} exceeds the maximum " +
          s"view resolution depth (${conf.maxNestedViewDepth}). Analysis is aborted to " +
          s"avoid errors. Increase the value of ${SQLConf.MAX_NESTED_VIEW_DEPTH.key} to work " +
          "around this.")
      }
      executeSameContext(child)
    }
    view.copy(child = newChild)
  case p @ SubqueryAlias(_, view: View) =>
    val newChild = resolveRelation(view)
    p.copy(child = newChild)
  case _ => plan
}

// Entry point of the rule; resolveOperatorsUp traverses the tree bottom-up and applies the
// rule to every node
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
    EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
      case v: View =>
        u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
      case other => i.copy(table = other)
    }
  // An UnresolvedRelation is matched here
  case u: UnresolvedRelation => resolveRelation(u)
}

// Look up the table with the given name from catalog. The database we used is decided by the
// precedence:
// 1. Use the database part of the table identifier, if it is defined;
// 2. Use defaultDatabase, if it is defined(In this case, no temporary objects can be used,
//    and the default database is only used to look up a view);
// 3. Use the currentDb of the SessionCatalog.
private def lookupTableFromCatalog(
    u: UnresolvedRelation,
    defaultDatabase: Option[String] = None): LogicalPlan = {
  val tableIdentWithDb = u.tableIdentifier.copy(
    database = u.tableIdentifier.database.orElse(defaultDatabase))
  try {
    catalog.lookupRelation(tableIdentWithDb)
  } catch {
    // If the table cannot be found, an exception is thrown here
    case e: NoSuchTableException =>
      u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}", e)
    // If the database is defined and that database is not found, throw an AnalysisException.
    // Note that if the database is not defined, it is possible we are looking up a temp view.
    case e: NoSuchDatabaseException =>
      u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}, the " +
        s"database ${e.db} doesn't exist.", e)
  }
}
def lookupRelation(name: TableIdentifier): LogicalPlan = {
  synchronized {
    val db = formatDatabaseName(name.database.getOrElse(currentDb))
    val table = formatTableName(name.table)
    // If db equals globalTempViewManager.database
    if (db == globalTempViewManager.database) {
      // globalTempViewManager (a HashMap[String, LogicalPlan]) maps each global view name to
      // the LogicalPlan holding its metadata
      globalTempViewManager.get(table).map { viewDef =>
        SubqueryAlias(table, db, viewDef)
      }.getOrElse(throw new NoSuchTableException(db, table))
    // If a database is given, or the table is not a session-level temp view
    } else if (name.database.isDefined || !tempViews.contains(table)) {
      // Fetch the table's metadata (a CatalogTable) from the externalCatalog (e.g. Hive).
      // It contains the table type (managed/external table or view), the storage format,
      // the column schema, and so on.
      val metadata = externalCatalog.getTable(db, table)
      // The name refers to a View
      if (metadata.tableType == CatalogTableType.VIEW) {
        val viewText = metadata.viewText.getOrElse(sys.error("Invalid view without text."))
        // The relation is a view, so we wrap the relation by:
        // 1. Add a [[View]] operator over the relation to keep track of the view desc;
        // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
        /**
         * Build a View object (which includes parsing viewText into a syntax tree via the
         * parser module) and wrap it in a SubqueryAlias.
         *
         * The non-view branch below means the name refers to an actual table (e.g. a Hive
         * table): the metadata is wrapped in an UnresolvedCatalogRelation, which in turn is
         * wrapped in a SubqueryAlias.
         *
         * This is where an unbound UnresolvedRelation gets replaced using the catalog.
         */
        val child = View(
          desc = metadata,
          output = metadata.schema.toAttributes,
          child = parser.parsePlan(viewText))
        SubqueryAlias(table, db, child)
      } else {
        SubqueryAlias(table, db, UnresolvedCatalogRelation(metadata))
      }
    } else {
      // Otherwise it is a session-level temp view: take the LogicalPlan holding its metadata
      // from tempViews and wrap it in a SubqueryAlias
      SubqueryAlias(table, tempViews(table))
    }
  }
}
...
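The three branches above map directly onto things user code can create. A small sketch (assuming an existing SparkSession named spark; all names are made up) that exercises the global-temp-view, catalog-table and session-temp-view paths:

// Sketch assuming an existing SparkSession `spark`; table/view names are made up.
import spark.implicits._

val df = Seq((1, "a")).toDF("id", "name")

df.createOrReplaceTempView("t_session")   // session-scoped temp view -> tempViews branch
df.createGlobalTempView("t_global")       // global temp view -> globalTempViewManager branch
df.write.saveAsTable("t_catalog")         // persisted table -> externalCatalog.getTable branch

spark.sql("SELECT * FROM t_session").show()
spark.sql("SELECT * FROM global_temp.t_global").show()  // global temp views live in `global_temp`
spark.sql("SELECT * FROM t_catalog").show()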
// Check for structural integrity of the plan in test mode. Currently we only check if a plan is
// still resolved after the execution of each rule.
override protected def isPlanIntegral(plan: LogicalPlan): Boolean = {
  !Utils.isTesting || plan.resolved
}
/**
 * Defines the default rule batches in the Optimizer.
 *
 * Implementations of this class should override this method, and [[nonExcludableRules]] if
 * necessary, instead of [[batches]]. The rule batches that eventually run in the Optimizer,
 * i.e., returned by [[batches]], will be (defaultBatches - (excludedRules - nonExcludableRules)).
 */
def defaultBatches: Seq[Batch] = {
  val operatorOptimizationRuleSet =
    Seq(
      // Operator push down
      PushProjectionThroughUnion,
      ReorderJoin,
      EliminateOuterJoin,
      PushPredicateThroughJoin,   // predicate pushdown
      PushDownPredicate,
      LimitPushDown,
      ColumnPruning,              // column pruning
      InferFiltersFromConstraints,
      // Operator combine
      CollapseRepartition,
      CollapseProject,
      CollapseWindow,
      CombineFilters,             // merge adjacent Filters
      CombineLimits,              // merge adjacent Limits
      CombineUnions,
      // Constant folding and strength reduction
      NullPropagation,            // NULL handling (NULLs are a common source of data skew)
      ConstantPropagation,
      FoldablePropagation,
      OptimizeIn,                 // optimize the IN keyword by rewriting it to InSet
      ConstantFolding,            // fold constants: values computable at optimization time are evaluated here
      ReorderAssociativeOperator,
      LikeSimplification,         // simple optimization of LIKE
      // Simplify boolean predicates, e.g. `true AND score > 0` becomes `score > 0`
      BooleanSimplification,
      SimplifyConditionals,
      RemoveDispensableExpressions,
      SimplifyBinaryComparison,
      // Prune trivial filters, e.g. WHERE 1=1 is dropped entirely
      PruneFilters,
      EliminateSorts,
      // Remove unnecessary casts, e.g. when the two compared columns already have the same type
      SimplifyCasts,
      // Simplify case-conversion chains, e.g. Upper(Upper('a')) is treated as Upper('a')
      SimplifyCaseConversionExpressions,
      RewriteCorrelatedScalarSubquery,
      EliminateSerialization,
      RemoveRedundantAliases,
      RemoveRedundantProject,
      SimplifyExtractValueOps,
      CombineConcats) ++
      extendedOperatorOptimizationRules
  val operatorOptimizationBatch: Seq[Batch] = {
    val rulesWithoutInferFiltersFromConstraints =
      operatorOptimizationRuleSet.filterNot(_ == InferFiltersFromConstraints)
    Batch("Operator Optimization before Inferring Filters", fixedPoint,  // run to a fixed point
      rulesWithoutInferFiltersFromConstraints: _*) ::
    Batch("Infer Filters", Once,
      InferFiltersFromConstraints) ::
    Batch("Operator Optimization after Inferring Filters", fixedPoint,
      rulesWithoutInferFiltersFromConstraints: _*) :: Nil
  }
(Batch("Eliminate Distinct", Once, EliminateDistinct) :: // Technically some of the rules in Finish Analysis are not optimizer rules and belong more // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime). // However, because we also use the analyzer to canonicalized queries (for view definition), // we do not eliminate subqueries or compute current time in the analyzer. Batch("Finish Analysis", Once, EliminateSubqueryAliases, EliminateView, ReplaceExpressions, ComputeCurrentTime, GetCurrentDatabase(sessionCatalog), RewriteDistinctAggregates, ReplaceDeduplicateWithAggregate) :: ////////////////////////////////////////////////////////////////////////////////////////// // Optimizer rules start here ////////////////////////////////////////////////////////////////////////////////////////// // - Do the first call of CombineUnions before starting the major Optimizer rules, // since it can reduce the number of iteration and the other rules could add/move // extra operators between two adjacent Union operators. // - Call CombineUnions again in Batch("Operator Optimizations"), // since the other rules might make two separate Unions operators adjacent. Batch("Union", Once, CombineUnions) :: // run this once earlier. this might simplify the plan and reduce cost of optimizer. // for example, a query such as Filter(LocalRelation) would go through all the heavy // optimizer rules that are triggered when there is a filter // (e.g. InferFiltersFromConstraints). if we run this batch earlier, the query becomes just // LocalRelation and does not trigger many rules Batch("LocalRelation early", fixedPoint, ConvertToLocalRelation, PropagateEmptyRelation) :: Batch("Pullup Correlated Expressions", Once, PullupCorrelatedPredicates) :: Batch("Subquery", Once, OptimizeSubqueries) :: Batch("Replace Operators", fixedPoint, RewriteExceptAll, RewriteIntersectAll, ReplaceIntersectWithSemiJoin, ReplaceExceptWithFilter, ReplaceExceptWithAntiJoin, ReplaceDistinctWithAggregate) :: // aggregate替换distinct Batch("Aggregate", fixedPoint, RemoveLiteralFromGroupExpressions, RemoveRepetitionFromGroupExpressions) :: Nil ++ operatorOptimizationBatch) :+ Batch("Join Reorder", Once, CostBasedJoinReorder) :+ Batch("Remove Redundant Sorts", Once, RemoveRedundantSorts) :+ Batch("Decimal Optimizations", fixedPoint, DecimalAggregates) :+ Batch("Object Expressions Optimization", fixedPoint, EliminateMapObjects, CombineTypedFilters) :+ Batch("LocalRelation", fixedPoint, ConvertToLocalRelation, PropagateEmptyRelation) :+ Batch("Extract PythonUDF From JoinCondition", Once, PullOutPythonUDFInJoinCondition) :+ // The following batch should be executed after batch "Join Reorder" "LocalRelation" and // "Extract PythonUDF From JoinCondition". Batch("Check Cartesian Products", Once, CheckCartesianProducts) :+ Batch("RewriteSubquery", Once, RewritePredicateSubquery, ColumnPruning,//去掉一些用不上的列 CollapseProject, RemoveRedundantProject) :+ Batch("UpdateAttributeReferences", Once, UpdateNullabilityInAttributeReferences) }
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
  // Push the where condition down into the join filter.
  // Match this shape: a Filter sitting directly on top of a Join
  case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) =>
    val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
      split(splitConjunctivePredicates(filterCondition), left, right)
    joinType match {
      // An inner join
      case _: InnerLike =>
        // push down the single side `where` condition into respective sides
        // The left-side conditions become a Filter over the left child
        val newLeft = leftFilterConditions.
          reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
        // The right-side conditions become a Filter over the right child
        val newRight = rightFilterConditions.
          reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
        val (newJoinConditions, others) =
          commonFilterCondition.partition(canEvaluateWithinJoin)
        val newJoinCond = (newJoinConditions ++ joinCondition).reduceLeftOption(And)

        // The final optimized result
        val join = Join(newLeft, newRight, joinType, newJoinCond)
        if (others.nonEmpty) {
          Filter(others.reduceLeft(And), join)
        } else {
          join
        }
      case RightOuter =>
        // push down the right side only `where` condition
        val newLeft = left
        val newRight = rightFilterConditions.
          reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
        val newJoinCond = joinCondition
        val newJoin = Join(newLeft, newRight, RightOuter, newJoinCond)

        (leftFilterConditions ++ commonFilterCondition).
          reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
      case LeftOuter | LeftExistence(_) =>
        // push down the left side only `where` condition
        val newLeft = leftFilterConditions.
          reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
        val newRight = right
        val newJoinCond = joinCondition
        val newJoin = Join(newLeft, newRight, joinType, newJoinCond)

        (rightFilterConditions ++ commonFilterCondition).
          reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
      case FullOuter => f // DO Nothing for Full Outer Join
      case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
      case UsingJoin(_, _) => sys.error("Untransformed Using join node")
    }
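The effect of this rule is easy to observe: a WHERE condition written above an inner join ends up as Filter operators on the join inputs in the optimized plan. A sketch (assuming an existing SparkSession named spark; the views are made up):

// Sketch assuming an existing SparkSession `spark`; the views are made up for illustration.
import spark.implicits._

Seq((1, "a"), (2, "b")).toDF("id", "name").createOrReplaceTempView("l")
Seq((1, 10), (2, 20)).toDF("id", "score").createOrReplaceTempView("r")

val q = spark.sql(
  "SELECT l.name, r.score FROM l JOIN r ON l.id = r.id WHERE r.score > 15 AND l.id > 0")

// The optimized plan should show the single-side predicates pushed below the Join:
println(q.queryExecution.optimizedPlan.treeString)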
  /**
   * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
   * filled in automatically by the QueryPlanner using the other execution strategies that are
   * available.
   */
  protected def planLater(plan: LogicalPlan): PhysicalPlan

  def apply(plan: LogicalPlan): Seq[PhysicalPlan]
}

abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
  /** A list of execution strategies that can be used by the planner */
  def strategies: Seq[GenericStrategy[PhysicalPlan]]
  /**
   * Combines all the strategies: `_(plan)` applies every strategy to the plan, and the result is
   * the collection of all physical plans produced once all the strategies have run.
   */
  def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
    // Obviously a lot to do here still...

    // Collect physical plan candidates.
    // The strategies perform the actual logical-to-physical conversion
    val candidates = strategies.iterator.flatMap(_(plan))

    // The candidates may contain placeholders marked as [[planLater]],
    // so try to replace them by their child plans.
    // Replace the planLater placeholders
    val plans = candidates.flatMap { candidate =>
      val placeholders = collectPlaceholders(candidate)

      if (placeholders.isEmpty) {
        // Take the candidate as is because it does not contain placeholders.
        Iterator(candidate)
      } else {
        // Plan the logical plan marked as [[planLater]] and replace the placeholders.
        placeholders.iterator.foldLeft(Iterator(candidate)) {
          case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
            // Plan the logical plan for the placeholder.
            val childPlans = this.plan(logicalPlan)

            candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
              childPlans.map { childPlan =>
                // Replace the placeholder by the child plan
                candidateWithPlaceholders.transformUp {
                  case p if p.eq(placeholder) => childPlan
                }
              }
            }
        }
      }
    }

    // Prune the plans, dropping bad ones; for now this simply returns them unchanged
    val pruned = prunePlans(plans)
    assert(pruned.hasNext, s"No plan for $plan")
    pruned
  }
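Strategies are also a public extension point: spark.experimental.extraStrategies is consulted before the built-in ones. A minimal sketch of a pass-through strategy that matches nothing, so planning simply falls through to the built-ins:

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// A sketch: a strategy that declines every plan by returning Nil, so the
// planner simply moves on to the next strategy in the list.
object PassThroughStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.experimental.extraStrategies = Seq(PassThroughStrategy)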
// The planner that holds the various strategies used to produce physical plans
class SparkPlanner(
    val sparkContext: SparkContext,
    val conf: SQLConf,
    val experimentalMethods: ExperimentalMethods)
  extends SparkStrategies {

  // The number of partitions
  def numPartitions: Int = conf.numShufflePartitions
/** A sequence of rules that will be applied in order to the physical plan before execution. */
protected def preparations: Seq[Rule[SparkPlan]] = Seq(
  PlanSubqueries(sparkSession),
  EnsureRequirements(sparkSession.sessionState.conf),
  CollapseCodegenStages(sparkSession.sessionState.conf),
  ReuseExchange(sparkSession.sessionState.conf),
  ReuseSubquery(sparkSession.sessionState.conf))
Step 4: the apply method of EnsureRequirements
def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
  // TODO: remove this after we create a physical operator for `RepartitionByExpression`.
  case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) =>
    child.outputPartitioning match {
      case lower: HashPartitioning if upper.semanticEquals(lower) => child
      case _ => operator
    }
  case operator: SparkPlan =>
    // Run ensureDistributionAndOrdering
    ensureDistributionAndOrdering(reorderJoinPredicates(operator))
}
// Ensure that the operator's children satisfy their output distribution requirements,
// i.e. make each child's actual output partitioning satisfy the required distribution
children = children.zip(requiredChildDistributions).map {
  // Already satisfied: return the child as is
  case (child, distribution) if child.outputPartitioning.satisfies(distribution) =>
    child
  // Broadcast is handled separately
  case (child, BroadcastDistribution(mode)) =>
    BroadcastExchangeExec(mode, child)
  case (child, distribution) =>
    val numPartitions = distribution.requiredNumPartitions
      .getOrElse(defaultNumPreShufflePartitions)
    // Add a shuffle (in effect a repartition that changes the distribution) to meet the requirement
    ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
}

// Get the indexes of children which have specified distribution requirements and need to have
// same number of partitions.
// Keep only the children that come with a concrete distribution requirement
val childrenIndexes = requiredChildDistributions.zipWithIndex.filter {
  case (UnspecifiedDistribution, _) => false
  case (_: BroadcastDistribution, _) => false
  case _ => true
}.map(_._2)

// The partition counts of those children
val childrenNumPartitions =
  childrenIndexes.map(children(_).outputPartitioning.numPartitions).toSet

if (childrenNumPartitions.size > 1) {
  // Get the number of partitions which is explicitly required by the distributions.
  val requiredNumPartitions = {
    val numPartitionsSet = childrenIndexes.flatMap { index =>
      requiredChildDistributions(index).requiredNumPartitions
    }.toSet
    assert(numPartitionsSet.size <= 1,
      s"$operator have incompatible requirements of the number of partitions for its children")
    numPartitionsSet.headOption
  }

  // The target partition count
  val targetNumPartitions = requiredNumPartitions.getOrElse(childrenNumPartitions.max)

  // Bring every child with a specified distribution to targetNumPartitions partitions
  children = children.zip(requiredChildDistributions).zipWithIndex.map {
    case ((child, distribution), index) if childrenIndexes.contains(index) =>
      if (child.outputPartitioning.numPartitions == targetNumPartitions) {
        child // already matches the target, return as is
      } else {
        // Does not match the target: shuffle
        val defaultPartitioning = distribution.createPartitioning(targetNumPartitions)
        child match {
          // If child is an exchange, we replace it with a new one having defaultPartitioning.
          case ShuffleExchangeExec(_, c, _) => ShuffleExchangeExec(defaultPartitioning, c)
          case _ => ShuffleExchangeExec(defaultPartitioning, child)
        }
      }

    case ((child, _), _) => child
  }
}

// Now, we need to add ExchangeCoordinator if necessary.
// Actually, it is not a good idea to add ExchangeCoordinators while we are adding Exchanges.
// However, with the way that we plan the query, we do not have a place where we have a
// global picture of all shuffle dependencies of a post-shuffle stage. So, we add coordinator
// at here for now.
// Once we finish https://issues.apache.org/jira/browse/SPARK-10665,
// we can first add Exchanges and then add coordinator once we have a DAG of query fragments.
// The ExchangeCoordinator tunes the data distribution across several SparkPlans
children = withExchangeCoordinator(children, requiredChildDistributions)

// Now that we've performed any necessary shuffles, add sorts to guarantee output orderings:
// If an ordering is required, add a SortExec
children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) =>
  // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort.
  if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
    child
  } else {
    SortExec(requiredOrdering, global = false, child = child)
  }
}
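Whether EnsureRequirements inserts an Exchange, and with how many partitions, is driven by the child's output partitioning and by spark.sql.shuffle.partitions. A sketch (assuming an existing SparkSession named spark) that makes the inserted Exchange and Sort nodes visible:

// Sketch assuming an existing SparkSession `spark`.
import spark.implicits._

spark.conf.set("spark.sql.shuffle.partitions", "4")          // partition count used by new exchanges
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") // force a shuffle-based join

val left  = Seq((1, "a"), (2, "b")).toDF("id", "name")
val right = Seq((1, 10),  (2, 20)).toDF("id", "score")
val joined = left.join(right, "id")

// executedPlan = sparkPlan after the preparation rules; for a sort-merge join it should
// contain an Exchange (hashpartitioning(id, 4)) and a Sort under each side of the join.
println(joined.queryExecution.executedPlan.treeString)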
private def withExchangeCoordinator(
    children: Seq[SparkPlan],
    requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
  // Decide whether an ExchangeCoordinator can be added
  val supportsCoordinator =
    if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) {
      // Right now, ExchangeCoordinator only support HashPartitionings.
      children.forall {
        // Condition 1: the children contain a ShuffleExchangeExec and every partitioning is a
        // HashPartitioning
        case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true
        case child =>
          child.outputPartitioning match {
            case hash: HashPartitioning => true
            case collection: PartitioningCollection =>
              collection.partitionings.forall(_.isInstanceOf[HashPartitioning])
            case _ => false
          }
      }
    } else {
      // In this case, although we do not have Exchange operators, we may still need to
      // shuffle data when we have more than one children because data generated by
      // these children may not be partitioned in the same way.
      // Please see the comment in withCoordinator for more details.
      // Condition 2: the required distributions are ClusteredDistribution or
      // HashClusteredDistribution
      val supportsDistribution = requiredChildDistributions.forall { dist =>
        dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution]
      }
      children.length > 1 && supportsDistribution
    }

  val withCoordinator =
    // adaptiveExecutionEnabled and condition 1 or 2 holds.
    // adaptiveExecutionEnabled defaults to false, so the ExchangeCoordinator is off by default.
    if (adaptiveExecutionEnabled && supportsCoordinator) {
      val coordinator =
        new ExchangeCoordinator(
          targetPostShuffleInputSize,
          minNumPostShufflePartitions)
      children.zip(requiredChildDistributions).map {
        case (e: ShuffleExchangeExec, _) =>
          // This child is an Exchange, we need to add the coordinator.
          // Condition 1: simply attach the coordinator
          e.copy(coordinator = Some(coordinator))
        case (child, distribution) =>
          // If this child is not an Exchange, we need to add an Exchange for now.
          // Ideally, we can try to avoid this Exchange. However, when we reach here,
          // there are at least two children operators (because if there is a single child
          // and we can avoid Exchange, supportsCoordinator will be false and we
          // will not reach here.). Although we can make two children have the same number of
          // post-shuffle partitions. Their numbers of pre-shuffle partitions may be different.
          // For example, let's say we have the following plan
          //         Join
          //         /  \
          //       Agg  Exchange
          //       /       \
          //    Exchange    t2
          //      /
          //     t1
          // In this case, because a post-shuffle partition can include multiple pre-shuffle
          // partitions, a HashPartitioning will not be strictly partitioned by the hashcodes
          // after shuffle. So, even we can use the child Exchange operator of the Join to
          // have a number of post-shuffle partitions that matches the number of partitions of
          // Agg, we cannot say these two children are partitioned in the same way.
          // Here is another case
          //         Join
          //         /  \
          //       Agg1  Agg2
          //       /       \
          //   Exchange1  Exchange2
          //      /          \
          //     t1          t2
          // In this case, two Aggs shuffle data with the same column of the join condition.
          // After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same
          // way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2
          // post-shuffle partitions. It is possible that Agg1 fetches those pre-shuffle
          // partitions by using a partitionStartIndices [0, 3]. However, Agg2 may fetch its
          // pre-shuffle partitions by using another partitionStartIndices [0, 4].
          // So, Agg1 and Agg2 are actually not co-partitioned.
          //
          // It will be great to introduce a new Partitioning to represent the post-shuffle
          // partitions when one post-shuffle partition includes multiple pre-shuffle partitions.
          // Condition 2: this child's inputs may have the same number of partitions but not
          // necessarily the same partitioning, so wrap it in an Exchange with the coordinator
          val targetPartitioning = distribution.createPartitioning(defaultNumPreShufflePartitions)
          assert(targetPartitioning.isInstanceOf[HashPartitioning])
          ShuffleExchangeExec(targetPartitioning, child, Some(coordinator))
      }
    } else {
      // If we do not need ExchangeCoordinator, the original children are returned.
      children
    }

  withCoordinator
}
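So the coordinator is gated by the adaptive-execution flag. In Spark 2.x the relevant settings look roughly like the sketch below; the exact keys are version-dependent and the values are only illustrative:

// Sketch: turning on the ExchangeCoordinator path in Spark 2.x; values are illustrative.
spark.conf.set("spark.sql.adaptive.enabled", "true")  // adaptiveExecutionEnabled
spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864") // 64 MB target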
Step 7: executedPlan.execute(), the final step of execution
final def execute(): RDD[InternalRow] = executeQuery {
  if (isCanonicalizedPlan) {
    throw new IllegalStateException("A canonicalized plan is not supposed to be executed.")
  }
  // Calls each concrete SparkPlan's doExecute implementation
  doExecute()
}

final def prepare(): Unit = {
  // doPrepare() may depend on its children, we should call prepare() on all the children first.
  children.foreach(_.prepare())
  synchronized {
    if (!prepared) {
      prepareSubqueries()
      doPrepare()
      prepared = true
    }
  }
}
protected override def doExecute(): RDD[InternalRow] = {
  val peakMemory = longMetric("peakMemory")
  val spillSize = longMetric("spillSize")
  val sortTime = longMetric("sortTime")

  // Call the child's execute method, then sort each partition
  child.execute().mapPartitionsInternal { iter =>
    val sorter = createSorter()

    val metrics = TaskContext.get().taskMetrics()
    // Remember spill data size of this task before execute this operator so that we can
    // figure out how many bytes we spilled for this operator.
    val spillSizeBefore = metrics.memoryBytesSpilled
    // Do the sort
    val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
    sortTime += sorter.getSortTimeNanos / 1000000
    peakMemory += sorter.getPeakMemoryUsage
    spillSize += metrics.memoryBytesSpilled - spillSizeBefore
    metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
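SortExec with global = false is exactly what a per-partition sort compiles to. A sketch (assuming an existing SparkSession named spark):

// Sketch assuming an existing SparkSession `spark`.
import spark.implicits._

val df = Seq((3, "c"), (1, "a"), (2, "b")).toDF("id", "name")

// sortWithinPartitions sorts each partition independently, so the physical plan should
// contain SortExec(..., global = false, ...).
println(df.sortWithinPartitions("id").queryExecution.executedPlan.treeString)
// Compare with a global sort, which needs a range-partitioning Exchange first:
println(df.orderBy("id").queryExecution.executedPlan.treeString)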
private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]()
override protected def doPrepare(): Unit = {
  // If an ExchangeCoordinator is needed, we register this Exchange operator
  // to the coordinator when we do prepare. It is important to make sure
  // we register this operator right before the execution instead of register it
  // in the constructor because it is possible that we create new instances of
  // Exchange operators when we transform the physical plan
  // (then the ExchangeCoordinator will hold references of unneeded Exchanges).
  // So, we should only call registerExchange just before we start to execute
  // the plan.
  coordinator match {
    // Register this exchange with the exchangeCoordinator
    case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
    case _ =>
  }
}

protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
  // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
  // If it is cached, return the cached RDD directly
  if (cachedShuffleRDD == null) {
    cachedShuffleRDD = coordinator match {
      // There is an exchangeCoordinator
      case Some(exchangeCoordinator) =>
        val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
        assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
        shuffleRDD
      // No exchangeCoordinator
      case _ =>
        val shuffleDependency = prepareShuffleDependency()
        preparePostShuffleRDD(shuffleDependency)
    }
  }
  cachedShuffleRDD
}
  // Now, we manually create a ShuffleDependency. Because pairs in rddWithPartitionIds
  // are in the form of (partitionId, row) and every partitionId is in the expected range
  // [0, part.numPartitions - 1]. The partitioner of this is a PartitionIdPassthrough.
  val dependency =
    new ShuffleDependency[Int, InternalRow, InternalRow](
      rddWithPartitionIds,
      new PartitionIdPassthrough(part.numPartitions),
      serializer)

  dependency
}
Step 11: the preparePostShuffleRDD(shuffleDependency) method
private[exchange] def preparePostShuffleRDD(
    shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
    specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
  // If an array of partition start indices is provided, we need to use this array
  // to create the ShuffledRowRDD. Also, we need to update newPartitioning to
  // update the number of post-shuffle partitions.
  // If specifiedPartitionStartIndices is present it determines the post-shuffle partitioning;
  // this is how the ExchangeCoordinator plugs in its decisions
  specifiedPartitionStartIndices.foreach { indices =>
    assert(newPartitioning.isInstanceOf[HashPartitioning])
    newPartitioning = UnknownPartitioning(indices.length)
  }
  new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
}
// The number of pre-shuffle partitions
private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions

// The start index of each post-shuffle partition
private[this] val partitionStartIndices: Array[Int] = specifiedPartitionStartIndices match {
  case Some(indices) => indices
  case None =>
    // When specifiedPartitionStartIndices is not defined, every post-shuffle partition
    // corresponds to a pre-shuffle partition.
    (0 until numPreShufflePartitions).toArray
}

// The RDD's partitioner
private[this] val part: Partitioner =
  new CoalescedPartitioner(dependency.partitioner, partitionStartIndices)

// Build all the partitions
override def getPartitions: Array[Partition] = {
  assert(partitionStartIndices.length == part.numPartitions)
  Array.tabulate[Partition](partitionStartIndices.length) { i =>
    val startIndex = partitionStartIndices(i)
    val endIndex =
      if (i < partitionStartIndices.length - 1) {
        partitionStartIndices(i + 1)
      } else {
        numPreShufflePartitions
      }
    new ShuffledRowRDDPartition(i, startIndex, endIndex)
  }
}
override def getPreferredLocations(partition: Partition): Seq[String] = {
  val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
  tracker.getPreferredLocationsForShuffle(dep, partition.index)
}

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
  val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
  // The range of pre-shuffle partitions that we are fetching at here is
  // [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
  val reader =
    SparkEnv.get.shuffleManager.getReader(
      dependency.shuffleHandle,
      shuffledRowPartition.startPreShufflePartitionIndex,
      shuffledRowPartition.endPreShufflePartitionIndex,
      context)
  reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
}
/**
 * A Partitioner that might group together one or more partitions from the parent.
 *
 * @param parent a parent partitioner
 * @param partitionStartIndices indices of partitions in parent that should create new partitions
 *   in child (this should be an array of increasing partition IDs). For example, if we have a
 *   parent with 5 partitions, and partitionStartIndices is [0, 2, 4], we get three output
 *   partitions, corresponding to partition ranges [0, 1], [2, 3] and [4] of the parent partitioner.
 */
class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: Array[Int])
  extends Partitioner {

  // Maps each parent partition to the coalesced partition it belongs to
  @transient private lazy val parentPartitionMapping: Array[Int] = {
    val n = parent.numPartitions
    val result = new Array[Int](n)
    for (i <- 0 until partitionStartIndices.length) {
      val start = partitionStartIndices(i)
      val end = if (i < partitionStartIndices.length - 1) partitionStartIndices(i + 1) else n
      for (j <- start until end) {
        result(j) = i
      }
    }
    result
  }
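The mapping built by parentPartitionMapping is plain bucketing. For the example in the comment (a parent with 5 partitions and start indices [0, 2, 4]) it can be reproduced standalone:

// Standalone illustration of the parentPartitionMapping computation (not Spark code).
val parentNumPartitions = 5
val partitionStartIndices = Array(0, 2, 4)

val mapping = new Array[Int](parentNumPartitions)
for (i <- partitionStartIndices.indices) {
  val start = partitionStartIndices(i)
  val end =
    if (i < partitionStartIndices.length - 1) partitionStartIndices(i + 1)
    else parentNumPartitions
  for (j <- start until end) mapping(j) = i
}
println(mapping.mkString(", "))  // 0, 0, 1, 1, 2 -> parent partitions [0,1], [2,3], [4]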
@GuardedBy("this")
private def doEstimationIfNecessary(): Unit = synchronized {
  // It is unlikely that this method will be called from multiple threads
  // (when multiple threads trigger the execution of THIS physical)
  // because in common use cases, we will create new physical plan after
  // users apply operations (e.g. projection) to an existing DataFrame.
  // However, if it happens, we have synchronized to make sure only one
  // thread will trigger the job submission.
  if (!estimated) {
    // Make sure we have the expected number of registered Exchange operators.
    assert(exchanges.length == numExchanges)

    val newPostShuffleRDDs = new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)

    // Submit all map stages
    val shuffleDependencies = ArrayBuffer[ShuffleDependency[Int, InternalRow, InternalRow]]()
    val submittedStageFutures = ArrayBuffer[SimpleFutureAction[MapOutputStatistics]]()
    var i = 0
    // Call prepareShuffleDependency on every registered exchange in turn
    while (i < numExchanges) {
      val exchange = exchanges(i)
      val shuffleDependency = exchange.prepareShuffleDependency()
      shuffleDependencies += shuffleDependency
      if (shuffleDependency.rdd.partitions.length != 0) {
        // submitMapStage does not accept RDD with 0 partition.
        // So, we will not submit this dependency.
        submittedStageFutures +=
          exchange.sqlContext.sparkContext.submitMapStage(shuffleDependency)
      }
      i += 1
    }

    // Wait for the finishes of those submitted map stages.
    // Collect the map output statistics
    val mapOutputStatistics = new Array[MapOutputStatistics](submittedStageFutures.length)
    var j = 0
    while (j < submittedStageFutures.length) {
      // This call is a blocking call. If the stage has not finished, we will wait at here.
      mapOutputStatistics(j) = submittedStageFutures(j).get()
      j += 1
    }

    // If we have mapOutputStatistics.length < numExchange, it is because we do not submit
    // a stage when the number of partitions of this dependency is 0.
    assert(mapOutputStatistics.length <= numExchanges)
    // Now, we estimate partitionStartIndices. partitionStartIndices.length will be the
    // number of post-shuffle partitions.
    val partitionStartIndices =
      if (mapOutputStatistics.length == 0) {
        Array.empty[Int]
      } else {
        // Derive partitionStartIndices from the mapOutputStatistics
        estimatePartitionStartIndices(mapOutputStatistics)
      }

    // Call preparePostShuffleRDD; the only difference from the no-coordinator path is that the
    // partitionStartIndices argument is supplied here
    var k = 0
    while (k < numExchanges) {
      val exchange = exchanges(k)
      val rdd =
        exchange.preparePostShuffleRDD(shuffleDependencies(k), Some(partitionStartIndices))
      newPostShuffleRDDs.put(exchange, rdd)

      k += 1
    }

    // Finally, we set postShuffleRDDs and estimated.
    assert(postShuffleRDDs.isEmpty)
    assert(newPostShuffleRDDs.size() == numExchanges)
    // Cache the results
    postShuffleRDDs.putAll(newPostShuffleRDDs)
    estimated = true
  }
}
/**
 * Estimates partition start indices for post-shuffle partitions based on
 * mapOutputStatistics provided by all pre-shuffle stages.
 */
def estimatePartitionStartIndices(
    mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
  // If minNumPostShufflePartitions is defined, it is possible that we need to use a
  // value less than advisoryTargetPostShuffleInputSize as the target input size of
  // a post shuffle task.
  // The target input size of each post-shuffle partition, i.e. how much data a partition holds
  val targetPostShuffleInputSize = minNumPostShufflePartitions match {
    case Some(numPartitions) =>
      val totalPostShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId.sum).sum
      // The max at here is to make sure that when we have an empty table, we
      // only have a single post-shuffle partition.
      // There is no particular reason that we pick 16. We just need a number to
      // prevent maxPostShuffleInputSize from being set to 0.
      val maxPostShuffleInputSize =
        math.max(math.ceil(totalPostShuffleInputSize / numPartitions.toDouble).toLong, 16)
      math.min(maxPostShuffleInputSize, advisoryTargetPostShuffleInputSize)

  // Make sure we do get the same number of pre-shuffle partitions for those stages.
  // There should be one and only one distinct value here
  val distinctNumPreShufflePartitions =
    mapOutputStatistics.map(stats => stats.bytesByPartitionId.length).distinct
  // The reason that we are expecting a single value of the number of pre-shuffle partitions
  // is that when we add Exchanges, we set the number of pre-shuffle partitions
  // (i.e. map output partitions) using a static setting, which is the value of
  // spark.sql.shuffle.partitions. Even if two input RDDs are having different
  // number of partitions, they will have the same number of pre-shuffle partitions
  // (i.e. map output partitions).
  assert(
    distinctNumPreShufflePartitions.length == 1,
    "There should be only one distinct value of the number pre-shuffle partitions " +
      "among registered Exchange operator.")
  val numPreShufflePartitions = distinctNumPreShufflePartitions.head

  // Start building partitionStartIndices
  val partitionStartIndices = ArrayBuffer[Int]()
  // The first element of partitionStartIndices is always 0.
  partitionStartIndices += 0

  var postShuffleInputSize = 0L

  // Walk the pre-shuffle partitions and coalesce them according to targetPostShuffleInputSize
  var i = 0
  while (i < numPreShufflePartitions) {
    // We calculate the total size of ith pre-shuffle partitions from all pre-shuffle stages.
    // Then, we add the total size to postShuffleInputSize.
    var nextShuffleInputSize = 0L
    var j = 0
    while (j < mapOutputStatistics.length) {
      nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
      j += 1
    }
    // If including the nextShuffleInputSize would exceed the target partition size, then start a
    // new partition.
    if (i > 0 && postShuffleInputSize + nextShuffleInputSize > targetPostShuffleInputSize) {
      partitionStartIndices += i
      // reset postShuffleInputSize.
      postShuffleInputSize = nextShuffleInputSize
    } else {
      postShuffleInputSize += nextShuffleInputSize
    }
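The loop above is a greedy packing of pre-shuffle partitions into post-shuffle partitions. A standalone sketch of the same idea, with made-up per-partition sizes:

// Standalone sketch of the greedy estimatePartitionStartIndices logic (not Spark code).
// bytesByPartition(i) = total size of pre-shuffle partition i summed over all map stages.
val bytesByPartition = Array(10L, 30L, 25L, 5L, 40L, 10L)
val targetSize = 50L

val startIndices = scala.collection.mutable.ArrayBuffer(0)
var currentSize = 0L
for (i <- bytesByPartition.indices) {
  if (i > 0 && currentSize + bytesByPartition(i) > targetSize) {
    startIndices += i              // start a new post-shuffle partition at i
    currentSize = bytesByPartition(i)
  } else {
    currentSize += bytesByPartition(i)
  }
}
println(startIndices.mkString(", "))  // 0, 2, 4 -> post-shuffle partitions [0,1], [2,3], [4,5]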