1 Mosquitto 简介与核心特性
Mosquitto 是 Eclipse 基金会下的开源 MQTT 代理服务器,采用 C 语言编写,轻量高效,支持 MQTT 3.1、3.1.1 和 5.0 协议。
核心特性
· 轻量级,资源占用少
· 支持 QoS 0/1/2 消息等级
· 持久化会话支持
· TLS/SSL 加密通信
· 灵活的认证授权机制
· 桥接模式支持分布式部署
2 安装与使用
项目连接:https://github.***/eclipse-mosquitto/mosquitto
下载一份代码 git clone https://github.***/eclipse-mosquitto/mosquitto,直接make编译,我的环境是Centos7,缺少依赖包就先安装依赖的包。
编译完成,直接运行程序:
mosquitto已经运行在监听1883端口,在另外的终端分别运行订阅与发布命令测试下,编译完成后在项目的client目录下已经有这两个命令。
订阅:
发布:
mosquitto已经可以正常运行,客户端正常订阅与发布消息,接下来就可以项目化使用。
3. 最佳实践:分布式实践方案
3.1 架构设计
3.2 多节点部署配置
Mosquitto作为MQTT的一个Broker,可以分布式部署,设备端通过一个API接口获取Broker列表,然后根据连接策略选择一个Broker进行连接与登陆。
主节点配置文件 (mosquitto_primary.conf)
# 基础配置
listener 1883
protocol mqtt
# 持久化
persistence true
persistence_location /var/lib/mosquitto/
# 日志配置
log_dest file /var/log/mosquitto/mosquitto.log
log_type all
# 安全配置
allow_anonymous false
password_file /etc/mosquitto/passwd
# 桥接配置 - 连接到其他节点
connection bridge_secondary
address 192.168.1.2:1883
topic # both 2 "" ""
# 集群发现
listener 1884
protocol websockets
从节点配置文件 (mosquitto_secondary.conf)
listener 1883
protocol mqtt
persistence true
persistence_location /var/lib/mosquitto_secondary/
# 连接到主节点
connection bridge_primary
address 192.168.1.1:1883
topic # both 2 "" ""
3.3 使用 Docker 部署集群
docker-***pose.yml
version: '3.8'
services:
mosquitto-primary:
image: eclipse-mosquitto:latest
ports:
- "1883:1883"
- "9001:9001"
volumes:
- ./primary.conf:/mosquitto/config/mosquitto.conf
- ./primary-data:/mosquitto/data
- ./primary-log:/mosquitto/log
mosquitto-secondary:
image: eclipse-mosquitto:latest
ports:
- "1884:1883"
volumes:
- ./secondary.conf:/mosquitto/config/mosquitto.conf
- ./secondary-data:/mosquitto/data
- ./secondary-log:/mosquitto/log
depends_on:
- mosquitto-primary
4. 自定义认证插件开发
mosquitto本身没有实现对接数据库进行访问,不过可以开发一个简单插件实现访问数据库对每个设备进行认证鉴权。
按照项目提供的插件例子,实现相应的接口,把插件编译为一个so文件,然后在配置文件中配置插件文件名、数据库用户名与密码,就可以完成一个可访问数据库进行认证的Broker。
4.1 插件基础结构
mosquitto_auth_db.h
#ifndef MOSQUITTO_AUTH_DB_H
#define MOSQUITTO_AUTH_DB_H
#include "mosquitto_broker.h"
#include "mosquitto_plugin.h"
#include "mosquitto.h"
// 插件配置结构
struct auth_db_config {
char *db_host;
int db_port;
char *db_name;
char *db_user;
char *db_password;
int cache_timeout;
};
// 数据库连接结构
struct db_connection {
// 数据库连接句柄
void *conn;
// 连接状态
int connected;
};
#endif
4.2 主要认证函数实现
mosquitto_auth_db.c
#include "mosquitto_auth_db.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
// 插件版本
int mosquitto_auth_plugin_version(void) {
return MOSQ_AUTH_PLUGIN_VERSION;
}
// 插件初始化
int mosquitto_auth_plugin_init(void **user_data, struct mosquitto_opt *opts, int opt_count) {
struct auth_db_config *config = malloc(sizeof(struct auth_db_config));
// 解析配置参数
for (int i = 0; i < opt_count; i++) {
if (strcmp(opts[i].key, "db_host") == 0) {
config->db_host = strdup(opts[i].value);
} else if (strcmp(opts[i].key, "db_port") == 0) {
config->db_port = atoi(opts[i].value);
}
// 其他参数解析...
}
*user_data = config;
// 初始化数据库连接
if (db_connect(config) != 0) {
mosquitto_log_printf(MOSQ_LOG_ERR, "Failed to connect to database");
return MOSQ_ERR_UNKNOWN;
}
return MOSQ_ERR_SU***ESS;
}
// 用户名/密码认证
int mosquitto_auth_plugin_authenticate_user(
void *user_data,
struct mosquitto *client,
const char *username,
const char *password
) {
struct auth_db_config *config = (struct auth_db_config *)user_data;
if (!username || !password) {
return MOSQ_ERR_AUTH;
}
// 查询数据库验证用户
int auth_result = db_authenticate_user(config, username, password);
if (auth_result == 0) {
mosquitto_log_printf(MOSQ_LOG_INFO, "User %s authenticated su***essfully", username);
return MOSQ_ERR_SU***ESS;
} else {
mosquitto_log_printf(MOSQ_LOG_WARNING, "Authentication failed for user %s", username);
return MOSQ_ERR_AUTH;
}
}
// ACL 检查
int mosquitto_auth_plugin_acl_check(
void *user_data,
int a***ess,
struct mosquitto *client,
const struct mosquitto_acl_msg *msg
) {
// 实现主题访问控制逻辑
const char *username = mosquitto_client_username(client);
if (username && check_topic_permission(username, msg->topic, a***ess)) {
return MOSQ_ERR_SU***ESS;
}
return MOSQ_ERR_ACL_DENIED;
}
// 数据库认证函数
int db_authenticate_user(struct auth_db_config *config