
[core][experimental] Add support for dynamically sized torch.Tensors passed via NCCL in accelerated DAG #45332

Merged · 95 commits · May 20, 2024

Conversation

@stephanie-wang (Contributor) commented May 14, 2024

Why are these changes needed?

This adds support for dynamically sized torch.Tensors to be passed between accelerated DAG nodes via NCCL. Specifically, the following code is now supported, whereas previously shape and dtype had to be explicitly passed to TorchTensorType.

    with InputNode() as inp:
        dag = sender.send.bind(inp)
        dag = dag.with_type_hint(TorchTensorType(transport="nccl"))
        dag = receiver.recv.bind(dag)

    compiled_dag = dag.experimental_compile()

The feature works by creating a shared-memory channel to pass metadata (shape and dtype) for the tensor. The receiver then uses the metadata to allocate a buffer of the correct size before the NCCL receive.
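For intuition, here is a minimal, self-contained sketch of that protocol (not Ray's actual implementation; `meta_channel` and `data_channel` are in-process stand-ins for the shared-memory channel and the NCCL transport):

```python
import queue

import torch

meta_channel: queue.Queue = queue.Queue()  # stand-in for the shared-memory metadata channel
data_channel: queue.Queue = queue.Queue()  # stand-in for the NCCL transport

def send(tensor: torch.Tensor) -> None:
    # 1. Publish (shape, dtype) so the receiver knows how large a buffer to allocate.
    meta_channel.put((tuple(tensor.shape), tensor.dtype))
    # 2. Send the tensor itself (ncclSend in the real implementation).
    data_channel.put(tensor)

def recv() -> torch.Tensor:
    # 1. Read the metadata and allocate a receive buffer of the right size.
    shape, dtype = meta_channel.get()
    buf = torch.empty(shape, dtype=dtype)
    # 2. Receive into the preallocated buffer (ncclRecv in the real implementation).
    buf.copy_(data_channel.get())
    return buf

sent = torch.ones((3, 4), dtype=torch.float16)
send(sent)
assert torch.equal(recv(), sent)
```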

Initial microbenchmarks show that this adds about 50% throughput overhead compared to statically declaring the shape and dtype, or about 160us per DAG call. This seems a bit higher than expected (see also #45319).

This also adds a few other fixes:

  • Adds support for reusing actors to create new NCCL groups, which is needed when a DAG is torn down and a new one is created.
  • Adds a lock to DAG teardown to prevent the same NCCL group from being destructed twice.
  • A user-defined TorchTensorType shape or dtype is now used as a hint for the buffer size instead of a required size. Since buffers are currently static, an error is thrown if the user tries to return a tensor that is too large (see the sketch below).
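To make the last bullet concrete, here is a small hypothetical helper (not Ray code) showing the intended semantics: the declared shape and dtype only size the buffer, and a tensor that does not fit raises an error.

```python
import torch

def check_fits_hinted_buffer(tensor: torch.Tensor, hint_shape, hint_dtype) -> None:
    """Raise if `tensor` would overflow a buffer sized from the shape/dtype hint."""
    hint_bytes = torch.Size(hint_shape).numel() * torch.empty((), dtype=hint_dtype).element_size()
    actual_bytes = tensor.numel() * tensor.element_size()
    if actual_bytes > hint_bytes:
        raise ValueError(
            f"Tensor needs {actual_bytes} bytes but the hinted buffer only holds {hint_bytes} bytes"
        )

check_fits_hinted_buffer(torch.zeros(8, dtype=torch.float16), (10,), torch.float16)   # fits
# check_fits_hinted_buffer(torch.zeros(64, dtype=torch.float16), (10,), torch.float16)  # would raise
```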

Related issue number

Part 1 of #45306; a separate follow-up PR will add support for nested tensors.

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

stephanie-wang and others added 30 commits April 17, 2024 15:56
python/ray/experimental/channel/torch_tensor_type.py (outdated review thread, resolved)
# CPU and another from CPU to shared memory. Ideally we should elide
# the first copy and memcpy directly from GPU to the shared memory
# buffer.
if tensor.device.type == "cuda":
Member:

I searched online and see that most people use tensor.detach().cpu().numpy(). It seems to be almost the same as the current function but more memory efficient (though I'm not 100% sure). In addition, detach also has some influence on autodiff. I don't have enough context about ADAG, PyTorch, and ML, so I can't determine whether they are equivalent in this case.

Contributor Author:

Hmm I don't think detach matters yet since we only care about inference, but yes I think we will want to do that for training.

Not sure if there is much of a difference between cpu() vs Tensor.to.
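For reference, a small general PyTorch comparison of the two approaches mentioned here (illustrative only, not the code in this PR); both produce a CPU copy, and detach() additionally drops the autograd graph, which matters for training but not for inference:

```python
import torch

t = torch.randn(4, requires_grad=True)
if torch.cuda.is_available():
    t = t.cuda()  # mirror the `device.type == "cuda"` branch above

# Pattern from the review comment: detach, copy to CPU, view as numpy.
arr_a = t.detach().cpu().numpy()

# Tensor.to("cpu") is functionally equivalent to .cpu(); detach() is still
# required before .numpy() whenever requires_grad is True.
arr_b = t.detach().to("cpu").numpy()

assert (arr_a == arr_b).all()
```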

@kevin85421 (Member) left a comment:

Still need some iterations.

@@ -442,12 +433,18 @@ def _preprocess(self) -> None:
)
self.actor_task_count[actor_handle._actor_id] += 1

if (
isinstance(dag_node.type_hint, TorchTensorType)
Member:

Out of curiosity, why do we check whether the type is TorchTensorType here? Is NCCL only able to transfer TorchTensorType, or can it transfer different data types, but we currently only implement the data transfer with NCCL for TorchTensorType?

Contributor Author:

It can only transfer TorchTensorType (for now).

python/ray/dag/compiled_dag_node.py (outdated review thread, resolved)
torch_tensor_serializer.serialize_to_numpy,
torch_tensor_serializer.deserialize_from_numpy,
),
ray.util.serialization.register_serializer(
Member:

Out of curiosity, what's the scope of the serializer? Is it per Ray job, per core worker process, or per channel? To elaborate, if there are multiple nodes in the DAG that use the same ChannelOutputType (e.g., TorchTensorType), do different worker processes for actor tasks reuse the serializer?

Contributor Author:

Great question, actually I think our docs are off on this... they say that serializers need to be registered on each worker, so it should be per core worker process. But it seemed to me that register_serializer would actually ship the deserializer with the value.

For the same type, a single worker would reuse the same serialization context.

I'll add some notes about this.
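For context, a minimal sketch of how a custom torch.Tensor serializer can be registered through Ray's public custom-serialization API (the serializer/deserializer names mirror the quoted snippet; whether the deserializer is shipped along with the value is the open question discussed above):

```python
import numpy as np
import ray
import torch

ray.init(ignore_reinit_error=True)

def serialize_to_numpy(tensor: torch.Tensor) -> np.ndarray:
    # Copy to CPU (a no-op if already there) and expose the buffer as numpy.
    return tensor.detach().cpu().numpy()

def deserialize_from_numpy(arr: np.ndarray) -> torch.Tensor:
    return torch.from_numpy(arr)

# Registration happens in the calling worker process; for the same type, that
# worker reuses the same serialization context.
ray.util.register_serializer(
    torch.Tensor,
    serializer=serialize_to_numpy,
    deserializer=deserialize_from_numpy,
)
```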

for dim in shape:
num_elements *= dim
element_size_bytes = TORCH_DTYPE_ITEMSIZE_MAP[self.dtype]
buffer_size_bytes = int(num_elements * element_size_bytes)
Member:

Why do we need to convert to int here? Is it possible for it to be a non-int type?

Contributor Author:

Hmm, I think not, but it's probably better just for safety.

@rkooo567 (Contributor) left a comment:

Looks pretty good!

> Initial microbenchmarks show that this adds about 50% throughput overhead compared to statically declaring the shape and dtype, or about 160us per DAG call. This seems a bit higher than expected (see also #45319).

This seems expected? (IIRC, our shared memory overhead was around 100~200us.)
When you say 50%, how much GPU data is transferred?

python/ray/experimental/channel/torch_tensor_type.py (outdated review thread, resolved)
# FLOAT types
torch.half: 2,
torch.float: 4,
torch.float16: 2,
Contributor:

Add bfloat16?

Contributor:

We should probably also add the other data types listed at https://pytorch.org/docs/stable/tensors.html.
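One way to cover bfloat16 and the rest of the documented dtypes without hand-maintaining sizes is to ask PyTorch for each dtype's element size; a hedged sketch (the map name mirrors the one in the snippet above):

```python
import torch

TORCH_DTYPE_ITEMSIZE_MAP = {
    dtype: torch.empty((), dtype=dtype).element_size()
    for dtype in (
        torch.half, torch.float16, torch.bfloat16, torch.float, torch.float32, torch.float64,
        torch.uint8, torch.int8, torch.int16, torch.int32, torch.int64, torch.bool,
    )
}

assert TORCH_DTYPE_ITEMSIZE_MAP[torch.bfloat16] == 2
assert TORCH_DTYPE_ITEMSIZE_MAP[torch.float64] == 8
```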

# 100KB so that we have room to store an exception if needed.
MIN_TENSOR_BUFFER_SIZE = 100_000
# 100KB to store metadata and/or exceptions.
TENSOR_PADDING_SIZE_BYTES = 100_000
Contributor:

QQ: isn't 100KB too big for this?

Contributor Author:

There's not too much overhead other than CPU memory consumption here, so I think it's okay for now. For remote data, only the actual object should get transferred, not the entire allocated buffer.

We can make it adjustable in the future.

def set_torch_device(self, torch_device: "torch.device") -> None:
self.torch_device = torch_device

def reset_tensors(self, tensors: List["torch.Tensor"]) -> List["torch.Tensor"]:
Contributor:

this is not used?

Contributor Author:

Ah yeah it's used in the following PR, removed.



@DeveloperAPI
@dataclass
class ChannelContext:
# Used for the torch.Tensor NCCL transport.
nccl_group: Optional["_NcclGroup"] = None
serialization_context = _SerializationContext()
Contributor:

Why do we need to keep it here btw? Isn't it cleaner to just create it on the fly whenever serialization is needed?

Contributor Author:

Just for convenience. The ChannelContext will get created on the fly.

python/ray/experimental/channel/nccl_group.py (outdated review thread, resolved)
python/ray/dag/compiled_dag_node.py (outdated review thread, resolved)

self.dag_output_channels = []
for output in self.idx_to_task[self.output_task_idx].args:
assert isinstance(output, DAGNode)
output_idx = self.dag_node_to_idx[output]
self.dag_output_channels.append(self.idx_to_task[output_idx].output_channel)
# Register custom serializers for DAG outputs.
output.type_hint.register_custom_serializer()
Contributor:

QQ: does it have to be called like this? Isn't it possible to just make it a part of type hint constructor?

Contributor Author:

Good idea, let me do that.

Contributor Author:

Hmm, actually I'm going to hold off on this. We may not always want to register the serializer, i.e., if the value is just getting passed through the driver to another actor. Also, we may need to wait until after the constructor before we can define the serializer.

stephanie-wang and others added 2 commits May 17, 2024 11:12
@stephanie-wang (Contributor Author) commented:

> Looks pretty good!
>
> > Initial microbenchmarks show that this adds about 50% throughput overhead compared to statically declaring the shape and dtype, or about 160us per DAG call. This seems a bit higher than expected (see also #45319).
>
> This seems expected? (IIRC, our shared memory overhead was around 100~200us.) When you say 50%, how much GPU data is transferred?

Current microbenchmarks actually show <50us per accelerated DAG call, so it's a little strange.

This was for <=1MiB tensors. I don't know exactly when the overhead disappears, but for 1GiB tensors there is negligible overhead.

@rkooo567 (Contributor) left a comment:

LGTM! +1 on #45319. I feel like it is safer to kill actors when NCCL destroy times out, just in case (especially given that this is hard to test and we don't understand it very well yet), but I will leave it up to you.

@stephanie-wang (Contributor Author) commented:

> LGTM! +1 on #45319. I feel like it is safer to kill actors when NCCL destroy times out, just in case (especially given that this is hard to test and we don't understand it very well yet), but I will leave it up to you.

For the initial case, what I would like to do is just offer two options: one that kills the actors, and another that syncs the stream and raises an exception to keep the actors alive. I agree it needs more testing, but we can probably defer that.

@stephanie-wang added the @author-action-required label on May 17, 2024
@stephanie-wang (Contributor Author) commented:

Hmm some CI test issue about not being able to find GPUs...

This reverts commit 9915fbe.
@stephanie-wang enabled auto-merge (squash) May 18, 2024 00:13
@github-actions bot disabled auto-merge May 18, 2024 01:13
@@ -339,5 +339,6 @@ steps:
- bazel run //ci/ray_ci:test_in_docker -- //python/ray/tests/... //python/ray/dag/... core
--parallelism-per-worker 2 --gpus 2
--build-name coregpubuild
--only-tags multi_gpu
--only-tags multi_gpu || true
- sleep 1000000
Collaborator:

w00t, I think you might have forgotten to remove this, so CI was running for 8 hours.

@can-anyscale (Collaborator) left a comment:

CI changes look good, thanks.

@@ -1,7 +1,7 @@
# coding: utf-8
import logging
import torch
import pickle
import ray.cloudpickle as pickle
Collaborator:

do we want to run this release test on this PR?

@stephanie-wang merged commit ca9f736 into ray-project:master May 20, 2024
5 of 6 checks passed
stephanie-wang added a commit that referenced this pull request May 26, 2024
…ython objects (#45473)

Allows torch.Tensors nested inside Python objects to be transferred via
NCCL using the following syntax:

```python
    with InputNode() as inp:
        dag = sender.send.bind(inp)
        dag = dag.with_type_hint(TorchTensorType(transport="nccl"))
        dag = receiver.recv.bind(dag)
```

We implement this by using an additional shared memory channel to pass
CPU data, with a "nested" NCCL channel to pass the GPU data. Here is the
send procedure for the above code:
1. Serialize the data. Extract out all tensors that are on the GPU and
replace them with some kind of placeholder.
2. Send a list of metadata through the meta_channel.
3. Send the GPU tensors through the NCCL channel.
4. Send the rest of the CPU data through a cpu_data_channel, with the
placeholders for the GPU tensors.

Note that if the TorchTensorType doesn't have a shape and dtype
specified, we currently use the separate meta_channel to pass metadata
for the serialized tensors, as introduced in #45332. To elide the
cpu_data_channel, the user should now use
`TorchTensorType(direct_return=True)`, to indicate that no CPU data is
sent along with the GPU data. To elide the meta_channel, the user should
declare the shape and dtype, e.g., `TorchTensorType(shape=(10, ),
dtype=torch.float16)`.

## Related issue number

Closes #45306.

---------

Signed-off-by: Stephanie Wang <swang@cs.berkeley.edu>
Co-authored-by: SangBin Cho <rkooo567@gmail.com>
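For illustration, here is a minimal sketch (not Ray's implementation) of step 1 of the send procedure described in the referenced commit above: pull GPU tensors out of a (shallowly) nested object and replace each with a placeholder index, so the CPU payload can go over the shared-memory channel while the tensors go over NCCL.

```python
import torch

def extract_gpu_tensors(obj):
    """Replace GPU tensors in a dict/list/tuple with placeholder indices."""
    gpu_tensors = []

    def _maybe_replace(value):
        if isinstance(value, torch.Tensor) and value.device.type == "cuda":
            gpu_tensors.append(value)
            return ("__tensor_placeholder__", len(gpu_tensors) - 1)
        return value

    # Only shallow containers are handled here; the real implementation hooks
    # into serialization so arbitrary nesting works.
    if isinstance(obj, dict):
        replaced = {k: _maybe_replace(v) for k, v in obj.items()}
    elif isinstance(obj, (list, tuple)):
        replaced = type(obj)(_maybe_replace(v) for v in obj)
    else:
        replaced = _maybe_replace(obj)
    return replaced, gpu_tensors

payload = {"logits": torch.zeros(4), "step": 3}      # tensors are extracted only if on GPU
cpu_payload, tensors = extract_gpu_tensors(payload)
meta = [(tuple(t.shape), t.dtype) for t in tensors]  # step 2: metadata for the meta_channel
```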
Labels
@author-action-required (The PR author is responsible for the next step. Remove tag to send back to the reviewer.) · go (Trigger full test run on premerge)

7 participants