阿里达摩院最新框架FederatedScope来了!让联邦学习从可用到好用https://baijiahao.baidu.com/s?id=1731964837575449214&wfr=spider&for=pc

FederatedScope框架部署流程(基于federatedscope 0.2.0)

安装FSC框架
从https://github.com/alibaba/FederatedScope/releases下载 v 0.2.0版本
cd FederatedScope
配置虚拟环境
conda create -n fs python=3.9
conda activate fs
conda install -y pytorch=1.10.1 torchvision=0.11.2 torchaudio=0.10.1 torchtext=0.11.1 cudatoolkit=11.3 -c pytorch -c conda-forge
pip install -e .
自定义dataset
  1. 在federatedscope/contrib/data文件夹下创建.py文件

  2. 定义 load函数

  3. register your data with a keyword, such as `“mydata”

load_my_data 函数用于加载和准备训练和评估所需的数据。它返回一个字典 data_dict,其中包含每个客户端的训练、测试和验证数据,以及配置对象 config

data_dict 的结构如下:

  • 键是客户端 ID,是整数。
  • 值是包含三个键 ‘train’、‘test’ 和 ‘val’ 的字典。
  • 每个键的值可以是 PyTorch 的 DataLoader 对象或自定义的 Data 对象。

config 对象包含训练和评估过程的配置参数。这个函数不修改它,但将它返回供后续函数使用。

 def load_my_data(config):
         r"""
             NOTE: client_id 0 is SERVER for global evaluation!
        
     Returns:
           dict: {
                     'client_id': {
                         'train': DataLoader or Data,
                         'test': DataLoader or Data,
                         'val': DataLoader or Data,
                     }
                 }
     """
     ...
     return data_dict, config
   

这段代码是为了注册一个名为 mydata 的数据集,并将其加载函数 load_my_data 注册为处理该数据集的函数。具体来说,它执行以下步骤:

  1. 定义一个名为 call_my_data 的函数,该函数检查 config 对象中的 data.type 是否为 "mydata"
  2. 如果是,则调用 load_my_data 函数来加载和准备数据,并将其返回。
  3. call_my_data 函数和数据集类型 "mydata" 注册到 register_data 函数中,以便以后可以使用 mydata 类型的数据集。

通过这种方式,可以将不同类型的数据集注册到代码库中,并根据需要使用不同的数据集类型进行训练和评估。

 from federatedscope.register import register_data
    
 def call_my_data(config):
     if config.data.type == "mydata":
         data, modified_config = load_my_data(config)
         return data, modified_config
    
 register_data("mydata", call_my_data)

以MINST为例

def load_my_data(config):
      import numpy as np
      from torchvision import transforms
      from torchvision.datasets import MNIST
      from torch.utils.data import DataLoader
    
      # Build data   处理数据格式
      transform = transforms.Compose([
          transforms.ToTensor(),
          transforms.Normalize(mean=[0.1307], std=[0.3081])
      ])
      data_train = MNIST(root='data', train=True, transform=transform, download=True)
      data_test = MNIST(root='data', train=False, transform=transform, download=True)
    
      # Split data into dict  
      data_dict = dict()
      train_per_client = len(data_train) // config.federate.client_num
      test_per_client = len(data_test) // config.federate.client_num
    
      for client_idx in range(1, config.federate.client_num + 1):
          dataloader_dict = {
              'train':
              DataLoader([
                  data_train[i]
                  for i in range((client_idx - 1) *
                                 train_per_client, client_idx * train_per_client)
              ],
                         config.data.batch_size,
                         shuffle=config.data.shuffle),
              'test':
              DataLoader([
                  data_test[i]
                  for i in range((client_idx - 1) * test_per_client, client_idx *
                                 test_per_client)
              ],
                         config.data.batch_size,
                         shuffle=False)
          }
          data_dict[client_idx] = dataloader_dict
    
      return data_dict, config
 from federatedscope.register import register_data
    
 def call_my_data(config):
     if config.data.type == "mydata":
         data, modified_config = load_my_data(config)
         return data, modified_config
    
 register_data("mydata", call_my_data)
自定义model

和自定义dataset类似,federatedscope/contrib/model下创建.py文件,定义load函数后用register注册

以ConvNet2为例

import torch
    
    
 class MyNet(torch.nn.Module):
     def __init__(self,
                  in_channels,
                  h=32,
                  w=32,
                  hidden=2048,
                  class_num=10,
                  use_bn=True):
         super(MyNet, self).__init__()
         self.conv1 = torch.nn.Conv2d(in_channels, 32, 5, padding=2)
         self.conv2 = torch.nn.Conv2d(32, 64, 5, padding=2)
         self.fc1 = torch.nn.Linear((h // 2 // 2) * (w // 2 // 2) * 64, hidden)
         self.fc2 = torch.nn.Linear(hidden, class_num)
         self.relu = torch.nn.ReLU(inplace=True)
         self.maxpool = torch.nn.MaxPool2d(2)
    
     def forward(self, x):
         x = self.conv1(x)
         x = self.maxpool(self.relu(x))
         x = self.conv2(x)
         x = self.maxpool(self.relu(x))
         x = torch.nn.Flatten()(x)
         x = self.relu(self.fc1(x))
         x = self.fc2(x)
         return x
    
    
 def load_my_net(model_config, local_data):
     # You can also build models without local_data
     data = next(iter(local_data['train']))
     model = MyNet(in_channels=data[0].shape[1],
                   h=data[0].shape[2],
                   w=data[0].shape[3],
                   hidden=model_config.hidden,
                   class_num=model_config.out_channels)
     return model
     
 from federatedscope.register import register_model
    
 def call_my_net(model_config, local_data):
     if model_config.type == "mynet":
         model = load_my_net(model_config, local_data)
         return model
    
 register_model("mynet", call_my_net)
自定义trainer

在federatedscope/contrib/trainer下创建.py文件,继承GeneralTorchTrainer类创建自己的trainer,定义train函数,定义训练过程。跳过框架中的context部分,之后注册

以deepmg为例

from federatedscope.core.auxiliaries.utils import get_random
import inspect
from federatedscope.register import register_trainer
from federatedscope.core.trainers import GeneralTorchTrainer
from federatedscope.contrib.model.losses import SoftDiceLoss



class MyTorchTrainer(GeneralTorchTrainer):

    def __init__(self, model, data, device, config, only_for_eval=False, monitor=None):

        super().__init__(model, data, device, config, only_for_eval, monitor)
        self.cfg = config
        self.model = model
        self.data = data
        self.device = device
        self.criterion = SoftDiceLoss()

    def train(self, **kwargs):
        self.model.to(self.device)
        self.model.train()
        sample_interval = self.cfg.sample_interval

        total_loss = tot_iters = best_f1_score = total_metric = 0
        """start training"""
        for epoch in range(1, self.cfg.n_epochs + 1):
            for i, data in enumerate(self.data['train']):
                self.model.set_input(data)
                iter_loss_all, iter_loss_road, iter_loss_cl, iter_metrics, iter_road_metrics = self.model.optimize_parameters()
                iter_metrics = iter_metrics.numpy()
                iter_road_metrics = iter_road_metrics.numpy()
                print(
                    "[Epoch %d/%d] [Batch %d/%d] [Loss: %f] [Road Loss: %f] [CL Loss: %f] [Precision: %f] [Recall: %f] [F1: %f] [Road IOU: %f] [CL IOU: %f]" % (
                        epoch, self.cfg.n_epochs, i, len(self.data['train']), iter_loss_all.item(),
                        iter_loss_road.item(),
                        iter_loss_cl.item(), iter_metrics[0], iter_metrics[1], iter_metrics[2], iter_road_metrics[3],
                        iter_metrics[3]))
                tot_iters += 1

                """eval"""
                if tot_iters % sample_interval == 0:a
                    self.model.eval()
                    tot_loss = 0
                    tot_metrics = 0
                    tot_road_metrics = 0
                    for j, eval_data in enumerate(self.data['val']):
                        self.model.set_input(eval_data)
                        _, iter_loss, iter_metrics, iter_road_metrics = self.model.test()
                        tot_loss += iter_loss.item() * data['centerline'].shape[0] / 8
                        tot_metrics += iter_metrics
                        tot_road_metrics += iter_road_metrics
                    tot_loss /= len(self.data['val'])
                    tot_metrics /= len(self.data['val'])
                    tot_road_metrics /= len(self.data['val'])



            self.model.update_learning_rate()

        # _hook_on_fit_end
        num_samples = len(self.data['val'])
        num_train_data = len(self.data['train'])
        total_metric /= num_samples

        for name, param in self.model.named_parameters():
            if 'bias' in name or '.1' in name or '.4' in name:
                continue
            nbafl_scale_u = 0.5 * param.data.norm(2) * self.cfg.nbafl.constant / \
                num_train_data / self.cfg.nbafl.epsilon
            noise = get_random("Normal", param.shape, {
                "loc": 0,
                "scale": nbafl_scale_u
            }, param.device)
            param.data = param.data + noise
        return num_samples, self.model.cpu().state_dict(), \
               {'loss_total': tot_loss, 'precision': tot_metrics[0],
                'recall': tot_metrics[1], 'F1': tot_metrics[2], 'CL IOU': tot_metrics[3]}

    def evaluate(self, target_data_split_name, rnd):
        import torch
        with torch.no_grad():
            self.model.to(self.device)
            self.model.eval()
            total_loss = num_samples = total_metrics = total_road_metrics = 0
            bestf1 = 0.0
            # _hook_on_batch_start_init
            for i, data in enumerate(self.data[target_data_split_name]):
                # _hook_on_batch_forward
                self.model.set_input(data)
                _, iter_loss, iter_metrics, iter_road_metrics = self.model.test()
                total_loss += iter_loss.item() * data['centerline'].shape[0] / 8
                total_metrics += iter_metrics
                total_road_metrics += iter_road_metrics
            num_samples = len(self.data[target_data_split_name])
            total_loss /= num_samples
            total_metrics /= num_samples
            self.model.save_networks(rnd)
            if total_metrics[2] > bestf1:
                bestf1 = total_metrics[2]
                # print(dict(self.model.state_dict()))
                self.model.save_networks('latest')
            # self.model.save_networks(100)
            return {
                f'{target_data_split_name}_loss': total_loss,
                f'{target_data_split_name}_total': num_samples,
                f'{target_data_split_name}_precision': total_metrics[0],
                f'{target_data_split_name}_recall': total_metrics[1],
                f'{target_data_split_name}_F1': total_metrics[2],
                f'{target_data_split_name}_CLIOU': total_metrics[3]
            }

    def update(self, model_parameters, strict=True):

        self.model.load_state_dict(model_parameters, True)

    def get_model_para(self):
        return self.model.cpu().state_dict()

    def print_trainer_meta_info(self):
        sign = inspect.signature(self.__init__).parameters.values()
        meta_info = tuple([(val.name, getattr(self, val.name))
                           for val in sign])
        return f'{self.__class__.__name__}{meta_info}'


def call_my_torch_trainer(trainer_type):
    if trainer_type == 'mydmtrainer':
        trainer_builder = MyTorchTrainer
        return trainer_builder


register_trainer('mydmtrainer', call_my_torch_trainer)
修改config文件

通常需要定义联邦学习需要的参数,具体参数信息详见federatedscope/core/config

框架中的cfg信息都是两段式,如果需要自定义config,如框架中不自带的cfg.trainer.xx,需要和model,dataset的自定义一样,在federatedscope/contrib/config下创建文件后注册

def extend_training_cfg(cfg):
    # ------------------------------------------------------------------------ #
    # Trainer related options
    # ------------------------------------------------------------------------ #
    cfg.trainer = CN()
   
    cfg.trainer.type = 'general'
    cfg.trainer.finetune = CN()
    cfg.trainer.finetune.steps = 0
    cfg.trainer.finetune.only_psn = True
    cfg.trainer.finetune.stepsize = 0.01
        
    # --------------- register corresponding check function ----------
    cfg.register_cfg_check_fun(assert_training_cfg)

def assert_training_cfg(cfg):
    pass

from federatedscope.register import register_config
register_config("fl_training", extend_training_cfg)


注册之后就可以在自己的config文件中使用cfg.fl_training.xx了。

运行
python federatedscope/main.py --cfg scripts/example_configs/femnist.yaml

1、联邦学习概述

联邦学习是一种新兴的机器学习方法,它允许多个本地设备或者数据中心在不共享数据的情况下进行模型训练,从而保护数据隐私性。联邦学习的基本思想是将模型训练过程分成多个阶段,每个阶段由本地设备或数据中心完成。在每个阶段结束后,这些本地设备或数据中心将部分模型参数上传到中央服务器,中央服务器根据上传的参数进行模型更新。通过这种方式,联邦学习可以在不泄露数据隐私的前提下完成模型训练。

联邦学习的基本流程如下:

  1. 中央服务器将模型参数初始化,并向本地设备或数据中心发送模型参数。

  2. 本地设备或数据中心在本地训练模型,并将部分模型参数上传到中央服务器。

  3. 中央服务器收到上传的模型参数,并根据这些参数更新模型。

  4. 中央服务器将更新后的模型参数发送给本地设备或数据中心,并重复上述过程直到模型训练完成。

联邦学习的优点包括:

  1. 保护数据隐私性:联邦学习允许本地设备或数据中心在不共享数据的情况下进行模型训练,从而保护数据隐私性。

  2. 可以处理大规模数据:联邦学习允许多个本地设备或数据中心并行进行模型训练,从而可以处理大规模数据。

  3. 模型具有通用性:通过联邦学习训练的模型可以适用于多个本地设备或数据中心,从而提高了模型的通用性。

  4. 可以减少数据传输:联邦学习允许本地设备或数据中心在本地训练模型,并仅上传部分模型参数到中央服务器,从而减少了数据传输量。

联邦学习的缺点包括:

  1. 模型训练时间较长:由于联邦学习需要多个设备或数据中心进行协作,因此模型训练时间较长。

  2. 需要解决安全性问题:由于本地设备或数据中心在训练模型时可能存在攻击风险,因此需要采取相应的安全措施来保证模型训练的安全性。

  3. 模型准确性可能受限:由于本地设备或数据中心在训练模型时可能存在数据偏差,因此模型准确性可能受限。

总的来说,联邦学习是一种非常有前途的机器学习方法,它可以在保护数据隐私的前提下完成模型训练,并可以处理大规模数据。随着联邦学习技术的不断发展,它的应用前景将会更加广泛。

2、联邦学习的发展与分类

联邦学习是一种新兴的机器学习方法,它的发展可以分为以下几个阶段:

  1. 静态联邦学习:最早的联邦学习方法是静态联邦学习,它将所有参与方的数据汇总在一起进行模型训练。这种方法虽然可以保护数据隐私性,但是需要将所有数据传输到中央服务器,因此传输量较大,而且参与方之间需要信任中央服务器。

  2. 基于聚合的联邦学习:基于聚合的联邦学习是一种基于平均聚合的联邦学习方法,它将本地模型参数上传到中央服务器后,中央服务器根据加权平均聚合这些参数,从而更新全局模型。这种方法可以减少数据传输量,而且参与方之间不需要信任中央服务器。

  3. 基于差分隐私的联邦学习:基于差分隐私的联邦学习是一种采用差分隐私保护数据隐私性的联邦学习方法。它通过添加噪声来保护本地数据,从而避免了数据隐私泄露的风险。

  4. 动态联邦学习:动态联邦学习是一种可以在运行时动态加入或删除参与方的联邦学习方法。它可以适应参与方的变化,同时保护数据隐私性和模型训练效率。

除了按照不同的方法分类,联邦学习还可以按照应用场景进行分类,包括:

  1. 医疗健康领域:在医疗健康领域,联邦学习可以用于多个医疗机构之间的模型训练,从而提高医疗诊断的准确性。

  2. 金融领域:在金融领域,联邦学习可以用于多个银行之间的信用评估模型训练,从而提高信用评估的准确性。

  3. 交通领域:在交通领域,联邦学习可以用于多个城市之间的交通流量预测模型训练,从而提高交通流量预测的准确性。

总的来说,联邦学习是一种非常有前途的机器学习方法,它可以在保护数据隐私的前提下完成模型训练,并可以处理大规模数据。随着联邦学习技术的不断发展,它的应用前景将会更加广泛。

3、联邦学习开源平台

目前,有许多联邦学习开源平台可供选择。以下是一些常见的联邦学习开源平台:

  1. TensorFlow Federated:由谷歌开发的联邦学习开源平台,基于 TensorFlow 框架,提供了用于联邦学习的 API 和工具。

  2. PySyft:由 OpenMined 社区开发的联邦学习开源平台,基于 PyTorch 框架,提供了用于安全联邦学习的 API 和工具。

  3. IBM Federated Learning:由 IBM 开发的联邦学习平台,提供了用于联邦学习的 API 和工具,支持 TensorFlow 和 PyTorch 等框架。

  4. FATE:由 WeBank 开发的联邦学习平台,提供了用于联邦学习的 API 和工具,支持 TensorFlow 和 PyTorch 等框架。

  5. Flower:由 Adap 研究团队开发的联邦学习平台,提供了用于联邦学习的 API 和工具,支持 TensorFlow 和 PyTorch 等框架。

这些联邦学习开源平台都提供了丰富的功能和工具,可以帮助开发人员更轻松地实现联邦学习应用。同时,它们也都有活跃的社区支持和更新,可以帮助开发人员及时解决问题和更新新功能。

4、联邦学习的应用及必读论文

联邦学习在许多领域都有广泛的应用,以下是一些应用示例:

  1. 医疗健康:联邦学习可以应用于医疗图像识别、疾病预测、药物发现等领域。

  2. 金融:联邦学习可以应用于信用评估、欺诈检测、风险评估等领域。

  3. 物联网:联邦学习可以应用于传感器数据分析、设备故障检测、环境监测等领域。

  4. 零售:联邦学习可以应用于客户行为分析、推荐系统、销售预测等领域。

以下是一些必读的联邦学习论文:

  1. McMahan, H. Brendan, et al. “Communication-efficient learning of deep networks from decentralized data.” arXiv preprint arXiv:1602.05629 (2016).

  2. Li, T., Sahu, A. K., Zaheer, M., Sanjabi, M., Talwalkar, A., & Smith, V. (2018). Federated optimization in heterogeneous networks. arXiv preprint arXiv:1812.06127.

  3. Kairouz, P., McMahan, H. B., Avent, B., Bellet, A., Bennis, M., Bhagoji, A. N., … & Song, D. (2019). Advances and open problems in federated learning. arXiv preprint arXiv:1912.04977.

  4. Yang, Q., Liu, Y., Chen, T., & Tong, Y. (2019). Federated machine learning: Concept and applications. ACM Transactions on Intelligent Systems and Technology (TIST), 10(2), 1-19.

  5. Bonawitz, K., Ivanov, V., Kreuter, B., Marcedone, A., McMahan, H. B., Patel, S., … & Yurochkin, M. (2019). Towards federated learning at scale: System design. arXiv preprint arXiv:1902.01046.

这些论文介绍了联邦学习的基本概念、算法、应用和系统设计,并提供了有关联邦学习的未来研究方向的思考。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐