My Project 3.2.0
C++ Distributed Hash Table
Loading...
Searching...
No Matches
network_engine.h
1/*
2 * Copyright (C) 2014-2023 Savoir-faire Linux Inc.
3 * Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4 * Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <https://www.gnu.org/licenses/>.
18 */
19
20#pragma once
21
22#include "node_cache.h"
23#include "value.h"
24#include "infohash.h"
25#include "node.h"
26#include "scheduler.h"
27#include "utils.h"
28#include "rng.h"
29#include "rate_limiter.h"
30#include "logger.h"
31#include "network_utils.h"
32
33#include <vector>
34#include <string>
35#include <functional>
36#include <algorithm>
37#include <memory>
38#include <queue>
39
40namespace dht {
41namespace net {
42
43struct Request;
44struct Socket;
45struct TransId;
46
47#ifndef MSG_CONFIRM
48#define MSG_CONFIRM 0
49#endif
50
52 NetId network {0};
53 ssize_t max_req_per_sec {0};
54 ssize_t max_peer_req_per_sec {0};
55 bool is_client {false};
56};
57
58class DhtProtocolException : public DhtException {
59public:
60 // sent to another peer (http-like).
61 static const constexpr uint16_t NON_AUTHORITATIVE_INFORMATION {203}; /* incomplete request packet. */
62 static const constexpr uint16_t UNAUTHORIZED {401}; /* wrong tokens. */
63 static const constexpr uint16_t NOT_FOUND {404}; /* storage not found */
64 // for internal use (custom).
65 static const constexpr uint16_t INVALID_TID_SIZE {421}; /* id was truncated. */
66 static const constexpr uint16_t UNKNOWN_TID {422}; /* unknown tid */
67 static const constexpr uint16_t WRONG_NODE_INFO_BUF_LEN {423}; /* node info length is wrong */
68
69 static const std::string GET_NO_INFOHASH; /* received "get" request with no infohash */
70 static const std::string LISTEN_NO_INFOHASH; /* got "listen" request without infohash */
71 static const std::string LISTEN_WRONG_TOKEN; /* wrong token in "listen" request */
72 static const std::string PUT_NO_INFOHASH; /* no infohash in "put" request */
73 static const std::string PUT_WRONG_TOKEN; /* got "put" request with wrong token */
74 static const std::string STORAGE_NOT_FOUND; /* got access request for an unknown storage */
75 static const std::string PUT_INVALID_ID; /* invalid id in "put" request */
76
77 DhtProtocolException(uint16_t code, const std::string& msg="", InfoHash failing_node_id={})
78 : DhtException(msg), msg(msg), code(code), failing_node_id(failing_node_id) {}
79
80 const std::string& getMsg() const { return msg; }
81 uint16_t getCode() const { return code; }
82 const InfoHash& getNodeId() const { return failing_node_id; }
83
84private:
85 std::string msg;
86 uint16_t code;
87 InfoHash failing_node_id;
88};
89
90struct ParsedMessage;
91
95struct RequestAnswer {
96 Blob ntoken {};
97 Value::Id vid {};
98 std::vector<Sp<Value>> values {};
99 std::vector<Value::Id> refreshed_values {};
100 std::vector<Value::Id> expired_values {};
101 std::vector<Sp<FieldValueIndex>> fields {};
102 std::vector<Sp<Node>> nodes4 {};
103 std::vector<Sp<Node>> nodes6 {};
104 RequestAnswer() {}
105 RequestAnswer(ParsedMessage&& msg);
106};
107
126class NetworkEngine final
127{
128private:
132 std::function<void(Sp<Request>, DhtProtocolException)> onError;
133
140 std::function<void(const Sp<Node>&, int)> onNewNode;
147 std::function<void(const InfoHash&, const SockAddr&)> onReportedAddr;
153 std::function<RequestAnswer(Sp<Node>)> onPing {};
162 std::function<RequestAnswer(Sp<Node>, const InfoHash&, want_t)> onFindNode {};
171 std::function<RequestAnswer(Sp<Node>, const InfoHash&, want_t, const Query&)> onGetValues {};
180 std::function<RequestAnswer(Sp<Node>,
181 const InfoHash&,
182 const Blob&,
183 Tid,
184 const Query&,
185 int)> onListen {};
195 std::function<RequestAnswer(Sp<Node>,
196 const InfoHash&,
197 const Blob&,
198 const std::vector<Sp<Value>>&,
199 const time_point&)> onAnnounce {};
208 std::function<RequestAnswer(Sp<Node>,
209 const InfoHash&,
210 const Blob&,
211 const Value::Id&)> onRefresh {};
212
213public:
214 using RequestCb = std::function<void(const Request&, RequestAnswer&&)>;
215 using RequestErrorCb = std::function<bool(const Request&, DhtProtocolException&&)>;
216 using RequestExpiredCb = std::function<void(const Request&, bool)>;
217
218 NetworkEngine(
219 InfoHash& myid,
220 NetworkConfig config,
221 std::unique_ptr<DatagramSocket>&& sock,
222 const Sp<Logger>& log,
223 std::mt19937_64& rd,
224 Scheduler& scheduler,
225 decltype(NetworkEngine::onError)&& onError,
226 decltype(NetworkEngine::onNewNode)&& onNewNode,
227 decltype(NetworkEngine::onReportedAddr)&& onReportedAddr,
228 decltype(NetworkEngine::onPing)&& onPing,
229 decltype(NetworkEngine::onFindNode)&& onFindNode,
230 decltype(NetworkEngine::onGetValues)&& onGetValues,
231 decltype(NetworkEngine::onListen)&& onListen,
232 decltype(NetworkEngine::onAnnounce)&& onAnnounce,
233 decltype(NetworkEngine::onRefresh)&& onRefresh);
234
235 ~NetworkEngine();
236
237 net::DatagramSocket* getSocket() const { return dht_socket.get(); };
238
239 void clear();
240
256 void tellListener(const Sp<Node>& n, Tid socket_id, const InfoHash& hash, want_t want, const Blob& ntoken,
257 std::vector<Sp<Node>>&& nodes, std::vector<Sp<Node>>&& nodes6,
258 std::vector<Sp<Value>>&& values, const Query& q, int version);
259
260 void tellListenerRefreshed(const Sp<Node>& n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values, int version);
261 void tellListenerExpired(const Sp<Node>& n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values, int version);
262
263 bool isRunning(sa_family_t af) const;
264 inline want_t want () const { return dht_socket->hasIPv4() and dht_socket->hasIPv6() ? (WANT4 | WANT6) : -1; }
265
266 void connectivityChanged(sa_family_t);
267
268 /**************
269 * Requests *
270 **************/
271
281 Sp<Request>
282 sendPing(const Sp<Node>& n, RequestCb&& on_done, RequestExpiredCb&& on_expired);
283
294 Sp<Request>
295 sendPing(SockAddr&& sa, RequestCb&& on_done, RequestExpiredCb&& on_expired) {
296 return sendPing(std::make_shared<Node>(InfoHash::zero(), std::move(sa), rd),
297 std::forward<RequestCb>(on_done),
298 std::forward<RequestExpiredCb>(on_expired));
299 }
300
313 Sp<Request> sendFindNode(const Sp<Node>& n,
314 const InfoHash& hash,
315 want_t want = -1,
316 RequestCb&& on_done = {},
317 RequestExpiredCb&& on_expired = {});
332 Sp<Request> sendGetValues(const Sp<Node>& n,
333 const InfoHash& hash,
334 const Query& query,
335 want_t want,
336 RequestCb&& on_done,
337 RequestExpiredCb&& on_expired);
361 Sp<Request> sendListen(const Sp<Node>& n,
362 const InfoHash& hash,
363 const Query& query,
364 const Blob& token,
365 Tid socketId,
366 RequestCb&& on_done,
367 RequestExpiredCb&& on_expired);
381 Sp<Request> sendAnnounceValue(const Sp<Node>& n,
382 const InfoHash& hash,
383 const Sp<Value>& v,
384 time_point created,
385 const Blob& token,
386 RequestCb&& on_done,
387 RequestExpiredCb&& on_expired);
401 Sp<Request> sendRefreshValue(const Sp<Node>& n,
402 const InfoHash& hash,
403 const Value::Id& vid,
404 const Blob& token,
405 RequestCb&& on_done,
406 RequestErrorCb&& on_error,
407 RequestExpiredCb&& on_expired);
420 void sendUpdateValues(const Sp<Node>& n,
421 const InfoHash& infohash,
422 std::vector<Sp<Value>>&& values,
423 time_point created,
424 const Blob& token,
425 size_t sid);
426 Sp<Request> sendUpdateValues(const Sp<Node>& n,
427 const InfoHash& infohash,
428 std::vector<Sp<Value>>::iterator begin,
429 std::vector<Sp<Value>>::iterator end,
430 time_point created,
431 const Blob& token,
432 size_t sid);
433
443 void processMessage(const uint8_t *buf, size_t buflen, SockAddr addr);
444
445 Sp<Node> insertNode(const InfoHash& id, const SockAddr& addr) {
446 auto n = cache.getNode(id, addr, scheduler.time(), 0);
447 onNewNode(n, 0);
448 return n;
449 }
450
451 std::vector<unsigned> getNodeMessageStats(bool in) {
452 auto& st = in ? in_stats : out_stats;
453 std::vector<unsigned> stats {st.ping, st.find, st.get, st.listen, st.put};
454 st = {};
455 return stats;
456 }
457
458 void blacklistNode(const Sp<Node>& n);
459
460 std::vector<Sp<Node>> getCachedNodes(const InfoHash& id, sa_family_t sa_f, size_t count) {
461 return cache.getCachedNodes(id, sa_f, count);
462 }
463
464 size_t getNodeCacheSize() const {
465 return cache.size();
466 }
467 size_t getNodeCacheSize(sa_family_t af) const {
468 return cache.size(af);
469 }
470
471 size_t getRateLimiterSize() const {
472 return address_rate_limiter.size();
473 }
474
475 size_t getPartialCount() const {
476 return partial_messages.size();
477 }
478
479private:
480
481 struct PartialMessage;
482
483 /***************
484 * Constants *
485 ***************/
486 /* the length of a node info buffer in ipv4 format */
487 static const constexpr size_t NODE4_INFO_BUF_LEN {HASH_LEN + sizeof(in_addr) + sizeof(in_port_t)};
488 /* the length of a node info buffer in ipv6 format */
489 static const constexpr size_t NODE6_INFO_BUF_LEN {HASH_LEN + sizeof(in6_addr) + sizeof(in_port_t)};
490 /* after a UDP reply, the period during which we tell the link layer about it */
491 static constexpr std::chrono::seconds UDP_REPLY_TIME {15};
492
493 /* Max. time to receive a full fragmented packet */
494 static constexpr std::chrono::seconds RX_MAX_PACKET_TIME {10};
495 /* Max. time between packet fragments */
496 static constexpr std::chrono::seconds RX_TIMEOUT {3};
497 /* The maximum number of nodes that we snub. There is probably little
498 reason to increase this value. */
499 static constexpr unsigned BLACKLISTED_MAX {10};
500
501 static constexpr size_t MTU {1280};
502 static constexpr size_t MAX_PACKET_VALUE_SIZE {600};
503 static constexpr size_t MAX_MESSAGE_VALUE_SIZE {56 * 1024};
504
505 void process(std::unique_ptr<ParsedMessage>&&, const SockAddr& from);
506
507 bool rateLimit(const SockAddr& addr);
508
509 static bool isMartian(const SockAddr& addr);
510 bool isNodeBlacklisted(const SockAddr& addr) const;
511
512 void requestStep(Sp<Request> req);
513
518 void sendRequest(const Sp<Request>& request);
519
520 struct MessageStats {
521 unsigned ping {0};
522 unsigned find {0};
523 unsigned get {0};
524 unsigned put {0};
525 unsigned listen {0};
526 unsigned refresh {0};
527 unsigned updateValue {0};
528 };
529
530
531 // basic wrapper for socket sendto function
532 int send(const SockAddr& addr, const char *buf, size_t len, bool confirmed = false);
533
534 void sendValueParts(Tid tid, const std::vector<Blob>& svals, const SockAddr& addr);
535 std::vector<Blob> packValueHeader(msgpack::sbuffer&, std::vector<Sp<Value>>::const_iterator, std::vector<Sp<Value>>::const_iterator) const;
536 std::vector<Blob> packValueHeader(msgpack::sbuffer& buf, const std::vector<Sp<Value>>& values) const {
537 return packValueHeader(buf, values.begin(), values.end());
538 }
539 void maintainRxBuffer(Tid tid);
540
541 /*************
542 * Answers *
543 *************/
544 /* answer to a ping request */
545 void sendPong(const SockAddr& addr, Tid tid);
546 /* answer to findnodes/getvalues request */
547 void sendNodesValues(const SockAddr& addr,
548 Tid tid,
549 const Blob& nodes,
550 const Blob& nodes6,
551 const std::vector<Sp<Value>>& st,
552 const Query& query,
553 const Blob& token);
554 Blob bufferNodes(sa_family_t af, const InfoHash& id, std::vector<Sp<Node>>& nodes);
555
556 std::pair<Blob, Blob> bufferNodes(sa_family_t af,
557 const InfoHash& id,
558 want_t want,
559 std::vector<Sp<Node>>& nodes,
560 std::vector<Sp<Node>>& nodes6);
561 /* answer to a listen request */
562 void sendListenConfirmation(const SockAddr& addr, Tid tid);
563 /* answer to put request */
564 void sendValueAnnounced(const SockAddr& addr, Tid, Value::Id);
565 /* answer in case of error */
566 void sendError(const SockAddr& addr,
567 Tid tid,
568 uint16_t code,
569 const std::string& message,
570 bool include_id=false);
571
572 void deserializeNodes(ParsedMessage& msg, const SockAddr& from);
573
574 /* DHT info */
575 const InfoHash& myid;
576 const NetworkConfig config {};
577 const std::unique_ptr<DatagramSocket> dht_socket;
578 Sp<Logger> logger_;
579 std::mt19937_64& rd;
580
581 NodeCache cache;
582
583 // global limiting should be triggered by at least 8 different IPs
584 using IpLimiter = RateLimiter;
585 using IpLimiterMap = std::map<SockAddr, IpLimiter, SockAddr::ipCmp>;
586 IpLimiterMap address_rate_limiter;
587 RateLimiter rate_limiter;
588 ssize_t limiter_maintenance {0};
589
590 // requests handling
591 std::map<Tid, Sp<Request>> requests {};
592 std::map<Tid, PartialMessage> partial_messages;
593
594 MessageStats in_stats {}, out_stats {};
595 std::set<SockAddr> blacklist {};
596
597 Scheduler& scheduler;
598
599 bool logIncoming_ {false};
600};
601
602} /* namespace net */
603} /* namespace dht */
Job scheduler.
Definition scheduler.h:36
void processMessage(const uint8_t *buf, size_t buflen, SockAddr addr)
void tellListener(const Sp< Node > &n, Tid socket_id, const InfoHash &hash, want_t want, const Blob &ntoken, std::vector< Sp< Node > > &&nodes, std::vector< Sp< Node > > &&nodes6, std::vector< Sp< Value > > &&values, const Query &q, int version)
Sp< Request > sendPing(SockAddr &&sa, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendFindNode(const Sp< Node > &n, const InfoHash &hash, want_t want=-1, RequestCb &&on_done={}, RequestExpiredCb &&on_expired={})
void sendUpdateValues(const Sp< Node > &n, const InfoHash &infohash, std::vector< Sp< Value > > &&values, time_point created, const Blob &token, size_t sid)
Sp< Request > sendRefreshValue(const Sp< Node > &n, const InfoHash &hash, const Value::Id &vid, const Blob &token, RequestCb &&on_done, RequestErrorCb &&on_error, RequestExpiredCb &&on_expired)
Sp< Request > sendGetValues(const Sp< Node > &n, const InfoHash &hash, const Query &query, want_t want, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendPing(const Sp< Node > &n, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendAnnounceValue(const Sp< Node > &n, const InfoHash &hash, const Sp< Value > &v, time_point created, const Blob &token, RequestCb &&on_done, RequestExpiredCb &&on_expired)
Sp< Request > sendListen(const Sp< Node > &n, const InfoHash &hash, const Query &query, const Blob &token, Tid socketId, RequestCb &&on_done, RequestExpiredCb &&on_expired)
std::vector< uint8_t > Blob
Definition utils.h:151
Describes a query destined to another peer.
Definition value.h:924