1 : /* Synchronous socket and file I/O operations useful across the core.
2 : *
3 : * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are met:
8 : *
9 : * * Redistributions of source code must retain the above copyright notice,
10 : * this list of conditions and the following disclaimer.
11 : * * Redistributions in binary form must reproduce the above copyright
12 : * notice, this list of conditions and the following disclaimer in the
13 : * documentation and/or other materials provided with the distribution.
14 : * * Neither the name of Redis nor the names of its contributors may be used
15 : * to endorse or promote products derived from this software without
16 : * specific prior written permission.
17 : *
18 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 : * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 : * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 : * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 : * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 : * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 : * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 : * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 : * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 : * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 : * POSSIBILITY OF SUCH DAMAGE.
29 : */
30 :
31 : #include "redis.h"
32 :
33 : /* ----------------- Blocking sockets I/O with timeouts --------------------- */
34 :
35 : /* Redis performs most of the I/O in a nonblocking way, with the exception
36 : * of the SYNC command where the slave does it in a blocking way, and
37 : * the MIGRATE command that must be blocking in order to be atomic from the
38 : * point of view of the two instances (one migrating the key and one receiving
39 : * the key). This is why we need the following blocking I/O functions.
40 : *
41 : * All the functions take the timeout in milliseconds. */
42 :
43 : #define REDIS_SYNCIO_RESOLUTION 10 /* Lower bound, in milliseconds, on the wait passed to each aeWait() call below */
44 :
45 : /* Write the specified payload to 'fd'. If writing the whole payload will be done
46 : * within 'timeout' milliseconds the operation succeeds and 'size' is returned.
47 : * Otherwise the operation fails, -1 is returned, and an unspecified partial write
48 : * could be performed against the file descriptor. */
49 11 : ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout) {
50 11 : ssize_t nwritten, ret = size;
51 11 : long long start = mstime();
52 11 : long long remaining = timeout;
53 :
54 : while(1) {
55 : long long wait = (remaining > REDIS_SYNCIO_RESOLUTION) ?
56 11 : remaining : REDIS_SYNCIO_RESOLUTION;
57 : long long elapsed;
58 :
59 11 : if (aeWait(fd,AE_WRITABLE,wait) & AE_WRITABLE) {
60 11 : nwritten = write(fd,ptr,size);
61 11 : if (nwritten == -1) return -1;
62 11 : ptr += nwritten;
63 11 : size -= nwritten;
64 11 : if (size == 0) return ret;
65 : }
66 0 : elapsed = mstime() - start;
67 0 : if (elapsed >= timeout) {
68 0 : errno = ETIMEDOUT;
69 0 : return -1;
70 : }
71 0 : remaining = timeout - elapsed;
72 0 : }
73 : }
74 :
75 : /* Read the specified amount of bytes from 'fd'. If all the bytes are read within
76 : * 'timeout' milliseconds the operation succeed and 'size' is returned.
77 : * Otherwise the operation fails, -1 is returned, and an unspecified amount of
78 : * data could be read from the file descriptor. */
79 78 : ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout) {
80 78 : ssize_t nread, totread = 0;
81 78 : long long start = mstime();
82 78 : long long remaining = timeout;
83 :
84 : while(1) {
85 : long long wait = (remaining > REDIS_SYNCIO_RESOLUTION) ?
86 78 : remaining : REDIS_SYNCIO_RESOLUTION;
87 : long long elapsed;
88 :
89 78 : if (aeWait(fd,AE_READABLE,wait) & AE_READABLE) {
90 154 : nread = read(fd,ptr,size);
91 77 : if (nread <= 0) return -1;
92 77 : ptr += nread;
93 77 : size -= nread;
94 77 : totread += nread;
95 77 : if (size == 0) return totread;
96 : }
97 1 : elapsed = mstime() - start;
98 1 : if (elapsed >= timeout) {
99 1 : errno = ETIMEDOUT;
100 1 : return -1;
101 : }
102 0 : remaining = timeout - elapsed;
103 0 : }
104 : }
105 :
/* Read a line from 'fd' into the 'size'-byte buffer 'ptr', one character at a
 * time, making sure that every single character does not require more than
 * 'timeout' milliseconds to be read (the timeout is per character, not for
 * the whole line).
 *
 * Reading stops at the first '\n', which is not stored; a '\r' immediately
 * before it is replaced with a 0 byte. Reading also stops, without having
 * seen a newline, once 'size'-1 characters have been stored, so the buffer is
 * never overrun.
 *
 * On success the number of characters read before the '\n' is returned (a
 * stripped '\r' is still counted) and the string is always terminated with a
 * 0 byte. On error -1 is returned: either syncRead() failed, or 'size' is not
 * positive so there is no room even for the terminator (errno is set to
 * EINVAL in that case). */
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout) {
    ssize_t nread = 0;

    if (size <= 0) {
        errno = EINVAL;
        return -1;
    }

    /* Terminate up front so that the result is a valid string even when the
     * loop below does not run at all (size == 1). */
    *ptr = '\0';

    size--; /* Reserve one byte for the terminator. */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One less byte available: without this the loop would keep storing
         * characters past the end of the buffer on an overlong line. */
        size--;
    }
    return nread;
}
|