ztsdb
net_handler.hpp
1 // (C) 2016 Leonardo Silvestri
2 //
3 // This file is part of ztsdb.
4 //
5 // ztsdb is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // ztsdb is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License
16 // along with ztsdb. If not, see <http://www.gnu.org/licenses/>.
17 
18 
19 #ifndef COMM_HPP
20 #define COMM_HPP
21 
22 #include <netinet/in.h>
23 #include <deque>
24 #include <map>
25 #include <unordered_map>
26 #include <utility>
27 #include <mutex>
28 #include <chrono>
29 #include <memory>
30 #include <boost/circular_buffer.hpp>
31 #include <sys/epoll.h>
32 #include "misc.hpp"
33 #include "stats.hpp"
34 #include "info.hpp"
35 #include "config.hpp"
36 
37 
38 
40 namespace net {
41 
42  // Message format (number are bytes):
43  //
44 
45  // | MAGIC_NB | SIZE | TYPE |
46  // |----------|----------|----------|
47  // | 8 | 8 | 8 |
48  //
49  // When type REQ
50  //
51  // When type RSP
52  //
53  // When type APPEND
54  // | var name length |
55  // | var names string + padding |
56  // | dim length |
57  // | dim 0 |
58  // | ... |
59  // | v0 data type |
60  // | v0 data |
61  // | ... |
62  // | v1 data type |
63  // | v1 data |
64  // | ... |
65  //
66  // all above can be memcopied in!
67 
68 
69  // When type ZTS
70  // | var name length |
71  // | var names string + padding |
72  // | length time
73  // | time |
74  // | length |
75  // | ... |
76  // | data |
77  // | ... |
78  //
79  // length must be a multiple of (length time)* (Mul 2..n of dims)
80 
81 
82  const unsigned INIT_OFFSET = 16; // the magic number, followed by size
83 
84  using namespace std::chrono;
85  using namespace std::literals;
86  using time_point = std::chrono::system_clock::time_point;
87 
88  struct Buf {
89 
90  Buf() : id(0), len(0), offset(0), timestamp(0s), data(nullptr) { }
91  Buf(Global::conn_id_t id_p, size_t len_p) : id(id_p), len(len_p), offset(0),
92  timestamp(std::chrono::system_clock::now()),
93  data(std::make_unique<char[]>(len_p)) { }
94 
95 
96  Global::conn_id_t id; // owner of the buffer
97  size_t len;
98  size_t offset; // usable buffer has offset=len
99  time_point timestamp; // can't allow buffers to live forever
100  std::unique_ptr<char[]> data;
101  };
102 
103 
104  struct Connection {
105  enum Direction { INCOMING, OUTGOING };
106 
107  Connection(Global::conn_id_t id_p, int fd_p, const sockaddr_in& addr_p, Direction dir_p) :
108  id(id_p), fd(fd_p), addr(addr_p), dir(dir_p) { }
109 
110  Global::conn_id_t id;
111  int fd;
112  sockaddr_in addr; // contains both IP address and port
113  Direction dir;
114  };
115 
117 
118  Global::conn_id_t createConnection(int fd, const sockaddr_in& addr, Connection::Direction dir);
119  const Connection getConnection(Global::conn_id_t id);
120  Global::conn_id_t getId(int fd);
121  int getFd(Global::conn_id_t id);
122  void deleteConnection(Global::conn_id_t id);
123 
124  std::map<Global::conn_id_t, Connection> getAllConnections() const;
125 
126  private:
128  std::map<int, Global::conn_id_t> fdToId;
129  std::map<Global::conn_id_t, Connection> connections;
130  mutable std::mutex mx;
131  };
132 
133 
134  struct BufferMgt {
135  BufferMgt(size_t max_size_p, zcore::NetStats& stats_p)
136  : max_size(max_size_p), stats(stats_p) { }
137 
139  void addBufferToReadylist(Global::conn_id_t id, Buf&& b);
140  bool get_data(Global::conn_id_t& id, Buf& buf);
141 
145  size_t gc();
146 
149  void removeAllBuffersForId(Global::conn_id_t id);
150 
153  std::unordered_map<Global::conn_id_t, Buf> bufmap;
154 
155  size_t getReadyBufSz() const;
156 
157  private:
159  std::deque<std::pair<Global::conn_id_t, Buf>> readybuflist;
160  mutable std::mutex mx; // only need to protect 'readybuflist'
161 
162  const size_t max_size;
163  zcore::NetStats& stats;
164  };
165 
166 
167  struct SignallingMgt {
168  SignallingMgt(size_t max_size_p=10) : max_size(max_size_p) { }
169 
170  enum Status { UP, DOWN };
171 
172  void add_sig(Global::conn_id_t id, Status st);
173  bool get_sig(Global::conn_id_t& id, Status& st);
174 
175  size_t getListSz() const;
176 
177  private:
178  const size_t max_size;
179  std::deque<std::pair<Global::conn_id_t, Status>> l;
180  mutable std::mutex mx;
181  };
182 
183 
184 
185  using namespace std::literals;
186 
187  struct NetHandler {
188 
189  NetHandler(const std::string ip_addr,
190  int port_p,
191  int data_out_fd,
192  int signalling_out_fd,
193  size_t datalist_max_size=1e5,
194  size_t siglist_max_size=10);
195 
196  ~NetHandler();
197 
198  void run(volatile bool& stop);
199 
200  ssize_t send(Global::conn_id_t id, char* buf, size_t len);
201 
202  bool get_data(Global::conn_id_t& id, Buf& buf) { return bufmgt.get_data(id, buf); }
203  bool get_sig(Global::conn_id_t& id, SignallingMgt::Status& st) { return sigmgt.get_sig(id, st); }
204 
207  Global::conn_id_t connect(const std::string& ip, int port);
210  void disconnect(Global::conn_id_t id);
211 
212  static constexpr time_point::duration BUF_TIME_TO_LIVE = 60s;
213 
214  inline const zcore::NetStats& getNetStats() { return stats; }
215  inline void resetNetStats() { stats.reset(); }
216 
217  zcore::NetInfo getNetInfo() const;
218 
219  std::atomic_uint_fast64_t ready;
220  private:
221  void acceptConnection();
222  ssize_t read(Global::conn_id_t id, int fd, char* buf, size_t n);
223 
224  int fd; // TCP listen sock
225  int port; // local listen port
226 
227  sockaddr_in addr; // local address
228 
234  const int data_out_fd;
235  const int sig_out_fd;
236 
237  ConnectionMappings connMappings;
238 
239  BufferMgt bufmgt;
240  SignallingMgt sigmgt;
241 
242  zcore::NetStats stats;
243 
244  std::mutex mx; // global mutex
245 
246  int epollfd;
247  epoll_event ev;
248 
249  static const int EPOLL_TIMEOUT = 1000;
250  };
251 
252 }
253 
254 
255 
256 #endif
net::Connection
Definition: net_handler.hpp:104
net::Buf
Definition: net_handler.hpp:88
net
Low level network communication: TCP connection management and buffering.
Definition: net_handler.hpp:40
net::SignallingMgt
Definition: net_handler.hpp:167
net::BufferMgt
Definition: net_handler.hpp:134
net::ConnectionMappings
Definition: net_handler.hpp:116
net::NetHandler
Definition: net_handler.hpp:187
zcore::NetStats
Definition: stats.hpp:29
zcore::NetInfo
Definition: info.hpp:39
net::BufferMgt::bufmap
std::unordered_map< Global::conn_id_t, Buf > bufmap
Definition: net_handler.hpp:153