1 : /* A simple event-driven programming library. Originally I wrote this code
2 : * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
3 : * it in form of a library for easy reuse.
4 : *
5 : * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
6 : * All rights reserved.
7 : *
8 : * Redistribution and use in source and binary forms, with or without
9 : * modification, are permitted provided that the following conditions are met:
10 : *
11 : * * Redistributions of source code must retain the above copyright notice,
12 : * this list of conditions and the following disclaimer.
13 : * * Redistributions in binary form must reproduce the above copyright
14 : * notice, this list of conditions and the following disclaimer in the
15 : * documentation and/or other materials provided with the distribution.
16 : * * Neither the name of Redis nor the names of its contributors may be used
17 : * to endorse or promote products derived from this software without
18 : * specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 : * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 : * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 : * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24 : * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 : * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 : * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 : * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 : * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 : * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 : * POSSIBILITY OF SUCH DAMAGE.
31 : */
32 :
33 : #include <stdio.h>
34 : #include <sys/time.h>
35 : #include <sys/types.h>
36 : #include <unistd.h>
37 : #include <stdlib.h>
38 : #include <string.h>
39 :
40 : #include "ae.h"
41 : #include "zmalloc.h"
42 : #include "config.h"
43 :
44 : /* Include the best multiplexing layer supported by this system.
45 : * The following should be ordered by performances, descending. */
46 : #ifdef HAVE_EPOLL
47 : #include "ae_epoll.c"
48 : #else
49 : #ifdef HAVE_KQUEUE
50 : #include "ae_kqueue.c"
51 : #else
52 : #include "ae_select.c"
53 : #endif
54 : #endif
55 :
56 53 : aeEventLoop *aeCreateEventLoop(int setsize) {
57 : aeEventLoop *eventLoop;
58 : int i;
59 :
60 53 : if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
61 53 : eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
62 53 : eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
63 53 : if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
64 53 : eventLoop->setsize = setsize;
65 53 : eventLoop->timeEventHead = NULL;
66 53 : eventLoop->timeEventNextId = 0;
67 53 : eventLoop->stop = 0;
68 53 : eventLoop->maxfd = -1;
69 53 : eventLoop->beforesleep = NULL;
70 53 : if (aeApiCreate(eventLoop) == -1) goto err;
71 : /* Events with mask == AE_NONE are not set. So let's initialize the
72 : * vector with it. */
73 106901 : for (i = 0; i < setsize; i++)
74 106848 : eventLoop->events[i].mask = AE_NONE;
75 53 : return eventLoop;
76 :
77 : err:
78 0 : if (eventLoop) {
79 0 : zfree(eventLoop->events);
80 0 : zfree(eventLoop->fired);
81 0 : zfree(eventLoop);
82 : }
83 0 : return NULL;
84 : }
85 :
86 0 : void aeDeleteEventLoop(aeEventLoop *eventLoop) {
87 : aeApiFree(eventLoop);
88 0 : zfree(eventLoop->events);
89 0 : zfree(eventLoop->fired);
90 0 : zfree(eventLoop);
91 0 : }
92 :
93 0 : void aeStop(aeEventLoop *eventLoop) {
94 0 : eventLoop->stop = 1;
95 0 : }
96 :
97 1066296 : int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
98 : aeFileProc *proc, void *clientData)
99 : {
100 1066296 : if (fd >= eventLoop->setsize) return AE_ERR;
101 1066296 : aeFileEvent *fe = &eventLoop->events[fd];
102 :
103 1066296 : if (aeApiAddEvent(eventLoop, fd, mask) == -1)
104 0 : return AE_ERR;
105 1066296 : fe->mask |= mask;
106 1066296 : if (mask & AE_READABLE) fe->rfileProc = proc;
107 1066296 : if (mask & AE_WRITABLE) fe->wfileProc = proc;
108 1066296 : fe->clientData = clientData;
109 1066296 : if (fd > eventLoop->maxfd)
110 273 : eventLoop->maxfd = fd;
111 1066296 : return AE_OK;
112 : }
113 :
114 1066233 : void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
115 : {
116 1066233 : if (fd >= eventLoop->setsize) return;
117 1066233 : aeFileEvent *fe = &eventLoop->events[fd];
118 :
119 1066233 : if (fe->mask == AE_NONE) return;
120 1066140 : fe->mask = fe->mask & (~mask);
121 1066140 : if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
122 : /* Update the max fd */
123 : int j;
124 :
125 112 : for (j = eventLoop->maxfd-1; j >= 0; j--)
126 112 : if (eventLoop->events[j].mask != AE_NONE) break;
127 103 : eventLoop->maxfd = j;
128 : }
129 : aeApiDelEvent(eventLoop, fd, mask);
130 : }
131 :
132 150722 : int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
133 150722 : if (fd >= eventLoop->setsize) return 0;
134 150722 : aeFileEvent *fe = &eventLoop->events[fd];
135 :
136 150722 : return fe->mask;
137 : }
138 :
/* Read the current time with gettimeofday() and store it as whole seconds
 * in *seconds plus the millisecond part (microseconds / 1000) in
 * *milliseconds. */
static void aeGetTime(long *seconds, long *milliseconds)
{
    struct timeval now;

    gettimeofday(&now, NULL);
    *seconds = now.tv_sec;
    *milliseconds = now.tv_usec / 1000;
}
147 :
/* Compute "now + milliseconds" and store it as a (seconds, milliseconds)
 * pair in *sec and *ms.
 *
 * The stored millisecond part is always normalised to [0, 999], also when
 * `milliseconds` is negative, so that pairs produced here can be compared
 * field by field. */
static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
    long cur_sec, cur_ms, when_sec, when_ms;

    aeGetTime(&cur_sec, &cur_ms);
    when_sec = cur_sec + milliseconds/1000;
    when_ms = cur_ms + milliseconds%1000;
    /* cur_ms is in [0, 999] and the remainder in (-1000, 1000), so a
     * single carry or borrow is enough to normalise the result. */
    if (when_ms >= 1000) {
        when_sec++;
        when_ms -= 1000;
    } else if (when_ms < 0) {
        /* The remainder of a negative offset is negative in C. */
        when_sec--;
        when_ms += 1000;
    }
    *sec = when_sec;
    *ms = when_ms;
}
161 :
162 53 : long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
163 : aeTimeProc *proc, void *clientData,
164 : aeEventFinalizerProc *finalizerProc)
165 : {
166 53 : long long id = eventLoop->timeEventNextId++;
167 : aeTimeEvent *te;
168 :
169 53 : te = zmalloc(sizeof(*te));
170 53 : if (te == NULL) return AE_ERR;
171 53 : te->id = id;
172 53 : aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
173 53 : te->timeProc = proc;
174 53 : te->finalizerProc = finalizerProc;
175 53 : te->clientData = clientData;
176 53 : te->next = eventLoop->timeEventHead;
177 53 : eventLoop->timeEventHead = te;
178 53 : return id;
179 : }
180 :
181 0 : int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
182 : {
183 0 : aeTimeEvent *te, *prev = NULL;
184 :
185 0 : te = eventLoop->timeEventHead;
186 0 : while(te) {
187 0 : if (te->id == id) {
188 0 : if (prev == NULL)
189 0 : eventLoop->timeEventHead = te->next;
190 : else
191 0 : prev->next = te->next;
192 0 : if (te->finalizerProc)
193 0 : te->finalizerProc(eventLoop, te->clientData);
194 0 : zfree(te);
195 0 : return AE_OK;
196 : }
197 0 : prev = te;
198 0 : te = te->next;
199 : }
200 0 : return AE_ERR; /* NO event with the specified ID found */
201 : }
202 :
203 : /* Search the first timer to fire.
204 : * This operation is useful to know how many time the select can be
205 : * put in sleep without to delay any event.
206 : * If there are no timers NULL is returned.
207 : *
208 : * Note that's O(N) since time events are unsorted.
209 : * Possible optimizations (not needed by Redis so far, but...):
210 : * 1) Insert the event in order, so that the nearest is just the head.
211 : * Much better but still insertion or deletion of timers is O(N).
212 : * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
213 : */
214 : static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
215 : {
216 1768647 : aeTimeEvent *te = eventLoop->timeEventHead;
217 1768647 : aeTimeEvent *nearest = NULL;
218 :
219 3537294 : while(te) {
220 1768647 : if (!nearest || te->when_sec < nearest->when_sec ||
221 0 : (te->when_sec == nearest->when_sec &&
222 0 : te->when_ms < nearest->when_ms))
223 1768647 : nearest = te;
224 1768647 : te = te->next;
225 : }
226 1768647 : return nearest;
227 : }
228 :
229 : /* Process time events */
230 : static int processTimeEvents(aeEventLoop *eventLoop) {
231 1768647 : int processed = 0;
232 : aeTimeEvent *te;
233 : long long maxId;
234 :
235 1768647 : te = eventLoop->timeEventHead;
236 1768647 : maxId = eventLoop->timeEventNextId-1;
237 3539645 : while(te) {
238 : long now_sec, now_ms;
239 : long long id;
240 :
241 1771049 : if (te->id > maxId) {
242 0 : te = te->next;
243 0 : continue;
244 : }
245 1771049 : aeGetTime(&now_sec, &now_ms);
246 5247557 : if (now_sec > te->when_sec ||
247 3476508 : (now_sec == te->when_sec && now_ms >= te->when_ms))
248 : {
249 : int retval;
250 :
251 2453 : id = te->id;
252 2453 : retval = te->timeProc(eventLoop, id, te->clientData);
253 2402 : processed++;
254 : /* After an event is processed our time event list may
255 : * no longer be the same, so we restart from head.
256 : * Still we make sure to don't process events registered
257 : * by event handlers itself in order to don't loop forever.
258 : * To do so we saved the max ID we want to handle.
259 : *
260 : * FUTURE OPTIMIZATIONS:
261 : * Note that this is NOT great algorithmically. Redis uses
262 : * a single time event so it's not a problem but the right
263 : * way to do this is to add the new elements on head, and
264 : * to flag deleted elements in a special way for later
265 : * deletion (putting references to the nodes to delete into
266 : * another linked list). */
267 2402 : if (retval != AE_NOMORE) {
268 2402 : aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
269 : } else {
270 0 : aeDeleteTimeEvent(eventLoop, id);
271 : }
272 2402 : te = eventLoop->timeEventHead;
273 : } else {
274 1768596 : te = te->next;
275 : }
276 : }
277 1768596 : return processed;
278 : }
279 :
280 : /* Process every pending time event, then every pending file event
281 : * (that may be registered by time event callbacks just processed).
282 : * Without special flags the function sleeps until some file event
283 : * fires, or when the next time event occurrs (if any).
284 : *
285 : * If flags is 0, the function does nothing and returns.
286 : * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
287 : * if flags has AE_FILE_EVENTS set, file events are processed.
288 : * if flags has AE_TIME_EVENTS set, time events are processed.
289 : * if flags has AE_DONT_WAIT set the function returns ASAP until all
290 : * the events that's possible to process without to wait are processed.
291 : *
292 : * The function returns the number of events processed. */
293 1769701 : int aeProcessEvents(aeEventLoop *eventLoop, int flags)
294 : {
295 1769701 : int processed = 0, numevents;
296 :
297 : /* Nothing to do? return ASAP */
298 1769701 : if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
299 :
300 : /* Note that we want call select() even if there are no
301 : * file events to process as long as we want to process time
302 : * events, in order to sleep until the next time event is ready
303 : * to fire. */
304 1769701 : if (eventLoop->maxfd != -1 ||
305 0 : ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
306 : int j;
307 1769701 : aeTimeEvent *shortest = NULL;
308 : struct timeval tv, *tvp;
309 :
310 1769701 : if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
311 1768647 : shortest = aeSearchNearestTimer(eventLoop);
312 1769701 : if (shortest) {
313 : long now_sec, now_ms;
314 :
315 : /* Calculate the time missing for the nearest
316 : * timer to fire. */
317 1768647 : aeGetTime(&now_sec, &now_ms);
318 1768647 : tvp = &tv;
319 1768647 : tvp->tv_sec = shortest->when_sec - now_sec;
320 1768647 : if (shortest->when_ms < now_ms) {
321 65536 : tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
322 65536 : tvp->tv_sec --;
323 : } else {
324 1703111 : tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
325 : }
326 1768647 : if (tvp->tv_sec < 0) tvp->tv_sec = 0;
327 1768647 : if (tvp->tv_usec < 0) tvp->tv_usec = 0;
328 : } else {
329 : /* If we have to check for events but need to return
330 : * ASAP because of AE_DONT_WAIT we need to se the timeout
331 : * to zero */
332 1054 : if (flags & AE_DONT_WAIT) {
333 1054 : tv.tv_sec = tv.tv_usec = 0;
334 1054 : tvp = &tv;
335 : } else {
336 : /* Otherwise we can block */
337 0 : tvp = NULL; /* wait forever */
338 : }
339 : }
340 :
341 1769701 : numevents = aeApiPoll(eventLoop, tvp);
342 3879526 : for (j = 0; j < numevents; j++) {
343 2109825 : aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
344 2109825 : int mask = eventLoop->fired[j].mask;
345 2109825 : int fd = eventLoop->fired[j].fd;
346 2109825 : int rfired = 0;
347 :
348 : /* note the fe->mask & mask & ... code: maybe an already processed
349 : * event removed an element that fired and we still didn't
350 : * processed, so we check if the event is still valid. */
351 2109825 : if (fe->mask & mask & AE_READABLE) {
352 1042057 : rfired = 1;
353 1042057 : fe->rfileProc(eventLoop,fd,fe->clientData,mask);
354 : }
355 2109825 : if (fe->mask & mask & AE_WRITABLE) {
356 1069779 : if (!rfired || fe->wfileProc != fe->rfileProc)
357 1069779 : fe->wfileProc(eventLoop,fd,fe->clientData,mask);
358 : }
359 2109825 : processed++;
360 : }
361 : }
362 : /* Check time events */
363 1769701 : if (flags & AE_TIME_EVENTS)
364 1768596 : processed += processTimeEvents(eventLoop);
365 :
366 1769650 : return processed; /* return the number of processed file/time events */
367 : }
368 :
369 : /* Wait for millseconds until the given file descriptor becomes
370 : * writable/readable/exception */
371 91 : int aeWait(int fd, int mask, long long milliseconds) {
372 : struct timeval tv;
373 : fd_set rfds, wfds, efds;
374 91 : int retmask = 0, retval;
375 :
376 91 : tv.tv_sec = milliseconds/1000;
377 91 : tv.tv_usec = (milliseconds%1000)*1000;
378 91 : FD_ZERO(&rfds);
379 91 : FD_ZERO(&wfds);
380 91 : FD_ZERO(&efds);
381 :
382 91 : if (mask & AE_READABLE) FD_SET(fd,&rfds);
383 91 : if (mask & AE_WRITABLE) FD_SET(fd,&wfds);
384 91 : if ((retval = select(fd+1, &rfds, &wfds, &efds, &tv)) > 0) {
385 90 : if (FD_ISSET(fd,&rfds)) retmask |= AE_READABLE;
386 90 : if (FD_ISSET(fd,&wfds)) retmask |= AE_WRITABLE;
387 90 : return retmask;
388 : } else {
389 1 : return retval;
390 : }
391 : }
392 :
393 51 : void aeMain(aeEventLoop *eventLoop) {
394 51 : eventLoop->stop = 0;
395 1768698 : while (!eventLoop->stop) {
396 1768647 : if (eventLoop->beforesleep != NULL)
397 1768647 : eventLoop->beforesleep(eventLoop);
398 1768647 : aeProcessEvents(eventLoop, AE_ALL_EVENTS);
399 : }
400 0 : }
401 :
/* Return the name reported by aeApiName() for the multiplexing layer
 * that was compiled in. */
char *aeGetApiName(void)
{
    char *name = aeApiName();

    return name;
}
405 :
406 51 : void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
407 51 : eventLoop->beforesleep = beforesleep;
408 51 : }
|