今天我又来学习了一个新玩具:Ray

Ray 是一个开源的统一框架,用于扩展机器学习等 AI 和 Python 应用程序。
它为并行处理提供了计算层,因此您无需成为分布式系统专家。
Ray 最大限度降低了运行分布式个人和端到端机器学习工作流的复杂性

安装

pip install -U "ray[data,train,tune,serve]" 

启动

我在两个 WSL 不同的发行版本中分别启动 主节点、工作节点

ray start --head --port=6379
ray start --address='主节点IP:6379'

主节点:

(ai) root@michael:/mnt/d/Personal/Desktop# ray status
======== Autoscaler status: 2024-11-25 09:43:58.027338 ========
Node status
---------------------------------------------------------------
Active:
 1 node_04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 0.0/8.0 CPU
 0.0/1.0 GPU
 0B/8.57GiB memory
 0B/4.28GiB object_store_memory

Demands:
 (no resource demands)

加入新的工作节点:

(ai) root@michael:/mnt/d/Personal/Desktop#  ray start --address='***.**.215.**:6379'
Local node IP: ***.**.215.**
[2024-11-25 09:45:10,194 W 2030 2030] global_state_accessor.cc:463: Retrying to get node with node ID 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33
[2024-11-25 09:45:11,198 W 2030 2030] global_state_accessor.cc:463: Retrying to get node with node ID 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33

--------------------
Ray runtime started.
--------------------

To terminate the Ray runtime, run
  ray stop

主节点显示有两个 node(加入成功

(ai) root@michael:/mnt/d/Personal/Desktop# ray status
======== Autoscaler status: 2024-11-25 09:45:29.996284 ========
Node status
---------------------------------------------------------------
Active:
 1 node_04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7
 1 node_62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 0.0/16.0 CPU
 0.0/2.0 GPU
 0B/18.51GiB memory
 0B/8.55GiB object_store_memory

Demands:
 (no resource demands)

分布式计算

import ray
import time
import random

# 初始化 Ray
ray.init(ignore_reinit_error=True)

# 定义远程任务,模拟耗时计算
@ray.remote
def long_computation(task_id):
    # 模拟一些耗时操作
    computation_time = random.uniform(1, 5)  # 模拟计算耗时(1-5秒)
    time.sleep(computation_time)
    
    # 获取当前节点的资源信息
    node_info = ray.nodes()
    
    # 获取当前执行任务的节点信息
    current_node = ray.get_runtime_context().get_node_id()  # 获取当前任务执行的节点ID

    # 查找当前节点的 IP 地址
    node_ip = None
    for node in node_info:
        if node['NodeID'] == current_node:
            node_ip = node['NodeManagerAddress'].split(':')[0]
            break

    if node_ip is None:
        raise RuntimeError(f"Could not find node IP for node ID {current_node}")

    # 返回计算结果和节点信息
    return {
        "task_id": task_id,
        "computation_time": computation_time,
        "node_ip": node_ip,
        # "node_info": node_info,
        "node_id": current_node
    }

# 计算任务
task_count = 30 # 假设有 n 个任务

# 任务分配:不指定节点,让 Ray 动态选择节点
tasks = [long_computation.remote(task_id) for task_id in range(task_count)]

# 记录开始时间
start_time = time.time()

# 执行计算任务并获取结果
results = ray.get(tasks)

# 计算总耗时
total_time = time.time() - start_time

total_computation_time = sum(result['computation_time'] for result in results)

# 输出计算结果
for result in results:
    print(f"任务 ID: {result['task_id']} node_id: {result['node_id']}, 执行时间: {result['computation_time']:.2f}秒")
    print("-" * 80)

# 输出总耗时
print(f"所有任务总耗时: {total_time:.2f}秒, 子任务计算总耗时: {total_computation_time:.2f}秒")

# 关闭 Ray
ray.shutdown()

1. 初始化 Ray 环境

ray.init(ignore_reinit_error=True)
  • ray.init(): 初始化 Ray 集群或本地环境。
    • ignore_reinit_error=True:这个参数确保在 Ray 已经初始化的情况下不会重复初始化。如果我们运行多次 ray.init(),它会忽略 Ray already initialized 的错误。

2. 定义远程任务函数

@ray.remote
def long_computation(task_id):
    # 模拟一些耗时操作
    computation_time = random.uniform(1, 5)  # 模拟计算耗时(1-5秒)
    time.sleep(computation_time)
  • @ray.remote: 这是 Ray 提供的装饰器,表示该函数将会在远程执行,而不是在本地串行执行。Ray 会将函数调度到集群中可用的工作节点上。
  • computation_time: 模拟任务的执行时间,使用 random.uniform(1, 5) 生成 1 到 5 秒之间的随机数。
  • time.sleep(computation_time): 模拟计算过程中的耗时操作。

3. 获取当前节点的信息

    node_info = ray.nodes()
    current_node = ray.get_runtime_context().get_node_id()  # 获取当前任务执行的节点ID
  • ray.nodes(): 获取当前 Ray 集群中所有节点的信息(包括节点的资源、状态、IP 地址等)。返回的是一个列表,每个元素包含一个节点的详细信息。
  • ray.get_runtime_context().get_node_id(): 获取当前任务所在的节点 ID,表示当前任务是在哪个工作节点上执行的。

4. 查找并返回节点的 IP 地址

    node_ip = None
    for node in node_info:
        if node['NodeID'] == current_node:
            node_ip = node['NodeManagerAddress'].split(':')[0]
            break

    if node_ip is None:
        raise RuntimeError(f"Could not find node IP for node ID {current_node}")
  • 查找节点 IP:

    • 通过遍历 node_info,根据当前任务执行的 current_node,查找对应的节点 IP 地址。
    • node['NodeManagerAddress'].split(':')[0] 提取出节点的 IP 地址部分。
  • 错误处理: 如果找不到当前节点的 IP 地址,抛出 RuntimeError

5. 返回任务计算结果和节点信息

    return {
        "task_id": task_id,
        "computation_time": computation_time,
        "node_ip": node_ip,
        "node_id": current_node
    }
  • 返回任务结果:任务执行结束后,返回一个字典,包含:
    • task_id: 当前任务的 ID。
    • computation_time: 当前任务执行的时间。
    • node_ip: 当前任务执行的节点 IP 地址。
    • node_id: 当前任务执行的节点 ID。

6. 任务分配与执行

task_count = 30 # 假设有 n 个任务

tasks = [long_computation.remote(task_id) for task_id in range(task_count)]
  • 任务分配:我们模拟了 30 个任务 (task_count = 30)。通过 long_computation.remote(task_id),为每个任务调用远程执行函数,这些任务将被 Ray 动态调度到集群中的各个节点执行。

7. 执行任务并获取结果

start_time = time.time()
results = ray.get(tasks)
  • ray.get(tasks): ray.get() 会阻塞主线程,直到所有的任务完成并返回结果。通过 ray.get() 获取任务结果。

8. 计算总耗时

total_time = time.time() - start_time
total_computation_time = sum(result['computation_time'] for result in results)
  • 总耗时:通过计算 ray.get() 完成的时间减去任务开始时的时间,得到所有任务的总耗时。
  • 计算所有任务的总计算时间:通过对所有任务的 computation_time 求和,得到子任务的计算总时间。

9. 输出任务结果

for result in results:
    print(f"任务 ID: {result['task_id']} node_id: {result['node_id']}, 执行时间: {result['computation_time']:.2f}秒")
  • 输出每个任务的 ID、节点 ID 和执行时间,通过遍历 results 输出每个任务的详细信息,帮助分析任务的执行情况。

10. 输出总耗时

print(f"所有任务总耗时: {total_time:.2f}秒, 子任务计算总耗时: {total_computation_time:.2f}秒")
  • 输出所有任务的总耗时和计算总耗时,帮助评估集群的计算效率。

11. 关闭 Ray 环境

ray.shutdown()
  • ray.shutdown(): 关闭 Ray 环境,释放资源。

输出结果

(ai) root@michael:/mnt/d/Personal/Desktop# python 1.py
2024-11-25 11:07:27,127 INFO worker.py:1634 -- Connecting to existing Ray cluster at address: ***.30.215.***:6379...
2024-11-25 11:07:27,142 INFO worker.py:1810 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265
任务 ID: 0 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 3.74--------------------------------------------------------------------------------
任务 ID: 1 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 1.41--------------------------------------------------------------------------------
任务 ID: 2 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 3.17--------------------------------------------------------------------------------
任务 ID: 3 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 3.79--------------------------------------------------------------------------------
任务 ID: 4 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 3.83--------------------------------------------------------------------------------
任务 ID: 5 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 1.13--------------------------------------------------------------------------------
任务 ID: 6 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 1.60--------------------------------------------------------------------------------
任务 ID: 7 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 4.48--------------------------------------------------------------------------------
任务 ID: 8 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 1.53--------------------------------------------------------------------------------
任务 ID: 9 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 3.56--------------------------------------------------------------------------------
任务 ID: 10 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 2.28--------------------------------------------------------------------------------
任务 ID: 11 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 3.17--------------------------------------------------------------------------------
任务 ID: 12 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 4.30--------------------------------------------------------------------------------
任务 ID: 13 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 2.25--------------------------------------------------------------------------------
任务 ID: 14 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 2.05--------------------------------------------------------------------------------
任务 ID: 15 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 4.12--------------------------------------------------------------------------------
任务 ID: 16 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 3.84--------------------------------------------------------------------------------
任务 ID: 17 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 2.36--------------------------------------------------------------------------------
任务 ID: 18 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 4.84--------------------------------------------------------------------------------
任务 ID: 19 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 4.59--------------------------------------------------------------------------------
任务 ID: 20 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 4.44--------------------------------------------------------------------------------
任务 ID: 21 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 4.93--------------------------------------------------------------------------------
任务 ID: 22 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 3.84--------------------------------------------------------------------------------
任务 ID: 23 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 1.90--------------------------------------------------------------------------------
任务 ID: 24 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 3.76--------------------------------------------------------------------------------
任务 ID: 25 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 4.48--------------------------------------------------------------------------------
任务 ID: 26 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 4.07--------------------------------------------------------------------------------
任务 ID: 27 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 4.70--------------------------------------------------------------------------------
任务 ID: 28 node_id: 62e30ecfa761cf342fb7533eb2e6e88c1c0aa6d6308e8885066d9f33, 执行时间: 4.86--------------------------------------------------------------------------------
任务 ID: 29 node_id: 04825470ccaed6b7e9ca3043a7da9a9b6ff2c33ad2dc6b5fa4ed60e7, 执行时间: 4.58--------------------------------------------------------------------------------
所有任务总耗时: 11.61, 子任务计算总耗时: 103.59

可以看到任务被动态分配到各个节点进行计算,效率是非常高

主要思路总结

  1. Ray 远程任务执行:通过 @ray.remote 装饰器将任务分配到集群中的各个节点并行执行。
  2. 节点信息获取:任务在执行时获取当前任务执行所在的节点信息,并根据该信息找到节点的 IP 地址。
  3. 并行计算与任务调度:Ray 自动调度任务到集群中的空闲节点,使得多个计算任务可以并行执行。
  4. 性能分析:通过计算每个任务的执行时间和节点信息,帮助分析分布式计算的性能。
  5. 动态任务分配:无需手动指定节点,Ray 自动选择最佳节点执行任务。

这种方法非常适合需要处理大量独立计算任务的场景,比如分布式机器学习数据处理等。通过合理的并行化,能大大提高计算效率,减少整体计算时间。

参考

https://docs.ray.io/en/latest/ray-overview/installation.html

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐