xref: /unit/src/nxt_port_socket.c (revision 14:556c5643cb8d)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/* Event handlers installed on port->socket by the enable functions. */
static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
/* Hands one received datagram (header plus payload) to port->handler. */
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
    nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size);
/* Read buffers are recycled through the port->free_bufs list. */
static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
/* Scheduled on the fast work queue when sending or receiving fails. */
static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
17 
18 
19 nxt_int_t
20 nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
21 {
22     nxt_int_t       sndbuf, rcvbuf, size;
23     nxt_socket_t    snd, rcv;
24     nxt_mem_pool_t  *mp;
25 
26     port->socket.task = task;
27 
28     port->pair[0] = -1;
29     port->pair[1] = -1;
30 
31     nxt_queue_init(&port->messages);
32 
33     mp = nxt_mem_pool_create(1024);
34     if (nxt_slow_path(mp == NULL)) {
35         return NXT_ERROR;
36     }
37 
38     port->mem_pool = mp;
39 
40     if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
41         goto socketpair_fail;
42     }
43 
44     snd = port->pair[1];
45 
46     sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
47     if (nxt_slow_path(sndbuf < 0)) {
48         goto getsockopt_fail;
49     }
50 
51     rcv = port->pair[0];
52 
53     rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
54     if (nxt_slow_path(rcvbuf < 0)) {
55         goto getsockopt_fail;
56     }
57 
58     if (max_size == 0) {
59         max_size = 16 * 1024;
60     }
61 
62     if ((size_t) sndbuf < max_size) {
63         /*
64          * On Unix domain sockets
65          *   Linux uses 224K on both send and receive directions;
66          *   FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
67          *   on send direction and 4K buffer size on receive direction;
68          *   Solaris uses 16K on send direction and 5K on receive direction.
69          */
70         (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
71                                      max_size);
72 
73         sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
74         if (nxt_slow_path(sndbuf < 0)) {
75             goto getsockopt_fail;
76         }
77 
78         size = sndbuf * 4;
79 
80         if (rcvbuf < size) {
81             (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
82                                          size);
83 
84             rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
85             if (nxt_slow_path(rcvbuf < 0)) {
86                 goto getsockopt_fail;
87             }
88         }
89     }
90 
91     port->max_size = nxt_min(max_size, (size_t) sndbuf);
92     port->max_share = (64 * 1024);
93 
94     return NXT_OK;
95 
96 getsockopt_fail:
97 
98     nxt_socket_close(task, port->pair[0]);
99     nxt_socket_close(task, port->pair[1]);
100 
101 socketpair_fail:
102 
103     nxt_mem_pool_destroy(port->mem_pool);
104 
105     return NXT_ERROR;
106 }
107 
108 
109 void
110 nxt_port_destroy(nxt_port_t *port)
111 {
112     nxt_socket_close(port->socket.task, port->socket.fd);
113     nxt_mem_pool_destroy(port->mem_pool);
114 }
115 
116 
117 void
118 nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
119 {
120     port->socket.fd = port->pair[1];
121     port->socket.log = &nxt_main_log;
122     port->socket.write_ready = 1;
123 
124     port->socket.write_work_queue = &task->thread->engine->fast_work_queue;
125     port->socket.write_handler = nxt_port_write_handler;
126     port->socket.error_handler = nxt_port_error_handler;
127 }
128 
129 
130 void
131 nxt_port_write_close(nxt_port_t *port)
132 {
133     nxt_socket_close(port->socket.task, port->pair[1]);
134     port->pair[1] = -1;
135 }
136 
137 
138 nxt_int_t
139 nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
140     nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
141 {
142     nxt_queue_link_t     *link;
143     nxt_port_send_msg_t  *msg;
144 
145     for (link = nxt_queue_first(&port->messages);
146          link != nxt_queue_tail(&port->messages);
147          link = nxt_queue_next(link))
148     {
149         msg = (nxt_port_send_msg_t *) link;
150 
151         if (msg->port_msg.stream == stream) {
152             /*
153              * An fd is ignored since a file descriptor
154              * must be sent only in the first message of a stream.
155              */
156             nxt_buf_chain_add(&msg->buf, b);
157 
158             return NXT_OK;
159         }
160     }
161 
162     msg = nxt_mem_cache_zalloc0(port->mem_pool, sizeof(nxt_port_send_msg_t));
163     if (nxt_slow_path(msg == NULL)) {
164         return NXT_ERROR;
165     }
166 
167     msg->buf = b;
168     msg->fd = fd;
169     msg->share = 0;
170 
171     msg->port_msg.stream = stream;
172     msg->port_msg.type = type;
173     msg->port_msg.last = 0;
174 
175     nxt_queue_insert_tail(&port->messages, &msg->link);
176 
177     if (port->socket.write_ready) {
178         nxt_port_write_handler(task, port, NULL);
179     }
180 
181     return NXT_OK;
182 }
183 
184 
185 static void
186 nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
187 {
188     ssize_t                 n;
189     nxt_uint_t              niov;
190     nxt_port_t              *port;
191     struct iovec            iov[NXT_IOBUF_MAX];
192     nxt_queue_link_t        *link;
193     nxt_port_send_msg_t     *msg;
194     nxt_sendbuf_coalesce_t  sb;
195 
196     port = obj;
197 
198     do {
199         link = nxt_queue_first(&port->messages);
200 
201         if (link == nxt_queue_tail(&port->messages)) {
202             nxt_fd_event_block_write(task->thread->engine, &port->socket);
203             return;
204         }
205 
206         msg = (nxt_port_send_msg_t *) link;
207 
208         iov[0].iov_base = &msg->port_msg;
209         iov[0].iov_len = sizeof(nxt_port_msg_t);
210 
211         sb.buf = msg->buf;
212         sb.iobuf = &iov[1];
213         sb.nmax = NXT_IOBUF_MAX - 1;
214         sb.sync = 0;
215         sb.last = 0;
216         sb.size = sizeof(nxt_port_msg_t);
217         sb.limit = port->max_size;
218 
219         niov = nxt_sendbuf_mem_coalesce(task, &sb);
220 
221         msg->port_msg.last = sb.last;
222 
223         n = nxt_socketpair_send(&port->socket, msg->fd, iov, niov + 1);
224 
225         if (n > 0) {
226             if (nxt_slow_path((size_t) n != sb.size)) {
227                 nxt_log(task, NXT_LOG_CRIT,
228                         "port %d: short write: %z instead of %uz",
229                         port->socket.fd, n, sb.size);
230                 goto fail;
231             }
232 
233             msg->buf = nxt_sendbuf_completion(task,
234                                               port->socket.write_work_queue,
235                                               msg->buf,
236                                               n - sizeof(nxt_port_msg_t));
237 
238             if (msg->buf != NULL) {
239                 /*
240                  * A file descriptor is sent only
241                  * in the first message of a stream.
242                  */
243                 msg->fd = -1;
244                 msg->share += n;
245 
246                 if (msg->share >= port->max_share) {
247                     msg->share = 0;
248                     nxt_queue_remove(link);
249                     nxt_queue_insert_tail(&port->messages, link);
250                 }
251 
252             } else {
253                 nxt_queue_remove(link);
254                 nxt_mem_cache_free0(port->mem_pool, msg,
255                                     sizeof(nxt_port_send_msg_t));
256             }
257 
258         } else if (nxt_slow_path(n == NXT_ERROR)) {
259             goto fail;
260         }
261 
262         /* n == NXT_AGAIN */
263 
264     } while (port->socket.write_ready);
265 
266     if (nxt_fd_event_is_disabled(port->socket.write)) {
267         nxt_fd_event_enable_write(task->thread->engine, &port->socket);
268     }
269 
270     return;
271 
272 fail:
273 
274     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
275                        nxt_port_error_handler, task, &port->socket, NULL);
276 }
277 
278 
279 void
280 nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
281 {
282     port->socket.fd = port->pair[0];
283     port->socket.log = &nxt_main_log;
284 
285     port->socket.read_work_queue = &task->thread->engine->fast_work_queue;
286     port->socket.read_handler = nxt_port_read_handler;
287     port->socket.error_handler = nxt_port_error_handler;
288 
289     nxt_fd_event_enable_read(task->thread->engine, &port->socket);
290 }
291 
292 
293 void
294 nxt_port_read_close(nxt_port_t *port)
295 {
296     nxt_socket_close(port->socket.task, port->pair[0]);
297     port->pair[0] = -1;
298 }
299 
300 
301 static void
302 nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
303 {
304     ssize_t         n;
305     nxt_fd_t        fd;
306     nxt_buf_t       *b;
307     nxt_port_t      *port;
308     struct iovec    iov[2];
309     nxt_port_msg_t  msg;
310 
311     port = obj;
312 
313     for ( ;; ) {
314 
315         b = nxt_port_buf_alloc(port);
316 
317         if (nxt_slow_path(b == NULL)) {
318             /* TODO: disable event for some time */
319         }
320 
321         iov[0].iov_base = &msg;
322         iov[0].iov_len = sizeof(nxt_port_msg_t);
323 
324         iov[1].iov_base = b->mem.pos;
325         iov[1].iov_len = port->max_size;
326 
327         n = nxt_socketpair_recv(&port->socket, &fd, iov, 2);
328 
329         if (n > 0) {
330             nxt_port_read_msg_process(task, port, &msg, fd, b, n);
331 
332             if (b->mem.pos == b->mem.free) {
333 
334                 if (b->next != NULL) {
335                     /* A sync buffer */
336                     nxt_buf_free(port->mem_pool, b->next);
337                 }
338 
339                 nxt_port_buf_free(port, b);
340             }
341 
342             if (port->socket.read_ready) {
343                 continue;
344             }
345 
346             return;
347         }
348 
349         if (n == NXT_AGAIN) {
350             nxt_port_buf_free(port, b);
351 
352             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
353             return;
354         }
355 
356         /* n == 0 || n == NXT_ERROR */
357 
358         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
359                            nxt_port_error_handler, task, &port->socket, NULL);
360         return;
361     }
362 }
363 
364 
365 static void
366 nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
367     nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size)
368 {
369     nxt_buf_t            *sync;
370     nxt_port_recv_msg_t  recv_msg;
371 
372     if (nxt_slow_path(size < sizeof(nxt_port_msg_t))) {
373         nxt_log(port->socket.task, NXT_LOG_CRIT,
374                 "port %d: too small message:%uz", port->socket.fd, size);
375         goto fail;
376     }
377 
378     recv_msg.stream = msg->stream;
379     recv_msg.type = msg->type;
380     recv_msg.fd = fd;
381     recv_msg.buf = b;
382     recv_msg.port = port;
383 
384     b->mem.free += size - sizeof(nxt_port_msg_t);
385 
386     if (msg->last) {
387         sync = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
388         if (nxt_slow_path(sync == NULL)) {
389             goto fail;
390         }
391 
392         b->next = sync;
393     }
394 
395     port->handler(task, &recv_msg);
396 
397     return;
398 
399 fail:
400 
401     if (fd != -1) {
402         nxt_fd_close(fd);
403     }
404 }
405 
406 
407 static nxt_buf_t *
408 nxt_port_buf_alloc(nxt_port_t *port)
409 {
410     nxt_buf_t  *b;
411 
412     if (port->free_bufs != NULL) {
413         b = port->free_bufs;
414         port->free_bufs = b->next;
415 
416         b->mem.pos = b->mem.start;
417         b->mem.free = b->mem.start;
418 
419     } else {
420         b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
421         if (nxt_slow_path(b == NULL)) {
422             return NULL;
423         }
424     }
425 
426     return b;
427 }
428 
429 
430 static void
431 nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
432 {
433     b->next = port->free_bufs;
434     port->free_bufs = b;
435 }
436 
437 
/*
 * Error handler for the port's event object.  It is installed by the
 * read/write enable functions and is scheduled on the fast work queue
 * by the read and write handlers with &port->socket as "obj".
 * Currently a stub that does nothing.
 */
static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
    /* TODO */
}
443