使用 Ray 进行大规模分布式数据处理
模拟一些耗时操作computation_time = random.uniform(1, 5) # 模拟计算耗时(1-5秒): 这是 Ray 提供的装饰器,表示该函数将会在远程执行,而不是在本地串行执行。Ray 会将函数调度到集群中可用的工作节点上。: 模拟任务的执行时间,使用生成 1 到 5 秒之间的随机数。: 模拟计算过程中的耗时操作。Ray 远程任务执行:通过装饰器将任务分配到集群中的各个节
文章目录
今天我又来学习了一个新玩具:Ray
Ray 是一个开源的统一框架,用于扩展机器学习等 AI 和 Python 应用程序。
它为并行处理提供了计算层,因此您无需成为分布式系统专家。
Ray 最大限度地降低了运行分布式个人和端到端机器学习工作流的复杂性
安装
pip install -U "ray[data,train,tune,serve]"
启动
我在两个 WSL 不同的发行版本中分别启动 主节点、工作节点
ray start --head --port=6379
ray start --address='主节点IP:6379'
主节点:
(ai) root@michael:/mnt/d/Personal/Desktop# ray status
======== Autoscaler status: 2024-11-25 09:43:58.027338 ========
Node status
---------------------------------------------------------------
Active:
1 node_04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7
Pending:
(no pending nodes)
Recent failures:
(no failures)
Resources
---------------------------------------------------------------
Usage:
0.0/8.0 CPU
0.0/1.0 GPU
0B/8.57GiB memory
0B/4.28GiB object_store_memory
Demands:
(no resource demands)
加入新的工作节点:
(ai) root@michael:/mnt/d/Personal/Desktop# ray start --address='***.**.215.**:6379'
Local node IP: ***.**.215.**
[2024-11-25 09:45:10,194 W 2030 2030] global_state_accessor.cc:463: Retrying to get node with node ID 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33
[2024-11-25 09:45:11,198 W 2030 2030] global_state_accessor.cc:463: Retrying to get node with node ID 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33
--------------------
Ray runtime started.
--------------------
To terminate the Ray runtime, run
ray stop
主节点显示有两个 node(加入成功)
(ai) root@michael:/mnt/d/Personal/Desktop# ray status
======== Autoscaler status: 2024-11-25 09:45:29.996284 ========
Node status
---------------------------------------------------------------
Active:
1 node_04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7
1 node_62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33
Pending:
(no pending nodes)
Recent failures:
(no failures)
Resources
---------------------------------------------------------------
Usage:
0.0/16.0 CPU
0.0/2.0 GPU
0B/18.51GiB memory
0B/8.55GiB object_store_memory
Demands:
(no resource demands)
分布式计算
import ray
import time
import random
# 初始化 Ray
ray.init(ignore_reinit_error=True)
# 定义远程任务,模拟耗时计算
@ray.remote
def long_computation(task_id):
# 模拟一些耗时操作
computation_time = random.uniform(1, 5) # 模拟计算耗时(1-5秒)
time.sleep(computation_time)
# 获取当前节点的资源信息
node_info = ray.nodes()
# 获取当前执行任务的节点信息
current_node = ray.get_runtime_context().get_node_id() # 获取当前任务执行的节点ID
# 查找当前节点的 IP 地址
node_ip = None
for node in node_info:
if node['NodeID'] == current_node:
node_ip = node['NodeManagerAddress'].split(':')[0]
break
if node_ip is None:
raise RuntimeError(f"Could not find node IP for node ID {current_node}")
# 返回计算结果和节点信息
return {
"task_id": task_id,
"computation_time": computation_time,
"node_ip": node_ip,
# "node_info": node_info,
"node_id": current_node
}
# 计算任务
task_count = 30 # 假设有 n 个任务
# 任务分配:不指定节点,让 Ray 动态选择节点
tasks = [long_computation.remote(task_id) for task_id in range(task_count)]
# 记录开始时间
start_time = time.time()
# 执行计算任务并获取结果
results = ray.get(tasks)
# 计算总耗时
total_time = time.time() - start_time
total_computation_time = sum(result['computation_time'] for result in results)
# 输出计算结果
for result in results:
print(f"任务 ID: {result['task_id']} node_id: {result['node_id']}, 执行时间: {result['computation_time']:.2f}秒")
print("-" * 80)
# 输出总耗时
print(f"所有任务总耗时: {total_time:.2f}秒, 子任务计算总耗时: {total_computation_time:.2f}秒")
# 关闭 Ray
ray.shutdown()
1. 初始化 Ray 环境
ray.init(ignore_reinit_error=True)
ray.init(): 初始化 Ray 集群或本地环境。ignore_reinit_error=True:这个参数确保在 Ray 已经初始化的情况下不会重复初始化。如果我们运行多次ray.init(),它会忽略Ray already initialized的错误。
2. 定义远程任务函数
@ray.remote
def long_computation(task_id):
# 模拟一些耗时操作
computation_time = random.uniform(1, 5) # 模拟计算耗时(1-5秒)
time.sleep(computation_time)
@ray.remote: 这是 Ray 提供的装饰器,表示该函数将会在远程执行,而不是在本地串行执行。Ray 会将函数调度到集群中可用的工作节点上。computation_time: 模拟任务的执行时间,使用random.uniform(1, 5)生成 1 到 5 秒之间的随机数。time.sleep(computation_time): 模拟计算过程中的耗时操作。
3. 获取当前节点的信息
node_info = ray.nodes()
current_node = ray.get_runtime_context().get_node_id() # 获取当前任务执行的节点ID
ray.nodes(): 获取当前 Ray 集群中所有节点的信息(包括节点的资源、状态、IP 地址等)。返回的是一个列表,每个元素包含一个节点的详细信息。ray.get_runtime_context().get_node_id(): 获取当前任务所在的节点 ID,表示当前任务是在哪个工作节点上执行的。
4. 查找并返回节点的 IP 地址
node_ip = None
for node in node_info:
if node['NodeID'] == current_node:
node_ip = node['NodeManagerAddress'].split(':')[0]
break
if node_ip is None:
raise RuntimeError(f"Could not find node IP for node ID {current_node}")
-
查找节点 IP:
- 通过遍历
node_info,根据当前任务执行的current_node,查找对应的节点 IP 地址。 node['NodeManagerAddress'].split(':')[0]提取出节点的 IP 地址部分。
- 通过遍历
-
错误处理: 如果找不到当前节点的 IP 地址,抛出
RuntimeError。
5. 返回任务计算结果和节点信息
return {
"task_id": task_id,
"computation_time": computation_time,
"node_ip": node_ip,
"node_id": current_node
}
- 返回任务结果:任务执行结束后,返回一个字典,包含:
task_id: 当前任务的 ID。computation_time: 当前任务执行的时间。node_ip: 当前任务执行的节点 IP 地址。node_id: 当前任务执行的节点 ID。
6. 任务分配与执行
task_count = 30 # 假设有 n 个任务
tasks = [long_computation.remote(task_id) for task_id in range(task_count)]
- 任务分配:我们模拟了 30 个任务 (
task_count = 30)。通过long_computation.remote(task_id),为每个任务调用远程执行函数,这些任务将被 Ray 动态调度到集群中的各个节点执行。
7. 执行任务并获取结果
start_time = time.time()
results = ray.get(tasks)
ray.get(tasks):ray.get()会阻塞主线程,直到所有的任务完成并返回结果。通过ray.get()获取任务结果。
8. 计算总耗时
total_time = time.time() - start_time
total_computation_time = sum(result['computation_time'] for result in results)
- 总耗时:通过计算
ray.get()完成的时间减去任务开始时的时间,得到所有任务的总耗时。 - 计算所有任务的总计算时间:通过对所有任务的
computation_time求和,得到子任务的计算总时间。
9. 输出任务结果
for result in results:
print(f"任务 ID: {result['task_id']} node_id: {result['node_id']}, 执行时间: {result['computation_time']:.2f}秒")
- 输出每个任务的 ID、节点 ID 和执行时间,通过遍历
results输出每个任务的详细信息,帮助分析任务的执行情况。
10. 输出总耗时
print(f"所有任务总耗时: {total_time:.2f}秒, 子任务计算总耗时: {total_computation_time:.2f}秒")
- 输出所有任务的总耗时和计算总耗时,帮助评估集群的计算效率。
11. 关闭 Ray 环境
ray.shutdown()
ray.shutdown(): 关闭 Ray 环境,释放资源。
输出结果
(ai) root@michael:/mnt/d/Personal/Desktop# python 1.py
2024-11-25 11:07:27,127 INFO worker.py:1634 -- Connecting to existing Ray cluster at address: ***.30.215.***:6379...
2024-11-25 11:07:27,142 INFO worker.py:1810 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265
任务 ID: 0 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 3.74秒
--------------------------------------------------------------------------------
任务 ID: 1 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 1.41秒
--------------------------------------------------------------------------------
任务 ID: 2 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 3.17秒
--------------------------------------------------------------------------------
任务 ID: 3 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 3.79秒
--------------------------------------------------------------------------------
任务 ID: 4 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 3.83秒
--------------------------------------------------------------------------------
任务 ID: 5 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 1.13秒
--------------------------------------------------------------------------------
任务 ID: 6 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 1.60秒
--------------------------------------------------------------------------------
任务 ID: 7 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 4.48秒
--------------------------------------------------------------------------------
任务 ID: 8 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 1.53秒
--------------------------------------------------------------------------------
任务 ID: 9 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 3.56秒
--------------------------------------------------------------------------------
任务 ID: 10 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 2.28秒
--------------------------------------------------------------------------------
任务 ID: 11 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 3.17秒
--------------------------------------------------------------------------------
任务 ID: 12 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 4.30秒
--------------------------------------------------------------------------------
任务 ID: 13 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 2.25秒
--------------------------------------------------------------------------------
任务 ID: 14 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 2.05秒
--------------------------------------------------------------------------------
任务 ID: 15 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 4.12秒
--------------------------------------------------------------------------------
任务 ID: 16 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 3.84秒
--------------------------------------------------------------------------------
任务 ID: 17 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 2.36秒
--------------------------------------------------------------------------------
任务 ID: 18 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 4.84秒
--------------------------------------------------------------------------------
任务 ID: 19 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 4.59秒
--------------------------------------------------------------------------------
任务 ID: 20 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 4.44秒
--------------------------------------------------------------------------------
任务 ID: 21 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 4.93秒
--------------------------------------------------------------------------------
任务 ID: 22 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 3.84秒
--------------------------------------------------------------------------------
任务 ID: 23 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 1.90秒
--------------------------------------------------------------------------------
任务 ID: 24 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 3.76秒
--------------------------------------------------------------------------------
任务 ID: 25 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 4.48秒
--------------------------------------------------------------------------------
任务 ID: 26 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 4.07秒
--------------------------------------------------------------------------------
任务 ID: 27 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 4.70秒
--------------------------------------------------------------------------------
任务 ID: 28 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 4.86秒
--------------------------------------------------------------------------------
任务 ID: 29 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 4.58秒
--------------------------------------------------------------------------------
所有任务总耗时: 11.61秒, 子任务计算总耗时: 103.59秒
可以看到任务被动态分配到各个节点进行计算,效率是非常高的
主要思路总结
- Ray 远程任务执行:通过
@ray.remote装饰器将任务分配到集群中的各个节点并行执行。 - 节点信息获取:任务在执行时获取当前任务执行所在的节点信息,并根据该信息找到节点的 IP 地址。
- 并行计算与任务调度:Ray 自动调度任务到集群中的空闲节点,使得多个计算任务可以并行执行。
- 性能分析:通过计算每个任务的执行时间和节点信息,帮助分析分布式计算的性能。
- 动态任务分配:无需手动指定节点,Ray 自动选择最佳节点执行任务。
这种方法非常适合需要处理大量独立计算任务的场景,比如分布式机器学习、数据处理等。通过合理的并行化,能大大提高计算效率,减少整体计算时间。
参考
https://docs.ray.io/en/latest/ray-overview/installation.html
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)