MongoDB的RLE压缩数据存储

1. RLE(Run-Length Encoding)算法详解

1.1 RLE基本概念

RLE是一种简单的无损数据压缩算法,特别适用于处理连续重复数据。其核心思想是将连续的重复数据值序列替换为一个值和重复次数。

1.2 RLE编码原理
原始数据: [1,1,1,1,1,2,2,3,3,3,3]

RLE编码: [(1,5), (2,2), (3,4)]

1.3 RLE优势与适用场景
优势: 算法简单、压缩效率高(针对重复数据)、解压快速

适用场景: 图像处理、日志数据、传感器数据等包含大量连续重复值的场景

  2. MongoDB与Spring Data MongoDB配置
    2.1 添加依赖
    xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>mongodb-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.3.5.RELEASE</version>
    </parent>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb</artifactId>
        </dependency>

        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>knife4j-spring-boot-starter</artifactId>
            <version>3.0.3</version>
        </dependency>
    </dependencies>
</project>

2.2 配置文件
yaml

server:
  port: 9005
spring:
  application:
    name: mongodb-service
  data:
    mongodb:
      uri: mongodb://localhost:27017/test
  3. 完整项目实现
    3.1 数据库文档设计
    3.1.1 RLE编码文档结构
    json
{
  "_id": ObjectId("..."),
  "type": "q",
  "version": "123",
  "createTime": ISODate("2024-01-01T00:00:00Z"),
  "compressed": true,
  "rleValues": [
    {"value": 1.0, "count": 100},
    {"value": 2.0, "count": 50}
  ],
  "originalSize": 150,
  "lastUpdateTime": ISODate("2024-01-01T01:00:00Z")
}

3.1.2 未压缩文档结构(小数据量)
json

{
  "_id": ObjectId("..."),
  "type": "q", 
  "version": "124",
  "createTime": ISODate("2024-01-01T00:00:00Z"),
  "compressed": false,
  "values": [1.23, 4.21, 5.90],
  "lastUpdateTime": ISODate("2024-01-01T01:00:00Z")
}

3.2 实体类设计
3.2.1 通用父类

package com.test.mongodb.entity;

import lombok.Data;

import java.util.Date;
/**
 * Common base class carrying identity and audit fields for entities.
 * Lombok's {@code @Data} generates the getters, setters, {@code equals},
 * {@code hashCode} and {@code toString}.
 */
@Data
public class AbstractEntity {
    // Entity identifier.
    private String id;
    // presumably the user who created the entity — not populated anywhere in this file; confirm against callers
    private String createUser;
    // Creation timestamp.
    private Date createTime;
    // presumably the user who last modified the entity — not populated anywhere in this file; confirm against callers
    private String updateUser;
    // Last-modification timestamp.
    private Date updateTime;
}

RLE条目类

package com.test.mongodb.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.mongodb.core.mapping.Field;

/**
 * One run of a run-length-encoded sequence: a value together with the number
 * of consecutive times it repeats.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class RLEEntry{
    // The repeated value; stored under the document field "value".
    @Field("value")
    private float value;

    // How many consecutive times the value occurs; stored under the document field "count".
    @Field("count")
    private int count;
}

3.2.2 主文档实体类
java

package com.test.mongodb.entity;

import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.CompoundIndex;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;

import java.util.List;

/**
 * MongoDB document (collection {@code record}) holding a series of float values
 * identified by the unique pair (type, version).
 *
 * <p>The series is stored either verbatim in {@code values} or run-length
 * encoded in {@code rleValues}; the {@code compressed} flag tells which of the
 * two is populated.
 */
@Data
@Document(collection = "record")
@CompoundIndex(name = "type_version_idx", def = "{'type': 1, 'version': 1}", unique = true)
public class Record  extends AbstractEntity{
    // NOTE(review): this field shadows AbstractEntity.id — confirm the duplication is intended.
    @Id
    private String id;

    private String type;
    private String version;

    /** {@code true} when the data lives in {@code rleValues} rather than {@code values}. */
    @Field("compressed")
    private boolean compressed;

    /** Raw series; used while the record is not compressed. */
    @Field("values")
    private List<Float> values;

    /** Run-length-encoded series; used while the record is compressed. */
    @Field("rleValues")
    private List<RLEEntry> rleValues;

    /** Number of values the series had before encoding. */
    @Field("originalSize")
    private Integer originalSize;

    /** Creates an empty record; every field keeps its Java default. */
    public Record() {
    }

    /**
     * Creates an uncompressed record for the given key and series.
     *
     * @param type    record type
     * @param version record version
     * @param values  raw series (the list reference is stored as given)
     */
    public Record(String type, String version, List<Float> values) {
        this.compressed = false;
        this.values = values;
        this.version = version;
        this.type = type;
    }
}

3.3 RLE工具类
java

package com.test.mongodb.utils;

import com.test.mongodb.entity.RLEEntry;

import java.util.ArrayList;
import java.util.List;

/**
 * Stateless helpers for run-length encoding (RLE) of float series.
 *
 * <p>Two values belong to the same run when {@link Float#equals(Object)} says
 * they are equal, so {@code NaN} matches {@code NaN} and {@code 0.0f} does not
 * match {@code -0.0f}; this keeps the encoding lossless.
 */
public final class RLEUtil {
    /** Minimum length the longest run must reach for a series to be worth compressing. */
    private static final int COMPRESSION_THRESHOLD = 10;

    /** Static utility class; not meant to be instantiated. */
    private RLEUtil() {
    }

    /**
     * Run-length encodes a series.
     *
     * @param values series to encode; may be {@code null} or empty
     * @return one entry per run, in order; an empty list for {@code null} or empty input
     * @throws NullPointerException if the series contains a {@code null} element
     */
    public static List<RLEEntry> encode(List<Float> values) {
        List<RLEEntry> encoded = new ArrayList<>();
        if (values == null || values.isEmpty()) {
            return encoded;
        }

        // Iterate instead of calling get(i) so non-random-access lists stay O(n).
        float currentValue = 0f;
        int count = 0; // 0 means "no run started yet"
        for (Float value : values) {
            float next = value;
            if (count == 0) {
                currentValue = next;
                count = 1;
            } else if (sameValue(next, currentValue)) {
                count++;
            } else {
                encoded.add(new RLEEntry(currentValue, count));
                currentValue = next;
                count = 1;
            }
        }
        // Flush the run that was still open when the input ended.
        encoded.add(new RLEEntry(currentValue, count));

        return encoded;
    }

    /**
     * Expands a run-length-encoded series back into its individual values.
     *
     * @param rleEntries runs to expand; may be {@code null}
     * @return the decoded series; an empty list for {@code null} input.
     *         Entries whose count is zero or negative contribute nothing.
     */
    public static List<Float> decode(List<RLEEntry> rleEntries) {
        List<Float> decoded = new ArrayList<>();
        if (rleEntries == null) {
            return decoded;
        }

        for (RLEEntry entry : rleEntries) {
            // Box once per run instead of once per repeated element.
            Float boxed = entry.getValue();
            int repeat = entry.getCount();
            for (int i = 0; i < repeat; i++) {
                decoded.add(boxed);
            }
        }
        return decoded;
    }

    /**
     * Decides, from the data itself, whether RLE is worth applying: the series
     * must hold at least {@code COMPRESSION_THRESHOLD} values and contain at
     * least one run of that length.
     *
     * @param values series to inspect; may be {@code null}
     * @return {@code true} when such a run exists
     */
    public static boolean shouldCompress(List<Float> values) {
        if (values == null || values.size() < COMPRESSION_THRESHOLD) {
            return false;
        }

        int currentRunLength = 0;
        Float previous = null;
        boolean first = true;
        for (Float value : values) {
            if (!first && value.equals(previous)) {
                currentRunLength++;
            } else {
                currentRunLength = 1;
            }
            if (currentRunLength >= COMPRESSION_THRESHOLD) {
                // A long enough run was found; the rest cannot change the answer.
                return true;
            }
            previous = value;
            first = false;
        }

        return false;
    }

    /**
     * Computes the compression ratio as (encoded element count) / (original
     * element count), counting every RLE entry as two elements (value + count).
     * Values below 1.0 mean the encoded form is smaller.
     *
     * @param original   the raw series
     * @param compressed its run-length encoding
     * @return the ratio, or {@code 0.0} when either argument is {@code null}
     *         or the original series is empty
     */
    public static double calculateCompressionRatio(List<Float> original, List<RLEEntry> compressed) {
        if (original == null || compressed == null) {
            return 0.0;
        }

        int originalSize = original.size();
        int compressedSize = compressed.size() * 2; // each entry carries a value and a count

        return originalSize == 0 ? 0.0 : (double) compressedSize / originalSize;
    }

    /** Same equality as {@link Float#equals(Object)}: compares the canonical bit patterns. */
    private static boolean sameValue(float a, float b) {
        return Float.floatToIntBits(a) == Float.floatToIntBits(b);
    }
}

3.4 Repository接口
java

package com.test.mongodb.mapper;

import com.test.mongodb.entity.RLEEntry;
import com.test.mongodb.entity.Record;
import org.apache.ibatis.annotations.Update;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.data.mongodb.repository.Query;

import java.util.Date;
import java.util.List;
import java.util.Optional;

public interface RecordMapper extends MongoRepository<Record, String> {
    Optional<Record> findByTypeAndVersion(String type, String version);

    List<Record> findByType(String type);

    List<Record> findByCompressed(boolean compressed);

    @Query("{'type': ?0, 'version': ?1}")
    @Update("{$set: {'lastUpdateTime': ?2}}")
    void updateLastUpdateTime(String type, String version, Date lastUpdateTime);

    @Query("{'type': ?0, 'version': ?1, 'compressed': false}")
    @Update("{$push: {'values': {$each: ?2}}, $set: {'lastUpdateTime': ?3}}")
    void appendValues(String type, String version, List<Float> newValues, Date lastUpdateTime);

    @Query("{'type': ?0, 'version': ?1, 'compressed': true}")
    @Update("{$set: {'rleValues': ?2, 'lastUpdateTime': ?3, 'originalSize': ?4}}")
    void updateRleValues(String type, String version, List<RLEEntry> rleValues, Date lastUpdateTime, int originalSize);

    @Query("{'createTime': {$gte: ?0, $lte: ?1}}")
    List<Record> findByCreateTimeBetween(Date start, Date end);
}

3.5 服务实现类

package com.test.mongodb.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.test.mongodb.dto.RecordStats;
import com.test.mongodb.entity.Record;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;

import java.util.List;
import java.util.Optional;

/**
 * Operations on value-series records identified by a (type, version) key.
 * The stored series may be run-length encoded; callers always exchange plain
 * value lists.
 */
public interface RecordService {
    /**
     * Stores values under the given key: creates the record when it does not
     * exist yet, otherwise appends to the existing one.
     */
    Record saveOrUpdate(String type, String version, List<Float> newValues);

    /**
     * Returns the full value series of a record, decoded when it is stored
     * compressed; an empty list when no such record exists.
     */
    List<Float> getValues(String type, String version);

    /** Returns all records of the given type. */
    List<Record> findByType(String type);

    /**
     * Manually triggers compression of a record.
     * Returns the record, or {@code null} when it does not exist.
     */
    Record compressRecord(String type, String version);

    /**
     * Manually converts a compressed record back to the raw representation.
     * Returns the record, or {@code null} when it does not exist.
     */
    Record decompressRecord(String type, String version);

    /** Returns statistics for a record, or {@code null} when it does not exist. */
    RecordStats getStats(String type, String version);

    /** Deletes the record with the given key, if present. */
    void deleteRecord(String type, String version);

    /** Looks a record up by its document id. */
    Optional<Record> findById(String id);

    /** Returns one page of records as described by the page request. */
    Page<Record> findAll(PageRequest of);
}

package com.test.mongodb.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.test.mongodb.dto.RecordStats;
import com.test.mongodb.entity.RLEEntry;
import com.test.mongodb.entity.Record;
import com.test.mongodb.mapper.RecordMapper;
import com.test.mongodb.service.RecordService;
import com.test.mongodb.utils.RLEUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;

/**
 * {@link RecordService} implementation that chooses transparently between raw
 * and RLE-compressed storage.
 *
 * <p>All modifications go through load-modify-{@code save} on the repository.
 * This is not atomic: concurrent writers to the same (type, version) key can
 * overwrite each other's changes.
 */
@Service
public class RecordServiceImpl implements RecordService{
    @Autowired
    private RecordMapper recordMapper;

    /** A series must hold more than this many values before compression is considered. */
    private static final int COMPRESSION_SIZE_THRESHOLD = 1000;

    /**
     * Inserts a new record or appends to an existing one, compressing when the
     * data qualifies.
     *
     * @param newValues values to store; {@code null} is treated as an empty list
     * @return the persisted record
     */
    @Override
    public Record saveOrUpdate(String type, String version, List<Float> newValues) {
        List<Float> incoming = newValues != null ? newValues : new ArrayList<>();
        Optional<Record> existingRecord = recordMapper.findByTypeAndVersion(type, version);

        if (existingRecord.isPresent()) {
            return appendToExistingRecord(existingRecord.get(), incoming);
        }
        return createNewRecord(type, version, incoming);
    }

    /** Creates and persists a new record, compressed when the data qualifies. */
    private Record createNewRecord(String type, String version, List<Float> values) {
        // Copy so the stored list is independent of (and not limited by) the caller's list.
        Record record = new Record(type, version, new ArrayList<>(values));
        Date now = new Date();
        record.setCreateTime(now);
        record.setUpdateTime(now);

        if (shouldCompress(values)) {
            compressRecord(record);
        }

        return recordMapper.save(record);
    }

    /**
     * Appends values to an existing record and persists the result.
     *
     * <p>A record that is already compressed stays compressed and is
     * re-encoded; an uncompressed record is compressed once the merged series
     * qualifies, otherwise the merged series is stored raw.
     */
    private Record appendToExistingRecord(Record existingRecord, List<Float> newValues) {
        // Work on a copy: the merged series must not depend on aliasing the record's own list.
        List<Float> allValues = new ArrayList<>(getAllValues(existingRecord));
        allValues.addAll(newValues);

        if (existingRecord.isCompressed() || shouldCompress(allValues)) {
            storeCompressed(existingRecord, allValues);
        } else {
            existingRecord.setValues(allValues);
            existingRecord.setUpdateTime(new Date());
        }

        // Persist through save(): the repository's annotated update methods are
        // not relied upon here.
        return recordMapper.save(existingRecord);
    }

    /**
     * Returns all values of a record, decoding them when compressed.
     *
     * @return the series, or an empty list when the record does not exist
     */
    @Override
    public List<Float> getValues(String type, String version) {
        Optional<Record> record = recordMapper.findByTypeAndVersion(type, version);
        return record.map(this::getAllValues).orElseGet(ArrayList::new);
    }

    /** Looks a record up by its document id. */
    @Override
    public Optional<Record> findById(String id) {
        return recordMapper.findById(id);
    }

    @Override
    public Page<Record> findAll(PageRequest of) {
        return recordMapper.findAll(of);
    }

    /** Returns all records of the given type. */
    @Override
    public List<Record> findByType(String type) {
        return recordMapper.findByType(type);
    }

    /** Returns one page of records. */
    public Page<Record> findAll(Pageable pageable) {
        return recordMapper.findAll(pageable);
    }

    /** Deletes the record with the given key, if present. */
    @Override
    public void deleteRecord(String type, String version) {
        Optional<Record> record = recordMapper.findByTypeAndVersion(type, version);
        record.ifPresent(recordMapper::delete);
    }

    /**
     * Manually triggers compression. The record is only rewritten when it is
     * currently uncompressed and its data qualifies.
     *
     * @return the (possibly updated) record, or {@code null} when it does not exist
     */
    @Override
    public Record compressRecord(String type, String version) {
        Optional<Record> recordOpt = recordMapper.findByTypeAndVersion(type, version);
        if (recordOpt.isPresent()) {
            Record record = recordOpt.get();
            if (!record.isCompressed() && shouldCompress(getAllValues(record))) {
                compressRecord(record);
                return recordMapper.save(record);
            }
        }
        return recordOpt.orElse(null);
    }

    /**
     * Manually converts a compressed record back to raw storage.
     *
     * @return the (possibly updated) record, or {@code null} when it does not exist
     */
    @Override
    public Record decompressRecord(String type, String version) {
        Optional<Record> recordOpt = recordMapper.findByTypeAndVersion(type, version);
        if (recordOpt.isPresent()) {
            Record record = recordOpt.get();
            if (record.isCompressed()) {
                List<Float> values = RLEUtil.decode(record.getRleValues());
                record.setValues(values);
                record.setRleValues(null);
                record.setCompressed(false);
                record.setOriginalSize(null);
                record.setUpdateTime(new Date());
                return recordMapper.save(record);
            }
        }
        return recordOpt.orElse(null);
    }

    /**
     * Builds statistics for a record.
     *
     * @return the statistics, or {@code null} when the record does not exist
     */
    @Override
    public RecordStats getStats(String type, String version) {
        Optional<Record> recordOpt = recordMapper.findByTypeAndVersion(type, version);
        if (recordOpt.isPresent()) {
            Record record = recordOpt.get();
            List<Float> values = getAllValues(record);

            RecordStats stats = new RecordStats();
            stats.setTotalValues(values.size());
            stats.setCompressed(record.isCompressed());
            // Uncompressed records report a neutral ratio of 1.0.
            stats.setCompressionRatio(record.isCompressed() ?
                    RLEUtil.calculateCompressionRatio(values, record.getRleValues()) : 1.0);
            stats.setCreateTime(record.getCreateTime());
            stats.setLastUpdateTime(record.getUpdateTime());

            return stats;
        }
        return null;
    }

    // ---- internal helpers ----

    /** Returns the record's series: decoded when compressed, never {@code null}. */
    private List<Float> getAllValues(Record record) {
        if (record.isCompressed()) {
            return RLEUtil.decode(record.getRleValues());
        }
        return record.getValues() != null ? record.getValues() : new ArrayList<>();
    }

    /** A series qualifies when it is large enough and contains a long enough run. */
    private boolean shouldCompress(List<Float> values) {
        return values.size() > COMPRESSION_SIZE_THRESHOLD && RLEUtil.shouldCompress(values);
    }

    /** Switches a record to compressed storage using its own current values. */
    private void compressRecord(Record record) {
        storeCompressed(record, getAllValues(record));
    }

    /** Stores the given series on the record in RLE form and stamps the update time. */
    private void storeCompressed(Record record, List<Float> values) {
        record.setRleValues(RLEUtil.encode(values));
        record.setValues(null);
        record.setCompressed(true);
        record.setOriginalSize(values.size());
        record.setUpdateTime(new Date());
    }
}

package com.test.mongodb.dto;

import java.util.Date;

/**
 * Plain data holder describing one record: how many values it contains,
 * whether it is stored compressed, its compression ratio and its timestamps.
 */
public class RecordStats {
    private int totalValues;
    private boolean compressed;
    private double compressionRatio;
    private Date createTime;
    private Date lastUpdateTime;

    /** @return number of values in the (decoded) series */
    public int getTotalValues() {
        return this.totalValues;
    }

    /** @param totalValues number of values in the (decoded) series */
    public void setTotalValues(int totalValues) {
        this.totalValues = totalValues;
    }

    /** @return whether the record is stored compressed */
    public boolean isCompressed() {
        return this.compressed;
    }

    /** @param compressed whether the record is stored compressed */
    public void setCompressed(boolean compressed) {
        this.compressed = compressed;
    }

    /** @return the compression ratio */
    public double getCompressionRatio() {
        return this.compressionRatio;
    }

    /** @param compressionRatio the compression ratio */
    public void setCompressionRatio(double compressionRatio) {
        this.compressionRatio = compressionRatio;
    }

    /** @return creation timestamp, possibly {@code null} */
    public Date getCreateTime() {
        return this.createTime;
    }

    /** @param createTime creation timestamp */
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    /** @return last-update timestamp, possibly {@code null} */
    public Date getLastUpdateTime() {
        return this.lastUpdateTime;
    }

    /** @param lastUpdateTime last-update timestamp */
    public void setLastUpdateTime(Date lastUpdateTime) {
        this.lastUpdateTime = lastUpdateTime;
    }
}

3.6 控制器类

package com.test.mongodb.controller;

import com.test.mongodb.dto.RecordStats;
import com.test.mongodb.entity.Record;
import com.test.mongodb.service.RecordService;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
 * REST endpoints under {@code /record} that expose {@link RecordService}.
 */
@RestController
@RequestMapping("record")
@Api(tags = "记录")
public class RecordController {
    @Autowired
    private RecordService recordService;

    /** Creates the record for (type, version) or appends the posted values to it. */
    @PostMapping("/createOrUpdateRecord")
    public ResponseEntity<Record> createOrUpdateRecord(
            @RequestParam String type,
            @RequestParam String version,
            @RequestBody List<Float> values) {

        return ResponseEntity.ok(recordService.saveOrUpdate(type, version, values));
    }

    /** Returns the decoded values of a record wrapped in a simplified {@link Record}. */
    @GetMapping("/getRecord")
    public ResponseEntity<Record> getRecord(
            @RequestParam String type,
            @RequestParam String version) {

        // Note: the response is a simplified object carrying only the key and
        // the decompressed values, not the stored document.
        Record simplified = new Record();
        simplified.setType(type);
        simplified.setVersion(version);
        simplified.setValues(recordService.getValues(type, version));

        return ResponseEntity.ok(simplified);
    }

    /** Returns the record with the given id, or 404 when it does not exist. */
    @GetMapping("/{id}")
    public ResponseEntity<Record> getRecordById(@PathVariable String id) {
        return recordService.findById(id)
                .map(ResponseEntity::ok)
                .orElseGet(() -> ResponseEntity.notFound().build());
    }

    /** Returns all records of the given type. */
    @GetMapping("/type/{type}")
    public ResponseEntity<List<Record>> getRecordsByType(@PathVariable String type) {
        return ResponseEntity.ok(recordService.findByType(type));
    }

    /** Returns one page of records (defaults: page 0, size 10). */
    @GetMapping("/page")
    public ResponseEntity<Page<Record>> getRecordsPage(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "10") int size) {

        PageRequest request = PageRequest.of(page, size);
        return ResponseEntity.ok(recordService.findAll(request));
    }

    /** Manually triggers compression; 404 when the record does not exist. */
    @PostMapping("/compress")
    public ResponseEntity<Record> compressRecord(
            @RequestParam String type,
            @RequestParam String version) {

        return okOrNotFound(recordService.compressRecord(type, version));
    }

    /** Manually triggers decompression; 404 when the record does not exist. */
    @PostMapping("/decompress")
    public ResponseEntity<Record> decompressRecord(
            @RequestParam String type,
            @RequestParam String version) {

        return okOrNotFound(recordService.decompressRecord(type, version));
    }

    /** Returns statistics for a record; 404 when the record does not exist. */
    @GetMapping("/stats")
    public ResponseEntity<RecordStats> getRecordStats(
            @RequestParam String type,
            @RequestParam String version) {

        return okOrNotFound(recordService.getStats(type, version));
    }

    /** Deletes a record; responds 200 whether or not it existed. */
    @DeleteMapping("/delete")
    public ResponseEntity<Void> deleteRecord(
            @RequestParam String type,
            @RequestParam String version) {

        recordService.deleteRecord(type, version);
        return ResponseEntity.ok().build();
    }

    /** Maps a nullable service result to 200 with the body, or 404 when {@code null}. */
    private static <T> ResponseEntity<T> okOrNotFound(T body) {
        if (body == null) {
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok(body);
    }
}

4. 使用示例

4.1 插入数据示例

java

// First batch: no record exists for ("q", "123") yet, so one is created
recordService.saveOrUpdate("q", "123", Arrays.asList(1.23f, 4.21f, 5.90f));

// Second batch for the same key: appended to the existing record
recordService.saveOrUpdate("q", "123", Arrays.asList(5.78f, 9.88f));

// One million copies of the same value: large and repetitive enough to be stored compressed
List<Float> repeatedValues = new ArrayList<>();
while (repeatedValues.size() < 1000000) {
    repeatedValues.add(1.0f);
}
recordService.saveOrUpdate("q", "124", repeatedValues);

4.2 查询数据示例

java

// Read the values back; compressed records are decoded transparently
List<Float> values = recordService.getValues("q", "123");

// Fetch statistics. RecordStats is a top-level class in com.test.mongodb.dto
// (not nested in RecordService), and getStats returns null for a missing record.
RecordStats stats = recordService.getStats("q", "124");
if (stats != null) {
    System.out.println("压缩比: " + stats.getCompressionRatio());
}

5. 性能优化建议

5.1 索引优化

java
// 在实体类上添加复合索引
@CompoundIndex(name = "type_version_idx", def = "{'type': 1, 'version': 1}", unique = true)

5.2 批量操作优化
对于大批量数据插入,考虑使用MongoDB的批量操作

实现数据分片策略,避免单个文档过大

5.3 监控与调优
监控压缩比和查询性能

根据实际数据特征调整压缩阈值

定期清理历史数据

6. 总结

本项目实现了基于Spring Boot和MongoDB的智能数据存储系统,具有以下特点:

自动压缩: 根据数据特征自动选择是否使用RLE压缩

透明操作: 提供统一的API,压缩/解压对用户透明

高性能: 针对重复数据优化存储效率

灵活配置: 可调整压缩阈值和策略

完整功能: 提供增删改查、统计等完整功能

大量重复数据的查询页面效果
在这里插入图片描述

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐