AWS S3 SDK for c++之分块存储及本人学习经验

 在接触使用c++软件包存储文件一段时间后,突然发现采用putobject函数无法上传比较大的数据,于是就了解到了分块存储,在分块存储的时候我考虑到了几个问题。第一,它是如何分块的,第二,它分块过后是如何组装到一起保证文件不损坏的。这些都是需要思考的问题,于是我利用的ai来解决了这些问题,并且对分块存储进行简单封装。

存储数据
1.存储大内存数据(>100MB)
2.分块存储操作讲解
2.1存储id
2.2存储所有分块信息的原因
2.3ETag讲解

存储数据

存储建议:

1.小文件(<5MB):直接用PutObject,简单高效
具体操作看本人这个博客:
基于centos9的AWS S3 SDK for cpp配置及快速入门

2.中等文件(5MB-108MB):可以使用TransferManager简化操作

3.大文件(>100MB):建议使用分块上传,以获得更好的可靠性和性能

存储大内存数据(>100MB)

步骤一:初始化多部分上传会话

    //1.初始化多部分上传会话
    Aws::S3::Model::CreateMultipartUploadRequest create_request;
    create_request.SetBucket(BUCKET_NAME);
    create_request.SetKey(key);
    auto create_outcome =s3_client->CreateMultipartUpload(create_request);
    if(!create_outcome.IsSuccess()){
        std::cerr<<"Error creating multipart upload:"<< create_outcome.GetError().GetMessage()<< std::endl;
        return;
    }
    //发送创建多部分上传请求
    auto create_outcome =s3_client->CreateMultipartUpload(create_request);
    if(!create_outcome.IsSuccess()){
        std::cerr<<"Error creating multipart upload:"<< create_outcome.GetError().GetMessage()<<std::endl;
        return ;
    }
    //获取上传id
    Aws::String upload_id = create_outcome.GetResult().GetUploadId();
    std::cout<< "Multipart upload inititate ID:"<<upload_id<< std::endl;

步骤二:分块上传文件

//2.分块上传文件
        //以二进制的形式打开本地文件
        std::ifstream file(path, std::ios::binary);
        if(!file){
            std::cerr<<"Error opening file"<< path<< std::endl;
            return;
        }
        //分块缓冲区
        char *buffer = new char[part_size]; // 分块缓冲区
        // 分块计数器
        int part_number = 1;

        //循环读取并上传文件分块
        while(file){
            // 读取一块数据
            file.read(buffer, part_size);
            // 实际读取的字节数
            std::streamsize bytes_read =file.gcount();
        //如果没有读到数据,循环结束
        if(bytes_read == 0){
            break;
        }
        std::cout << "Uploading part" << part_number <<"(" << bytes_read << "bytes)" <<std::endl;

        //配置分块上传请求
        Aws::S3::Model::UploadPartRequest upload_request;
        //请求携带参数
        upload_request.SetBucket(BUCKET_NAME);      //桶名
        upload_request.SetKey(key);                 //键名
        upload_request.SetUploadId(upload_id);      //块id号,唯一标识
        upload_request.SetPartNumber(part_number);  //块的编号

        //创建数据流并将数据流设置到请求体里
        auto input_data=Aws::MakeShared<Aws::StringStream>("UploadPartInputStream");
        upload_request.SetBody(input_data);

        // 发送上传分块请求
        auto upload_outcome =s3_client->UploadPart(upload_request);

        // 检查上传是否成功
        if(!upload_outcome.IsSuccess()){
            std::cerr <<"Error uploading part"<< part_number <<":"<< upload_outcome.GetError().GetMessage()<<std::endl;

            // 出错时,终止整个上传过程
            Aws::S3::Model::AbortMultipartUploadRequest abort_request;
            abort_request.SetBucket(BUCKET_NAME);
            abort_request.SetKey(key);
            abort_request.SetUploadId(upload_id);
            s3_client->AbortMultipartUpload(abort_request);

            delete[] buffer;
            return ;
        }

        //保存分块信息(用于最后完成上传)
        parts.push_back({part_number, upload_outcome.GetResult().GetETag()});
        part_number++;
        }
        delete []buffer;
        file.close();

步骤三;完成多部分上传

    //3.完成多部分上传
        Aws::S3::Model::CompletedMultipartUpload completed_upload;
        // 添加所有已上传的分块信息

        for(const auto &part:parts){
            Aws::S3::Model::CompletedPart completed_part;
            completed_part.SetPartNumber(part.partNumber);//设置分块编号
            completed_part.SetETag(part.eTag);            //设置分块的ETag
            completed_upload.AddParts(completed_part);    //添加到完成请求
        }
        //配置完成上传请求
        Aws::S3::Model::CompleteMultipartUploadRequest complete_request;
        complete_request.SetBucket(BUCKET_NAME);
        complete_request.SetKey(key);
        complete_request.SetUploadId(upload_id);
        complete_request.WithMultipartUpload(completed_upload);

        // 发送完成上传请求
        auto complete_outcome =s3_client->CompleteMultipartUpload(complete_request);

        //检查是否完成
        if(!complete_outcome.IsSuccess()){
            std::cerr<<"Error completing multipart upload:"<< complete_outcome.GetError().GetMessage()<<std::endl;

            //没完成的话尝试中断请求
            Aws::S3::Model::AbortMultipartUploadRequest abort_request;
            abort_request.SetBucket(BUCKET_NAME);
            abort_request.SetKey(key);
            abort_request.SetUploadId(upload_id);
            s3_client->AbortMultipartUpload(abort_request);

            delete[]buffer;
            return ;
        }
        //上传成功输出信息
        std::cout<<"Multipart upload completed successfully!"<< std::endl;
        std::cout<<"Object location:"<<complete_outcome.GetResult().GetLocation()<<std::endl;

完整代码

  • 头文件
#pragma once
#include <iostream>
#include <aws/s3/S3Client.h>
#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/Object.h>
#include <fstream>
#include <memory>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <aws/s3/model/CompletedMultipartUpload.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>

const Aws::String MINIO_ENDPOINT = "http://localhost:8081";
const Aws::String MINIO_ACCESS_KEY = "minioadmin";
const Aws::String MINIO_SECRET_KEY = "minioadmin";
const Aws::String BUCKET_NAME = "bucket";

class Client

{
public:
    // 工厂方法:返回智能指针管理的实例,通过工厂方法来初始化对象
    static std::unique_ptr<Client> Create();

    ~Client();
    // 存储大内存数据(>100MB)
    void save(const Aws::String &path, const Aws::String &key, const size_t part_size);

private:
    // 私有构造函数,强制通过工厂方法创建
    Client(const Aws::Client::ClientConfiguration &&config, const Aws::SDKOptions &&options);
    Aws::Client::ClientConfiguration config_;
    Aws::SDKOptions options_;

    // 使用智能指针管理S3客户端
    std::unique_ptr<Aws::S3::S3Client> s3_client;
    // 分块存储的时候,存储所有分块信息

    struct PartData
    {
        int partNumber; // 分块编号

        Aws::String eTag; // 分块的ETag标识
    };
    std::vector<PartData> parts;
};
  • 源文件

#include "mulpart.h"

// 静态工厂方法

std::unique_ptr<Client> Client::Create()
{
    Aws::SDKOptions options;
    Aws::InitAPI(options);
    Aws::Client::ClientConfiguration config;

    // 配置MinIo
    config.endpointOverride = MINIO_ENDPOINT;
    // 右值引用+移动语义,减少不必要的拷贝开销

    return std::unique_ptr<Client>(new Client(std::move(config), std::move(options)));
}

Client::~Client()
{
    s3_client.reset();
    Aws::SDKOptions(options_);
}

// 构造函数,私有
// 右值引用+移动语义,减少不必要的拷贝开销
Client::Client(const Aws::Client::ClientConfiguration &&config, const Aws::SDKOptions &&options)
    : config_(std::move(config)), options_(std::move(options))
{
    s3_client = std::make_unique<Aws::S3::S3Client>(
        Aws::Auth::AWSCredentials(MINIO_ACCESS_KEY, MINIO_SECRET_KEY),
        config_,
        Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
        false);
}

// 存储大的数据(>100MB)
void Client::save(const Aws::String &path, const Aws::String &key, const size_t part_size)
{
    try
    {
        // 1.初始化多部分上传会话
        Aws::S3::Model::CreateMultipartUploadRequest create_request;
        create_request.SetBucket(BUCKET_NAME);
        create_request.SetKey(key);

        // 发送创建多部分上传请求
        auto create_outcome = s3_client->CreateMultipartUpload(create_request);
        if (!create_outcome.IsSuccess())
        {
            std::cerr << "Error creating multipart upload:" << create_outcome.GetError().GetMessage() << std::endl;
            return;
        }
        // 获取上传id
        Aws::String upload_id = create_outcome.GetResult().GetUploadId();
        std::cout << "Multipart upload inititate ID:" << upload_id << std::endl;

        // 2.分块上传文件
        // 以二进制的形式打开本地文件
        std::ifstream file(path, std::ios::binary);
        if (!file)
        {
            std::cerr << "Error opening file" << path << std::endl;
            return;
        }
        // 定义缓冲区
        char *buffer = new char[part_size]; // 分块缓冲区
        // 分块计数器
        int part_number = 1;
        // 循环读取并上传文件分块
        while (file)
        {
            // 读取一块数据
            file.read(buffer, part_size);
            // 实际读取的字节数
            std::streamsize bytes_read = file.gcount();
            // 如果没有读到数据,循环结束
            if (bytes_read == 0)
            {
                break;
            }
            std::cout << "Uploading part" << part_number << "(" << bytes_read << "bytes)" << std::endl;

            // 配置分块上传请求
            Aws::S3::Model::UploadPartRequest upload_request;
            // 请求携带参数
            upload_request.SetBucket(BUCKET_NAME);     // 桶名
            upload_request.SetKey(key);                // 键名
            upload_request.SetUploadId(upload_id);     // 块id号,唯一标识
            upload_request.SetPartNumber(part_number); // 块的编号

            // 创建数据流并将数据流设置到请求体里
            auto input_data = Aws::MakeShared<Aws::StringStream>("UploadPartInputStream");
            input_data->write(buffer, bytes_read);
            upload_request.SetBody(input_data);

            // 发送上传分块请求
            auto upload_outcome = s3_client->UploadPart(upload_request);

            // 检查上传是否成功
            if (!upload_outcome.IsSuccess())
            {
                std::cerr << "Error uploading part" << part_number << ":" << upload_outcome.GetError().GetMessage() << std::endl;

                // 出错时,终止整个上传过程
                Aws::S3::Model::AbortMultipartUploadRequest abort_request;
                abort_request.SetBucket(BUCKET_NAME);
                abort_request.SetKey(key);
                abort_request.SetUploadId(upload_id);
                s3_client->AbortMultipartUpload(abort_request);

                delete[] buffer;
                return;
            }

            // 保存分块信息(用于最后完成上传)
            parts.push_back({part_number, upload_outcome.GetResult().GetETag()});
            part_number++;
        }
        delete[] buffer;
        file.close();
        // 3.完成多部分上传
        Aws::S3::Model::CompletedMultipartUpload completed_upload;
        // 添加所有已上传的分块信息

        for (const auto &part : parts)
        {
            Aws::S3::Model::CompletedPart completed_part;
            completed_part.SetPartNumber(part.partNumber); // 设置分块编号
            completed_part.SetETag(part.eTag);             // 设置分块的ETag
            completed_upload.AddParts(completed_part);     // 添加到完成请求
        }
        // 配置完成上传请求
        Aws::S3::Model::CompleteMultipartUploadRequest complete_request;
        complete_request.SetBucket(BUCKET_NAME);
        complete_request.SetKey(key);
        complete_request.SetUploadId(upload_id);
        complete_request.WithMultipartUpload(completed_upload);

        // 发送完成上传请求
        auto complete_outcome = s3_client->CompleteMultipartUpload(complete_request);

        // 检查是否完成
        if (!complete_outcome.IsSuccess())
        {
            std::cerr << "Error completing multipart upload:" << complete_outcome.GetError().GetMessage() << std::endl;

            // 没完成的话尝试中断请求
            Aws::S3::Model::AbortMultipartUploadRequest abort_request;
            abort_request.SetBucket(BUCKET_NAME);
            abort_request.SetKey(key);
            abort_request.SetUploadId(upload_id);
            s3_client->AbortMultipartUpload(abort_request);

            delete[] buffer;
            return;
        }
        // 上传成功输出信息
        std::cout << "Multipart upload completed successfully!" << std::endl;
        std::cout << "Object location:" << complete_outcome.GetResult().GetLocation() << std::endl;
    }
    catch (const std::exception &e)
    {
        std::cerr << "Exception occurred:" << e.what() << std::endl;
        return;
    }
}

  • 测试案例
#include "mulpart.h"

int main()
{
    auto client = Client::Create();

    client->save("../mp4.mp4", "mp4.mp4", 10 * 1024 * 1024);
    return 0;
}
  • CMakeLists.txt文件
# #在 CMakeLists.txt 的最顶部添加(必颁在 project()之前)
# set(CMAKE_TOOLCHAIN_FILE "~/vcpkg/scripts/buildsystems/vcpkg.cmake" CACHE STRING "Vcpkg toolchain file")

# cmake_minimum_required(VERSION 3.15)

# project(demo)

# #设置 AWSSDK_DIR或CMAKE PREFIX_PATH
# set(AWSSDK_DIR "/home/centos9/vcpkg/installed/x64-linux/share/awssdk")

# #查找 AWSSDK 并指定组件
# find_package(AWSSDK CONFIG REQUIRED COMPONENTS s3 core)

# file(GLOB SRC_FILES
#     "${PROJECT_SOURCE_DIR}/*.cpp"
#     "${PROJECT_SOURCE_DIR}/*.h")

# add_executable(demo mulpart.cpp demo.cpp)

# target_link_libraries(demo PRIVATE ${AWSSPK_LIBRARIES})

# 在 CMakeLists.txt 的最顶部添加(必须在 project() 之前)
set(CMAKE_TOOLCHAIN_FILE "~/vcpkg/scripts/buildsystems/vcpkg.cmake" CACHE STRING "Vcpkg toolchain file")
# 此行仅测试使用,需根据实际开发环境调整路径


cmake_minimum_required(VERSION 3.15)
project(demo)

# 设置 AWSSDK_DIR 或 CMAKE_PREFIX_PATH
set(AWSSDK_DIR "/home/centos9/vcpkg/installed/x64-linux/share/awssdk")

# 查找 AWSSDK 并指定组件
find_package(AWSSDK CONFIG REQUIRED COMPONENTS s3 core)

file(GLOB SRC_FILES
    "${PROJECT_SOURCE_DIR}/*.cpp"
    "${PROJECT_SOURCE_DIR}/*.h")

add_executable(demo  mulpart.cpp demo.cpp)
target_link_libraries(demo PRIVATE ${AWSSDK_LIBRARIES})

分块存储操作讲解

2.1存储ID

1.上传ID是S3服务为多部分上传会活分配的唯一标识符

1.ID在整个上传过程中,是这一特定上传任务的唯一标识

2.由S3服务器生成并返回给客户端

3.格式类似于:EXAMPLEJZ6e0YupT2h66ieP0CC9IEbYbDUy4RTpMeOSMLPRp8Z5s1h4a3fHgX6cyDaLB7

2.关联所有上传操作

1.上传分块时:必须提供相同的上传ID,S3才知道这些分块属于同一文件

2.完成上传时:告诉S3哪些分块应该组合成最终文件

3.中止上传时:标识要清理哪个上传会话

3.实现断点续传的关键

1.如果上传中途失败,客户端可以保留上传ID

2.重新连接后,可以查询已上传的分块(ListParts)

3.只需上传缺失的分块,然后使用原始的上传ID完成上传

4.并发上传控制

1.同一上传ID允许多个分块可以并行上传(提高速度)

2.必须要正最终按正确的顺序组合通过(PartNumber)

3.服务器通过上传ID跟踪所有分块的状态

5.安全隔离

1.上传ID保证不同客户端/会话的上传不会互相干执

2.只有知道上传ID的客户端才能操作该上传会话

3.防止未授权的完成或中断操作

6.生命周期

1.创建:CreateMultipartUpload时生成

2.使用:在所有分块上传和完成操作中使用

3.终止:

  • 成功完成上传后自动失效

  • 通过AbortMultipartUpload显式终止

  • 未完成的上传ID会在一定时间后自动过期(通常7天)

2.2存储所有分块信息的原因

1.完成上传的必要条件

当调用Comp1eteMultipartUpload时,必须提供

  • 所有分块的完整列表,不能缺少任何一部分

  • 每个分块的编号(partNumber)和ETag(用于验证完整性)

  • 严格按分块编号顺序排序(确保最终文件正确重组)

2.数据完整性验证

ETag的作用:

  • 每个分块上传后,S3会返回唯一的ETag(类似于指纹)

  • 完成上传时提供这些ETag,S3会验证,所有分块都存在且未被修改,分块顺序正确

  • 防止传输过程中数据损坏或被篡改

3.断点续传支持

如果上传中断

  • 可以查询已上传的分块(使用ListPartsAPI)

  • 只需上传缺失的分块

  • 使用之前存储的分块信息继续完成上传

  • 无需重新上传已完成的分块

4.错误恢复机制

当上传出错时:

  • 知道哪些分块已成功上传

  • 可以针对性重试失败的分块

  • 避免盲目重新上传所有内容

注意事顶

1.分块编号必须连续且唯一,从1开始递增,不能跳过任何编号,最大支持10000个分块

2.ETag必须原样使用,不能修改或省略,区分大小写

3.最小分块大小:除最后一个分块外,每个分块必须大于等于partsize

4.存储实际:必须在分块上传成功后立即存储,建议持久化存情,防止程序崩溃

5.不存储分块信息可能会造成:无法正常完成上传。可能已上传的分块会变成“孤儿块“占用存储空间,可以调用AbortMultipartUpload进行清理后重新开始。造成带宽和时间的浪费

2.3ETag讲解

通俗理解:S3 为每个对象自动生成的 “哈希值” ,用于唯一标识该对象当前的内容。

核心特性与计算规则:

上传方式 ETag 计算规则 示例 重要提示
单次PUT上传 对象内容的 MD5 哈希值 “5eb63bbbe01eeed093cb22bb8f5acdc3” ETag 等于文件内容的 MD5,可用于校验完整性。
分块上传 (Multipart Upload) 各分块MD5的集合的MD5 + “-” + 分块数 “d4d5e97213b6c39db7b432a9faf6a15f-3” ETag 不是文件内容的 MD5!不能直接用于校验文件完整性。

黄金法则:切勿假定 ETag 总是文件的 MD5。只有对非分块上传的小对象,这个等式才成立。

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐