Ⅰ. Etcd的介绍
Etcd 是一个 golang 编写的分布式、高可用的一致性键值存储系统,用于配置共享和服务发现等。它使用 Raft 一致性算法来保持集群数据的一致性,且客户端通过长连接 watch 功能,能够及时收到数据变化通知,相较于 Zookeeper 框架更加轻量化。以下是关于 etcd 的安装与使用方法的详细介绍。
安装手册
一、Etcd的安装与启动
首先,需要在你的系统中安装 Etcd。Etcd 是一个分布式键值存储,通常用于服务发现和配置管理。以下是在 Linux 系统上安装 Etcd 的基本步骤:
1. 安装Etcd
sudo apt-get install etcd
2. 启动Etcd服务
sudo systemctl start etcd
3. 重启Etcd服务
sudo systemctl restart etcd
4. 设置Etcd开机自启
sudo systemctl enable etcd
然后可以用下面命令看到 etcd 已经启动:
sudo ***stat -natpu | head -2 && sudo ***stat -natpu | grep etcd
二、节点配置
如果是单节点集群其实就可以不用进行配置,默认 etcd 的集群节点通信端口为 2380,客户端访问端口为 2379。
若需要修改,则可以配置:sudo vim /etc/default/etcd,如下所示:
# 节点名称,默认为 "default"
ETCD_NAME="etcd1"
# 数据目录,默认为 "${name}.etcd"
ETCD_DATA_DIR="/var/lib/etcd/default.etcd"
# etcd服务的默认监听端口,是用于客户端连接的URL(使用内网地址)
ETCD_LISTEN_CLIENT_URLS="http://192.168.65.132:2379,http://127.0.0.1:2379"
# 用于客户端访问的公开,也就是提供服务的URL(使用外网地址,因为不是当前机器访问)
ETCD_ADVERTISE_CLIENT_URLS="http://192.168.65.132:2379,http://127.0.0.1:2379"
# 用于集群节点间通信的URL
ETCD_LISTEN_PEER_URLS="http://192.168.65.132:2380"
ETCD_INITIAL_ADVERTISE_PEER_URLS="http://192.168.65.132:2380"
# 心跳间隔时间-毫秒
ETCD_HEARTBEAT_INTERVAL=100
# 选举超时时间-毫秒
ETCD_ELECTION_TIMEOUT=1000
# 以下为集群配置,若无集群则需要注销
# 初始集群状态和配置--集群中所有节点
#ETCD_INITIAL_CLUSTER="etcd1=http://192.168.65.132:2380,etcd2=http://192.168.65.132:2381,etcd3=http://192.168.65.132:2382"
# 初始集群令牌-集群的 ID
#ETCD_INITIAL_CLUSTER_TOKEN="etcd-cluster"
#ETCD_INITIAL_CLUSTER_STATE="new"
# 以下为安全配置,如果要求 SSL 连接 etcd 的话,把下面的配置启用,并修改文件路径
#ETCD_CERT_FILE="/etc/ssl/client.pem"
#ETCD_KEY_FILE="/etc/ssl/client-key.pem"
#ETCD_CLIENT_CERT_AUTH="true"
#ETCD_TRUSTED_CA_FILE="/etc/ssl/ca.pem"
#ETCD_AUTO_TLS="true"
#ETCD_PEER_CERT_FILE="/etc/ssl/member.pem"
#ETCD_PEER_KEY_FILE="/etc/ssl/member-key.pem"
#ETCD_PEER_CLIENT_CERT_AUTH="false"
#ETCD_PEER_TRUSTED_CA_FILE="/etc/ssl/ca.pem"
#ETCD_PEER_AUTO_TLS="true"
三、运行验证
etcdctl put mykey "this is amazing!"
如果出现报错:No help topic for 'put',则需要 sudo vi /etc/profile 在末尾声明环境变量 ETCDCTL_API=3 以确定 etcd 的版本:
export ETCDCTL_API=3
完毕后,加载配置文件,并重新执行测试指令:
liren@hcss-ecs-7ba8:~/chat_platform/test/etcd$ source /etc/profile # 加载配置文件
liren@hcss-ecs-7ba8:~/chat_platform/test/etcd$ etcdctl put mykey "你好啊,this is amazing!" # 添加键值对
OK
liren@hcss-ecs-7ba8:~/chat_platform/test/etcd$ etcdctl get mykey # 获取键值对
mykey
你好啊,this is amazing!
liren@hcss-ecs-7ba8:~/chat_platform/test/etcd$ etcdctl del mykey # 删除键值对
1
Ⅱ. 搭建服务注册发现中心
使用 Etcd 作为服务注册发现中心,你需要定义服务的注册和发现逻辑。这通常涉及到以下几个操作:
-
服务注册:服务启动时,向
Etcd注册自己的地址和端口。 -
服务发现:客户端通过
Etcd获取服务的地址和端口,用于远程调用。 -
健康检查:服务定期向
Etcd发送心跳,以维持其注册信息的有效性。
上述内容可以看封装思想那部分的讲述!
因为 etcd 采用 golang 编写,v3 版本通信采用 grpc API,即 HTTP2+protobuf,而官方只维护了 go 语言版本的 client 库,因此需要找到 C/C++ 非官方的 client 开发库!
一、etcd-cpp-apiv3
etcd-cpp-apiv3 是一个 etcd 的 C++ 版本客户端 API。它依赖于 mipsasm、boost、protobuf、gRPC、cpprestsdk 等库。
它的 GitHub 地址是:https://github.***/etcd-cpp-apiv3/etcd-cpp-apiv3
依赖安装:
sudo apt-get install -y libboost-all-dev libssl-dev
sudo apt-get install -y libprotobuf-dev protobuf-***piler-grpc # 记得最好删除原来的protobuf,防止后面版本冲突
sudo apt-get install -y libgrpc-dev libgrpc++-dev
sudo apt-get install -y libcpprest-dev
api 框架安装:
git clone https://github.***/etcd-cpp-apiv3/etcd-cpp-apiv3.git
cd etcd-cpp-apiv3
sudo mkdir build && cd build
sudo cmake .. -DCMAKE_INSTALL_PREFIX=/usr
sudo make -j$(nproc) && sudo make install
编译时候要链接以下动态库:
-letcd-cpp-api
-lcpprest
二、客户端类与接口介绍(理解租约的概念)
首先要知道 KeepAlive 对象是由 Client 对象创建,用于 etcd 客户端保持与 etcd 服务的活跃连接,防止因租约过期而导致的数据丢失或其他问题!
所以 KeepAlive 对象其实就相当于是一个 加油器,负责给租约续租,当 Client 对象掉线的时候,KeepAlive 对象就没了,自然那些租约就得不到续租,就会慢慢都析构掉!
此外在 etcd 中,租约是一种用于管理键值对生命周期的机制,它主要包含一个唯一的租约 ID 和一个 TTL(Time To Live)属性。我们在使用 etcd 的目的无非就是要存放键值对到系统中,需要时找出来用,那么自然就得关注这个存放在系统中的时间问题,所以才有 KeepAlive 对象以及租约来控制这个生命周期!可以简单认为一个租约本身就是一个键值对,只不过添加了有效时间,但是要清楚租约本身并不存放键值对!
// pplx::task 是一个并行库异步结果对象,有以下两个接口:
// 阻塞方式 get(): 阻塞直到任务执行完成,并获取任务结果
// 非阻塞方式 wait(): 等待任务到达终止状态,然后返回任务状态
namespace etcd {
class Value {
bool is_dir();// 判断是否是一个目录
std::string const& key(); // 键值对的key
std::string const& as_string(); // 键值对的val
int64_t lease(); // 用于创建租约的响应中,返回租约的ID
}
// etcd 会监控所管理的数据的变化,一旦数据产生变化会通知客户端
// 在通知客户端的时候,会返回改变前的数据和改变后的数据
class Event {
enum class EventType {
PUT, // 键值对新增或数据发生改变
DELETE_, // 键值对被删除,注意有个下划线
INVALID
};
enum EventType event_type();
const Value& kv();
const Value& prev_kv();
}
class Response {
bool is_ok(); // 表示当前响应是否成功
std::string const& error_message(); // 响应失败的错误信息
std::string const& key(int index) const;
std::vector<std::string> keys(); // 用ls得到的多个键,可以通过该接口得到存放这些键的数组(用于遍历)
Value const& value(); // 当前的数值或者一个请求的处理结果
Value const& value(int index); // 键为index的数值
Value const& prev_value(); // 之前的数值
std::vector<Event> const& events(); // 触发的事件
}
class KeepAlive {
KeepAlive(Client const& client, int ttl, int64_t lease_id = 0);
int64_t Lease(); // 返回租约的ID
void Cancel(); // 停止保活动作
}
class Client {
// etcd_url:"http://127.0.0.1:2379"
Client(std::string const& etcd_url,
std::string const& load_balancer = "round_robin");
// 新增一个键值对(Put a new key-value pair )
pplx::task<Response> put(std::string const& key,
std::string const& value);
// 新增带有租约的键值对(一定时间后,如果没有续租,数据自动删除)
pplx::task<Response> put(std::string const& key,
std::string const& value,
const int64_t leaseId);
// 获取一个指定 key 目录下的数据列表
pplx::task<Response> ls(std::string const& key);
// 创建并获取一个存活 ttl 时间的租约
pplx::task<Response> leasegrant(int ttl);
// 创建一个租约保活对象,其参数 ttl 表示租约有效时间
pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int ttl);
// 撤销一个指定的租约
pplx::task<Response> leaserevoke(int64_t lease_id);
// 数据锁
pplx::task<Response> lock(std::string const& key);
}
class Watcher {
Watcher(Client const& client,
std::string const& key, // 要监控的键值对的key
std::function<void(Response)> callback, // 发生改变后的回调
bool recursive = false); // 是否递归监控目录下的所有数据改变
Watcher(std::string const& address,
std::string const& key,
std::function<void(Response)> callback,
bool recursive = false);
// 阻塞等待,直到监控任务被停止
bool Wait();
bool Cancel();
}
}
不过我们上面讨论的是 存放键值对的 Client 对象,那对于 查看键值对的 Client 对象,这个过程有所不同!
因为有可能一些 Client 对象在查看键值对期间,该键值对的租约到期了,那么就需要做特殊处理,此时就 得有一个 Watcher 类对象来负责通知查看键值对的 Client 对象,这就是 Watcher 类的由来!
Watcher 是一个独立的对象,只有在程序结束或者主动析构的时候才会结束!在构造时候需要传入一个回调函数 callback,用于完成键值对改变或者删除时的处理,还得结合 Event 类和 Response 类做好特殊情况分类以及响应获取!
三、使用样例
put.***:负责添加键值对
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
int main()
{
// 1. 实例化客户端对象
std::string addr = "http://127.0.0.1:2379";
etcd::Client client(addr);
// 2. 获取租约保活对象,设置租约时间为3秒
auto keepalive = client.leasekeepalive(3).get();
// 3. 获取租约id
auto lease_id = keepalive->Lease();
// 4. 向etcd新增数据
auto resp1 = client.put("/service/user", "127.0.0.1:8080", lease_id).get();
if(resp1.is_ok() == false) {
std::cout << "新增数据失败:" << resp1.error_message() << std::endl;
return -1;
}else {
std::cout << "新增数据成功!" << std::endl;
}
auto resp2 = client.put("/service/friend", "127.0.0.1:9090", lease_id).get();
if(resp2.is_ok() == false) {
std::cout << "新增数据失败:" << resp2.error_message() << std::endl;
return -1;
}else {
std::cout << "新增数据成功!" << std::endl;
}
// 让线程睡眠,观察现象
std::this_thread::sleep_for(std::chrono::seconds(10));
std::cout << "put线程退出!" << std::endl;
return 0;
}
get.***:负责获取键值对,以及对键值对进行变化监听
#include <etcd/Client.hpp>
#include <etcd/Value.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>
void callback(const etcd::Response& resp)
{
if(resp.is_ok() == false)
{
std::cout << "收到一个错误的事件通知:" << resp.error_message() << std::endl;
return;
}
// 遍历resp中的所有的Event对象观察键值对变化情况
for(auto const& es : resp.events())
{
if(es.event_type() == etcd::Event::EventType::PUT)
{
std::cout << "键值对发生了变化:\n";
std::cout << " 之前的键值对:" << es.prev_kv().key() << " : " << es.prev_kv().as_string() << std::endl;
std::cout << " 现在的键值对:" << es.kv().key() << " : " << es.kv().as_string() << std::endl;
}
else if(es.event_type() == etcd::Event::EventType::DELETE_)
{
std::cout << "键值对被删除:\n";
std::cout << " 之前的键值对:" << es.prev_kv().key() << " : " << es.prev_kv().as_string() << std::endl;
std::cout << " 现在的键值对:" << es.kv().key() << " : " << es.kv().as_string() << std::endl;
}
}
}
int main()
{
// 1. 实例化客户端对象
std::string addr = "http://127.0.0.1:2379";
etcd::Client client(addr);
// 2. 获取键值对信息
auto resp = client.ls("/service").get();
if(resp.is_ok() == false) {
std::cout << "获取键值对数据失败:" << resp.error_message() << std::endl;
return -1;
}
for(int i = 0; i < resp.keys().size(); ++i)
{
std::cout << "键:" << resp.value(i).key() << " ";
std::cout << "值:" << resp.value(i).as_string() << std::endl;
}
// 3. 创建watcher对象来监听键值对变化
auto watcher = etcd::Watcher(client, "/service", callback, true);
watcher.Wait();
return 0;
}
makefile 文件:
all : put get
put : put.***
g++ -std=c++17 -o $@ $^ -letcd-cpp-api -lcpprest
get : get.***
g++ -std=c++17 -o $@ $^ -letcd-cpp-api -lcpprest
执行结果如下所示:
Ⅲ. 封装服务发现与注册功能
1. 服务注册
服务注册主要是在 etcd 服务器上存储一个租期 ns 的保活键值对,表示所能提供指定服务的节点主机,比如 /service/user/instance-1 的 key,且对应的 val 为提供服务的主机节点地址:<key, val> -- </service/user/instance-1, 127.0.0.1:9000>
-
/service:主目录,其下级会有不同服务的键值对存储。 -
/user:服务名称,表示该键值对是一个用户服务的节点。 -
/instance-1:节点实例名称,提供用户服务可能会有很多节点,每个节点都应该有自己独立且唯一的实例名称。
当这个键值对注册之后,服务发现方可以基于目录进行键值对的发现。
且一旦注册节点退出,保活失败,则 3s 后租约失效,键值对被删除,etcd 会通知发现方数据的失效,进而实现服务下线通知的功能。
2. 服务发现
服务发现分为两个过程:
- 刚启动客户端的时候,进行
ls目录浏览,进行/service路径下所有键值对的获取。 - 对关心的服务进行
watcher观测,一旦数值发生变化(新增/删除),收到通知进行节点的管理。
如果 ls 得到的路径为 /service,则会获取到 /service/user、/service/firend、…… 等其路径下的所有能够提供服务的实例节点数据。
如果 ls 得到的路径为 /service/user, 则会获取到 /service/user/instancd-1、/service/user/instance-2、…… 等所有提供用户服务的实例节点数据。
客户端可以将发现的所有 <实例 - 地址> 管理起来,以便于进行节点的管理:
- 收到新增数据通知,则向本地管理 添加 新增的节点地址 – 服务上线
- 收到删除数据通知,则从本地管理 删除 对应的节点地址 – 服务下线
因为 etcd 管理了所有的能够提供服务的节点主机的地址,因此当需要进行 rpc 调用的时候,则根据服务名称,获取一个能够提供服务的主机节点地址进行访问就可以了,而获取策略我们采用 RR 轮转策略!
3. 封装思想
将 etcd 的操作全部封装起来,也不需要用户管理数据,只需要向外四个基础操作接口,然后由两个客户端类分别负责:
-
服务注册客户端类:
-
进行服务注册的接口:向
etcd添加<服务名称-主机地址>的数据,并进行保活
-
进行服务注册的接口:向
-
服务发现客户端类:
- 进行服务发现的接口:获取当前所有能提供服务的信息,并进行改变事件的监控
- 设置服务上线的处理回调接口
- 设置服务下线的处理回调接口
这样封装之后,外部的 rpc 调用模块,可以先获取所有的当前服务信息,建立通信连接进行 rpc 调用,也能在有新服务上线的时候新增连接,以及下线的时候移除连接。
4. etcd.hpp
下面是封装后的文件:
注意事项:
- 实际上第
72行初始化_watcher的时候,对于第三个参数callback来说,也可以不用bind绑定,直接写callback就行,但是为了语法严谨性,最好还是习惯用上bind比较好!- 注意 智能指针调用
get()函数返回的是一个指针,所以要得到对象的话得解引用,即71行的*_client.get(),这和我们前面接触到的异步任务对象不太一样,注意区别!
// etcd.hpp
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
#include <functional>
#include "logger.hpp"
class Registry
{
public:
using ptr = std::shared_ptr<Registry>;
Registry(const std::string& host) // host:主机地址
: _client(std::make_shared<etcd::Client>(host))
, _keepalive(_client->leasekeepalive(3).get())
, _leaseid(_keepalive->Lease())
{}
~Registry() {
_keepalive->Cancel();
}
// 服务注册函数
bool regiter(const std::string& key, const std::string& value)
{
// 将键值对添加到etcd服务中,然后进行判断即可
auto resp = _client->put(key, value, _leaseid).get();
if(resp.is_ok() == false) {
LOG_ERROR("新增数据失败:{}", resp.error_message());
return false;
}
LOG_DEBUG("新增数据成功:{}-{}", key, value);
return true;
}
private:
std::shared_ptr<etcd::Client> _client; // 客户端对象
std::shared_ptr<etcd::KeepAlive> _keepalive; // 保活对象
uint64_t _leaseid; // 租约id
};
class Discovery
{
public:
using ptr = std::shared_ptr<Discovery>;
using NotifyCallback = std::function<void(std::string, std::string)>; // 参数为键值对
Discovery(const std::string& host, // 主机地址
const std::string& basedir, // 表示主目录
const NotifyCallback& put_cb, // 新增键值对后的回调处理
const NotifyCallback& del_cb) // 删除键值对后的回调处理
: _client(std::make_shared<etcd::Client>(host))
, _put_cb(put_cb)
, _del_cb(del_cb)
{
// 1. 先进行服务发现,获取到当前已有的数据
auto resp = _client->ls(basedir).get();
if(resp.is_ok() == false) {
LOG_ERROR("获取键值对数据失败:{}", resp.error_message());
}
// 对获取到的数据进行回调处理
for(int i = 0; i < resp.keys().size(); ++i) {
if(_put_cb) _put_cb(resp.key(i), resp.value(i).as_string());
}
// 2. 然后进行事件监控,监控数据发生的改变并调用回调进行处理
_watcher = std::make_shared<etcd::Watcher>(*_client.get(), basedir,
std::bind(&Discovery::callback, this, std::placeholders::_1), true);
}
~Discovery() {
_watcher->Cancel();
}
private:
void callback(const etcd::Response& resp)
{
if(resp.is_ok() == false)
{
LOG_ERROR("收到一个错误的事件通知:{}", resp.error_message())
return;
}
// 遍历resp中的所有的Event对象观察键值对变化情况
for(auto const& es : resp.events())
{
if(es.event_type() == etcd::Event::EventType::PUT)
{
if(_put_cb) _put_cb(es.kv().key(), es.kv().as_string());
LOG_DEBUG("新增服务:{}-{}", es.kv().key(), es.kv().as_string());
}
else if(es.event_type() == etcd::Event::EventType::DELETE_)
{
if(_put_cb) _del_cb(es.prev_kv().key(), es.prev_kv().as_string());
LOG_DEBUG("删除服务:{}-{}", es.prev_kv().key(), es.prev_kv().as_string());
}
}
}
private:
std::shared_ptr<etcd::Client> _client; // 客户端对象
std::shared_ptr<etcd::Watcher> _watcher; // 监听对象
NotifyCallback _put_cb, _del_cb; // 两个回调函数
};
封装测试
registry.***文件:
#include "../header/etcd.hpp"
#include <gflags/gflags.h>
DEFINE_bool(run_mode, false, "程序的运行模式,false为调试模式,true为发布模式");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志的输出等级");
DEFINE_string(etcd_host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(key, "/service", "服务监控的主目录");
DEFINE_string(instance, "/friend/instance", "当前实例名称");
DEFINE_string(value, "127.0.0.1:8080", "主机地址");
int main(int argc, char* argv[])
{
// 初始化
google::Parse***mandLineFlags(&argc, &argv, true);
init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
// 创建注册类对象并用智能指针保护起来
Registry::ptr reg = std::make_shared<Registry>(FLAGS_etcd_host);
reg->regiter(FLAGS_key + FLAGS_instance, FLAGS_value);
std::this_thread::sleep_for(std::chrono::seconds(600));
return 0;
}
discovery.*** 文件:
#include "../header/etcd.hpp"
#include <gflags/gflags.h>
DEFINE_bool(run_mode, false, "程序的运行模式,false为调试模式,true为发布模式");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志的输出等级");
DEFINE_string(etcd_host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(key, "/service", "服务监控的主目录");
DEFINE_string(instance, "/friend/instance", "当前实例名称");
DEFINE_string(value, "127.0.0.1:8080", "当前实例的外部访问地址");
void online(const std::string& key, const std::string& value)
{
LOG_DEBUG("上线服务:{}-{}", key, value);
}
void offline(const std::string& key, const std::string& value)
{
LOG_DEBUG("下线服务:{}-{}", key, value);
}
int main(int argc, char* argv[])
{
// 初始化
google::Parse***mandLineFlags(&argc, &argv, true);
init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);
// 创建注册类对象并用智能指针保护起来
Discovery::ptr dis = std::make_shared<Discovery>(FLAGS_etcd_host, FLAGS_key, online, offline);
std::this_thread::sleep_for(std::chrono::seconds(600));
return 0;
}
makefile 文件:
all : registry discovery
registry : registry.***
g++ -std=c++17 -o $@ $^ -lspdlog -lfmt -lgflags -letcd-cpp-api -lcpprest
discovery : discovery.***
g++ -std=c++17 -o $@ $^ -lspdlog -lfmt -lgflags -letcd-cpp-api -lcpprest
.PHONY:clean
clean:
rm -f discovery registry