`

storm 实时

 
阅读更多

首先创建一个Topology主类,

然后spout是从kafka就收的数据流 KafkaSpout,

第一个bolt会从前面的spout接收数据,做一些初步的处理,传输给下一个bolt

不适应重量级的计算。

j实时UI无法准确的查看数据的执行情况,准确的性能调优存在一定困难。

所以在这里创建了一个抽象类继承BaseBasicBolt  ,然后其他的bolt会继承创建的这个抽象类。

可以在bolt执行前和执行后记录时间。

并记录每个数据的执行流程和各个环节bolt的执行状态和耗时。

import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.ZooKeeper;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class BasicTopology
{

	public static void main(String[] args)
			throws Exception
	{
	    if (args == null || args.length < 2) {
	        throw new NullPointerException("************** Topology args number must be three!");
	    }

		String zkhosts = args[0];
	   	String nimbusHost = args[1];
	   	String name = "user_profile_full_log_test";
	   	
	   	
	   	
	    TopologyBuilder builder = new TopologyBuilder();
	
	    //ZkHosts zkhost = new ZkHosts("192.168.112.138:2181,192.168.112.139:2181,192.168.112.140:2181");
	    ZkHosts zkhost = new ZkHosts(zkhosts);
	
	    String topic = "tracker";
	
	    
	    String spoutId = "kafkaSpout";
	    SpoutConfig spoutConfig = new SpoutConfig(zkhost, topic, "", spoutId);
	    
	    List<String> zkServers = new ArrayList<String>();
	    if (zkhosts != null && !zkhosts.isEmpty()) {
	        for (String host : zkhosts.split(",")) {
	            zkServers.add(host.split(":")[0]);
	        }
	    }
	    spoutConfig.zkServers = zkServers;
	    spoutConfig.zkPort = Integer.valueOf(2181);
//	    spoutConfig.forceFromStart = true;
	    spoutConfig.socketTimeoutMs = 60 * 1000;
	    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
	    String zkRoot = "/consumers/" + name + "0";
	    spoutConfig.zkRoot=zkRoot;
	    
    	ZKUtils dm=new ZKUtils();
    	try{
    		ZooKeeper zk = dm.createZKInstance( zkhosts );
    		zk.delete(zkRoot, -1);
    	}catch(Exception e){
    		System.out.println("e:"+e.getMessage());
    	}
           

//	    builder.setSpout("kafka_reader_test", new RandomSentenceSpout(), Integer.valueOf(1));
	    builder.setSpout("kafka_reader", new KafkaSpout(spoutConfig), 1);
	    builder.setBolt("get_usertrack", new GetTrackInfoBolt(),2).shuffleGrouping("kafka_reader");
	    builder.setBolt("save_userstat", new SaveUserStatBolt(),2).shuffleGrouping("get_usertrack");
//	    builder.setBolt("save_userprofile", new SaveUserProfileBolt(),16).shuffleGrouping("save_userstat");
	    //builder.setBolt("save_useraction", new SaveUserActionBolt(),4).shuffleGrouping("get_usertrack");

	    
//	    builder.setSpout("order_reader", new OrderSpout(), Integer.valueOf(1));
	    //builder.setBolt("save_useraction_order", new SaveUserActionBolt(),2).shuffleGrouping("order_reader");
//	    builder.setBolt("order_save_userstat", new SaveUserStatBolt(),8).shuffleGrouping("order_reader");
//	    builder.setBolt("order_save_userprofile", new SaveUserProfileBolt(),14).shuffleGrouping("order_save_userstat");

//	    builder.setBolt("save_usermobileprofile", new SaveMobileUserProfileBolt(),4).shuffleGrouping("save_userprofile");
//	    builder.setBolt("save_mergeuserprofile", new MobileMergePcProfileBolt(),4).shuffleGrouping("save_usermobileprofile");

	    Config conf = new Config();
	    //conf.setDebug(true);
//	    conf.registerMetricsConsumer(MonitorLogConsumer.class, 1);
//	    List list = new ArrayList();
//	    list.add("com.yhd.monitor.genlog.TraceTaskHook");
	    try
	    {
	        if (args != null && args.length == 2 ) {
	        	
	        	//UserProfileModel model = new UserProfileModel();
	        	//String modelString = JSON.toJSONString(model);
	        	//conf.put(UserProfileConstants.USER_PROFILE_MODEL, modelString);
    
	            conf.put("topology.max.spout.pending", Integer.valueOf(1024));
	            conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 60000);
	            conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 10);
	            conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 1000);
	            conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
//	            conf.put(Config.TOPOLOGY_AUTO_TASK_HOOKS, list);//
	            conf.setNumWorkers(12);
	            conf.setMaxTaskParallelism(100);

	            conf.put("nimbus.host", nimbusHost);
	            conf.put("nimbus.thrift.port", Integer.valueOf(6627));
	            conf.put("storm.zookeeper.servers", zkServers);
	            conf.setMessageTimeoutSecs(300);
	           
	        	StormSubmitter.submitTopology(name, conf, builder.createTopology());

	        }
	        else {
	        	System.out.println("local:" );
	          	UserProfileModel model = new UserProfileModel();
	          	String modelString = JSON.toJSONString(model);
	          	conf.put(UserProfileConstants.USER_PROFILE_MODEL, modelString);
	              
	    	    conf.setMaxTaskParallelism(1);
	    	    LocalCluster cluster = new LocalCluster();
	    	    cluster.submitTopology("local_user_profile", conf, builder.createTopology());
	
	    	    Thread.sleep(10000);
	
	    	    cluster.shutdown();
	        }
	    } catch (Exception e) {
	      e.printStackTrace();
	    }
	}
}

 

上面是topology主类,主要是实现bolt流程的衔接,本地运行和线上环境的切换。

下面是创建的基础拦截继承类

public abstract class MonitorBaseBolt extends BaseBasicBolt {

	private static final long serialVersionUID = 1L;
	final public static String SPLIT_TAG = new String(new byte[] { 1 });
	private static Logger log = Logger.getLogger(MonitorBaseBolt.class);

	public void execute(Tuple input, BasicOutputCollector collector) {
		MessageId mi = input.getMessageId();
		Map<Long, Long> map = mi.getAnchorsToIds();
		String uid = UUID.randomUUID().toString();
		String key = getRootId(map);
		MonitorLogExecutor.getInstance().put("s" + SPLIT_TAG + key + SPLIT_TAG + uid + SPLIT_TAG + super.getClass().getName() + SPLIT_TAG + new Date().getTime());
		preExecute(input, collector);
		MonitorLogExecutor.getInstance().put("e" + SPLIT_TAG + key + SPLIT_TAG + uid + SPLIT_TAG + super.getClass().getName() + SPLIT_TAG + new Date().getTime());
	}
	
	public String getRootId(Map<Long, Long> map) {
		if(map == null || map.keySet() == null || map.keySet().isEmpty()) {
			return "";
		}
		String result = "";
		for(Long root : map.keySet()) {
			result += "k" + root;
		}
		return result;
	}

	public void preExecute(Tuple input, BasicOutputCollector collector) {

	}

 

还有执行日志存储

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.hadoop.hbase.client.Put;

import com.yhd.common.hbase.UserProfileDBHelper;


public class MonitorLogExecutor {
	BlockingQueue<String> queue = new LinkedBlockingQueue<String>(1000);
	public static MonitorLogExecutor executor;
	public static synchronized MonitorLogExecutor getInstance() {
		if(executor == null) {
			executor = new MonitorLogExecutor();
			executor.execute();
		}
		return executor;
	}
	public void execute() {
		while(true) {
			try {
				String ml = queue.poll();
				if(ml == null) {
					Thread.sleep(5000);
					continue;
				}
				//"s" + SPLIT_TAG + key + SPLIT_TAG + uid + SPLIT_TAG + super.getClass().getName()
				//+ SPLIT_TAG + new Date().getTime()
				String[] vals = ml.split(MonitorBaseBolt.SPLIT_TAG);
				Put put = new Put(vals[1].getBytes());
				put.add("log".getBytes(), (vals[3] + MonitorBaseBolt.SPLIT_TAG + vals[2] + MonitorBaseBolt.SPLIT_TAG
						+ vals[0]).getBytes(), vals[4].getBytes());
				UserProfileDBHelper.getInstance().save(put, "real_log");
				//batch save
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
	
	public void put(String data) {
		queue.offer(data);
	}
}

 

然后就是第一个bolt的创建

 

public class GetTrackInfoBolt extends MonitorBaseBolt
{
	private static final long serialVersionUID = 1L;
	private String strDev = UserStatConstants.PC_TAG;
	private int trackNumber = 0;
	private static Logger log = Logger.getLogger(GetTrackInfoBolt.class);

	private String getRowKey(String userId, String guId) {

		if (null == userId || userId.equals("\\N") ||
				userId.isEmpty()) {
			if(null == guId)
				return null;
			
	      	Matcher matcher = UserActionConstants.NOT_GUID_PATTERN.matcher(guId);
	    	if(matcher.find()){
				log.error("invail guid:" + guId );
	    		return null;	
	    	}
	    	userId = guId;
		}
		
		if(userId.equals("\\N") ||  
				userId.isEmpty() || userId.equals("null")) {
			return null;
		}
		

		return userId;
	}
	

	public String GetDev(String url) {
		boolean bMobile = false;
		if(null != url && url.startsWith(UserStatConstants.MOBILE_URL_TAG)) {
			bMobile = true;
		}
		
		String strDev = bMobile ? UserStatConstants.MOBILE_TAG: 
			UserStatConstants.PC_TAG;
		return strDev;
	}
	@Override
  public void preExecute(Tuple input, BasicOutputCollector collector)
  {
    try
    {
      String mesg = input.getString(0);
      
      if ((mesg != null) && (!mesg.isEmpty())) {
        
        String[] trackList = mesg.split("\n");
//        List<UserActionTuple> infos = new ArrayList<UserActionTuple>();
        for (String track : trackList) {
        	//flume中是\t,测试是byte 1
        	String[] trackInfo = (track + " ").split("\t");
    		
        	//很多时候只发送39个
        	if ( trackInfo.length < 42 ) { 
        		log.error(trackInfo.length + " " + trackInfo[1]);
        		//FileUtil.write(CommonConstants.NORMAL_LOG, trackInfo[1]);
    			//System.out.println("item count is wrong, size:" + trackInfo.length +  trackInfo[1]);
    			continue;
    		}
    		
            String url = trackInfo[1];
            String referer = trackInfo[2];
            String guId = trackInfo[5];
            String sessionId = trackInfo[10];
            String trackTime = trackInfo[17];
            String userId = trackInfo[18];
            String productIds = trackInfo[21];
            String provinceId = trackInfo[38];
            String cityId = trackInfo[41].trim(); 
            String ieVersion = trackInfo[29];
            String platform = trackInfo[30];
            String linkPositon = trackInfo[34];
            String buttonPosition = trackInfo[35];
            
            log.error("trackTime:" + trackTime );
           

			
//			System.out.println("######:" + userId + " " + guId + " " +
//			url + " " + linkPositon + " " + buttonPosition);
			
			StringBuilder strProductIds = new StringBuilder( productIds ); 
			UserActionQualifier userActionQualifier = null;
			userActionQualifier = ProActionAnalyzer.getProductActionType(url,strProductIds, 
					linkPositon, buttonPosition);
			
			if(null == userActionQualifier) {
				
				userActionQualifier = ProSetActionAnalyzer.getProductSetActionType( url );
			}
			
	//		if(null == userActionQualifier) {
	//			userActionQualifier = OtherActionAnalyzer.getOtherActionType( url );
	//		}
	
			if ( null == userActionQualifier ) {
				//userActionQualifier = new UserActionQualifier(UserActionConstants.LEAVEACTION);
				//System.out.println("no action:" + url);
				continue;
			}
			//String strKey =  getRowKey(userId, guId, trackTime);
			String strKey =  getRowKey(userId, guId);
			
			if (null == strKey)
				continue;
			
			strDev = GetDev( url );
			
			UserAction userAction = new UserAction();
			userAction.setS(sessionId);
			userAction.setP(provinceId);
			userAction.setC(cityId);
			userAction.setO(platform);
			userAction.setB(ieVersion);
			userAction.setR(referer);
			userAction.setL(linkPositon);
			userAction.setBP(buttonPosition);
			userAction.setU(url);
			userAction.setA(userActionQualifier.getActionType());
			
			String userKey = strDev + strKey;
			String userType = userActionQualifier.getLogType() + CommonConstants.TRACK_SPLIT 
					+ userActionQualifier.getActionType()
					+ CommonConstants.TRACK_SPLIT + userActionQualifier.getActionObject();
			
	        //collector.emit(new Values(userKey,userType,trackTime,userAction));
			UserActionTuple userActionTuple = new UserActionTuple();
			userActionTuple.setUserKey(userKey);
			userActionTuple.setUserType(userType);
			userActionTuple.setTrackTime(trackTime);
			userActionTuple.setUserAction(userAction);

			trackNumber += 1;
			//infos.add(userActionTuple);
			collector.emit(new Values(userActionTuple));
        }
        
//        if ((infos != null) && (!infos.isEmpty())) {
//        	log.error("The number of useful track is " + trackNumber + " " + infos.size());
//            collector.emit(new Values(new Object[] { infos }));
//        }
      }
    }
    catch (Exception e) {
    	log.error("split track wrong:" + e.toString());
    	e.printStackTrace();
    }
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer)
  {
    //declarer.declare(new Fields("userkey","type","time","userAction"));
    declarer.declare(new Fields("trackInfos"));
  }
}

 描述了大致的storm实现流程。

分享到:
评论

相关推荐

    Storm实时数据处理

    Storm实时数据处理

    ( Storm实时数据处理.zip )PDF 高清版

    **Storm实时数据处理** Apache Storm是一个开源的分布式实时计算系统,它被设计用来处理无界数据流,确保每个事件都能得到正确的处理,即使在高并发和大规模数据输入的情况下也能保持低延迟。本资料《Storm实时数据...

    storm实时单词计数

    在"storm实时单词计数"这个场景中,我们主要探讨的是如何使用Storm来实现一个实时分析应用,该应用可以统计输入文本中的单词数量。 在Storm中,数据流是通过拓扑结构(Topology)进行组织的,由多个 bolts(处理...

    Storm实时数据处理-超清文字版.pdf

    《Storm实时数据处理》这本书是大数据处理领域的重要参考资料,它主要聚焦于Apache Storm这一开源分布式实时计算系统。Storm被广泛应用于实时分析、持续计算、分布式RPC、机器学习等多个场景,其核心理念是允许用户...

    实时计算:Apache Storm:ApacheStorm实时机器学习应用.docx

    实时计算:Apache Storm:ApacheStorm实时机器学习应用.docx

    storm实时代码

    标题“storm实时代码”涉及的核心技术是Apache Storm,这是一个开源的分布式实时计算系统。Storm能够处理无界的数据流,确保每个消息都得到精确一次(exactly-once)的处理,这在实时大数据处理中至关重要。在这个...

    Storm实时数据处理_PDF电子书下载 带书签目录 完整版

    Storm实时数据处理_PDF电子书下载 带书签目录 完整版

    storm实时计算

    ### Storm实时计算框架详解 #### 一、Storm概述 Storm是一种分布式的实时计算框架,能够高效地处理大量的数据流,并且具有低延迟的特点。相比于传统的批处理方式,Storm提供了更实时的数据处理能力,使得它在...

    《Storm实时数据处理》PDF

    《Storm实时数据处理》

    大数据-Storm实时数据处理

     《大数据技术丛书:Storm实时数据处理》通过丰富的实例,系统讲解Storm的基础知识和实时数据处理的最佳实践方法,内容涵盖Storm本地开发环境搭建、日志流数据处理、Trident、分布式远程过程调用、Topology在不同...

    《Storm实时数据处理》PDF.zip

    《storm实时数据处理》通过丰富的实例,系统讲解Storm的基础知识和实时数据处理的最佳实践方法,内容涵盖Storm本地开发环境搭建、日志流数据处理、Trident、分布式远程过程调用、Topology在不同编程语言中的实现方法...

    storm实时数据处理 带书签目录pdf高清完整版

    storm实时数据处理 带书签目录pdf高清完整版 这个是带完整目录书签的高清扫描版

    storm实时数据处理

    《storm实时数据处理》这本书深入探讨了Apache Storm这一强大的实时计算系统,它是大数据处理领域中的重要工具,尤其在实时流处理方面具有显著优势。Storm设计的核心理念是简单、可扩展和容错性,使得它在处理大规模...

    storm实时数据分析 用到的技术分析

    在分析Storm实时数据分析时,我们可以从以下几个方面入手: 1. 实时流处理框架:Storm的核心是一个实时计算的框架,它可以用来处理大量的数据流,而且是可扩展的。它能够保证每个消息至少被处理一次,这对于需要高...

    Storm实时数据处理.pdf

    根据提供的文件信息,“Storm实时数据处理.pdf”,我们可以深入探讨与Apache Storm相关的实时数据处理技术。 ### Apache Storm简介 Apache Storm是一种分布式实时计算系统,能够处理无界数据流,即连续不断的数据...

    Storm实时处理方案架构.docx

    "Storm实时处理方案架构" 本文档介绍了基于Storm的实时处理架构,该架构包括数据收集部分、实时处理部分和数据落地部分。本文将详细解释每个部分的技术选型和业务需求,并对相关技术的熟悉度进行分析。 1. 数据...

    使用Storm实时处理交通大数据(数据源:kafka,集群管理:zookeeper).zip

    在大数据实时处理领域,Apache Storm是一个关键的开源框架,它被广泛用于实时流处理系统,尤其是在交通大数据的分析中。本教程将深入探讨如何利用Storm处理来自Kafka的数据,并通过Zookeeper进行集群管理。 首先,...

    Storm实时数据处理_中文版

    Storm实时数据处理_中文版Storm实时数据处理_中文版Storm实时数据处理_中文版

    实时Hadoop实战篇:基于Storm实时路况分析和实时路径推荐系统方案.doc

    实时 Hadoop 实战篇:基于 Storm 实时路况分析和实时路径推荐系统方案 本文主要介绍了基于 Storm 实时路况分析和实时路径推荐系统的设计和实现,该系统能够实时计算出市 10 万多条路段的实时平均速度,并提供了最短...

    大数据storm实时计算

    大数据实时计算,storm, kafka ,flume ,zookeeper 组件结合完成大数据storm实时计算

Global site tag (gtag.js) - Google Analytics