ztsdb
interp_ctx.hpp
1 // (C) 2016 Leonardo Silvestri
2 //
3 // This file is part of ztsdb.
4 //
5 // ztsdb is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // ztsdb is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License
16 // along with ztsdb. If not, see <http://www.gnu.org/licenses/>.
17 
18 
19 #ifndef INTERP_CTX_HPP
20 #define INTERP_CTX_HPP
21 
22 #include <unordered_map>
23 #include <cstdint>
24 #include <vector>
25 #include <stack>
26 #include <chrono>
27 #include <signal.h>
28 #include "env.hpp"
29 #include "interp.hpp"
30 #include "ast.hpp"
31 #include "valuevar.hpp"
32 #include "stats.hpp"
33 #include "info.hpp"
34 #include "encode.hpp"
35 
36 
37 namespace zcore {
38 
39  struct MsgHandlerBase;
40 
41  enum IncomingReqState {
42  CTX_REQ_IDLE,
43  CTX_REQ_CODE,
44  CTX_REQ_VALUE,
45  CTX_REQ_DONE,
46  };
47 
48  enum IncomingRspState {
49  CTX_RSP_IDLE,
50  CTX_RSP,
51  CTX_RSP_DONE
52  };
53 
59  struct ReqState {
60  ReqState(Global::reqid_t reqid_p, Global::reqid_t sourceid_p) :
61  state(CTX_REQ_IDLE), reqid(reqid_p), sourceid(sourceid_p),
62  codeLen(0), valstack_idx(0), timestamp(std::chrono::system_clock::now()) { }
63 
64  IncomingReqState state;
65  Global::reqid_t reqid, sourceid;
66  std::unique_ptr<const E> e;
67  size_t codeLen;
68  vector<char> codeBuf;
69  size_t valstack_idx;
70  vector<ValState> valstack;
71  std::chrono::system_clock::time_point timestamp;
72  void readData(const char* buf, size_t len);
74  };
75 
79  struct RspState {
80  RspState(const string& ip_p,
81  int port_p,
82  Global::conn_id_t peerid_p,
83  const string& s,
84  val::SpFuture& future_p) :
85  ip(ip_p), port(port_p), peerid(peerid_p), name(s), state(CTX_RSP_IDLE),
86  valstack_idx(0), timestamp(std::chrono::system_clock::now()), future(future_p) { }
87 
88  // the following 3 are only to be able to deliver a meaninful
89  // error message:
90  const string ip;
91  const int port;
92  const Global::conn_id_t peerid;
93 
94  const string name;
95  IncomingRspState state;
96  size_t valstack_idx;
97  vector<ValState> valstack;
98  std::chrono::system_clock::time_point timestamp;
99  val::SpFuture future;
101  void readData(const char* buf, size_t len);
102  };
103 
104  struct InterpState {
105  Global::reqid_t reqid;
106  Global::reqid_t sourceid;
107  Global::conn_id_t peerid;
108 
110  std::unique_ptr<const E> e;
111  shared_ptr<interp::Kont> k;
115  vector<interp::shpfrm> fstack;
116 
117  // RSP state, we can query multiple time per expression (before
118  // blocking on the evaluation of a future), so we need a map:
119  std::map<Global::reqid_t, RspState> responses;
120 
121  void popAndClearUntil(const shared_ptr<interp::BaseFrame> r);
122  };
123 
124  // there is one interp context per peer; 'r' is the evaluation
125  // environment for any incoming request from the peer, and so it is
126  // peer specific and is purged when a peer goes down; its ancestor
127  // is global, so a <<- global assignment will write to the top level
128  // (and global) server environment.
129  struct InterpCtx {
130 
131  InterpCtx(MsgHandlerBase& ir_p, interp::shpfrm& global) :
132  r(make_shared<interp::Frame>("working", global, global)), ir(ir_p), s(nullptr) { }
133 
134  std::shared_ptr<interp::BaseFrame> r;
135  MsgHandlerBase& ir;
136 
137  // May be multiple interleaved requests coming in:
138  std::map<Global::reqid_t, ReqState> requests;
139 
140 
141  // replace this by the reqid LLL, it will be less dangerous! LLL
142  Global::reqid_t currentState; // then we can call 's' the retrieved InterpState LLL
143  InterpState* s;
144 
145  size_t readRspData(Global::reqid_t reqid,
146  Global::reqid_t sourceid,
147  const char* buf,
148  size_t len);
149  ssize_t readReqData(Global::conn_id_t peerid,
150  Global::reqid_t reqid,
151  Global::reqid_t sourceid,
152  const char* buf,
153  size_t len);
155  ssize_t readAppendData(const char* buf, size_t len);
157  ssize_t readAppendVectorData(const char* buf, size_t len);
158 
160  Global::conn_id_t connect(const string& ip_p, int port_p);
161 
163  void addTimer(val::SpTimer& tmr);
164  virtual void removeTimer(val::SpTimer& tmr);
165 
166  void sendReq(const string& varName,
167  val::VConn con,
168  E* e,
169  const val::SpVList& vl,
170  val::SpFuture& future);
171  void setStop();
172 
173  virtual int interpret(InterpState& state, const val::Value* retval=nullptr) = 0;
174 
176  virtual void sendGcStateMessage(const string& ip,
177  int port,
178  Global::conn_id_t peerid,
179  const InterpState& state) = 0;
180 
182  void gc_requests(std::chrono::system_clock::time_point::duration ttl);
183  void gc_states(std::chrono::system_clock::time_point::duration ttl);
184 
186  void reset();
187 
189  const MsgStats& getMsgStats() const;
190  void resetMsgStats();
191  const NetStats& getNetStats() const;
192  void resetNetStats();
193  const CtxStats& getCtxStats() const;
194  void resetCtxStats();
195 
197  zcore::NetInfo getNetInfo() const;
198  zcore::MsgInfo getMsgInfo() const;
199  zcore::CtxInfo getCtxInfo() const;
200 
201  static volatile sig_atomic_t sigint;
202 
203  virtual ~InterpCtx();
204 
205  protected:
206  CtxStats stats;
207  std::map<Global::reqid_t, InterpState> states;
208  };
209 
210 
212  InterpCtxLocal(MsgHandlerBase& ir_p, interp::shpfrm& global) :
213  InterpCtx(ir_p, global) { }
214 
215  virtual int interpret(InterpState& state, const val::Value* retval=nullptr);
216  virtual void sendGcStateMessage(const string& ip,
217  int port,
218  Global::conn_id_t peerid,
219  const InterpState& state);
220 
221  int processReqData(Global::conn_id_t peerid, std::unique_ptr<const E> e);
222 
223  };
224 
225 
227  InterpCtxRemote(MsgHandlerBase& ir_p, interp::shpfrm& global) :
228  InterpCtx(ir_p, global) { }
229 
230  virtual int interpret(InterpState& state, const val::Value* retval=nullptr);
231  virtual void sendGcStateMessage(const string& ip,
232  int port,
233  Global::conn_id_t peerid,
234  const InterpState& state);
235  };
236 
238  InterpCtxTimer(val::SpTimer& timer_p, MsgHandlerBase& ir_p, interp::shpfrm& global) :
239  InterpCtx(ir_p, global), timer_wptr(timer_p) { timer_p->start(); }
240 
241  virtual void removeTimer(val::SpTimer& tmr);
242  virtual int interpret(InterpState& state, const val::Value* retval=nullptr);
243  virtual void sendGcStateMessage(const string& ip,
244  int port,
245  Global::conn_id_t peerid,
246  const InterpState& state);
247  int interpretTimer(int fd);
248  std::weak_ptr<val::VTimer> timer_wptr;
249  };
250 
251 }
252 
253 #endif
254 
255 // abbreviations: ir = InterpRun
256 // ic = InterpCtx
257 //
258 // UP (id) ir ic
259 // ---------------------> | create new ic w/ id |
260 // msg part 1/3 (id) | |
261 // ---------------------> | -----------------------> |
262 // msg part 2/3 (id) | |
263 // ---------------------> | -----------------------> |
264 // msg part 3/3 (id) | |
265 // ---------------------> | -----------------------> |
266 
E
Definition: ast.hpp:76
zcore::CtxStats
Definition: stats.hpp:96
zcore::CtxInfo
Definition: info.hpp:58
interp.hpp
zcore::RspState::timestamp
std::chrono::system_clock::time_point timestamp
Definition: interp_ctx.hpp:98
arr::cow_ptr
Definition: cow_ptr.hpp:42
zcore::ReqState
Definition: interp_ctx.hpp:59
zcore::InterpState
Definition: interp_ctx.hpp:104
zcore::InterpCtxRemote
Definition: interp_ctx.hpp:226
zcore::InterpState::e
std::unique_ptr< const E > e
The following 3 represent the state of the interpreter:
Definition: interp_ctx.hpp:110
zcore::InterpCtx::getNetInfo
zcore::NetInfo getNetInfo() const
Info:
Definition: interp_ctx.cpp:455
zcore::InterpState::peerid
Global::conn_id_t peerid
where to send response back
Definition: interp_ctx.hpp:107
zcore::InterpCtxRemote::interpret
virtual int interpret(InterpState &state, const val::Value *retval=nullptr)
pass the iterator to InterpState for easier deletion! LLL
Definition: interp_ctx.cpp:671
zcore::InterpCtxLocal
Definition: interp_ctx.hpp:211
zcore::NetStats
Definition: stats.hpp:29
zcore::ReqState::timestamp
std::chrono::system_clock::time_point timestamp
Definition: interp_ctx.hpp:71
zcore::InterpCtx::readRspData
size_t readRspData(Global::reqid_t reqid, Global::reqid_t sourceid, const char *buf, size_t len)
Definition: interp_ctx.cpp:130
zcore::InterpCtxTimer::interpret
virtual int interpret(InterpState &state, const val::Value *retval=nullptr)
pass the iterator to InterpState for easier deletion! LLL
Definition: interp_ctx.cpp:808
zcore::MsgStats
Definition: stats.hpp:64
zcore::MsgInfo
Definition: info.hpp:46
zcore::InterpCtx::addTimer
void addTimer(val::SpTimer &tmr)
Add a timer.
Definition: interp_ctx.cpp:404
zcore::InterpCtxLocal::sendGcStateMessage
virtual void sendGcStateMessage(const string &ip, int port, Global::conn_id_t peerid, const InterpState &state)
When deleting a state, sends or displays an error message.
Definition: interp_ctx.cpp:656
zcore::InterpCtx::readAppendData
ssize_t readAppendData(const char *buf, size_t len)
Read a buffer containing an array to append.
Definition: interp_ctx.cpp:223
zcore::InterpCtx
Definition: interp_ctx.hpp:129
val::VConn
Definition: valuevar.hpp:101
zcore::InterpCtxTimer
Definition: interp_ctx.hpp:237
zcore::InterpState::sourceid
Global::reqid_t sourceid
on behalf of who we are interpreting
Definition: interp_ctx.hpp:106
zcore::MsgHandlerBase
Definition: msg_handler.hpp:39
state
Definition: tz.hpp:51
zcore::InterpCtx::sendGcStateMessage
virtual void sendGcStateMessage(const string &ip, int port, Global::conn_id_t peerid, const InterpState &state)=0
When deleting a state, sends or displays an error message.
zcore::InterpState::reqid
Global::reqid_t reqid
who we are
Definition: interp_ctx.hpp:105
zcore::InterpCtx::connect
Global::conn_id_t connect(const string &ip_p, int port_p)
Establish connection to given ip/port.
Definition: interp_ctx.cpp:399
zcore::InterpCtx::getMsgStats
const MsgStats & getMsgStats() const
Stats:
Definition: interp_ctx.cpp:446
zcore::NetInfo
Definition: info.hpp:39
zcore::RspState
Definition: interp_ctx.hpp:79
zcore::InterpCtx::readAppendVectorData
ssize_t readAppendVectorData(const char *buf, size_t len)
Read a buffer containing a vector to append.
Definition: interp_ctx.cpp:272
zcore::InterpCtx::removeTimer
virtual void removeTimer(val::SpTimer &tmr)
Definition: interp_ctx.cpp:409
zcore::InterpCtxRemote::sendGcStateMessage
virtual void sendGcStateMessage(const string &ip, int port, Global::conn_id_t peerid, const InterpState &state)
When deleting a state, sends or displays an error message.
Definition: interp_ctx.cpp:777
zcore::InterpCtx::reset
void reset()
Terminate all pending requests and reset the interpreter state.
Definition: interp_ctx.cpp:434
zcore::InterpCtxTimer::sendGcStateMessage
virtual void sendGcStateMessage(const string &ip, int port, Global::conn_id_t peerid, const InterpState &state)
When deleting a state, sends or displays an error message.
Definition: interp_ctx.cpp:883