在数仓团队,异步任务设计非常常见,主要原因就是数据量太大,不适合做成同步,在自动驾驶这个业务上,数据大到什么程度呢,单模块每天标签的上报数量就能达到5000W,如果算上车端挖掘、云端挖掘、标注、数据生产、仿真等,每天标签上亿,那是太简单的事了,因此,为了高效完成数据的入库,一个稳定、高吞吐量的异步架构设计显得非常之重要。
在之前的一篇文章中介绍过异步设计的思想:一个优秀的rabbitmq消费者(consumer)设计,可直接上线使用。这篇文章就是上一篇文章思想的具体落地:
这次就是把上面架构图中消费者部分的批量处理具体为批量写入elasticsearch,这是一种常见的行为,具体场景就是为自动驾驶入库数据创建索引,以支持检索,方便各业务低成本获取所需的数据。
1、RabbitMQ消费者实现
rabbitmq消费者实现的一个核心设计思想:数据消费与业务逻辑处理解耦,即接收数据只从mq中获取数据,不做任何其他事宜,业务逻辑处理放到其他线程中执行。
业务逻辑的处理放到主线程中进行,而且要批量化处理,这样才能实现高吞吐量。
上面数据接收,是把mq的消息放到了一个buffer中,业务获取数据就可以直接从该buffer中读取:
批量读取到数据后,就可以批量处理,比如建索引,写es。
2、Elasticsearch的批量写入
实现es批量写入的核心代码如下:
func (b *EsBatchInsert) BatchInSert(docs []Document) error {
var json = jsoniter.Config***patibleWithStandardLibrary
buf := bytes.Buffer{}
for _, doc := range docs {
meta := map[string]any{
"index": map[string]any{
"_index": doc.Index,
"_id": doc.ID,
},
}
if err := json.Ne