1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); 11 static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); 12 static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 13 nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size); 14 static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); 15 static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); 16 static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); 17 18 19 nxt_int_t 20 nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) 21 { 22 nxt_int_t sndbuf, rcvbuf, size; 23 nxt_socket_t snd, rcv; 24 nxt_mem_pool_t *mp; 25 26 port->socket.task = task; 27 28 port->pair[0] = -1; 29 port->pair[1] = -1; 30 31 nxt_queue_init(&port->messages); 32 33 mp = nxt_mem_pool_create(1024); 34 if (nxt_slow_path(mp == NULL)) { 35 return NXT_ERROR; 36 } 37 38 port->mem_pool = mp; 39 40 if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) { 41 goto socketpair_fail; 42 } 43 44 snd = port->pair[1]; 45 46 sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 47 if (nxt_slow_path(sndbuf < 0)) { 48 goto getsockopt_fail; 49 } 50 51 rcv = port->pair[0]; 52 53 rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 54 if (nxt_slow_path(rcvbuf < 0)) { 55 goto getsockopt_fail; 56 } 57 58 if (max_size == 0) { 59 max_size = 16 * 1024; 60 } 61 62 if ((size_t) sndbuf < max_size) { 63 /* 64 * On Unix domain sockets 65 * Linux uses 224K on both send and receive directions; 66 * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size 67 * on send direction and 4K buffer size on receive direction; 68 * Solaris uses 16K on send direction and 5K on receive direction. 69 */ 70 (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF, 71 max_size); 72 73 sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 74 if (nxt_slow_path(sndbuf < 0)) { 75 goto getsockopt_fail; 76 } 77 78 size = sndbuf * 4; 79 80 if (rcvbuf < size) { 81 (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF, 82 size); 83 84 rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 85 if (nxt_slow_path(rcvbuf < 0)) { 86 goto getsockopt_fail; 87 } 88 } 89 } 90 91 port->max_size = nxt_min(max_size, (size_t) sndbuf); 92 port->max_share = (64 * 1024); 93 94 return NXT_OK; 95 96 getsockopt_fail: 97 98 nxt_socket_close(task, port->pair[0]); 99 nxt_socket_close(task, port->pair[1]); 100 101 socketpair_fail: 102 103 nxt_mem_pool_destroy(port->mem_pool); 104 105 return NXT_ERROR; 106 } 107 108 109 void 110 nxt_port_destroy(nxt_port_t *port) 111 { 112 nxt_socket_close(port->socket.task, port->socket.fd); 113 nxt_mem_pool_destroy(port->mem_pool); 114 } 115 116 117 void 118 nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) 119 { 120 port->socket.fd = port->pair[1]; 121 port->socket.log = &nxt_main_log; 122 port->socket.write_ready = 1; 123 124 port->socket.write_work_queue = &task->thread->engine->fast_work_queue; 125 port->socket.write_handler = nxt_port_write_handler; 126 port->socket.error_handler = nxt_port_error_handler; 127 } 128 129 130 void 131 nxt_port_write_close(nxt_port_t *port) 132 { 133 nxt_socket_close(port->socket.task, port->pair[1]); 134 port->pair[1] = -1; 135 } 136 137 138 nxt_int_t 139 nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, 140 nxt_fd_t fd, uint32_t stream, nxt_buf_t *b) 141 { 142 nxt_queue_link_t *link; 143 nxt_port_send_msg_t *msg; 144 145 for (link = nxt_queue_first(&port->messages); 146 link != nxt_queue_tail(&port->messages); 147 link = nxt_queue_next(link)) 148 { 149 msg = (nxt_port_send_msg_t *) link; 150 151 if (msg->port_msg.stream == stream) { 152 /* 153 * An fd is ignored since a file descriptor 154 * must be sent only in the first message of a stream. 155 */ 156 nxt_buf_chain_add(&msg->buf, b); 157 158 return NXT_OK; 159 } 160 } 161 162 msg = nxt_mem_cache_zalloc0(port->mem_pool, sizeof(nxt_port_send_msg_t)); 163 if (nxt_slow_path(msg == NULL)) { 164 return NXT_ERROR; 165 } 166 167 msg->buf = b; 168 msg->fd = fd; 169 msg->share = 0; 170 171 msg->port_msg.stream = stream; 172 msg->port_msg.type = type; 173 msg->port_msg.last = 0; 174 175 nxt_queue_insert_tail(&port->messages, &msg->link); 176 177 if (port->socket.write_ready) { 178 nxt_port_write_handler(task, port, NULL); 179 } 180 181 return NXT_OK; 182 } 183 184 185 static void 186 nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) 187 { 188 ssize_t n; 189 nxt_uint_t niov; 190 nxt_port_t *port; 191 struct iovec iov[NXT_IOBUF_MAX]; 192 nxt_queue_link_t *link; 193 nxt_port_send_msg_t *msg; 194 nxt_sendbuf_coalesce_t sb; 195 196 port = obj; 197 198 do { 199 link = nxt_queue_first(&port->messages); 200 201 if (link == nxt_queue_tail(&port->messages)) { 202 nxt_fd_event_block_write(task->thread->engine, &port->socket); 203 return; 204 } 205 206 msg = (nxt_port_send_msg_t *) link; 207 208 iov[0].iov_base = &msg->port_msg; 209 iov[0].iov_len = sizeof(nxt_port_msg_t); 210 211 sb.buf = msg->buf; 212 sb.iobuf = &iov[1]; 213 sb.nmax = NXT_IOBUF_MAX - 1; 214 sb.sync = 0; 215 sb.last = 0; 216 sb.size = sizeof(nxt_port_msg_t); 217 sb.limit = port->max_size; 218 219 niov = nxt_sendbuf_mem_coalesce(task, &sb); 220 221 msg->port_msg.last = sb.last; 222 223 n = nxt_socketpair_send(&port->socket, msg->fd, iov, niov + 1); 224 225 if (n > 0) { 226 if (nxt_slow_path((size_t) n != sb.size)) { 227 nxt_log(task, NXT_LOG_CRIT, 228 "port %d: short write: %z instead of %uz", 229 port->socket.fd, n, sb.size); 230 goto fail; 231 } 232 233 msg->buf = nxt_sendbuf_completion(task, 234 port->socket.write_work_queue, 235 msg->buf, 236 n - sizeof(nxt_port_msg_t)); 237 238 if (msg->buf != NULL) { 239 /* 240 * A file descriptor is sent only 241 * in the first message of a stream. 242 */ 243 msg->fd = -1; 244 msg->share += n; 245 246 if (msg->share >= port->max_share) { 247 msg->share = 0; 248 nxt_queue_remove(link); 249 nxt_queue_insert_tail(&port->messages, link); 250 } 251 252 } else { 253 nxt_queue_remove(link); 254 nxt_mem_cache_free0(port->mem_pool, msg, 255 sizeof(nxt_port_send_msg_t)); 256 } 257 258 } else if (nxt_slow_path(n == NXT_ERROR)) { 259 goto fail; 260 } 261 262 /* n == NXT_AGAIN */ 263 264 } while (port->socket.write_ready); 265 266 if (nxt_fd_event_is_disabled(port->socket.write)) { 267 nxt_fd_event_enable_write(task->thread->engine, &port->socket); 268 } 269 270 return; 271 272 fail: 273 274 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 275 nxt_port_error_handler, task, &port->socket, NULL); 276 } 277 278 279 void 280 nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) 281 { 282 port->socket.fd = port->pair[0]; 283 port->socket.log = &nxt_main_log; 284 285 port->socket.read_work_queue = &task->thread->engine->fast_work_queue; 286 port->socket.read_handler = nxt_port_read_handler; 287 port->socket.error_handler = nxt_port_error_handler; 288 289 nxt_fd_event_enable_read(task->thread->engine, &port->socket); 290 } 291 292 293 void 294 nxt_port_read_close(nxt_port_t *port) 295 { 296 nxt_socket_close(port->socket.task, port->pair[0]); 297 port->pair[0] = -1; 298 } 299 300 301 static void 302 nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) 303 { 304 ssize_t n; 305 nxt_fd_t fd; 306 nxt_buf_t *b; 307 nxt_port_t *port; 308 struct iovec iov[2]; 309 nxt_port_msg_t msg; 310 311 port = obj; 312 313 for ( ;; ) { 314 315 b = nxt_port_buf_alloc(port); 316 317 if (nxt_slow_path(b == NULL)) { 318 /* TODO: disable event for some time */ 319 } 320 321 iov[0].iov_base = &msg; 322 iov[0].iov_len = sizeof(nxt_port_msg_t); 323 324 iov[1].iov_base = b->mem.pos; 325 iov[1].iov_len = port->max_size; 326 327 n = nxt_socketpair_recv(&port->socket, &fd, iov, 2); 328 329 if (n > 0) { 330 nxt_port_read_msg_process(task, port, &msg, fd, b, n); 331 332 if (b->mem.pos == b->mem.free) { 333 334 if (b->next != NULL) { 335 /* A sync buffer */ 336 nxt_buf_free(port->mem_pool, b->next); 337 } 338 339 nxt_port_buf_free(port, b); 340 } 341 342 if (port->socket.read_ready) { 343 continue; 344 } 345 346 return; 347 } 348 349 if (n == NXT_AGAIN) { 350 nxt_port_buf_free(port, b); 351 352 nxt_fd_event_enable_read(task->thread->engine, &port->socket); 353 return; 354 } 355 356 /* n == 0 || n == NXT_ERROR */ 357 358 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 359 nxt_port_error_handler, task, &port->socket, NULL); 360 return; 361 } 362 } 363 364 365 static void 366 nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 367 nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size) 368 { 369 nxt_buf_t *sync; 370 nxt_port_recv_msg_t recv_msg; 371 372 if (nxt_slow_path(size < sizeof(nxt_port_msg_t))) { 373 nxt_log(port->socket.task, NXT_LOG_CRIT, 374 "port %d: too small message:%uz", port->socket.fd, size); 375 goto fail; 376 } 377 378 recv_msg.stream = msg->stream; 379 recv_msg.type = msg->type; 380 recv_msg.fd = fd; 381 recv_msg.buf = b; 382 recv_msg.port = port; 383 384 b->mem.free += size - sizeof(nxt_port_msg_t); 385 386 if (msg->last) { 387 sync = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST); 388 if (nxt_slow_path(sync == NULL)) { 389 goto fail; 390 } 391 392 b->next = sync; 393 } 394 395 port->handler(task, &recv_msg); 396 397 return; 398 399 fail: 400 401 if (fd != -1) { 402 nxt_fd_close(fd); 403 } 404 } 405 406 407 static nxt_buf_t * 408 nxt_port_buf_alloc(nxt_port_t *port) 409 { 410 nxt_buf_t *b; 411 412 if (port->free_bufs != NULL) { 413 b = port->free_bufs; 414 port->free_bufs = b->next; 415 416 b->mem.pos = b->mem.start; 417 b->mem.free = b->mem.start; 418 419 } else { 420 b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0); 421 if (nxt_slow_path(b == NULL)) { 422 return NULL; 423 } 424 } 425 426 return b; 427 } 428 429 430 static void 431 nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) 432 { 433 b->next = port->free_bufs; 434 port->free_bufs = b; 435 } 436 437 438 static void 439 nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) 440 { 441 /* TODO */ 442 } 443