【c++中间件】etcd存储系统 && 服务注册 && 服务发现 && 二次封装


Ⅰ. Etcd的介绍

Etcd 是一个 golang 编写的分布式、高可用的一致性键值存储系统,用于配置共享和服务发现等。它使用 Raft 一致性算法来保持集群数据的一致性,且客户端通过长连接 watch 功能,能够及时收到数据变化通知,相较于 Zookeeper 框架更加轻量化。以下是关于 etcd 的安装与使用方法的详细介绍。

安装手册

一、Etcd的安装与启动

​ 首先,需要在你的系统中安装 EtcdEtcd 是一个分布式键值存储,通常用于服务发现和配置管理。以下是在 Linux 系统上安装 Etcd 的基本步骤:

1. 安装Etcd
	sudo apt-get install etcd

2. 启动Etcd服务
	sudo systemctl start etcd
	
3. 重启Etcd服务
	sudo systemctl restart etcd
	
4. 设置Etcd开机自启
	sudo systemctl enable etcd

​ 然后可以用下面命令看到 etcd 已经启动:

sudo ***stat -natpu | head -2 && sudo ***stat -natpu | grep etcd

二、节点配置

​ 如果是单节点集群其实就可以不用进行配置,默认 etcd 的集群节点通信端口为 2380,客户端访问端口为 2379

​ 若需要修改,则可以配置:sudo vim /etc/default/etcd,如下所示:

# 节点名称,默认为 "default"
ETCD_NAME="etcd1"

# 数据目录,默认为 "${name}.etcd"
ETCD_DATA_DIR="/var/lib/etcd/default.etcd"

# etcd服务的默认监听端口,是用于客户端连接的URL(使用内网地址)
ETCD_LISTEN_CLIENT_URLS="http://192.168.65.132:2379,http://127.0.0.1:2379"

# 用于客户端访问的公开,也就是提供服务的URL(使用外网地址,因为不是当前机器访问)
ETCD_ADVERTISE_CLIENT_URLS="http://192.168.65.132:2379,http://127.0.0.1:2379"

# 用于集群节点间通信的URL
ETCD_LISTEN_PEER_URLS="http://192.168.65.132:2380"
ETCD_INITIAL_ADVERTISE_PEER_URLS="http://192.168.65.132:2380"

# 心跳间隔时间-毫秒
ETCD_HEARTBEAT_INTERVAL=100
# 选举超时时间-毫秒
ETCD_ELECTION_TIMEOUT=1000

# 以下为集群配置,若无集群则需要注销
# 初始集群状态和配置--集群中所有节点
#ETCD_INITIAL_CLUSTER="etcd1=http://192.168.65.132:2380,etcd2=http://192.168.65.132:2381,etcd3=http://192.168.65.132:2382"

# 初始集群令牌-集群的 ID
#ETCD_INITIAL_CLUSTER_TOKEN="etcd-cluster"
#ETCD_INITIAL_CLUSTER_STATE="new"

# 以下为安全配置,如果要求 SSL 连接 etcd 的话,把下面的配置启用,并修改文件路径
#ETCD_CERT_FILE="/etc/ssl/client.pem"
#ETCD_KEY_FILE="/etc/ssl/client-key.pem"
#ETCD_CLIENT_CERT_AUTH="true"
#ETCD_TRUSTED_CA_FILE="/etc/ssl/ca.pem"
#ETCD_AUTO_TLS="true"
#ETCD_PEER_CERT_FILE="/etc/ssl/member.pem"
#ETCD_PEER_KEY_FILE="/etc/ssl/member-key.pem"
#ETCD_PEER_CLIENT_CERT_AUTH="false"
#ETCD_PEER_TRUSTED_CA_FILE="/etc/ssl/ca.pem"
#ETCD_PEER_AUTO_TLS="true"

三、运行验证

etcdctl put mykey "this is amazing!"

​ 如果出现报错:No help topic for 'put',则需要 sudo vi /etc/profile 在末尾声明环境变量 ETCDCTL_API=3 以确定 etcd 的版本:

export ETCDCTL_API=3

​ 完毕后,加载配置文件,并重新执行测试指令:

liren@hcss-ecs-7ba8:~/chat_platform/test/etcd$ source /etc/profile  # 加载配置文件
liren@hcss-ecs-7ba8:~/chat_platform/test/etcd$ etcdctl put mykey "你好啊,this is amazing!" # 添加键值对
OK
liren@hcss-ecs-7ba8:~/chat_platform/test/etcd$ etcdctl get mykey  # 获取键值对
mykey
你好啊,this is amazing!
liren@hcss-ecs-7ba8:~/chat_platform/test/etcd$ etcdctl del mykey  # 删除键值对
1

Ⅱ. 搭建服务注册发现中心

使用 Etcd 作为服务注册发现中心,你需要定义服务的注册和发现逻辑。这通常涉及到以下几个操作:

  1. 服务注册:服务启动时,向 Etcd 注册自己的地址和端口。
  2. 服务发现:客户端通过 Etcd 获取服务的地址和端口,用于远程调用。
  3. 健康检查:服务定期向 Etcd 发送心跳,以维持其注册信息的有效性。

上述内容可以看封装思想那部分的讲述

​ 因为 etcd 采用 golang 编写,v3 版本通信采用 grpc API,即 HTTP2+protobuf,而官方只维护了 go 语言版本的 client 库,因此需要找到 C/C++ 非官方的 client 开发库!

一、etcd-cpp-apiv3

etcd-cpp-apiv3 是一个 etcdC++ 版本客户端 API。它依赖于 mipsasmboostprotobufgRPCcpprestsdk 等库。

​ 它的 GitHub 地址是:https://github.***/etcd-cpp-apiv3/etcd-cpp-apiv3

​ 依赖安装:

sudo apt-get install -y libboost-all-dev libssl-dev
sudo apt-get install -y libprotobuf-dev protobuf-***piler-grpc # 记得最好删除原来的protobuf,防止后面版本冲突
sudo apt-get install -y libgrpc-dev libgrpc++-dev 
sudo apt-get install -y libcpprest-dev

api 框架安装:

git clone https://github.***/etcd-cpp-apiv3/etcd-cpp-apiv3.git
cd etcd-cpp-apiv3
sudo mkdir build && cd build

sudo cmake .. -DCMAKE_INSTALL_PREFIX=/usr
sudo make -j$(nproc) && sudo make install

编译时候要链接以下动态库

-letcd-cpp-api
-lcpprest

二、客户端类与接口介绍(理解租约的概念)

​ 首先要知道 KeepAlive 对象是由 Client 对象创建,用于 etcd 客户端保持与 etcd 服务的活跃连接,防止因租约过期而导致的数据丢失或其他问题

​ 所以 KeepAlive 对象其实就相当于是一个 加油器,负责给租约续租,当 Client 对象掉线的时候,KeepAlive 对象就没了,自然那些租约就得不到续租,就会慢慢都析构掉!

​ 此外在 etcd 中,租约是一种用于管理键值对生命周期的机制,它主要包含一个唯一的租约 ID 和一个 TTL(Time To Live)属性。我们在使用 etcd 的目的无非就是要存放键值对到系统中,需要时找出来用,那么自然就得关注这个存放在系统中的时间问题,所以才有 KeepAlive 对象以及租约来控制这个生命周期!可以简单认为一个租约本身就是一个键值对,只不过添加了有效时间,但是要清楚租约本身并不存放键值对

// pplx::task 是一个并行库异步结果对象,有以下两个接口:
// 		阻塞方式 get(): 阻塞直到任务执行完成,并获取任务结果
// 		非阻塞方式 wait(): 等待任务到达终止状态,然后返回任务状态

namespace etcd {
    class Value {
        bool is_dir()// 判断是否是一个目录
        std::string const& key(); 	    // 键值对的key
        std::string const& as_string(); // 键值对的val

        int64_t lease(); // 用于创建租约的响应中,返回租约的ID
    }
    
	// etcd 会监控所管理的数据的变化,一旦数据产生变化会通知客户端
	// 在通知客户端的时候,会返回改变前的数据和改变后的数据
    class Event {
        enum class EventType {
            PUT, 	 // 键值对新增或数据发生改变
            DELETE_, // 键值对被删除,注意有个下划线
            INVALID 
        };
        enum EventType event_type();  
        const Value& kv();
        const Value& prev_kv();
    }
    
    class Response {
        bool is_ok(); // 表示当前响应是否成功
        std::string const& error_message(); // 响应失败的错误信息
        
        std::string const& key(int index) const;
        std::vector<std::string> keys(); 	// 用ls得到的多个键,可以通过该接口得到存放这些键的数组(用于遍历)
        
        Value const& value();	       	    // 当前的数值或者一个请求的处理结果
        Value const& value(int index); 	    // 键为index的数值
        Value const& prev_value();          // 之前的数值
        
        std::vector<Event> const& events(); // 触发的事件
    } 

    class KeepAlive {
        KeepAlive(Client const& client, int ttl, int64_t lease_id = 0);
        
        int64_t Lease(); // 返回租约的ID
        void Cancel();   // 停止保活动作
    }
    
    class Client {
        // etcd_url:"http://127.0.0.1:2379"
        Client(std::string const& etcd_url,
               std::string const& load_balancer = "round_robin");
        
        // 新增一个键值对(Put a new key-value pair )
        pplx::task<Response> put(std::string const& key, 
        					   std::string const& value);
        // 新增带有租约的键值对(一定时间后,如果没有续租,数据自动删除)
        pplx::task<Response> put(std::string const& key, 
        					   std::string const& value,
        					   const int64_t leaseId);
        
        // 获取一个指定 key 目录下的数据列表
        pplx::task<Response> ls(std::string const& key);
        
        // 创建并获取一个存活 ttl 时间的租约
        pplx::task<Response> leasegrant(int ttl);
        
        // 创建一个租约保活对象,其参数 ttl 表示租约有效时间
        pplx::task<std::shared_ptr<KeepAlive>> leasekeepalive(int ttl);
        
        // 撤销一个指定的租约
        pplx::task<Response> leaserevoke(int64_t lease_id);
        
        // 数据锁
        pplx::task<Response> lock(std::string const& key);
    } 
    
    class Watcher {
        Watcher(Client const& client, 
        	    std::string const& key, 			   // 要监控的键值对的key
        	    std::function<void(Response)> callback, // 发生改变后的回调
        	    bool recursive = false); 			   // 是否递归监控目录下的所有数据改变
        
        Watcher(std::string const& address, 
        	    std::string const& key,
        	    std::function<void(Response)> callback, 
        	    bool recursive = false);
        
        // 阻塞等待,直到监控任务被停止
        bool Wait();
        bool Cancel();
    }
}

​ 不过我们上面讨论的是 存放键值对的 Client 对象,那对于 查看键值对的 Client 对象,这个过程有所不同!

​ 因为有可能一些 Client 对象在查看键值对期间,该键值对的租约到期了,那么就需要做特殊处理,此时就 得有一个 Watcher 类对象来负责通知查看键值对的 Client 对象,这就是 Watcher 类的由来!

Watcher 是一个独立的对象,只有在程序结束或者主动析构的时候才会结束!在构造时候需要传入一个回调函数 callback,用于完成键值对改变或者删除时的处理,还得结合 Event 类和 Response 类做好特殊情况分类以及响应获取!

三、使用样例

put.***:负责添加键值对

#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>

int main()
{
    // 1. 实例化客户端对象
    std::string addr = "http://127.0.0.1:2379";
    etcd::Client client(addr);

    // 2. 获取租约保活对象,设置租约时间为3秒
    auto keepalive = client.leasekeepalive(3).get();
    
    // 3. 获取租约id
    auto lease_id = keepalive->Lease();

    // 4. 向etcd新增数据
    auto resp1 = client.put("/service/user", "127.0.0.1:8080", lease_id).get();
    if(resp1.is_ok() == false) {
        std::cout << "新增数据失败:" << resp1.error_message() << std::endl;
        return -1;
    }else {
        std::cout << "新增数据成功!" << std::endl;
    }

    auto resp2 = client.put("/service/friend", "127.0.0.1:9090", lease_id).get();
    if(resp2.is_ok() == false) {
        std::cout << "新增数据失败:" << resp2.error_message() << std::endl;
        return -1;
    }else {
        std::cout << "新增数据成功!" << std::endl;
    }

    // 让线程睡眠,观察现象
    std::this_thread::sleep_for(std::chrono::seconds(10));
    std::cout << "put线程退出!" << std::endl;

    return 0;
}

get.***:负责获取键值对,以及对键值对进行变化监听

#include <etcd/Client.hpp>
#include <etcd/Value.hpp>
#include <etcd/Response.hpp>
#include <etcd/Watcher.hpp>

void callback(const etcd::Response& resp)
{
    if(resp.is_ok() == false)
    {
        std::cout << "收到一个错误的事件通知:" << resp.error_message() << std::endl;
        return;
    }

    // 遍历resp中的所有的Event对象观察键值对变化情况
    for(auto const& es : resp.events())
    {
        if(es.event_type() == etcd::Event::EventType::PUT)
        {
            std::cout << "键值对发生了变化:\n";
            std::cout << "  之前的键值对:" << es.prev_kv().key() << " : " << es.prev_kv().as_string() << std::endl;
            std::cout << "  现在的键值对:" << es.kv().key() << " : " << es.kv().as_string() << std::endl;
        }
        else if(es.event_type() == etcd::Event::EventType::DELETE_)
        {
            std::cout << "键值对被删除:\n";
            std::cout << "  之前的键值对:" << es.prev_kv().key() << " : " << es.prev_kv().as_string() << std::endl;
            std::cout << "  现在的键值对:" << es.kv().key() << " : " << es.kv().as_string() << std::endl;
        }
    }
}

int main()
{
    // 1. 实例化客户端对象
    std::string addr = "http://127.0.0.1:2379";
    etcd::Client client(addr);

    // 2. 获取键值对信息
    auto resp = client.ls("/service").get();
    if(resp.is_ok() == false) {
        std::cout << "获取键值对数据失败:" << resp.error_message() << std::endl;
        return -1;
    }

    for(int i = 0; i < resp.keys().size(); ++i)
    {
        std::cout << "键:" << resp.value(i).key() << "  ";
        std::cout << "值:" << resp.value(i).as_string() << std::endl;
    }

    // 3. 创建watcher对象来监听键值对变化
    auto watcher = etcd::Watcher(client, "/service", callback, true);
    watcher.Wait();

    return 0;
}

makefile 文件:

all : put get
put : put.***
	g++ -std=c++17 -o $@ $^ -letcd-cpp-api -lcpprest
get : get.***	
	g++ -std=c++17 -o $@ $^ -letcd-cpp-api -lcpprest

​ 执行结果如下所示:

Ⅲ. 封装服务发现与注册功能

1. 服务注册

​ 服务注册主要是在 etcd 服务器上存储一个租期 ns 的保活键值对,表示所能提供指定服务的节点主机,比如 /service/user/instance-1key,且对应的 val 为提供服务的主机节点地址:<key, val> -- </service/user/instance-1, 127.0.0.1:9000>

  • /service主目录,其下级会有不同服务的键值对存储。
  • /user服务名称,表示该键值对是一个用户服务的节点。
  • /instance-1节点实例名称,提供用户服务可能会有很多节点,每个节点都应该有自己独立且唯一的实例名称。

​ 当这个键值对注册之后,服务发现方可以基于目录进行键值对的发现。

​ 且一旦注册节点退出,保活失败,则 3s 后租约失效,键值对被删除,etcd 会通知发现方数据的失效,进而实现服务下线通知的功能。

2. 服务发现

服务发现分为两个过程:

  1. 刚启动客户端的时候,进行 ls 目录浏览,进行 /service 路径下所有键值对的获取。
  2. 对关心的服务进行 watcher 观测,一旦数值发生变化(新增/删除),收到通知进行节点的管理。

​ 如果 ls 得到的路径为 /service,则会获取到 /service/user/service/firend、…… 等其路径下的所有能够提供服务的实例节点数据。

​ 如果 ls 得到的路径为 /service/user, 则会获取到 /service/user/instancd-1/service/user/instance-2、…… 等所有提供用户服务的实例节点数据。

客户端可以将发现的所有 <实例 - 地址> 管理起来,以便于进行节点的管理:

  • 收到新增数据通知,则向本地管理 添加 新增的节点地址 – 服务上线
  • 收到删除数据通知,则从本地管理 删除 对应的节点地址 – 服务下线

​ 因为 etcd 管理了所有的能够提供服务的节点主机的地址,因此当需要进行 rpc 调用的时候,则根据服务名称,获取一个能够提供服务的主机节点地址进行访问就可以了,而获取策略我们采用 RR 轮转策略!

3. 封装思想

etcd 的操作全部封装起来,也不需要用户管理数据,只需要向外四个基础操作接口,然后由两个客户端类分别负责:

  1. 服务注册客户端类
    • 进行服务注册的接口:向 etcd 添加 <服务名称-主机地址> 的数据,并进行保活
  2. 服务发现客户端类
    • 进行服务发现的接口:获取当前所有能提供服务的信息,并进行改变事件的监控
    • 设置服务上线的处理回调接口
    • 设置服务下线的处理回调接口

​ 这样封装之后,外部的 rpc 调用模块,可以先获取所有的当前服务信息,建立通信连接进行 rpc 调用,也能在有新服务上线的时候新增连接,以及下线的时候移除连接。

4. etcd.hpp

​ 下面是封装后的文件:

注意事项:

  • 实际上第 72 行初始化 _watcher 的时候,对于第三个参数 callback 来说,也可以不用 bind 绑定,直接写 callback 就行,但是为了语法严谨性,最好还是习惯用上 bind 比较好!
  • 注意 智能指针调用 get() 函数返回的是一个指针,所以要得到对象的话得解引用,即 71 行的 *_client.get(),这和我们前面接触到的异步任务对象不太一样,注意区别!
// etcd.hpp
#include <etcd/Client.hpp>
#include <etcd/KeepAlive.hpp>
#include <etcd/Response.hpp>
#include <etcd/Value.hpp>
#include <etcd/Watcher.hpp>
#include <functional>
#include "logger.hpp"

class Registry 
{
public:
    using ptr = std::shared_ptr<Registry>;
    
    Registry(const std::string& host) // host:主机地址
        : _client(std::make_shared<etcd::Client>(host))
        , _keepalive(_client->leasekeepalive(3).get())
        , _leaseid(_keepalive->Lease())
    {}

    ~Registry() {
        _keepalive->Cancel();
    }

    // 服务注册函数
    bool regiter(const std::string& key, const std::string& value)
    {
        // 将键值对添加到etcd服务中,然后进行判断即可
        auto resp = _client->put(key, value, _leaseid).get();
        if(resp.is_ok() == false) {
            LOG_ERROR("新增数据失败:{}", resp.error_message());
            return false;
        }
        LOG_DEBUG("新增数据成功:{}-{}", key, value);
        return true;
    }

private:
    std::shared_ptr<etcd::Client> _client;       // 客户端对象
    std::shared_ptr<etcd::KeepAlive> _keepalive; // 保活对象
    uint64_t _leaseid; // 租约id
};


class Discovery
{
public:
    using ptr = std::shared_ptr<Discovery>;
    using NotifyCallback = std::function<void(std::string, std::string)>; // 参数为键值对

    Discovery(const std::string& host,      // 主机地址
              const std::string& basedir,   // 表示主目录
              const NotifyCallback& put_cb, // 新增键值对后的回调处理
              const NotifyCallback& del_cb) // 删除键值对后的回调处理
        : _client(std::make_shared<etcd::Client>(host))
        , _put_cb(put_cb)
        , _del_cb(del_cb)
    {
        // 1. 先进行服务发现,获取到当前已有的数据
        auto resp = _client->ls(basedir).get();
        if(resp.is_ok() == false) {
            LOG_ERROR("获取键值对数据失败:{}", resp.error_message());
        }

        // 对获取到的数据进行回调处理
        for(int i = 0; i < resp.keys().size(); ++i) {
            if(_put_cb) _put_cb(resp.key(i), resp.value(i).as_string());
        }

        // 2. 然后进行事件监控,监控数据发生的改变并调用回调进行处理
        _watcher = std::make_shared<etcd::Watcher>(*_client.get(), basedir,
            std::bind(&Discovery::callback, this, std::placeholders::_1), true);
    }

    ~Discovery() {
        _watcher->Cancel();
    }

private:
    void callback(const etcd::Response& resp)
    {
        if(resp.is_ok() == false)
        {
            LOG_ERROR("收到一个错误的事件通知:{}", resp.error_message())
            return;
        }

        // 遍历resp中的所有的Event对象观察键值对变化情况
        for(auto const& es : resp.events())
        {
            if(es.event_type() == etcd::Event::EventType::PUT)
            {
                if(_put_cb) _put_cb(es.kv().key(), es.kv().as_string());
                LOG_DEBUG("新增服务:{}-{}", es.kv().key(), es.kv().as_string());
            }
            else if(es.event_type() == etcd::Event::EventType::DELETE_)
            {
                if(_put_cb) _del_cb(es.prev_kv().key(), es.prev_kv().as_string());
                LOG_DEBUG("删除服务:{}-{}", es.prev_kv().key(), es.prev_kv().as_string());
            }
        }
    }

private:
    std::shared_ptr<etcd::Client> _client;   // 客户端对象
    std::shared_ptr<etcd::Watcher> _watcher; // 监听对象

    NotifyCallback _put_cb, _del_cb; // 两个回调函数
};

封装测试

registry.***文件:

#include "../header/etcd.hpp"
#include <gflags/gflags.h>
 
DEFINE_bool(run_mode, false, "程序的运行模式,false为调试模式,true为发布模式");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志的输出等级");

DEFINE_string(etcd_host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(key, "/service", "服务监控的主目录");
DEFINE_string(instance, "/friend/instance", "当前实例名称");
DEFINE_string(value, "127.0.0.1:8080", "主机地址");

int main(int argc, char* argv[])
{
    // 初始化
    google::Parse***mandLineFlags(&argc, &argv, true);
    init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);

    // 创建注册类对象并用智能指针保护起来
    Registry::ptr reg = std::make_shared<Registry>(FLAGS_etcd_host);
    reg->regiter(FLAGS_key + FLAGS_instance, FLAGS_value);

    std::this_thread::sleep_for(std::chrono::seconds(600));
    return 0;
}

discovery.*** 文件:

#include "../header/etcd.hpp"
#include <gflags/gflags.h>

DEFINE_bool(run_mode, false, "程序的运行模式,false为调试模式,true为发布模式");
DEFINE_string(log_file, "", "发布模式下,用于指定日志的输出文件");
DEFINE_int32(log_level, 0, "发布模式下,用于指定日志的输出等级");

DEFINE_string(etcd_host, "http://127.0.0.1:2379", "服务注册中心地址");
DEFINE_string(key, "/service", "服务监控的主目录");
DEFINE_string(instance, "/friend/instance", "当前实例名称");
DEFINE_string(value, "127.0.0.1:8080", "当前实例的外部访问地址");

void online(const std::string& key, const std::string& value)
{
    LOG_DEBUG("上线服务:{}-{}", key, value);
}

void offline(const std::string& key, const std::string& value)
{
    LOG_DEBUG("下线服务:{}-{}", key, value);
}

int main(int argc, char* argv[])
{
    // 初始化
    google::Parse***mandLineFlags(&argc, &argv, true);
    init_logger(FLAGS_run_mode, FLAGS_log_file, FLAGS_log_level);

    // 创建注册类对象并用智能指针保护起来
    Discovery::ptr dis = std::make_shared<Discovery>(FLAGS_etcd_host, FLAGS_key, online, offline);

    std::this_thread::sleep_for(std::chrono::seconds(600));
    return 0;
}

makefile 文件:

all : registry discovery
registry : registry.***
	g++ -std=c++17 -o $@ $^ -lspdlog -lfmt -lgflags -letcd-cpp-api -lcpprest
discovery : discovery.***
	g++ -std=c++17 -o $@ $^ -lspdlog -lfmt -lgflags -letcd-cpp-api -lcpprest

.PHONY:clean
clean:
	rm -f discovery registry
转载请说明出处内容投诉
CSS教程网 » 【c++中间件】etcd存储系统 &amp;&amp; 服务注册 &amp;&amp; 服务发现 &amp;&amp; 二次封装

发表评论

欢迎 访客 发表评论

一个令你着迷的主题!

查看演示 官网购买