AWS S3 SDK for c++之分块存储及本人学习经验
本文讲解了使用AWS S3 C++ SDK实现大文件分块上传的方法。核心内容涵盖三个步骤:创建上传会话、分块传输数据、最终合并文件。文章重点剖析了两个关键机制:一是上传ID(Upload ID)作为会话唯一标识,是实现断点续传和并发控制的核心;二是ETag的生成规则,明确指出分块上传的ETag并非文件MD5,不能用于校验完整性。最后强调了存储所有分块信息对保证最终正确组装的重要性。
AWS S3 SDK for c++之分块存储及本人学习经验
在接触使用c++软件包存储文件一段时间后,突然发现采用putobject函数无法上传比较大的数据,于是就了解到了分块存储,在分块存储的时候我考虑到了几个问题。第一,它是如何分块的,第二,它分块过后是如何组装到一起保证文件不损坏的。这些都是需要思考的问题,于是我利用的ai来解决了这些问题,并且对分块存储进行简单封装。
存储数据
1.存储大内存数据(>100MB)
2.分块存储操作讲解
2.1存储id
2.2存储所有分块信息的原因
2.3ETag讲解
存储数据
存储建议:
1.小文件(<5MB):直接用PutObject,简单高效
具体操作看本人这个博客:
基于centos9的AWS S3 SDK for cpp配置及快速入门
2.中等文件(5MB-108MB):可以使用TransferManager简化操作
3.大文件(>100MB):建议使用分块上传,以获得更好的可靠性和性能
存储大内存数据(>100MB)
步骤一:初始化多部分上传会话
//1.初始化多部分上传会话
Aws::S3::Model::CreateMultipartUploadRequest create_request;
create_request.SetBucket(BUCKET_NAME);
create_request.SetKey(key);
auto create_outcome =s3_client->CreateMultipartUpload(create_request);
if(!create_outcome.IsSuccess()){
std::cerr<<"Error creating multipart upload:"<< create_outcome.GetError().GetMessage()<< std::endl;
return;
}
//发送创建多部分上传请求
auto create_outcome =s3_client->CreateMultipartUpload(create_request);
if(!create_outcome.IsSuccess()){
std::cerr<<"Error creating multipart upload:"<< create_outcome.GetError().GetMessage()<<std::endl;
return ;
}
//获取上传id
Aws::String upload_id = create_outcome.GetResult().GetUploadId();
std::cout<< "Multipart upload inititate ID:"<<upload_id<< std::endl;
步骤二:分块上传文件
//2.分块上传文件
//以二进制的形式打开本地文件
std::ifstream file(path, std::ios::binary);
if(!file){
std::cerr<<"Error opening file"<< path<< std::endl;
return;
}
//分块缓冲区
char *buffer = new char[part_size]; // 分块缓冲区
// 分块计数器
int part_number = 1;
//循环读取并上传文件分块
while(file){
// 读取一块数据
file.read(buffer, part_size);
// 实际读取的字节数
std::streamsize bytes_read =file.gcount();
//如果没有读到数据,循环结束
if(bytes_read == 0){
break;
}
std::cout << "Uploading part" << part_number <<"(" << bytes_read << "bytes)" <<std::endl;
//配置分块上传请求
Aws::S3::Model::UploadPartRequest upload_request;
//请求携带参数
upload_request.SetBucket(BUCKET_NAME); //桶名
upload_request.SetKey(key); //键名
upload_request.SetUploadId(upload_id); //块id号,唯一标识
upload_request.SetPartNumber(part_number); //块的编号
//创建数据流并将数据流设置到请求体里
auto input_data=Aws::MakeShared<Aws::StringStream>("UploadPartInputStream");
upload_request.SetBody(input_data);
// 发送上传分块请求
auto upload_outcome =s3_client->UploadPart(upload_request);
// 检查上传是否成功
if(!upload_outcome.IsSuccess()){
std::cerr <<"Error uploading part"<< part_number <<":"<< upload_outcome.GetError().GetMessage()<<std::endl;
// 出错时,终止整个上传过程
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
abort_request.SetBucket(BUCKET_NAME);
abort_request.SetKey(key);
abort_request.SetUploadId(upload_id);
s3_client->AbortMultipartUpload(abort_request);
delete[] buffer;
return ;
}
//保存分块信息(用于最后完成上传)
parts.push_back({part_number, upload_outcome.GetResult().GetETag()});
part_number++;
}
delete []buffer;
file.close();
步骤三;完成多部分上传
//3.完成多部分上传
Aws::S3::Model::CompletedMultipartUpload completed_upload;
// 添加所有已上传的分块信息
for(const auto &part:parts){
Aws::S3::Model::CompletedPart completed_part;
completed_part.SetPartNumber(part.partNumber);//设置分块编号
completed_part.SetETag(part.eTag); //设置分块的ETag
completed_upload.AddParts(completed_part); //添加到完成请求
}
//配置完成上传请求
Aws::S3::Model::CompleteMultipartUploadRequest complete_request;
complete_request.SetBucket(BUCKET_NAME);
complete_request.SetKey(key);
complete_request.SetUploadId(upload_id);
complete_request.WithMultipartUpload(completed_upload);
// 发送完成上传请求
auto complete_outcome =s3_client->CompleteMultipartUpload(complete_request);
//检查是否完成
if(!complete_outcome.IsSuccess()){
std::cerr<<"Error completing multipart upload:"<< complete_outcome.GetError().GetMessage()<<std::endl;
//没完成的话尝试中断请求
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
abort_request.SetBucket(BUCKET_NAME);
abort_request.SetKey(key);
abort_request.SetUploadId(upload_id);
s3_client->AbortMultipartUpload(abort_request);
delete[]buffer;
return ;
}
//上传成功输出信息
std::cout<<"Multipart upload completed successfully!"<< std::endl;
std::cout<<"Object location:"<<complete_outcome.GetResult().GetLocation()<<std::endl;
完整代码
- 头文件
#pragma once
#include <iostream>
#include <aws/s3/S3Client.h>
#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/Object.h>
#include <fstream>
#include <memory>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <aws/s3/model/CompletedMultipartUpload.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
const Aws::String MINIO_ENDPOINT = "http://localhost:8081";
const Aws::String MINIO_ACCESS_KEY = "minioadmin";
const Aws::String MINIO_SECRET_KEY = "minioadmin";
const Aws::String BUCKET_NAME = "bucket";
class Client
{
public:
// 工厂方法:返回智能指针管理的实例,通过工厂方法来初始化对象
static std::unique_ptr<Client> Create();
~Client();
// 存储大内存数据(>100MB)
void save(const Aws::String &path, const Aws::String &key, const size_t part_size);
private:
// 私有构造函数,强制通过工厂方法创建
Client(const Aws::Client::ClientConfiguration &&config, const Aws::SDKOptions &&options);
Aws::Client::ClientConfiguration config_;
Aws::SDKOptions options_;
// 使用智能指针管理S3客户端
std::unique_ptr<Aws::S3::S3Client> s3_client;
// 分块存储的时候,存储所有分块信息
struct PartData
{
int partNumber; // 分块编号
Aws::String eTag; // 分块的ETag标识
};
std::vector<PartData> parts;
};
- 源文件
#include "mulpart.h"
// 静态工厂方法
std::unique_ptr<Client> Client::Create()
{
Aws::SDKOptions options;
Aws::InitAPI(options);
Aws::Client::ClientConfiguration config;
// 配置MinIo
config.endpointOverride = MINIO_ENDPOINT;
// 右值引用+移动语义,减少不必要的拷贝开销
return std::unique_ptr<Client>(new Client(std::move(config), std::move(options)));
}
Client::~Client()
{
s3_client.reset();
Aws::SDKOptions(options_);
}
// 构造函数,私有
// 右值引用+移动语义,减少不必要的拷贝开销
Client::Client(const Aws::Client::ClientConfiguration &&config, const Aws::SDKOptions &&options)
: config_(std::move(config)), options_(std::move(options))
{
s3_client = std::make_unique<Aws::S3::S3Client>(
Aws::Auth::AWSCredentials(MINIO_ACCESS_KEY, MINIO_SECRET_KEY),
config_,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
false);
}
// 存储大的数据(>100MB)
void Client::save(const Aws::String &path, const Aws::String &key, const size_t part_size)
{
try
{
// 1.初始化多部分上传会话
Aws::S3::Model::CreateMultipartUploadRequest create_request;
create_request.SetBucket(BUCKET_NAME);
create_request.SetKey(key);
// 发送创建多部分上传请求
auto create_outcome = s3_client->CreateMultipartUpload(create_request);
if (!create_outcome.IsSuccess())
{
std::cerr << "Error creating multipart upload:" << create_outcome.GetError().GetMessage() << std::endl;
return;
}
// 获取上传id
Aws::String upload_id = create_outcome.GetResult().GetUploadId();
std::cout << "Multipart upload inititate ID:" << upload_id << std::endl;
// 2.分块上传文件
// 以二进制的形式打开本地文件
std::ifstream file(path, std::ios::binary);
if (!file)
{
std::cerr << "Error opening file" << path << std::endl;
return;
}
// 定义缓冲区
char *buffer = new char[part_size]; // 分块缓冲区
// 分块计数器
int part_number = 1;
// 循环读取并上传文件分块
while (file)
{
// 读取一块数据
file.read(buffer, part_size);
// 实际读取的字节数
std::streamsize bytes_read = file.gcount();
// 如果没有读到数据,循环结束
if (bytes_read == 0)
{
break;
}
std::cout << "Uploading part" << part_number << "(" << bytes_read << "bytes)" << std::endl;
// 配置分块上传请求
Aws::S3::Model::UploadPartRequest upload_request;
// 请求携带参数
upload_request.SetBucket(BUCKET_NAME); // 桶名
upload_request.SetKey(key); // 键名
upload_request.SetUploadId(upload_id); // 块id号,唯一标识
upload_request.SetPartNumber(part_number); // 块的编号
// 创建数据流并将数据流设置到请求体里
auto input_data = Aws::MakeShared<Aws::StringStream>("UploadPartInputStream");
input_data->write(buffer, bytes_read);
upload_request.SetBody(input_data);
// 发送上传分块请求
auto upload_outcome = s3_client->UploadPart(upload_request);
// 检查上传是否成功
if (!upload_outcome.IsSuccess())
{
std::cerr << "Error uploading part" << part_number << ":" << upload_outcome.GetError().GetMessage() << std::endl;
// 出错时,终止整个上传过程
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
abort_request.SetBucket(BUCKET_NAME);
abort_request.SetKey(key);
abort_request.SetUploadId(upload_id);
s3_client->AbortMultipartUpload(abort_request);
delete[] buffer;
return;
}
// 保存分块信息(用于最后完成上传)
parts.push_back({part_number, upload_outcome.GetResult().GetETag()});
part_number++;
}
delete[] buffer;
file.close();
// 3.完成多部分上传
Aws::S3::Model::CompletedMultipartUpload completed_upload;
// 添加所有已上传的分块信息
for (const auto &part : parts)
{
Aws::S3::Model::CompletedPart completed_part;
completed_part.SetPartNumber(part.partNumber); // 设置分块编号
completed_part.SetETag(part.eTag); // 设置分块的ETag
completed_upload.AddParts(completed_part); // 添加到完成请求
}
// 配置完成上传请求
Aws::S3::Model::CompleteMultipartUploadRequest complete_request;
complete_request.SetBucket(BUCKET_NAME);
complete_request.SetKey(key);
complete_request.SetUploadId(upload_id);
complete_request.WithMultipartUpload(completed_upload);
// 发送完成上传请求
auto complete_outcome = s3_client->CompleteMultipartUpload(complete_request);
// 检查是否完成
if (!complete_outcome.IsSuccess())
{
std::cerr << "Error completing multipart upload:" << complete_outcome.GetError().GetMessage() << std::endl;
// 没完成的话尝试中断请求
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
abort_request.SetBucket(BUCKET_NAME);
abort_request.SetKey(key);
abort_request.SetUploadId(upload_id);
s3_client->AbortMultipartUpload(abort_request);
delete[] buffer;
return;
}
// 上传成功输出信息
std::cout << "Multipart upload completed successfully!" << std::endl;
std::cout << "Object location:" << complete_outcome.GetResult().GetLocation() << std::endl;
}
catch (const std::exception &e)
{
std::cerr << "Exception occurred:" << e.what() << std::endl;
return;
}
}
- 测试案例
#include "mulpart.h"
int main()
{
auto client = Client::Create();
client->save("../mp4.mp4", "mp4.mp4", 10 * 1024 * 1024);
return 0;
}
- CMakeLists.txt文件
# #在 CMakeLists.txt 的最顶部添加(必颁在 project()之前)
# set(CMAKE_TOOLCHAIN_FILE "~/vcpkg/scripts/buildsystems/vcpkg.cmake" CACHE STRING "Vcpkg toolchain file")
# cmake_minimum_required(VERSION 3.15)
# project(demo)
# #设置 AWSSDK_DIR或CMAKE PREFIX_PATH
# set(AWSSDK_DIR "/home/centos9/vcpkg/installed/x64-linux/share/awssdk")
# #查找 AWSSDK 并指定组件
# find_package(AWSSDK CONFIG REQUIRED COMPONENTS s3 core)
# file(GLOB SRC_FILES
# "${PROJECT_SOURCE_DIR}/*.cpp"
# "${PROJECT_SOURCE_DIR}/*.h")
# add_executable(demo mulpart.cpp demo.cpp)
# target_link_libraries(demo PRIVATE ${AWSSPK_LIBRARIES})
# 在 CMakeLists.txt 的最顶部添加(必须在 project() 之前)
set(CMAKE_TOOLCHAIN_FILE "~/vcpkg/scripts/buildsystems/vcpkg.cmake" CACHE STRING "Vcpkg toolchain file")
# 此行仅测试使用,需根据实际开发环境调整路径
cmake_minimum_required(VERSION 3.15)
project(demo)
# 设置 AWSSDK_DIR 或 CMAKE_PREFIX_PATH
set(AWSSDK_DIR "/home/centos9/vcpkg/installed/x64-linux/share/awssdk")
# 查找 AWSSDK 并指定组件
find_package(AWSSDK CONFIG REQUIRED COMPONENTS s3 core)
file(GLOB SRC_FILES
"${PROJECT_SOURCE_DIR}/*.cpp"
"${PROJECT_SOURCE_DIR}/*.h")
add_executable(demo mulpart.cpp demo.cpp)
target_link_libraries(demo PRIVATE ${AWSSDK_LIBRARIES})
分块存储操作讲解
2.1存储ID
1.上传ID是S3服务为多部分上传会活分配的唯一标识符
1.ID在整个上传过程中,是这一特定上传任务的唯一标识
2.由S3服务器生成并返回给客户端
3.格式类似于:EXAMPLEJZ6e0YupT2h66ieP0CC9IEbYbDUy4RTpMeOSMLPRp8Z5s1h4a3fHgX6cyDaLB7
2.关联所有上传操作
1.上传分块时:必须提供相同的上传ID,S3才知道这些分块属于同一文件
2.完成上传时:告诉S3哪些分块应该组合成最终文件
3.中止上传时:标识要清理哪个上传会话
3.实现断点续传的关键
1.如果上传中途失败,客户端可以保留上传ID
2.重新连接后,可以查询已上传的分块(ListParts)
3.只需上传缺失的分块,然后使用原始的上传ID完成上传
4.并发上传控制
1.同一上传ID允许多个分块可以并行上传(提高速度)
2.必须要正最终按正确的顺序组合通过(PartNumber)
3.服务器通过上传ID跟踪所有分块的状态
5.安全隔离
1.上传ID保证不同客户端/会话的上传不会互相干执
2.只有知道上传ID的客户端才能操作该上传会话
3.防止未授权的完成或中断操作
6.生命周期
1.创建:CreateMultipartUpload时生成
2.使用:在所有分块上传和完成操作中使用
3.终止:
-
成功完成上传后自动失效
-
通过AbortMultipartUpload显式终止
-
未完成的上传ID会在一定时间后自动过期(通常7天)
2.2存储所有分块信息的原因
1.完成上传的必要条件
当调用Comp1eteMultipartUpload时,必须提供
-
所有分块的完整列表,不能缺少任何一部分
-
每个分块的编号(partNumber)和ETag(用于验证完整性)
-
严格按分块编号顺序排序(确保最终文件正确重组)
2.数据完整性验证
ETag的作用:
-
每个分块上传后,S3会返回唯一的ETag(类似于指纹)
-
完成上传时提供这些ETag,S3会验证,所有分块都存在且未被修改,分块顺序正确
-
防止传输过程中数据损坏或被篡改
3.断点续传支持
如果上传中断
-
可以查询已上传的分块(使用ListPartsAPI)
-
只需上传缺失的分块
-
使用之前存储的分块信息继续完成上传
-
无需重新上传已完成的分块
4.错误恢复机制
当上传出错时:
-
知道哪些分块已成功上传
-
可以针对性重试失败的分块
-
避免盲目重新上传所有内容
注意事顶
1.分块编号必须连续且唯一,从1开始递增,不能跳过任何编号,最大支持10000个分块
2.ETag必须原样使用,不能修改或省略,区分大小写
3.最小分块大小:除最后一个分块外,每个分块必须大于等于partsize
4.存储实际:必须在分块上传成功后立即存储,建议持久化存情,防止程序崩溃
5.不存储分块信息可能会造成:无法正常完成上传。可能已上传的分块会变成“孤儿块“占用存储空间,可以调用AbortMultipartUpload进行清理后重新开始。造成带宽和时间的浪费
2.3ETag讲解
通俗理解:S3 为每个对象自动生成的 “哈希值” ,用于唯一标识该对象当前的内容。
核心特性与计算规则:
| 上传方式 | ETag 计算规则 | 示例 | 重要提示 |
|---|---|---|---|
| 单次PUT上传 | 对象内容的 MD5 哈希值 | “5eb63bbbe01eeed093cb22bb8f5acdc3” | ETag 等于文件内容的 MD5,可用于校验完整性。 |
| 分块上传 (Multipart Upload) | 各分块MD5的集合的MD5 + “-” + 分块数 | “d4d5e97213b6c39db7b432a9faf6a15f-3” | ETag 不是文件内容的 MD5!不能直接用于校验文件完整性。 |
黄金法则:切勿假定 ETag 总是文件的 MD5。只有对非分块上传的小对象,这个等式才成立。
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)