一、需求背景
随着智能电网建设加速,某电力公司需要对辖区内 10 万户家庭的智能电表进行实时数据采集与分析。该系统面临三大技术挑战:一是电表每 15 分钟产生一条记录,日均产生 960 万条数据,传统关系型数据库写入性能不足;二是需要支持按时间范围快速查询(如查询某用户近 7 天用电量);三是需实时计算用电峰值、谷值等统计指标,为负荷预测提供数据支撑。传统数据库方案的局限性明显:
MySQL 等关系型数据库在高频写入场景下易出现锁表,无法满足每秒 thousands 级别的写入需求;
时序数据按时间自然过期,传统数据库缺乏自动数据生命周期管理;
对时间范围查询和聚合分析支持不足,复杂统计查询耗时过长;
难以高效支持按区域、用户类型等多维度标签组合查询。
为解决这些问题,公司选择SpringBoot 整合 InfluxDB方案:InfluxDB 作为开源时序数据库,专为时间序列数据设计,提供高写入性能、高效时间范围查询和自动数据保留策略,完美适配智能电表数据采集场景。
二、为什么选择 InfluxDB?
1. 时序数据优化的存储结构
数据按时间戳排序,写入时追加到文件末尾,避免随机 IO;
同一字段数据连续存储,大幅提升时间范围查询效率;
对设备 ID、区域等标签建立索引,支持快速过滤查询;
针对时序数据特性优化的压缩算法,存储效率比传统数据库高 5-10 倍。
2. 高性能写入能力
支持批量提交多条记录,减少网络开销;
内存缓存 + 异步刷盘,单机写入性能可达每秒数十万条;
写入操作不阻塞查询,适合读写并发场景。
3. 强大的时序数据查询能力
InfluxQL 和 Flux 支持时间范围过滤、聚合计算等操作;
内置滑动窗口、滚动窗口等函数,方便计算时段统计值;
自动预计算并存储聚合结果(如每小时用电量),加速查询。
4. 数据生命周期管理
可配置数据保留时间(如保留 365 天数据),自动删除过期数据;
按时间分片存储(如每 7 天一个分片),优化查询和删除效率。
5. 与物联网场景深度适配
单节点部署简单,资源占用低,适合边缘计算场景;
支持集群模式,满足大规模数据存储需求;
社区版完全免费,降低项目成本。
三、系统设计
1. 整体架构
系统分为四层:
智能电表通过 NB-IoT/4G 将数据发送到 MQTT broker;
SpringBoot 应用订阅 MQTT 主题,接收电表数据;
InfluxDB 存储原始数据和预计算指标;
提供数据查询、统计分析和异常监控功能。
2. 数据流程
智能电表每 15 分钟采集一次数据(电压、电流、功率、用电量);
/meter/{meterId}/data数据接入服务订阅 MQTT 主题,接收并解析数据;
接入服务将数据写入 InfluxDB,并触发实时计算;
应用服务从 InfluxDB 查询数据,提供查询和分析接口。
3. 数据模型设计
InfluxDB 数据模型包括:
- Measurement:类似关系型数据库的表,此处定义为
electricity_meter;- Tag:带索引的字段(查询条件),包括
meter_id(电表 ID)、user_id(用户 ID)、region(区域);- Field:不带索引的数值字段,包括
voltage(电压)、current(电流)、power(功率)、consumption(用电量);- Timestamp:数据产生时间戳。
4、代码实操
1. 环境准备
|
组件 |
版本 |
作用 |
|---|---|---|
|
SpringBoot |
2.7.15 |
应用开发框架 |
|
InfluxDB |
2.7.5 |
时序数据库 |
|
MQTT Client |
1.2.5 |
接收电表 MQTT 数据 |
|
Lombok |
1.18.24 |
简化 Java 代码 |
|
Spring Scheduler |
- |
定时任务(数据聚合) |
2. 安装与配置 InfluxDB
(1)安装 InfluxDB
# Docker安装
docker run -d --name influxdb -p 8086:8086 \
-v influxdb-data:/var/lib/influxdb2 \
-e DOCKER_INFLUXDB_INIT_MODE=setup \
-e DOCKER_INFLUXDB_INIT_USERNAME=admin \
-e DOCKER_INFLUXDB_INIT_PASSWORD=admin123 \
-e DOCKER_INFLUXDB_INIT_ORG=power-org \
-e DOCKER_INFLUXDB_INIT_BUCKET=meter-data \
influxdb:2.7.5
(2)创建保留策略
# 进入容器
docker exec -it influxdb influx
# 切换到组织
influx config set -n default -o power-org
# 创建原始数据保留策略(1年)
influx bucket create \
--name meter-raw \
--retention 365d \
--shard-group-duration 7d
# 创建小时级聚合数据保留策略(2年)
influx bucket create \
--name meter-hourly \
--retention 730d \
--shard-group-duration 30d
3. 项目依赖配置(pom.xml)
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- InfluxDB Client -->
<dependency>
<groupId>***.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>6.10.0</version>
</dependency>
<!-- MQTT Client -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Spring Boot Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
4. 配置文件(application.yml)
spring:
application:
name: electricity-meter-collector
# InfluxDB配置
influx:
url: http://localhost:8086
token: your-influxdb-token # 从InfluxDB控制台获取
org: power-org
bucket:
raw: meter-raw # 原始数据桶
hourly: meter-hourly # 小时级聚合桶
# MQTT配置
mqtt:
broker: tcp://localhost:1883
client-id: meter-collector-${random.value}
topic: /meter/# # 订阅所有电表数据
username: mqtt-user
password: mqtt-pass
qos: 1 # 消息质量等级
# 数据采集配置
collector:
batch-size: 1000 # 批量写入大小
batch-interval: 5000 # 批量写入间隔(毫秒)
5. 数据模型定义
(1)电表数据实体
package ***.example.meter.entity;
import ***.influxdb.annotations.Column;
import ***.influxdb.annotations.Measurement;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.Instant;
/**
* 电表数据实体(对应InfluxDB的measurement)
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Measurement(name = "electricity_meter")
public class MeterData {
/** 电表ID(标签) */
@Column(tag = true)
private String meterId;
/** 用户ID(标签) */
@Column(tag = true)
private String userId;
/** 区域(标签) */
@Column(tag = true)
private String region;
/** 电压(V) */
@Column
private Double voltage;
/** 电流(A) */
@Column
private Double current;
/** 功率(W) */
@Column
private Double power;
/** 用电量(kWh) */
@Column
private Double consumption;
/** 采集时间 */
@Column(timestamp = true)
private Instant timestamp;
}
(2)MQTT消息DTO
package ***.example.meter.dto;
import lombok.Data;
import java.time.LocalDateTime;
/**
* MQTT接收的电表数据DTO
*/
@Data
public class MqttMessageDTO {
private String meterId; // 电表ID
private String userId; // 用户ID
private String region; // 区域
private Double voltage; // 电压
private Double current; // 电流
private Double power; // 功率
private Double consumption; // 用电量
private LocalDateTime collectTime; // 采集时间
}
6. InfluxDB 配置类
package ***.example.meter.config;
import ***.influxdb.client.InfluxDBClient;
import ***.influxdb.client.InfluxDBClientFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* InfluxDB客户端配置
*/
@Configuration
public class InfluxDbConfig {
@Value("${influx.url}")
private String url;
@Value("${influx.token}")
private String token;
@Value("${influx.org}")
private String org;
/**
* 创建InfluxDB客户端
*/
@Bean
public InfluxDBClient influxDBClient() {
return InfluxDBClientFactory.create(url, token.toCharArray(), org);
}
}
7. MQTT 配置与消息处理
(1)MQTT 配置
package ***.example.meter.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
/**
* MQTT客户端配置
*/
@Configuration
public class MqttConfig {
@Value("${mqtt.broker}")
private String broker;
@Value("${mqtt.client-id}")
private String clientId;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.topic}")
private String topic;
@Value("${mqtt.qos}")
private Integer qos;
/**
* 创建MQTT客户端工厂
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{broker});
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setCleanSession(true);
options.setAutomaticReconnect(true);
factory.setConnectionOptions(options);
return factory;
}
/**
* 接收消息的通道
*/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* MQTT消息驱动适配器(接收消息)
*/
@Bean
public MqttPahoMessageDrivenChannelAdapter inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
clientId, mqttClientFactory(), topic);
adapter.set***pletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(qos);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
/**
* 消息处理器(处理接收到的MQTT消息)
*/
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler mqttMessageHandler() {
return message -> {
String payload = message.getPayload().toString();
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
// 交给MeterDataService处理消息
meterDataService.processMqttMessage(topic, payload);
};
}
@Autowired
private ***.example.meter.service.MeterDataService meterDataService;
}
8. 核心服务实现
(1)电表数据服务
package ***.example.meter.service;
import ***.example.meter.dto.MqttMessageDTO;
import ***.example.meter.entity.MeterData;
import ***.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* 电表数据处理服务
*/
@Service
@Slf4j
public class MeterDataService {
@Autowired
private InfluxDbService influxDbService;
@Autowired
private ObjectMapper objectMapper;
// 线程安全的消息队列,用于缓存待写入的数据
private final ConcurrentLinkedQueue<MeterData> dataQueue = new ConcurrentLinkedQueue<>();
@Value("${collector.batch-size}")
private int batchSize;
/**
* 处理MQTT消息
*/
public void processMqttMessage(String topic, String payload) {
try {
// 1. 解析MQTT消息
MqttMessageDTO dto = objectMapper.readValue(payload, MqttMessageDTO.class);
// 2. 转换为InfluxDB实体
MeterData meterData = MeterData.builder()
.meterId(dto.getMeterId())
.userId(dto.getUserId())
.region(dto.getRegion())
.voltage(dto.getVoltage())
.current(dto.getCurrent())
.power(dto.getPower())
.consumption(dto.getConsumption())
.timestamp(dto.getCollectTime().toInstant(ZoneOffset.UTC))
.build();
// 3. 加入队列
dataQueue.offer(meterData);
log.debug("接收电表数据: meterId={}, time={}", dto.getMeterId(), dto.getCollectTime());
} catch (Exception e) {
log.error("处理MQTT消息失败: topic={}, payload={}, error={}",
topic, payload, e.getMessage());
}
}
/**
* 定时批量写入InfluxDB(每5秒执行一次)
*/
@Scheduled(fixedRateString = "${collector.batch-interval}")
public void batchWriteToInflux() {
if (dataQueue.isEmpty()) {
return;
}
// 1. 从队列中获取数据
List<MeterData> batch = new ArrayList<>(batchSize);
int count = 0;
MeterData data;
while ((data = dataQueue.poll()) != null && count < batchSize) {
batch.add(data);
count++;
}
// 2. 批量写入
try {
long start = System.currentTimeMillis();
influxDbService.writeBatch(batch);
log.info("批量写入成功,条数: {}, 耗时: {}ms", count, System.currentTimeMillis() - start);
} catch (Exception e) {
log.error("批量写入失败,将数据重新加入队列", e);
// 写入失败,将数据重新加入队列
batch.forEach(dataQueue::offer);
}
}
}
(2)InfluxDB 操作服务
package ***.example.meter.service;
import ***.example.meter.entity.MeterData;
import ***.influxdb.client.InfluxDBClient;
import ***.influxdb.client.WriteApi;
import ***.influxdb.client.domain.WritePrecision;
import ***.influxdb.client.write.Point;
import ***.influxdb.query.FluxTable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.time.Instant;
import java.util.List;
import java.util.Map;
/**
* InfluxDB操作服务
*/
@Service
public class InfluxDbService {
@Autowired
private InfluxDBClient influxDBClient;
@Value("${influx.bucket.raw}")
private String rawBucket;
@Value("${influx.bucket.hourly}")
private String hourlyBucket;
@Value("${influx.org}")
private String org;
/**
* 批量写入原始数据
*/
public void writeBatch(List<MeterData> dataList) {
try (WriteApi writeApi = influxDBClient.getWriteApi()) {
// 转换为InfluxDB的Point并写入
List<Point> points = dataList.stream().map(data ->
Point.measurement("electricity_meter")
.addTag("meterId", data.getMeterId())
.addTag("userId", data.getUserId())
.addTag("region", data.getRegion())
.addField("voltage", data.getVoltage())
.addField("current", data.getCurrent())
.addField("power", data.getPower())
.addField("consumption", data.getConsumption())
.time(data.getTimestamp(), WritePrecision.NS)
).toList();
writeApi.writePoints(rawBucket, org, points);
}
}
/**
* 查询指定电表的历史数据
*/
public List<FluxTable> queryMeterHistory(String meterId, Instant startTime, Instant endTime) {
String flux = String.format("from(bucket: \"%s\") " +
"|> range(start: %s, stop: %s) " +
"|> filter(fn: (r) => r._measurement == \"electricity_meter\" and r.meterId == \"%s\") " +
"|> pivot(rowKey: [\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")",
rawBucket, startTime, endTime, meterId);
return influxDBClient.getQueryApi().query(flux, org);
}
/**
* 计算区域日用电量统计
*/
public List<FluxTable> calculateRegionDailyStats(String region, Instant startTime, Instant endTime) {
String flux = String.format("from(bucket: \"%s\") " +
"|> range(start: %s, stop: %s) " +
"|> filter(fn: (r) => r._measurement == \"electricity_meter\" and r.region == \"%s\" and r._field == \"consumption\") " +
"|> aggregateWindow(every: 1d, fn: sum, createEmpty: false) " +
"|> yield(name: \"daily_sum\")",
rawBucket, startTime, endTime, region);
return influxDBClient.getQueryApi().query(flux, org);
}
/**
* 生成小时级聚合数据
*/
public void aggregateHourlyData() {
// 查询前一小时的原始数据并聚合
String flux = "data = from(bucket: \"" + rawBucket + "\") " +
"|> range(start: -1h) " +
"|> filter(fn: (r) => r._measurement == \"electricity_meter\" and r._field == \"consumption\") " +
"|> aggregateWindow(every: 1h, fn: sum, groupColumns: [\"meterId\", \"userId\", \"region\"], createEmpty: false) " +
"|> map(fn: (r) => ({ r with _measurement: \"electricity_meter_hourly\" }))\n" +
"data |> to(bucket: \"" + hourlyBucket + "\", org: \"" + org + "\")";
influxDBClient.getQueryApi().query(flux, org);
}
}
9. 定时聚合任务
package ***.example.meter.task;
import ***.example.meter.service.InfluxDbService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.***ponent;
/**
* 数据聚合定时任务
*/
@***ponent
@Slf4j
public class AggregationTask {
@Autowired
private InfluxDbService influxDbService;
/**
* 每小时执行一次,生成小时级聚合数据
*/
@Scheduled(cron = "0 0 * * * ?") // 整点执行
public void hourlyAggregation() {
log.info("开始执行小时级数据聚合");
try {
long start = System.currentTimeMillis();
influxDbService.aggregateHourlyData();
log.info("小时级数据聚合完成,耗时: {}ms", System.currentTimeMillis() - start);
} catch (Exception e) {
log.error("小时级数据聚合失败", e);
}
}
}
10. 控制器
package ***.example.meter.controller;
import ***.example.meter.service.InfluxDbService;
import ***.influxdb.query.FluxTable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.web.bind.annotation.*;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
/**
* 电表数据查询控制器
*/
@RestController
@RequestMapping("/api/meter")
public class MeterDataController {
@Autowired
private InfluxDbService influxDbService;
/**
* 查询电表历史数据
*/
@GetMapping("/{meterId}/history")
public List<FluxTable> getMeterHistory(
@PathVariable String meterId,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime start,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime end) {
Instant startTime = start.toInstant(ZoneOffset.UTC);
Instant endTime = end.toInstant(ZoneOffset.UTC);
return influxDbService.queryMeterHistory(meterId, startTime, endTime);
}
/**
* 查询区域日用电统计
*/
@GetMapping("/region/{region}/daily-stats")
public List<FluxTable> getRegionDailyStats(
@PathVariable String region,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime start,
@RequestParam @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) LocalDateTime end) {
Instant startTime = start.toInstant(ZoneOffset.UTC);
Instant endTime = end.toInstant(ZoneOffset.UTC);
return influxDbService.calculateRegionDailyStats(region, startTime, endTime);
}
}
11. 启动类
package ***.example.meter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling // 启用定时任务
public class MeterCollectorApplication {
public static void main(String[] args) {
SpringApplication.run(MeterCollectorApplication.class, args);
System.out.println("智能电表数据采集系统启动成功");
}
}
五、测试验证
1. 模拟数据写入测试
使用 MQTT 客户端工具(如 MQTTX)发送测试数据:
# 主题
/meter/M10001/data
# 消息体
{
"meterId": "M10001",
"userId": "U10001",
"region": "beijing_haidian",
"voltage": 220.3,
"current": 4.5,
"power": 991.35,
"consumption": 0.25,
"collectTime": "2025-09-20T08:15:00"
}
验证:应用日志显示 "批量写入成功";
from(bucket: "meter-raw")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "electricity_meter" and r.meterId == "M10001")
2. 数据查询测试
调用 API 查询历史数据:
curl "http://localhost:8080/api/meter/M10001/history?start=2025-09-20T00:00:00&end=2025-09-20T23:59:59"
预期响应:返回 M10001 电表在指定时间范围内的所有记录。
3. 聚合任务测试
手动触发小时级聚合任务后查询:
from(bucket: "meter-hourly")
|> range(start: -2h)
|> filter(fn: (r) => r._measurement == "electricity_meter_hourly")
预期结果:存在聚合后的每小时用电量数据。
4. 性能测试
使用 JMeter 模拟 1000 个并发 MQTT 客户端,每个客户端每秒发送 1 条数据:
验证系统能否稳定处理(无数据丢失);
监控 InfluxDB 写入延迟(应低于 100ms);
检查内存使用情况(无内存泄漏)。
六、总结与扩展
本文通过 SpringBoot 与 InfluxDB 的整合,构建了高效的智能电表数据采集系统,实现三大核心价值:
基于 MQTT 协议和批量写入机制,支持每秒 thousands 级别的数据写入;
利用 InfluxDB 的时间索引和标签索引,快速查询历史数据和统计指标;
通过保留策略和定时聚合,实现数据生命周期管理和查询性能优化。
扩展方向:
结合规则引擎,实时检测电压异常、偷电等异常情况;
集成机器学习模型,基于历史数据预测区域用电负荷;
在边缘端部署轻量级 InfluxDB,实现本地化数据处理;
整合 Grafana,构建用电监控可视化界面。
通过这套系统,电力公司可实现智能电表数据的高效采集、存储和分析,为电网调度、负荷预测和用户用电分析提供数据支撑,助力建设更智能、更高效的电力系统。