22 #include <netinet/in.h>
25 #include <unordered_map>
30 #include <boost/circular_buffer.hpp>
31 #include <sys/epoll.h>
82 const unsigned INIT_OFFSET = 16;
84 using namespace std::chrono;
85 using namespace std::literals;
86 using time_point = std::chrono::system_clock::time_point;
90 Buf() : id(0), len(0), offset(0), timestamp(0s), data(
nullptr) { }
91 Buf(Global::conn_id_t id_p,
size_t len_p) : id(id_p), len(len_p), offset(0),
92 timestamp(std::chrono::system_clock::now()),
93 data(std::make_unique<
char[]>(len_p)) { }
100 std::unique_ptr<char[]> data;
105 enum Direction { INCOMING, OUTGOING };
107 Connection(Global::conn_id_t id_p,
int fd_p,
const sockaddr_in& addr_p, Direction dir_p) :
108 id(id_p), fd(fd_p), addr(addr_p), dir(dir_p) { }
110 Global::conn_id_t id;
118 Global::conn_id_t createConnection(
int fd,
const sockaddr_in& addr, Connection::Direction dir);
119 const Connection getConnection(Global::conn_id_t
id);
120 Global::conn_id_t getId(
int fd);
121 int getFd(Global::conn_id_t
id);
122 void deleteConnection(Global::conn_id_t
id);
124 std::map<Global::conn_id_t, Connection> getAllConnections()
const;
128 std::map<int, Global::conn_id_t> fdToId;
129 std::map<Global::conn_id_t, Connection> connections;
130 mutable std::mutex mx;
136 : max_size(max_size_p), stats(stats_p) { }
139 void addBufferToReadylist(Global::conn_id_t
id,
Buf&& b);
140 bool get_data(Global::conn_id_t&
id,
Buf& buf);
149 void removeAllBuffersForId(Global::conn_id_t
id);
153 std::unordered_map<Global::conn_id_t, Buf>
bufmap;
155 size_t getReadyBufSz()
const;
159 std::deque<std::pair<Global::conn_id_t, Buf>> readybuflist;
160 mutable std::mutex mx;
162 const size_t max_size;
168 SignallingMgt(
size_t max_size_p=10) : max_size(max_size_p) { }
170 enum Status { UP, DOWN };
172 void add_sig(Global::conn_id_t
id, Status st);
173 bool get_sig(Global::conn_id_t&
id, Status& st);
175 size_t getListSz()
const;
178 const size_t max_size;
179 std::deque<std::pair<Global::conn_id_t, Status>> l;
180 mutable std::mutex mx;
185 using namespace std::literals;
192 int signalling_out_fd,
193 size_t datalist_max_size=1e5,
194 size_t siglist_max_size=10);
198 void run(
volatile bool& stop);
200 ssize_t send(Global::conn_id_t
id,
char* buf,
size_t len);
202 bool get_data(Global::conn_id_t&
id,
Buf& buf) {
return bufmgt.get_data(
id, buf); }
203 bool get_sig(Global::conn_id_t&
id, SignallingMgt::Status& st) {
return sigmgt.get_sig(
id, st); }
207 Global::conn_id_t connect(
const std::string& ip,
int port);
210 void disconnect(Global::conn_id_t
id);
212 static constexpr time_point::duration BUF_TIME_TO_LIVE = 60s;
215 inline void resetNetStats() { stats.reset(); }
219 std::atomic_uint_fast64_t ready;
221 void acceptConnection();
222 ssize_t read(Global::conn_id_t
id,
int fd,
char* buf,
size_t n);
234 const int data_out_fd;
235 const int sig_out_fd;
249 static const int EPOLL_TIMEOUT = 1000;