架构图
物理模型:
Nimbus:负责资源分配和任务调度。
Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。
Worker:运行具体处理组件逻辑的进程。
Task:worker中每一个spout/bolt的线程称为一个task。同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。
编程模型:
Topology:一次任务的统称;
Spout组件:是消息生产者。
Bolt组件:负责接收Spout组件发射的信息流,在复杂的业务逻辑中可以串联多个Bolt组件。
Stream流:负责spout与bolt或者bolt与bolt之间交互的管道;
Tuple:一次消息传递的基本单元,理解为一组消息就是一个Tuple
eg:Topology:任务名称 Nimbus:项目经理 Supervisor:开组长、产品经理 Worker:开人员 Spout/Bolt:开人员中的两种角色,一种是服务器开发、一种是客户端开发;
Storm特性:
Storm的并行度计算
并行度设置:
work并行度
work并行度代表产生几个topology工作进程;
注意点:此处的工作进程数量设置不能大于集群配置的端口数量,workNum<=slots.ports.size;
Config conf = new Config();
// 设置worker数,默认是1
conf.setNumWorkers(2);
executor并行度
表示一个进程中产生几个工作线程;
如果不创建task,executor的数量=task的数量
// 创建2个executor,消费随机分配到groupSpout的数据
// 如果不创建task,则 executor:task=1:1,有几个executor就有几个task
builder.setBolt("group1Bolt", new GroupOneParseBolt(),2).shuffleGrouping("groupSpout");
task并行度
表示一个线程中,有几个实例在运行;
默认情况下,一个线程对应一个task,如果不设置executor,只设置task数量,那就表示一个线程中有多个task实例运行;
如果即设置task又设置executor,那么要注意,#threads<=#tasks,否则会造成资源浪费;(个人建议,只设置executor控制实例就可以)
// 创建2个task,一个线程里边运行两个task实例
// 如果设置了task,直接创建对应的task数量
builder.setBolt("group1Bolt", new GroupOneParseBolt()).setNumTasks(2).shuffleGrouping("groupSpout");
// 保证task>=executor,否则会造成资源浪费
builder.setBolt("group1Bolt", new GroupOneParseBolt(), 2).setNumTasks(3).shuffleGrouping("groupSpout");
程序运行过程中改变并行度的方法,专业术语:再平衡(rebalance)
- 通过storm web UI 再平衡功能
- 使用CLI tool命令工具
-n 5 将worker并行度修改成5
-e spout=3 将线程并行度修改成3
## Reconfigure the topology "mytopology" to use 5 worker processes,
## the spout "blue-spout" to use 3 executors and
## the bolt "yellow-bolt" to use 10 executors.
$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
Storm的分发策略
Shuffle grouping(随机分组)
元组随机分布在 Bolt 的任务中,以便保证每个 bolt 获得相同数量的元组。
public class GroupSpout extends BaseRichSpout {
// 声明发送器对象,用于发送任务
private SpoutOutputCollector collector;
// 声明一个计数器
private int number;
private String[] arr = new String[]{"aa","bb","***"};
/**
* 打开数据流
*
* @param map
* @param topologyContext topo上下文对象
* @param spoutOutputCollector 发送器,将tuple发送到下一个处理器
*/
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
/**
*
* 封装tuple
*/
@Override
public void nextTuple() {
this.collector.emit(new Values(number ++,arr[(int) (Math.random()*3)]));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 封装发出流的格式
*
* @param outputFieldsDeclarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("num","word"));
}
}
public class ShuffleParseBolt extends BaseBasicBolt {
/**
* 处理数据
*
* @param tuple
* @param basicOutputCollector
*/
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
System.out.println("ShuffleParseBolt接收上游的值:【" + this + "】【" + tuple + "】【" + tuple.getStringByField("word") + "】");
}
/**
* 处理向下游传递的数据
*
* @param outputFieldsDeclarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
public static void main(String[] args) {
// 创建拓扑图
TopologyBuilder builder = new TopologyBuilder();
// 设置拓扑关系 spout
builder.setSpout("groupSpout", new GroupSpout());
// 分区策略
builder.setBolt("group1Bolt", new ShuffleParseBolt(), 2).shuffleGrouping("groupSpout");
// builder.setBolt("group1Bolt", new FieldsParseBolt(), 2).fieldsGrouping("groupSpout", new Fields("word"));
Config conf = new Config();
if (args != null && args.length > 0) {
//提交到集群运行
try {
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
} else {
//本地模式运行
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("GroupTopology", conf, builder.createTopology());
Utils.sleep(Long.MAX_VALUE);
cluster.killTopology("GroupTopology");
cluster.shutdown();
}
}
Fields grouping(字段分组)
流按分组中指定的字段进行分区。例如,如果流按“user-id”字段分组,则具有相同“user-id”的元组将始终转到同一任务,但具有不同“user-id”的元组可能会转到不同的任务。
demo如上图;
需要注意:根据字段分组时,如果将线程设置的大于组总数,其他线程会空闲,造成资源浪费;
Partial Key grouping(部分键分组)
流按分组中指定的字段(如字段分组)进行分区,但在两个下游 Bolt 之间进行负载平衡,从而在传入数据倾斜时更好地利用资源。本文很好地解释了它的工作原理及其提供的优势。
All grouping(所有分组)
流在所有 bolt 的任务中复制。请谨慎使用此分组。
Global grouping(全局分组)
整个流转到 Bolt 的单个任务。具体来说,它转到具有最低ID的任务。
None grouping(无分组)
此分组指定您不关心流的分组方式。目前,没有分组等同于随机分组。但最终,Storm 将按下没有分组的螺栓,以便在与它们订阅的螺栓或喷嘴相同的线程中执行(如果可能)。
Direct grouping(直接分组)
这是一种特殊的分组。以这种方式分组的流意味着元组的创建者决定使用者的哪个任务将接收此元组。直接分组只能在已声明为直接流的流上声明。发送到直接流的元组必须使用 [emitDirect](javadocs/org/apache/storm/task/OutputCollector.html#emitDirect(int, int, java.util.List) 方法之一发出。Bolt 可以通过使用提供的 TopologyContext 或在 OutputCollector 中跟踪 emit 方法的输出(返回元组发送到的任务 ID)来获取其使用者的任务 ID。
Local or shuffle grouping(本地或随机分组)
如果目标 Bolt 在同一工作进程中有一个或多个任务,则元组将仅随机组随机分配给那些进程内任务。否则,这类似于正常的随机分组。
Storm的可靠性
背景:如果消息发送错误,会消息重传;
操作:在spout中重写fail实现失败之后处理逻辑,每次bolt处理完数据,如果正常需要手动上报ack,如果失败上报fail,通过重写的方法觉得数据如何处理;
BaseRichSpout 与 BaseBasicSpout 区别在于BaseRichSpout 需要手动提交处理结果; BaseBasicSpout 会主动提交结果;
spout:
继承BaseRichSpout重写ack方法和fail方法
public class AckerSpout extends BaseRichSpout {
// 声明发送器对象,用于发送任务
private SpoutOutputCollector collector;
// 声明一个发送存储器 <msgId,Values>
private ConcurrentHashMap<Object, Values> ackMap = new ConcurrentHashMap<>();
// 声明一个失败存储器 <msgId,失败次数>
private ConcurrentHashMap<Object, Integer> failMap = new ConcurrentHashMap<>();
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
}
@Override
public void nextTuple() {
String uuid = UUID.randomUUID().toString().substring(0, 8);
int messageId = new Random().nextInt(Integer.MAX_VALUE);
Values values = new Values(uuid);
// 发送消息
this.collector.emit(values, messageId);
// 将发送过的消息,存入map
ackMap.put(messageId, values);
System.out.println("AskerSpout开始发出消息,messageID=" + messageId + "--value=" + uuid);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("value"));
}
/**
* bolt处理tuple成功,回调此方法
*
* @param msgId
*/
@Override
public void ack(Object msgId) {
this.ackMap.remove(msgId);
this.failMap.remove(msgId);
System.out.println("failMap size 剩余:" + failMap.values());
}
/**
* bolt处理tuple失败,回调此方法,决定重试还是放弃
*
* @param msgId 消息ID
*/
@Override
public void fail(Object msgId) {
System.out.println("-------------------------------3、spout fail-------------------------------:" + msgId.toString());
Integer fail_count = failMap.get(msgId); //获取该Tuple失败的次数
if (fail_count == null) {
fail_count = 0;
}
fail_count++;
if (fail_count >= 3) {
//重试次数已满,不再进行重新emit
failMap.remove(msgId);
System.out.println(msgId+"-重试次数已满,移除map");
} else {
//记录该tuple失败次数
failMap.put(msgId, fail_count);
//重发
this.collector.emit(this.ackMap.get(msgId), msgId);
System.out.println(msgId+"-处理失败,重试");
}
}
}
Bolt:
public class AckerParseBolt extends BaseRichBolt {
// 声明发送器对象,用于发送任务
private OutputCollector collector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
System.out.println("-------------------------------1、AckerBolt处理过程-------------------------------"+tuple.getMessageId()+"---tuple--:"+tuple);
// 随机失败
if (new Random().nextInt(10) == 0) {
System.out.println("AckerBolt处理失败:" + tuple);
this.collector.fail(tuple);
} else {
System.out.println("AckerBolt处理成功:" + tuple);
// 建立tuple tree
this.collector.emit(tuple ,new Values(tuple.getStringByField("value")));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("value"));
}
}
Topology:
public static void main(String[] args) {
// 创建拓扑图
TopologyBuilder builder = new TopologyBuilder();
// 设置拓扑关系 spout
builder.setSpout("ackerSpout", new AckerSpout());
// 设置拓扑关系 bolt
builder.setBolt("ackerBolt", new AckerParseBolt()).shuffleGrouping("ackerSpout");
builder.setBolt("ackerLastBolt", new AckerLastParseBolt()).shuffleGrouping("ackerBolt");
Config conf = new Config();
// 设置worker数
conf.setNumWorkers(1);
conf.setNumAckers(1);
if (args != null && args.length > 0) {
//提交到集群运行
try {
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
} else {
//本地模式运行
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("TopoTestStorm", conf, builder.createTopology());
Utils.sleep(Long.MAX_VALUE);
cluster.killTopology("TopoTestStorm");
cluster.shutdown();
}
}
Storm通讯原理
- worker进程间的通讯原理
通过***ty通讯
- worker进程内的通讯原理
Disruptor看成一个时间监听机制,在队列中,一边生产消息,一边消费消息
底层是一个ringbuffer(环形数据缓冲区)
REST API
搭建集群:
安装zookeeper
下载tar包,上传至服务器解压
配置环境变量
[root@VM-24-11-centos etc]# vim /etc/profile
export ZOOKEEPER_HOME=/software/zookeeper/zookeeper.1.2.3
export PATH=$ZOOKEEPER_HOME/bin:$PATH
[root@VM-24-11-centos etc]# source /etc/profile
安装依赖
- Java 8+ (Apache Storm 2.x 通过 travis ci 针对 java 8 JDK 进行测试)
- Python 2.7.x 或 Python 3.x
配置环境变量
[root@VM-24-11-centos etc]# vim /etc/profile
export JAVA_HOME=/software/jdk/jdk.1.8.0
export PATH=$JAVA_HOME/bin:$PATH
[root@VM-24-11-centos etc]# source /etc/profile
安装storm包
- 上传apache-storm-2.4.0.tar.gz到服务器并解压
tar -xzvf apache-storm-2.4.0.tar.gz
配置storm.yaml
1、进入目录
cd /software/apache-storm-2.4.0/conf
2、修改文件
vim storm.yaml
storm.zookeeper.port: 2181
storm.zookeeper.servers:
- "10.12.54.124"
# - "server2"
# 状态存储目录
storm.local.dir: "/usr/local/storm/status"
# 主节点地址 ip1,ip2
nimbus.seeds: ["127.0.0.1"]
# 工作进程端口,配置几个,最多分配几个工作进程
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
- 6704
- 6705
- 6706
- 6707
- 6708
- 6709
- 6710
- 6711
配置环境变量
[root@VM-24-11-centos etc]# vim /etc/profile
export STORM_HOME=/software/storm/apache-storm-2.4.0
export PATH=$STORM_HOME/bin:$PATH
[root@VM-24-11-centos etc]# source /etc/profile
创建状态存储目录
[root@VM-24-11-centos etc]# mkdir /usr/local/storm/status
启动程序
storm nimbus # 启动nimbus(主控)
storm supervisor # 启动 supervisor
storm ui # 启动 stormUI
storm drpc # 启动 drpc
StormUI查看 默认端口8080
10.12.54.124:8080
jar包上传
jar包上传上去,执行命令运行
// storm jar jar包名 主类 topology名称
storm jar xxx.jar ***.qiaoxc.demo.TopologyApplication NumberTopology