[SPARK-48270][SQL] Improve the OptimizeRepartition with Repartition #46573

Open
wants to merge 1 commit into
base: master

beliefer
Contributor

@beliefer beliefer commented May 14, 2024

What changes were proposed in this pull request?

This PR proposes to improve the OptimizeRepartition rule with Repartition.

Why are the changes needed?

Repartition is cheaper than RepartitionByExpression.

Does this PR introduce any user-facing change?

'No'.

How was this patch tested?

GA (GitHub Actions).

Was this patch authored or co-authored using generative AI tooling?

'No'.

@github-actions github-actions bot added the SQL label May 14, 2024
@beliefer beliefer changed the title from "[WIP][SPARK-48270][SQL] Improve the OptimizeRepartition with Repartition" to "[SPARK-48270][SQL] Improve the OptimizeRepartition with Repartition" May 15, 2024
@beliefer
Contributor Author

ping @cloud-fan

@cloud-fan
Contributor

cloud-fan commented May 15, 2024

Why is Repartition cheaper than RepartitionByExpression?

@beliefer
Contributor Author

Why is Repartition cheaper than RepartitionByExpression?

Repartition is cheaper than RepartitionByExpression.

@beliefer
Contributor Author

For example:

  private def canEliminateGlobalSort(plan: LogicalPlan): Boolean = plan match {
    case r: RepartitionByExpression => r.partitionExpressions.forall(_.deterministic)
    case r: RebalancePartitions => r.partitionExpressions.forall(_.deterministic)
    case _: Repartition => true
    case _ => false
  }

  protected def userSpecifiedRepartition(p: LogicalPlan): Boolean = p match {
    case _: Repartition => true
    case r: RepartitionByExpression
      if r.optNumPartitions.isDefined || r.partitionExpressions.nonEmpty => true
    case _ => false
  }

      case r: logical.RepartitionByExpression =>
        val shuffleOrigin = if (r.partitionExpressions.isEmpty && r.optNumPartitions.isEmpty) {
          REBALANCE_PARTITIONS_BY_NONE
        } else if (r.optNumPartitions.isEmpty) {
          REPARTITION_BY_COL
        } else {
          REPARTITION_BY_NUM
        }
        exchange.ShuffleExchangeExec(
          r.partitioning, planLater(r.child),
          shuffleOrigin, r.optAdvisoryPartitionSize) :: Nil
      case r @ logical.Repartition(numPartitions, shuffle, child) =>
        if (shuffle) {
          ShuffleExchangeExec(r.partitioning, planLater(child), REPARTITION_BY_NUM) :: Nil
        } else {
          execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
        }

@cloud-fan
Contributor

In the places you mentioned, is RepartitionByExpression with a constant the same as Repartition?
