`
cryolite
  • 浏览: 582321 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

对Riak Core的探索 (3) - 依赖注入 IoC

阅读更多
二、 基于Riak Core分布式应用开发

开发一个基于Riak Core的分布式数据处理系统,意味着它要处理的数据将在hash后映射到一个环上,也就是说这些数据分布在各个数据节点的partition上,数据的处理也在各个partition上进行。

显然每个数据处理系统有它自己的数据处理逻辑。如果我们实现了这个处理逻辑然后以某种方式注入到Riak Core中,那么相应的系统也就开发成了。

基于Riak Core的开发本质上也是这样的:Riak Core屏蔽了所有和分布式有关的部分,开发者不用关心这些,它提供了某种接口,用户通过这些接口实现数据处理逻辑,然后将实现注入到Riak Core系统中。


1. riak_core_vnode behaviour —— 用户处理逻辑的实现接口

Riak Core为提供了一个riak_core_vnode回调模块作为用户实现业务逻辑的接口。源代码看这里。riak_core_vnode有两个作用:
  1. 作为erlang behaviour提供回调函数实现接口,用户在回调函数中实现业务逻辑;
  2. 作为容器进程运行实现上述接口的用户模块,具体来讲它将作为一个基于gen_fsm的有限状态机进程运行。

也就是说,riak_core_vnode是作为一个容器以erlang进程的形式运行,而用户的业务逻辑栖身于容器中。(BTW:gen_server/gen_fsm都是这种设计和运行模式。)

我想这是一种典型的IoC(依赖注入)设计:系统将用户实现的功能注入到Riak Core系统中,由此构建出基于Riak Core的分布式应用系统。困难的dynamo分布式实现交给Riak Core完成,用户只关心业务逻辑的实现就行了。

2. 业务逻辑的实现,和其他

主要是实现riak_core_vnode behaviour的接口回调函数。除了业务逻辑的实现,还设计到数据的转移等。

Riak Core将partition上的数据处理操作抽象出来,由用户通过riak_core_vnode behaviour的回调函数实现,vnode的回调函数分为以下几类:
0) vnode进程生命周期管理,在vnode进程诞生和毁灭时将被调用的回调函数init/1, terminate/2
1) 对vnode进程负责的partition上数据的处理,当传来对该partition上的数据继续处理的命令时,由回调函数handle_command/3负责处理。例如,如果对数据的处理是存储,那么无非是get/put之类的命令,这些命令的格式(协议)都是由用户自己定义并实现的。
2) 当集群中增/减物理节点时,或者某个宕掉的物理节点恢复时,就需要对partition进行handoff的处理,其核心是对数据的搬运,它包括一组回调函数(is_empty/1, delete/1, handle_handoff_command/3, handoff_starting/2, handoff_cancelled/1, handoff_finished/2, handle_handoff_data/2, handle_handoff_data/2, encode_handoff_item/2 )
3) 对进程错误的处理:与该vnode相关的进程如果出错会调用回调函数handle_exit
4) Covering:覆盖整个ring空间数据的特殊命令,例如对一段连续范围的搜索命令。某个连续的数据集在hash后有可能会落到所有的partition。

这些回调函数在rzezeski的博文中有介绍

3. 业务逻辑的注入过程

我们的分布式数据处理系统作为一个OTP application发布。在application中实现了处理系统的启动,以及业务逻辑的注入。

application启动时会启动supervisor,后者监控着riak_core_vnode_master子进程(riak_core_vnode_master是一个gen_server),参数配制成实现用户逻辑的定制vnode模块,从而实现业务逻辑注入。

riak_core_vnode_master可看成是运行着vnode的容器。

业务逻辑的注入

-module(rts_sup).

-behaviour(supervisor).

%% API 
-export([start_link/0]).      

%% Supervisor callbacks
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).


init(_Args) ->
    VMaster = { rts_vnode_master,  
                     {riak_core_vnode_master, start_link, [rts_vnode]},
                      permanent, 5000, worker, [riak_core_vnode_master]},

    Entry = { rts_entry_vnode_master,
                {riak_core_vnode_master, start_link, [rts_entry_vnode]},
                 permanent, 5000, worker, [riak_core_vnode_master]},

    Stat = { rts_stat_vnode_master, 
               {riak_core_vnode_master, start_link, [rts_stat_vnode]}, 
                permanent, 5000, worker, [riak_core_vnode_master]},

    {ok,
     {{one_for_one, 5, 10},
      [VMaster, Entry, Stat]}}.


有几种执行特定分区上业务逻辑的API方法,都是riak_core_vnode_master模块提供的,这些函数的参数都是三个,分别是:IdxNode, Msg, VMasterMod

1. 同步发送
sync_command(IndexNode, Msg, VMaster)

2. 异步发送
riak_core_vnode_master:sync_spawn_command(IndexNode, ping, rts_vnode_master)

还有一个比较特殊command函数,它有两种

一种一上述两类相似,直接向vnode同步发送命令:
    riak_core_vnode_master:command(IdxNode,
                                   {entry, Client, Entry} = Msg,
                                   rts_entry_vnode_master = VMaster).

一种向PreList里的所有节点发送command命令:
command(Preflist, Msg, VMaster)


IdxNode是{Index, Node}这样的tuple,其中Index是partition分区的id号,Node是分区所在的节点,知道了Node和rts_entry_vnode_master,就能得到vnode进程Pid,通过发送事件调用相关业务逻辑:
gen_fsm:send_event(Pid, make_request(Msg, Sender, Index));


IndexNode的计算:
1) 用riak_core_util:chash_key/1函数计算hash值,参数是一个含两个二进制数据的tuple
DocIdx = riak_core_util:chash_key({<<"ping">>, term_to_binary(now())}),
2) 通过哈希值得到Preflist,Preflist的长度可以指定:
PrefList = riak_core_apl:get_primary_apl(DocIdx, 1, rts),

最后得到IndexNod
[{IndexNode, _Type}] = PrefList,





BTW:应用的启动过程
-module(rts_app).
-behaviour(application).

%% Application callbacks
-export([start/2, stop/1]).

start(_StartType, _StartArgs) ->
    case rts_sup:start_link() of
        {ok, Pid} ->
            ok = riak_core:register_vnode_module(rts_vnode),
            ok = riak_core_node_watcher:service_up(rts, self()),

            ok = riak_core:register_vnode_module(rts_entry_vnode),
            ok = riak_core_node_watcher:service_up(rts_entry, self()),

            ok = riak_core:register_vnode_module(rts_stat_vnode),
            ok = riak_core_node_watcher:service_up(rts_stat, self()),

            EntryRoute = {["rts", "entry", client], rts_wm_entry, []},
            webmachine_router:add_route(EntryRoute),

            {ok, Pid};
        {error, Reason} ->
            {error, Reason}
    end.

stop(_State) ->
    ok. 
分享到:
评论

相关推荐

    riak-session-manager.zip

    riak-session-manager 是使用 Riak 来存储 Tomcat session 信息的项目。 配置方法:   &lt;?xml version="1.0" encoding="UTF-8"?&gt; &lt;Manager className="com.jbrisbin.vpc.riak.session.RiakManager" ...

    riak-ruby-client, 用于 ruby的Riak客户端.zip

    riak-ruby-client, 用于 ruby的Riak客户端 ) 客户端( Riak客户机)riak-client 是一个富 ruby 客户端/工具箱,分布在,数据库中,包含典型操作的基本包装。在 http://basho.github.io/riak-ruby-client/ 可以使用详尽...

    riak-java-client:Java的Riak客户端

    Riak Java客户端 Riak Java客户端支持与 (开放源代码,分布式数据库)进行通信,该数据库专注于高可用性,水平可伸缩性和可预测的延迟。 Riak和此代码均由维护。 Java客户端的最新版本同时支持Riak KV 2.0+和Riak...

    riak-erlang-http-client:使用HTTP接口的Riak Erlang客户端

    git clone git://github.com/basho/riak-erlang-http-client.gitcd riak-erlang-http-clientmake 如果您对协议缓冲区Riak Erlang客户端( )早已熟悉,则应该发现此客户端很熟悉。 刚刚替补调用riakc_pb_socket通过...

    PyPI 官网下载 | riak_pb-2.1.0.6-py2.6.egg

    **PyPI官网下载 | riak_pb-2.1.0.6-py2.6.egg** 在Python的世界中,PyPI(Python Package Index)是官方的第三方软件包仓库,它为开发者提供了一个集中发布Python模块的地方。"riak_pb-2.1.0.6-py2.6.egg"是PyPI上...

    docker-riak:DevDB Riak - 用于开发的 docker 镜像

    开发数据库 Riak docker 容器中的 Riak v2.0.0 - 当您需要快速的 Riak 开发节点时。 快速开始 docker pull devdb/riak:latest docker run -d --name riak1 -p 8098:8098 -p 8087:8087 devdb/riak:latest 使用以下...

    Riak JSON编辑器「Riak JSON editor」-crx插件

    Riak JSON编辑器 用于Riak的JSON编辑器(NOSQL DB)。 不建议使用,而不要使用此命令:...

    Laravel开发-laravel-riak-auth

    了解如何在Riak中安全地存储敏感信息,如加密密码,以及如何防止SQL注入等安全威胁,是`laravel-riak-auth`必须考虑的问题。 通过这个项目,开发者能够利用Laravel的强大功能和Riak的灵活性,为大型分布式应用构建...

    riak-client-1.1.3.zip

    【标题】"Riak Client 1.1.3与specs2 Maven Plugin" 【描述】在软件开发领域,特别是Java生态系统中,Riak是一个分布式键值存储系统,而specs2是一个广泛使用的Scala测试框架,它提供了强大的行为驱动开发(BDD)...

    riak-java-client, Java的Riak客户端.zip

    riak-java-client, Java的Riak客户端 Java客户端客户端 客户端支持与 通信,它是一个开源的分布式数据库,专注于高可用性。水平可伸缩性和可以预测的可以预测的实时延时。 Riak和这段代码都由 Basho维护。Java客户端...

    riakcs-java-client:RiakCS和Amazon S3的Java客户端

    Riak CS Java客户端适用于Amazon S3和Riak CS的轻量级Java库。 没有外部依赖性。 基于本书中的代码:“对Amazon Web Services进行编程” 可以在以下示例中找到示例代码:examples.com.basho.riakcs.client 用户管理...

    Python库 | riak-1.5.2-py2.7.egg

    资源分类:Python库 所属语言:Python 资源全名:riak-1.5.2-py2.7.egg 资源来源:官方 安装方法:https://lanzao.blog.csdn.net/article/details/101784059

    riak-bucket-cleanup:清除键匹配给定正则表达式的 riak 条目

    riak-bucket-清理 清除键匹配给定正则表达式的 riak 条目 如何使用 npm install -g riak-bucket-cleanup run riak-bucket-cleanup --regex="^old_.*$" [bucketName] grab a cup of coffee all keys starting with...

    riak-cli:Riak 命令行客户端

    带有 NodeJS 的 Riak 命令行工具。 有更好的解决方案,请参考 -&gt;使用带有curl的Riak http api查询riak真的很烦人! 于是工具诞生了。 干杯! 这是一个 Riak 终端查询工具,提供基本的 POST/PUT/DELETE/GET 等方法。...

    前端开源库-riak-js

    3. **使用riak-js** 在项目中引入riak-js,首先需要安装该库,可以通过npm命令完成: ``` npm install riak-js ``` 然后在代码中导入并创建客户端实例,连接到RIAK服务器: ```javascript var Riak = require...

    scruffy-riak_2.10-1.3.10.zip

    "scruffy"通常指的是项目中的一个特定分支或维护者的名字,它可能是对Riak进行了一些定制化修改或者功能增强的版本。对于开发人员来说,了解这些定制化的部分以及它们如何与原版Riak交互是非常重要的。 另一方面,...

    Laravel开发-laravel-riak

    在本文中,我们将深入探讨如何在 Laravel 框架中集成和使用 Riak 数据库,主要关注连接配置、缓存管理以及会话存储。Riak 是一个分布式键值存储系统,提供高可用性、可扩展性和容错性,特别适合大数据量的应用场景。...

    riak-cache:riak-cache 是一个 ActiveSupport

    riak riak-cache需要 ActiveSupport 3.x 版作为其父类,并且riak-client需要连接到 Riak。 开发依赖由打包器处理。 安装 bundler ( gem install bundler ) 并运行以下命令: $ bundle install 要固定到特定版本的...

Global site tag (gtag.js) - Google Analytics