解决方法:添加riak_kv_util:is_x_deleted(Record)判断
143 get_map_bucket_values(Record, undefined, {Nodes, CategoryList})->
144 case riak_kv_util:is_x_deleted(Record)of
145 true ->
146 [];
147 false ->
148 %io:format("get_map_bucket_values {} Record=~p, Nodes=~p, CategoryList=~p, DR=~p~n", [r1Record, Nodes, CategoryList, dR]),
149 PubsubItem = binary_to_term(riak_object:get_value(Record)),
150 %io:format("Nodes=~p,PublishItem.name~p~n", [Nodes, PubsubItem#pubsub_item.node_name]),
151 IsNode =lists:member(PubsubItem#pubsub_item.node_name, Nodes),
152 IsCategory = lists:member(PubsubItem#pubsub_item.category, CategoryList),
153 %io:format("IsNode=~p,IsCategory~p~n", [IsNode, IsCategory]),
154 if IsNode == true orelse
155 IsCategory == true ->
156 [PubsubItem];
157 true ->
158 []
159 end
160 end;
Hi guys, I need yours help.
I using mapreduce in my server. I only query from riak, that is ok. But first delete some data,after query is error in the follow.
My environment:
Linux jason-lxw 3.2.0-30-generic #48-Ubuntu SMP Fri Aug 24 16:52:48 UTC 2012 x86_64 x86_64 x86_64 GNU/Linux
Erlang R14B04
riak-1.2
riak-erlang-1.2
Thanks in advance.
Jason
My server error message:
=ERROR REPORT==== 25-Sep-2012::18:30:48 ===
E(<0.411.0>:mod_pubsub_riak:3168) : In mod_pubsub_riak:get_newsfeed 3168 Erro={error,
{0,
<<"{\"phase\":0,\"error\":\"badarg\",\"input\":\"{ok,{r_object,<<\\\"itrends/ejabberd/pubsub_item\\\">>,<<131,104,2,107,0,13,53,52,52,56,56,48,70,56,66,52,54,53,48,97,5>>,[{r_content,{dict,4,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<\\\"X-Riak-VTag\\\">>,51,100,111,68,108,85,106,107,48,88,110,120,51,79,106,56,71,68,97,87,74,54]],[[<<\\\"index\\\">>]],[[<<\\\"X-Riak-Deleted\\\">>,116,114,117,101]],[[<<\\\"X-Riak-Last-Modified\\\">>|{1348,569048,516672}]],[],[]}}},<<>>}],[{<<35,9,254,249,80,93,197,65>>,{2,63515788248}}],...},...}\",\"type\":\"error\",\"stack\":\"[{erlang,binary_to_term,[<<>>]},{trend_riak,get_map_bucket_values,3},{riak_kv_mrc_map,map,3},{riak_kv_mrc_map,process,3},{riak_pipe_vnode_worker,process_input,3},{riak_pipe_vnode_worker,wait_for_input,2},{gen_fsm,handle_msg,7},{proc_lib,init_p_do_apply,3}]\"}">>}}
Riak error in the console:
undefined}, Nodes=["jason1"], CategoryList=[]
16:24:01.346 [error] gen_fsm <0.2681.0> in state wait_for_input terminated with reason: processing_error
16:24:01.346 [error] Pipe worker startup failed:
exit:{shutdown,{gen_fsm,sync_send_event,[<0.2654.0>,{get_details,#Ref<0.0.0.8144>,456719261665907161938651510223838443642478919680,<0.327.0>}]}}
[{gen_fsm,sync_send_event,2},{riak_pipe_fitting,get_details,2},{riak_pipe_vnode,new_worker,2},{riak_pipe_vnode,enqueue_internal,3},{riak_core_vnode,vnode_command,3},{gen_fsm,handle_msg,7},{proc_lib,init_p_do_apply,3}]
16:24:01.346 [error] Pipe worker startup failed:
exit:{shutdown,{gen_fsm,sync_send_event,[<0.2654.0>,{get_details,#Ref<0.0.0.8144>,799258707915337533392640142891717276374338109440,<0.357.0>}]}}
[{gen_fsm,sync_send_event,2},{riak_pipe_fitting,get_details,2},{riak_pipe_vnode,new_worker,2},{riak_pipe_vnode,enqueue_internal,3},{riak_core_vnode,vnode_command,3},{gen_fsm,handle_msg,7},{proc_lib,init_p_do_apply,3}]
16:24:01.347 [error] Pipe worker startup failed:fitting was gone before startup
16:24:01.348 [error] Pipe worker startup failed:fitting was gone before startup
16:24:01.389 [error] CRASH REPORT Process <0.2681.0> with 0 neighbours exited with reason: processing_error in gen_fsm:terminate/7
16:24:01.408 [error] Unrecognized message {pipe_log,#Ref<0.0.0.8144>,{kvget_map,0},{trace,[error],{vnode,{fitting_died,707914855582156101004909840846949587645842325504}}}}
16:24:01.545 [error] Supervisor riak_pipe_vnode_worker_sup had child undefined started with {riak_pipe_vnode_worker,start_link,undefined} at <0.2681.0> exit with reason processing_error in context child_terminated
16:24:01.567 [error] Unrecognized message {pipe_log,#Ref<0.0.0.8144>,{kvget_map,0},{trace,[error],{vnode,{fitting_died,411047335499316445744786359201454599278231027712}}}}
16:24:01.622 [error] Unrecognized message {pipe_log,#Ref<0.0.0.8144>,{kvget_map,0},{trace,[error],{vnode,{fitting_died,22835963083295358096932575511191922182123945984}}}}
16:24:01.645 [error] Unrecognized message {pipe_log,#Ref<0.0.0.8144>,{kvget_map,0},{trace,[error],{vnode,{fitting_died,0}}}}
16:24:01.667 [error] Unrecognized message {pipe_log,#Ref<0.0.0.8144>,{kvget_map,0},{trace,[error],{vnode,{fitting_died,1392993748081016843912887106182707253109560705024}}}}
16:24:01.729 [error] Unrecognized message {pipe_log,#Ref<0.0.0.8144>,listkeys,{trace,[error],{vnode,{fitting_died,707914855582156101004909840846949587645842325504}}}}
get_map_bucket_values {} Record=rRecord, Nodes=["jason1"], CategoryList=[]
get_map_bucket_values {} Record=r1Record, Nodes=["jason1"], CategoryList=[]
18:30:48.630 [error] gen_fsm <0.9097.0> in state wait_for_input terminated with reason: processing_error
18:30:48.661 [error] CRASH REPORT Process <0.9097.0> with 0 neighbours exited with reason: processing_error in gen_fsm:terminate/7
18:30:48.690 [error] Unrecognized message {pipe_log,#Ref<0.0.0.63402>,{xform_map,0},{trace,[error],{vnode,{fitting_died,22835963083295358096932575511191922182123945984}}}}
18:30:48.717 [error] Supervisor riak_pipe_vnode_worker_sup had child undefined started with {riak_pipe_vnode_worker,start_link,undefined} at <0.9097.0> exit with reason processing_error in context child_terminated
18:30:48.750 [error] Unrecognized message {pipe_log,#Ref<0.0.0.63402>,{xform_map,0},{trace,[error],{vnode,{fitting_died,936274486415109681974235595958868809467081785344}}}}
18:30:48.772 [error] Unrecognized message {pipe_log,#Ref<0.0.0.63402>,{xform_map,0},{trace,[error],{vnode,{fitting_died,1392993748081016843912887106182707253109560705024}}}}
18:30:48.794 [error] Unrecognized message {pipe_log,#Ref<0.0.0.63402>,{xform_map,0},{trace,[error],{vnode,{fitting_died,1255977969581244695331291653115555720016817029120}}}}
18:30:48.817 [error] Unrecognized message {pipe_log,#Ref<0.0.0.63402>,{xform_map,0},{trace,[error],{vnode,{fitting_died,616571003248974668617179538802181898917346541568}}}}
18:30:48.828 [error] Unrecognized message {pipe_log,#Ref<0.0.0.63402>,{xform_map,0},{trace,[error],{vnode,{fitting_died,0}}}}
18:30:48.839 [error] Unrecognized message {pipe_log,#Ref<0.0.0.63402>,{xform_map,0},{trace,[error],{vnode,{fitting_died,411047335499316445744786359201454599278231027712}}}}
18:30:48.850 [error] Unrecognized message {pipe_log,#Ref<0.0.0.63402>,{xform_map,0},{trace,[error],{vnode,{fitting_died,799258707915337533392640142891717276374338109440}}}}
18:30:48.872 [error] Unrecognized message {pipe_log,#Ref<0.0.0.63402>,{xform_map,0},{trace,[error],{vnode,{fitting_died,685078892498860742907977265335757665463718379520}}}}
18:30:48.986 [error] Unrecognized message {pipe_log,#Ref<0.0.0.63402>,{xform_map,0},{trace,[error],{vnode,{fitting_died,1187470080331358621040493926581979953470445191168}}}}
18:30:49.006 [error] Unrecognized message {pipe_log,#Ref<0.0.0.63402>,{xform_map,0},{trace,[error],{vnode,{fitting_died,707914855582156101004909840846949587645842325504}}}}
18:30:49.039 [error] Unrecognized message {pipe_log,#Ref<0.0.0.63402>,{xform_map,0},{trace,[error],{vnode,{fitting_died,662242929415565384811044689824565743281594433536}}}}
18:30:49.072 [error] Unrecognized message {pipe_log,#Ref<0.0.0.63402>,1,{trace,[error],{vnode,{fitting_died,45671926166590716193865151022383844364247891968}}}}
Kresten Krab Thorup krab@trifork.com
|
|
|
|
|
|
Deleted objects may show up as objects with the special meta-data property X-Riak-Deleted set, and an empty contents. So, in your map function trend_riak:get_map_bucket_values you need to check riak_kv_util:is_x_deleted(Obj)(Which is a utility function that checks for that property), and in that case just return the empty list from your map function.KrestenTrifork
分享到:
相关推荐
MapReduce_Simplified_Data_Processing_on_Large_Clusters
MapReduce是Google提出的一种分布式计算模型,被广泛应用于大数据处理领域,尤其是在Hadoop框架下。紫色GW6(purplegw6)可能是指一个特定的用户或团队名称,他们在Hadoop MapReduce上进行了实践项目。这个项目的...
这是谷歌三大论文之一的 MapReduce: Simplified Data Processing on Large Clusters 英文原文。我的翻译可以见https://blog.csdn.net/m0_37809890/article/details/87830686
《MapReduce: Simplified Data Processing on Large Clusters》这篇论文由Google的研究员Jeffrey Dean和Sanjay Ghemawat撰写,旨在介绍一种名为MapReduce的分布式计算模型。在MapReduce出现之前,Google和其他公司...
在大数据处理领域,Java语言因其稳定性和可扩展性,成为了实现MapReduce算法的首选工具。MapReduce是一种分布式计算模型,由Google提出,主要用于处理和生成大规模数据集。在这个项目"0324大数据代码与数据_JAVA...
MapReduce 编程模型简介 MapReduce 是一种编程模型,由 Jeffrey Dean 和 Sanjay Ghemawat 于 2004 年提出,用于处理大规模数据集的分布式计算。该模型将计算任务分解成两个主要阶段:Map 和 Reduce。Map 阶段将...
MapReduce框架实现的单源最短路径算法(SSSP, Single-source Shortest Path),包含源文件及数据。sssp.c是MapReduce实现,ss seq.cpp是串行的普通实现。
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性...
本实验以"mpi.rar"压缩包为载体,主要涵盖了MPI(Message Passing Interface)、MapReduce以及pthread库在并行计算中的应用,特别是用于寻找历史最高气温的算法。 首先,MPI,全称Message Passing Interface,是一...
在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组...
"Data Analytics with Hadoop: An Introduction for Data Scientists" ISBN: 1491913703 | 2016 | PDF | 288 pages | 7 MB Ready to use statistical and machine-learning techniques across large data sets? ...
【标题】"WOA.zip"中的内容主要围绕“WOA算法”及其在“k-means mapreduce”中的应用,以及“WOA聚类算法”的讨论。这个压缩包提供了多个PDF文档,显然是一份关于分布式计算和优化算法的资料集合。 【WOA算法】全称...
MapReduce是一种编程模型,它适用于在超大型集群上进行大规模数据集的处理。其主要思想是通过定义两个关键函数:Map和Reduce,来实现分布式数据处理。Map函数负责处理输入的键/值(key/value)对,生成中间的键/值对...
MapReduce的翻译,我只是个搬运工qwq
MapReduce由Google提出,它将大规模数据处理分解为两个主要阶段:Map阶段和Reduce阶段,适合于并行和分布式计算。 Map阶段的主要任务是对原始数据进行预处理,包括数据分片、特征提取以及计算每个样本到其他所有...
MapReduce是一种分布式计算模型,由Google在2004年提出,主要用于处理和生成大规模数据集。这个模型将复杂的计算任务拆分成两个主要阶段:Map(映射)和Reduce(化简),使得海量数据能够在多台计算机上并行处理,...
MapReduce_and_filter
A number of organizations are focusing on big data processing, particularly with Hadoop. This course will help you understand how Hadoop, as an ecosystem, helps us store, process, and analyze data. ...