My Project 3.2.0
C++ Distributed Hash Table
Loading...
Searching...
No Matches
dht_proxy_client.h
1/*
2 * Copyright (C) 2014-2023 Savoir-faire Linux Inc.
3 * Author: Sébastien Blin <sebastien.blin@savoirfairelinux.com>
4 * Adrien Béraud <adrien.beraud@savoirfairelinux.com>
5 * Vsevolod Ivanov <vsevolod.ivanov@savoirfairelinux.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <https://www.gnu.org/licenses/>.
19 */
20
21#pragma once
22
23#include <functional>
24#include <mutex>
25
26#include "callbacks.h"
27#include "def.h"
28#include "dht_interface.h"
29#include "proxy.h"
30#include "http.h"
31
32#include <restinio/all.hpp>
33#include <json/json.h>
34
35#include <chrono>
36#include <vector>
37#include <functional>
38
39namespace Json {
40class Value;
41}
42
43namespace http {
44class Resolver;
45class Request;
46}
47
48namespace dht {
49
50class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
51public:
52
53 DhtProxyClient();
54
55 explicit DhtProxyClient(
56 std::shared_ptr<crypto::Certificate> serverCA, crypto::Identity clientIdentity,
57 std::function<void()> loopSignal, const std::string& serverHost,
58 const std::string& pushClientId = "", std::shared_ptr<Logger> logger = {});
59
60 void setHeaderFields(http::Request& request);
61
62 virtual void setPushNotificationToken(const std::string& token) override {
63#ifdef OPENDHT_PUSH_NOTIFICATIONS
64 deviceKey_ = token;
65#else
66 (void) token;
67#endif
68 }
69
70 virtual void setPushNotificationTopic(const std::string& topic) override {
71#ifdef OPENDHT_PUSH_NOTIFICATIONS
72 notificationTopic_ = topic;
73#else
74 (void) topic;
75#endif
76 }
77
78 virtual void setPushNotificationPlatform(const std::string& platform) override {
79#ifdef OPENDHT_PUSH_NOTIFICATIONS
80 platform_ = platform;
81#else
82 (void) platform;
83#endif
84 }
85
86 virtual ~DhtProxyClient();
87
91 inline const InfoHash& getNodeId() const override { return myid; }
92 void setOnPublicAddressChanged(PublicAddressChangedCb cb) override {
93 publicAddressChangedCb_ = std::move(cb);
94 }
95
99 NodeStatus getStatus(sa_family_t af) const override;
100 NodeStatus getStatus() const override {
101 return std::max(getStatus(AF_INET), getStatus(AF_INET6));
102 }
103
107 void shutdown(ShutdownCallback cb, bool) override;
108
115 bool isRunning(sa_family_t af = 0) const override;
116
127 virtual void get(const InfoHash& key, GetCallback cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) override;
128 virtual void get(const InfoHash& key, GetCallback cb, DoneCallbackSimple donecb={}, Value::Filter&& f={}, Where&& w = {}) override {
129 get(key, cb, bindDoneCb(std::move(donecb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
130 }
131 virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallback donecb={}, Value::Filter&& f={}, Where&& w = {}) override {
132 get(key, bindGetCb(cb), std::move(donecb), std::forward<Value::Filter>(f), std::forward<Where>(w));
133 }
134 virtual void get(const InfoHash& key, GetCallbackSimple cb, DoneCallbackSimple donecb, Value::Filter&& f={}, Where&& w = {}) override {
135 get(key, bindGetCb(cb), bindDoneCb(std::move(donecb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
136 }
137
145 void put(const InfoHash& key,
146 Sp<Value>,
147 DoneCallback cb=nullptr,
148 time_point created=time_point::max(),
149 bool permanent = false) override;
150 void put(const InfoHash& key,
151 const Sp<Value>& v,
152 DoneCallbackSimple cb,
153 time_point created=time_point::max(),
154 bool permanent = false) override
155 {
156 put(key, v, bindDoneCb(std::move(cb)), created, permanent);
157 }
158
159 void put(const InfoHash& key,
160 Value&& v,
161 DoneCallback cb=nullptr,
162 time_point created=time_point::max(),
163 bool permanent = false) override
164 {
165 put(key, std::make_shared<Value>(std::move(v)), std::move(cb), created, permanent);
166 }
167 void put(const InfoHash& key,
168 Value&& v,
169 DoneCallbackSimple cb,
170 time_point created=time_point::max(),
171 bool permanent = false) override
172 {
173 put(key, std::forward<Value>(v), bindDoneCb(std::move(cb)), created, permanent);
174 }
175
180 NodeStats getNodesStats(sa_family_t af) const override;
181
186 std::vector<SockAddr> getPublicAddress(sa_family_t family = 0) override;
187
195 virtual size_t listen(const InfoHash&, ValueCallback, Value::Filter={}, Where={}) override;
196
197 virtual size_t listen(const InfoHash& key, GetCallback cb, Value::Filter f={}, Where w={}) override {
198 return listen(key, [cb=std::move(cb)](const std::vector<Sp<Value>>& vals, bool expired){
199 if (not expired)
200 return cb(vals);
201 return true;
202 }, std::forward<Value::Filter>(f), std::forward<Where>(w));
203 }
204 virtual size_t listen(const InfoHash& key, GetCallbackSimple cb, Value::Filter f={}, Where w={}) override {
205 return listen(key, bindGetCb(std::move(cb)), std::forward<Value::Filter>(f), std::forward<Where>(w));
206 }
207 /*
208 * This function relies on the cache implementation.
209 * It means that there are no true cancel here, it keeps the caching in higher priority.
210 */
211 virtual bool cancelListen(const InfoHash& key, size_t token) override;
212
217 void pushNotificationReceived(const std::map<std::string, std::string>& notification) override;
218
219 time_point periodic(const uint8_t*, size_t, SockAddr, const time_point& now) override;
220 time_point periodic(const uint8_t* buf, size_t buflen, const sockaddr* from, socklen_t fromlen, const time_point& now) override {
221 return periodic(buf, buflen, SockAddr(from, fromlen), now);
222 }
223
234 virtual void query(const InfoHash& /*key*/, QueryCallback /*cb*/, DoneCallback /*done_cb*/ = {}, Query&& /*q*/ = {}) override { }
235 virtual void query(const InfoHash& key, QueryCallback cb, DoneCallbackSimple done_cb = {}, Query&& q = {}) override {
236 query(key, cb, bindDoneCb(std::move(done_cb)), std::forward<Query>(q));
237 }
238
242 std::vector<Sp<Value>> getPut(const InfoHash&) const override;
243
247 Sp<Value> getPut(const InfoHash&, const Value::Id&) const override;
248
253 bool cancelPut(const InfoHash&, const Value::Id&) override;
254
255 void pingNode(SockAddr, DoneCallbackSimple&& /*cb*/={}) override { }
256
257 virtual void registerType(const ValueType& type) override {
258 types.registerType(type);
259 }
260 const ValueType& getType(ValueType::Id type_id) const override {
261 return types.getType(type_id);
262 }
263
264 std::vector<Sp<Value>> getLocal(const InfoHash& k, const Value::Filter& filter) const override;
265 Sp<Value> getLocalById(const InfoHash& k, Value::Id id) const override;
266
271 void insertNode(const InfoHash&, const SockAddr&) override { }
272 void insertNode(const NodeExport&) override { }
273 std::pair<size_t, size_t> getStoreSize() const override { return {}; }
274 std::vector<NodeExport> exportNodes() const override { return {}; }
275 std::vector<ValuesExport> exportValues() const override { return {}; }
276 void importValues(const std::vector<ValuesExport>&) override {}
277 std::string getStorageLog() const override { return {}; }
278 std::string getStorageLog(const InfoHash&) const override { return {}; }
279 std::string getRoutingTablesLog(sa_family_t) const override { return {}; }
280 std::string getSearchesLog(sa_family_t) const override { return {}; }
281 std::string getSearchLog(const InfoHash&, sa_family_t) const override { return {}; }
282 void dumpTables() const override {}
283 std::vector<unsigned> getNodeMessageStats(bool) override { return {}; }
284 void setStorageLimit(size_t) override {}
285 virtual size_t getStorageLimit() const override { return 0; }
286 void connectivityChanged(sa_family_t) override {
287 getProxyInfos();
288 }
289 void connectivityChanged() override {
290 getProxyInfos();
291 loopSignal_();
292 }
293
294 void listenKeepIdle(uint32_t seconds) {
295 listenKeepIdle_ = seconds;
296 }
297 inline uint32_t listenKeepIdle() { return listenKeepIdle_; }
298
299private:
303 void startProxy();
304 void stop();
305
310 struct InfoState;
311 void getProxyInfos();
312 void queryProxyInfo(const std::shared_ptr<InfoState>& infoState, const std::shared_ptr<http::Resolver>& resolver, sa_family_t family);
313 void onProxyInfos(const Json::Value& val, const sa_family_t family);
314 SockAddr parsePublicAddress(const Json::Value& val);
315
316 void opFailed();
317
318 void handleExpireListener(const asio::error_code &ec, const InfoHash& key);
319
320 struct Listener;
321 struct OperationState;
322 enum class ListenMethod {
323 LISTEN,
324 SUBSCRIBE,
325 RESUBSCRIBE,
326 };
327 using CacheValueCallback = std::function<bool(const std::vector<std::shared_ptr<Value>>& values, bool expired, system_clock::time_point)>;
328
332 void sendListen(const restinio::http_request_header_t& header, const CacheValueCallback& cb,
333 const Sp<OperationState>& opstate, Listener& listener, ListenMethod method = ListenMethod::LISTEN);
334 void handleResubscribe(const asio::error_code& ec, const InfoHash& key,
335 const size_t token, std::shared_ptr<OperationState> opstate);
336
337 void doPut(const InfoHash&, Sp<Value>, DoneCallbackSimple, time_point created, bool permanent);
338 void handleRefreshPut(const asio::error_code& ec, InfoHash key, Value::Id id);
339
343 void getConnectivityStatus();
347 void cancelAllListeners();
348
349 std::atomic_bool isDestroying_ {false};
350
351 std::string proxyUrl_;
352 dht::crypto::Identity clientIdentity_;
353 std::shared_ptr<dht::crypto::Certificate> serverCertificate_;
354 //std::pair<std::string, std::string> serverHostService_;
355 std::string pushClientId_;
356 std::string pushSessionId_;
357
358 mutable std::mutex lockCurrentProxyInfos_;
359 NodeStatus statusIpv4_ {NodeStatus::Disconnected};
360 NodeStatus statusIpv6_ {NodeStatus::Disconnected};
361 NodeStats stats4_ {};
362 NodeStats stats6_ {};
363 SockAddr localAddrv4_;
364 SockAddr localAddrv6_;
365 SockAddr publicAddressV4_;
366 SockAddr publicAddressV6_;
367 std::atomic_bool launchConnectedCbs_ {false};
368 PublicAddressChangedCb publicAddressChangedCb_ {};
369
370 InfoHash myid {};
371
372 // registred types
373 TypeStore types;
374
375 /*
376 * ASIO I/O Context for sockets in httpClient_
377 * Note: Each context is used in one thread only
378 */
379 asio::io_context httpContext_;
380 mutable std::mutex resolverLock_;
381 std::shared_ptr<http::Resolver> resolver_;
382
383 mutable std::mutex requestLock_;
384 std::map<unsigned, std::shared_ptr<http::Request>> requests_;
385 /*
386 * Thread for executing the http io_context.run() blocking call
387 */
388 std::thread httpClientThread_;
389
393 struct ProxySearch;
394
395 mutable std::mutex searchLock_;
396 size_t listenerToken_ {0};
397 std::map<InfoHash, ProxySearch> searches_;
398
402 uint32_t listenKeepIdle_ {120};
403
407 std::mutex lockCallbacks_;
408 std::vector<std::function<void()>> callbacks_;
409
410 Sp<InfoState> infoState_;
411
415 void handleProxyConfirm(const asio::error_code &ec);
416 std::unique_ptr<asio::steady_timer> nextProxyConfirmationTimer_;
417 std::unique_ptr<asio::steady_timer> listenerRestartTimer_;
418
422 void restartListeners(const asio::error_code &ec);
423
428 void resubscribe(const InfoHash& key, const size_t token, Listener& listener);
429
434 std::string deviceKey_ {};
435
439 std::string notificationTopic_ {};
440
444 std::string platform_
445#ifdef __ANDROID__
446 {"android"};
447#else
448#ifdef __APPLE__
449 {"ios"};
450#else
451 {};
452#endif
453#endif
454
455 const std::function<void()> loopSignal_;
456
457#ifdef OPENDHT_PUSH_NOTIFICATIONS
458 std::string fillBody(bool resubscribe);
459 void getPushRequest(Json::Value&) const;
460#endif // OPENDHT_PUSH_NOTIFICATIONS
461
462 Json::StreamWriterBuilder jsonBuilder_;
463 std::unique_ptr<Json::CharReader> jsonReader_;
464
465 std::shared_ptr<http::Request> buildRequest(const std::string& target = {});
466};
467
468}
bool cancelPut(const InfoHash &, const Value::Id &) override
void insertNode(const InfoHash &, const SockAddr &) override
NodeStatus getStatus(sa_family_t af) const override
Sp< Value > getPut(const InfoHash &, const Value::Id &) const override
virtual void query(const InfoHash &, QueryCallback, DoneCallback={}, Query &&={}) override
std::vector< Sp< Value > > getPut(const InfoHash &) const override
const InfoHash & getNodeId() const override
virtual void get(const InfoHash &key, GetCallback cb, DoneCallback donecb={}, Value::Filter &&f={}, Where &&w={}) override
virtual size_t listen(const InfoHash &, ValueCallback, Value::Filter={}, Where={}) override
bool isRunning(sa_family_t af=0) const override
Sp< Value > getLocalById(const InfoHash &k, Value::Id id) const override
std::pair< size_t, size_t > getStoreSize() const override
void setStorageLimit(size_t) override
std::vector< NodeExport > exportNodes() const override
std::vector< Sp< Value > > getLocal(const InfoHash &k, const Value::Filter &filter) const override
std::vector< SockAddr > getPublicAddress(sa_family_t family=0) override
void connectivityChanged(sa_family_t) override
void put(const InfoHash &key, Sp< Value >, DoneCallback cb=nullptr, time_point created=time_point::max(), bool permanent=false) override
virtual size_t listen(const InfoHash &key, GetCallback cb, Value::Filter f={}, Where w={}) override
NodeStats getNodesStats(sa_family_t af) const override
void shutdown(ShutdownCallback cb, bool) override
void pushNotificationReceived(const std::map< std::string, std::string > &notification) override
NodeStatus
Definition callbacks.h:42
Describes a query destined to another peer.
Definition value.h:924
Serializable dht::Value filter.
Definition value.h:797