41 if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
50 WSAStartup (MAKEWORD (2, 2), &data);
69 const std::string& name_)
77 DL((
REACT,
"TIMEOUT_EVENT......: (%d,%d)\n",
78 timeout_.
sec(),timeout_.
msec()));
84 DL((
REACT,
"---Modified Timer Queue----\n"));
86 DL((
REACT,
"---------------------------\n"));
97 std::ostringstream msg;
104 DL((
ASSAERR,
"readset: fd %d out of range\n", fd_));
115 DL((
ASSAERR,
"writeset: fd %d out of range\n", fd_));
119 msg <<
" WRITE_EVENT";
126 DL((
ASSAERR,
"exceptset: fd %d out of range\n", fd_));
130 msg <<
" EXCEPT_EVENT";
134 DL((
REACT,
"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n",
135 eh_->
get_id ().c_str (), fd_, (
u_long)eh_, msg.str ().c_str () ));
144 DL((
REACT,
"Modified waitSet:\n"));
158 DL((
REACT,
"---Modified Timer Queue----\n"));
160 DL((
REACT,
"---------------------------\n"));
193 if ((*iter).second == eh_) {
207 if ((*iter).second == eh_) {
221 if ((*iter).second == eh_) {
233 DL((
REACT,
"Found EvtH \"%s\"(%p)\n", eh_->
get_id ().c_str (), eh_));
239 DL((
REACT,
"Modifies waitSet:\n"));
257 DL((
REACT,
"Removing handler for fd=%d\n",fd_));
265 ehp = (*iter).second;
278 ehp = (*iter).second;
291 ehp = (*iter).second;
302 if (ret ==
true && ehp != NULL) {
303 DL((
REACT,
"Removed EvtH \"%s\"(%p)\n", ehp->
get_id ().c_str (), ehp));
309 DL((
REACT,
"Modifies waitSet:\n"));
321 bool num_removed =
false;
323 timeval poll = { 0, 0 };
328 if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
331 DL((
REACT,
"Detected BAD FD: %d\n", fd ));
336 return (num_removed);
348 DL((
REACT,
"Received cmd to stop Reactor\n"));
370 if ( errno == EINTR ) {
371 EL((
REACT,
"EINTR: interrupted select(2)\n"));
387 if ( errno == EBADF ) {
388 DL((
REACT,
"EBADF: bad file descriptor\n"));
395 DL ((
REACT,
"select(3) error = %d\n", WSAGetLastError()));
413 DL((
REACT,
"m_readySet: %d FDs are ready for processing\n", n));
444 DL((
REACT,
"--------- Timer Queue ----------\n"));
446 DL((
REACT,
"--------------------------------\n"));
449 *howlong_ = tv - now;
452 *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
457 if (howlong_ != NULL) {
458 DL((
REACT,
"delay (%f)\n",
double (*howlong_) ));
461 DL((
REACT,
"delay (forever)\n"));
500 DL((
REACT,
"======================================\n"));
524 DL((
REACT,
"isAnyReady returned: %d\n",nReady));
529 DL((
REACT,
"=== m_waitSet ===\n"));
534 DL ((
REACT,
"m_readySet after reset():\n"));
538 DL ((
REACT,
"m_readySet after assign:\n"));
548 DL((
REACT,
"::select() returned: %d\n",nReady));
551 DL ((
REACT,
"m_readySet after select:\n"));
579 while (iter != fdSet_.end ())
582 ehp = (*iter).second;
584 if (mask_.
isSet (fd) && ehp != NULL)
587 DL((
REACT,
"Data detected from \"%s\"(fd=%d)\n",
588 eh_id.c_str (), fd));
590 ret = (ehp->*callback_) (fd);
596 DL((
REACT,
"%d bytes pending on fd=%d \"%s\"\n",
597 ret, fd, eh_id.c_str ()));
601 DL((
REACT,
"All data from \"%s\"(fd=%d) are consumed\n",
602 eh_id.c_str (), fd));
611 iter = fdSet_.begin ();
643 DL((
REACT,
"Dispatching %d FDs.\n",ready_));
678 ehp = (*iter).second;
684 ehp = (*iter).second;
690 ehp = (*iter).second;
#define Assure_return(exp_)
Test condition and return bool from a function if assertion fails.
bool clear(handler_t fd_)
Clear flag (OFF) for the argument fd.
std::map< u_int, EventHandler * > Fd2Eh_Map_Type
no cloning
bool isSignalEvent(EventType e_)
virtual int handle_close(int fd)
EOF on peer socket handler callback.
handler_t m_maxfd_plus1
Max file descriptor number (in all sets) plus 1.
bool registerIOHandler(EventHandler *eh_, handler_t fd_, EventType et_=RWE_EVENTS)
Register I/O Event handler with Reactor.
virtual int handle_write(int fd)
Write handler callback.
int expire(const TimeVal &tv_)
Traverse the queue, triggering all timers that are past argument timeval.
bool setFd(handler_t fd_)
Set flag (ON) for the argument fd.
static TimeVal zeroTime()
Static that returns zero timeval: {0,0}.
void adjust_maxfdp1(handler_t fd_)
Adjust maxfdp1 in a portable way (win32 ignores maxfd alltogether).
bool removeHandler(EventHandler *eh_, EventType et_=ALL_EVENTS)
Remove Event handler from reactor for either all I/O events or timeout event or both.
bool isExceptEvent(EventType e_)
bool removeTimerHandler(TimerId id_)
Remove Timer event from the queue.
bool handleError(void)
Handle error in select(2) loop appropriately.
MaskSet m_waitSet
Handlers to wait for event on.
bool is_valid_handler(handler_t socket_)
Detect socket() error in a portable way.
string fmtString(const char *fmt_=NULL) const
Format timeval structure into readable format.
Fd2Eh_Map_Type m_writeSet
Event handlers awaiting on WRITE_EVENT.
#define DL(X)
A macro for writing debug message to the Logger.
#define trace_with_mask(s, m)
trace_with_mask() is used to trace function call chain in C++ program.
FdSet m_eset
Exception fds set.
bool removeIOHandler(handler_t fd_)
Remove IO Event handler from reactor.
void reset()
Clear all bits in all sets.
void sec(long sec_)
Set seconds.
bool isSet(handler_t fd_)
Test whether fd's flag is on.
bool isTimeoutEvent(EventType e_)
#define EL(X)
A macro for writing error message to the Logger.
virtual int handle_read(int fd)
Read event callback.
int remove(EventHandler *eh_)
Cancel all timers for the EventHandler eh_.
std::string get_id() const
Retrieve EventHandler ID.
void dump()
Write current state of MaskSet object to log file.
Extended Reactor/PrioriyQueue messages.
Class Reactor/PrioriyQueue messages.
void dump(void)
Dump Queue information to the log file.
TimerId insert(EventHandler *eh_, const TimeVal &tv_, const TimeVal &delta_, const std::string &name_)
Add timer (EventHandler object) to the queue to be dispatch at the time specified.
Fd2Eh_Map_Type m_readSet
Event handlers awaiting on READ_EVENT.
void msec(long msec_)
Set microseconds.
Socket & ends(Socket &os_)
ends manipulator.
void sync()
Resync internals after select() call.
int max_fd()
Return maximum value of the file descriptor in the Set.
bool isWriteEvent(EventType e_)
void stopReactor(void)
Stop Reactor's activity.
virtual int handle_except(int fd)
Exception handler callback.
int(EventHandler::* EH_IO_Callback)(int)
A type for the pointer to I/O-related callback member function of class EventHandler.
TimeVal & top(void)
Return expiration time of the top element in the queue.
bool m_active
Flag that indicates whether Reactor is active or had been stopped.
An abstraction to message logging facility.
void deactivate(void)
Deactivate Reactor.
An implementation of Reactor pattern.
void waitForEvents(void)
Main waiting loop that blocks indefinitely processing events.
bool isEmpty()
Is queue empty?
bool isReadEvent(EventType e_)
unsigned long TimerId
Timer Id is used in handle_timeout() calls.
TimerId registerTimerHandler(EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>")
Register Timer Event handler with Reactor.
Fd2Eh_Map_Type::iterator Fd2Eh_Map_Iter
bool checkFDs(void)
Check mask for bad file descriptors.
TimerQueue m_tqueue
The queue of Timers.
FdSet m_wset
Write fds set.
void dispatchHandler(FdSet &mask_, Fd2Eh_Map_Type &fdSet_, EH_IO_Callback callback_)
Call handler's callback and, if callback returns negative value, remove it from the Reactor...
void calculateTimeout(TimeVal *&howlong_, TimeVal *maxwait_)
Calculate closest timeout.
bool dispatch(int minimum_)
Notify all EventHandlers registered on respecful events occured.
FdSet m_rset
Read fds set.
Fd2Eh_Map_Type m_exceptSet
Event handlers awaiting on EXCEPT_EVENT.
static TimeVal gettimeofday()
Shields off underlying OS differences in getting current time.
EventType
EventType defines events types that Reactor understands.
int numSet()
Determine how many bits are set (ON) in the set.
MaskSet m_readySet
Handlers that are ready for processing.
int isAnyReady(void)
Return number of file descriptors ready accross all sets.
int m_fd_setsize
Max number of open files per process.