在这里插入图片描述

👋 大家好,欢迎来到我的技术博客!
💻 作为一名热爱 Java 与软件开发的程序员,我始终相信:清晰的逻辑 + 持续的积累 = 稳健的成长
📚 在这里,我会分享学习笔记、实战经验与技术思考,力求用简单的方式讲清楚复杂的问题。
🎯 本文将围绕MySQL这个话题展开,希望能为你带来一些启发或实用的参考。
🌱 无论你是刚入门的新手,还是正在进阶的开发者,希望你都能有所收获!


MySQL - 物联网场景实战:海量时序数据存储与查询优化 📊📈

在物联网(IoT)时代,设备产生的数据量呈爆炸式增长。这些数据通常以时序数据的形式存在,例如传感器读数、设备状态、地理位置信息等。如何高效地存储和查询这些海量时序数据,是构建稳定可靠的物联网应用的关键挑战之一。本文将深入探讨在 MySQL 数据库中处理海量时序数据的策略,并结合 Java 代码示例进行实践演示。我们将重点关注数据建模、索引优化、分区策略以及查询性能调优等方面。

引言:物联网数据的挑战 🚀

物联网设备持续不断地产生数据流,这些数据具有以下特点:

  • 高并发性:大量设备同时上报数据。
  • 高频率:数据生成频率极高,可能每秒甚至毫秒级。
  • 海量性:随着时间推移,数据量迅速膨胀。
  • 时序性:数据具有明确的时间戳,按时间顺序排列。
  • 实时性要求:对数据的实时查询和分析有较高需求。

传统的数据库设计和查询方式往往难以应对如此庞大的数据量和高并发请求。MySQL 虽然不是专为时序数据设计的数据库(如 InfluxDB、TimescaleDB),但通过合理的架构设计和优化手段,它依然可以在处理 IoT 场景下的时序数据方面发挥重要作用。

数据模型设计:从基础到优化 💡

1. 基础表结构设计

让我们从一个最简单的 IoT 数据表开始。假设我们有一个温度传感器网络,需要记录每个传感器的温度值及其时间戳。

-- 创建原始数据表 (基础版)
CREATE TABLE sensor_readings (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    sensor_id VARCHAR(50) NOT NULL,
    reading_time DATETIME NOT NULL,
    temperature DECIMAL(5,2) NOT NULL,
    humidity DECIMAL(5,2),
    -- 可以添加更多字段,如设备状态、位置等
    INDEX idx_sensor_time (sensor_id, reading_time),
    INDEX idx_reading_time (reading_time)
) ENGINE=InnoDB;
  • id: 主键,自增,用于唯一标识每条记录。
  • sensor_id: 传感器 ID,用于区分不同的设备。
  • reading_time: 时间戳,记录数据采集的具体时刻。这是时序数据的核心。
  • temperaturehumidity: 具体的测量值。
  • INDEX idx_sensor_time (sensor_id, reading_time): 复合索引,用于加速基于传感器和时间范围的查询。
  • INDEX idx_reading_time (reading_time): 单独的时间索引,用于加速按时间排序或范围查询。

⚠️ 注意: 在生产环境中,reading_time 字段应使用 TIMESTAMPDATETIME 类型。TIMESTAMP 在某些情况下占用更少空间(4 字节 vs 8 字节),并且可以自动更新(如果设置了 ON UPDATE CURRENT_TIMESTAMP)。对于需要精确到微秒的场景,考虑使用 DATETIME(6)

2. 高效的表结构优化

针对物联网场景,我们可以进一步优化表结构以提高性能。

a. 使用更小的数据类型

选择合适的数据类型至关重要,可以显著减少存储空间和提高 I/O 效率。

-- 优化后的表结构
CREATE TABLE optimized_sensor_readings (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    sensor_id VARCHAR(32) NOT NULL, -- 根据实际ID长度调整
    reading_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    temperature SMALLINT SIGNED, -- 假设温度范围在 -32768 到 32767 之间,精度可接受
    humidity TINYINT UNSIGNED,     -- 假设湿度范围在 0-255 之间
    -- 其他字段...
    INDEX idx_sensor_time (sensor_id, reading_time),
    INDEX idx_reading_time (reading_time)
) ENGINE=InnoDB ROW_FORMAT=DYNAMIC;
  • BIGINT UNSIGNED: 对于自增主键,使用无符号整数可以增加正数范围。
  • VARCHAR(32): 根据实际传感器 ID 的长度,尽可能缩小字段大小。
  • SMALLINT SIGNED / TINYINT UNSIGNED: 如果数据精度允许,使用更小的数据类型。
  • ROW_FORMAT=DYNAMIC: 对于包含变长字段(如 VARCHAR)的表,动态行格式可能比默认的紧凑格式更高效。
b. 选择合适的存储引擎
  • InnoDB: 是 MySQL 的默认存储引擎,支持事务、外键和行级锁定。对于大多数 IoT 应用来说,它是首选。它能很好地处理并发写入。
  • MyISAM: 不支持事务和行级锁定,但其查询速度可能更快。对于只读或极少写入的场景(如历史数据归档),可以考虑。
c. 分区表(Partitioning)

当数据量达到千万甚至上亿级别时,分区是提升查询性能的关键技术。MySQL 支持多种分区策略,其中按时间分区(RANGE 分区)对于时序数据非常有效。

-- 按月分区的表
CREATE TABLE partitioned_sensor_readings (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    sensor_id VARCHAR(32) NOT NULL,
    reading_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    temperature SMALLINT SIGNED,
    humidity TINYINT UNSIGNED,
    -- 其他字段...
    INDEX idx_sensor_time (sensor_id, reading_time),
    INDEX idx_reading_time (reading_time)
) ENGINE=InnoDB ROW_FORMAT=DYNAMIC
PARTITION BY RANGE (TO_DAYS(reading_time)) (
    PARTITION p2024_01 VALUES LESS THAN (TO_DAYS('2024-02-01')),
    PARTITION p2024_02 VALUES LESS THAN (TO_DAYS('2024-03-01')),
    PARTITION p2024_03 VALUES LESS THAN (TO_DAYS('2024-04-01')),
    -- ... 更多分区 ...
    PARTITION p_future VALUES LESS THAN MAXVALUE
);
  • PARTITION BY RANGE (TO_DAYS(reading_time)): 根据 reading_time 字段的日期值进行分区。
  • PARTITION p2024_01 VALUES LESS THAN (TO_DAYS('2024-02-01')): 这个分区存储 2024 年 1 月的所有数据。
  • PARTITION p_future VALUES LESS THAN MAXVALUE: 存储所有未来时间的数据。这通常用于处理新加入的月份。
  • 注意: 分区需要定期维护,比如添加新的月份分区。

3. 时序数据的聚合表(Aggregation Tables)

为了优化特定类型的查询(如获取某天的平均温度),可以创建专门的聚合表。

-- 聚合表:按天统计传感器数据
CREATE TABLE daily_aggregated_readings (
    sensor_id VARCHAR(32) NOT NULL,
    date DATE NOT NULL,
    avg_temperature DECIMAL(5,2),
    max_temperature DECIMAL(5,2),
    min_temperature DECIMAL(5,2),
    avg_humidity DECIMAL(5,2),
    max_humidity DECIMAL(5,2),
    min_humidity DECIMAL(5,2),
    record_count INT UNSIGNED,
    PRIMARY KEY (sensor_id, date),
    INDEX idx_date (date)
) ENGINE=InnoDB ROW_FORMAT=DYNAMIC;

这个表可以定期由后台任务从原始表计算并填充。这样,当用户查询“某天的平均温度”时,可以直接从聚合表中获取结果,而无需扫描大量的原始数据。

Java 代码实现:数据写入与查询 🔧

现在,我们通过 Java 代码来演示如何操作上述的表结构。

1. 项目依赖

首先,在 pom.xml 中添加必要的依赖项:

<dependencies>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.33</version> <!-- 请根据实际情况选择版本 -->
    </dependency>
    <dependency>
        <groupId>com.zaxxer</groupId>
        <artifactId>HikariCP</artifactId>
        <version>5.0.1</version> <!-- 连接池 -->
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.9</version> <!-- 日志 -->
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.4.11</version> <!-- 日志实现 -->
    </dependency>
    <!-- 如果使用 Lombok 减少样板代码 -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

2. 数据库配置与连接池

// DatabaseConfig.java
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

import javax.sql.DataSource;
import java.util.Properties;

public class DatabaseConfig {

    private static final String DB_URL = "jdbc:mysql://localhost:3306/iot_db?useSSL=false&serverTimezone=UTC";
    private static final String DB_USER = "your_username"; // 替换为你的用户名
    private static final String DB_PASSWORD = "your_password"; // 替换为你的密码

    public static DataSource getDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(DB_URL);
        config.setUsername(DB_USER);
        config.setPassword(DB_PASSWORD);

        // 连接池配置
        config.setMaximumPoolSize(20); // 最大连接数
        config.setMinimumIdle(5);      // 最小空闲连接数
        config.setConnectionTimeout(30000); // 连接超时时间 (ms)
        config.setIdleTimeout(600000);      // 空闲超时时间 (ms)
        config.setMaxLifetime(1800000);     // 连接最大存活时间 (ms)

        return new HikariDataSource(config);
    }
}

3. 实体类定义

// SensorReading.java
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;

@Data
public class SensorReading {
    private Long id; // 自增主键
    private String sensorId;
    private LocalDateTime readingTime; // 使用 LocalDateTime
    private BigDecimal temperature;
    private BigDecimal humidity;

    // 构造函数
    public SensorReading() {}

    public SensorReading(String sensorId, LocalDateTime readingTime, BigDecimal temperature, BigDecimal humidity) {
        this.sensorId = sensorId;
        this.readingTime = readingTime;
        this.temperature = temperature;
        this.humidity = humidity;
    }

    // getter 和 setter 方法 (Lombok 自动生成)
}

4. 数据访问层 (DAO)

// SensorReadingDAO.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.sql.*;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;

public class SensorReadingDAO {

    private static final Logger logger = LoggerFactory.getLogger(SensorReadingDAO.class);
    private final DataSource dataSource;

    public SensorReadingDAO(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * 批量插入传感器数据
     */
    public void batchInsert(List<SensorReading> readings) {
        if (readings == null || readings.isEmpty()) {
            logger.warn("No readings to insert.");
            return;
        }

        String sql = "INSERT INTO optimized_sensor_readings (sensor_id, reading_time, temperature, humidity) VALUES (?, ?, ?, ?)";

        try (Connection conn = dataSource.getConnection();
             PreparedStatement pstmt = conn.prepareStatement(sql)) {

            for (SensorReading reading : readings) {
                pstmt.setString(1, reading.getSensorId());
                pstmt.setTimestamp(2, Timestamp.valueOf(reading.getReadingTime()));
                pstmt.setBigDecimal(3, reading.getTemperature());
                pstmt.setBigDecimal(4, reading.getHumidity());

                pstmt.addBatch(); // 添加到批处理
            }

            int[] results = pstmt.executeBatch(); // 执行批处理
            logger.info("Successfully inserted {} records.", results.length);

        } catch (SQLException e) {
            logger.error("Error inserting batch of sensor readings", e);
            throw new RuntimeException("Failed to insert sensor readings", e);
        }
    }

    /**
     * 查询指定传感器在指定时间范围内的数据
     */
    public List<SensorReading> getReadingsBySensorAndTimeRange(String sensorId, LocalDateTime startTime, LocalDateTime endTime) {
        List<SensorReading> readings = new ArrayList<>();

        // 注意:这里假设查询条件中的时间是闭区间 [startTime, endTime]
        String sql = "SELECT id, sensor_id, reading_time, temperature, humidity " +
                    "FROM optimized_sensor_readings " +
                    "WHERE sensor_id = ? AND reading_time BETWEEN ? AND ? " +
                    "ORDER BY reading_time ASC";

        try (Connection conn = dataSource.getConnection();
             PreparedStatement pstmt = conn.prepareStatement(sql)) {

            pstmt.setString(1, sensorId);
            pstmt.setTimestamp(2, Timestamp.valueOf(startTime));
            pstmt.setTimestamp(3, Timestamp.valueOf(endTime));

            try (ResultSet rs = pstmt.executeQuery()) {
                while (rs.next()) {
                    SensorReading reading = new SensorReading();
                    reading.setId(rs.getLong("id"));
                    reading.setSensorId(rs.getString("sensor_id"));
                    reading.setReadingTime(rs.getTimestamp("reading_time").toLocalDateTime());
                    reading.setTemperature(rs.getBigDecimal("temperature"));
                    reading.setHumidity(rs.getBigDecimal("humidity"));

                    readings.add(reading);
                }
            }

        } catch (SQLException e) {
            logger.error("Error querying sensor readings for sensor {} between {} and {}", sensorId, startTime, endTime, e);
            throw new RuntimeException("Failed to query sensor readings", e);
        }

        return readings;
    }

    /**
     * 获取最近一次的传感器数据
     */
    public SensorReading getLastReading(String sensorId) {
        String sql = "SELECT id, sensor_id, reading_time, temperature, humidity " +
                    "FROM optimized_sensor_readings " +
                    "WHERE sensor_id = ? " +
                    "ORDER BY reading_time DESC LIMIT 1";

        try (Connection conn = dataSource.getConnection();
             PreparedStatement pstmt = conn.prepareStatement(sql)) {

            pstmt.setString(1, sensorId);

            try (ResultSet rs = pstmt.executeQuery()) {
                if (rs.next()) {
                    SensorReading reading = new SensorReading();
                    reading.setId(rs.getLong("id"));
                    reading.setSensorId(rs.getString("sensor_id"));
                    reading.setReadingTime(rs.getTimestamp("reading_time").toLocalDateTime());
                    reading.setTemperature(rs.getBigDecimal("temperature"));
                    reading.setHumidity(rs.getBigDecimal("humidity"));

                    return reading;
                }
            }

        } catch (SQLException e) {
            logger.error("Error getting last reading for sensor {}", sensorId, e);
            throw new RuntimeException("Failed to get last sensor reading", e);
        }

        return null; // 没有找到记录
    }
}

5. 示例应用

// IotApplication.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class IotApplication {

    private static final Logger logger = LoggerFactory.getLogger(IotApplication.class);
    private static final Random random = new Random();

    public static void main(String[] args) {
        // 初始化数据库连接池
        var dataSource = DatabaseConfig.getDataSource();
        var dao = new SensorReadingDAO(dataSource);

        // 模拟生成测试数据
        List<SensorReading> testReadings = generateTestReadings(1000); // 生成 1000 条数据

        // 插入数据
        long startInsert = System.currentTimeMillis();
        dao.batchInsert(testReadings);
        long endInsert = System.currentTimeMillis();
        logger.info("Inserted {} records in {} ms", testReadings.size(), endInsert - startInsert);

        // 查询测试
        String testSensorId = "SENSOR_001";
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime oneHourAgo = now.minusHours(1);

        long startQuery = System.currentTimeMillis();
        List<SensorReading> readings = dao.getReadingsBySensorAndTimeRange(testSensorId, oneHourAgo, now);
        long endQuery = System.currentTimeMillis();

        logger.info("Retrieved {} records in {} ms", readings.size(), endQuery - startQuery);

        // 显示部分结果
        if (!readings.isEmpty()) {
            logger.info("Sample data:");
            int count = Math.min(5, readings.size()); // 只显示前5条
            for (int i = 0; i < count; i++) {
                SensorReading r = readings.get(i);
                logger.info("  Sensor: {}, Time: {}, Temp: {}, Humidity: {}",
                        r.getSensorId(), r.getReadingTime(), r.getTemperature(), r.getHumidity());
            }
        }

        // 获取最后一条记录
        SensorReading lastReading = dao.getLastReading(testSensorId);
        if (lastReading != null) {
            logger.info("Last reading for {}: {} at {}", testSensorId, lastReading.getTemperature(), lastReading.getReadingTime());
        } else {
            logger.info("No readings found for sensor {}", testSensorId);
        }

        logger.info("Application finished.");
    }

    /**
     * 生成模拟的传感器读数
     */
    private static List<SensorReading> generateTestReadings(int count) {
        List<SensorReading> readings = new ArrayList<>();
        LocalDateTime baseTime = LocalDateTime.now().minusDays(1); // 从一天前开始

        for (int i = 0; i < count; i++) {
            String sensorId = "SENSOR_" + String.format("%03d", i % 10); // 循环使用 10 个传感器
            LocalDateTime time = baseTime.plusSeconds(i); // 每秒一条数据
            BigDecimal temp = BigDecimal.valueOf(20 + random.nextDouble() * 10).setScale(2, BigDecimal.ROUND_HALF_UP); // 20-30°C
            BigDecimal humidity = BigDecimal.valueOf(40 + random.nextDouble() * 30).setScale(2, BigDecimal.ROUND_HALF_UP); // 40-70%

            readings.add(new SensorReading(sensorId, time, temp, humidity));
        }

        return readings;
    }
}

6. 代码说明

  • 连接池: 使用了 HikariCP 作为连接池,它可以有效管理数据库连接,避免频繁创建/销毁连接带来的开销。
  • 批量插入: batchInsert 方法使用了 PreparedStatement.addBatch()executeBatch() 来批量执行 SQL 插入语句,相比逐条插入,性能提升显著。
  • 时间处理: 使用 LocalDateTime 来表示 Java 端的时间,通过 Timestamp.valueOf() 转换为数据库时间类型。
  • 查询优化: 查询语句利用了之前创建的复合索引 idx_sensor_time,可以快速定位到特定传感器在指定时间段内的数据。
  • 错误处理: 使用了基本的异常处理和日志记录。

性能优化策略:从索引到分区 🚀

1. 索引优化

索引是数据库性能优化的核心。对于 IoT 时序数据,我们需要精心设计索引。

a. 复合索引的选择

前面的表结构中已经包含了 idx_sensor_timeidx_reading_time

  • idx_sensor_time (sensor_id, reading_time):

    • 用途: 查询特定传感器在某个时间段内的数据。
    • 原理: MySQL 会使用这个索引来快速定位 sensor_id,然后在该传感器的范围内按 reading_time 排序。
    • 重要: sensor_id 放在前面,因为通常查询会先指定传感器 ID。
  • idx_reading_time (reading_time):

    • 用途: 查询某个时间段内的所有数据(不关心传感器)。
    • 原理: 直接根据时间范围查找。
b. 索引失效的场景
  • 范围查询后使用非索引列: WHERE sensor_id = 'S1' AND reading_time > '2024-01-01' AND other_column = 'value'。如果 other_column 没有索引,MySQL 可能无法有效利用 sensor_idreading_time 的组合索引。
  • 函数或表达式: WHERE YEAR(reading_time) = 2024 会导致索引失效。应该改写为 WHERE reading_time >= '2024-01-01' AND reading_time < '2025-01-01'
  • LIKE 通配符: WHERE sensor_id LIKE '%01' 会导致索引失效。LIKE '01%' 则不会。
c. 使用 EXPLAIN 分析查询计划
EXPLAIN SELECT id, sensor_id, reading_time, temperature, humidity
FROM optimized_sensor_readings
WHERE sensor_id = 'SENSOR_001' AND reading_time BETWEEN '2024-01-01 00:00:00' AND '2024-01-01 01:00:00'
ORDER BY reading_time ASC;

查看 EXPLAIN 输出的 key 字段是否使用了预期的索引,rows 字段估算的扫描行数是否合理。

2. 表分区优化

a. 分区的好处
  • 查询性能: 查询只需要扫描相关的分区,而不是整个表。
  • 维护效率: 可以单独删除或备份某个分区的数据。
  • 扩展性: 可以将不同分区分布到不同的物理存储上。
b. 分区策略选择

对于 IoT 时序数据,最常用的分区策略是 RANGE 分区,按时间划分。

-- 动态添加分区的示例 (需要手动管理或脚本自动化)
ALTER TABLE partitioned_sensor_readings ADD PARTITION (
    PARTITION p2024_04 VALUES LESS THAN (TO_DAYS('2024-05-01'))
);
c. 分区裁剪 (Partition Pruning)

当查询条件明确指定了分区键(这里是 reading_time)时,MySQL 会自动进行分区裁剪,只扫描符合条件的分区。

-- 这个查询只会扫描 p2024_01 分区
SELECT COUNT(*) FROM partitioned_sensor_readings WHERE reading_time >= '2024-01-01' AND reading_time < '2024-02-01';

3. 查询优化技巧

a. 避免 SELECT *

在实际应用中,除非确实需要所有字段,否则应明确指定所需字段。

-- 推荐
SELECT sensor_id, reading_time, temperature FROM optimized_sensor_readings WHERE sensor_id = 'S1';

-- 不推荐 (如果不需要 humidity)
SELECT * FROM optimized_sensor_readings WHERE sensor_id = 'S1';
b. 限制返回结果集大小

使用 LIMIT 限制返回结果数量。

-- 获取最新的 100 条记录
SELECT id, sensor_id, reading_time, temperature FROM optimized_sensor_readings ORDER BY reading_time DESC LIMIT 100;
c. 使用覆盖索引

如果查询所需的字段都在索引中,MySQL 可以直接从索引中获取数据,无需回表查询原表。

-- 如果创建了索引 idx_sensor_time_covering (sensor_id, reading_time, temperature)
-- 查询语句可以利用这个覆盖索引
SELECT reading_time, temperature FROM optimized_sensor_readings WHERE sensor_id = 'S1' AND reading_time > '2024-01-01' ORDER BY reading_time;
d. 避免全表扫描

确保查询条件能够利用到索引。可以通过 EXPLAIN 来验证。

4. 配置优化

a. InnoDB 缓冲池 (InnoDB Buffer Pool)

这是 InnoDB 存储引擎最重要的配置之一,用于缓存表数据和索引。

# my.cnf or my.ini
innodb_buffer_pool_size = 1G # 根据服务器内存调整,建议设置为物理内存的 50%-70%
b. 日志文件 (InnoDB Log Files)

InnoDB 使用重做日志(Redo Log)来保证事务的持久性。

# my.cnf or my.ini
innodb_log_file_size = 256M # 建议设置为 256MB 或更大,但不超过 512MB
innodb_log_files_in_group = 2 # 默认值
c. 查询缓存 (Query Cache) - 已废弃

从 MySQL 8.0 开始,查询缓存功能已被移除。因此,不再需要关注此配置。

d. 连接数和超时
# my.cnf or my.ini
max_connections = 500 # 根据并发需求调整
wait_timeout = 28800 # 等待超时时间
interactive_timeout = 28800 # 交互式连接超时时间

高级话题:聚合与历史数据处理 📈

1. 聚合表的构建与维护

正如前面提到的,为了加速特定查询(如统计),可以创建聚合表。这通常需要一个后台任务定期执行。

// AggregationService.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.sql.*;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AggregationService {

    private static final Logger logger = LoggerFactory.getLogger(AggregationService.class);
    private final DataSource dataSource;
    private final ScheduledExecutorService scheduler;

    public AggregationService(DataSource dataSource) {
        this.dataSource = dataSource;
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
    }

    /**
     * 启动定时聚合任务 (每天凌晨 1 点执行)
     */
    public void startAggregationJob() {
        scheduler.scheduleAtFixedRate(this::performDailyAggregation, 0, 1, TimeUnit.DAYS);
    }

    /**
     * 执行每日聚合
     */
    private void performDailyAggregation() {
        LocalDate today = LocalDate.now();
        LocalDate yesterday = today.minusDays(1);

        logger.info("Starting daily aggregation for {}", yesterday);

        try (Connection conn = dataSource.getConnection()) {
            // 1. 删除昨天的旧聚合数据 (可选)
            deleteOldAggregatedData(conn, yesterday);

            // 2. 计算昨天的聚合数据
            calculateDailyAggregates(conn, yesterday);

            logger.info("Daily aggregation completed for {}", yesterday);
        } catch (SQLException e) {
            logger.error("Error performing daily aggregation for {}", yesterday, e);
        }
    }

    private void deleteOldAggregatedData(Connection conn, LocalDate date) throws SQLException {
        String sql = "DELETE FROM daily_aggregated_readings WHERE date = ?";
        try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
            pstmt.setDate(1, Date.valueOf(date));
            int rowsDeleted = pstmt.executeUpdate();
            logger.debug("Deleted {} old aggregated records for date {}", rowsDeleted, date);
        }
    }

    private void calculateDailyAggregates(Connection conn, LocalDate date) throws SQLException {
        LocalDateTime startOfDay = date.atStartOfDay();
        LocalDateTime endOfDay = date.atTime(LocalTime.MAX); // 23:59:59.999999999

        String sql = """
            INSERT INTO daily_aggregated_readings (sensor_id, date, avg_temperature, max_temperature, min_temperature, avg_humidity, max_humidity, min_humidity, record_count)
            SELECT
                sensor_id,
                ?
                AS date,
                AVG(temperature) AS avg_temperature,
                MAX(temperature) AS max_temperature,
                MIN(temperature) AS min_temperature,
                AVG(humidity) AS avg_humidity,
                MAX(humidity) AS max_humidity,
                MIN(humidity) AS min_humidity,
                COUNT(*) AS record_count
            FROM optimized_sensor_readings
            WHERE reading_time >= ? AND reading_time < ?
            GROUP BY sensor_id
            """;

        try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
            pstmt.setDate(1, Date.valueOf(date));
            pstmt.setTimestamp(2, Timestamp.valueOf(startOfDay));
            pstmt.setTimestamp(3, Timestamp.valueOf(endOfDay));

            int rowsInserted = pstmt.executeUpdate();
            logger.info("Inserted {} aggregated records for date {}", rowsInserted, date);
        }
    }

    public void shutdown() {
        scheduler.shutdown();
        try {
            if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

这个服务会在每天凌晨执行一次,计算前一天所有传感器的聚合数据并存入 daily_aggregated_readings 表。

2. 历史数据归档与清理

随着系统运行时间的增长,原始数据表会变得非常庞大。可以采用归档策略,将历史数据转移到专门的归档表或冷存储中。

a. 归档策略
  • 时间范围归档: 将超过一定时间(如一年)的数据移动到归档表。
  • 按传感器归档: 将不活跃的传感器数据归档。
  • 压缩归档: 对归档数据进行压缩,节省存储空间。
b. 示例:归档旧数据
// ArchiveService.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import javax.sql.DataSource;

public class ArchiveService {

    private static final Logger logger = LoggerFactory.getLogger(ArchiveService.class);
    private final DataSource dataSource;

    public ArchiveService(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * 归档超过 1 年的历史数据
     */
    public void archiveOldData() {
        LocalDateTime cutoffDate = LocalDateTime.now().minusYears(1);

        try (Connection conn = dataSource.getConnection()) {
            // 1. 移动数据到归档表
            moveDataToArchive(conn, cutoffDate);

            // 2. 删除源表中的数据
            deleteArchivedData(conn, cutoffDate);

            logger.info("Archived data older than {}", cutoffDate);
        } catch (SQLException e) {
            logger.error("Error archiving old data", e);
        }
    }

    private void moveDataToArchive(Connection conn, LocalDateTime cutoffDate) throws SQLException {
        // 假设有一个归档表 archive_sensor_readings
        String insertSql = """
            INSERT INTO archive_sensor_readings (id, sensor_id, reading_time, temperature, humidity)
            SELECT id, sensor_id, reading_time, temperature, humidity
            FROM optimized_sensor_readings
            WHERE reading_time < ?
            """;

        try (PreparedStatement pstmt = conn.prepareStatement(insertSql)) {
            pstmt.setTimestamp(1, Timestamp.valueOf(cutoffDate));
            int rowsMoved = pstmt.executeUpdate();
            logger.info("Moved {} records to archive table", rowsMoved);
        }
    }

    private void deleteArchivedData(Connection conn, LocalDateTime cutoffDate) throws SQLException {
        String deleteSql = "DELETE FROM optimized_sensor_readings WHERE reading_time < ?";
        try (PreparedStatement pstmt = conn.prepareStatement(deleteSql)) {
            pstmt.setTimestamp(1, Timestamp.valueOf(cutoffDate));
            int rowsDeleted = pstmt.executeUpdate();
            logger.info("Deleted {} records from original table", rowsDeleted);
        }
    }
}

3. 监控与性能分析

监控数据库性能对于及时发现问题至关重要。可以使用以下工具和技术:

  • MySQL Performance Schema: 内置的性能监控框架。
  • 慢查询日志 (Slow Query Log): 记录执行时间超过阈值的查询。
  • SHOW PROCESSLIST: 查看当前正在执行的进程。
  • 第三方监控工具: 如 Prometheus + Grafana, Zabbix 等。

总结与展望 📝

本文详细探讨了在 MySQL 中处理物联网海量时序数据的多种策略。通过合理的数据建模、索引优化、表分区以及 Java 代码示例,我们展示了如何构建一个高性能、可扩展的 IoT 数据存储系统。

关键点回顾:

  1. 数据模型: 选择合适的数据类型,设计有效的索引和分区策略。
  2. Java 实现: 使用连接池、批量操作、正确的日期处理等技术。
  3. 性能优化: 利用索引、避免全表扫描、查询计划分析、配置调优。
  4. 高级策略: 聚合表、历史数据归档、监控。

虽然 MySQL 不是专门为时序数据设计的,但在经过充分优化后,它仍然可以胜任大部分 IoT 场景下的数据存储和查询需求。然而,对于极高的写入吞吐量和复杂的时间序列分析需求,也可以考虑使用专业的时序数据库,如 InfluxDBTimescaleDB

未来,随着物联网设备的普及和数据量的持续增长,数据库技术也在不断发展。结合云原生、分布式计算等技术,未来的 IoT 数据平台将更加智能和高效。


参考文献与资源

Mermaid 图表:时序数据查询流程图

渲染错误: Mermaid 渲染失败: Parse error on line 5: ...引] D --> E[定位分区 (如果启用)] E --> F[ ----------------------^ Expecting 'SQE', 'DOUBLECIRCLEEND', 'PE', '-)', 'STADIUMEND', 'SUBROUTINEEND', 'PIPE', 'CYLINDEREND', 'DIAMOND_STOP', 'TAGEND', 'TRAPEND', 'INVTRAPEND', 'UNICODE_TEXT', 'TEXT', 'TAGSTART', got 'PS'

Mermaid 图表:数据写入流程图

小于阈值

大于阈值

IoT 设备

发送数据到应用服务器

应用服务器接收数据

数据校验

数据量大小

单条插入

批量插入

写入 MySQL

记录写入成功

通知前端/下游系统

Mermaid 图表:数据生命周期管理

原始数据表

定期聚合

聚合表

归档条件

归档表

删除原始数据

查询优化

长期存储/分析

释放空间

希望这篇博客能帮助你更好地理解和实践 MySQL 在物联网场景下的应用!🚀📊


🙌 感谢你读到这里!
🔍 技术之路没有捷径,但每一次阅读、思考和实践,都在悄悄拉近你与目标的距离。
💡 如果本文对你有帮助,不妨 👍 点赞、📌 收藏、📤 分享 给更多需要的朋友!
💬 欢迎在评论区留下你的想法、疑问或建议,我会一一回复,我们一起交流、共同成长 🌿
🔔 关注我,不错过下一篇干货!我们下期再见!✨

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐