Merged
Changes from 14 commits
4 changes: 1 addition & 3 deletions python/ray/_private/custom_types.py
@@ -124,9 +124,7 @@
# See `common.proto` for more details.
class TensorTransportEnum(Enum):
OBJECT_STORE = TensorTransport.Value("OBJECT_STORE")
NCCL = TensorTransport.Value("NCCL")
GLOO = TensorTransport.Value("GLOO")
NIXL = TensorTransport.Value("NIXL")
DIRECT_TRANSPORT = TensorTransport.Value("DIRECT_TRANSPORT")

@classmethod
def from_str(cls, name: str) -> "TensorTransportEnum":
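The enum now only distinguishes the object store from a generic direct transport; the concrete backend name travels as a plain string. A minimal sketch of that mapping, using a stand-in enum rather than the protobuf-backed one, mirroring the conversion added below in worker.py and actor.py:

```python
from enum import Enum

class TensorTransportEnum(Enum):  # illustrative stand-in for the protobuf-backed enum
    OBJECT_STORE = 0
    DIRECT_TRANSPORT = 1

def to_transport_enum(tensor_transport: str) -> TensorTransportEnum:
    # Any non-object-store backend ("NIXL", "NCCL", "GLOO", or a custom one)
    # collapses to DIRECT_TRANSPORT; the string itself is still handed to the
    # GPU object manager, which picks the concrete transport.
    if tensor_transport.upper() == "OBJECT_STORE":
        return TensorTransportEnum.OBJECT_STORE
    return TensorTransportEnum.DIRECT_TRANSPORT
```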
59 changes: 19 additions & 40 deletions python/ray/_private/worker.py
@@ -801,7 +801,7 @@ def put_object(
value: Any,
owner_address: Optional[str] = None,
_is_experimental_channel: bool = False,
_tensor_transport: str = "object_store",
_tensor_transport: str = "OBJECT_STORE",
):
"""Put value in the local object store.

@@ -835,18 +835,16 @@ def put_object(
"ray.ObjectRef in a list and call 'put' on it."
)
tensors = None
tensor_transport: TensorTransportEnum = TensorTransportEnum.from_str(
_tensor_transport
)
tensor_transport = _tensor_transport.upper()
if tensor_transport not in [
TensorTransportEnum.OBJECT_STORE,
TensorTransportEnum.NIXL,
"OBJECT_STORE",
"NIXL",
]:
raise ValueError(
"Currently, Ray Direct Transport only supports 'object_store' and 'nixl' for tensor transport in ray.put()."
)
Contributor (review comment, severity: high):
This check is overly restrictive and seems to contradict the goal of this PR, which is to enable "bring your own transport". By limiting the transport to OBJECT_STORE and NIXL, it prevents users from using ray.put with any other custom transport they might register. The validation for supported transports should ideally be handled within the gpu_object_manager, which has access to the transport registry and already performs this check. Removing this check would make the implementation more aligned with the PR's objective.

Additionally, the error message is slightly misleading, as OBJECT_STORE is not a "Ray Direct Transport", and it uses lowercase for the transport names, which is inconsistent with the code's convention.

Contributor Author (reply):
In the future, this could be something like `is_object_store` or `is_one_sided`.
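A hypothetical sketch of the reviewer's suggestion (the helper name `is_registered_transport` is illustrative, not Ray's actual API): delegate the check to the GPU object manager's transport registry instead of a hard-coded allow-list.

```python
def validate_put_transport(gpu_object_manager, tensor_transport: str) -> None:
    # OBJECT_STORE always works; anything else must be known to the registry.
    if tensor_transport == "OBJECT_STORE":
        return
    # Assumed helper on the manager; the real check may live elsewhere.
    if not gpu_object_manager.is_registered_transport(tensor_transport):
        raise ValueError(
            f"Tensor transport '{tensor_transport}' is not registered for ray.put(). "
            "Register a custom transport or use 'OBJECT_STORE'."
        )
```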

try:
if tensor_transport != TensorTransportEnum.OBJECT_STORE:
if tensor_transport != "OBJECT_STORE":
(
serialized_value,
tensors,
@@ -867,19 +865,24 @@
# object. Instead, clients will keep the object pinned.
pin_object = not _is_experimental_channel

tensor_transport_enum = TensorTransportEnum.OBJECT_STORE
if tensor_transport != "OBJECT_STORE":
tensor_transport_enum = TensorTransportEnum.DIRECT_TRANSPORT

# This *must* be the first place that we construct this python
# ObjectRef because an entry with 0 local references is created when
# the object is Put() in the core worker, expecting that this python
# reference will be created. If another reference is created and
# removed before this one, it will corrupt the state in the
# reference counter.

ret = self.core_worker.put_object(
serialized_value,
pin_object=pin_object,
owner_address=owner_address,
inline_small_object=True,
_is_experimental_channel=_is_experimental_channel,
tensor_transport_val=tensor_transport.value,
tensor_transport_val=tensor_transport_enum.value,
)
if tensors:
self.gpu_object_manager.put_object(ret, tensor_transport, tensors)
@@ -896,43 +899,26 @@ def deserialize_objects(
self,
serialized_objects,
object_refs,
tensor_transport_hint: Optional[TensorTransportEnum] = None,
tensor_transport_hint: Optional[str] = None,
):
gpu_objects: Dict[str, List["torch.Tensor"]] = {}
for obj_ref, (_, _, tensor_transport) in zip(object_refs, serialized_objects):
# TODO: Here tensor_transport_hint is set by the user in ray.get(), tensor_transport is set
# in serialize_objects by ray.method(tensor_transport="xxx"), and obj_ref.tensor_transport()
# is set by ray.put(). We may clean up this logic in the future.
if (
tensor_transport is None
or tensor_transport == TensorTransportEnum.OBJECT_STORE
) and (
obj_ref is None
or obj_ref.tensor_transport() == TensorTransportEnum.OBJECT_STORE.value
):
# The object is not a gpu object, so we cannot use other external transport to
# fetch it.
continue

# If the object is a gpu object, we can choose to use the object store or other external
# transport to fetch it. The `tensor_transport_hint` has the highest priority, then the
# tensor_transport in obj_ref.tensor_transport(), then the tensor_transport in serialize_objects,
# then the default value `OBJECT_STORE`.
chosen_tensor_transport = (
tensor_transport_hint
or (
TensorTransportEnum(obj_ref.tensor_transport()) if obj_ref else None
)
or tensor_transport
or TensorTransportEnum.OBJECT_STORE
)

object_id = obj_ref.hex()
if object_id not in gpu_objects:
# If using a non-object store transport, then tensors will be sent
# out-of-band. Get them before deserializing the object store data.
# The user can choose OBJECT_STORE as the hint to fetch the RDT object
# through the object store.
gpu_objects[object_id] = self.gpu_object_manager.get_gpu_object(
object_id, tensor_transport=chosen_tensor_transport
object_id, tensor_transport=tensor_transport_hint
)

# Function actor manager or the import thread may call pickle.loads
@@ -983,16 +969,6 @@ def get_objects(
f"Attempting to call `get` on the value {object_ref}, "
"which is not an ray.ObjectRef."
)
tensor_transport: TensorTransportEnum = (
TensorTransportEnum.from_str(_tensor_transport)
if _tensor_transport is not None
else None
)
assert tensor_transport in [
TensorTransportEnum.OBJECT_STORE,
TensorTransportEnum.NIXL,
None,
], "Currently, RDT only supports 'object_store' and 'nixl' for tensor transport in ray.get()."
timeout_ms = (
int(timeout * 1000) if timeout is not None and timeout != -1 else -1
)
@@ -1004,7 +980,7 @@
)

debugger_breakpoint = b""
for data, metadata, _ in serialized_objects:
for _, metadata, _ in serialized_objects:
if metadata:
metadata_fields = metadata.split(b",")
if len(metadata_fields) >= 2 and metadata_fields[1].startswith(
@@ -1016,6 +992,9 @@
if skip_deserialization:
return None, debugger_breakpoint

tensor_transport = (
_tensor_transport.upper() if _tensor_transport is not None else None
)
values = self.deserialize_objects(
serialized_objects, object_refs, tensor_transport_hint=tensor_transport
)
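A hedged usage sketch of the string-based flow above, assuming the public `ray.put`/`ray.get` forward the private `_tensor_transport` keyword to `put_object`/`get_objects` (exact public signatures may differ):

```python
import ray
import torch

ray.init()

# The worker upper-cases the string; anything other than "OBJECT_STORE" takes
# the direct-transport path and extracts tensors for out-of-band transfer.
ref = ray.put(torch.ones(4, device="cuda"), _tensor_transport="nixl")

# On the read side the string is only a hint for fetching the tensors;
# "object_store" falls back to fetching through the object store.
value = ray.get(ref, _tensor_transport="object_store")
```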
4 changes: 2 additions & 2 deletions python/ray/_raylet.pyx
@@ -2874,7 +2874,7 @@ cdef class CoreWorker:
c_object_ids, timeout_ms, results)
check_status(op_status)

return RayObjectsToSerializedRayObjects(results)
return RayObjectsToSerializedRayObjects(results, object_refs)

def get_if_local(self, object_refs):
"""Get objects from local plasma store directly
@@ -2886,7 +2886,7 @@
check_status(
CCoreWorkerProcess.GetCoreWorker().GetIfLocal(
c_object_ids, &results))
return RayObjectsToSerializedRayObjects(results)
return RayObjectsToSerializedRayObjects(results, object_refs)

def object_exists(self, ObjectRef object_ref, memory_store_only=False):
cdef:
45 changes: 22 additions & 23 deletions python/ray/actor.py
@@ -473,9 +473,7 @@ def annotate_method(method: Callable[_P, _Ret]):
if "enable_task_events" in kwargs and kwargs["enable_task_events"] is not None:
method.__ray_enable_task_events__ = kwargs["enable_task_events"]
if "tensor_transport" in kwargs:
method.__ray_tensor_transport__ = TensorTransportEnum.from_str(
kwargs["tensor_transport"]
)
method.__ray_tensor_transport__ = kwargs["tensor_transport"].upper()
return method

# Check if decorator is called without parentheses (args[0] would be the function)
@@ -521,7 +519,7 @@ def __init__(
enable_task_events: bool,
decorator: Optional[Any] = None,
signature: Optional[List[inspect.Parameter]] = None,
tensor_transport: Optional[TensorTransportEnum] = None,
tensor_transport: Optional[str] = None,
):
"""Initialize an _ActorMethodMetadata.

@@ -599,7 +597,7 @@ def __init__(
enable_task_events: bool,
decorator=None,
signature: Optional[List[inspect.Parameter]] = None,
tensor_transport: Optional[TensorTransportEnum] = None,
tensor_transport: Optional[str] = None,
):
"""Initialize an ActorMethod.

@@ -649,10 +647,10 @@ def __init__(
# and return the resulting ObjectRefs.
self._decorator = decorator

# If the task call doesn't specify a tensor transport option, use `_tensor_transport`
# If the task call doesn't specify a tensor transport option, use `OBJECT_STORE`
# as the default transport for this actor method.
if tensor_transport is None:
tensor_transport = TensorTransportEnum.OBJECT_STORE
tensor_transport = "OBJECT_STORE"
self._tensor_transport = tensor_transport

def __call__(self, *args, **kwargs):
@@ -695,7 +693,7 @@ def options(self, **options):

tensor_transport = options.get("tensor_transport", None)
if tensor_transport is not None:
options["tensor_transport"] = TensorTransportEnum.from_str(tensor_transport)
options["tensor_transport"] = tensor_transport.upper()

class FuncWrapper:
def remote(self, *args, **kwargs):
@@ -800,7 +798,7 @@ def _remote(
concurrency_group=None,
_generator_backpressure_num_objects=None,
enable_task_events=None,
tensor_transport: Optional[TensorTransportEnum] = None,
tensor_transport: Optional[str] = None,
):
if num_returns is None:
num_returns = self._num_returns
@@ -820,23 +818,23 @@ def _remote(
if tensor_transport is None:
tensor_transport = self._tensor_transport

if tensor_transport != TensorTransportEnum.OBJECT_STORE:
if tensor_transport != "OBJECT_STORE":
if num_returns != 1:
raise ValueError(
f"Currently, methods with tensor_transport={tensor_transport.name} only support 1 return value. "
f"Currently, methods with tensor_transport={tensor_transport} only support 1 return value. "
"Please make sure the actor method is decorated with `@ray.method(num_returns=1)` (the default)."
)
if not self._actor._ray_enable_tensor_transport:
raise ValueError(
f'Currently, methods with .options(tensor_transport="{tensor_transport.name}") are not supported when enable_tensor_transport=False. '
f'Currently, methods with .options(tensor_transport="{tensor_transport}") are not supported when enable_tensor_transport=False. '
"Please set @ray.remote(enable_tensor_transport=True) on the actor class definition."
)
gpu_object_manager = ray._private.worker.global_worker.gpu_object_manager
if not gpu_object_manager.actor_has_tensor_transport(
self._actor, tensor_transport
):
raise ValueError(
f'{self._actor} does not have tensor transport {tensor_transport.name} available. If using a collective-based transport ("nccl" or "gloo"), please create a communicator with '
f'{self._actor} does not have tensor transport {tensor_transport} available. If using a collective-based transport ("nccl" or "gloo"), please create a communicator with '
"`ray.experimental.collective.create_collective_group` "
"before calling actor tasks with non-default tensor_transport."
)
@@ -876,7 +874,7 @@ def invocation(args, kwargs):
invocation = self._decorator(invocation)

object_refs = invocation(args, kwargs)
if tensor_transport != TensorTransportEnum.OBJECT_STORE:
if tensor_transport != "OBJECT_STORE":
# Currently, we only support transfer tensor out-of-band when
# num_returns is 1.
assert isinstance(object_refs, ObjectRef)
@@ -979,14 +977,12 @@ def create(
self.enable_task_events = {}
self.generator_backpressure_num_objects = {}
self.concurrency_group_for_methods = {}
self.method_name_to_tensor_transport: Dict[str, TensorTransportEnum] = {}
self.method_name_to_tensor_transport: Dict[str, str] = {}

# Check whether any actor methods specify a non-default tensor transport.
self.has_tensor_transport_methods = any(
getattr(
method, "__ray_tensor_transport__", TensorTransportEnum.OBJECT_STORE
)
!= TensorTransportEnum.OBJECT_STORE
getattr(method, "__ray_tensor_transport__", "OBJECT_STORE")
!= "OBJECT_STORE"
for _, method in actor_methods
)

@@ -1941,7 +1937,7 @@ def __init__(
method_generator_backpressure_num_objects: Dict[str, int],
method_enable_task_events: Dict[str, bool],
enable_tensor_transport: bool,
method_name_to_tensor_transport: Dict[str, TensorTransportEnum],
method_name_to_tensor_transport: Dict[str, str],
actor_method_cpus: int,
actor_creation_function_descriptor,
cluster_and_job,
@@ -1968,7 +1964,7 @@ def __init__(
this actor. If True, then methods can be called with
.options(tensor_transport=...) to specify a non-default tensor
transport.
method_name_to_tensor_transport: Dictionary mapping method names to their tensor transport settings.
method_name_to_tensor_transport: Dictionary mapping method names to their tensor transport type.
actor_method_cpus: The number of CPUs required by actor methods.
actor_creation_function_descriptor: The function descriptor for actor creation.
cluster_and_job: The cluster and job information.
@@ -2079,7 +2075,7 @@ def _actor_method_call(
concurrency_group_name: Optional[str] = None,
generator_backpressure_num_objects: Optional[int] = None,
enable_task_events: Optional[bool] = None,
tensor_transport: Optional[TensorTransportEnum] = None,
tensor_transport: Optional[str] = None,
):
"""Method execution stub for an actor handle.

@@ -2157,6 +2153,9 @@ def _actor_method_call(
if generator_backpressure_num_objects is None:
generator_backpressure_num_objects = -1

tensor_transport_enum = TensorTransportEnum.OBJECT_STORE
if tensor_transport is not None and tensor_transport != "OBJECT_STORE":
tensor_transport_enum = TensorTransportEnum.DIRECT_TRANSPORT
object_refs = worker.core_worker.submit_actor_task(
self._ray_actor_language,
self._ray_actor_id,
@@ -2171,7 +2170,7 @@
concurrency_group_name if concurrency_group_name is not None else b"",
generator_backpressure_num_objects,
enable_task_events,
tensor_transport.value,
tensor_transport_enum.value,
)

if num_returns == STREAMING_GENERATOR_RETURN:
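For context, a hedged end-to-end sketch of the actor-side options touched in this file, based on the error messages above; a collective-based transport such as "nccl" or "gloo" would additionally need `ray.experimental.collective.create_collective_group`, while the one-sided "nixl" transport used here does not:

```python
import ray
import torch

@ray.remote(num_gpus=1, enable_tensor_transport=True)
class Worker:
    @ray.method(tensor_transport="nixl")  # stored upper-cased as "NIXL"
    def produce(self):
        return torch.ones(4, device="cuda")

    def consume(self, t):
        return t.sum().item()

sender, receiver = Worker.remote(), Worker.remote()
ref = sender.produce.remote()          # tensors are kept for out-of-band transfer
result = receiver.consume.remote(ref)  # receiver pulls them via the chosen transport
print(ray.get(result))
```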