
[SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect #46570

Closed
wants to merge 3 commits

Conversation

HyukjinKwon
Member

@HyukjinKwon HyukjinKwon commented May 14, 2024

What changes were proposed in this pull request?

This PR proposes to add the `DataFrame.checkpoint` and `DataFrame.localCheckpoint` APIs in Spark Connect.

Overview

(Overview diagram: screenshot of the client-server checkpoint flow, 2024-05-16)

  1. The Spark Connect client invokes [local]checkpoint.
    • It connects to the server, which stores a (session_id, UUID) <> checkpointed DataFrame mapping.
  2. The server executes [local]checkpoint.
  3. The server returns a UUID for the checkpointed DataFrame.
    • The client side holds the UUID, with the original protobuf plan truncated (replaced).
  4. When the DataFrame on the client side is garbage-collected, a call is made to clear the corresponding state within the Spark Connect server.
  5. If the checkpointed RDD is no longer referenced (e.g., not even by a temp view), it is cleaned by ContextCleaner (which runs separately, and periodically).
  6. *When the session is closed, the server attempts to clear all mapped state for that session (because DataFrame.__del__ is not guaranteed to be called in Python upon garbage collection).
  7. *If the checkpointed RDD is no longer referenced (e.g., not even by a temp view), it is cleaned by ContextCleaner (which runs separately, and periodically).

*In 99.999% of cases, the state (map<(session_id, uuid), checkpointed DataFrame>) will be cleared when the DataFrame is garbage-collected, i.e., unless there is a crash. In practice, Py4J leverages the same mechanism to clean up its Java objects. Steps 6 and 7 address the remaining 0.001% of cases; both happen when the session is closed and the session holder is released, see also apache/spark#41580.
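The server-side bookkeeping described in the steps above can be sketched as a map keyed by (session_id, uuid). The class and method names below are hypothetical stand-ins, not the actual Spark Connect server internals:

```python
import uuid
from threading import Lock

class CheckpointedDataFrameCache:
    """Hypothetical sketch of the server-side state:
    map<(session_id, uuid), checkpointed DataFrame>."""

    def __init__(self):
        self._lock = Lock()
        self._cache = {}  # (session_id, df_uuid) -> checkpointed DataFrame

    def put(self, session_id, checkpointed_df):
        # Steps 2-3: execute [local]checkpoint, return a UUID to the client.
        df_uuid = str(uuid.uuid4())
        with self._lock:
            self._cache[(session_id, df_uuid)] = checkpointed_df
        return df_uuid

    def remove(self, session_id, df_uuid):
        # Step 4: the client's DataFrame was garbage-collected.
        with self._lock:
            self._cache.pop((session_id, df_uuid), None)

    def remove_session(self, session_id):
        # Step 6: the session closed; drop every entry it owned.
        with self._lock:
            for key in [k for k in self._cache if k[0] == session_id]:
                del self._cache[key]
```

Once an entry is dropped, nothing on the server references the checkpointed RDD anymore, so the regular ContextCleaner can reclaim it (steps 5 and 7).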

Command/RPCs

Reuse CachedRemoteRelation (from apache/spark#41580)

message Command {
  oneof command_type {
    ...
    CheckpointCommand checkpoint_command = 14;
    RemoveCachedRemoteRelationCommand remove_cached_remote_relation_command = 15;
    ...
  }
}

// Command to remove `CachedRemoteRelation`
message RemoveCachedRemoteRelationCommand {
  // (Required) The cached remote relation to remove
  CachedRemoteRelation relation = 1;
}

message CheckpointCommand {
  // (Required) The logical plan to checkpoint.
  Relation relation = 1;

  // (Optional) Locally checkpoint using a local temporary
  // directory in Spark Connect server (Spark Driver)
  optional bool local = 2;

  // (Optional) Whether to checkpoint this dataframe immediately.
  optional bool eager = 3;
}

message CheckpointCommandResult {
  // (Required) The logical plan checkpointed.
  CachedRemoteRelation relation = 1;
}
message ExecutePlanResponse {

  ...

  oneof response_type {

    ...

    CheckpointCommandResult checkpoint_command_result = 19;
  }

  ...

  message Checkpoint {
    // (Required) The logical plan checkpointed.
    CachedRemoteRelation relation = ...;
  }
}
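On the client side, step 3 above replaces the DataFrame's (possibly large) protobuf plan with a small relation that only carries the server-side UUID. A minimal sketch with hypothetical stand-in classes (the real client uses the protobuf messages shown above):

```python
class LogicalPlan:
    """Hypothetical stand-in for a full protobuf plan tree."""
    def __init__(self, description):
        self.description = description

class CachedRemoteRelationStub(LogicalPlan):
    """Stand-in for the CachedRemoteRelation message: just the UUID."""
    def __init__(self, relation_id):
        super().__init__("cached_remote_relation")
        self.relation_id = relation_id

def truncate_plan(df_plan, relation_id):
    # The original plan is dropped entirely; only the UUID pointing at
    # the server-side checkpointed DataFrame remains on the client.
    return CachedRemoteRelationStub(relation_id)
```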

Usage

./sbin/start-connect-server.sh --conf spark.checkpoint.dir=/path/to/checkpoint
spark.range(1).localCheckpoint()
spark.range(1).checkpoint()

Why are the changes needed?

For feature parity with the classic (non-Spark-Connect) PySpark API.

Does this PR introduce any user-facing change?

Yes, it adds both the `DataFrame.checkpoint` and `DataFrame.localCheckpoint` APIs in Spark Connect.

How was this patch tested?

Unit tests, and manually tested as below:

Code

./bin/pyspark --remote "local[*]"
>>> df = spark.range(1).localCheckpoint()
>>> df.explain(True)
== Parsed Logical Plan ==
LogicalRDD [id#1L], false

== Analyzed Logical Plan ==
id: bigint
LogicalRDD [id#1L], false

== Optimized Logical Plan ==
LogicalRDD [id#1L], false

== Physical Plan ==
*(1) Scan ExistingRDD[id#1L]

>>> df._plan
<pyspark.sql.connect.plan.CachedRemoteRelation object at 0x147734a50>
>>> del df

Logs

...
{"ts":"2024-05-14T06:18:01.711Z","level":"INFO","msg":"Caching DataFrame with id 7316f315-d20d-446d-b5e7-ac848870e280","context":{"dataframe_id":"7316f315-d20d-446d-b5e7-ac848870e280"},"logger":"SparkConnectAnalyzeHandler"}
...
{"ts":"2024-05-14T06:18:11.718Z","level":"INFO","msg":"Removing DataFrame with id 7316f315-d20d-446d-b5e7-ac848870e280 from the cache","context":{"dataframe_id":"7316f315-d20d-446d-b5e7-ac848870e280"},"logger":"SparkConnectPlanner"}
...

Was this patch authored or co-authored using generative AI tooling?

No.

@HyukjinKwon HyukjinKwon changed the title [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [DO-NOT-MERGE][SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect May 14, 2024
@HyukjinKwon HyukjinKwon marked this pull request as draft May 14, 2024 04:13
@HyukjinKwon HyukjinKwon force-pushed the SPARK-48258 branch 13 times, most recently from 88b3fa1 to 7a4d6f7 Compare May 14, 2024 07:46
@HyukjinKwon HyukjinKwon changed the title [DO-NOT-MERGE][SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect May 16, 2024
@HyukjinKwon HyukjinKwon marked this pull request as ready for review May 16, 2024 01:38
@HyukjinKwon HyukjinKwon force-pushed the SPARK-48258 branch 3 times, most recently from 4b7f7a1 to cf26e14 Compare May 16, 2024 05:36
HyukjinKwon added a commit that referenced this pull request May 16, 2024
…tDir

### What changes were proposed in this pull request?

This PR adds `spark.checkpoint.dir` configuration so users can set the checkpoint dir when they submit their application.

### Why are the changes needed?

Separate the configuration logic so the same app can run with a different checkpoint.
In addition, this would be useful for Spark Connect with #46570.

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new user-facing configuration.

### How was this patch tested?

unittest added

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46571 from HyukjinKwon/SPARK-48268.

Lead-authored-by: Hyukjin Kwon <gurwls223@apache.org>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@HyukjinKwon HyukjinKwon force-pushed the SPARK-48258 branch 4 times, most recently from d0d8432 to ec0d893 Compare May 17, 2024 08:01
@HyukjinKwon HyukjinKwon marked this pull request as draft May 20, 2024 02:19
@HyukjinKwon HyukjinKwon marked this pull request as ready for review May 20, 2024 03:41
optional bool local = 2;

// (Optional) Whether to checkpoint this dataframe immediately.
optional bool eager = 3;
Contributor

The default is eager = true right? Should the protocol encode this better? Currently the protocol defaults to eager = false if the field is not set, so my question is should we flip the logic (i.e. replace this with lazy) so the default behavior does not require you to set additional fields.

The same question for local...

Member Author

We can, but I actually followed the other existing cases (see the `optional bool` fields in relations.proto).
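For context on the thread above: marking the fields `optional` in proto3 gives them explicit field presence, so the server can tell "not set" apart from "set to false" and apply its own default (eager = true) when the field is absent. A sketch of that decoding logic, emulated here with plain Python rather than generated protobuf classes:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class CheckpointCommandStub:
    """Emulates `optional bool` proto3 fields: None means 'not set',
    which generated protobuf code would expose via HasField()."""
    local: Optional[bool] = None
    eager: Optional[bool] = None

def resolve_eager(cmd: CheckpointCommandStub) -> bool:
    # Server-side default: checkpoint eagerly unless the client
    # explicitly asked for lazy checkpointing.
    return True if cmd.eager is None else cmd.eager
```

With explicit presence, the wire default of `false` never silently overrides the API default of `eager = true`, which is the concern the reviewer raised.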

responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
val target = Dataset
.ofRows(session, transformRelation(checkpointCommand.getRelation))
val checkpointed = if (checkpointCommand.hasLocal && checkpointCommand.hasEager) {
Contributor

You could also increase the visibility of Dataset.checkpoint(eager: Boolean, reliableCheckpoint: Boolean).

Member Author

I just did it locally, but I think it's actually better to keep it as is. The current one is easier to read, and Dataset.checkpoint(eager: Boolean, reliableCheckpoint: Boolean) is private as well.

@@ -138,6 +138,41 @@ def __init__(
# by __repr__ and _repr_html_ while eager evaluation opens.
self._support_repr_html = False
self._cached_schema: Optional[StructType] = None
self._cached_remote_relation_id: Optional[str] = None

def __del__(self) -> None:
Contributor

So no Python GC expert here. I am assuming some system thread is doing this work. Is it wise to execute an RPC from there?

Member Author

At least Py4J does the same thing (socket connection).

Member Author

and the previous maintainer knows that area quite well. Although I am the maintainer of Py4J now :-).
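Regarding the GC discussion above: `__del__` runs on whichever thread drops the last reference, possibly during interpreter shutdown, so the release RPC has to be best-effort. A hedged sketch of the pattern (the session object and its release method here are hypothetical, not the actual PySpark client API):

```python
class ConnectDataFrameSketch:
    """Hypothetical client-side DataFrame holding a server-side cache id."""

    def __init__(self, session, cached_remote_relation_id=None):
        self._session = session
        self._cached_remote_relation_id = cached_remote_relation_id

    def __del__(self):
        # Best-effort cleanup: swallow any error, because the network,
        # the session, or the interpreter itself may already be gone.
        if self._cached_remote_relation_id is not None:
            try:
                self._session.remove_cached_remote_relation(
                    self._cached_remote_relation_id
                )
            except Exception:
                pass
```

If the RPC never fires (crash, shutdown), the session-close cleanup on the server (step 6 in the overview) still reclaims the entry.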

@HyukjinKwon
Member Author

Merged to master.

I will follow up on the discussion if there is more to address, since we're releasing the preview soon.

HyukjinKwon added a commit that referenced this pull request May 23, 2024
…Connect client

### What changes were proposed in this pull request?

This PR adds `Dataset.checkpoint` and `Dataset.localCheckpoint` into Scala Spark Connect client. Python API was implemented at #46570

### Why are the changes needed?

For API parity.

### Does this PR introduce _any_ user-facing change?

Yes, it adds `Dataset.checkpoint` and `Dataset.localCheckpoint` into Scala Spark Connect client.

### How was this patch tested?

Unittests added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46683 from HyukjinKwon/SPARK-48370.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
HyukjinKwon added a commit that referenced this pull request May 23, 2024
… and eager required fields in CheckpointCommand

### What changes were proposed in this pull request?

This PR is a followup of #46683 and #46570 that refactors `local` and `eager` required fields in `CheckpointCommand`

### Why are the changes needed?

To make the code easier to maintain.

### Does this PR introduce _any_ user-facing change?

No, the main change has not been released yet.

### How was this patch tested?

Manually tested.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46712 from HyukjinKwon/SPARK-48370-SPARK-48258-followup.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>