storm的2种运行方法
先简单介绍一下我的信息
开发工具:idea
storm版本:1.0.3
一下都以wordcount为例子
1 storm集群运行
1 pom文件:
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.3</version>
</dependency>
</dependencies>
<build>
<finalName>wordcount</finalName>
<plugins>
<!--<plugin>-->
<!--<artifactId>maven-assembly-plugin</artifactId>-->
<!--<configuration>-->
<!--<descriptorRefs>-->
<!--<descriptorRef>jar-with-dependencies</descriptorRef>-->
<!--</descriptorRefs>-->
<!--<archive>-->
<!--<manifest>-->
<!--<mainClass>com.datachina.storm.WordCountStorm</mainClass>-->
<!--</manifest>-->
<!--</archive>-->
<!--</configuration>-->
<!--<executions>-->
<!--<execution>-->
<!--<id>make-assembly</id>-->
<!--<phase>package</phase>-->
<!--<goals>-->
<!--<goal>single</goal>-->
<!--</goals>-->
<!--</execution>-->
<!--</executions>-->
<!--</plugin>-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
2 运行主类
public class WordCountStorm {
public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("mySpout", new MySpout(), 2);
builder.setBolt("mySplitBolt", new MySplitBolt(), 4).shuffleGrouping("mySpout");
builder.setBolt("myCountBolt", new MyCountBolt(), 2).fieldsGrouping("mySplitBolt", new Fields("word"));
//2、创建一个configuration,用来指定当前topology 需要的worker的数量
Config config = new Config();
config.setNumWorkers(2);
//3、提交任务 -----两种模式 本地模式和集群模式
StormSubmitter.submitTopology("mywordcount", config, builder.createTopology());
// LocalCluster localCluster = new LocalCluster();
// localCluster.submitTopology("mywordcount",config,builder.createTopology());
}
}
3 打包
4 上传jar包到master节点
scp wordcount.jar hadoop@192.168.56.93:~/
5 启动任务
storm jar wordcount.jar com.datachina.storm.WordCountStorm wordcount
看到finished submitting topology : mywordcount表示任务提交成功
6 ui 页面
也可以看到任务在运行
7 kill 任务
可以用命令kill也可以在UI界面kill
storm kill mywordcount
2 storm本地运行
1 pom文件和上面的一样
2 主类:
public class WordCountStorm {
public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("mySpout", new MySpout(), 2);
builder.setBolt("mySplitBolt", new MySplitBolt(), 4).shuffleGrouping("mySpout");
builder.setBolt("myCountBolt", new MyCountBolt(), 2).fieldsGrouping("mySplitBolt", new Fields("word"));
//2、创建一个configuration,用来指定当前topology 需要的worker的数量
Config config = new Config();
config.setNumWorkers(2);
//3、提交任务 -----两种模式 本地模式和集群模式
// StormSubmitter.submitTopology("mywordcount", config, builder.createTopology());
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mywordcount",config,builder.createTopology());
}
}
3 直接运行main方法
4 运行结果如下:
count{a=821, i=822, loser=821}
count{a=821, i=823, loser=821}
count{a=822, i=823, loser=821}
count{a=822, i=823, loser=822}
count{a=822, i=824, loser=822}
count{a=823, i=824, loser=822}
count{a=823, i=824, loser=823}
count{a=823, i=825, loser=823}
count{a=824, i=825, loser=823}
count{a=824, i=825, loser=824}
count{a=824, i=826, loser=824}
count{a=825, i=826, loser=824}
count{a=825, i=826, loser=825}
count{a=825, i=827, loser=825}
count{a=826, i=827, loser=825}
count{a=826, i=827, loser=826}
count{a=826, i=828, loser=826}
count{a=827, i=828, loser=826}
count{a=827, i=828, loser=827}
count{a=827, i=829, loser=827}
count{a=828, i=829, loser=827}
count{a=828, i=829, loser=828}
count{a=828, i=830, loser=828}
count{a=829, i=830, loser=828}
程序运行没问题!