`
langzhe
  • 浏览: 286926 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

riak MapReduce 中的 {index, bucket(), Index::binary(), key()} 使用方法

    博客分类:
  • riak
 
阅读更多

 

----------这是客户端测试代码-------------------------

-module(test).

-export([test/0,

         get_state/0]).

 

get_state() ->

    Bucket = <<"ejabberd/pubsub_state">> ,

    {ok, Pid}= riakc_pb_socket:start("127.0.0.1", 8087),

    Inputs = {index, Bucket, <<"idx_int">>, 1}, %% that is ok

    %Query = [],

    Query = 

        [{map, {modfun, trend_riak, get_map_bucket_values}, none, true}], %%注意这里的trend_riak:get_map_bucet_values是自己在riak-server中自定义的

        %[{map, {modfun, trend_riak, get_map_bucket_values}, none, false},

        % {reduce, {modfun, trend_riak, get_reduce_bucket_values}, {1,2}, true}],

    riakc_pb_socket:mapred(Pid, Inputs, Query).

 

 

test() ->

    Bucket = <<"ejabberd/pubsub_node">> ,

    {ok, Pid}= riakc_pb_socket:start("127.0.0.1", 8087),

    %Inputs = {modfun, trend_riak, test, [<<"a">>, <<"b">>]},

    %Inputs =  {modfun, riak_search, mapred_search, [ <<"Bucket">>, <<"SearchQuery">>]}, %% that is ok

    %Inputs = {index, Bucket, <<"nodeidx_int">>,11}, %% that is ok

    Inputs = {index, Bucket, <<"host_bin">>,<<"pubsub.meda.com">>}, %% that is ok

    %Query = [],

    Query = 

        [{map, {modfun, trend_riak, test}, none, true}],

        %[{map, {modfun, trend_riak, get_map_bucket_values}, none, false},

        % {reduce, {modfun, trend_riak, get_reduce_bucket_values}, {1,2}, true}],

    riakc_pb_socket:mapred(Pid, Inputs, Query).

 

test1() ->

    Bucket = <<"ejabberd/pubsub_item">> ,                                                                                                                                                                           

    %Merge = fun(Gcounts, none) ->    lists:keysort(6,Gcounts) end,  

    %Count = fun(G, undefined, none) ->  [binary_to_term(riak_object:get_value(G))]  end,     

    %Query =  [{map, {qfun, Count}, none,false},  {reduce, {qfun, Merge}, none, true}],

    {ok, Pid}= riakc_pb_socket:start("127.0.0.1", 8087),

    %riakc_pb_socket:mapred(Pid, Bucket, [{map, {modfun, riak_kv_mapreduce, map_object_value}, none,true}]).

    %riakc_pb_socket:mapred(Pid, Bucket, [{map, {modfun, riak_kv_mapreduce, map_object_value}, none,true}]).

    %Query = [{map,   {modfun, riak_kv_mapreduce, map_object_value},

    %                   {struct,[{<<"sub">>,[<<"0">>]}]},false},

    %                 {reduce,{modfun, riak_kv_mapreduce, reduce_string_to_integer},none,false},

    %                 {reduce,{modfun, riak_kv_mapreduce, reduce_sum},              none,true}],

    %Query = 

    %    [{map, {modfun, riak_kv_mapreduce, map_object_value}, none,true},

    %     {reduce, {modfun, riak_kv_mapreduce, reduce_sum}, none, true}],

    Query = 

        [{map, {modfun, trend_riak, get_map_bucket_values}, none, false},

    %    [{map, {modfun, riak_kv_mapreduce, map_object_value}, none,true},

         {reduce, {modfun, trend_riak, get_reduce_bucket_values}, {1,2}, true}],

    riakc_pb_socket:mapred(Pid, Bucket, Query).


----------------------自定义服务器代码------------------------

1 -module(trend_riak).                                                                                                                                                                                            

  2 

  3 -export([get_map_bucket_values/3,

  4          get_reduce_bucket_values/2,

  5          test/3,

  6          test/2]).

  7 

  8 -record(pubsub_item,

  9 {

 10   itemid,

 11   creation          = {'unknown','unknown'},

 12   modification      = {'unknown','unknown'},

 13   payload           = [],

 14   published         = "",

 15   node_icon         = "",

 16   node_name         = "",

 17   node_creator      = "",

 18   node_creator_jid  = "",

 19   node_creator_icon = "",

 20   creator           = "",

 21   creator_jid       = "",

 22   creator_icon      = "",

 23   category          = ""

 24 }).

 25 

 26 get_map_bucket_values(Record, undefined, {Nodes, CategoryList})->

 27     PubsubItem = binary_to_term(riak_object:get_value(Record)),

 28     IsNode =lists:member(PubsubItem#pubsub_item.node_name, Nodes),

 29     IsCategory = lists:member(PubsubItem#pubsub_item.category, CategoryList),

 30     if  IsNode == true orelse

 31         IsCategory == true ->

 32             [PubsubItem];

 33         true ->

 34             []

 35     end;

 36 %% _s default is undefined _c  default is none 

 37 get_map_bucket_values(Record, undefined, _C)->

 38     [binary_to_term(riak_object:get_value(Record))].

 39 

 40 get_reduce_bucket_values(Records,{Start, Max})->

 41     %Records.

 42     %%lists:keysort(6,Records).

 43     %lists:sublist(Records, Start, Max).

 44     Skip = get_skip_num(Start, Max),

 45     if length(Records) > Skip  ->

 46         lists:reverse(lists:sublist(lists:keysort(#pubsub_item.published, Records), Skip, Max));

 47        true ->

 48             []

 49     end.

 50 test(A,B) ->

 51 ok.

 52 test(Record, B, C) -> %%B,C两个参数可以有客户端传入

 53     [binary_to_term(riak_object:get_value(Record))].

     %[A].

 55   %%  io:format(ssssssssss),

 56  %%   io:format("A=~p,B=~p,C=~p~n", [A,B,C]),

 57     %{ok, [<<"itrends/ejabberd/pubsub_node">>]}.

 58 %%    [{<<"my">>, <<"bu">>}].

 59     %{ok,[]}.

 60     %[].

 61 

 62 

 63 

 64 get_skip_num(1, _Max) ->

 65     1;

 66 get_skip_num(Start, Max) ->

 67     Start*Max.

这里需要注意的是需要在riak的

vi etc/vm.args中添加如下:

 40 ## add by langxw 

 41 -pa /home/jason/learn/riak              

主要目的是在riak中引入trend_riak.erl                                                                                                                                                     

~                                  

-------------------------------------


0
0
分享到:
评论

相关推荐

    riak-bucket-cleanup:清除键匹配给定正则表达式的 riak 条目

    riak-bucket-清理 清除键匹配给定正则表达式的 riak 条目 如何使用 npm install -g riak-bucket-cleanup run riak-bucket-cleanup --regex="^old_.*$" [bucketName] grab a cup of coffee all keys starting with...

    riak-session-manager.zip

    riak-session-manager 是使用 Riak 来存储 Tomcat session 信息的项目。 配置方法:   &lt;?xml version="1.0" encoding="UTF-8"?&gt; &lt;Manager className="com.jbrisbin.vpc.riak.session.RiakManager" ...

    Laravel开发-laravel-riak

    1. **安装 Riak PHP 客户端**:首先,你需要安装官方的 PHP 客户端库 `basho/riak-php-client` 以在 Laravel 中使用 Riak。你可以通过 Composer 来完成安装: ``` composer require basho/riak-php-client ``` 2...

    Riak 学习文档

    **Riak 学习文档** Riak 是一个基于 Erlang 语言开发的分布式数据库,设计时注重容错性和高效性,尤其适用于处理大量键值对(K-V)...通过理解其核心特性和使用方法,你可以有效地利用 Riak 来处理和存储大量数据。

    riak-cli:Riak 命令行客户端

    带有 NodeJS 的 Riak 命令行工具。 有更好的解决方案,请参考 -&gt;使用带有curl的Riak http api查询... $ riak-cli -m post -b bucket-name -k riak-key -d data$ riak-cli -m post -b bucket-name -k riak-key -f ${YO

    riak-Erlang.rar

    3. 安装Riak:从Riak官网获取源码或二进制包,根据操作系统选择合适的安装方法。 4. 配置Riak:配置文件通常位于`etc/riak/app.config`,可以在此调整复制因子、内存分配等参数。 5. 启动Riak:使用Erlang的控制台...

    liza-riak:liza的riak bucket实现(KV抽象)

    可配置的特定内容: 序列化(对内容类型使用多种方法完成) allow-siblings(默认为true) r值最后写赢主机/端口存储后端值区名称用法( require '[liza.store :as store])( require '[liza.store.riak :as riak])( ...

    PyPI 官网下载 | riak_pb-2.1.0.6-py2.6.egg

    要在Python项目中使用`riak_pb`库,首先需要将其安装到Python环境。由于我们已经有了`.egg`文件,可以直接通过Python的`easy_install`工具或者`pip`(可能需要先将文件重命名为`.zip`)进行安装: ```bash # 使用...

    riak:Riak是Basho Technologies的去中心化数据存储

    在Wiki中,您将找到设置和使用Riak的“快速入门”指导。 有关更多信息,请浏览以下文件: 自述文件:此文件 许可证:Riak的发布许可证 doc / admin.org:Riak管理指南 architecture.txt:有关Riak底层设计的详细...

    fakeriak:用于测试和没有 Riak 的机器的内存中 Ruby Riak 驱动程序

    特征支持以下 Riak 功能: 服务器信息基本数据对象查找2.0 之前的计数器Bucket/Bucket Type 道具列出键/桶使用 Javascript 映射/减少二级索引搜索索引/模式CRDT 以下 Riak 功能目前尚未实现: 使用 Solr 搜索查询...

    Laravel开发-laravel-riak .zip

    在本压缩包“Laravel开发-laravel-riak .zip”中,主要涉及的是使用 Laravel 框架集成 Riak 数据库的开发实践。Laravel 是一个基于 PHP 的优雅、强大的 Web 开发框架,它提供了丰富的工具来简化 Web 应用程序的构建...

    Laravel开发-laravel-riak-auth

    5. **配置和设置**:使用`laravel-riak-auth`需要在Laravel的配置文件中设置Riak的连接信息,比如主机地址、端口、集群设置等。 6. **测试和调试**:为了确保`laravel-riak-auth`的正确运行,开发者通常会编写单元...

    riak-erlang-http-client:使用HTTP接口的Riak Erlang客户端

    riak-erlang-http-client建置状态 riak-erlang-http-client是... 作为一个简单的示例,下面是如何使用此客户端在存储桶“ bar”中使用键“ foo”创建和检索值的方法。 首先,启动带有riak-erlang-http-client路径和所有

    riak-admin:Riak 的类似蒲团的 Web 界面

    riak-admin 到目前为止的功能 显示桶的内容 单击时显示文档内容 编辑文档内容(json.object) 删除单个文档或存储桶... http://localhost:8098/riak/riak-admin/index.html?bucket 其中bucket是居住在 Riak 的任何桶。

    riak_kv:Ripple KeyValue商店

    riak_kv概述Riak KV是使用 Erlang库分发的开源Erlang应用程序。 Riak KV提供了键/值数据存储,并具有MapReduce,轻量级数据关系和几种不同的客户端API。快速开始您必须具有或更高版本,以及GNU风格的构建系统才能...

    docker-riak:DevDB Riak - 用于开发的 docker 镜像

    开发数据库 Riak docker 容器中的 Riak v2.0.0 - 当您需要快速的 Riak 开发节点时。 快速开始 docker pull devdb/riak:... 如果您希望数据持久化,请使用以下命令启动 Riak: docker pull devdb/riak:latest do

    riak_ensemble_demo:设置riak_ensemble的小代码示例

    riak_ensemble_demo:read_object(Key)-&gt; {ok,Obj#obj} | {错误,错误} 返回{obj,{Epoch,Seq,Key,Value}}}的元组(实际上是riak_ensemble_demo中obj记录的一个实例)或一个错误。 请注意,未找到表示为#obj ...

    server_monitoring_riak:使用Riak作为后端的服务器监视

    使用Riak作为后端的服务器监视 该项目是我的学士学位工作的一部分: “ NoSQL数据库和应用程序的比较分析” 米兰比可卡大学 关联者:安德烈·毛里诺(Andrea Maurino) 联合主持人:Blerina Spahiu 讲解 先决条件 ...

    riak-java-client:Java的Riak客户端

    Riak Java客户端的此分支用于与Riak 2.0一起使用的新v2.0客户端。 之前的版本: 与Riak 1.4.x一起使用 与&lt;Riak&gt; &lt; groupId&gt;com.basho.riak&lt;/ groupId&gt; &lt; artifactId&gt;riak-client&lt;/ arti

    riak-ruby-client, 用于 ruby的Riak客户端.zip

    riak-ruby-client, 用于 ruby的Riak客户端 ) 客户端( Riak客户机)riak-client 是一个富 ruby 客户端/工具箱,分布在,数据库中,包含典型操作的基本包装。在 http://basho.github.io/riak-ruby-client/ 可以使用详尽...

Global site tag (gtag.js) - Google Analytics