首先创建一个Topology主类,
然后spout是从kafka就收的数据流 KafkaSpout,
第一个bolt会从前面的spout接收数据,做一些初步的处理,传输给下一个bolt
不适应重量级的计算。
j实时UI无法准确的查看数据的执行情况,准确的性能调优存在一定困难。
所以在这里创建了一个抽象类继承BaseBasicBolt ,然后其他的bolt会继承创建的这个抽象类。
可以在bolt执行前和执行后记录时间。
并记录每个数据的执行流程和各个环节bolt的执行状态和耗时。
import java.util.ArrayList; import java.util.List; import org.apache.zookeeper.ZooKeeper; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; public class BasicTopology { public static void main(String[] args) throws Exception { if (args == null || args.length < 2) { throw new NullPointerException("************** Topology args number must be three!"); } String zkhosts = args[0]; String nimbusHost = args[1]; String name = "user_profile_full_log_test"; TopologyBuilder builder = new TopologyBuilder(); //ZkHosts zkhost = new ZkHosts("192.168.112.138:2181,192.168.112.139:2181,192.168.112.140:2181"); ZkHosts zkhost = new ZkHosts(zkhosts); String topic = "tracker"; String spoutId = "kafkaSpout"; SpoutConfig spoutConfig = new SpoutConfig(zkhost, topic, "", spoutId); List<String> zkServers = new ArrayList<String>(); if (zkhosts != null && !zkhosts.isEmpty()) { for (String host : zkhosts.split(",")) { zkServers.add(host.split(":")[0]); } } spoutConfig.zkServers = zkServers; spoutConfig.zkPort = Integer.valueOf(2181); // spoutConfig.forceFromStart = true; spoutConfig.socketTimeoutMs = 60 * 1000; spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); String zkRoot = "/consumers/" + name + "0"; spoutConfig.zkRoot=zkRoot; ZKUtils dm=new ZKUtils(); try{ ZooKeeper zk = dm.createZKInstance( zkhosts ); zk.delete(zkRoot, -1); }catch(Exception e){ System.out.println("e:"+e.getMessage()); } // builder.setSpout("kafka_reader_test", new RandomSentenceSpout(), Integer.valueOf(1)); builder.setSpout("kafka_reader", new KafkaSpout(spoutConfig), 1); builder.setBolt("get_usertrack", new GetTrackInfoBolt(),2).shuffleGrouping("kafka_reader"); builder.setBolt("save_userstat", new SaveUserStatBolt(),2).shuffleGrouping("get_usertrack"); // builder.setBolt("save_userprofile", new SaveUserProfileBolt(),16).shuffleGrouping("save_userstat"); //builder.setBolt("save_useraction", new SaveUserActionBolt(),4).shuffleGrouping("get_usertrack"); // builder.setSpout("order_reader", new OrderSpout(), Integer.valueOf(1)); //builder.setBolt("save_useraction_order", new SaveUserActionBolt(),2).shuffleGrouping("order_reader"); // builder.setBolt("order_save_userstat", new SaveUserStatBolt(),8).shuffleGrouping("order_reader"); // builder.setBolt("order_save_userprofile", new SaveUserProfileBolt(),14).shuffleGrouping("order_save_userstat"); // builder.setBolt("save_usermobileprofile", new SaveMobileUserProfileBolt(),4).shuffleGrouping("save_userprofile"); // builder.setBolt("save_mergeuserprofile", new MobileMergePcProfileBolt(),4).shuffleGrouping("save_usermobileprofile"); Config conf = new Config(); //conf.setDebug(true); // conf.registerMetricsConsumer(MonitorLogConsumer.class, 1); // List list = new ArrayList(); // list.add("com.yhd.monitor.genlog.TraceTaskHook"); try { if (args != null && args.length == 2 ) { //UserProfileModel model = new UserProfileModel(); //String modelString = JSON.toJSONString(model); //conf.put(UserProfileConstants.USER_PROFILE_MODEL, modelString); conf.put("topology.max.spout.pending", Integer.valueOf(1024)); conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 60000); conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 10); conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 1000); conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0); // conf.put(Config.TOPOLOGY_AUTO_TASK_HOOKS, list);// conf.setNumWorkers(12); conf.setMaxTaskParallelism(100); conf.put("nimbus.host", nimbusHost); conf.put("nimbus.thrift.port", Integer.valueOf(6627)); conf.put("storm.zookeeper.servers", zkServers); conf.setMessageTimeoutSecs(300); StormSubmitter.submitTopology(name, conf, builder.createTopology()); } else { System.out.println("local:" ); UserProfileModel model = new UserProfileModel(); String modelString = JSON.toJSONString(model); conf.put(UserProfileConstants.USER_PROFILE_MODEL, modelString); conf.setMaxTaskParallelism(1); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("local_user_profile", conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } } catch (Exception e) { e.printStackTrace(); } } }
上面是topology主类,主要是实现bolt流程的衔接,本地运行和线上环境的切换。
下面是创建的基础拦截继承类
public abstract class MonitorBaseBolt extends BaseBasicBolt { private static final long serialVersionUID = 1L; final public static String SPLIT_TAG = new String(new byte[] { 1 }); private static Logger log = Logger.getLogger(MonitorBaseBolt.class); public void execute(Tuple input, BasicOutputCollector collector) { MessageId mi = input.getMessageId(); Map<Long, Long> map = mi.getAnchorsToIds(); String uid = UUID.randomUUID().toString(); String key = getRootId(map); MonitorLogExecutor.getInstance().put("s" + SPLIT_TAG + key + SPLIT_TAG + uid + SPLIT_TAG + super.getClass().getName() + SPLIT_TAG + new Date().getTime()); preExecute(input, collector); MonitorLogExecutor.getInstance().put("e" + SPLIT_TAG + key + SPLIT_TAG + uid + SPLIT_TAG + super.getClass().getName() + SPLIT_TAG + new Date().getTime()); } public String getRootId(Map<Long, Long> map) { if(map == null || map.keySet() == null || map.keySet().isEmpty()) { return ""; } String result = ""; for(Long root : map.keySet()) { result += "k" + root; } return result; } public void preExecute(Tuple input, BasicOutputCollector collector) { }
还有执行日志存储
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.hbase.client.Put; import com.yhd.common.hbase.UserProfileDBHelper; public class MonitorLogExecutor { BlockingQueue<String> queue = new LinkedBlockingQueue<String>(1000); public static MonitorLogExecutor executor; public static synchronized MonitorLogExecutor getInstance() { if(executor == null) { executor = new MonitorLogExecutor(); executor.execute(); } return executor; } public void execute() { while(true) { try { String ml = queue.poll(); if(ml == null) { Thread.sleep(5000); continue; } //"s" + SPLIT_TAG + key + SPLIT_TAG + uid + SPLIT_TAG + super.getClass().getName() //+ SPLIT_TAG + new Date().getTime() String[] vals = ml.split(MonitorBaseBolt.SPLIT_TAG); Put put = new Put(vals[1].getBytes()); put.add("log".getBytes(), (vals[3] + MonitorBaseBolt.SPLIT_TAG + vals[2] + MonitorBaseBolt.SPLIT_TAG + vals[0]).getBytes(), vals[4].getBytes()); UserProfileDBHelper.getInstance().save(put, "real_log"); //batch save } catch (Exception e) { e.printStackTrace(); } } } public void put(String data) { queue.offer(data); } }
然后就是第一个bolt的创建
public class GetTrackInfoBolt extends MonitorBaseBolt { private static final long serialVersionUID = 1L; private String strDev = UserStatConstants.PC_TAG; private int trackNumber = 0; private static Logger log = Logger.getLogger(GetTrackInfoBolt.class); private String getRowKey(String userId, String guId) { if (null == userId || userId.equals("\\N") || userId.isEmpty()) { if(null == guId) return null; Matcher matcher = UserActionConstants.NOT_GUID_PATTERN.matcher(guId); if(matcher.find()){ log.error("invail guid:" + guId ); return null; } userId = guId; } if(userId.equals("\\N") || userId.isEmpty() || userId.equals("null")) { return null; } return userId; } public String GetDev(String url) { boolean bMobile = false; if(null != url && url.startsWith(UserStatConstants.MOBILE_URL_TAG)) { bMobile = true; } String strDev = bMobile ? UserStatConstants.MOBILE_TAG: UserStatConstants.PC_TAG; return strDev; } @Override public void preExecute(Tuple input, BasicOutputCollector collector) { try { String mesg = input.getString(0); if ((mesg != null) && (!mesg.isEmpty())) { String[] trackList = mesg.split("\n"); // List<UserActionTuple> infos = new ArrayList<UserActionTuple>(); for (String track : trackList) { //flume中是\t,测试是byte 1 String[] trackInfo = (track + " ").split("\t"); //很多时候只发送39个 if ( trackInfo.length < 42 ) { log.error(trackInfo.length + " " + trackInfo[1]); //FileUtil.write(CommonConstants.NORMAL_LOG, trackInfo[1]); //System.out.println("item count is wrong, size:" + trackInfo.length + trackInfo[1]); continue; } String url = trackInfo[1]; String referer = trackInfo[2]; String guId = trackInfo[5]; String sessionId = trackInfo[10]; String trackTime = trackInfo[17]; String userId = trackInfo[18]; String productIds = trackInfo[21]; String provinceId = trackInfo[38]; String cityId = trackInfo[41].trim(); String ieVersion = trackInfo[29]; String platform = trackInfo[30]; String linkPositon = trackInfo[34]; String buttonPosition = trackInfo[35]; log.error("trackTime:" + trackTime ); // System.out.println("######:" + userId + " " + guId + " " + // url + " " + linkPositon + " " + buttonPosition); StringBuilder strProductIds = new StringBuilder( productIds ); UserActionQualifier userActionQualifier = null; userActionQualifier = ProActionAnalyzer.getProductActionType(url,strProductIds, linkPositon, buttonPosition); if(null == userActionQualifier) { userActionQualifier = ProSetActionAnalyzer.getProductSetActionType( url ); } // if(null == userActionQualifier) { // userActionQualifier = OtherActionAnalyzer.getOtherActionType( url ); // } if ( null == userActionQualifier ) { //userActionQualifier = new UserActionQualifier(UserActionConstants.LEAVEACTION); //System.out.println("no action:" + url); continue; } //String strKey = getRowKey(userId, guId, trackTime); String strKey = getRowKey(userId, guId); if (null == strKey) continue; strDev = GetDev( url ); UserAction userAction = new UserAction(); userAction.setS(sessionId); userAction.setP(provinceId); userAction.setC(cityId); userAction.setO(platform); userAction.setB(ieVersion); userAction.setR(referer); userAction.setL(linkPositon); userAction.setBP(buttonPosition); userAction.setU(url); userAction.setA(userActionQualifier.getActionType()); String userKey = strDev + strKey; String userType = userActionQualifier.getLogType() + CommonConstants.TRACK_SPLIT + userActionQualifier.getActionType() + CommonConstants.TRACK_SPLIT + userActionQualifier.getActionObject(); //collector.emit(new Values(userKey,userType,trackTime,userAction)); UserActionTuple userActionTuple = new UserActionTuple(); userActionTuple.setUserKey(userKey); userActionTuple.setUserType(userType); userActionTuple.setTrackTime(trackTime); userActionTuple.setUserAction(userAction); trackNumber += 1; //infos.add(userActionTuple); collector.emit(new Values(userActionTuple)); } // if ((infos != null) && (!infos.isEmpty())) { // log.error("The number of useful track is " + trackNumber + " " + infos.size()); // collector.emit(new Values(new Object[] { infos })); // } } } catch (Exception e) { log.error("split track wrong:" + e.toString()); e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { //declarer.declare(new Fields("userkey","type","time","userAction")); declarer.declare(new Fields("trackInfos")); } }
描述了大致的storm实现流程。
相关推荐
Storm实时数据处理
**Storm实时数据处理** Apache Storm是一个开源的分布式实时计算系统,它被设计用来处理无界数据流,确保每个事件都能得到正确的处理,即使在高并发和大规模数据输入的情况下也能保持低延迟。本资料《Storm实时数据...
在"storm实时单词计数"这个场景中,我们主要探讨的是如何使用Storm来实现一个实时分析应用,该应用可以统计输入文本中的单词数量。 在Storm中,数据流是通过拓扑结构(Topology)进行组织的,由多个 bolts(处理...
《Storm实时数据处理》这本书是大数据处理领域的重要参考资料,它主要聚焦于Apache Storm这一开源分布式实时计算系统。Storm被广泛应用于实时分析、持续计算、分布式RPC、机器学习等多个场景,其核心理念是允许用户...
实时计算:Apache Storm:ApacheStorm实时机器学习应用.docx
标题“storm实时代码”涉及的核心技术是Apache Storm,这是一个开源的分布式实时计算系统。Storm能够处理无界的数据流,确保每个消息都得到精确一次(exactly-once)的处理,这在实时大数据处理中至关重要。在这个...
Storm实时数据处理_PDF电子书下载 带书签目录 完整版
### Storm实时计算框架详解 #### 一、Storm概述 Storm是一种分布式的实时计算框架,能够高效地处理大量的数据流,并且具有低延迟的特点。相比于传统的批处理方式,Storm提供了更实时的数据处理能力,使得它在...
《Storm实时数据处理》
《大数据技术丛书:Storm实时数据处理》通过丰富的实例,系统讲解Storm的基础知识和实时数据处理的最佳实践方法,内容涵盖Storm本地开发环境搭建、日志流数据处理、Trident、分布式远程过程调用、Topology在不同...
《storm实时数据处理》通过丰富的实例,系统讲解Storm的基础知识和实时数据处理的最佳实践方法,内容涵盖Storm本地开发环境搭建、日志流数据处理、Trident、分布式远程过程调用、Topology在不同编程语言中的实现方法...
storm实时数据处理 带书签目录pdf高清完整版 这个是带完整目录书签的高清扫描版
《storm实时数据处理》这本书深入探讨了Apache Storm这一强大的实时计算系统,它是大数据处理领域中的重要工具,尤其在实时流处理方面具有显著优势。Storm设计的核心理念是简单、可扩展和容错性,使得它在处理大规模...
在分析Storm实时数据分析时,我们可以从以下几个方面入手: 1. 实时流处理框架:Storm的核心是一个实时计算的框架,它可以用来处理大量的数据流,而且是可扩展的。它能够保证每个消息至少被处理一次,这对于需要高...
根据提供的文件信息,“Storm实时数据处理.pdf”,我们可以深入探讨与Apache Storm相关的实时数据处理技术。 ### Apache Storm简介 Apache Storm是一种分布式实时计算系统,能够处理无界数据流,即连续不断的数据...
"Storm实时处理方案架构" 本文档介绍了基于Storm的实时处理架构,该架构包括数据收集部分、实时处理部分和数据落地部分。本文将详细解释每个部分的技术选型和业务需求,并对相关技术的熟悉度进行分析。 1. 数据...
在大数据实时处理领域,Apache Storm是一个关键的开源框架,它被广泛用于实时流处理系统,尤其是在交通大数据的分析中。本教程将深入探讨如何利用Storm处理来自Kafka的数据,并通过Zookeeper进行集群管理。 首先,...
Storm实时数据处理_中文版Storm实时数据处理_中文版Storm实时数据处理_中文版
实时 Hadoop 实战篇:基于 Storm 实时路况分析和实时路径推荐系统方案 本文主要介绍了基于 Storm 实时路况分析和实时路径推荐系统的设计和实现,该系统能够实时计算出市 10 万多条路段的实时平均速度,并提供了最短...
大数据实时计算,storm, kafka ,flume ,zookeeper 组件结合完成大数据storm实时计算