My Project 3.2.0
C++ Distributed Hash Table
Loading...
Searching...
No Matches
dht_proxy_server.h
1/*
2 * Copyright (C) 2014-2023 Savoir-faire Linux Inc.
3 * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4 * Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5 * Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include "callbacks.h"
24#include "def.h"
25#include "infohash.h"
26#include "proxy.h"
27#include "scheduler.h"
28#include "sockaddr.h"
29#include "value.h"
30#include "http.h"
31
32#include <restinio/all.hpp>
33#include <restinio/tls.hpp>
34#include <json/json.h>
35
36#include <memory>
37#include <mutex>
38
namespace dht {
/**
 * Kind of push-notification channel attached to a proxy client.
 *
 * The enum is registered with msgpack right after this namespace
 * (MSGPACK_ADD_ENUM), and the proxy server packs it into its saved state,
 * so the numeric values are spelled out to keep them from shifting if an
 * enumerator is ever inserted.
 */
enum class PushType {
    None = 0,        ///< No push channel.
    Android = 1,
    iOS = 2,
    UnifiedPush = 3
};
}
47MSGPACK_ADD_ENUM(dht::PushType)
48
// Forward declaration of the JsonCpp value type used in the signatures below.
// NOTE(review): <json/json.h> is already included above, so this is likely
// redundant; kept to avoid changing what the header declares.
namespace Json {
class Value;
}
52
53namespace dht {
54
// Forward declarations of the HTTP helper types referenced by DhtProxyServer
// (pending push requests and per-connection listen sessions).
namespace http {
class Request;
struct ListenerSession;
}
59
60class DhtRunner;
61
62using RestRouter = restinio::router::express_router_t<>;
63using RequestStatus = restinio::request_handling_status_t;
64
65struct OPENDHT_PUBLIC ProxyServerConfig {
66 std::string address {};
67 in_port_t port {8000};
68 std::string pushServer {};
69 std::string persistStatePath {};
70 dht::crypto::Identity identity {};
71 std::string bundleId {};
72};
73
77class OPENDHT_PUBLIC DhtProxyServer
78{
79public:
88 DhtProxyServer(const std::shared_ptr<DhtRunner>& dht,
89 const ProxyServerConfig& config = {},
90 const std::shared_ptr<log::Logger>& logger = {});
91
92 virtual ~DhtProxyServer();
93
94 DhtProxyServer(const DhtProxyServer& other) = delete;
95 DhtProxyServer(DhtProxyServer&& other) = delete;
96 DhtProxyServer& operator=(const DhtProxyServer& other) = delete;
97 DhtProxyServer& operator=(DhtProxyServer&& other) = delete;
98
99 asio::io_context& io_context() const;
100
101 struct ServerStats {
103 size_t listenCount {0};
105 size_t putCount {0};
111 double requestRate {0};
113 std::shared_ptr<NodeInfo> nodeInfo {};
114
115 std::string toString() const {
116 std::ostringstream ss;
117 ss << "Listens: " << listenCount << " Puts: " << putCount << " PushListeners: " << pushListenersCount << std::endl;
118 ss << "Requests: " << requestRate << " per second." << std::endl;
119 if (nodeInfo) {
120 auto& ipv4 = nodeInfo->ipv4;
121 if (ipv4.table_depth > 1)
122 ss << "IPv4 Network estimation: " << ipv4.getNetworkSizeEstimation() << std::endl;;
123 auto& ipv6 = nodeInfo->ipv6;
124 if (ipv6.table_depth > 1)
125 ss << "IPv6 Network estimation: " << ipv6.getNetworkSizeEstimation() << std::endl;;
126 }
127 return ss.str();
128 }
129
133 Json::Value toJson() const {
134 Json::Value result;
135 result["listenCount"] = static_cast<Json::UInt64>(listenCount);
136 result["putCount"] = static_cast<Json::UInt64>(putCount);
137 result["totalPermanentPuts"] = static_cast<Json::UInt64>(totalPermanentPuts);
138 result["pushListenersCount"] = static_cast<Json::UInt64>(pushListenersCount);
139 result["requestRate"] = requestRate;
140 if (nodeInfo)
141 result["nodeInfo"] = nodeInfo->toJson();
142 return result;
143 }
144 };
145
146 std::shared_ptr<ServerStats> stats() const { return stats_; }
147
148 std::shared_ptr<ServerStats> updateStats(std::shared_ptr<NodeInfo> info) const;
149
150 std::shared_ptr<DhtRunner> getNode() const { return dht_; }
151
152private:
153 class ConnectionListener;
154 struct RestRouterTraitsTls;
155 struct RestRouterTraits;
156
157 template <typename HttpResponse>
158 static HttpResponse initHttpResponse(HttpResponse response);
159 static restinio::request_handling_status_t serverError(restinio::request_t& request);
160
161 template< typename ServerSettings >
162 void addServerSettings(ServerSettings& serverSettings,
163 const unsigned int max_pipelined_requests = 16);
164
165 std::unique_ptr<RestRouter> createRestRouter();
166
167 void onConnectionClosed(restinio::connection_id_t);
168
176 RequestStatus getNodeInfo(restinio::request_handle_t request,
177 restinio::router::route_params_t params) const;
178
185 RequestStatus getStats(restinio::request_handle_t request,
186 restinio::router::route_params_t params);
187
198 RequestStatus get(restinio::request_handle_t request,
199 restinio::router::route_params_t params);
200
211 RequestStatus listen(restinio::request_handle_t request,
212 restinio::router::route_params_t params);
213
223 RequestStatus put(restinio::request_handle_t request,
224 restinio::router::route_params_t params);
225
226 void handleCancelPermamentPut(const asio::error_code &ec, const InfoHash& key, Value::Id vid);
227
228#ifdef OPENDHT_PROXY_SERVER_IDENTITY
238 RequestStatus putSigned(restinio::request_handle_t request,
239 restinio::router::route_params_t params) const;
240
250 RequestStatus putEncrypted(restinio::request_handle_t request,
251 restinio::router::route_params_t params);
252
253#endif // OPENDHT_PROXY_SERVER_IDENTITY
254
265 RequestStatus getFiltered(restinio::request_handle_t request,
266 restinio::router::route_params_t params);
267
275 RequestStatus options(restinio::request_handle_t request,
276 restinio::router::route_params_t params);
277
278 struct PushSessionContext {
279 std::mutex lock;
280 std::string sessionId;
281 PushSessionContext(const std::string& id) : sessionId(id) {}
282 };
283
284#ifdef OPENDHT_PUSH_NOTIFICATIONS
285 PushType getTypeFromString(const std::string& type);
286 std::string getDefaultTopic(PushType type);
287
288 RequestStatus pingPush(restinio::request_handle_t request,
289 restinio::router::route_params_t /*params*/);
299 RequestStatus subscribe(restinio::request_handle_t request,
300 restinio::router::route_params_t params);
301
309 RequestStatus unsubscribe(restinio::request_handle_t request,
310 restinio::router::route_params_t params);
311
317 void sendPushNotification(const std::string& key, Json::Value&& json, PushType type, bool highPriority, const std::string& topic);
318
327 void handleNotifyPushListenExpire(const asio::error_code &ec, const std::string pushToken,
328 std::function<Json::Value()> json, PushType type, const std::string& topic);
329
337 void handleCancelPushListen(const asio::error_code &ec, const std::string pushToken,
338 const InfoHash key, const std::string clientId);
339
353 bool handlePushListen(const InfoHash& infoHash, const std::string& pushToken,
354 PushType type, const std::string& clientId,
355 const std::shared_ptr<DhtProxyServer::PushSessionContext>& sessionCtx, const std::string& topic,
356 const std::vector<std::shared_ptr<Value>>& values, bool expired);
357
358#endif //OPENDHT_PUSH_NOTIFICATIONS
359
360 void handlePrintStats(const asio::error_code &ec);
361 void updateStats();
362
363 template <typename Os>
364 void saveState(Os& stream);
365
366 template <typename Is>
367 void loadState(Is& is, size_t size);
368
369 using clock = std::chrono::steady_clock;
370 using time_point = clock::time_point;
371
372 std::shared_ptr<asio::io_context> ioContext_;
373 std::shared_ptr<DhtRunner> dht_;
374 Json::StreamWriterBuilder jsonBuilder_;
375 Json::CharReaderBuilder jsonReaderBuilder_;
376 std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};
377
378 std::string persistPath_;
379
380 // http server
381 std::thread serverThread_;
382 std::unique_ptr<restinio::http_server_t<RestRouterTraitsTls>> httpsServer_;
383 std::unique_ptr<restinio::http_server_t<RestRouterTraits>> httpServer_;
384
385 // http client
386 std::pair<std::string, std::string> pushHostPort_;
387
388 mutable std::mutex requestLock_;
389 std::map<unsigned int /*id*/, std::shared_ptr<http::Request>> requests_;
390
391 std::shared_ptr<log::Logger> logger_;
392
393 std::shared_ptr<ServerStats> stats_;
394 std::shared_ptr<NodeInfo> nodeInfo_ {};
395 std::unique_ptr<asio::steady_timer> printStatsTimer_;
396
397 // Thread-safe access to listeners map.
398 std::mutex lockListener_;
399 // Shared with connection listener.
400 std::map<restinio::connection_id_t, http::ListenerSession> listeners_;
401 // Connection Listener observing conn state changes.
402 std::shared_ptr<ConnectionListener> connListener_;
403 struct PermanentPut {
404 time_point expiration;
405 std::string pushToken;
406 std::string clientId;
407 std::shared_ptr<PushSessionContext> sessionCtx;
408 std::unique_ptr<asio::steady_timer> expireTimer;
409 std::unique_ptr<asio::steady_timer> expireNotifyTimer;
410 Sp<Value> value;
411 PushType type;
412 std::string topic;
413
414 template <typename Packer>
415 void msgpack_pack(Packer& p) const
416 {
417 p.pack_map(2 + (sessionCtx ? 1 : 0) + (clientId.empty() ? 0 : 1) + (type == PushType::None ? 0 : 2) + (topic.empty() ? 0 : 1));
418 p.pack("value"); p.pack(value);
419 p.pack("exp"); p.pack(to_time_t(expiration));
420 if (not clientId.empty()) {
421 p.pack("cid"); p.pack(clientId);
422 }
423 if (sessionCtx) {
424 std::lock_guard<std::mutex> l(sessionCtx->lock);
425 p.pack("sid"); p.pack(sessionCtx->sessionId);
426 }
427 if (type != PushType::None) {
428 p.pack("t"); p.pack(type);
429 p.pack("token"); p.pack(pushToken);
430 }
431 if (not topic.empty()) {
432 p.pack("top"); p.pack(topic);
433 }
434 }
435
436 void msgpack_unpack(const msgpack::object& o);
437 };
438 struct SearchPuts {
439 std::map<dht::Value::Id, PermanentPut> puts;
440 MSGPACK_DEFINE_ARRAY(puts)
441 };
442 std::mutex lockSearchPuts_;
443 std::map<InfoHash, SearchPuts> puts_;
444
445 mutable std::atomic<size_t> requestNum_ {0};
446 mutable std::atomic<time_point> lastStatsReset_ {time_point::min()};
447
448 std::string pushServer_;
449 std::string bundleId_;
450
451#ifdef OPENDHT_PUSH_NOTIFICATIONS
452 struct Listener {
453 time_point expiration;
454 std::string clientId;
455 std::shared_ptr<PushSessionContext> sessionCtx;
456 std::future<size_t> internalToken;
457 std::unique_ptr<asio::steady_timer> expireTimer;
458 std::unique_ptr<asio::steady_timer> expireNotifyTimer;
459 PushType type;
460 std::string topic;
461
462 template <typename Packer>
463 void msgpack_pack(Packer& p) const
464 {
465 p.pack_map(3 + (sessionCtx ? 1 : 0) + (topic.empty() ? 0 : 1));
466 p.pack("cid"); p.pack(clientId);
467 p.pack("exp"); p.pack(to_time_t(expiration));
468 if (sessionCtx) {
469 std::lock_guard<std::mutex> l(sessionCtx->lock);
470 p.pack("sid"); p.pack(sessionCtx->sessionId);
471 }
472 p.pack("t"); p.pack(type);
473 if (!topic.empty()) {
474 p.pack("top"); p.pack(topic);
475 }
476 }
477
478 void msgpack_unpack(const msgpack::object& o);
479 };
480 struct PushListener {
481 std::map<InfoHash, std::vector<Listener>> listeners;
482 MSGPACK_DEFINE_ARRAY(listeners)
483 };
484 std::map<std::string, PushListener> pushListeners_;
485#endif //OPENDHT_PUSH_NOTIFICATIONS
486};
487
488}
DhtProxyServer(const std::shared_ptr< DhtRunner > &dht, const ProxyServerConfig &config={}, const std::shared_ptr< log::Logger > &logger={})
std::shared_ptr< NodeInfo > nodeInfo