Deleted
Added
nxt_port_socket.c (194:6281674ecf4f) | nxt_port_socket.c (197:ae8f843e1fd4) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 --- 100 unchanged lines hidden (view full) --- 109 port->socket.log = &nxt_main_log; 110 port->socket.write_ready = 1; 111 112 port->engine = task->thread->engine; 113 114 port->socket.write_work_queue = &port->engine->fast_work_queue; 115 port->socket.write_handler = nxt_port_write_handler; 116 port->socket.error_handler = nxt_port_error_handler; | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 --- 100 unchanged lines hidden (view full) --- 109 port->socket.log = &nxt_main_log; 110 port->socket.write_ready = 1; 111 112 port->engine = task->thread->engine; 113 114 port->socket.write_work_queue = &port->engine->fast_work_queue; 115 port->socket.write_handler = nxt_port_write_handler; 116 port->socket.error_handler = nxt_port_error_handler; |
117 118 if (port->iov == NULL) { 119 port->iov = nxt_mp_get(port->mem_pool, sizeof(struct iovec) * 120 NXT_IOBUF_MAX * 10); 121 port->mmsg_buf = nxt_mp_get(port->mem_pool, sizeof(uint32_t) * 3 * 122 NXT_IOBUF_MAX * 10); 123 } |
|
117} 118 119 120void 121nxt_port_write_close(nxt_port_t *port) 122{ 123 nxt_socket_close(port->socket.task, port->pair[1]); 124 port->pair[1] = -1; --- 94 unchanged lines hidden (view full) --- 219 220 return NXT_OK; 221} 222 223 224static void 225nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) 226{ | 124} 125 126 127void 128nxt_port_write_close(nxt_port_t *port) 129{ 130 nxt_socket_close(port->socket.task, port->pair[1]); 131 port->pair[1] = -1; --- 94 unchanged lines hidden (view full) --- 226 227 return NXT_OK; 228} 229 230 231static void 232nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) 233{ |
234 size_t plain_size; |
|
227 ssize_t n; 228 nxt_port_t *port; | 235 ssize_t n; 236 nxt_port_t *port; |
229 struct iovec iov[NXT_IOBUF_MAX * 10]; | 237 struct iovec *iov; |
230 nxt_work_queue_t *wq; 231 nxt_queue_link_t *link; 232 nxt_port_method_t m; 233 nxt_port_send_msg_t *msg; 234 nxt_sendbuf_coalesce_t sb; 235 | 238 nxt_work_queue_t *wq; 239 nxt_queue_link_t *link; 240 nxt_port_method_t m; 241 nxt_port_send_msg_t *msg; 242 nxt_sendbuf_coalesce_t sb; 243 |
236 size_t plain_size; 237 nxt_buf_t *plain_buf; 238 | |
239 port = nxt_container_of(obj, nxt_port_t, socket); 240 | 244 port = nxt_container_of(obj, nxt_port_t, socket); 245 |
246 iov = port->iov; 247 |
|
241 do { 242 link = nxt_queue_first(&port->messages); 243 244 if (link == nxt_queue_tail(&port->messages)) { 245 nxt_fd_event_block_write(task->thread->engine, &port->socket); 246 return; 247 } 248 --- 15 unchanged lines hidden (view full) --- 264 if (m == NXT_PORT_METHOD_MMAP) { 265 sb.limit = (1ULL << 31) - 1; 266 sb.nmax = NXT_IOBUF_MAX * 10 - 1; 267 } 268 269 nxt_sendbuf_mem_coalesce(task, &sb); 270 271 plain_size = sb.size; | 248 do { 249 link = nxt_queue_first(&port->messages); 250 251 if (link == nxt_queue_tail(&port->messages)) { 252 nxt_fd_event_block_write(task->thread->engine, &port->socket); 253 return; 254 } 255 --- 15 unchanged lines hidden (view full) --- 271 if (m == NXT_PORT_METHOD_MMAP) { 272 sb.limit = (1ULL << 31) - 1; 273 sb.nmax = NXT_IOBUF_MAX * 10 - 1; 274 } 275 276 nxt_sendbuf_mem_coalesce(task, &sb); 277 278 plain_size = sb.size; |
272 plain_buf = msg->buf; | |
273 274 /* 275 * Send through mmap enabled only when payload 276 * is bigger than PORT_MMAP_MIN_SIZE. 277 */ 278 if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { 279 nxt_port_mmap_write(task, port, msg, &sb); 280 --- 16 unchanged lines hidden (view full) --- 297 if (msg->fd != -1 && msg->close_fd != 0) { 298 nxt_fd_close(msg->fd); 299 300 msg->fd = -1; 301 } 302 303 wq = &task->thread->engine->fast_work_queue; 304 | 279 280 /* 281 * Send through mmap enabled only when payload 282 * is bigger than PORT_MMAP_MIN_SIZE. 283 */ 284 if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { 285 nxt_port_mmap_write(task, port, msg, &sb); 286 --- 16 unchanged lines hidden (view full) --- 303 if (msg->fd != -1 && msg->close_fd != 0) { 304 nxt_fd_close(msg->fd); 305 306 msg->fd = -1; 307 } 308 309 wq = &task->thread->engine->fast_work_queue; 310 |
305 if (msg->buf != plain_buf) { 306 /* 307 * Complete crafted mmap_msgs buf and restore msg->buf 308 * for regular completion call. 309 */ 310 nxt_port_mmap_completion(task, wq, msg->buf); 311 312 msg->buf = plain_buf; 313 } 314 | |
315 msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size); 316 317 if (msg->buf != NULL) { 318 /* 319 * A file descriptor is sent only 320 * in the first message of a stream. 321 */ 322 msg->fd = -1; 323 msg->share += n; 324 325 if (msg->share >= port->max_share) { 326 msg->share = 0; 327 nxt_queue_remove(link); 328 nxt_queue_insert_tail(&port->messages, link); 329 } 330 331 } else { 332 nxt_queue_remove(link); | 311 msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size); 312 313 if (msg->buf != NULL) { 314 /* 315 * A file descriptor is sent only 316 * in the first message of a stream. 317 */ 318 msg->fd = -1; 319 msg->share += n; 320 321 if (msg->share >= port->max_share) { 322 msg->share = 0; 323 nxt_queue_remove(link); 324 nxt_queue_insert_tail(&port->messages, link); 325 } 326 327 } else { 328 nxt_queue_remove(link); |
333 nxt_port_release_send_msg(task, msg, msg->engine); | 329 nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, 330 msg->engine); |
334 } 335 336 } else if (nxt_slow_path(n == NXT_ERROR)) { 337 goto fail; 338 } 339 340 /* n == NXT_AGAIN */ 341 --- 189 unchanged lines hidden (view full) --- 531 b->next = port->free_bufs; 532 port->free_bufs = b; 533} 534 535 536static void 537nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) 538{ | 331 } 332 333 } else if (nxt_slow_path(n == NXT_ERROR)) { 334 goto fail; 335 } 336 337 /* n == NXT_AGAIN */ 338 --- 189 unchanged lines hidden (view full) --- 528 b->next = port->free_bufs; 529 port->free_bufs = b; 530} 531 532 533static void 534nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) 535{ |
536 nxt_buf_t *b; 537 nxt_port_t *port; 538 nxt_work_queue_t *wq; 539 nxt_port_send_msg_t *msg; 540 |
|
539 nxt_debug(task, "port error handler %p", obj); 540 /* TODO */ | 541 nxt_debug(task, "port error handler %p", obj); 542 /* TODO */ |
543 544 port = nxt_container_of(obj, nxt_port_t, socket); 545 546 nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { 547 548 wq = &task->thread->engine->fast_work_queue; 549 550 for(b = msg->buf; b != NULL; b = b->next) { 551 if (nxt_buf_is_sync(b)) { 552 continue; 553 } 554 555 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 556 } 557 558 nxt_queue_remove(&msg->link); 559 nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, 560 msg->engine); 561 562 } nxt_queue_loop; |
|
541} | 563} |