storm 自己的理解,遇到的问题记录一


storm 自己的理解,遇到的问题记录一

1 离线计算

离线计算:批量获取数据、批量传输数据、周期性批量计算数据、数据展示

代表技术:Sqoop批量导入数据、HDFS批量存储数据、MapReduce批量计算数据、Hive批量计算数据、***任务调度

1,hivesql
2、调度平台
3、Hadoop集群运维
4、数据清洗(脚本语言)
5、元数据管理
6、数据稽查
7、数据仓库模型架构

2 流式计算,或者叫实时计算

流式计算:数据实时产生、数据实时传输、数据实时计算、实时展示

代表技术:Flume实时获取数据、Kafka/metaq实时数据存储、Storm/JStorm实时数据计算、Redis实时结果缓存、持久化存储(mysql)。

一句话总结:将源源不断产生的数据实时收集并实时计算,尽可能快的得到计算结果

他们的区别就是:

最大的区别:实时收集、实时计算、实时展示

3 Storm是什么?

storm 官网:http://storm.apache.org/index.html 在里面介绍了storm是什么!

Apache Storm is a free and open source distributed realtime computation system. 
Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing. 
Storm is simple, can be used with any programming language, and is a lot of fun to use!

我大概翻译一下:
    Apache Storm是一个免费的开源分布式实时计算系统。 
    Storm可以轻松可靠地处理无限数据流,实现Hadoop对批处理所做的实时处理。
    Storm非常简单,可以与任何编程语言一起使用,并且使用起来很有趣!

还有关于版本的问题:
    http://storm.apache.org/2018/06/04/storm122-released.html
    可以在官网上查询到每个版本都有那些新特性, 增强了那些包, 修复了那些bug,关于这个版本的文档.

还有一个问题就是jstrom:
    jstorm是阿里重新写了storm,用java语言,现在jstorm的代码已经大部分都迁移到storm主流程序里面了,
    所以现在用storm和jstrom没有什么本质的区别!

Storm用来实时处理数据.

**特点**:低延迟、高可用、分布式、可扩展、数据不丢失。提供简单容易理解的接口,便于开发。

4 Storm与Hadoop的区别

• Storm用于实时计算    Hadoop用于离线计算。
• Storm处理的数据保存在内存中,源源不断    Hadoop处理的数据保存在文件系统中,一批一批。
• Storm的数据通过网络传输进来    Hadoop的数据保存在磁盘中。
• Storm与Hadoop的编程模型相似

5 storm典型的运用场景

1.运用场景:

  • 日志分析

    从海量日志中分析出特定的数据,并将分析的结果存入外部存储器用来辅佐决策。

  • 管道系统

    将一个数据从一个系统传输到另外一个系统,比如将数据库同步到Hadoop

  • 消息转化器

    将接受到的消息按照某种格式进行转化,存储到另外一个系统如消息中间件

2.典型案例:

  • 淘-实时分析系统:实时分析用户的属性,并反馈给搜索引擎

  • 携程-网站性能监控:实时分析系统监控携程网的网站性能

  • 阿里妈妈-用户画像:实时计算用户的兴趣数据

  • 一个游戏新版本上线,有一个实时分析系统,收集游戏中的数据,运营或者开发者可以在上线后几秒钟得到持续不断更新的游戏监控报告和分析结果,然后马上针对游戏的参数 和平衡性进行调整。这样就能够大大缩短游戏迭代周期,加强游戏的生命力。

  • 实时计算在腾讯的运用:精准推荐(广点通广告推荐、新闻推荐、视频推荐、游戏道具推荐);实时分析(微信运营数据门户、效果统计、订单画像分析);实时监控(实时监控平台、游戏内接口调用)

  • 精准投放广告

6 storm的核心组件和架构 <重点>

  • Nimbus:负责资源分配和任务调度。

  • Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。—通过配置文件设置当前supervisor上启动多少个worker。

  • Worker:运行具体处理组件逻辑的进程。Worker运行的任务类型只有两种,一种是Spout任务,一种是Bolt任务。

  • Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,不同spout/bolt的task可能会共享一个物理线程,该线程称为executor。

7 Storm编程模型 <重点>

  • Topology:Storm中运行的一个实时应用程序的名称。(拓扑)

  • Spout:在一个topology中获取源数据流的组件。
    通常情况下spout会从外部数据源中读取数据,然后转换为topology内部的源数据。

  • Bolt:接受数据然后执行处理的组件,用户可以在其中执行自己想要的操作。

  • Tuple:一次消息传递的基本单元,理解为一组消息就是一个Tuple。

  • Stream:表示数据的流向。

  • Stream grouping:即消息的partition方法。Storm中提供若干种实用的grouping方式,包括shuffle, fields hash, all, global, none, direct和localOrShuffle等
    <!–hexoPostRenderEscape:

    Stream Grouping定义了一个流在Bolt任务间该如何被切分。这里有Storm提供的6个Stream Grouping类型:

1.Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。

2.Fields Grouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。

3.All Grouping:广播发送,对于每一个tuple,所有的bolts都会收到。

4.Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。

5.Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果, 有一点不同的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。

6.Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。

  1. Local or shuffle grouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks。否则,和普通的Shuffle Grouping行为一致。:hexoPostRenderEscape–>

8 流式计算一般架构图 <重点>

其中flume用来获取数据。
Kafka用来临时保存数据。
Strom用来计算数据。
Redis是个内存数据库,用来保存数据。

9 关于storm集群部署的问题可以参考我大数据的一些常用大数据软件的安装

我这是只说一下重点:

1、部署成功之后,启动storm集群。
    依次启动集群的各种角色

2、查看nimbus的日志信息
在nimbus的服务器上
cd /export/servers/storm/logs
tail -100f /export/servers/storm/logs/nimbus.log

3、查看ui运行日志信息
在ui的服务器上,一般和nimbus一个服务器
cd /export/servers/storm/logs
tail -100f /export/servers/storm/logs/ui.log

4、查看supervisor运行日志信息
在supervisor服务上
cd /export/servers/storm/logs
tail -100f /export/servers/storm/logs/supervisor.log

5、查看supervisor上worker运行日志信息
在supervisor服务上
cd /export/servers/storm/logs
tail -100f /export/servers/storm/logs/worker-6702.log
访问nimbus.host:/8080,即可看到storm的ui界面。

Storm常用操作命令:

有许多简单且有用的命令可以用来管理拓扑,它们可以提交、杀死、禁用、再平衡拓扑。

1.提交任务命令格式:
    storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
bin/storm jar examples/storm-starter/storm-starter-topologies-0.9.6.jar storm.starter.WordCountTopology wordcount

2.杀死任务命令格式:
    storm kill 【拓扑名称】 -w 10(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间)
storm kill topology-name -w 10

3.停用任务命令格式:
    storm deactivte  【拓扑名称】
storm deactivte topology-name

我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有已分发的元组都会得到处理,但是spouts的nextTuple方法不会被调用。销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。

4.启用任务命令格式:storm activate【拓扑名称】
        storm activate topology-name

5.重新部署任务命令格式:storm rebalance  【拓扑名称】
        storm rebalance topology-name
再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。再平衡命令将会停用拓扑,然后在相应超时时间之后重分配工人,并重启拓扑。

10 做一下总结

1、编程模型

DataSource:外部数据源

Spout:接受外部数据源的组件,将外部数据源转化成Storm内部的数据,以Tuple为基本的传输单元下发给Bolt

Bolt:接受Spout发送的数据,或上游的bolt的发送的数据。根据业务逻辑进行处理。发送给下一个Bolt或者是存储到某种介质上。介质可以是Redis可以是mysql,或者其他。

Tuple:Storm内部中数据传输的基本单元,里面封装了一个List对象,用来保存数据。

StreamGrouping:数据分组策略:
7种:shuffleGrouping(Random函数),Non Grouping(Random函数),FieldGrouping(Hash取模)、Local or ShuffleGrouping 本地或随机,优先本地。

2、并发度

用户指定的一个任务,可以被多个线程执行,并发度的数量等于线程的数量。一个任务的多个线程,会被运行在多个Worker(JVM)上,有一种类似于平均算法的负载均衡策略。尽可能减少网络IO,和Hadoop中的MapReduce中的本地计算的道理一样。

3、架构

Nimbus:任务分配

Supervisor:接受任务,并启动worker。worker的数量根据端口号来的。

Worker:执行任务的具体组件(其实就是一个JVM),可以执行两种类型的任务,Spout任务或者bolt任务。

Task:Task=线程=executor。 一个Task属于一个Spout或者Bolt并发任务。

Zookeeper:保存任务分配的信息、心跳信息、元数据信息。

4、Worker与topology

一个worker只属于一个topology,每个worker中运行的task只能属于这个topology。 反之,一个topology包含多个worker,其实就是这个topology运行在多个worker上。

一个topology要求的worker数量如果不被满足,集群在任务分配时,根据现有的worker先运行topology。如果当前集群中worker数量为0,那么最新提交的topology将只会被标识active,不会运行,只有当集群有了空闲资源之后,才会被运行。

最后来张图最能显示strom各个组件之间的关系:

10现在动手写一个storm基本的程序wordcount

在idea中创建maven项目,这个我就不说了

  • pom文件: 我这里使用的版本的是1.0.3 并且我组自己安装的storm集群也是这个版本
<dependencies>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.3</version>
</dependency>
   </dependencies>
  • WordCountTopologMain.java storm启动主类
package cn.itcast.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopologMain {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

//1、准备一个TopologyBuilder
TopologyBuilder topologyBuilder = new TopologyBuilder();

topologyBuilder.setSpout("mySpout",new MySpout(),2);
topologyBuilder.setBolt("mybolt1",new MySplitBolt(),2).shuffleGrouping("mySpout");
 topologyBuilder.setBolt("mybolt2",new MyCountBolt(),4).fieldsGrouping("mybolt1", new Fields("word"));

//topologyBuilder.setBolt("mybolt2",new MyCountBolt(),4).shuffleGrouping("mybolt1");
//config.setNumWorkers(2);
        /**
         * i
         * am
         * lilei
         * love
         * hanmeimei
         */


//2、创建一个configuration,用来指定当前topology 需要的worker的数量
Config config =  new Config();
config.setNumWorkers(2);

//3、提交任务  -----两种模式 本地模式和集群模式 StormSubmitter.submitTopology("mywordcount",config,topologyBuilder.createTopology());
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mywordcount",config,topologyBuilder.createTopology());
    }
}
  • MySpout.java 1.spout获取数据 2. 向bolt通过tuple类型射数据
package cn.itcast.storm;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;

public class MySpout extends BaseRichSpout {
    SpoutOutputCollector collector;

 //初始化方法
 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
}

//storm 框架在 while(true) 调用nextTuple方法
public void nextTuple() {
        collector.emit(new Values("i am lilei love hanmeimei"));
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
       declarer.declare(new Fields("love"));
    }
}
  • MySplitBolt.java 1.切割spout发射过来的数据 2.处理后向其他的bolt继续发射数据
package cn.itcast.storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class MySplitBolt extends BaseRichBolt {
    OutputCollector collector;

//初始化方法
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
}

// 被storm框架 while(true) 循环调用  传入参数tuple
public void execute(Tuple input) {
        String line = input.getString(0);
        String[] arrWords = line.split(" ");
        for (String word:arrWords){
            collector.emit(new Values(word,1));
        }
    }

public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","num"));
    }
}
  • MyCountBolt.java 做最后的单词计数
package cn.itcast.storm;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;


public class MyCountBolt extends BaseRichBolt {
    OutputCollector collector;

    Map<String, Integer> map = new HashMap<String, Integer>();

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        Integer num = input.getInteger(1);
        System.out.println(Thread.currentThread().getId() + "    word:"+word);
        if (map.containsKey(word)){
            Integer count = map.get(word);
            map.put(word,count + num);
        }else {
            map.put(word,num);
        }
        System.out.println("count:"+map);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
       //不輸出
    }
}
  • 在idea中就可以直接运行main方法以本地模式运行, 如果是集群模式需要先打包成jar 然后上传到服务器上通过storm jar运行!

看一下我运行的一些数据吧:速度很快!很快

136    word:am
count:{am=82525}
136    word:am
count:{am=82526}
136    word:am
count:{am=82527}
136    word:am
count:{am=82528}
136    word:am
count:{am=82529}
136    word:am
count:{am=82530}
136    word:am
count:{am=82531}
136    word:am
count:{am=82532}
136    word:am
count:{am=82533}

记录二 写storm提交任务的流程, 内部通信机制, 消息容错机制,  自己动手写一个类似storm的框架!


文章作者: 邓滔
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 邓滔 !
评论
  目录