- 浏览: 88157 次
- 性别:
- 来自: 郑州
- 全部博客 (69)
- java (28)
- linux (6)
- redis (4)
- C# (3)
- 架构 (10)
- java ee (1)
- web (1)
- 操作系统 (7)
- sqlserver (1)
- android (2)
- Hadoop (12)
- 大数据 (21)
- 分布式 事务 消息 (10)
- linux mysql (1)
- 数据库 (3)
- 关于hadoop之bootshell使用 (2)
- 关于hbase---HTableInterfaceFactory (1)
- Spring (3)
- Hbase (5)
- jstorm (10)
- nginx (1)
- 分布式 (1)
- 区块链 (3)
- dubbo (1)
- nacos (1)
- 阿里 (1)
- go (3)
- 缓存 (1)
- memcached (1)
- ssdb (1)
- 源码 (1)
楼主,能不能给发一份源代码,1300246542@qqq.co ...
spring+websocket的使用 -
web实时推送技术使用越来越广泛,但是自己开发又太麻烦了,我觉 ...
websocket -
前提是你用的是spring mvc 才需要加的1、在web.x ...
spring+websocket的使用 -
CharacterEncodingFilter这个filter ...
// 1. Create the topology object.
TridentTopology topology = new TridentTopology();
// 2. Word-count stream: split every "sentence" into "word" tuples, group by
//    word, and keep a persistent count per word. persistentAggregate returns
//    a TridentState.
TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        // The statement terminator was missing in the listing.
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
// 3. DRPC stream "words": split the request args into words, look each word
//    up in wordCounts, drop words with no count, and sum the counts.
topology.newDRPCStream("words", drpc)
        .each(new Fields("args"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
        .each(new Fields("count"), new FilterNull())
        .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
// 4. Build the topology.
return topology.build();
package storm.trident;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.jgrapht.DirectedGraph;
import org.jgrapht.UndirectedGraph;
import org.jgrapht.alg.ConnectivityInspector;
import org.jgrapht.graph.DefaultDirectedGraph;
import org.jgrapht.graph.Pseudograph;
import com.alibaba.jstorm.client.ConfigExtension;
import backtype.storm.Config;
import backtype.storm.ILocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.trident.drpc.ReturnResultsReducer;
import storm.trident.fluent.GroupedStream;
import storm.trident.fluent.IAggregatableStream;
import storm.trident.fluent.UniqueIdGen;
import storm.trident.graph.GraphGrouper;
import storm.trident.graph.Group;
import storm.trident.operation.GroupedMultiReducer;
import storm.trident.operation.MultiReducer;
import storm.trident.operation.impl.FilterExecutor;
import storm.trident.operation.impl.GroupedMultiReducerExecutor;
import storm.trident.operation.impl.IdentityMultiReducer;
import storm.trident.operation.impl.JoinerMultiReducer;
import storm.trident.operation.impl.TrueFilter;
import storm.trident.partition.IdentityGrouping;
import storm.trident.planner.Node;
import storm.trident.planner.NodeStateInfo;
import storm.trident.planner.PartitionNode;
import storm.trident.planner.ProcessorNode;
import storm.trident.planner.SpoutNode;
import storm.trident.planner.SubtopologyBolt;
import storm.trident.planner.processor.EachProcessor;
import storm.trident.planner.processor.MultiReducerProcessor;
import storm.trident.spout.BatchSpoutExecutor;
import storm.trident.spout.IBatchSpout;
import storm.trident.spout.IOpaquePartitionedTridentSpout;
import storm.trident.spout.IPartitionedTridentSpout;
import storm.trident.spout.ITridentSpout;
import storm.trident.spout.OpaquePartitionedTridentSpoutExecutor;
import storm.trident.spout.PartitionedTridentSpoutExecutor;
import storm.trident.spout.RichSpoutBatchExecutor;
import storm.trident.state.StateFactory;
import storm.trident.state.StateSpec;
import storm.trident.topology.TridentTopologyBuilder;
import storm.trident.util.ErrorEdgeFactory;
import storm.trident.util.IndexedEdge;
import storm.trident.util.TridentUtils;
// graph with 3 kinds of nodes:
// operation, partition, or spout
// all operations have finishBatch and can optionally be committers
public class TridentTopology {
//TODO: add a method for drpc stream, needs to know how to automatically do returnresults, etc
// is it too expensive to do a batch per drpc request?
// Directed graph of the topology: Node vertices connected by IndexedEdge edges
DefaultDirectedGraph<Node, IndexedEdge> _graph;
// Nodes that carry state info, keyed by state id (populated by registerNode, consumed by build)
Map<String, List<Node>> _colocate = new HashMap();
// Generator of unique stream ids and state ids
UniqueIdGen _gen;
// Creates an empty directed graph and a fresh unique-id generator (used for a topology that has no components yet)
/**
 * Creates an empty topology with a new directed graph (built with an
 * ErrorEdgeFactory) and a new unique-id generator.
 */
public TridentTopology() {
    _graph = new DefaultDirectedGraph(new ErrorEdgeFactory());
    _gen = new UniqueIdGen();
}

/**
 * Wraps an existing graph, colocation map and id generator without copying
 * them. Used by completeDRPC to add nodes to a graph that is already being built.
 *
 * @param graph    graph to operate on
 * @param colocate state id to nodes map to operate on
 * @param gen      id generator to draw stream/state ids from
 */
private TridentTopology(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
    _graph = graph;
    _colocate = colocate;
    _gen = gen;
}
// automatically turn it into a batch spout, should take parameters as to how much to batch
// public Stream newStream(IRichSpout spout) {
// Node n = new SpoutNode(getUniqueStreamId(), TridentUtils.getSingleOutputStreamFields(spout), null, spout, SpoutNode.SpoutType.BATCH);
// return addNode(n);
// }
// 根据每一个stream 在jstorm中都有一个对应的txId(关于这个txId都是UniqueIdGen来生成的从0开始递增);注意:spout有两种一种是IRichSpout另外一种是IBatchSpout
public Stream newStream(String txId, IRichSpout spout) {
return newStream(txId, new RichSpoutBatchExecutor(spout));
public Stream newStream(String txId, IBatchSpout spout) {
Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
return addNode(n);
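// Builds a SpoutNode from the stream id, the spout's output fields, the transaction id (txId), the spout itself and the spout node type.
// The node is then added to the directed graph _graph; if it carries state info, it is also tracked in _colocate (keyed by the state info id).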
public Stream newStream(String txId, ITridentSpout spout) {
Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
return addNode(n);
public Stream newStream(String txId, IPartitionedTridentSpout spout) {
return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
// 创建Drpc的stream
public Stream newDRPCStream(String function) {
return newDRPCStream(new DRPCSpout(function));
public Stream newDRPCStream(String function, ILocalDRPC server) {
DRPCSpout spout;
if(server==null) {
spout = new DRPCSpout(function);
} else {
spout = new DRPCSpout(function, server);
return newDRPCStream(spout);
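// Builds a SpoutNode from a unique stream id and the spout's single output stream fields (a Trident topology in JStorm supports only a single output field set and no direct streams); no transaction id (txId) is needed.
// The spout node type is DRPC.
// The SpoutNode is added to the topology's directed graph _graph.
// Stream.project is then called on the resulting stream, which adds a downstream node whose source (via an IndexedEdge in the graph) is this spout node.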
private Stream newDRPCStream(DRPCSpout spout) {
// TODO: consider adding a shuffle grouping after the spout to avoid so much routing of the args/return-info all over the place
// (at least until its possible to just pack bolt logic into the spout itself)
Node n = new SpoutNode(getUniqueStreamId(), TridentUtils.getSingleOutputStreamFields(spout), null, spout, SpoutNode.SpoutType.DRPC);
Stream nextStream = addNode(n);
// later on, this will be joined back with return-info and all the results
return nextStream.project(new Fields("args"));
public TridentState newStaticState(StateFactory factory) {
return newStaticState(new StateSpec(factory));
// 根据StateSpec参数----》唯一状态Id,通过Node构造一个new node 同时指定StateInfo
// 并该节点添加到_colocate中
// 并返回TridentState
public TridentState newStaticState(StateSpec spec) {
String stateId = getUniqueStateId();
Node n = new Node(getUniqueStreamId(), null, new Fields());
n.stateInfo = new NodeStateInfo(stateId, spec);
return new TridentState(this, n);
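// The reduce and join methods provided by JStorm only take the output fields of the upstream nodes and apply the given function to merge (reduce) or extend (join) them,
// then emit the result under the new output fields.
// multiReduce passes stream 1 and stream 2 through the reducer and emits outputFields; the original output fields of stream 1 and stream 2 no longer exist downstream.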
public Stream multiReduce(Stream s1, Stream s2, MultiReducer function, Fields outputFields) {
return multiReduce(Arrays.asList(s1, s2), function, outputFields);
public Stream multiReduce(Fields inputFields1, Stream s1, Fields inputFields2, Stream s2, MultiReducer function, Fields outputFields) {
return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);
public Stream multiReduce(GroupedStream s1, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
return multiReduce(Arrays.asList(s1, s2), function, outputFields);
public Stream multiReduce(Fields inputFields1, GroupedStream s1, Fields inputFields2, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);
public Stream multiReduce(List<Stream> streams, MultiReducer function, Fields outputFields) {
return multiReduce(getAllOutputFields(streams), streams, function, outputFields);
public Stream multiReduce(List<GroupedStream> streams, GroupedMultiReducer function, Fields outputFields) {
return multiReduce(getAllOutputFields(streams), streams, function, outputFields);
public Stream multiReduce(List<Fields> inputFields, List<Stream> streams, MultiReducer function, Fields outputFields) {
List<String> names = new ArrayList<String>();
for(Stream s: streams) {
if(s._name!=null) {
Node n = new ProcessorNode(getUniqueStreamId(), Utils.join(names, "-"), outputFields, outputFields, new MultiReducerProcessor(inputFields, function));
return addSourcedNode(streams, n);
* process Group stream
* @param inputFields
* @param groupedStreams
* @param function
* @param outputFields
* @return
public Stream multiReduce(List<Fields> inputFields, List<GroupedStream> groupedStreams, GroupedMultiReducer function, Fields outputFields) {
List<Fields> fullInputFields = new ArrayList<Fields>();
List<Stream> streams = new ArrayList<Stream>();
List<Fields> fullGroupFields = new ArrayList<Fields>();
for(int i=0; i<groupedStreams.size(); i++) {
GroupedStream gs = groupedStreams.get(i);
Fields groupFields = gs.getGroupFields();
fullInputFields.add(TridentUtils.fieldsUnion(groupFields, inputFields.get(i)));
return multiReduce(fullInputFields, streams, new GroupedMultiReducerExecutor(function, fullGroupFields, inputFields), outputFields);
public Stream merge(Fields outputFields, Stream... streams) {
return merge(outputFields, Arrays.asList(streams));
public Stream merge(Fields outputFields, List<Stream> streams) {
return multiReduce(streams, new IdentityMultiReducer(), outputFields);
public Stream merge(Stream... streams) {
return merge(Arrays.asList(streams));
public Stream merge(List<Stream> streams) {
return merge(streams.get(0).getOutputFields(), streams);
public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields) {
return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields);
public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields) {
return join(streams, joinFields, outFields, JoinType.INNER);
public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type) {
return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type);
public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type) {
return join(streams, joinFields, outFields, repeat(streams.size(), type));
public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed) {
return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed);
public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed) {
return multiReduce(strippedInputFields(streams, joinFields),
groupedStreams(streams, joinFields),
new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
// 注:由于transaction不支持jstorm的batch mode 需要关闭TASK_BATCH_TUPLE = false
public StormTopology build() {
// Transaction is not compatible with jstorm batch mode(task.batch.tuple)
// so we close batch mode via system property
System.setProperty(ConfigExtension.TASK_BATCH_TUPLE, "false");
DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph) _graph.clone();
// 在此处主要完成如下几点内容
// 1、通过使用第三方的图形库 判断整个topology的graph中component之间的connection ,并将component之间有关联的放到一个set中 那么整个topology形成的graph的集合List<Set<Node>>
// 2、 针对生成后的graph进行spout检查 spout不同时支持batch和drpc
// 3、获取每个分支中的DrpcNode 并将该分支中数据源投递 通过drpc接着后续操作
completeDRPC(graph, _colocate, _gen);
List<SpoutNode> spoutNodes = new ArrayList<SpoutNode>();
// can be regular nodes (static state) or processor nodes
Set<Node> boltNodes = new HashSet<Node>();
// 循环遍历graph的有序无环图中的各个node(bolt和spout) 判断 添加到不同的nodes中
for(Node n: graph.vertexSet()) {
if(n instanceof SpoutNode) {
spoutNodes.add((SpoutNode) n);
} else if(!(n instanceof PartitionNode)) {
// 分组 group
Set<Group> initialGroups = new HashSet<Group>();
for(List<Node> colocate: _colocate.values()) {
Group g = new Group(graph, colocate);
for(Node n: boltNodes) {
initialGroups.add(new Group(graph, n));
GraphGrouper grouper = new GraphGrouper(graph, initialGroups);
Collection<Group> mergedGroups = grouper.getAllGroups();
// add identity partitions between groups
// 如下代码目前没有完全搞透
// 大体上要实现在sourceNode 和 targetNode 在不同group中 添加一个ThriftNode(PartitionNode)
// 并将该节点 填充到有序无环图中 (添加顶点(Vertex)、Edge)
for(IndexedEdge<Node> e: new HashSet<IndexedEdge>(graph.edgeSet())) {
if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {
Group g1 = grouper.nodeGroup(e.source);
Group g2 = grouper.nodeGroup(e.target);
// g1 being null means the source is a spout node
if(g1==null && !(e.source instanceof SpoutNode))
throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
if(g1==null || !g1.equals(g2)) {
PartitionNode pNode = makeIdentityPartition(e.source);
graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));
graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index));
// if one group subscribes to the same stream with same partitioning multiple times,
// merge those together (otherwise can end up with many output streams created for that partitioning
// if need to split into multiple output streams because of same input having different
// partitioning to the group)
// this is because can't currently merge splitting logic into a spout
// not the most kosher algorithm here, since the grouper indexes are being trounced via the adding of nodes to random groups, but it
// works out
// 通过指定的Group获取其额外的PartitionNode
// 再根据指定outputFields生成唯一的Node
// 同时找出PartitionNode所对应的parent node 和 该节点的edge信息
// 清除原有节点
// 构建新的节点 idNode和newPartitonNode 设置相关的vertex 和 edges 重新添加到graph中
List<Node> forNewGroups = new ArrayList<Node>();
for(Group g: mergedGroups) {
for(PartitionNode n: extraPartitionInputs(g)) {
Node idNode = makeIdentityNode(n.allOutputFields);
Node newPartitionNode = new PartitionNode(idNode.streamId, n.name, idNode.allOutputFields, n.thriftGrouping);
Node parentNode = TridentUtils.getParent(graph, n);
Set<IndexedEdge> outgoing = graph.outgoingEdgesOf(n);
addEdge(graph, parentNode, idNode, 0);
addEdge(graph, idNode, newPartitionNode, 0);
for(IndexedEdge e: outgoing) {
addEdge(graph, newPartitionNode, e.target, e.index);
Group parentGroup = grouper.nodeGroup(parentNode);
if(parentGroup==null) {
} else {
// TODO: in the future, want a way to include this logic in the spout itself,
// or make it unecessary by having storm include metadata about which grouping a tuple
// came from
for(Node n: forNewGroups) {
grouper.addGroup(new Group(graph, n));
// add in spouts as groups so we can get parallelisms
for(Node n: spoutNodes) {
grouper.addGroup(new Group(graph, n));
// 每次添加新的group之后需要重新生成index
mergedGroups = grouper.getAllGroups();
Map<Node, String> batchGroupMap = new HashMap();
List<Set<Node>> connectedComponents = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
for(int i=0; i<connectedComponents.size(); i++) {
String groupId = "bg" + i;
for(Node n: connectedComponents.get(i)) {
batchGroupMap.put(n, groupId);
// System.out.println("GRAPH:");
// System.out.println(graph);
// 计算parallelism
Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);
// 创建 topology builder
TridentTopologyBuilder builder = new TridentTopologyBuilder();
Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
Map<Group, String> boltIds = genBoltIds(mergedGroups);
// 设置 spout
for(SpoutNode sn: spoutNodes) {
Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));
if(sn.type == SpoutNode.SpoutType.DRPC) {
builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
(IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
} else {
ITridentSpout s;
if(sn.spout instanceof IBatchSpout) {
s = new BatchSpoutExecutor((IBatchSpout)sn.spout);
} else if(sn.spout instanceof ITridentSpout) {
s = (ITridentSpout) sn.spout;
} else {
throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
// TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
// 设置bolt
for(Group g: mergedGroups) {
if(!isSpoutGroup(g)) {
Integer p = parallelisms.get(g);
Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
committerBatches(g, batchGroupMap), streamToGroup);
Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
for(PartitionNode n: inputs) {
Node parent = TridentUtils.getParent(graph, n);
String componentId;
if(parent instanceof SpoutNode) {
componentId = spoutIds.get(parent);
} else {
componentId = boltIds.get(grouper.nodeGroup(parent));
d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
// 构建topology
return builder.buildTopology();
private static void completeDRPC(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
List<Set<Node>> connectedComponents = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
for(Set<Node> g: connectedComponents) {
TridentTopology helper = new TridentTopology(graph, colocate, gen);
for(Set<Node> g: connectedComponents) {
SpoutNode drpcNode = getDRPCSpoutNode(g);
if(drpcNode!=null) {
Stream lastStream = new Stream(helper, null, getLastAddedNode(g));
Stream s = new Stream(helper, null, drpcNode);
s.project(new Fields("return-info"))
new ReturnResultsReducer(),
new Fields());
private static Node getLastAddedNode(Collection<Node> g) {
Node ret = null;
for(Node n: g) {
if(ret==null || n.creationIndex > ret.creationIndex) {
ret = n;
return ret;
//returns null if it's not a drpc group
private static SpoutNode getDRPCSpoutNode(Collection<Node> g) {
for(Node n: g) {
if(n instanceof SpoutNode) {
SpoutNode.SpoutType type = ((SpoutNode) n).type;
if(type==SpoutNode.SpoutType.DRPC) {
return (SpoutNode) n;
return null;
private static void checkValidJoins(Collection<Node> g) {
boolean hasDRPCSpout = false;
boolean hasBatchSpout = false;
for(Node n: g) {
if(n instanceof SpoutNode) {
SpoutNode.SpoutType type = ((SpoutNode) n).type;
if(type==SpoutNode.SpoutType.BATCH) {
hasBatchSpout = true;
} else if(type==SpoutNode.SpoutType.DRPC) {
hasDRPCSpout = true;
if(hasBatchSpout && hasDRPCSpout) {
throw new RuntimeException("Cannot join DRPC stream with streams originating from other spouts");
private static boolean isSpoutGroup(Group g) {
return g.nodes.size() == 1 && g.nodes.iterator().next() instanceof SpoutNode;
private static Collection<PartitionNode> uniquedSubscriptions(Set<PartitionNode> subscriptions) {
Map<String, PartitionNode> ret = new HashMap();
for(PartitionNode n: subscriptions) {
PartitionNode curr = ret.get(n.streamId);
if(curr!=null && !curr.thriftGrouping.equals(n.thriftGrouping)) {
throw new RuntimeException("Multiple subscriptions to the same stream with different groupings. Should be impossible since that is explicitly guarded against.");
ret.put(n.streamId, n);
return ret.values();
private static Map<Node, String> genSpoutIds(Collection<SpoutNode> spoutNodes) {
Map<Node, String> ret = new HashMap();
int ctr = 0;
for(SpoutNode n: spoutNodes) {
ret.put(n, "spout" + ctr);
return ret;
private static Map<Group, String> genBoltIds(Collection<Group> groups) {
Map<Group, String> ret = new HashMap();
int ctr = 0;
for(Group g: groups) {
if(!isSpoutGroup(g)) {
List<String> name = new ArrayList();
name.add("" + ctr);
String groupName = getGroupName(g);
if(groupName!=null && !groupName.isEmpty()) {
ret.put(g, Utils.join(name, "-"));
return ret;
private static String getGroupName(Group g) {
TreeMap<Integer, String> sortedNames = new TreeMap();
for(Node n: g.nodes) {
if(n.name!=null) {
sortedNames.put(n.creationIndex, n.name);
List<String> names = new ArrayList<String>();
String prevName = null;
for(String n: sortedNames.values()) {
if(prevName==null || !n.equals(prevName)) {
prevName = n;
return Utils.join(names, "-");
private static Map<String, String> getOutputStreamBatchGroups(Group g, Map<Node, String> batchGroupMap) {
Map<String, String> ret = new HashMap();
Set<PartitionNode> externalGroupOutputs = externalGroupOutputs(g);
for(PartitionNode n: externalGroupOutputs) {
ret.put(n.streamId, batchGroupMap.get(n));
return ret;
private static Set<String> committerBatches(Group g, Map<Node, String> batchGroupMap) {
Set<String> ret = new HashSet();
for(Node n: g.nodes) {
if(n instanceof ProcessorNode) {
if(((ProcessorNode) n).committer) {
return ret;
private static Map<Group, Integer> getGroupParallelisms(DirectedGraph<Node, IndexedEdge> graph, GraphGrouper grouper, Collection<Group> groups) {
UndirectedGraph<Group, Object> equivs = new Pseudograph<Group, Object>(Object.class);
for(Group g: groups) {
for(Group g: groups) {
for(PartitionNode n: externalGroupInputs(g)) {
if(isIdentityPartition(n)) {
Node parent = TridentUtils.getParent(graph, n);
Group parentGroup = grouper.nodeGroup(parent);
if(parentGroup!=null && !parentGroup.equals(g)) {
equivs.addEdge(parentGroup, g);
Map<Group, Integer> ret = new HashMap();
List<Set<Group>> equivGroups = new ConnectivityInspector<Group, Object>(equivs).connectedSets();
for(Set<Group> equivGroup: equivGroups) {
Integer fixedP = getFixedParallelism(equivGroup);
Integer maxP = getMaxParallelism(equivGroup);
if(fixedP!=null && maxP!=null && maxP < fixedP) {
throw new RuntimeException("Parallelism is fixed to " + fixedP + " but max parallelism is less than that: " + maxP);
Integer p = 1;
for(Group g: equivGroup) {
for(Node n: g.nodes) {
if(n.parallelismHint!=null) {
p = Math.max(p, n.parallelismHint);
if(maxP!=null) p = Math.min(maxP, p);
if(fixedP!=null) p = fixedP;
for(Group g: equivGroup) {
ret.put(g, p);
return ret;
private static Integer getMaxParallelism(Set<Group> groups) {
Integer ret = null;
for(Group g: groups) {
if(isSpoutGroup(g)) {
SpoutNode n = (SpoutNode) g.nodes.iterator().next();
Map conf = getSpoutComponentConfig(n.spout);
if(conf==null) conf = new HashMap();
Number maxP = (Number) conf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
if(maxP!=null) {
if(ret==null) ret = maxP.intValue();
else ret = Math.min(ret, maxP.intValue());
return ret;
private static Map getSpoutComponentConfig(Object spout) {
if(spout instanceof IRichSpout) {
return ((IRichSpout) spout).getComponentConfiguration();
} else if (spout instanceof IBatchSpout) {
return ((IBatchSpout) spout).getComponentConfiguration();
} else {
return ((ITridentSpout) spout).getComponentConfiguration();
private static Integer getFixedParallelism(Set<Group> groups) {
Integer ret = null;
for(Group g: groups) {
for(Node n: g.nodes) {
if(n.stateInfo != null && n.stateInfo.spec.requiredNumPartitions!=null) {
int reqPartitions = n.stateInfo.spec.requiredNumPartitions;
if(ret!=null && ret!=reqPartitions) {
throw new RuntimeException("Cannot have one group have fixed parallelism of two different values");
ret = reqPartitions;
return ret;
private static boolean isIdentityPartition(PartitionNode n) {
Grouping g = n.thriftGrouping;
if(g.is_set_custom_serialized()) {
CustomStreamGrouping csg = (CustomStreamGrouping) Utils.javaDeserialize(g.get_custom_serialized(), Serializable.class);
return csg instanceof IdentityGrouping;
return false;
private static void addEdge(DirectedGraph g, Object source, Object target, int index) {
g.addEdge(source, target, new IndexedEdge(source, target, index));
private Node makeIdentityNode(Fields allOutputFields) {
return new ProcessorNode(getUniqueStreamId(), null, allOutputFields, new Fields(),
new EachProcessor(new Fields(), new FilterExecutor(new TrueFilter())));
private static List<PartitionNode> extraPartitionInputs(Group g) {
List<PartitionNode> ret = new ArrayList();
Set<PartitionNode> inputs = externalGroupInputs(g);
Map<String, List<PartitionNode>> grouped = new HashMap();
for(PartitionNode n: inputs) {
if(!grouped.containsKey(n.streamId)) {
grouped.put(n.streamId, new ArrayList());
for(List<PartitionNode> group: grouped.values()) {
PartitionNode anchor = group.get(0);
for(int i=1; i<group.size(); i++) {
PartitionNode n = group.get(i);
if(!n.thriftGrouping.equals(anchor.thriftGrouping)) {
return ret;
private static Set<PartitionNode> externalGroupInputs(Group g) {
Set<PartitionNode> ret = new HashSet();
for(Node n: g.incomingNodes()) {
if(n instanceof PartitionNode) {
ret.add((PartitionNode) n);
return ret;
private static Set<PartitionNode> externalGroupOutputs(Group g) {
Set<PartitionNode> ret = new HashSet();
for(Node n: g.outgoingNodes()) {
if(n instanceof PartitionNode) {
ret.add((PartitionNode) n);
return ret;
private static PartitionNode makeIdentityPartition(Node basis) {
return new PartitionNode(basis.streamId, basis.name, basis.allOutputFields,
Grouping.custom_serialized(Utils.javaSerialize(new IdentityGrouping())));
protected String getUniqueStreamId() {
return _gen.getUniqueStreamId();
protected String getUniqueStateId() {
return _gen.getUniqueStateId();
protected void registerNode(Node n) {
if(n.stateInfo!=null) {
String id = n.stateInfo.id;
if(!_colocate.containsKey(id)) {
_colocate.put(id, new ArrayList());
protected Stream addNode(Node n) {
return new Stream(this, n.name, n);
protected void registerSourcedNode(List<Stream> sources, Node newNode) {
int streamIndex = 0;
for(Stream s: sources) {
_graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex));
protected Stream addSourcedNode(List<Stream> sources, Node newNode) {
registerSourcedNode(sources, newNode);
return new Stream(this, newNode.name, newNode);
protected TridentState addSourcedStateNode(List<Stream> sources, Node newNode) {
registerSourcedNode(sources, newNode);
return new TridentState(this, newNode);
protected Stream addSourcedNode(Stream source, Node newNode) {
return addSourcedNode(Arrays.asList(source), newNode);
protected TridentState addSourcedStateNode(Stream source, Node newNode) {
return addSourcedStateNode(Arrays.asList(source), newNode);
private static List<Fields> getAllOutputFields(List streams) {
List<Fields> ret = new ArrayList<Fields>();
for(Object o: streams) {
ret.add(((IAggregatableStream) o).getOutputFields());
return ret;
private static List<GroupedStream> groupedStreams(List<Stream> streams, List<Fields> joinFields) {
List<GroupedStream> ret = new ArrayList<GroupedStream>();
for(int i=0; i<streams.size(); i++) {
return ret;
private static List<Fields> strippedInputFields(List<Stream> streams, List<Fields> joinFields) {
List<Fields> ret = new ArrayList<Fields>();
for(int i=0; i<streams.size(); i++) {
ret.add(TridentUtils.fieldsSubtract(streams.get(i).getOutputFields(), joinFields.get(i)));
return ret;
private static List<JoinType> repeat(int n, JoinType type) {
List<JoinType> ret = new ArrayList<JoinType>();
for(int i=0; i<n; i++) {
return ret;
// 1. Create the topology object.
TridentTopology topology = new TridentTopology();
// 2. Word-count stream: split every "sentence" into "word" tuples, group by
//    word, and keep a persistent count per word. persistentAggregate returns
//    a TridentState.
TridentState wordCounts = topology.newStream("spout1", spout).parallelismHint(16)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        // The statement terminator was missing in the listing.
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
// 3. DRPC stream "words": split the request args into words, look each word
//    up in wordCounts, drop words with no count, and sum the counts.
topology.newDRPCStream("words", drpc)
        .each(new Fields("args"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
        .each(new Fields("count"), new FilterNull())
        .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
// 4. Build the topology.
return topology.build();
package storm.trident;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.jgrapht.DirectedGraph;
import org.jgrapht.UndirectedGraph;
import org.jgrapht.alg.ConnectivityInspector;
import org.jgrapht.graph.DefaultDirectedGraph;
import org.jgrapht.graph.Pseudograph;
import com.alibaba.jstorm.client.ConfigExtension;
import backtype.storm.Config;
import backtype.storm.ILocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.trident.drpc.ReturnResultsReducer;
import storm.trident.fluent.GroupedStream;
import storm.trident.fluent.IAggregatableStream;
import storm.trident.fluent.UniqueIdGen;
import storm.trident.graph.GraphGrouper;
import storm.trident.graph.Group;
import storm.trident.operation.GroupedMultiReducer;
import storm.trident.operation.MultiReducer;
import storm.trident.operation.impl.FilterExecutor;
import storm.trident.operation.impl.GroupedMultiReducerExecutor;
import storm.trident.operation.impl.IdentityMultiReducer;
import storm.trident.operation.impl.JoinerMultiReducer;
import storm.trident.operation.impl.TrueFilter;
import storm.trident.partition.IdentityGrouping;
import storm.trident.planner.Node;
import storm.trident.planner.NodeStateInfo;
import storm.trident.planner.PartitionNode;
import storm.trident.planner.ProcessorNode;
import storm.trident.planner.SpoutNode;
import storm.trident.planner.SubtopologyBolt;
import storm.trident.planner.processor.EachProcessor;
import storm.trident.planner.processor.MultiReducerProcessor;
import storm.trident.spout.BatchSpoutExecutor;
import storm.trident.spout.IBatchSpout;
import storm.trident.spout.IOpaquePartitionedTridentSpout;
import storm.trident.spout.IPartitionedTridentSpout;
import storm.trident.spout.ITridentSpout;
import storm.trident.spout.OpaquePartitionedTridentSpoutExecutor;
import storm.trident.spout.PartitionedTridentSpoutExecutor;
import storm.trident.spout.RichSpoutBatchExecutor;
import storm.trident.state.StateFactory;
import storm.trident.state.StateSpec;
import storm.trident.topology.TridentTopologyBuilder;
import storm.trident.util.ErrorEdgeFactory;
import storm.trident.util.IndexedEdge;
import storm.trident.util.TridentUtils;
// graph with 3 kinds of nodes:
// operation, partition, or spout
// all operations have finishBatch and can optionally be committers
public class TridentTopology {
//TODO: add a method for drpc stream, needs to know how to automatically do returnresults, etc
// is it too expensive to do a batch per drpc request?
// 有序无环图 主要记录该topology中对应的spout、bolt以及spout和bolt之间的stream(tuples)关联关系
DefaultDirectedGraph<Node, IndexedEdge> _graph;
// 记录topology包含的node(spout/bolt)
Map<String, List<Node>> _colocate = new HashMap();
// 唯一全局Id
UniqueIdGen _gen;
// 构建一个有序无环图 初始化 DirectedGraph 以及 id记录仪(用于topology无component(spout和bolt))
public TridentTopology() {
_graph = new DefaultDirectedGraph(new ErrorEdgeFactory());
_gen = new UniqueIdGen();
// 构建一个有序无环图 初始化 DirectedGraph 以及 id记录仪(用于topology有component(spout和bolt))
private TridentTopology(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
_graph = graph;
_colocate = colocate;
_gen = gen;
// automatically turn it into a batch spout, should take parameters as to how much to batch
// public Stream newStream(IRichSpout spout) {
// Node n = new SpoutNode(getUniqueStreamId(), TridentUtils.getSingleOutputStreamFields(spout), null, spout, SpoutNode.SpoutType.BATCH);
// return addNode(n);
// }
// 根据每一个stream 在jstorm中都有一个对应的txId(关于这个txId都是UniqueIdGen来生成的从0开始递增);注意:spout有两种一种是IRichSpout另外一种是IBatchSpout
public Stream newStream(String txId, IRichSpout spout) {
return newStream(txId, new RichSpoutBatchExecutor(spout));
public Stream newStream(String txId, IBatchSpout spout) {
Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
return addNode(n);
// 根据指定的stream的streamId 、spout输出fileds、事务txId、spout、Spout的Node Type构建SpoutNode
// 并将该节点Node添加有序无环图_graph 中 同时并判断对应的_colocate是否存在该节点的信息(根据state info的id作为node的唯一标示)
public Stream newStream(String txId, ITridentSpout spout) {
Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
return addNode(n);
public Stream newStream(String txId, IPartitionedTridentSpout spout) {
return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
// 创建Drpc的stream
public Stream newDRPCStream(String function) {
return newDRPCStream(new DRPCSpout(function));
public Stream newDRPCStream(String function, ILocalDRPC server) {
DRPCSpout spout;
if(server==null) {
spout = new DRPCSpout(function);
} else {
spout = new DRPCSpout(function, server);
return newDRPCStream(spout);
// 根据指定的unique stream id 、单个single output fields(jstorm在trident topology中只支持single output fields集同时也不支持direct的stream)、不需要指定事务txId
// Spout Node Type(spout的节点类型)
// 将SpoutNode添加到Ttopology的有序无环图_graph中
// 同时调用stream的project将该节点添加到topology的source node中(换句话说也是在graph指定该节点node对应的source和IndexedEdge 向该节点发送数据的所有节点)
private Stream newDRPCStream(DRPCSpout spout) {
// TODO: consider adding a shuffle grouping after the spout to avoid so much routing of the args/return-info all over the place
// (at least until its possible to just pack bolt logic into the spout itself)
Node n = new SpoutNode(getUniqueStreamId(), TridentUtils.getSingleOutputStreamFields(spout), null, spout, SpoutNode.SpoutType.DRPC);
Stream nextStream = addNode(n);
// later on, this will be joined back with return-info and all the results
return nextStream.project(new Fields("args"));
public TridentState newStaticState(StateFactory factory) {
return newStaticState(new StateSpec(factory));
// 根据StateSpec参数----》唯一状态Id,通过Node构造一个new node 同时指定StateInfo
// 并该节点添加到_colocate中
// 并返回TridentState
public TridentState newStaticState(StateSpec spec) {
String stateId = getUniqueStateId();
Node n = new Node(getUniqueStreamId(), null, new Fields());
n.stateInfo = new NodeStateInfo(stateId, spec);
return new TridentState(this, n);
// # jstrom 里面提供的reduce和join方法只是用来完成 上游节点 输出Fields 采用指定function 完成合并(reduce)和扩展(join)
// 再通过新的OutputFields输出
// 将stream 1和stream2通过reducer处理将output fields 存放 outputFields 不再存在原有的stream1和stream2的outputfields
public Stream multiReduce(Stream s1, Stream s2, MultiReducer function, Fields outputFields) {
return multiReduce(Arrays.asList(s1, s2), function, outputFields);
public Stream multiReduce(Fields inputFields1, Stream s1, Fields inputFields2, Stream s2, MultiReducer function, Fields outputFields) {
return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);
public Stream multiReduce(GroupedStream s1, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
return multiReduce(Arrays.asList(s1, s2), function, outputFields);
public Stream multiReduce(Fields inputFields1, GroupedStream s1, Fields inputFields2, GroupedStream s2, GroupedMultiReducer function, Fields outputFields) {
return multiReduce(Arrays.asList(inputFields1, inputFields2), Arrays.asList(s1, s2), function, outputFields);
public Stream multiReduce(List<Stream> streams, MultiReducer function, Fields outputFields) {
return multiReduce(getAllOutputFields(streams), streams, function, outputFields);
public Stream multiReduce(List<GroupedStream> streams, GroupedMultiReducer function, Fields outputFields) {
return multiReduce(getAllOutputFields(streams), streams, function, outputFields);
public Stream multiReduce(List<Fields> inputFields, List<Stream> streams, MultiReducer function, Fields outputFields) {
List<String> names = new ArrayList<String>();
for(Stream s: streams) {
if(s._name!=null) {
Node n = new ProcessorNode(getUniqueStreamId(), Utils.join(names, "-"), outputFields, outputFields, new MultiReducerProcessor(inputFields, function));
return addSourcedNode(streams, n);
* process Group stream
* @param inputFields
* @param groupedStreams
* @param function
* @param outputFields
* @return
public Stream multiReduce(List<Fields> inputFields, List<GroupedStream> groupedStreams, GroupedMultiReducer function, Fields outputFields) {
List<Fields> fullInputFields = new ArrayList<Fields>();
List<Stream> streams = new ArrayList<Stream>();
List<Fields> fullGroupFields = new ArrayList<Fields>();
for(int i=0; i<groupedStreams.size(); i++) {
GroupedStream gs = groupedStreams.get(i);
Fields groupFields = gs.getGroupFields();
fullInputFields.add(TridentUtils.fieldsUnion(groupFields, inputFields.get(i)));
return multiReduce(fullInputFields, streams, new GroupedMultiReducerExecutor(function, fullGroupFields, inputFields), outputFields);
public Stream merge(Fields outputFields, Stream... streams) {
return merge(outputFields, Arrays.asList(streams));
public Stream merge(Fields outputFields, List<Stream> streams) {
return multiReduce(streams, new IdentityMultiReducer(), outputFields);
public Stream merge(Stream... streams) {
return merge(Arrays.asList(streams));
public Stream merge(List<Stream> streams) {
return merge(streams.get(0).getOutputFields(), streams);
public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields) {
return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields);
public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields) {
return join(streams, joinFields, outFields, JoinType.INNER);
public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, JoinType type) {
return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, type);
public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, JoinType type) {
return join(streams, joinFields, outFields, repeat(streams.size(), type));
public Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields, List<JoinType> mixed) {
return join(Arrays.asList(s1, s2), Arrays.asList(joinFields1, joinFields2), outFields, mixed);
public Stream join(List<Stream> streams, List<Fields> joinFields, Fields outFields, List<JoinType> mixed) {
return multiReduce(strippedInputFields(streams, joinFields),
groupedStreams(streams, joinFields),
new JoinerMultiReducer(mixed, joinFields.get(0).size(), strippedInputFields(streams, joinFields)),
// 注:由于transaction不支持jstorm的batch mode 需要关闭TASK_BATCH_TUPLE = false
public StormTopology build() {
// Transaction is not compatible with jstorm batch mode(task.batch.tuple)
// so we close batch mode via system property
System.setProperty(ConfigExtension.TASK_BATCH_TUPLE, "false");
DefaultDirectedGraph<Node, IndexedEdge> graph = (DefaultDirectedGraph) _graph.clone();
// 在此处主要完成如下几点内容
// 1、通过使用第三方的图形库 判断整个topology的graph中component之间的connection ,并将component之间有关联的放到一个set中 那么整个topology形成的graph的集合List<Set<Node>>
// 2、 针对生成后的graph进行spout检查 spout不同时支持batch和drpc
// 3、获取每个分支中的DrpcNode 并将该分支中数据源投递 通过drpc接着后续操作
completeDRPC(graph, _colocate, _gen);
List<SpoutNode> spoutNodes = new ArrayList<SpoutNode>();
// can be regular nodes (static state) or processor nodes
Set<Node> boltNodes = new HashSet<Node>();
// 循环遍历graph的有序无环图中的各个node(bolt和spout) 判断 添加到不同的nodes中
for(Node n: graph.vertexSet()) {
if(n instanceof SpoutNode) {
spoutNodes.add((SpoutNode) n);
} else if(!(n instanceof PartitionNode)) {
// 分组 group
Set<Group> initialGroups = new HashSet<Group>();
for(List<Node> colocate: _colocate.values()) {
Group g = new Group(graph, colocate);
for(Node n: boltNodes) {
initialGroups.add(new Group(graph, n));
GraphGrouper grouper = new GraphGrouper(graph, initialGroups);
Collection<Group> mergedGroups = grouper.getAllGroups();
// add identity partitions between groups
// 如下代码目前没有完全搞透
// 大体上要实现在sourceNode 和 targetNode 在不同group中 添加一个ThriftNode(PartitionNode)
// 并将该节点 填充到有序无环图中 (添加顶点(Vertex)、Edge)
for(IndexedEdge<Node> e: new HashSet<IndexedEdge>(graph.edgeSet())) {
if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) {
Group g1 = grouper.nodeGroup(e.source);
Group g2 = grouper.nodeGroup(e.target);
// g1 being null means the source is a spout node
if(g1==null && !(e.source instanceof SpoutNode))
throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");
if(g1==null || !g1.equals(g2)) {
PartitionNode pNode = makeIdentityPartition(e.source);
graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));
graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index));
// if one group subscribes to the same stream with same partitioning multiple times,
// merge those together (otherwise can end up with many output streams created for that partitioning
// if need to split into multiple output streams because of same input having different
// partitioning to the group)
// this is because can't currently merge splitting logic into a spout
// not the most kosher algorithm here, since the grouper indexes are being trounced via the adding of nodes to random groups, but it
// works out
// 通过指定的Group获取其额外的PartitionNode
// 再根据指定outputFields生成唯一的Node
// 同时找出PartitionNode所对应的parent node 和 该节点的edge信息
// 清除原有节点
// 构建新的节点 idNode和newPartitonNode 设置相关的vertex 和 edges 重新添加到graph中
List<Node> forNewGroups = new ArrayList<Node>();
for(Group g: mergedGroups) {
for(PartitionNode n: extraPartitionInputs(g)) {
Node idNode = makeIdentityNode(n.allOutputFields);
Node newPartitionNode = new PartitionNode(idNode.streamId, n.name, idNode.allOutputFields, n.thriftGrouping);
Node parentNode = TridentUtils.getParent(graph, n);
Set<IndexedEdge> outgoing = graph.outgoingEdgesOf(n);
addEdge(graph, parentNode, idNode, 0);
addEdge(graph, idNode, newPartitionNode, 0);
for(IndexedEdge e: outgoing) {
addEdge(graph, newPartitionNode, e.target, e.index);
Group parentGroup = grouper.nodeGroup(parentNode);
if(parentGroup==null) {
} else {
// TODO: in the future, want a way to include this logic in the spout itself,
// or make it unecessary by having storm include metadata about which grouping a tuple
// came from
for(Node n: forNewGroups) {
grouper.addGroup(new Group(graph, n));
// add in spouts as groups so we can get parallelisms
for(Node n: spoutNodes) {
grouper.addGroup(new Group(graph, n));
// 每次添加新的group之后需要重新生成index
mergedGroups = grouper.getAllGroups();
Map<Node, String> batchGroupMap = new HashMap();
List<Set<Node>> connectedComponents = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
for(int i=0; i<connectedComponents.size(); i++) {
String groupId = "bg" + i;
for(Node n: connectedComponents.get(i)) {
batchGroupMap.put(n, groupId);
// System.out.println("GRAPH:");
// System.out.println(graph);
// 计算parallelism
Map<Group, Integer> parallelisms = getGroupParallelisms(graph, grouper, mergedGroups);
// 创建 topology builder
TridentTopologyBuilder builder = new TridentTopologyBuilder();
Map<Node, String> spoutIds = genSpoutIds(spoutNodes);
Map<Group, String> boltIds = genBoltIds(mergedGroups);
// 设置 spout
for(SpoutNode sn: spoutNodes) {
Integer parallelism = parallelisms.get(grouper.nodeGroup(sn));
if(sn.type == SpoutNode.SpoutType.DRPC) {
builder.setBatchPerTupleSpout(spoutIds.get(sn), sn.streamId,
(IRichSpout) sn.spout, parallelism, batchGroupMap.get(sn));
} else {
ITridentSpout s;
if(sn.spout instanceof IBatchSpout) {
s = new BatchSpoutExecutor((IBatchSpout)sn.spout);
} else if(sn.spout instanceof ITridentSpout) {
s = (ITridentSpout) sn.spout;
} else {
throw new RuntimeException("Regular rich spouts not supported yet... try wrapping in a RichSpoutBatchExecutor");
// TODO: handle regular rich spout without batches (need lots of updates to support this throughout)
builder.setSpout(spoutIds.get(sn), sn.streamId, sn.txId, s, parallelism, batchGroupMap.get(sn));
// 设置bolt
for(Group g: mergedGroups) {
if(!isSpoutGroup(g)) {
Integer p = parallelisms.get(g);
Map<String, String> streamToGroup = getOutputStreamBatchGroups(g, batchGroupMap);
BoltDeclarer d = builder.setBolt(boltIds.get(g), new SubtopologyBolt(graph, g.nodes, batchGroupMap), p,
committerBatches(g, batchGroupMap), streamToGroup);
Collection<PartitionNode> inputs = uniquedSubscriptions(externalGroupInputs(g));
for(PartitionNode n: inputs) {
Node parent = TridentUtils.getParent(graph, n);
String componentId;
if(parent instanceof SpoutNode) {
componentId = spoutIds.get(parent);
} else {
componentId = boltIds.get(grouper.nodeGroup(parent));
d.grouping(new GlobalStreamId(componentId, n.streamId), n.thriftGrouping);
// 构建topology
return builder.buildTopology();
private static void completeDRPC(DefaultDirectedGraph<Node, IndexedEdge> graph, Map<String, List<Node>> colocate, UniqueIdGen gen) {
List<Set<Node>> connectedComponents = new ConnectivityInspector<Node, IndexedEdge>(graph).connectedSets();
for(Set<Node> g: connectedComponents) {
TridentTopology helper = new TridentTopology(graph, colocate, gen);
for(Set<Node> g: connectedComponents) {
SpoutNode drpcNode = getDRPCSpoutNode(g);
if(drpcNode!=null) {
Stream lastStream = new Stream(helper, null, getLastAddedNode(g));
Stream s = new Stream(helper, null, drpcNode);
s.project(new Fields("return-info"))
new ReturnResultsReducer(),
new Fields());
private static Node getLastAddedNode(Collection<Node> g) {
Node ret = null;
for(Node n: g) {
if(ret==null || n.creationIndex > ret.creationIndex) {
ret = n;
return ret;
//returns null if it's not a drpc group
private static SpoutNode getDRPCSpoutNode(Collection<Node> g) {
for(Node n: g) {
if(n instanceof SpoutNode) {
SpoutNode.SpoutType type = ((SpoutNode) n).type;
if(type==SpoutNode.SpoutType.DRPC) {
return (SpoutNode) n;
return null;
private static void checkValidJoins(Collection<Node> g) {
boolean hasDRPCSpout = false;
boolean hasBatchSpout = false;
for(Node n: g) {
if(n instanceof SpoutNode) {
SpoutNode.SpoutType type = ((SpoutNode) n).type;
if(type==SpoutNode.SpoutType.BATCH) {
hasBatchSpout = true;
} else if(type==SpoutNode.SpoutType.DRPC) {
hasDRPCSpout = true;
if(hasBatchSpout && hasDRPCSpout) {
throw new RuntimeException("Cannot join DRPC stream with streams originating from other spouts");
private static boolean isSpoutGroup(Group g) {
return g.nodes.size() == 1 && g.nodes.iterator().next() instanceof SpoutNode;
private static Collection<PartitionNode> uniquedSubscriptions(Set<PartitionNode> subscriptions) {
Map<String, PartitionNode> ret = new HashMap();
for(PartitionNode n: subscriptions) {
PartitionNode curr = ret.get(n.streamId);
if(curr!=null && !curr.thriftGrouping.equals(n.thriftGrouping)) {
throw new RuntimeException("Multiple subscriptions to the same stream with different groupings. Should be impossible since that is explicitly guarded against.");
ret.put(n.streamId, n);
return ret.values();
private static Map<Node, String> genSpoutIds(Collection<SpoutNode> spoutNodes) {
Map<Node, String> ret = new HashMap();
int ctr = 0;
for(SpoutNode n: spoutNodes) {
ret.put(n, "spout" + ctr);
return ret;
private static Map<Group, String> genBoltIds(Collection<Group> groups) {
Map<Group, String> ret = new HashMap();
int ctr = 0;
for(Group g: groups) {
if(!isSpoutGroup(g)) {
List<String> name = new ArrayList();
name.add("" + ctr);
String groupName = getGroupName(g);
if(groupName!=null && !groupName.isEmpty()) {
ret.put(g, Utils.join(name, "-"));
return ret;
private static String getGroupName(Group g) {
TreeMap<Integer, String> sortedNames = new TreeMap();
for(Node n: g.nodes) {
if(n.name!=null) {
sortedNames.put(n.creationIndex, n.name);
List<String> names = new ArrayList<String>();
String prevName = null;
for(String n: sortedNames.values()) {
if(prevName==null || !n.equals(prevName)) {
prevName = n;
return Utils.join(names, "-");
private static Map<String, String> getOutputStreamBatchGroups(Group g, Map<Node, String> batchGroupMap) {
Map<String, String> ret = new HashMap();
Set<PartitionNode> externalGroupOutputs = externalGroupOutputs(g);
for(PartitionNode n: externalGroupOutputs) {
ret.put(n.streamId, batchGroupMap.get(n));
return ret;
private static Set<String> committerBatches(Group g, Map<Node, String> batchGroupMap) {
Set<String> ret = new HashSet();
for(Node n: g.nodes) {
if(n instanceof ProcessorNode) {
if(((ProcessorNode) n).committer) {
return ret;
private static Map<Group, Integer> getGroupParallelisms(DirectedGraph<Node, IndexedEdge> graph, GraphGrouper grouper, Collection<Group> groups) {
UndirectedGraph<Group, Object> equivs = new Pseudograph<Group, Object>(Object.class);
for(Group g: groups) {
for(Group g: groups) {
for(PartitionNode n: externalGroupInputs(g)) {
if(isIdentityPartition(n)) {
Node parent = TridentUtils.getParent(graph, n);
Group parentGroup = grouper.nodeGroup(parent);
if(parentGroup!=null && !parentGroup.equals(g)) {
equivs.addEdge(parentGroup, g);
Map<Group, Integer> ret = new HashMap();
List<Set<Group>> equivGroups = new ConnectivityInspector<Group, Object>(equivs).connectedSets();
for(Set<Group> equivGroup: equivGroups) {
Integer fixedP = getFixedParallelism(equivGroup);
Integer maxP = getMaxParallelism(equivGroup);
if(fixedP!=null && maxP!=null && maxP < fixedP) {
throw new RuntimeException("Parallelism is fixed to " + fixedP + " but max parallelism is less than that: " + maxP);
Integer p = 1;
for(Group g: equivGroup) {
for(Node n: g.nodes) {
if(n.parallelismHint!=null) {
p = Math.max(p, n.parallelismHint);
if(maxP!=null) p = Math.min(maxP, p);
if(fixedP!=null) p = fixedP;
for(Group g: equivGroup) {
ret.put(g, p);
return ret;
private static Integer getMaxParallelism(Set<Group> groups) {
Integer ret = null;
for(Group g: groups) {
if(isSpoutGroup(g)) {
SpoutNode n = (SpoutNode) g.nodes.iterator().next();
Map conf = getSpoutComponentConfig(n.spout);
if(conf==null) conf = new HashMap();
Number maxP = (Number) conf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM);
if(maxP!=null) {
if(ret==null) ret = maxP.intValue();
else ret = Math.min(ret, maxP.intValue());
return ret;
private static Map getSpoutComponentConfig(Object spout) {
if(spout instanceof IRichSpout) {
return ((IRichSpout) spout).getComponentConfiguration();
} else if (spout instanceof IBatchSpout) {
return ((IBatchSpout) spout).getComponentConfiguration();
} else {
return ((ITridentSpout) spout).getComponentConfiguration();
private static Integer getFixedParallelism(Set<Group> groups) {
Integer ret = null;
for(Group g: groups) {
for(Node n: g.nodes) {
if(n.stateInfo != null && n.stateInfo.spec.requiredNumPartitions!=null) {
int reqPartitions = n.stateInfo.spec.requiredNumPartitions;
if(ret!=null && ret!=reqPartitions) {
throw new RuntimeException("Cannot have one group have fixed parallelism of two different values");
ret = reqPartitions;
return ret;
private static boolean isIdentityPartition(PartitionNode n) {
Grouping g = n.thriftGrouping;
if(g.is_set_custom_serialized()) {
CustomStreamGrouping csg = (CustomStreamGrouping) Utils.javaDeserialize(g.get_custom_serialized(), Serializable.class);
return csg instanceof IdentityGrouping;
return false;
private static void addEdge(DirectedGraph g, Object source, Object target, int index) {
g.addEdge(source, target, new IndexedEdge(source, target, index));
private Node makeIdentityNode(Fields allOutputFields) {
return new ProcessorNode(getUniqueStreamId(), null, allOutputFields, new Fields(),
new EachProcessor(new Fields(), new FilterExecutor(new TrueFilter())));
private static List<PartitionNode> extraPartitionInputs(Group g) {
List<PartitionNode> ret = new ArrayList();
Set<PartitionNode> inputs = externalGroupInputs(g);
Map<String, List<PartitionNode>> grouped = new HashMap();
for(PartitionNode n: inputs) {
if(!grouped.containsKey(n.streamId)) {
grouped.put(n.streamId, new ArrayList());
for(List<PartitionNode> group: grouped.values()) {
PartitionNode anchor = group.get(0);
for(int i=1; i<group.size(); i++) {
PartitionNode n = group.get(i);
if(!n.thriftGrouping.equals(anchor.thriftGrouping)) {
return ret;
private static Set<PartitionNode> externalGroupInputs(Group g) {
Set<PartitionNode> ret = new HashSet();
for(Node n: g.incomingNodes()) {
if(n instanceof PartitionNode) {
ret.add((PartitionNode) n);
return ret;
private static Set<PartitionNode> externalGroupOutputs(Group g) {
Set<PartitionNode> ret = new HashSet();
for(Node n: g.outgoingNodes()) {
if(n instanceof PartitionNode) {
ret.add((PartitionNode) n);
return ret;
private static PartitionNode makeIdentityPartition(Node basis) {
return new PartitionNode(basis.streamId, basis.name, basis.allOutputFields,
Grouping.custom_serialized(Utils.javaSerialize(new IdentityGrouping())));
protected String getUniqueStreamId() {
return _gen.getUniqueStreamId();
protected String getUniqueStateId() {
return _gen.getUniqueStateId();
protected void registerNode(Node n) {
if(n.stateInfo!=null) {
String id = n.stateInfo.id;
if(!_colocate.containsKey(id)) {
_colocate.put(id, new ArrayList());
protected Stream addNode(Node n) {
return new Stream(this, n.name, n);
protected void registerSourcedNode(List<Stream> sources, Node newNode) {
int streamIndex = 0;
for(Stream s: sources) {
_graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex));
protected Stream addSourcedNode(List<Stream> sources, Node newNode) {
registerSourcedNode(sources, newNode);
return new Stream(this, newNode.name, newNode);
protected TridentState addSourcedStateNode(List<Stream> sources, Node newNode) {
registerSourcedNode(sources, newNode);
return new TridentState(this, newNode);
protected Stream addSourcedNode(Stream source, Node newNode) {
return addSourcedNode(Arrays.asList(source), newNode);
protected TridentState addSourcedStateNode(Stream source, Node newNode) {
return addSourcedStateNode(Arrays.asList(source), newNode);
private static List<Fields> getAllOutputFields(List streams) {
List<Fields> ret = new ArrayList<Fields>();
for(Object o: streams) {
ret.add(((IAggregatableStream) o).getOutputFields());
return ret;
private static List<GroupedStream> groupedStreams(List<Stream> streams, List<Fields> joinFields) {
List<GroupedStream> ret = new ArrayList<GroupedStream>();
for(int i=0; i<streams.size(); i++) {
return ret;
private static List<Fields> strippedInputFields(List<Stream> streams, List<Fields> joinFields) {
List<Fields> ret = new ArrayList<Fields>();
for(int i=0; i<streams.size(); i++) {
ret.add(TridentUtils.fieldsSubtract(streams.get(i).getOutputFields(), joinFields.get(i)));
return ret;
private static List<JoinType> repeat(int n, JoinType type) {
List<JoinType> ret = new ArrayList<JoinType>();
for(int i=0; i<n; i++) {
return ret;
2018-12-17 11:35 1242首先从github上获取对应的源码Nacos源码git cl ... -
2016-03-21 19:31 903一、作用 主要是通过结合zookeeper,在zookee ... -
2016-03-21 19:29 596一、作用 构建一个Rotating transacti ... -
2016-03-21 19:28 899一、作用 Partition Spout对应的exec ... -
jstorm源码之 RichSpoutBatchExecutor
2016-03-21 19:28 0一、作用 RichSpoutBatchExecutor是IRi ... -
2016-03-21 19:27 889一、作用 基于LinkedList + HashM ... -
jstorm源码之 RichSpoutBatchExecutor
2016-03-21 19:24 633一、作用 RichSpoutBatchExecutor是IRi ... -
2016-03-15 18:04 2749启动ZOOKEEPER zkServer.sh start ... -
2016-03-15 18:02 1270一、简介Supervisor是JStorm中的工作节点,类似 ... -
2016-03-15 17:56 927一、简介Storm是开源的 ... -
2016-01-11 14:33 8637数据库中间件mycat的使 ... -
2015-12-03 19:43 1766关于jstorm单机安装可以 ... -
2015-11-30 16:17 722关于hbase 一、客户端类 HTable 和 HTabl ... -
spring hadoop系列(六)---HbaseSystemException
2015-11-30 09:13 525一、源码 /** * HBase Data Access e ... -
spring hadoop系列(五)---spring hadoop hbase之HbaseSynchronizationManager
2015-11-27 18:16 879一、源码如下 /** * Synchronization m ... -
spring hadoop 系列(二)
2015-11-27 15:26 606一、源码分析 /** * * HbaseAccesso ... -
spring hadoop之batch处理(二)
2015-11-24 18:10 1531一、测试 public class MrBatchApp { ... -
spring hadoop之mapreduce batch
2015-11-24 15:51 640一、测试 // 定义hadoop configuration ... -
centos6.7 64位 伪分布 安装 cdh5.4.8 + jdk 8
2015-11-09 00:37 2334一、安装JAVA # 创建JAVA的目录 mkdir -p / ... -
spring hadoop系列二(MapReduce and Distributed cache)
2015-11-06 15:22 994关于MapReduce and Distributed Ca ...
- **Nimbus**:Nimbus是Storm集群的核心组件之一,负责整个集群的管理和协调工作,包括任务调度、故障恢复等。Nimbus通过Zookeeper来实现状态同步和集群协调。 - **Supervisor**:Supervisor运行在每个Worker节点上...
JStorm源码学习:主要包含Storm重新启动,Nimbus启动,Supervisor启动,Executor创建和启动 Storm编程模型 Nimbus:负责资源分配和任务调度。 Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker...
1. **JStorm与Storm的关系**:讲解JStorm与Apache Storm的关系,包括JStorm的诞生背景、主要改进和优化之处。 2. **JStorm的核心概念**:介绍Spout、Bolt、拓扑等核心概念,解释它们在实时数据处理中的作用。 3. *...
**JStorm介绍** JStorm是由阿里巴巴开发的分布式实时计算引擎,它是基于Apache Storm的一个增强版本。JStorm的设计目标是提供比Storm更高的稳定性、更强的功能和更快的计算性能。这个技术分享主要涵盖了JStorm的...
本示例将探讨如何将Apache JStorm与Kafka进行集成,以利用它们各自的优势。JStorm是一款分布式实时计算系统,它允许用户处理数据流,而Kafka则是一个高吞吐量的分布式消息系统,用于实时数据传递。接下来,我们将...
接着,文档会详细阐述JStorm的安装与配置过程,包括下载源码、编译、部署以及设置环境变量。对于初学者,这部分内容至关重要,因为正确配置JStorm环境是后续开发和运行的基础。 在JStorm的使用方法部分,你会了解到...
**JStorm 2.1.1 API 深度解析** JStorm是阿里巴巴开源的一款分布式实时计算系统,它基于Apache Storm并进行了大量的优化,提供了一种高效、稳定且易用的流处理框架。JStorm 2.1.1是其一个重要的版本,此版本的API为...
**JStorm简介** JStorm是阿里巴巴开源的一款分布式实时计算系统,它是基于Apache Storm的一个高性能、高可用、热扩展的实时处理框架。JStorm的核心设计理念是简单、高效和稳定,能够处理大规模的数据流处理任务,...
JStorm是中国淘宝团队开发的一款分布式实时计算系统,它是基于Apache Storm的设计理念,但在性能、稳定性和易用性上进行了大量的优化。本文档将详细介绍JStorm的基础知识,包括其核心概念、工作原理、安装配置以及...
标题"jstorm课程"指的是关于JStorm的教育课程,JStorm是阿里巴巴开源的一个分布式实时计算框架,它是Apache Storm的Java版本,主要用于处理大规模数据流的实时计算。 描述中提到的"全套storm资料初学者必备 比较...
《JStorm 2.2.1:分布式流处理框架详解》 JStorm是阿里巴巴开源的一款高性能、高可靠的分布式实时计算系统,它基于Apache Storm并针对大规模数据处理进行了优化。JStorm 2.2.1是该框架的一个稳定版本,提供了许多...
storm-core-1.0.3-sources.jar 源码文件,1.0.3版本
1. **内存隔离与资源管理**:Flink作业运行在YARN之上,提供了内存隔离,确保了作业间的资源独立,同时YARN的队列机制便于进行资源分配和预算审核。 2. **统一资源管理**:通过YARN管理资源,减轻了运维负担,实现了...
阿里巴巴JStorm是一款由阿里巴巴开源的分布式实时计算系统,它基于Apache Storm,但在性能、稳定性、易用性等多个方面进行了优化,是大数据处理领域的重要工具之一。JStorm的设计理念是“简单、稳定、低延迟”,旨在...
JStorm是参考storm的实时流式计算框架,在网络IO、线程模型、资源调度、可用性及稳定性上做了持续改进,已被...经过4年发展,阿里巴巴JStorm集群已经成为世界上最大的集群之一,基于JStorm的应用数量超过1000个。