Kademlia
概要
Kademlia は二分木構造に基づく実用的な分散ハッシュテーブルアルゴリズム。P2P ファイル共有では Gnutella, BitTorrent, IPFS で使用されており、ブロックチェーンではアイディアや PoC として Ethereum 1 や Storj, Dfinity などのノード検索で検討されている。
Kademlia は構造のシンプルさや \(O(\log N)\) でのノード検索といった一般的な分散ハッシュテーブルの特徴に加えて、より効率的なルーティング経路の選択と冗長化、並列化可能なノード検索、Churn 耐性、そして Gnutella の実行統計から得られた成果として長期間駆動しやすいノードの参照を選択的に残す方法など、より現実的な機能を備えている。
概要
アルゴリズム構造
ルーティングテーブル
XOR 距離関数
プロトコル
libp2p を使用した実装サンプル
特徴
参考文献
アルゴリズム構造
ノードはその ID の 2 進数表現に基づいて概念上の二分木構造に配置される。ノード ID およびキーの名前空間のサイズにはアルゴリズムとしての制約はないが、論文では SHA-1 を意識して 160 ビットと想定している。
Fig 1 . 5 ビットの名前空間に存在するノードを二分木に再配置した
ルーティングテーブル
Kademlia の各ノードが持つルーティングは、Fig 2 が示している二分木の根と自身のノード ID を結ぶ経路 (青いライン) から分岐した部分木に含まれる任意のノードを管理している。これは \(k\)-バケットと呼ばれ、部分木ごとに最大 \(k\) 個のノードの参照を保持する。
Fig 2 . Fig 1 の木構造における 00111
のルーティングテーブルの例。\(k\)-バケットは二分木の根と 00111
を結ぶ経路から分岐した先に存在する最大 \(k\) 個のノードを参照する。
最上位の部分木は 2 分割した名前空間のうち自身のノードが含まれていない半分である。二層目の部分技は 2 分割した名前空間をさらに 2 分割したうちの自身のノードが含まれていない方である。ルーティングテーブルのエントリはこのように二分木の 2 分割を繰り返した範囲それぞれに含まれている最大 \(k\) 個のノードを参照している。
もし分割された範囲にノードが一つも存在しない場合、\(k\)-バケットのノード数は 0 となる。\(k\)-バケットのノード数が \(k\) 個より少ないとき、\(k\)-バケットに含まれているノードはその範囲で分かっているすべてのノードを表している。
Kademlia のプロトコルには明示的なノードの参加や離脱の通知は存在しない代わりに、各ノードはメッセージを中継するときにそのメッセージに含まれる送信者ノードを「発見」し、必要に応じてその ID とアドレスで適切な \(k\)-バケットを更新することができる。
Kademlia ネットワーク開始時の最初のノードはすべての名前空間を範囲とする一つの \(k\)-バケットのみを持っている。ノード A が任意のメッセージを受信したとき、そのメッセージの送信ノード B に対応する \(k\)-バケットに送信ノードが含まれているかを確認する。その \(k\)-バケットにノード B が:
含まれている場合、ノード B をバケットの末尾に移動する。
含まれていない場合、バケットのノード数が:
\(k\) 個より少なければ単純にその末尾にノード B を追加する。
\(k\) 個の場合、その \(k\)-バケットの範囲に自身のノードが:
含まれる場合、\(k\)-バケットを二分割し、すでに存在する \(k\) 個のノードをそれぞれ適切な \(k\)-バケットへ分配する。その後に再びノード B の追加を試行する。
含まれない場合、\(k\)-バケットのノード交換用キャッシュに保存する。後の動作で応答しないノードが発生したとき、この交換用キャッシュのもっとも最近に追加されたノードと疎通確認を行い、正しい応答があれば応答しなくなったノードと置き換える。
上記のように Kademlia のルーティングテーブルはトラフィックによって最新の状態に保たれる構造をしている。一定時間 (論文では 1 時間) ノード検索が行われていないバケットは、そのバケットに含まれるノードをランダムに選択し検索を行うことで強制的にリフレッシュされる。結果的に \(k\)-バケットには長期間正常に駆動しているノードが優先して残るようになり、もっとも最近に疎通が確認されたノードが末尾に配置される。
満杯の \(k\)-バケットでは、既存のノードのいずれかの疎通確認が失敗したときのみノードの入れ替えが起きうることに注意。これは、一度に大量のノードを参加させ特定の範囲の \(k\)-バケットのルーティングを支配するような DoS 攻撃に耐性がある (Churn 耐性) ことを示している。
XOR 距離関数
Kademlia での key-value ペアはキーとの距離がもっとも近い ID を持つノードに保存される。ノード ID とキーの間の距離関数はそれらのビット単位での XOR (排他的論理和) で定義される。例えばノード 3 (b011
) とノード 5 (b101
) が存在したとき、キー 1 (b001
) に対する距離はそれぞれ: \[ \left\{ \begin{array}{lcl} {\tt 011} \oplus {\tt 001} & = & {\tt 010} \ \ \mbox{(2 in decimal)}\\ {\tt 101} \oplus {\tt 001} & = & {\tt 100} \ \ \mbox{(4 in decimal)} \end{array} \right. \] となることから、キー 1 はノード 5 よりもノード 3 に近いと言える。
距離関数に XOR を使用する利点はトポロジーの距離の捉え方が単純になることである。ノード ID やキーを点 \(x, y\) としたとき、その距離が関数 \(d(x,y)\) で表されるとすると、XOR による距離は以下の特徴を持つ。
\(d(x, x)=0\), 例えば \(d({\tt 011},{\tt 011})=0\)
\(x \ne y\) のとき \(d(x,y)\gt 0\)
すべての \(x,y\) に対して \(d(x,y)=d(y,x)\)
\(d(x,z) \le d(x,y) + d(y,z)\)、すなはち \(d(x,z) \lt d(x,y)\) であれば
Chord のような剰余環は順方向と逆方向の 2 つのトポロジーが存在し双方で距離の捉え方が異なる。時計盤に例えると、4 時から 6 時までの距離は 2 時間だが、同じトポロジーでの 6 時から 4 時までの距離は 10 時間となる。これらを同じ距離とするなら状況に応じてトポロジーを変換しなければならない複雑さをもたらす。このため Chord は順方向のトポロジーしか考慮しておらず、点 \(x\) からもっとも遠い点はその直前の点 \((x - 1) \bmod n\) である。
一方、XOR 距離関数では 2 つの点 \(x\) と \(y\) は特徴 3 より可換である。つまり \(x\) と \(y\) のどちらを基準にしても他方への距離は同じである。これは Kademlia のトポロジーが単一方向性 (unidirectional) であることを示している。
特徴 4 は、\(x\) のルーティングテーブルの中で \(z\) にもっとも近いノードを選択したとき、\(x\)-\(z\) 間の距離より近くなるような距離を持つ \(x\)-\(y\)-\(z\) 経路は存在しないこと、つまり、一見 \(y\) を経由して遠回りに見えるが実際は \(x\)-\(z\) を直接結ぶより近くなるような経路は存在しないことを意味している。この特徴により、\(z\) に近いノードへの移動によって \(z\) までの距離は必ず縮まること、それを繰り返すことでどのような経路であっても最終的に \(z\) にもっとも近いノードに収束することを示している。実際、2 点 \(x\), \(y\) からある点 \(z\) へ向かう探索はどちらも同じ経路に収束する傾向を持つ。
XOR
0 (b000 )
1 (b001 )
2 (b010 )
3 (b011 )
4 (b100 )
5 (b101 )
6 (b110 )
7 (b111 )
0 (b000 )
0 (b000 )
1 (b001 )
2 (b010 )
3 (b011 )
4 (b100 )
5 (b101 )
6 (b110 )
7 (b111 )
1 (b001 )
1 (b001 )
0 (b000 )
3 (b011 )
2 (b010 )
5 (b101 )
4 (b100 )
7 (b111 )
6 (b110 )
2 (b010 )
2 (b010 )
3 (b011 )
0 (b000 )
1 (b001 )
6 (b110 )
7 (b111 )
4 (b100 )
5 (b101 )
3 (b011 )
3 (b011 )
2 (b010 )
1 (b001 )
0 (b000 )
7 (b111 )
6 (b110 )
5 (b101 )
4 (b100 )
4 (b100 )
4 (b100 )
5 (b101 )
6 (b110 )
7 (b111 )
0 (b000 )
1 (b001 )
2 (b010 )
3 (b011 )
5 (b101 )
5 (b101 )
4 (b100 )
7 (b111 )
6 (b110 )
1 (b001 )
0 (b000 )
3 (b011 )
2 (b010 )
6 (b110 )
6 (b110 )
7 (b111 )
4 (b100 )
5 (b101 )
2 (b010 )
3 (b011 )
0 (b000 )
1 (b001 )
7 (b111 )
7 (b111 )
6 (b110 )
5 (b101 )
4 (b100 )
3 (b011 )
2 (b010 )
1 (b001 )
0 (b000 )
Table 1 . 3 ビット空間における XOR を使った 2 点間の距離。
プロトコル
def STORE(key, value)
libp2p を使用した実装サンプル
main.rs
main.rs use chrono::Local;
use clap::{Arg, App};
use colored::*;
use std::fs;
use std::path::Path;
use std::io::{stdout, Write};
use libp2p::identity::{ed25519, Keypair, PublicKey};
use libp2p::Multiaddr;
use async_std::{io, task};
use futures::prelude::*;
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{
record::Key, AddProviderOk, Kademlia, KademliaEvent, PeerRecord, PutRecordOk, QueryResult,
Quorum, Record,
};
use libp2p::{
build_development_transport, identity,
mdns::{Mdns, MdnsEvent},
swarm::NetworkBehaviourEventProcess,
NetworkBehaviour, PeerId, Swarm,
};
use std::{
error::Error,
task::{Context, Poll},
};
fn main() -> Result<(), Box<dyn Error>> {
// コマンドラインパラメータの解析
let matches = App::new("Kadmlia-based Simple KVS").version("1.0").author("TAKAMI Torao <koiroha@gmail.com>")
.about("Simple key-value store implementation using the Kademlia DHT of libp2p.")
.arg(Arg::with_name("ID")
.help("numeric node id")
.required(true))
.arg(Arg::with_name("daemon")
.short("d")
.long("daemon")
.help("use when no input from stdin is accepted.")
.required(false)
.takes_value(false))
.arg(Arg::with_name("bootstrap")
.short("b")
.long("bootstrap")
.value_name("PEER_ID=NODE_ADDRESS")
.help("bootstrap node address (e.g., 12D3KooWCzsfEkYrFf2djHem3qdagCjV4M6EXkcPUQzP9Xq76EPV=/ip4/127.0.0.1/tcp/6200)")
.takes_value(true))
.get_matches();
// コマンドラインオプションからノード ID を取得し鍵ペアをロード
let id:u32 = matches.value_of("ID").map(|s| s.parse().unwrap()).unwrap_or(0);
let key_pair = load_keypair(id);
let peer_id = PeerId::from(PublicKey::Ed25519(key_pair.public()));
let kv_store = MemoryStore::new(peer_id.clone());
let mut kademlia = Kademlia::new(peer_id.clone(), kv_store);
log(id, format!("Peer ID: {}", peer_id));
log(id, format!("Protocol Name: {}", String::from_utf8(kademlia.protocol_name().to_vec()).unwrap()));
let bind_address = address(id);
log(id, format!("Bind Address: {}", bind_address.to_string()));
let transport = libp2p::build_development_transport(Keypair::Ed25519(key_pair)).unwrap();
let behaviour = DHT{ id, kademlia };
let mut swarm = Swarm::new(transport, behaviour, peer_id);
Swarm::listen_on(&mut swarm, bind_address).unwrap();
env_logger::init();
// Create a random key for ourselves.
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
// Set up a an encrypted DNS-enabled TCP Transport over the Mplex protocol.
let transport = build_development_transport(local_key)?;
// Create a swarm to manage peers and events.
let mut swarm = {
// Create a Kademlia behaviour.
let store = MemoryStore::new(local_peer_id.clone());
let kademlia = Kademlia::new(local_peer_id.clone(), store);
let mdns = Mdns::new()?;
let behaviour = DHT { kademlia, mdns };
Swarm::new(transport, behaviour, local_peer_id)
};
// Read full lines from stdin
let mut stdin = io::BufReader::new(io::stdin()).lines();
// Listen on all interfaces and whatever port the OS assigns.
Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?;
// Kick it off.
let mut listening = false;
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop {
match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => handle_input_line(&mut swarm.kademlia, line),
Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break,
}
}
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => println!("{:?}", event),
Poll::Ready(None) => return Poll::Ready(Ok(())),
Poll::Pending => {
if !listening {
if let Some(a) = Swarm::listeners(&swarm).next() {
println!("Listening on {:?}", a);
listening = true;
}
}
break;
}
}
}
Poll::Pending
}))
}
#[derive(NetworkBehaviour)]
struct DHT {
kademlia: Kademlia<MemoryStore>,
mdns: Mdns,
}
impl NetworkBehaviourEventProcess<MdnsEvent> for DHT {
// Called when `mdns` produces an event.
fn inject_event(&mut self, event: MdnsEvent) {
if let MdnsEvent::Discovered(list) = event {
for (peer_id, multiaddr) in list {
self.kademlia.add_address(&peer_id, multiaddr);
}
}
}
}
impl NetworkBehaviourEventProcess<KademliaEvent> for DHT {
// Called when `kademlia` produces an event.
fn inject_event(&mut self, message: KademliaEvent) {
match message {
KademliaEvent::QueryResult { result, .. } => match result {
QueryResult::GetProviders(Ok(ok)) => {
for peer in ok.providers {
println!(
"Peer {:?} provides key {:?}",
peer,
std::str::from_utf8(ok.key.as_ref()).unwrap()
);
}
}
QueryResult::GetProviders(Err(err)) => {
eprintln!("Failed to get providers: {:?}", err);
}
QueryResult::GetRecord(Ok(ok)) => {
for PeerRecord {
record: Record { key, value, .. },
..
} in ok.records
{
println!(
"Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(),
std::str::from_utf8(&value).unwrap(),
);
}
}
QueryResult::GetRecord(Err(err)) => {
eprintln!("Failed to get record: {:?}", err);
}
QueryResult::PutRecord(Ok(PutRecordOk { key })) => {
println!(
"Successfully put record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
QueryResult::PutRecord(Err(err)) => {
eprintln!("Failed to put record: {:?}", err);
}
QueryResult::StartProviding(Ok(AddProviderOk { key })) => {
println!(
"Successfully put provider record {:?}",
std::str::from_utf8(key.as_ref()).unwrap()
);
}
QueryResult::StartProviding(Err(err)) => {
eprintln!("Failed to put provider record: {:?}", err);
}
_ => {}
},
_ => {}
}
}
}
fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
let mut args = line.split(" ");
match args.next() {
Some("GET") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
}
}
};
kademlia.get_record(&key, Quorum::One);
}
Some("GET_PROVIDERS") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
}
}
};
kademlia.get_providers(key);
}
Some("PUT") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
}
}
};
let value = {
match args.next() {
Some(value) => value.as_bytes().to_vec(),
None => {
eprintln!("Expected value");
return;
}
}
};
let record = Record {
key,
value,
publisher: None,
expires: None,
};
kademlia
.put_record(record, Quorum::One)
.expect("Failed to store record locally.");
}
Some("PUT_PROVIDER") => {
let key = {
match args.next() {
Some(key) => Key::new(&key),
None => {
eprintln!("Expected key");
return;
}
}
};
kademlia
.start_providing(key)
.expect("Failed to start providing key");
}
_ => {
eprintln!("expected GET, GET_PROVIDERS, PUT or PUT_PROVIDER");
}
}
}
fn load_keypair(id:u32) -> ed25519::Keypair {
let path = format!("node/{}/", id);
let dir = Path::new(&path);
if !dir.is_dir() {
fs::create_dir_all(dir).unwrap();
}
let file = dir.join("key");
if file.is_file() {
let mut binary = fs::read(file).unwrap();
ed25519::Keypair::decode(&mut binary).unwrap()
} else {
let key_pair = ed25519::Keypair::generate();
let binary = key_pair.encode();
fs::write(file, binary.to_vec()).unwrap();
key_pair
}
}
fn address(id:u32) -> Multiaddr {
let bind_address = format!("/ip4/127.0.0.1/tcp/{}", 6200 + id);
bind_address.parse().unwrap()
}
fn prompt<T: std::str::FromStr>(id:u32, msg:&str) -> T {
let color = color(id);
print!("{}> ", msg.color(color));
stdout().flush().unwrap();
let mut s = String::new();
std::io::stdin().read_line(&mut s).unwrap();
s.trim().parse().ok().unwrap()
}
fn color(id:u32) -> Color {
let colors = [Color::BrightRed, Color::BrightGreen, Color::BrightYellow, Color::BrightBlue, Color::BrightMagenta,
Color::BrightCyan];
colors[id as usize % colors.len()]
}
fn log(id:u32, msg:String) {
let color = color(id);
let tm = Local::now().format("%Y-%m-%d %H:%M:%S%.3f").to_string();
println!("{} {}", tm.bright_black(), format!("Node{} {}", id, msg).color(color));
}
他のノードの lookup は mDNS を前提としていること。
Cargo.toml
Cargo.toml [package]
name = "kvs"
version = "0.1.0"
authors = ["TAKAMI Torao <koiroha@gmail.com>"]
edition = "2018"
[dependencies]
libp2p = "0.28"
#libp2p-swarm = "0.21"
parity-multiaddr = "0.9"
clap = "2"
tokio = { version = "0.2", features = ["full"] }
chrono = "0.4"
colored = "2"
atomic = "0.4.6"
bytes = "0.5"
futures = "0.3.1"
lazy_static = "1.2"
#multiaddr = { package = "parity-multiaddr", version = "0.9.1", path = "misc/multiaddr" }
multihash = "0.11.0"
parking_lot = "0.10.0"
pin-project = "0.4.17"
smallvec = "1.0"
wasm-timer = "0.2.4"
#[dev-dependencies]
async-std = "1.6.2"
env_logger = "0.7.1"
#tokio = { version = "0.2", features = ["io-util", "io-std", "stream"] }
特徴
ある区間内の任意のノードを参照する。したがって区間の先頭のノードだけを参照する Chord などとは異なり、区間内でもより距離のより近いノードや性能の良いノードを選択してメッセージを転送することができる。
ある区間に対して最大 \(k\) 個のノードを参照する。これは、その区間に対する経路に \(k\) 個の冗長性を持つ。
参考文献
Maymounkov P., Mazières D. Kademlia: A Peer-to-Peer Information System Based on the XOR Metric (日本語訳 ) . In: Druschel P., Kaashoek F., Rowstron A. (eds) Peer-to-Peer Systems. IPTPS 2002. Lecture Notes in Computer Science, vol 2429. Springer, Berlin, Heidelberg.
Xing Shi Cai, Luc Devroye. The Analysis of Kademlia for random IDs