54class OPENDHT_PUBLIC DhtRunner {
62 std::string proxy_server {};
63 std::string push_node_id {};
64 std::string push_token {};
65 std::string push_topic {};
66 std::string push_platform {};
67 bool peer_discovery {
false};
68 bool peer_publish {
false};
69 std::shared_ptr<dht::crypto::Certificate> server_ca;
70 dht::crypto::Identity client_identity;
75 std::shared_ptr<Logger> logger {};
76 std::unique_ptr<net::DatagramSocket> sock;
77 std::shared_ptr<PeerDiscovery> peerDiscovery {};
78 StatusCallback statusChangedCallback {};
79 CertificateStoreQuery certificateStore {};
80 IdentityAnnouncedCb identityAnnouncedCb {};
81 PublicAddressChangedCb publicAddressChangedCb {};
82 std::unique_ptr<std::mt19937_64> rng {};
89 void get(InfoHash
id, GetCallbackSimple cb, DoneCallback donecb={},
Value::Filter f = {}, Where w = {}) {
90 get(
id, bindGetCb(cb), donecb, f, w);
93 void get(InfoHash
id, GetCallbackSimple cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) {
94 get(
id, bindGetCb(cb), donecb, f, w);
97 void get(InfoHash hash, GetCallback vcb, DoneCallback dcb, Value::Filter f={}, Where w = {});
99 void get(InfoHash
id, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter f = {}, Where w = {}) {
100 get(
id, cb, bindDoneCb(donecb), f, w);
102 void get(
const std::string& key, GetCallback vcb, DoneCallbackSimple dcb={}, Value::Filter f = {}, Where w = {});
105 void get(InfoHash hash, std::function<
bool(std::vector<T>&&)> cb, DoneCallbackSimple dcb={})
107 get(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
108 return cb(unpackVector<T>(vals));
114 void get(InfoHash hash, std::function<
bool(T&&)> cb, DoneCallbackSimple dcb={})
116 get(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
117 for (
const auto& v : vals) {
119 if (not cb(Value::unpack<T>(*v)))
121 }
catch (
const std::exception&) {
131 std::future<std::vector<std::shared_ptr<dht::Value>>> get(InfoHash key, Value::Filter f = {}, Where w = {}) {
132 auto p = std::make_shared<std::promise<std::vector<std::shared_ptr< dht::Value >>>>();
133 auto values = std::make_shared<std::vector<std::shared_ptr< dht::Value >>>();
134 get(key, [=](
const std::vector<std::shared_ptr<dht::Value>>& vlist) {
135 values->insert(values->end(), vlist.begin(), vlist.end());
138 p->set_value(std::move(*values));
141 return p->get_future();
145 std::future<std::vector<T>> get(InfoHash key) {
146 auto p = std::make_shared<std::promise<std::vector<T>>>();
147 auto values = std::make_shared<std::vector<T>>();
148 get<T>(key, [=](T&& v) {
149 values->emplace_back(std::move(v));
152 p->set_value(std::move(*values));
154 return p->get_future();
157 void query(
const InfoHash& hash, QueryCallback cb, DoneCallback done_cb = {}, Query q = {});
158 void query(
const InfoHash& hash, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query q = {}) {
159 query(hash, cb, bindDoneCb(done_cb), q);
162 std::future<size_t> listen(InfoHash key, ValueCallback vcb, Value::Filter f = {}, Where w = {});
164 std::future<size_t> listen(InfoHash key, GetCallback cb, Value::Filter f={}, Where w={}) {
165 return listen(key, [cb=std::move(cb)](
const std::vector<Sp<Value>>& vals,
bool expired){
169 }, std::forward<Value::Filter>(f), std::forward<Where>(w));
171 std::future<size_t> listen(
const std::string& key, GetCallback vcb, Value::Filter f = {}, Where w = {});
172 std::future<size_t> listen(InfoHash key, GetCallbackSimple cb, Value::Filter f = {}, Where w = {}) {
173 return listen(key, bindGetCb(cb), f, w);
177 std::future<size_t> listen(InfoHash hash, std::function<
bool(std::vector<T>&&)> cb)
179 return listen(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
180 return cb(unpackVector<T>(vals));
185 std::future<size_t> listen(InfoHash hash, std::function<
bool(std::vector<T>&&,
bool)> cb)
187 return listen(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals,
bool expired) {
188 return cb(unpackVector<T>(vals), expired);
193 template <
typename T>
194 std::future<size_t> listen(InfoHash hash, std::function<
bool(T&&)> cb, Value::Filter f = {}, Where w = {})
196 return listen(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals) {
197 for (
const auto& v : vals) {
199 if (not cb(Value::unpack<T>(*v)))
201 }
catch (
const std::exception&) {
207 getFilterSet<T>(f), w);
209 template <
typename T>
210 std::future<size_t> listen(InfoHash hash, std::function<
bool(T&&,
bool)> cb, Value::Filter f = {}, Where w = {})
212 return listen(hash, [cb=std::move(cb)](
const std::vector<std::shared_ptr<Value>>& vals,
bool expired) {
213 for (
const auto& v : vals) {
215 if (not cb(Value::unpack<T>(*v), expired))
217 }
catch (
const std::exception&) {
223 getFilterSet<T>(f), w);
226 void cancelListen(InfoHash h,
size_t token);
227 void cancelListen(InfoHash h, std::shared_future<size_t> token);
229 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={}, time_point created=time_point::max(),
bool permanent =
false);
230 void put(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb, time_point created=time_point::max(),
bool permanent =
false) {
231 put(hash, value, bindDoneCb(cb), created, permanent);
234 void put(InfoHash hash, Value&& value, DoneCallback cb={}, time_point created=time_point::max(),
bool permanent =
false);
235 void put(InfoHash hash, Value&& value, DoneCallbackSimple cb, time_point created=time_point::max(),
bool permanent =
false) {
236 put(hash, std::forward<Value>(value), bindDoneCb(cb), created, permanent);
238 void put(
const std::string& key, Value&& value, DoneCallbackSimple cb={}, time_point created=time_point::max(),
bool permanent =
false);
240 void cancelPut(
const InfoHash& h, Value::Id
id);
241 void cancelPut(
const InfoHash& h,
const std::shared_ptr<Value>& value);
243 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallback cb={},
bool permanent =
false);
244 void putSigned(InfoHash hash, std::shared_ptr<Value> value, DoneCallbackSimple cb,
bool permanent =
false) {
245 putSigned(hash, value, bindDoneCb(cb), permanent);
248 void putSigned(InfoHash hash, Value&& value, DoneCallback cb={},
bool permanent =
false);
249 void putSigned(InfoHash hash, Value&& value, DoneCallbackSimple cb,
bool permanent =
false) {
250 putSigned(hash, std::forward<Value>(value), bindDoneCb(cb), permanent);
252 void putSigned(
const std::string& key, Value&& value, DoneCallbackSimple cb={},
bool permanent =
false);
254 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallback cb={},
bool permanent =
false);
255 void putEncrypted(InfoHash hash, InfoHash to, std::shared_ptr<Value> value, DoneCallbackSimple cb,
bool permanent =
false) {
256 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
259 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallback cb={},
bool permanent =
false);
260 void putEncrypted(InfoHash hash, InfoHash to, Value&& value, DoneCallbackSimple cb,
bool permanent =
false) {
261 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
263 void putEncrypted(
const std::string& key, InfoHash to, Value&& value, DoneCallback cb={},
bool permanent =
false);
265 void putEncrypted(InfoHash hash,
const std::shared_ptr<crypto::PublicKey>& to, std::shared_ptr<Value> value, DoneCallback cb={},
bool permanent =
false);
266 void putEncrypted(InfoHash hash,
const std::shared_ptr<crypto::PublicKey>& to, std::shared_ptr<Value> value, DoneCallbackSimple cb,
bool permanent =
false) {
267 putEncrypted(hash, to, value, bindDoneCb(cb), permanent);
270 void putEncrypted(InfoHash hash,
const std::shared_ptr<crypto::PublicKey>& to, Value&& value, DoneCallback cb={},
bool permanent =
false);
271 void putEncrypted(InfoHash hash,
const std::shared_ptr<crypto::PublicKey>& to, Value&& value, DoneCallbackSimple cb,
bool permanent =
false) {
272 putEncrypted(hash, to, std::forward<Value>(value), bindDoneCb(cb), permanent);
280 void bootstrap(std::vector<SockAddr> nodes, DoneCallbackSimple cb={});
281 void bootstrap(
SockAddr addr, DoneCallbackSimple cb={});
295 void bootstrap(
const std::string& host,
const std::string& service);
296 void bootstrap(
const std::string& hostService);
316 void dumpTables()
const;
322 std::shared_ptr<crypto::PublicKey> getPublicKey()
const;
341 std::pair<size_t, size_t> getStoreSize()
const;
343 void getStorageLimit()
const;
344 void setStorageLimit(
size_t limit = DEFAULT_STORAGE_LIMIT);
346 std::vector<NodeExport> exportNodes()
const;
348 std::vector<ValuesExport> exportValues()
const;
350 void setLogger(
const Sp<Logger>& logger = {});
351 void setLogger(
const Logger& logger) {
352 setLogger(std::make_shared<Logger>(logger));
360 void registerType(
const ValueType& type);
362 void importValues(
const std::vector<ValuesExport>& values);
364 bool isRunning()
const {
365 return running != State::Idle;
368 NodeStats getNodesStats(sa_family_t af)
const;
369 unsigned getNodesStats(sa_family_t af,
unsigned *good_return,
unsigned *dubious_return,
unsigned *cached_return,
unsigned *incoming_return)
const;
370 NodeInfo getNodeInfo()
const;
371 void getNodeInfo(std::function<
void(std::shared_ptr<NodeInfo>)>);
373 std::vector<unsigned> getNodeMessageStats(
bool in =
false)
const;
374 std::string getStorageLog()
const;
375 std::string getStorageLog(
const InfoHash&)
const;
376 std::string getRoutingTablesLog(sa_family_t af)
const;
377 std::string getSearchesLog(sa_family_t af = AF_UNSPEC)
const;
378 std::string getSearchLog(
const InfoHash&, sa_family_t af = AF_UNSPEC)
const;
379 std::vector<SockAddr> getPublicAddress(sa_family_t af = AF_UNSPEC)
const;
380 std::vector<std::string> getPublicAddressStr(sa_family_t af = AF_UNSPEC)
const;
381 void getPublicAddress(std::function<
void(std::vector<SockAddr>&&)>, sa_family_t af = AF_UNSPEC);
385 void findCertificate(InfoHash hash, std::function<
void(
const std::shared_ptr<crypto::Certificate>&)>);
386 void registerCertificate(std::shared_ptr<crypto::Certificate> cert);
387 void setLocalCertificateStore(CertificateStoreQuery&& query_method);
395 void run(in_port_t port = dht::net::DHT_DEFAULT_PORT,
const crypto::Identity& identity = {},
bool threaded =
true, NetId network = 0) {
397 config.dht_config.node_config.
network = network;
398 config.dht_config.id = identity;
399 config.threaded = threaded;
402 void run(in_port_t port, Config& config, Context&& context = {});
407 void run(
const char* ip4,
const char* ip6,
const char* service,
Config& config,
Context&& context = {});
409 void run(
const Config& config, Context&& context);
411 void setOnStatusChanged(StatusCallback&& cb) {
413 statusCbs.emplace_back(std::move(cb));
422 std::lock_guard<std::mutex> lck(dht_mtx);
429 void shutdown(ShutdownCallback cb = {},
bool stop =
false);
438 std::shared_ptr<PeerDiscovery> getPeerDiscovery()
const {
return peerDiscovery_; };
440 void setProxyServer(
const std::string& proxy,
const std::string& pushNodeId =
"");
471 void forwardAllMessages(
bool forward);
483 return std::max(status4, status6);
486 bool checkShutdown();
488 DoneCallback bindOpDoneCallback(DoneCallback&& cb);
489 DoneCallbackSimple bindOpDoneCallback(DoneCallbackSimple&& cb);
492 std::unique_ptr<SecureDht> dht_;
495 std::atomic_bool use_proxy {
false};
499 IdentityAnnouncedCb identityAnnouncedCb_;
506 mutable std::mutex dht_mtx {};
507 std::thread dht_thread {};
508 std::condition_variable cv {};
509 std::mutex sock_mtx {};
510 net::PacketList rcv {};
511 decltype(rcv) rcv_free {};
513 std::queue<std::function<void(SecureDht&)>> pending_ops_prio {};
514 std::queue<std::function<void(SecureDht&)>> pending_ops {};
515 std::mutex storage_mtx {};
517 std::atomic<State> running {State::Idle};
518 std::atomic_size_t ongoing_ops {0};
519 std::vector<ShutdownCallback> shutdownCallbacks_;
521 NodeStatus status4 {NodeStatus::Disconnected},
522 status6 {NodeStatus::Disconnected};
524 std::vector<StatusCallback> statusCbs {};
527 std::shared_ptr<PeerDiscovery> peerDiscovery_;
533 std::shared_ptr<dht::Logger> logger_;