Running on Ray#
You can run Daft on Ray in multiple ways:
Simple Local Setup#
If you want to start a single-node Ray cluster on your local machine, install Ray with pip install "ray[default]" and run ray start --head in a terminal.
Starting the cluster prints the address (IP and port) of the head node, along with instructions for connecting to it.
You can take the IP address and port and pass them to Daft with set_runner_ray.
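As a minimal sketch of that step, the snippet below joins a host and port into an address and hands it to Daft before running a small query. The host and port values, the helper names format_ray_address and run_example, and the sample data are illustrative assumptions. The top-level daft.set_runner_ray spelling is also assumed; older releases may expose it as daft.context.set_runner_ray. Daft is imported inside the function so the address helper can be used on its own.

```python
from typing import Optional


def format_ray_address(host: str, port: int) -> str:
    """Join the head node's host and port into the `host:port` form."""
    return f"{host}:{port}"


def run_example(address: Optional[str] = None) -> None:
    """Point Daft at a Ray cluster and run a tiny query on it."""
    import daft  # imported lazily; requires Daft to be installed

    # With address=None, Daft falls back to a local Ray cluster.
    daft.set_runner_ray(address=address)

    df = daft.from_pydict(
        {
            "a": [3, 2, 5, 6, 1, 4],
            "b": [True, False, False, True, True, False],
        }
    )
    # Keep rows where "b" is true, order them by "a", and print a preview.
    df.where(df["b"]).sort(df["a"]).show()
```

Calling run_example(format_ray_address("127.0.0.1", 6379)) would target a cluster started on the local machine, assuming it listens on port 6379.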
If no address is specified, Daft starts a Ray cluster locally on your machine. On a powerful machine (such as an AWS P3 instance, which is equipped with multiple GPUs), this alone is useful because Daft can parallelize execution across your CPUs and GPUs.
Connecting to Remote Ray Clusters#
If you already have your own Ray cluster running remotely, you can connect Daft to it by supplying an address with set_runner_ray.
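For a remote cluster, the address is typically a Ray Client URL. The sketch below assumes the Ray Client server listens on port 10001 (Ray's default); the host name head.example.com is a placeholder and both function names are hypothetical.

```python
def ray_client_address(host: str, port: int = 10001) -> str:
    """Build a Ray Client URL of the form ray://host:port."""
    return f"ray://{host}:{port}"


def connect_to_remote_cluster(host: str) -> None:
    """Tell Daft to execute queries on the remote Ray cluster at `host`."""
    import daft  # imported lazily; requires Daft to be installed

    daft.set_runner_ray(address=ray_client_address(host))
```

Call connect_to_remote_cluster("head.example.com") once, before building any DataFrames.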
For more information about the address keyword argument, please see the Ray documentation on initialization.
Using Ray Client#
The Ray client is a quick way to get started with running tasks and retrieving their results on Ray using Python.
Warning
To run tasks using the Ray client, the version of Daft and the minor version of Python (e.g., 3.9, 3.10) must match between client and server.
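The following sketch puts the pieces together: a small check for the Python minor-version requirement in the warning above, followed by a query executed through the Ray client. The helper names are hypothetical, the server's Python version is something you would have to look up yourself, and port 10001 is assumed to be the Ray Client port. Passing runtime_env={"pip": ["daft"]} asks Ray to install Daft on the workers.

```python
import sys
from typing import Tuple


def minor_version(version: str) -> Tuple[int, int]:
    """Return (major, minor) from a version string such as '3.10.12'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def python_versions_match(client: str, server: str) -> bool:
    """True when client and server share the same Python minor version."""
    return minor_version(client) == minor_version(server)


def local_python_version() -> str:
    """Version string of the interpreter running this client."""
    return f"{sys.version_info.major}.{sys.version_info.minor}"


def run_remote_query(head_node_host: str) -> None:
    """Run a small Daft query on a remote cluster via the Ray client."""
    import daft  # imported lazily; requires Daft and Ray to be installed
    import ray

    address = f"ray://{head_node_host}:10001"
    # Start the Ray client and request that Daft be installed on the workers.
    ray.init(address, runtime_env={"pip": ["daft"]})
    # Tell Daft to execute queries on Ray at the same address.
    daft.set_runner_ray(address)

    df = daft.from_pydict(
        {
            "a": [3, 2, 5, 6, 1, 4],
            "b": [True, False, False, True, True, False],
        }
    )
    df = df.where(df["b"]).sort(df["a"])
    # The query executes remotely; results come back to the client.
    df.collect()
```

A cautious client could call python_versions_match(local_python_version(), server_version) first and refuse to connect on a mismatch.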
Using Ray Jobs#
Ray jobs offer more control and observability than the Ray client. In addition, your entire code runs on Ray, so it is not constrained by the compute, network, library versions, or availability of your local machine.
To submit a Daft script as a job, use the Ray CLI, which can be installed with pip install "ray[default]".
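As an illustration, the sketch below holds a minimal job script as a string, writes it into a working directory, and builds the matching ray job submit command. The directory name wd, the file name job.py, the sample query, and the dashboard address are placeholders; the helper names are hypothetical. The script calls set_runner_ray without an address because it already runs inside the cluster.

```python
import json
from pathlib import Path
from typing import List

# Hypothetical contents of wd/job.py.
JOB_SCRIPT = '''\
import daft


def main():
    # No address needed: the job runs on the cluster itself.
    daft.set_runner_ray()

    df = daft.from_pydict({"a": [3, 2, 5, 6, 1, 4]})
    df.sort(df["a"]).show()


if __name__ == "__main__":
    main()
'''


def write_job(working_dir: Path, name: str = "job.py") -> Path:
    """Write the job script into `working_dir` and return its path."""
    working_dir.mkdir(parents=True, exist_ok=True)
    path = working_dir / name
    path.write_text(JOB_SCRIPT)
    return path


def submit_command(address: str, working_dir: str, script: str = "job.py") -> List[str]:
    """Build the argument list for `ray job submit`."""
    runtime_env = {"pip": ["daft"]}  # install Daft on the Ray workers
    return [
        "ray", "job", "submit",
        "--working-dir", working_dir,
        "--address", address,
        "--runtime-env-json", json.dumps(runtime_env),
        "--", "python", script,
    ]
```

After write_job(Path("wd")), passing submit_command("http://<head_node_host>:8265", "wd") to subprocess.run is equivalent to typing the command in a shell. Port 8265 is the Ray dashboard's default.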
Note
The runtime env parameter specifies that Daft should be installed on the Ray workers. Alternative methods of including Daft in the worker dependencies can be found here.
For more information about Ray jobs, see Ray docs -> Ray Jobs Overview.
Autoscaling and downscaling#
When Daft runs on a Ray cluster managed by the Ray autoscaler (including KubeRay), it can send scale-up requests based on pending task demand. Ray's autoscaler request API is sticky: without additional coordination, the autoscaler may keep previously requested capacity even when the workload becomes idle.
Daft can optionally retire idle Flotilla workers (scale-in) and clear outstanding autoscaler requests to make it easier for Ray to scale the cluster back down. This feature is opt-in.
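To make the idea of retiring idle workers concrete, here is a toy model of a scale-in decision. It is not Daft's actual logic: the function name, the worker IDs, and the interpretation of an idle threshold and a minimum number of surviving workers are assumptions made purely for illustration.

```python
from typing import Dict, List


def workers_to_retire(
    idle_seconds_by_worker: Dict[str, float],
    idle_threshold: float = 60.0,
    min_survivors: int = 1,
) -> List[str]:
    """Pick idle workers to retire while keeping `min_survivors` alive."""
    # Workers idle for at least the threshold, longest-idle first.
    candidates = sorted(
        (w for w, idle in idle_seconds_by_worker.items() if idle >= idle_threshold),
        key=lambda w: idle_seconds_by_worker[w],
        reverse=True,
    )
    # Never retire so many workers that fewer than `min_survivors` remain.
    max_retirable = max(len(idle_seconds_by_worker) - min_survivors, 0)
    return candidates[:max_retirable]
```

With three workers idle for 300, 90, and 5 seconds, the default settings select the first two and leave the recently active worker running.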
You can enable downscaling via set_runner_ray, or via environment variables (useful for Ray Jobs / KubeRay manifests): DAFT_AUTOSCALING_DOWNSCALE_ENABLED (default: false), DAFT_AUTOSCALING_DOWNSCALE_IDLE_SECONDS (default: 60), DAFT_AUTOSCALING_MIN_SURVIVOR_WORKERS (default: 1), and DAFT_AUTOSCALING_PENDING_RELEASE_EXCLUDE_SECONDS (default: 120).
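A minimal sketch of the environment-variable route follows. The variable names and defaults come from the list above; the helper name downscale_env is hypothetical, and two details are assumptions: that the boolean is spelled true/false, and that the variables must be set before Daft starts its Ray runner.

```python
import os
from typing import Dict


def downscale_env(
    enabled: bool = True,
    idle_seconds: int = 60,
    min_survivor_workers: int = 1,
    pending_release_exclude_seconds: int = 120,
) -> Dict[str, str]:
    """Build the downscaling settings as environment-variable strings."""
    return {
        "DAFT_AUTOSCALING_DOWNSCALE_ENABLED": "true" if enabled else "false",
        "DAFT_AUTOSCALING_DOWNSCALE_IDLE_SECONDS": str(idle_seconds),
        "DAFT_AUTOSCALING_MIN_SURVIVOR_WORKERS": str(min_survivor_workers),
        "DAFT_AUTOSCALING_PENDING_RELEASE_EXCLUDE_SECONDS": str(
            pending_release_exclude_seconds
        ),
    }


# Opt in with a shorter idle window, then export to the current process.
settings = downscale_env(idle_seconds=30)
os.environ.update(settings)
```

For a Ray job, the same dictionary could instead be passed as the env_vars entry of the job's runtime environment.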
