【MongoDB的RLE压缩数据存储】
MongoDB的RLE压缩数据存储
MongoDB的RLE压缩数据存储
1. RLE(Run-Length Encoding)算法详解
1.1 RLE基本概念
RLE是一种简单的无损数据压缩算法,特别适用于处理连续重复数据。其核心思想是将连续的重复数据值序列替换为一个值和重复次数。
1.2 RLE编码原理
原始数据: [1,1,1,1,1,2,2,3,3,3,3]
RLE编码: [(1,5), (2,2), (3,4)]
1.3 RLE优势与适用场景
优势: 算法简单、对连续重复数据压缩效率高、解压快速
局限: 当数据缺少连续重复时,每个值都需要额外存储一个计数,编码后的体积反而会膨胀
适用场景: 图像处理、日志数据、传感器数据等包含大量连续重复值的场景
2. MongoDB与Spring Data MongoDB配置
2.1 添加依赖
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Spring Boot parent: dependencies below that omit <version> take it from the parent's dependency management. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
    </parent>

    <groupId>org.example</groupId>
    <artifactId>mongodb-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Java 8 source/target level -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- REST endpoints (RecordController) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- @Data / constructor generation for the entity classes -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- Spring Data MongoDB repositories (RecordMapper) -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>
        <!--
            NOTE(review): h2 and mybatis-plus are not used for MongoDB storage; the code in this
            project only references MyBatis types through imports (IService, ServiceImpl and the
            MyBatis @Update annotation in RecordMapper). Presumably h2 is here to give
            MyBatis-Plus a DataSource - confirm before removing either dependency.
        -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.0</version>
        </dependency>
        <!-- Swagger/knife4j API docs (@Api on the controller) -->
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>knife4j-spring-boot-starter</artifactId>
            <version>3.0.3</version>
        </dependency>
    </dependencies>
</project>
2.2 配置文件
yaml
server:
  port: 9005
spring:
  application:
    name: mongodb-service
  data:
    mongodb:
      uri: mongodb://localhost:27017/test
      # Spring Data MongoDB 3.x (Spring Boot 2.3+) no longer creates indexes declared with
      # annotations (such as the unique @CompoundIndex on Record) unless this is enabled.
      auto-index-creation: true
3. 完整项目实现
3.1 数据库文档设计
3.1.1 RLE编码文档结构
json
{
"_id": ObjectId("..."),
"type": "q",
"version": "123",
"createTime": ISODate("2024-01-01T00:00:00Z"),
"compressed": true,
"rleValues": [
{"value": 1.0, "count": 100},
{"value": 2.0, "count": 50}
],
"originalSize": 150,
"updateTime": ISODate("2024-01-01T01:00:00Z")
}
3.1.2 未压缩文档结构(小数据量)
json
{
"_id": ObjectId("..."),
"type": "q",
"version": "124",
"createTime": ISODate("2024-01-01T00:00:00Z"),
"compressed": false,
"values": [1.23, 4.21, 5.90],
"updateTime": ISODate("2024-01-01T01:00:00Z")
}
3.2 实体类设计
3.2.1 通用父类
package com.test.mongodb.entity;
import lombok.Data;
import java.util.Date;
/**
 * Common audit fields shared by persisted entities.
 *
 * <p>Lombok {@code @Data} generates getters/setters, equals/hashCode and toString for the fields
 * below, in declaration order.
 *
 * <p>NOTE(review): {@code Record} declares its own {@code @Id private String id}, which shadows
 * {@code id} here. Record's Lombok-generated getId/setId override these, so this field is never
 * written by the project code. Confirm which declaration is intended and keep only one.
 */
@Data
public class AbstractEntity {
private String id;
// NOTE(review): createUser/updateUser are not set anywhere in this project's code.
private String createUser;
private Date createTime;
private String updateUser;
// Last modification time; RecordServiceImpl writes it when (de)compressing a record.
private Date updateTime;
}
RLE条目类
package com.test.mongodb.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.mongodb.core.mapping.Field;
/**
 * One run of an RLE-encoded series: {@code value} repeated {@code count} times.
 *
 * <p>Instances are produced by {@code RLEUtil.encode} and stored as elements of
 * {@code Record.rleValues}. Lombok generates the accessors plus the no-arg constructor (used by
 * the MongoDB mapper) and the (value, count) constructor.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RLEEntry{
// The repeated value.
@Field("value")
private float value;
// How many consecutive times the value occurs.
@Field("count")
private int count;
}
3.2.2 主文档实体类
java
package com.test.mongodb.entity;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.CompoundIndex;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;
import java.util.List;
/**
 * MongoDB document in the "record" collection: a series of float values identified by
 * (type, version).
 *
 * <p>The service code populates one of two representations, selected by {@code compressed}:
 * <ul>
 *   <li>{@code compressed == false}: the raw series is in {@code values};</li>
 *   <li>{@code compressed == true}: the series is RLE-encoded in {@code rleValues} and
 *       {@code originalSize} holds its decoded length.</li>
 * </ul>
 * Audit fields (createTime, updateTime, ...) are inherited from {@code AbstractEntity}.
 */
@Data
@Document(collection = "record")
// Unique compound index: at most one document per (type, version).
@CompoundIndex(name = "type_version_idx", def = "{'type': 1, 'version': 1}", unique = true)
public class Record extends AbstractEntity{
// NOTE(review): shadows AbstractEntity.id; the Lombok getId/setId generated here override the
// parent's, so the inherited id field stays unset. Consider declaring the id in one place.
@Id
private String id;
private String type;
private String version;
// True when the data lives in rleValues rather than values.
@Field("compressed")
private boolean compressed;
// Raw values; used when compressed == false.
@Field("values")
private List<Float> values;
// RLE runs; used when compressed == true.
@Field("rleValues")
private List<RLEEntry> rleValues;
// Decoded length of rleValues; set on compression, cleared on decompression.
@Field("originalSize")
private Integer originalSize;
/**
 * Creates an uncompressed record holding {@code values} as-is (the list is not copied).
 */
public Record(String type, String version, List<Float> values) {
this();
this.type = type;
this.version = version;
this.values = values;
this.compressed = false;
}
/** No-arg constructor required for object mapping. */
public Record() {
}
}
3.3 RLE工具类
java
package com.test.mongodb.utils;
import com.test.mongodb.entity.RLEEntry;
import java.util.ArrayList;
import java.util.List;
public class RLEUtil {
    /**
     * Minimum list size, and minimum length of the longest run of identical values, before RLE
     * compression is considered ({@code >=} comparison).
     */
    private static final int COMPRESSION_THRESHOLD = 10;

    /**
     * Run-length encodes {@code values}. Each maximal run of equal consecutive values becomes one
     * {@link RLEEntry}(value, count).
     *
     * <p>Equality uses {@link Float#equals}, so NaN runs collapse together and {@code 0.0f} /
     * {@code -0.0f} stay distinct, which keeps the encoding lossless.
     *
     * @param values values to encode; {@code null} or empty yields an empty list
     * @return the runs in input order
     * @throws NullPointerException if {@code values} contains a {@code null} element
     */
    public static List<RLEEntry> encode(List<Float> values) {
        List<RLEEntry> encoded = new ArrayList<>();
        if (values == null || values.isEmpty()) {
            return encoded;
        }
        float currentValue = values.get(0);
        int count = 1;
        for (int i = 1; i < values.size(); i++) {
            Float next = values.get(i);
            if (next.equals(currentValue)) {
                count++;
            } else {
                encoded.add(new RLEEntry(currentValue, count));
                currentValue = next;
                count = 1;
            }
        }
        encoded.add(new RLEEntry(currentValue, count));
        return encoded;
    }

    /**
     * Expands RLE runs back into the original value list.
     *
     * @param rleEntries runs to expand; {@code null} yields an empty list, and {@code null}
     *                   elements are skipped
     * @return a new mutable list with each value repeated {@code count} times
     *         (non-positive counts contribute nothing)
     */
    public static List<Float> decode(List<RLEEntry> rleEntries) {
        List<Float> decoded = new ArrayList<>();
        if (rleEntries == null) {
            return decoded;
        }
        for (RLEEntry entry : rleEntries) {
            if (entry == null) {
                continue; // tolerate holes in a stored array instead of failing the whole read
            }
            Float value = entry.getValue(); // box once per run rather than once per element
            for (int i = 0; i < entry.getCount(); i++) {
                decoded.add(value);
            }
        }
        return decoded;
    }

    /**
     * Decides whether RLE is worthwhile for {@code values}.
     *
     * <p>Requires both of the following:
     * <ul>
     *   <li>a run of at least {@link #COMPRESSION_THRESHOLD} equal consecutive values;</li>
     *   <li>an encoding that is actually smaller than the raw list. Each run costs two numbers
     *       (value + count), the same measure {@link #calculateCompressionRatio} uses.</li>
     * </ul>
     * A single long run inside otherwise varying data would otherwise inflate storage.
     *
     * @param values candidate values; {@code null} or fewer than the threshold returns false
     * @return true if RLE should be used
     */
    public static boolean shouldCompress(List<Float> values) {
        if (values == null || values.size() < COMPRESSION_THRESHOLD) {
            return false;
        }
        int maxRunLength = 1;
        int currentRunLength = 1;
        int runs = 1;
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i).equals(values.get(i - 1))) {
                currentRunLength++;
                maxRunLength = Math.max(maxRunLength, currentRunLength);
            } else {
                currentRunLength = 1;
                runs++;
            }
        }
        // long arithmetic: runs * 2 could overflow int for very large lists.
        return maxRunLength >= COMPRESSION_THRESHOLD && (long) runs * 2 < values.size();
    }

    /**
     * Computes encoded size / original size, counting two numbers (value + count) per RLE entry.
     *
     * @return the ratio; 0.0 when either argument is {@code null} or {@code original} is empty
     */
    public static double calculateCompressionRatio(List<Float> original, List<RLEEntry> compressed) {
        if (original == null || compressed == null) return 0.0;
        int originalSize = original.size();
        int compressedSize = compressed.size() * 2;
        return originalSize == 0 ? 0.0 : (double) compressedSize / originalSize;
    }
}
3.4 Repository接口
java
package com.test.mongodb.mapper;
import com.test.mongodb.entity.RLEEntry;
import com.test.mongodb.entity.Record;
import org.apache.ibatis.annotations.Update;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.data.mongodb.repository.Query;
import java.util.Date;
import java.util.List;
import java.util.Optional;
public interface RecordMapper extends MongoRepository<Record, String> {
Optional<Record> findByTypeAndVersion(String type, String version);
List<Record> findByType(String type);
List<Record> findByCompressed(boolean compressed);
@Query("{'type': ?0, 'version': ?1}")
@Update("{$set: {'lastUpdateTime': ?2}}")
void updateLastUpdateTime(String type, String version, Date lastUpdateTime);
@Query("{'type': ?0, 'version': ?1, 'compressed': false}")
@Update("{$push: {'values': {$each: ?2}}, $set: {'lastUpdateTime': ?3}}")
void appendValues(String type, String version, List<Float> newValues, Date lastUpdateTime);
@Query("{'type': ?0, 'version': ?1, 'compressed': true}")
@Update("{$set: {'rleValues': ?2, 'lastUpdateTime': ?3, 'originalSize': ?4}}")
void updateRleValues(String type, String version, List<RLEEntry> rleValues, Date lastUpdateTime, int originalSize);
@Query("{'createTime': {$gte: ?0, $lte: ?1}}")
List<Record> findByCreateTimeBetween(Date start, Date end);
}
3.5 服务实现类
package com.test.mongodb.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.test.mongodb.dto.RecordStats;
import com.test.mongodb.entity.Record;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import java.util.List;
import java.util.Optional;
/**
 * Storage of float series keyed by (type, version), with transparent RLE compression.
 */
public interface RecordService {

    // ----- writes -----

    /** Creates the record for (type, version), or appends {@code newValues} to the existing one. */
    Record saveOrUpdate(String type, String version, List<Float> newValues);

    /** Compresses the record with RLE when it qualifies; returns the record (null if none exists in RecordServiceImpl). */
    Record compressRecord(String type, String version);

    /** Expands an RLE-compressed record back to a plain value list; returns the record (null if none exists in RecordServiceImpl). */
    Record decompressRecord(String type, String version);

    /** Deletes the record for (type, version), if present. */
    void deleteRecord(String type, String version);

    // ----- reads -----

    /** Returns the record's values, decompressed if necessary. */
    List<Float> getValues(String type, String version);

    /** Returns all records of the given type. */
    List<Record> findByType(String type);

    /** Looks up a record by its document id. */
    Optional<Record> findById(String id);

    /** Returns one page of records. */
    Page<Record> findAll(PageRequest of);

    /** Returns size/compression statistics for the record. */
    RecordStats getStats(String type, String version);
}
package com.test.mongodb.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.test.mongodb.dto.RecordStats;
import com.test.mongodb.entity.RLEEntry;
import com.test.mongodb.entity.Record;
import com.test.mongodb.mapper.RecordMapper;
import com.test.mongodb.service.RecordService;
import com.test.mongodb.utils.RLEUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;
@Service
public class RecordServiceImpl implements RecordService{
    @Autowired
    private RecordMapper recordMapper;

    /** Records holding more values than this are candidates for RLE compression. */
    private static final int COMPRESSION_SIZE_THRESHOLD = 1000;

    /**
     * Inserts a record for (type, version) or appends {@code newValues} to the existing one,
     * switching to RLE storage when the combined data qualifies.
     *
     * @param newValues values to store; {@code null} is treated as an empty list
     * @return the persisted record
     */
    @Override
    public Record saveOrUpdate(String type, String version, List<Float> newValues) {
        List<Float> incoming = newValues != null ? newValues : new ArrayList<>();
        Optional<Record> existingRecord = recordMapper.findByTypeAndVersion(type, version);
        if (existingRecord.isPresent()) {
            return appendToExistingRecord(existingRecord.get(), incoming);
        }
        return createNewRecord(type, version, incoming);
    }

    /** Builds and saves a new record, compressing it up front if it qualifies. */
    private Record createNewRecord(String type, String version, List<Float> values) {
        // Copy so the entity owns a mutable list instead of aliasing the caller's
        // (possibly fixed-size, e.g. Arrays.asList) list.
        List<Float> ownValues = new ArrayList<>(values);
        Record record = new Record(type, version, ownValues);
        Date now = new Date();
        record.setCreateTime(now);
        record.setUpdateTime(now);
        if (shouldCompress(ownValues)) {
            applyCompression(record, ownValues);
        }
        return recordMapper.save(record);
    }

    /**
     * Appends values to an existing record and saves the whole document.
     * Compressed records stay compressed and are re-encoded. Uncompressed records are compressed
     * once the combined data qualifies, otherwise the merged list is stored as-is.
     */
    private Record appendToExistingRecord(Record existingRecord, List<Float> newValues) {
        // getAllValues returns a fresh list, so the merge never mutates the entity by aliasing.
        List<Float> allValues = getAllValues(existingRecord);
        allValues.addAll(newValues);
        if (existingRecord.isCompressed() || shouldCompress(allValues)) {
            applyCompression(existingRecord, allValues);
        } else {
            existingRecord.setValues(allValues);
            existingRecord.setUpdateTime(new Date());
        }
        // NOTE(review): read-modify-write of the full document; concurrent appends to the same
        // (type, version) can overwrite each other. Use an atomic MongoTemplate update if that matters.
        return recordMapper.save(existingRecord);
    }

    /**
     * Returns all values of the record, decompressed if necessary.
     *
     * @return a new list; empty if the record does not exist
     */
    @Override
    public List<Float> getValues(String type, String version) {
        Optional<Record> record = recordMapper.findByTypeAndVersion(type, version);
        return record.map(this::getAllValues).orElseGet(ArrayList::new);
    }

    /** Looks up a record by its document id. */
    @Override
    public Optional<Record> findById(String id) {
        return recordMapper.findById(id);
    }

    /** Returns one page of records. */
    @Override
    public Page<Record> findAll(PageRequest of) {
        return recordMapper.findAll(of);
    }

    /** Returns all records of the given type. */
    @Override
    public List<Record> findByType(String type) {
        return recordMapper.findByType(type);
    }

    /** Returns one page of records for any {@link Pageable}. */
    public Page<Record> findAll(Pageable pageable) {
        return recordMapper.findAll(pageable);
    }

    /** Deletes the record for (type, version), if present. */
    @Override
    public void deleteRecord(String type, String version) {
        Optional<Record> record = recordMapper.findByTypeAndVersion(type, version);
        record.ifPresent(recordMapper::delete);
    }

    /**
     * Manually compresses an uncompressed record if its data qualifies.
     *
     * @return the (possibly updated) record, or {@code null} if none exists
     */
    @Override
    public Record compressRecord(String type, String version) {
        Optional<Record> recordOpt = recordMapper.findByTypeAndVersion(type, version);
        if (recordOpt.isPresent()) {
            Record record = recordOpt.get();
            if (!record.isCompressed()) {
                List<Float> values = getAllValues(record);
                if (shouldCompress(values)) {
                    applyCompression(record, values);
                    return recordMapper.save(record);
                }
            }
        }
        return recordOpt.orElse(null);
    }

    /**
     * Manually expands a compressed record back to a plain value list.
     *
     * @return the (possibly updated) record, or {@code null} if none exists
     */
    @Override
    public Record decompressRecord(String type, String version) {
        Optional<Record> recordOpt = recordMapper.findByTypeAndVersion(type, version);
        if (recordOpt.isPresent()) {
            Record record = recordOpt.get();
            if (record.isCompressed()) {
                List<Float> values = RLEUtil.decode(record.getRleValues());
                record.setValues(values);
                record.setRleValues(null);
                record.setCompressed(false);
                record.setOriginalSize(null);
                record.setUpdateTime(new Date());
                return recordMapper.save(record);
            }
        }
        return recordOpt.orElse(null);
    }

    /**
     * Builds size/compression statistics for the record.
     *
     * @return the stats, or {@code null} if the record does not exist
     */
    @Override
    public RecordStats getStats(String type, String version) {
        Optional<Record> recordOpt = recordMapper.findByTypeAndVersion(type, version);
        if (recordOpt.isPresent()) {
            Record record = recordOpt.get();
            List<Float> values = getAllValues(record);
            RecordStats stats = new RecordStats();
            stats.setTotalValues(values.size());
            stats.setCompressed(record.isCompressed());
            stats.setCompressionRatio(record.isCompressed() ?
                    RLEUtil.calculateCompressionRatio(values, record.getRleValues()) : 1.0);
            stats.setCreateTime(record.getCreateTime());
            stats.setLastUpdateTime(record.getUpdateTime());
            return stats;
        }
        return null;
    }

    // ----- internal helpers -----

    /** Returns a new mutable list of the record's values, decoding RLE runs if compressed. */
    private List<Float> getAllValues(Record record) {
        if (record.isCompressed()) {
            return RLEUtil.decode(record.getRleValues());
        }
        return record.getValues() != null ? new ArrayList<>(record.getValues()) : new ArrayList<>();
    }

    /** Size gate plus the data-shape check in {@link RLEUtil#shouldCompress}. */
    private boolean shouldCompress(List<Float> values) {
        return values.size() > COMPRESSION_SIZE_THRESHOLD && RLEUtil.shouldCompress(values);
    }

    /** Switches the record to RLE storage of exactly {@code values} (does not save). */
    private void applyCompression(Record record, List<Float> values) {
        List<RLEEntry> rleValues = RLEUtil.encode(values);
        record.setRleValues(rleValues);
        record.setValues(null);
        record.setCompressed(true);
        record.setOriginalSize(values.size());
        record.setUpdateTime(new Date());
    }
}
package com.test.mongodb.dto;
import java.util.Date;
public class RecordStats {
    /** Number of values after decompression. */
    private int totalValues;
    /** Whether the record is stored RLE-compressed. */
    private boolean compressed;
    /** Encoded size / original size as filled by the service (1.0 for uncompressed records). */
    private double compressionRatio;
    /** Creation time of the record. */
    private Date createTime;
    /** Last modification time of the record. */
    private Date lastUpdateTime;

    public int getTotalValues() {
        return this.totalValues;
    }

    public void setTotalValues(int totalValues) {
        this.totalValues = totalValues;
    }

    public boolean isCompressed() {
        return this.compressed;
    }

    public void setCompressed(boolean compressed) {
        this.compressed = compressed;
    }

    public double getCompressionRatio() {
        return this.compressionRatio;
    }

    public void setCompressionRatio(double compressionRatio) {
        this.compressionRatio = compressionRatio;
    }

    public Date getCreateTime() {
        return this.createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getLastUpdateTime() {
        return this.lastUpdateTime;
    }

    public void setLastUpdateTime(Date lastUpdateTime) {
        this.lastUpdateTime = lastUpdateTime;
    }
}
3.6 控制器类
package com.test.mongodb.controller;
import com.test.mongodb.dto.RecordStats;
import com.test.mongodb.entity.Record;
import com.test.mongodb.service.RecordService;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@RestController
@RequestMapping("record")
@Api(tags = "记录")
public class RecordController {
    @Autowired
    private RecordService recordService;

    /** Creates the record for (type, version) or appends the posted values to it. */
    @PostMapping("/createOrUpdateRecord")
    public ResponseEntity<Record> createOrUpdateRecord(
            @RequestParam String type,
            @RequestParam String version,
            @RequestBody List<Float> values) {
        Record record = recordService.saveOrUpdate(type, version, values);
        return ResponseEntity.ok(record);
    }

    /**
     * Returns a lightweight Record carrying only type, version and the decompressed values.
     * NOTE(review): a missing record yields 200 with an empty value list rather than 404.
     */
    @GetMapping("/getRecord")
    public ResponseEntity<Record> getRecord(
            @RequestParam String type,
            @RequestParam String version) {
        List<Float> values = recordService.getValues(type, version);
        Record record = new Record();
        record.setType(type);
        record.setVersion(version);
        record.setValues(values);
        return ResponseEntity.ok(record);
    }

    /** Returns the stored record by document id, or 404. */
    @GetMapping("/{id}")
    public ResponseEntity<Record> getRecordById(@PathVariable String id) {
        return recordService.findById(id)
                .map(ResponseEntity::ok)
                .orElse(ResponseEntity.notFound().build());
    }

    /** Returns all records of a type. */
    @GetMapping("/type/{type}")
    public ResponseEntity<List<Record>> getRecordsByType(@PathVariable String type) {
        List<Record> records = recordService.findByType(type);
        return ResponseEntity.ok(records);
    }

    /**
     * Returns one page of records.
     * {@code page} must be >= 0 and {@code size} >= 1; otherwise 400 Bad Request is returned.
     */
    @GetMapping("/page")
    public ResponseEntity<Page<Record>> getRecordsPage(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "10") int size) {
        // PageRequest.of throws IllegalArgumentException for these inputs, which would
        // otherwise surface as a 500 server error.
        if (page < 0 || size < 1) {
            return ResponseEntity.badRequest().build();
        }
        Page<Record> records = recordService.findAll(PageRequest.of(page, size));
        return ResponseEntity.ok(records);
    }

    /** Manually compresses the record; 404 if it does not exist. */
    @PostMapping("/compress")
    public ResponseEntity<Record> compressRecord(
            @RequestParam String type,
            @RequestParam String version) {
        Record record = recordService.compressRecord(type, version);
        return record != null ? ResponseEntity.ok(record) : ResponseEntity.notFound().build();
    }

    /** Manually decompresses the record; 404 if it does not exist. */
    @PostMapping("/decompress")
    public ResponseEntity<Record> decompressRecord(
            @RequestParam String type,
            @RequestParam String version) {
        Record record = recordService.decompressRecord(type, version);
        return record != null ? ResponseEntity.ok(record) : ResponseEntity.notFound().build();
    }

    /** Returns size/compression statistics; 404 if the record does not exist. */
    @GetMapping("/stats")
    public ResponseEntity<RecordStats> getRecordStats(
            @RequestParam String type,
            @RequestParam String version) {
        RecordStats stats = recordService.getStats(type, version);
        return stats != null ? ResponseEntity.ok(stats) : ResponseEntity.notFound().build();
    }

    /** Deletes the record; responds 200 whether or not it existed. */
    @DeleteMapping("/delete")
    public ResponseEntity<Void> deleteRecord(
            @RequestParam String type,
            @RequestParam String version) {
        recordService.deleteRecord(type, version);
        return ResponseEntity.ok().build();
    }
}
4. 使用示例
4.1 插入数据示例
java
// First call for ("q", "123"): no record exists yet, so a new one is created.
List<Float> firstBatch = Arrays.asList(1.23f, 4.21f, 5.90f);
recordService.saveOrUpdate("q", "123", firstBatch);
// Second call with the same type/version: the values are appended to that record.
List<Float> secondBatch = Arrays.asList(5.78f, 9.88f);
recordService.saveOrUpdate("q", "123", secondBatch);
// One million identical values: more than 1000 values with a long run, so the
// service stores this record RLE-compressed.
final int total = 1000000;
List<Float> repeatedValues = new ArrayList<>();
int added = 0;
while (added < total) {
    repeatedValues.add(1.0f);
    added++;
}
recordService.saveOrUpdate("q", "124", repeatedValues);
4.2 查询数据示例
java
// Read the values back; compressed records are decoded transparently.
List<Float> values = recordService.getValues("q", "123");
// Fetch statistics. RecordStats is the top-level DTO com.test.mongodb.dto.RecordStats,
// not a type nested inside RecordService.
RecordStats stats = recordService.getStats("q", "124");
// getStats returns null when no record matches the given type/version.
if (stats != null) {
    System.out.println("压缩比: " + stats.getCompressionRatio());
}
5. 性能优化建议
5.1 索引优化
java
// Compound index declared on the entity class (the Record entity above already carries it).
// unique = true guarantees at most one document per (type, version), which
// findByTypeAndVersion (returning Optional, i.e. at most one match) assumes.
// NOTE(review): Spring Data MongoDB 3.x (Spring Boot 2.3+) does not create annotation-declared
// indexes unless spring.data.mongodb.auto-index-creation=true — confirm it is enabled.
@CompoundIndex(name = "type_version_idx", def = "{'type': 1, 'version': 1}", unique = true)
5.2 批量操作优化
对于大批量数据插入,考虑使用MongoDB的批量操作
实现数据分片策略,避免单个文档过大(MongoDB单个BSON文档上限为16MB,未压缩的大数组很容易触及该上限)
5.3 监控与调优
监控压缩比和查询性能
根据实际数据特征调整压缩阈值
定期清理历史数据
6. 总结
本项目实现了基于Spring Boot和MongoDB的智能数据存储系统,具有以下特点:
自动压缩: 根据数据特征自动选择是否使用RLE压缩
透明操作: 提供统一的API,压缩/解压对用户透明
高性能: 针对重复数据优化存储效率
灵活配置: 可调整压缩阈值和策略
完整功能: 提供增删改查、统计等完整功能
大量重复数据的查询页面效果
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐

所有评论(0)