
Apache Cassandra Learning Step by Step (5): The Hands-On JTwissandra Project

Posted: 2012-02-25

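Having worked through the four basic Apache Cassandra learning steps, it is time to try some hands-on coding.

If needed, here is the earlier material for a quick review: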

Apache Cassandra Learning Step by Step (1)

 

Apache Cassandra Learning Step by Step (2): Core Concepts

 

Apache Cassandra Learning Step by Step (3): Samples ABC

 

Apache Cassandra Learning Step by Step (4): Data Modeling

 

 

 

 

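Building on the modeling ideas from step (4), the next thing to do is set up a hands-on project called JTwissandra, i.e. a Java version of Twissandra.

Its goal is to take Twitter as an imaginary target and, with the most minimal (or, frankly, crude) modeling and implementation, walk through the basic process of building on Apache Cassandra as the NoSQL platform.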

 

The basic coding environment for JTwissandra:

1. Maven for build management

2. JUnit for testing

3. The Hector client as the Java client for Apache Cassandra

 

You can clone the latest code directly from the GitHub link below:

JTwissandra: https://github.com/itstarting/jtwissandra

 

Forks and blunt criticism right here are both welcome. I am a NoSQL newbie myself, so a thick skin is no problem :)

 

1. First, we need an HFactoryHelper to initialize and set up the Cassandra client connection pool and the other required objects:

 

import java.io.IOException;
import java.util.Properties;

import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.HConsistencyLevel;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Helper for Cassandra initialization
 * 
 * @author bright_zheng
 *
 */
public class HFactoryHelper {
	private static Logger logger = LoggerFactory.getLogger(HFactoryHelper.class);
	
	private static Cluster cluster;
	private static Keyspace keyspace = initKeyspace();
	private static Properties properties;
    
    private HFactoryHelper(){}
    
    public static Keyspace getKeyspace(){
    	return keyspace;
    }
    
    private static Keyspace initKeyspace() {
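        properties = new Properties();
        // getResourceAsStream returns null when the file is missing; guard it so the defaults below apply
        java.io.InputStream in = HFactoryHelper.class.getResourceAsStream("/config.properties");
        try {
            if (in != null) {
                properties.load(in);
            }
        } catch (IOException ioe) {
            logger.error("Failed to load /config.properties", ioe);
        }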
        
        String cluster_name = properties.getProperty("cluster.name", "Test Cluster");
        logger.debug("cluster.name={}", cluster_name);
        String cluster_hosts = properties.getProperty("cluster.hosts", "127.0.0.1:9160");
        logger.debug("cluster.hosts={}", cluster_hosts);
        String active_keyspace = properties.getProperty("keyspace", "JTWISSANDRA");
        logger.debug("keyspace={}", active_keyspace);
        
        cluster = HFactory.getOrCreateCluster(cluster_name, cluster_hosts);
        
        ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel();
        ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.ONE);
        
        return HFactory.createKeyspace(
        		active_keyspace,
        		cluster, 
        		ccl);
    }
}

 

 

2. Create BaseService, the base class for all the business services.

 import java.util.UUID;

import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.clock.MicrosecondsClockResolution;
import me.prettyprint.cassandra.utils.TimeUUIDUtils;
import me.prettyprint.hector.api.ClockResolution;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import bright.zheng.jtwissandra.HFactoryHelper;

/**
 * Base service which all business services should extend
 * 
 * @author bright_zheng
 *
 */
public class BaseService {
	protected Logger logger = LoggerFactory.getLogger(getClass());
	
	protected static Keyspace KEYSPACE = HFactoryHelper.getKeyspace();
	protected static final String CF_USER = "USER";
	protected static final String CF_FRIEND = "FRIEND";
	protected static final String CF_FOLLOWER = "FOLLOWER";
	protected static final String CF_TWEET = "TWEET";
	protected static final String CF_TIMELINE = "TIMELINE";

	protected static final StringSerializer SERIALIZER_STRING 
		= StringSerializer.get();
	protected static final LongSerializer SERIALIZER_LONG 
		= LongSerializer.get();
	
	protected static final int TWEETS_LIMIT_DEFAULT = 10;
	protected static final int TWEETS_LIMIT_MAX = 50;
	
	protected HColumn<String, String> createColumn(String name, String value) {		
		return HFactory.createColumn(name, value, SERIALIZER_STRING, SERIALIZER_STRING);
	}

	protected HColumn<String, Long> createColumn(String name, Long value) {		
		return HFactory.createColumn(name, value, SERIALIZER_STRING, SERIALIZER_LONG);
	}

	protected HColumn<Long, String> createColumn(Long name, String value) {		
		return HFactory.createColumn(name, value, SERIALIZER_LONG, SERIALIZER_STRING);
	}
	
	/**
	 * REF: http://wiki.apache.org/cassandra/FAQ#working_with_timeuuid_in_java
	 * 
	 * @return UUID
	 */
	public UUID getUUID(){
		//TODO: which UUID should we use to make sure it's unique?
		ClockResolution clock = new MicrosecondsClockResolution();
	    return TimeUUIDUtils.getTimeUUID(clock);
		//return TimeUUIDUtils.getUniqueTimeUUIDinMillis();
	}
	
	protected Long getTimestamp(UUID uuid){
		//return uuid.timestamp();
		return TimeUUIDUtils.getTimeFromUUID(uuid);
	}
	
	protected Long generateTimestamp(){
		return getTimestamp(getUUID());
	}
}
 

3. Below is the code for each business service:

3.1 UserService

 

package bright.zheng.jtwissandra.service;

import java.util.UUID;

import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import bright.zheng.jtwissandra.bean.User;

/**
 * User service
 * 
 * @author bright_zheng
 *
 */
public class UserService extends BaseService{
	
	/**
	 * Sample CLI cmd:
	 * set USER['550e8400-e29b-41d4-a716-446655440000']['user_name'] = 'itstarting';
	 * set USER['550e8400-e29b-41d4-a716-446655440000']['password'] = '111222';
	 * set USER['550e8400-e29b-41d4-a716-446655440000']['create_timestamp'] = 1329836819890000;
	 * 
	 * @param user
	 */
    public String addUser(User user) {
    	Mutator<String> mutator = HFactory.createMutator(
    			KEYSPACE, SERIALIZER_STRING);
    	UUID uuid = this.getUUID();
    	String user_uuid = uuid.toString();   
    	Long create_timestamp = this.getTimestamp(uuid);
    	logger.debug("user_uuid={}", user_uuid);  
    	logger.debug("user_name={}", user.getUser_name());
    	logger.debug("password={}", user.getUser_password());
    	logger.debug("create_timestamp={}", create_timestamp);
        
        mutator.addInsertion(user_uuid, CF_USER, 
        		this.createColumn("user_name", user.getUser_name()));
        mutator.addInsertion(user_uuid, CF_USER, 
        		this.createColumn("password", user.getUser_password()));
        mutator.addInsertion(user_uuid, CF_USER, 
        		this.createColumn("create_timestamp", create_timestamp));
        
        mutator.execute();
        
        //return the generated uuid
        return user_uuid;
    }   
    
}
 

 

3.2 FriendService

 

package bright.zheng.jtwissandra.service;

import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.MutationResult;
import me.prettyprint.hector.api.mutation.Mutator;

/**
 * Friend service
 * 
 * @author bright_zheng
 *
 */
public class FriendService extends BaseService{
	
	/**
	 * Following a friend involves two steps:
	 * 1. Add the friend's uuid to the FRIEND CF under my uuid
	 * 2. Add my uuid to the FOLLOWER CF under the friend's uuid
	 * 
	 * Sample CLI cmd:
	 * set FRIEND['550e8400-e29b-41d4-a716-446655440000']['1329836819859000']
	 * 	= '550e8400-e29b-41d4-a716-446655440001';
	 * 
	 * set FOLLOWER['550e8400-e29b-41d4-a716-446655440001']['1329836819859000']
	 * 	= '550e8400-e29b-41d4-a716-446655440000';
	 * 
	 * @param me
	 * @param friend
	 */
    public MutationResult followFriend(String me, String friend) {
    	Mutator<String> mutator = HFactory.createMutator(
    			KEYSPACE, SERIALIZER_STRING);
    	
    	Long timestamp = this.generateTimestamp();
    	logger.debug("timestamp={}", timestamp);
        
        mutator.addInsertion(me, CF_FRIEND, 
        		this.createColumn(timestamp, friend));
        
        mutator.addInsertion(friend, CF_FOLLOWER, 
        		this.createColumn(timestamp, me));
        
        return mutator.execute();
    }   
    
}
 

 

 

3.3 TimelineService

 

package bright.zheng.jtwissandra.service;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;
import bright.zheng.jtwissandra.bean.Timeline;

/**
 * Timeline service
 * 
 * @author bright_zheng
 *
 */
public class TimelineService extends BaseService{
	
	/**
	 * get specified user's first Timeline
	 * 
	 * @param user_uuid
	 * @return
	 */
	public TimelineWrapper getTimeline(String user_uuid){
		return getTimeline(user_uuid, 0L, TWEETS_LIMIT_DEFAULT);
	}
	
	/**
	 * get specified user's Timeline with start point
	 * 
	 * @param user_uuid
	 * @param start
	 * @return
	 */
	public TimelineWrapper getTimeline(String user_uuid, long start){
		return getTimeline(user_uuid, start, TWEETS_LIMIT_DEFAULT);
	}
	
	/**
	 * get specified user's Timeline with start point and limit
	 * 
	 * @param user_uuid
	 * @param start
	 * @param limit
	 * @return
	 */
    public TimelineWrapper getTimeline(String user_uuid, long start, int limit){
    	if (start<0) start = 0;
    	if (limit<0) limit = TWEETS_LIMIT_DEFAULT;
    	if (limit>TWEETS_LIMIT_MAX) limit = TWEETS_LIMIT_MAX;
    	
    	SliceQuery<String, Long, String> sliceQuery = 
            HFactory.createSliceQuery(KEYSPACE, SERIALIZER_STRING, SERIALIZER_LONG, SERIALIZER_STRING);        
        sliceQuery.setColumnFamily(CF_TIMELINE);
        sliceQuery.setKey(user_uuid);
        sliceQuery.setRange(start, Long.MAX_VALUE, false, limit+1);
        QueryResult<ColumnSlice<Long, String>> result = sliceQuery.execute();
        List<HColumn<Long, String>> list = result.get().getColumns();
        
        long next = 0L;
        if(list==null){
        	return new TimelineWrapper(null, next);
        }else if (list.size()<=limit){        	
        	return new TimelineWrapper(convertToTimeline(list), 0L);
        }else{
        	HColumn<Long,String> last = list.get(list.size()-1);
        	next = last.getName(); //the name is the timestamp as the "next" start
        	list.remove(list.size()-1);
        	
        	return new TimelineWrapper(convertToTimeline(list), next);
        }
    }
    
    private List<Timeline> convertToTimeline(List<HColumn<Long,String>> cols){
    	Iterator<HColumn<Long,String>> it = cols.iterator();
    	List<Timeline> result = new ArrayList<Timeline>();
    	while(it.hasNext()){
    		HColumn<Long,String> col = it.next();
    		result.add(new Timeline(col.getValue(), col.getName()));
    	}
    	return result;
    }
    
    public class TimelineWrapper{
    	private List<Timeline> timelines;
    	private long nextTimeline;
    	
    	public TimelineWrapper(List<Timeline> timelines, long nextTimeline){
    		this.timelines = timelines;
    		this.nextTimeline = nextTimeline;
    	}

		public long getNextTimeline() {
			return nextTimeline;
		}

		public List<Timeline> getTimelines() {
			return timelines;
		}
    	
    }
}
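
The paging trick in getTimeline (read limit+1 columns, return the first limit, and hand back the name of the extra column as the next start point) may be easier to see without Cassandra in the way. The following is only a minimal sketch under assumptions: TimelinePagingSketch, Page and page(...) are hypothetical names, and a NavigableMap stands in for one TIMELINE row sorted by its LongType comparator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

// In-memory stand-in for one TIMELINE row: column name (timestamp) -> tweet uuid.
// Hypothetical class for illustration only; no Cassandra or Hector involved.
final class TimelinePagingSketch {

    static final class Page {
        final List<String> tweetUuids;
        final long next; // 0 means "no further page", mirroring TimelineWrapper

        Page(List<String> tweetUuids, long next) {
            this.tweetUuids = tweetUuids;
            this.next = next;
        }
    }

    static Page page(NavigableMap<Long, String> row, long start, int limit) {
        List<Long> names = new ArrayList<>();
        List<String> values = new ArrayList<>();
        // Read at most limit + 1 columns beginning at 'start' (inclusive), in sorted order
        for (Map.Entry<Long, String> e : row.tailMap(start, true).entrySet()) {
            if (names.size() == limit + 1) {
                break;
            }
            names.add(e.getKey());
            values.add(e.getValue());
        }
        if (names.size() <= limit) {
            return new Page(values, 0L); // this was the last page
        }
        // The extra column is not returned; its name becomes the next start point
        long next = names.get(limit);
        return new Page(new ArrayList<>(values.subList(0, limit)), next);
    }
}
```

Calling page(row, 0L, 10) and then page(row, firstPage.next, 10) walks the row page by page until next comes back as 0. Because the extra column is dropped from the current page and re-read as the first column of the following one, no entry is skipped or shown twice.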
 

 

 

3.4 TweetService

 

package bright.zheng.jtwissandra.service;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.beans.Row;
import me.prettyprint.hector.api.beans.Rows;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.MultigetSliceQuery;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;
import bright.zheng.jtwissandra.bean.Tweet;

/**
 * Tweet service
 * 
 * @author bright_zheng
 *
 */
public class TweetService extends BaseService{
	
	/**
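	 * Adding a tweet involves the following steps:
	 * 1. Save the tweet to the TWEET CF
	 * 2. Add the new tweet to my TIMELINE
	 * 3. Add the new tweet to all my followers' TIMELINE
	 * 
	 * @param user_uuid the author's uuid
	 * @param tweet_content the text of the tweet
	 * @return the uuid generated for the new tweet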
	 */
    public String addTweet(String user_uuid, String tweet_content) {
    	Mutator<String> mutator = HFactory.createMutator(
    			KEYSPACE, SERIALIZER_STRING);
    	//the tweet uuid
    	UUID uuid = this.getUUID();
    	String tweet_uuid = uuid.toString();
    	logger.debug("tweet_uuid={}", tweet_uuid);
    	
    	//the timestamp to build the timeline
    	Long timestamp = this.getTimestamp(uuid);
    	logger.debug("timestamp={}", timestamp);
        
        mutator.addInsertion(tweet_uuid, CF_TWEET, 
        		this.createColumn("user_uuid", user_uuid));
        mutator.addInsertion(tweet_uuid, CF_TWEET, 
        		this.createColumn("tweet_content", tweet_content));
        
        mutator.addInsertion(user_uuid, CF_TIMELINE, 
        		this.createColumn(timestamp, tweet_uuid));
        
        // fetch all my followers and insert the tweet into each follower's TIMELINE
        SliceQuery<String, Long, String> sliceQuery = 
            HFactory.createSliceQuery(KEYSPACE, SERIALIZER_STRING, SERIALIZER_LONG, SERIALIZER_STRING);        
        sliceQuery.setColumnFamily(CF_FOLLOWER);
        sliceQuery.setKey(user_uuid);
        sliceQuery.setRange(Long.MIN_VALUE, Long.MAX_VALUE, false, 500); //TODO: 500 followers hard code here?
        QueryResult<ColumnSlice<Long, String>> result = sliceQuery.execute();
        Iterator<HColumn<Long, String>> followers = result.get().getColumns().iterator();
        while(followers.hasNext()) {
        	HColumn<Long, String> follower = followers.next();
        	String follower_uuid = follower.getValue();
        	logger.debug("follower's uuid={}", follower_uuid);
        	logger.debug("timestamp={}", follower.getName());
            
        	//insert the tweet to the follower's TIMELINE
            mutator.addInsertion(follower_uuid, CF_TIMELINE, 
            		this.createColumn(timestamp, tweet_uuid));
        }
        
        mutator.execute();
        
        //return the new generated tweet's uuid
        return tweet_uuid;
    }
    
    /**
     * Should we add this service?
     * 
     * @param tweet_uuid
     * @return
     */
    public Tweet getTweet(String tweet_uuid){
    	return null;
    }
    
    public List<Tweet> getTweets(List<String> tweet_uuids){
    	MultigetSliceQuery<String, String, String> multigetSlicesQuery =
            HFactory.createMultigetSliceQuery(KEYSPACE, SERIALIZER_STRING, SERIALIZER_STRING, SERIALIZER_STRING);
        multigetSlicesQuery.setColumnFamily(CF_TWEET);
        multigetSlicesQuery.setColumnNames("user_uuid","tweet_content");        
        multigetSlicesQuery.setKeys(tweet_uuids);
        QueryResult<Rows<String, String, String>> results = multigetSlicesQuery.execute();
    	return convertRowsToTweets(results.get());
    }
    
    private List<Tweet> convertRowsToTweets(Rows<String, String, String> rows){
    	List<Tweet> list = new ArrayList<Tweet>();
    	Iterator<Row<String, String, String>> iterator = rows.iterator();
    	while(iterator.hasNext()){
    		Row<String, String, String> row = iterator.next();
    		ColumnSlice<String, String> cs = row.getColumnSlice();
        	list.add(new Tweet(row.getKey(), 
        					   cs.getColumnByName("tweet_content").getValue(),
        					   cs.getColumnByName("user_uuid").getValue()));
    	}
    	return list;
    }
    
}
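
The three steps in addTweet amount to what is often called fan-out on write: the tweet body is stored once, and only its uuid is copied into the author's timeline and into every follower's timeline. Here is a rough in-memory sketch of that flow, assuming plain Java maps in place of the TWEET, FOLLOWER and TIMELINE column families; FanOutSketch and its members are hypothetical names, not code from the project.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the TWEET, FOLLOWER and TIMELINE column families.
final class FanOutSketch {
    final Map<String, String> tweets = new HashMap<>();                   // tweet uuid -> content
    final Map<String, List<String>> followers = new HashMap<>();          // user -> follower uuids
    final Map<String, TreeMap<Long, String>> timelines = new HashMap<>(); // user -> (timestamp -> tweet uuid)

    // 'me' starts following 'friend', so 'me' is recorded as a follower of 'friend'
    void follow(String me, String friend) {
        followers.computeIfAbsent(friend, k -> new ArrayList<>()).add(me);
    }

    void addTweet(String author, String tweetUuid, long timestamp, String content) {
        tweets.put(tweetUuid, content);                 // 1. store the tweet once
        appendToTimeline(author, timestamp, tweetUuid); // 2. the author's own timeline
        for (String follower : followers.getOrDefault(author, Collections.emptyList())) {
            appendToTimeline(follower, timestamp, tweetUuid); // 3. each follower's timeline
        }
    }

    private void appendToTimeline(String user, long timestamp, String tweetUuid) {
        timelines.computeIfAbsent(user, k -> new TreeMap<>()).put(timestamp, tweetUuid);
    }
}
```

The trade-off is the usual one for this design: reading a timeline is a single row slice, while writing a tweet costs one insertion per follower, which is why the hard-coded 500-follower limit in the real code deserves its TODO.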
 

 

4. 当然少不了JUnit测试用例了:

 

package bright.zheng.jtwissandra;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.junit.Assert;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import bright.zheng.jtwissandra.bean.Timeline;
import bright.zheng.jtwissandra.bean.Tweet;
import bright.zheng.jtwissandra.bean.User;
import bright.zheng.jtwissandra.service.FriendService;
import bright.zheng.jtwissandra.service.TimelineService;
import bright.zheng.jtwissandra.service.TimelineService.TimelineWrapper;
import bright.zheng.jtwissandra.service.TweetService;
import bright.zheng.jtwissandra.service.UserService;

/**
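 * Test cases for all services currently provided.
 * Please drop and create the schema first, then run all cases as one round.
 * The 'me' and 'friend' users are created dynamically each round for easier testing.
 * Note: the cases share static state (me, friend, nextTimeline), so they rely on
 * running in the order written, which JUnit 4 does not guarantee.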
 * 
 * @author bright_zheng
 *
 */
public class ServiceTest{
	Logger logger = LoggerFactory.getLogger(ServiceTest.class);
	
	private static UserService SERVICE_USER = new UserService();
	private static FriendService SERVICE_FRIEND = new FriendService();
	private static TweetService SERVICE_TWEET = new TweetService();
	private static TimelineService SERVICE_TIMELINE = new TimelineService();
	
	private static String me;
	private static String friend;
	
	private static long nextTimeline = 0L;
	
	@BeforeClass
	public static void setUp(){
		//
	}

	@Test
    public void addUser() {
		logger.debug("=====================addUser{====================");
		//add user 1
		me = SERVICE_USER.addUser(new User("itstarting","1234"));
		logger.debug("This round of testing, ME={}", me);
		Assert.assertNotNull(me);

		//add user 2
		friend = SERVICE_USER.addUser(new User("test1","1234"));	
		logger.debug("This round of testing, FRIEND={}", friend);
		Assert.assertNotNull(friend);	
		logger.debug("=====================}//addUser====================");
    }   
    
	/**
	 * I'm following a friend
	 */
    @Test
    public void followFriend() {
		logger.debug("=====================followFriend{====================");
		SERVICE_FRIEND.followFriend(me, friend);
		logger.debug("=====================}//followFriend====================");
    }
    
    /**
     * I'm followed by a follower
     */
    @Test
    public void followedByFollower() {
		logger.debug("=====================followedByFollower{====================");
		SERVICE_FRIEND.followFriend(friend, me);		
		logger.debug("=====================}//followedByFollower====================");
    }
    
    /**
     * I'm tweeting
     */
    @Test
    public void addTweetByMe() {
		logger.debug("=====================addTweetByMe{====================");
		for(int i=0; i<100; i++){
			String tweet_uuid = SERVICE_TWEET.addTweet(me, "Hello JTWISSANDRA -- by itstarting:" + i);
			Assert.assertNotNull(tweet_uuid);
		}
		logger.debug("=====================}//addTweetByMe====================");
    }
    
    /**
     * My friend is tweeting
     * 
     */
    @Test
    public void addTweetByFriend() {
		logger.debug("=====================addTweetByFriend{====================");
		for(int i=0; i<100; i++){
			String tweet_uuid = SERVICE_TWEET.addTweet(friend, "Hello JTWISSANDRA -- by test1:" + i);
			Assert.assertNotNull(tweet_uuid);
		}
		logger.debug("=====================}//addTweetByFriend====================");
    }
    
    /**
     * Get tweets for me
     */
    @Test
    public void getTweetsByMe(){
		logger.debug("=====================getTweetsByMe{====================");
    	getTweets(me, 0);
		logger.debug("=====================}//getTweetsByMe====================");
    }
    
    /**
     * Get tweets at next Timeline (if any)
     */
    @Test
    public void getTweetsByMeForNextTimeline(){
		logger.debug("=====================getTweetsByMeForNextTimeline{====================");
		if(nextTimeline>0L){
			getTweets(me, nextTimeline);
		}
		logger.debug("=====================}//getTweetsByMeForNextTimeline====================");
    }
    
    /**
     * Get tweets for my friend
     */
    @Test
    public void getTweetsByMyFriend(){
		logger.debug("=====================getTweetsByMyFriend{====================");
    	getTweets(friend, 0);
		logger.debug("=====================}//getTweetsByMyFriend====================");
    }
    
    /**
     * Get my friend's tweets at next Timeline (if any)
     */
    @Test
    public void getTweetsByMyFriendForNextTimeline(){
		logger.debug("=====================getTweetsByMyFriendForNextTimeline{====================");
		if(nextTimeline>0L){
			getTweets(friend, nextTimeline);
		}
		logger.debug("=====================}//getTweetsByMyFriendForNextTimeline====================");
    }
    
    private void getTweets(String user_uuid, long start){
    	TimelineWrapper wrapper = SERVICE_TIMELINE.getTimeline(user_uuid, start);
    	Assert.assertNotNull(wrapper);
    	List<Timeline> list = wrapper.getTimelines();
    	List<String> tweet_uuids = new ArrayList<String>();
    	for(Timeline timeline: list){
    		String tweet_uuid = timeline.getTweet_uuid();
    		logger.debug("From Timeline: tweet_uuid={}, tweet_timestamp={}", 
    				tweet_uuid, timeline.getTweet_timestamp());
    		tweet_uuids.add(tweet_uuid);
    	}
    	List<Tweet> tweets = SERVICE_TWEET.getTweets(tweet_uuids);
    	Iterator<Tweet> it = tweets.iterator();
    	while(it.hasNext()){
    		Tweet tweet = it.next();
    		logger.debug("From Tweet: tweet_uuid={}, tweet_content={}, user_uuid={}", 
    				new Object[]{tweet.getTweet_uuid(), 
    							 tweet.getTweet_content(),
    							 tweet.getUser_uuid()
    							});
    	}
    	if(wrapper.getNextTimeline() > 0L){
    		logger.debug("The start timeline of next page is: {}", wrapper.getNextTimeline());
    		nextTimeline = wrapper.getNextTimeline();
    	}else{
    		logger.debug("No next page available");
    		nextTimeline = 0L;
    	}
    }
    
    @AfterClass
    public static void shutdown(){
    	//cluster.getConnectionManager().shutdown();
    }
}

This is an all-in-one test case: a single full run covers nearly all of the business service logic.

 

 

5. Finally, don't forget to create the required schema before running:

 

drop keyspace JTWISSANDRA;

create keyspace JTWISSANDRA
	with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy'
	and strategy_options = [{replication_factor:1}];

use JTWISSANDRA;

create column family USER
	with comparator = UTF8Type   
	and key_validation_class = UTF8Type
	and default_validation_class = UTF8Type
	and column_metadata = [
		{column_name: user_name, validation_class: UTF8Type, 
				   index_name:user_name_idx, index_type:KEYS }
		{column_name: user_password, validation_class: UTF8Type}
		{column_name: create_timestamp, validation_class: LongType,
				   index_name:create_timestamp_idx, index_type:KEYS}
	];

create column family FRIEND
	with comparator = LongType   
	and key_validation_class = UTF8Type
	and default_validation_class = UTF8Type;

create column family FOLLOWER
	with comparator = LongType   
	and key_validation_class = UTF8Type
	and default_validation_class = UTF8Type;

create column family TWEET
	with comparator = UTF8Type   
	and key_validation_class = UTF8Type
	and default_validation_class = UTF8Type
	and column_metadata = [
		{column_name: user_uuid, validation_class: UTF8Type}
		{column_name: tweet_content, validation_class: UTF8Type}
	];	

create column family TIMELINE
	with comparator = LongType   
	and key_validation_class = UTF8Type
	and default_validation_class = UTF8Type;

 

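6. Anything else?

The remaining bits are miscellaneous, so I won't paste them here. If you're interested, clone the project from GitHub and give it a run.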

 

 

 

==========Problems found so far are discussed below. I hope experienced Cassandra users will join in and clear up my confusion. Thanks!==========

 

Posted: 2012-03-02
The first problem is in getTweets:
public List<Tweet> getTweets(List<String> tweet_uuids){
    	MultigetSliceQuery<String, String, String> multigetSlicesQuery =
            HFactory.createMultigetSliceQuery(KEYSPACE, SERIALIZER_STRING, SERIALIZER_STRING, SERIALIZER_STRING);
        multigetSlicesQuery.setColumnFamily(CF_TWEET);
        multigetSlicesQuery.setColumnNames("user_uuid","tweet_content");        
        multigetSlicesQuery.setKeys(tweet_uuids);
        QueryResult<Rows<String, String, String>> results = multigetSlicesQuery.execute();
    	return convertRowsToTweets(results.get());
    }



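The problem: when a series of keys is passed in through MultigetSliceQuery.setKeys(Iterable<String> keys) for a batched slice query, the order of the results does not follow the order of the keys; it depends on how the CF itself is modeled.

In other words, after reading a nicely sorted list of tweet uuids from CF_TIMELINE, getting an equally well-sorted list of tweets requires one of the following:
1. Fetch them one by one, e.g. with a ColumnQuery, and add each to the result list;
2. Have CF_TWEET also sort its columns by timestamp, the same way CF_TIMELINE does.

Option 1 does not feel like a good idea: it multiplies the number of round trips to Cassandra, roughly 10x at the default page size of 10.
Option 2 needs more thought. My gut feeling is that a Super CF would be hard to avoid, but that needs further verification.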
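
For what it's worth, there may be a third route that is not one of the two options above: keep the single multiget and put the results back into timeline order on the client. The sketch below is only an illustration under assumptions. ReorderSketch and inKeyOrder are hypothetical names, and the multiget result is modeled as a plain Map keyed by tweet uuid rather than Hector's Rows type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: restores the caller's key order on an unordered batch result.
final class ReorderSketch {
    private ReorderSketch() {}

    // 'orderedKeys' is the tweet-uuid order read from TIMELINE;
    // 'fetched' stands for the multiget result, keyed by tweet uuid, in arbitrary order.
    static <V> List<V> inKeyOrder(List<String> orderedKeys, Map<String, V> fetched) {
        List<V> result = new ArrayList<>(orderedKeys.size());
        for (String key : orderedKeys) {
            V value = fetched.get(key);
            if (value != null) { // skip keys the store did not return
                result.add(value);
            }
        }
        return result;
    }
}
```

This keeps the round trips at one per page and costs only a pass over the page on the client. Whether it fits depends on how easily the batch result can be looked up by key in the client library being used.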
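Posted: 2012-03-02
For the first problem, I have gone with option 1 for now, the crude approach, because for option 2:
1. A Super CF does not actually solve the problem;
2. Super CFs are a controversial concept and may well be deprecated.


Now the second problem:
I found that uuids get duplicated, so after calling addTweet in a loop the actual number of tweets is lower than expected (e.g. 100 iterations should produce 100 tweets, but fewer than 90 actually exist). Why? Because when the tweet_uuid is the same, mutator.addInsertion simply replaces the existing columns (much like an UPDATE in an RDBMS).

The code in question is: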
/**
	 * REF: http://wiki.apache.org/cassandra/FAQ#working_with_timeuuid_in_java
	 * 
	 * @return UUID
	 */
	public UUID getUUID(){
		//TODO: which UUID should we use to make sure it's unique?
		ClockResolution clock = new MicrosecondsClockResolution();
		return TimeUUIDUtils.getTimeUUID(clock);
		//return TimeUUIDUtils.getUniqueTimeUUIDinMillis();
	}



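Judging from the discussion I referenced and from what the author, Patricio Echague, says, TimeUUIDUtils.getTimeUUID(clock) is the recommended call when several invocations may happen within one millisecond. Yet that is not what I observed. Why? Still unresolved; I plan to ask the author directly, since I may simply be using it incorrectly.

POST UPDATE:
The author, Patricio Echague, recommends using MicrosecondsSyncClockResolution rather than MicrosecondsClockResolution. The principle is simple: if there is more than one call within the same millisecond, then from the second call onward each call adds 1 to the long timestamp.
Reference discussion: https://groups.google.com/forum/?fromgroups#!topic/hector-users/IfABWOh0HLg

So the final code is: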
	/**
	 * REF: 
	 * 1. FAQ
	 * 		http://wiki.apache.org/cassandra/FAQ#working_with_timeuuid_in_java
	 * 2. DISCUSSION:
	 * 		https://groups.google.com/forum/?fromgroups#!topic/hector-users/IfABWOh0HLg
	 * 
	 * @return UUID
	 */
	public UUID getUUID(){
		// needs: import me.prettyprint.cassandra.service.clock.MicrosecondsSyncClockResolution;
		ClockResolution clock = new MicrosecondsSyncClockResolution();
		return TimeUUIDUtils.getTimeUUID(clock);
	}
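
To make the "add 1 from the second call onward" principle concrete, here is a small sketch of a synchronized clock along those lines. It illustrates the idea as described above and is not Hector's actual source; SyncClockSketch and next() are hypothetical names.

```java
// Sketch of the idea behind a synchronized microsecond clock; not Hector's source code.
final class SyncClockSketch {
    private static long lastTime = -1L;

    private SyncClockSketch() {}

    // Returns a strictly increasing microsecond-scale value, even within one millisecond
    static synchronized long next() {
        long us = System.currentTimeMillis() * 1000L;
        if (us > lastTime) {
            lastTime = us;   // the clock moved forward: use the value as-is
        } else {
            us = ++lastTime; // same millisecond (or the clock went back): bump by one
        }
        return us;
    }
}
```

Since every call returns a value larger than the previous one, a tight loop of 100 addTweet calls would get 100 distinct timestamps, which is exactly what the duplicated tweet_uuid problem needed.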