ztsdb
msg_handler.hpp
1 // (C) 2016 Leonardo Silvestri
2 //
3 // This file is part of ztsdb.
4 //
5 // ztsdb is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // ztsdb is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License
16 // along with ztsdb. If not, see <http://www.gnu.org/licenses/>.
17 
18 
19 #ifndef MSG_HANDLER_HPP
20 #define MSG_HANDLER_HPP
21 
22 #include <string>
23 #include <unordered_map>
24 #include <memory>
25 #include <cstdint>
26 #include <poll.h>
27 #include "env.hpp"
28 #include "interp_ctx.hpp"
29 #include "stats.hpp"
30 #include "config.hpp"
31 #include "net_handler.hpp"
32 
33 
34 namespace zcore {
35 
39  struct MsgHandlerBase {
40  virtual Global::conn_id_t connect(const string& ip, int port, InterpCtx& ic) {
41  throw std::logic_error("sendReq not implemented");
42  }
43  virtual size_t sendReq(InterpCtx* reqCtx, Global::reqid_t reqid, Global::reqid_t sourceid,
44  val::VConn con, const E* e, const val::SpVList& boundvars) {
45  throw std::logic_error("sendReq not implemented");
46  }
47  virtual size_t sendRsp(Global::conn_id_t id, Global::reqid_t reqid, Global::reqid_t sourceid,
48  const val::Value& rsp) {
49  throw std::logic_error("sendReq not implemented");
50  }
51  inline virtual void suspendKeyboardPoll() {
52  throw std::logic_error("suspendKeyboardPoll not implemented");
53  }
54  inline virtual void enableKeyboardPoll() {
55  throw std::logic_error("enableKeyboardPoll not implemented");
56  }
57  inline virtual void addTimer(int fd, std::unique_ptr<InterpCtxTimer> ctx) {
58  throw std::logic_error("enableKeyboardPoll not implemented");
59  }
60  inline virtual void removeTimer(int fd) {
61  throw std::logic_error("enableKeyboardPoll not implemented");
62  }
63  virtual const NetStats& getNetStats() const {
64  throw std::logic_error("getNetStats not implemented");
65  }
66  virtual void resetNetStats() {
67  throw std::logic_error("resetNetStats not implemented");
68  }
69  virtual zcore::NetInfo getNetInfo() const {
70  throw std::logic_error("getNetInfo not implemented");
71  }
72  virtual zcore::MsgInfo getMsgInfo() const {
73  throw std::logic_error("getMsgInfo not implemented");
74  }
75 
76  const MsgStats& getMsgStats() { return stats; }
77  void resetMsgStats() { stats.reset(); }
78 
79  virtual ~MsgHandlerBase() { }
80 
81  protected:
82  MsgStats stats;
83  };
84 
85  // just the higher level protocol:
86  struct MsgHandler : public MsgHandlerBase {
88  interp::shpfrm& global_p,
89  int fd_read_data_p,
90  int fd_read_sig_p,
91  int fd_input_p=STDIN_FILENO,
92  bool once_p=false,
93  const std::string& initialCode="");
94 
95  int run();
96 
97  Global::conn_id_t connect(const string& ip, int port, InterpCtx& ic);
98  size_t sendReq(InterpCtx* reqCtx,
99  Global::reqid_t reqid,
100  Global::reqid_t sourceid,
101  val::VConn con,
102  const E* e,
103  const val::SpVList& boundvars);
104  size_t sendRsp(Global::conn_id_t id,
105  Global::reqid_t reqid,
106  Global::reqid_t sourceid,
107  const val::Value& rsp);
108 
109  static volatile sig_atomic_t waitingOnResp;
110 
111  void suspendKeyboardPoll();
112  void enableKeyboardPoll();
113 
114  void addTimer(int fd, std::unique_ptr<InterpCtxTimer> ctx);
115  void removeTimer(int fd);
116 
117  inline const InterpCtxLocal& getLocalCtx() const { return *localContext; }
118 
119  const NetStats& getNetStats() const;
120  void resetNetStats();
121  zcore::NetInfo getNetInfo() const;
122  zcore::MsgInfo getMsgInfo() const;
123 
124  private:
125  net::NetHandler& com;
126 
127  interp::shpfrm global;
128 
129  int fd_read_data;
130  int fd_read_sig;
131  int fd_input;
132  bool once;
133  unique_ptr<InterpCtxLocal> localContext;
134 
135  // each one of these needs to be garbage collected:
136  unordered_map<Global::conn_id_t, unique_ptr<InterpCtx>> reqContexts; // incoming requests
137  unordered_map<Global::conn_id_t, InterpCtx&> rspContexts; // incoming responses
138  unordered_map<int, unique_ptr<InterpCtxTimer>> timerContexts; // timers
139 
140  int epollfd;
141  epoll_event events[Global::EPOLL_MAX_EVENTS];
142  static const int EPOLL_TIMEOUT = 1000;
143  };
144 
145 }
146 
147 #endif
E
Definition: ast.hpp:76
arr::cow_ptr
Definition: cow_ptr.hpp:42
net::NetHandler
Definition: net_handler.hpp:187
zcore::InterpCtxLocal
Definition: interp_ctx.hpp:211
zcore::NetStats
Definition: stats.hpp:29
zcore::MsgHandler::run
int run()
Definition: msg_handler.cpp:144
zcore::MsgStats
Definition: stats.hpp:64
zcore::MsgInfo
Definition: info.hpp:46
zcore::InterpCtx
Definition: interp_ctx.hpp:129
val::VConn
Definition: valuevar.hpp:101
zcore::MsgHandler
Definition: msg_handler.hpp:86
zcore::MsgHandlerBase
Definition: msg_handler.hpp:39
zcore::NetInfo
Definition: info.hpp:39
zcore::MsgHandler::MsgHandler
MsgHandler(net::NetHandler &com_p, interp::shpfrm &global_p, int fd_read_data_p, int fd_read_sig_p, int fd_input_p=STDIN_FILENO, bool once_p=false, const std::string &initialCode="")
Definition: msg_handler.cpp:55