In the previous post, we started a straightforward implementation of a Kademlia Distributed Hash Table in Go. Today, we'll add in the real meat - interaction between peers to find nodes.
First, though, a correction to the previous post. There were a couple of errors in it, the most notable of which is that the routing table's FindClosest method failed to order the returned results correctly. The original implementation ordered results by their absolute node ID, but the correct ordering is by their ID xor the target ID. To my shame, my rudimentary unit tests did not catch this. It is now fixed in the original article with the introduction of a 'ContactRecord' struct.
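For reference, the ContactRecord wrapper pairs a contact with its xor distance from the target, so results can be sorted by distance rather than by raw node ID. The field names below are an assumption based on how the struct is used later in this post; see the previous article for the definitive version:

type ContactRecord struct {
    node    *Contact;  // the contact itself
    sortKey NodeID;    // the contact's ID xor the target ID, used for ordering
}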
Let's start by defining a Kademlia struct to hold information about a Kademlia network:
type Kademlia struct {
    routes    *RoutingTable;
    NetworkId string;
}

func NewKademlia(self *Contact, networkId string) (ret *Kademlia) {
    ret = new(Kademlia);
    ret.routes = NewRoutingTable(self);
    ret.NetworkId = networkId;
    return;
}
Note the presence of the 'NetworkId' field in the above code. This is an arbitrary string that should be unique for each deployment of our Kademlia implementation, to prevent different instances of the network merging together.
Go supports RPC calls using its built-in rpc package. RPCs are handled by exporting a struct; all public methods on that struct that meet certain requirements then become available for RPC calls. Go supports RPC calls over HTTP as well as over raw sockets; we'll use the former, as it's slightly simpler. To start, we'll define a struct that will export the 'core' Kademlia RPCs, and a common header for RPC requests and responses:
type RPCHeader struct {
    Sender    *Contact;
    NetworkId string;
}

func (k *Kademlia) HandleRPC(request, response *RPCHeader) os.Error {
    if request.NetworkId != k.NetworkId {
        return os.NewError(fmt.Sprintf("Expected network ID %s, got %s", k.NetworkId, request.NetworkId));
    }
    if request.Sender != nil {
        k.routes.Update(request.Sender);
    }
    response.Sender = &k.routes.node;
    return nil;
}

type KademliaCore struct {
    kad *Kademlia;
}
The HandleRPC method is a helper function that should be called for every incoming RPC. It takes care of checking that the network ID is correct, updating the routing table with the details of the peer that just sent us an RPC, and including our own details in the reply message. Let's see a trivial example of an RPC - the Ping RPC:
type PingRequest struct {
    RPCHeader;
}

type PingResponse struct {
    RPCHeader;
}

func (kc *KademliaCore) Ping(args *PingRequest, response *PingResponse) (err os.Error) {
    if err = kc.kad.HandleRPC(&args.RPCHeader, &response.RPCHeader); err == nil {
        log.Stderr("Ping from %s\n", args.RPCHeader);
    }
    return;
}
The Ping RPC does literally nothing, except provide an opportunity for peers to check that we're still alive, and update both our routing tables with that fact. Note the way we use Go's if statement in Ping: By using Go's support for an arbitrary statement in the if, followed by an expression, we can use the if as a guard clause that executes HandleRPC, and then executes the content of the block iff HandleRPC was successful. The use of a named return parameter means that if the call is unsuccessful, the error is automatically returned to the caller.
Registering the RPC server with Go works like this:
func (k *Kademlia) Serve() (err os.Error) {
    rpc.Register(&KademliaCore{k});
    rpc.HandleHTTP();
    if l, err := net.Listen("tcp", k.routes.node.address); err == nil {
        go http.Serve(l, nil);
    }
    return;
}
First we construct a new KademliaCore instance, and register it with the RPC module. This makes use of a couple of Go features I quite like. First, Go has good support for composite literals, so we can construct a struct using a single literal expression. Second, Go uses escape analysis to determine when something should be allocated on the heap instead of the stack. If we were to attempt to do what you see above in C, we'd be passing a pointer to a struct allocated on the stack, which would fail horribly when it was first accessed after our Serve function returned. Go detects this, however, and allocates our KademliaCore instance on the heap, making the above a much neater version of this:
kc := new(KademliaCore);
kc.kad = k;
rpc.Register(kc);
Then, we instruct the RPC module that we want it to handle RPC-over-HTTP calls. I'm not too happy about the design decision to have HandleHTTP and Register operate on global state - it makes it impossible to run more than one RPC server exporting different sets of RPCs on different ports, for instance. Hopefully this will be fixed in future versions of the library.
Finally, we call net.Listen to listen on a TCP port, and call http.Serve on the new port in a goroutine, to handle incoming HTTP requests.
Let's examine a slightly less trivial RPC call, the FindNode RPC:
type FindNodeRequest struct {
    RPCHeader;
    target NodeID;
}

type FindNodeResponse struct {
    RPCHeader;
    contacts []Contact;
}

func (kc *KademliaCore) FindNode(args *FindNodeRequest, response *FindNodeResponse) (err os.Error) {
    if err = kc.kad.HandleRPC(&args.RPCHeader, &response.RPCHeader); err == nil {
        contacts := kc.kad.routes.FindClosest(args.target, BucketSize);
        response.contacts = make([]Contact, contacts.Len());
        for i := 0; i < contacts.Len(); i++ {
            response.contacts[i] = *contacts.At(i).(*ContactRecord).node;
        }
    }
    return;
}
This RPC is a wrapper around the local FindClosest call on the RoutingTable struct. After doing the standard HandleRPC call successfully, we call FindClosest, then iterate over its results, adding them to an array (actually a slice) of results. Again, the RPC module seamlessly handles serializing and deserializing the RPC and its arguments.
Let's look at how a client makes an RPC. To facilitate this, we can define a Call method on the Kademlia struct:
func (k *Kademlia) Call(contact *Contact, method string, args, reply interface{}) (err os.Error) {
    if client, err := rpc.DialHTTP("tcp", contact.address); err == nil {
        err = client.Call(method, args, reply);
        if err == nil {
            k.routes.Update(contact);
        }
    }
    return;
}
This method sends an RPC to a peer by connecting to its address over HTTP with rpc.DialHTTP. DialHTTP returns a client object, on which we then call the Call method, passing the method name (a string of the form 'Struct.Method') and the args and reply structs. If the call was successful, we update the routing table.
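As an example of how this helper might be used - this is just a sketch, not part of the post's implementation - a wrapper that pings a contact could look something like this:

// A hypothetical convenience wrapper (not part of the post's code) that sends
// a Ping RPC to a contact using the Call helper above.
func (k *Kademlia) sendPing(contact *Contact) os.Error {
    args := PingRequest{RPCHeader{&k.routes.node, k.NetworkId}};
    reply := PingResponse{};
    return k.Call(contact, "KademliaCore.Ping", &args, &reply);
}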
I've been saving the hardest part for last, though. The most important part of Kademlia's design is how it finds nodes and data in the network. It does this using an iterative process, described here. In short, it starts off with a list of nodes closest to the target ID, retrieved from the local routing table. It repeatedly queries the closest few nodes that it hasn't yet queried, asking them for the closest nodes _they_ know about. When it receives a response, it updates its result list, and repeats. This procedure is done in parallel, with several RPCs in flight at the same time.
Typically, the procedure terminates when either an RPC call returns no new results, or every contact in the result set has been queried. In our case, we'll use a slightly simpler criterion: we'll stop only when we no longer have any unqueried nodes.
func (k *Kademlia) sendQuery(node *Contact, target NodeID, done chan []Contact) {
    args := FindNodeRequest{RPCHeader{&k.routes.node, k.NetworkId}, target};
    reply := FindNodeResponse{};
    if err := k.Call(node, "KademliaCore.FindNode", &args, &reply); err == nil {
        done <- reply.contacts;
    } else {
        done <- []Contact{};
    }
}
This is the sendQuery method. It takes care of sending a FindNode RPC to the specified contact, then, when it replies, sends the results - a list of contacts - to the channel provided to signal completion. The method's designed this way so that we can call sendQuery in a new goroutine, and it will signal us with its results when it is done.
func (k *Kademlia) IterativeFindNode(target NodeID, delta int) (ret *vector.Vector) {
    done := make(chan []Contact);

    // A vector of *ContactRecord structs
    ret = new(vector.Vector).Resize(0, BucketSize);

    // A heap of not-yet-queried *Contact structs
    frontier := new(vector.Vector).Resize(0, BucketSize);

    // A map of node IDs we've seen so far
    seen := make(map[string]bool);
This is the start of the IterativeFindNode function. You can see here where we declare the 'done' channel that instances of sendQuery will use. We also initialize the return Vector, which will store ContactRecord pointers; another vector called 'frontier', which will be structured as a heap and store Contact pointers for contacts we have not yet queried; and a map of node IDs that we've already seen, to prevent us accidentally adding the same node to the return or frontier vectors multiple times. Map keys have to be one of the types that support equality comparisons in Go, so we'll serialize our NodeIDs to strings to use as keys in this map (a possible String method is sketched after the function walkthrough).
    // Initialize the return list, frontier heap, and seen list with local nodes
    for node := range k.routes.FindClosest(target, delta).Iter() {
        record := node.(*ContactRecord);
        ret.Push(record);
        heap.Push(frontier, record.node);
        seen[record.node.id.String()] = true;
    }

    // Start off delta queries
    pending := 0;
    for i := 0; i < delta && frontier.Len() > 0; i++ {
        pending++;
        go k.sendQuery(frontier.Pop().(*Contact), target, done);
    }
Here, we ask the local routing table for the contacts it knows of that are closest to the target, and step through them to start us off. For each one, we add it to the return values, push it onto the heap, and mark it as seen. Then, we start off a set of concurrent RPCs using our sendQuery method. A counter variable, 'pending', keeps track of how many in-flight RPCs we have.
    // Iteratively look for closer nodes
    for pending > 0 {
        nodes := <-done;
        pending--;
        for _, node := range nodes {
            // If we haven't seen the node before, add it
            if _, ok := seen[node.id.String()]; ok == false {
                // Copy the contact so we don't keep a pointer to the reused loop variable
                contact := node;
                ret.Push(&ContactRecord{&contact, contact.id.Xor(target)});
                heap.Push(frontier, &contact);
                seen[contact.id.String()] = true;
            }
        }

        for pending < delta && frontier.Len() > 0 {
            go k.sendQuery(frontier.Pop().(*Contact), target, done);
            pending++;
        }
    }
This is the core of our code. We loop as long as there are queries in-flight (Go has no 'while' loop - it uses a modified 'for' loop for everything), reading results from the done channel. This operation will block until one of our pending RPCs completes. When we have results, we iterate through them, adding any ones we haven't seen before to the return vector and the heap, and marking them as seen. Then, if there are nodes still in the frontier heap, we start off new RPCs to query them, up to our specified concurrency limit. This is in a loop, rather than just an if statement, because it's possible that while one RPC could return nothing new (resulting in no new RPCs being started), a subsequent RPC could return several new results.
An important caveat here is that we're not accounting for unresponsive nodes. If a node is queried and never responds, we'll wait here indefinitely. A real-world implementation would set a timeout for each RPC, and give up on outstanding RPCs if they take too long.
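As a rough illustration of what that might look like - this is only a sketch, not part of the implementation, and it assumes the time package provides an After function returning a channel that fires after the given number of nanoseconds - the blocking receive could be wrapped like so:

// A hedged sketch: receive from 'done', but give up after 'ns' nanoseconds,
// treating an unresponsive peer as having returned no results. Assumes
// time.After is available in this form.
func receiveWithTimeout(done chan []Contact, ns int64) []Contact {
    select {
    case nodes := <-done:
        return nodes;
    case <-time.After(ns):
        return []Contact{};
    }
    return nil;
}

IterativeFindNode could then use receiveWithTimeout(done, ...) in place of the bare receive; a real implementation would also need to do something about the late reply eventually arriving on the channel.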
    sort.Sort(ret);
    if ret.Len() > BucketSize {
        ret.Cut(BucketSize, ret.Len());
    }
    return;
}
Finally, we wrap up by sorting the results and truncating them to the number expected.
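One small assumption in the code above is worth spelling out: the 'seen' map is keyed by the string form of a NodeID. Assuming NodeID is the fixed-length byte array defined in the previous post, a String method along these lines would do - treat this as a sketch rather than the canonical definition:

// A possible String method for NodeID (a sketch; the previous post's version
// may differ). It hex-encodes the ID so it can serve as a map key.
func (id NodeID) String() string {
    ret := "";
    for i := 0; i < len(id); i++ {
        ret += fmt.Sprintf("%02x", id[i]);
    }
    return ret;
}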
That, in a nutshell, is the core functionality in a Kademlia implementation. All other operations build on the IterativeFindNode operation in one manner or another. In the next post, we'll demonstrate this by adding support for storing and retrieving values, making the implementation into a proper hash table.
One major caveat: Because I'm writing this on-the-fly, it's possible - nay, likely - that there are major bugs in the implementation so far. Unit testing can only catch so much, so don't be surprised if my next post once again contains an embarrassed admission that I got it wrong somehow. If you see a bug, shout out!