 |
ztsdb
|
19 #ifndef INTERP_CTX_HPP
20 #define INTERP_CTX_HPP
22 #include <unordered_map>
31 #include "valuevar.hpp"
39 struct MsgHandlerBase;
41 enum IncomingReqState {
48 enum IncomingRspState {
60 ReqState(Global::reqid_t reqid_p, Global::reqid_t sourceid_p) :
61 state(CTX_REQ_IDLE), reqid(reqid_p), sourceid(sourceid_p),
62 codeLen(0), valstack_idx(0),
timestamp(std::chrono::system_clock::now()) { }
64 IncomingReqState
state;
65 Global::reqid_t reqid, sourceid;
66 std::unique_ptr<const E> e;
70 vector<ValState> valstack;
72 void readData(
const char* buf,
size_t len);
82 Global::conn_id_t peerid_p,
84 val::SpFuture& future_p) :
85 ip(ip_p), port(port_p), peerid(peerid_p), name(s),
state(CTX_RSP_IDLE),
86 valstack_idx(0),
timestamp(std::chrono::system_clock::now()), future(future_p) { }
92 const Global::conn_id_t peerid;
95 IncomingRspState
state;
97 vector<ValState> valstack;
101 void readData(
const char* buf,
size_t len);
110 std::unique_ptr<const E>
e;
111 shared_ptr<interp::Kont> k;
115 vector<interp::shpfrm> fstack;
119 std::map<Global::reqid_t, RspState> responses;
121 void popAndClearUntil(
const shared_ptr<interp::BaseFrame> r);
132 r(make_shared<interp::Frame>(
"working", global, global)), ir(ir_p), s(
nullptr) { }
134 std::shared_ptr<interp::BaseFrame> r;
138 std::map<Global::reqid_t, ReqState> requests;
142 Global::reqid_t currentState;
146 Global::reqid_t sourceid,
149 ssize_t readReqData(Global::conn_id_t peerid,
150 Global::reqid_t reqid,
151 Global::reqid_t sourceid,
160 Global::conn_id_t
connect(
const string& ip_p,
int port_p);
166 void sendReq(
const string& varName,
170 val::SpFuture& future);
173 virtual int interpret(
InterpState&
state,
const val::Value* retval=
nullptr) = 0;
178 Global::conn_id_t peerid,
182 void gc_requests(std::chrono::system_clock::time_point::duration ttl);
183 void gc_states(std::chrono::system_clock::time_point::duration ttl);
190 void resetMsgStats();
191 const NetStats& getNetStats()
const;
192 void resetNetStats();
193 const CtxStats& getCtxStats()
const;
194 void resetCtxStats();
201 static volatile sig_atomic_t sigint;
207 std::map<Global::reqid_t, InterpState> states;
215 virtual int interpret(
InterpState&
state,
const val::Value* retval=
nullptr);
218 Global::conn_id_t peerid,
221 int processReqData(Global::conn_id_t peerid, std::unique_ptr<const E> e);
233 Global::conn_id_t peerid,
239 InterpCtx(ir_p, global), timer_wptr(timer_p) { timer_p->start(); }
241 virtual void removeTimer(val::SpTimer& tmr);
245 Global::conn_id_t peerid,
247 int interpretTimer(
int fd);
248 std::weak_ptr<val::VTimer> timer_wptr;
std::chrono::system_clock::time_point timestamp
Definition: interp_ctx.hpp:98
Definition: cow_ptr.hpp:42
Definition: interp_ctx.hpp:59
Definition: interp_ctx.hpp:104
Definition: interp_ctx.hpp:226
std::unique_ptr< const E > e
The following 3 represent the state of the interpreter:
Definition: interp_ctx.hpp:110
zcore::NetInfo getNetInfo() const
Info:
Definition: interp_ctx.cpp:455
Global::conn_id_t peerid
where to send the response back
Definition: interp_ctx.hpp:107
virtual int interpret(InterpState &state, const val::Value *retval=nullptr)
TODO: pass the iterator to InterpState for easier deletion.
Definition: interp_ctx.cpp:671
Definition: interp_ctx.hpp:211
std::chrono::system_clock::time_point timestamp
Definition: interp_ctx.hpp:71
size_t readRspData(Global::reqid_t reqid, Global::reqid_t sourceid, const char *buf, size_t len)
Definition: interp_ctx.cpp:130
virtual int interpret(InterpState &state, const val::Value *retval=nullptr)
TODO: pass the iterator to InterpState for easier deletion.
Definition: interp_ctx.cpp:808
void addTimer(val::SpTimer &tmr)
Add a timer.
Definition: interp_ctx.cpp:404
virtual void sendGcStateMessage(const string &ip, int port, Global::conn_id_t peerid, const InterpState &state)
When deleting a state, sends or displays an error message.
Definition: interp_ctx.cpp:656
ssize_t readAppendData(const char *buf, size_t len)
Read a buffer containing an array to append.
Definition: interp_ctx.cpp:223
Definition: interp_ctx.hpp:129
Definition: valuevar.hpp:101
Definition: interp_ctx.hpp:237
Global::reqid_t sourceid
on behalf of whom we are interpreting
Definition: interp_ctx.hpp:106
Definition: msg_handler.hpp:39
virtual void sendGcStateMessage(const string &ip, int port, Global::conn_id_t peerid, const InterpState &state)=0
When deleting a state, sends or displays an error message.
Global::reqid_t reqid
who we are
Definition: interp_ctx.hpp:105
Global::conn_id_t connect(const string &ip_p, int port_p)
Establish connection to given ip/port.
Definition: interp_ctx.cpp:399
const MsgStats & getMsgStats() const
Stats:
Definition: interp_ctx.cpp:446
Definition: interp_ctx.hpp:79
ssize_t readAppendVectorData(const char *buf, size_t len)
Read a buffer containing a vector to append.
Definition: interp_ctx.cpp:272
virtual void removeTimer(val::SpTimer &tmr)
Definition: interp_ctx.cpp:409
virtual void sendGcStateMessage(const string &ip, int port, Global::conn_id_t peerid, const InterpState &state)
When deleting a state, sends or displays an error message.
Definition: interp_ctx.cpp:777
void reset()
Terminate all pending requests and reset the interpreter state.
Definition: interp_ctx.cpp:434
virtual void sendGcStateMessage(const string &ip, int port, Global::conn_id_t peerid, const InterpState &state)
When deleting a state, sends or displays an error message.
Definition: interp_ctx.cpp:883