1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include <stdlib.h> 7 8 #include "nxt_main.h" 9 #include "nxt_port_memory_int.h" 10 #include "nxt_port_queue.h" 11 #include "nxt_app_queue.h" 12 13 #include "nxt_unit.h" 14 #include "nxt_unit_request.h" 15 #include "nxt_unit_response.h" 16 #include "nxt_unit_websocket.h" 17 18 #include "nxt_websocket.h" 19 20 #if (NXT_HAVE_MEMFD_CREATE) 21 #include <linux/memfd.h> 22 #endif 23 24 #define NXT_UNIT_MAX_PLAIN_SIZE 1024 25 #define NXT_UNIT_LOCAL_BUF_SIZE \ 26 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) 27 28 typedef struct nxt_unit_impl_s nxt_unit_impl_t; 29 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 30 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 31 typedef struct nxt_unit_process_s nxt_unit_process_t; 32 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 33 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 34 typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t; 35 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 36 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 37 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; 38 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; 39 40 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 41 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, 42 nxt_unit_ctx_impl_t *ctx_impl, void *data); 43 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx); 44 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx); 45 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); 46 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); 47 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 48 nxt_unit_mmap_buf_t *mmap_buf); 49 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 50 nxt_unit_mmap_buf_t *mmap_buf); 51 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); 52 static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 
53 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, 54 int *log_fd, uint32_t *stream, uint32_t *shm_limit); 55 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, 56 int queue_fd); 57 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, 58 nxt_unit_request_info_t **preq); 59 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 60 nxt_unit_recv_msg_t *recv_msg); 61 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx); 62 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 63 nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq); 64 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, 65 nxt_unit_recv_msg_t *recv_msg); 66 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, 67 nxt_unit_port_id_t *port_id); 68 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); 69 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 70 nxt_unit_recv_msg_t *recv_msg); 71 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); 72 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( 73 nxt_unit_ctx_t *ctx); 74 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 75 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); 76 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( 77 nxt_unit_ctx_t *ctx); 78 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); 79 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx, 80 nxt_unit_websocket_frame_impl_t *ws); 81 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 82 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 83 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, 84 nxt_unit_mmap_buf_t *mmap_buf, int last); 85 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); 86 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); 87 
static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx); 88 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl( 89 nxt_unit_ctx_impl_t *ctx_impl); 90 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 91 nxt_unit_read_buf_t *rbuf); 92 static nxt_unit_mmap_buf_t *nxt_unit_request_preread( 93 nxt_unit_request_info_t *req, size_t size); 94 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, 95 size_t size); 96 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 97 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n); 98 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 99 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); 100 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 101 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 102 nxt_unit_port_t *port, int n); 103 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size); 104 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 105 int fd); 106 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, 107 nxt_unit_port_t *port, uint32_t size, 108 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); 109 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 110 111 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, 112 nxt_unit_ctx_impl_t *ctx_impl); 113 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 114 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); 115 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); 116 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 117 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, 118 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, 119 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); 120 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, 121 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t 
*rbuf); 122 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id); 123 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 124 nxt_port_mmap_header_t *hdr, void *start, uint32_t size); 125 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); 126 127 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid); 128 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, 129 pid_t pid, int remove); 130 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); 131 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); 132 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); 133 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); 134 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); 135 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); 136 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf); 137 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf); 138 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf); 139 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, 140 nxt_unit_port_t *port); 141 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); 142 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); 143 144 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, 145 nxt_unit_port_t *port, int queue_fd); 146 147 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); 148 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); 149 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, 150 nxt_unit_port_t *port, void *queue); 151 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, 152 nxt_queue_t *awaiting_req); 153 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, 154 nxt_unit_port_id_t *port_id); 155 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, 156 nxt_unit_port_id_t *port_id); 157 
static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); 158 static void nxt_unit_remove_process(nxt_unit_impl_t *lib, 159 nxt_unit_process_t *process); 160 static void nxt_unit_quit(nxt_unit_ctx_t *ctx); 161 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); 162 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, 163 nxt_unit_port_t *port, const void *buf, size_t buf_size, 164 const void *oob, size_t oob_size); 165 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, 166 const void *buf, size_t buf_size, const void *oob, size_t oob_size); 167 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 168 nxt_unit_read_buf_t *rbuf); 169 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, 170 nxt_unit_read_buf_t *src); 171 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 172 nxt_unit_read_buf_t *rbuf); 173 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 174 nxt_unit_read_buf_t *rbuf); 175 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, 176 nxt_unit_read_buf_t *rbuf); 177 static int nxt_unit_app_queue_recv(nxt_unit_port_t *port, 178 nxt_unit_read_buf_t *rbuf); 179 nxt_inline int nxt_unit_close(int fd); 180 static int nxt_unit_fd_blocking(int fd); 181 182 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, 183 nxt_unit_port_t *port); 184 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, 185 nxt_unit_port_id_t *port_id, int remove); 186 187 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, 188 nxt_unit_request_info_t *req); 189 static nxt_unit_request_info_t *nxt_unit_request_hash_find( 190 nxt_unit_ctx_t *ctx, uint32_t stream, int remove); 191 192 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); 193 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size); 194 static void nxt_unit_lvlhsh_free(void *data, void *p); 195 static int nxt_unit_memcasecmp(const 
void *p1, const void *p2, size_t length);


/*
 * Shared-memory buffer wrapper.  Embeds the public nxt_unit_buf_t and links
 * the buffer into an intrusive doubly-linked list (next / pointer-to-previous
 * "prev" slot) owned by either a request or a context.
 */
struct nxt_unit_mmap_buf_s {
    nxt_unit_buf_t           buf;

    nxt_unit_mmap_buf_t      *next;
    nxt_unit_mmap_buf_t      **prev;       /* address of the slot pointing at us */

    nxt_port_mmap_header_t   *hdr;         /* NULL for plain (non-mmap) buffers */
    nxt_unit_request_info_t  *req;
    nxt_unit_ctx_impl_t      *ctx_impl;
    char                     *free_ptr;    /* malloc'ed storage to free, if any */
    char                     *plain_ptr;
};


/* A port message after parsing, plus the fds/buffers received with it. */
struct nxt_unit_recv_msg_s {
    uint32_t             stream;
    nxt_pid_t            pid;
    nxt_port_id_t        reply_port;

    uint8_t              last;   /* 1 bit */
    uint8_t              mmap;   /* 1 bit */

    void                 *start;         /* payload after nxt_port_msg_t header */
    uint32_t             size;           /* payload size in bytes */

    int                  fd[2];          /* fds from SCM_RIGHTS; -1 when absent */

    nxt_unit_mmap_buf_t  *incoming_buf;  /* buffers read from shared memory */
};


/* Response lifecycle state of a request. */
typedef enum {
    NXT_UNIT_RS_START = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;


/* Internal request state; the public nxt_unit_request_info_t is embedded first. */
struct nxt_unit_request_info_impl_s {
    nxt_unit_request_info_t  req;

    uint32_t                 stream;

    nxt_unit_mmap_buf_t      *outgoing_buf;
    nxt_unit_mmap_buf_t      *incoming_buf;

    nxt_unit_req_state_t     state;
    uint8_t                  websocket;
    uint8_t                  in_hash;     /* registered in ctx requests hash */

    /* for nxt_unit_ctx_impl_t.free_req or active_req */
    nxt_queue_link_t         link;
    /* for nxt_unit_port_impl_t.awaiting_req */
    nxt_queue_link_t         port_wait_link;

    /* per-request application data, sized by nxt_unit_init_t.request_data_size */
    char                     extra_data[];
};


struct nxt_unit_websocket_frame_impl_s {
    nxt_unit_websocket_frame_t  ws;

    nxt_unit_mmap_buf_t         *buf;

    nxt_queue_link_t            link;

    nxt_unit_ctx_impl_t         *ctx_impl;
};


/* Buffer for a single message read from a port (data + ancillary fd data). */
struct nxt_unit_read_buf_s {
    nxt_queue_link_t     link;
    nxt_unit_ctx_impl_t  *ctx_impl;
    ssize_t              size;           /* bytes actually read into buf */
    char                 buf[16384];
    char                 oob[256];       /* control messages (SCM_RIGHTS fds) */
};


/* Per-thread/worker context; the public nxt_unit_ctx_t is embedded first. */
struct nxt_unit_ctx_impl_s {
    nxt_unit_ctx_t                ctx;

    nxt_atomic_t                  use_count;
    nxt_atomic_t                  wait_items;

    pthread_mutex_t               mutex;

    nxt_unit_port_t               *read_port;

    /* for nxt_unit_impl_t.contexts */
    nxt_queue_link_t              link;

    nxt_unit_mmap_buf_t           *free_buf;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   free_req;

    /* of nxt_unit_websocket_frame_impl_t */
    nxt_queue_t                   free_ws;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   active_req;

    /* of nxt_unit_request_info_impl_t */
    nxt_lvlhsh_t                  requests;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t                   ready_req;

    /* of nxt_unit_read_buf_t */
    nxt_queue_t                   pending_rbuf;

    /* of nxt_unit_read_buf_t */
    nxt_queue_t                   free_rbuf;

    int                           online;
    int                           ready;

    /* preallocated objects to avoid malloc in the common case */
    nxt_unit_mmap_buf_t           ctx_buf[2];
    nxt_unit_read_buf_t           ctx_read_buf;

    nxt_unit_request_info_impl_t  req;
};


/* One mapped shared-memory segment. */
struct nxt_unit_mmap_s {
    nxt_port_mmap_header_t  *hdr;
    pthread_t               src_thread;

    /* of nxt_unit_read_buf_t */
    nxt_queue_t             awaiting_rbuf;
};


/* Growable array of shared-memory segments, protected by its own mutex. */
struct nxt_unit_mmaps_s {
    pthread_mutex_t  mutex;
    uint32_t         size;
    uint32_t         cap;
    nxt_atomic_t     allocated_chunks;
    nxt_unit_mmap_t  *elts;
};


/* Library singleton; the public nxt_unit_t is embedded first. */
struct nxt_unit_impl_s {
    nxt_unit_t            unit;
    nxt_unit_callbacks_t  callbacks;

    nxt_atomic_t          use_count;

    uint32_t              request_data_size;
    uint32_t              shm_mmap_limit;

    pthread_mutex_t       mutex;

    nxt_lvlhsh_t          processes;        /* of nxt_unit_process_t */
    nxt_lvlhsh_t          ports;            /* of nxt_unit_port_impl_t */

    nxt_unit_port_t       *router_port;
    nxt_unit_port_t       *shared_port;

    nxt_queue_t           contexts;         /* of nxt_unit_ctx_impl_t */

    nxt_unit_mmaps_t      incoming;
    nxt_unit_mmaps_t      outgoing;

    pid_t                 pid;
    int                   log_fd;

    nxt_unit_ctx_impl_t   main_ctx;         /* embedded, not heap-allocated */
};


/* Internal port state; the public nxt_unit_port_t is embedded first. */
struct nxt_unit_port_impl_s {
    nxt_unit_port_t      port;

    nxt_atomic_t         use_count;

    /* for nxt_unit_process_t.ports */
    nxt_queue_link_t     link;
    nxt_unit_process_t   *process;

    /* of nxt_unit_request_info_impl_t */
    nxt_queue_t          awaiting_req;

    int                  ready;

    void                 *queue;           /* mapped nxt_port_queue_t / nxt_app_queue_t */

    int                  from_socket;
    nxt_unit_read_buf_t  *socket_rbuf;
};


/* Peer process bookkeeping: its ports and a refcount. */
struct nxt_unit_process_s {
    pid_t              pid;

    nxt_queue_t        ports;       /* of nxt_unit_port_impl_t */

    nxt_unit_impl_t    *lib;

    nxt_atomic_t       use_count;

    uint32_t           next_port_id;
};


/* Explicitly using 32 bit types to avoid possible alignment. */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;


/*
 * Public library entry point.  Creates the library singleton, attaches to
 * the router/ready/read ports given either explicitly in 'init' or via the
 * NXT_UNIT_INIT environment variable (see nxt_unit_read_env()), creates and
 * shares the port queue, and announces readiness to the master process.
 *
 * Returns the main context on success, NULL on failure.  On failure all
 * partially acquired resources (mmap, queue fd, context refs) are released
 * via the single 'fail' cleanup path.
 */
nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t *init)
{
    int              rc, queue_fd;
    void             *mem;
    uint32_t         ready_stream, shm_limit;
    nxt_unit_ctx_t   *ctx;
    nxt_unit_impl_t  *lib;
    nxt_unit_port_t  ready_port, router_port, read_port;

    lib = nxt_unit_create(init);
    if (nxt_slow_path(lib == NULL)) {
        return NULL;
    }

    /* Sentinels so the fail path knows what needs cleanup. */
    queue_fd = -1;
    mem = MAP_FAILED;

    if (init->ready_port.id.pid != 0
        && init->ready_stream != 0
        && init->read_port.id.pid != 0)
    {
        /* Ports supplied directly by the embedding application. */
        ready_port = init->ready_port;
        ready_stream = init->ready_stream;
        router_port = init->router_port;
        read_port = init->read_port;
        lib->log_fd = init->log_fd;

        nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
                              ready_port.id.id);
        nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
                              router_port.id.id);
        nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
                              read_port.id.id);

    } else {
        /* Ports inherited from the master process via the environment. */
        rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
                               &lib->log_fd, &ready_stream, &shm_limit);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto fail;
        }

        /* Convert the byte limit into a number of mmap segments, rounding up. */
        lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
                              / PORT_MMAP_DATA_SIZE;
    }

    if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
        lib->shm_mmap_limit = 1;
    }

    lib->pid = read_port.id.pid;

    ctx = &lib->main_ctx.ctx;

    rc = nxt_unit_fd_blocking(router_port.out_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
    if (nxt_slow_path(lib->router_port == NULL)) {
        nxt_unit_alert(NULL, "failed to add router_port");

        goto fail;
    }

    /* Shared-memory queue for the main read port; its fd is sent to Unit. */
    queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
    if (nxt_slow_path(queue_fd == -1)) {
        goto fail;
    }

    mem = mmap(NULL, sizeof(nxt_port_queue_t),
               PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
                       strerror(errno), errno);

        goto fail;
    }

    nxt_port_queue_init(mem);

    rc = nxt_unit_fd_blocking(read_port.in_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
    if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
        nxt_unit_alert(NULL, "failed to add read_port");

        goto fail;
    }

    rc = nxt_unit_fd_blocking(ready_port.out_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    /* Tell the master we are ready; queue_fd travels as SCM_RIGHTS. */
    rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(NULL, "failed to send READY message");

        goto fail;
    }

    /* Both descriptors are no longer needed locally after the handshake. */
    nxt_unit_close(ready_port.out_fd);
    nxt_unit_close(queue_fd);

    return ctx;

fail:

    if (mem != MAP_FAILED) {
        munmap(mem, sizeof(nxt_port_queue_t));
    }

    if (queue_fd != -1) {
        nxt_unit_close(queue_fd);
    }

    /* Drops the last context ref, which in turn releases the library. */
    nxt_unit_ctx_release(&lib->main_ctx.ctx);

    return NULL;
}


/*
 * Allocates and initializes the library singleton together with the
 * per-library request_data_size tail.  Returns NULL on allocation, mutex,
 * context-init, or callback-validation failure.
 */
static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t *init)
{
    int                   rc;
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    lib = nxt_unit_malloc(NULL,
                          sizeof(nxt_unit_impl_t) + init->request_data_size);
    if (nxt_slow_path(lib == NULL)) {
        nxt_unit_alert(NULL, "failed to allocate unit struct");

        return NULL;
    }

    rc = pthread_mutex_init(&lib->mutex, NULL);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);

        goto fail;
    }

    lib->unit.data = init->data;
    lib->callbacks = init->callbacks;

    lib->request_data_size = init->request_data_size;
    /* Bytes -> mmap segment count, rounding up. */
    lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
                          / PORT_MMAP_DATA_SIZE;

    lib->processes.slot = NULL;
    lib->ports.slot = NULL;

    lib->log_fd = STDERR_FILENO;

    nxt_queue_init(&lib->contexts);

    lib->use_count = 0;
    lib->router_port = NULL;
    lib->shared_port = NULL;

    rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        pthread_mutex_destroy(&lib->mutex);
        goto fail;
    }

    cb = &lib->callbacks;

    if (cb->request_handler == NULL) {
        nxt_unit_alert(NULL, "request_handler is NULL");

        pthread_mutex_destroy(&lib->mutex);
        goto fail;
    }

    nxt_unit_mmaps_init(&lib->incoming);
    nxt_unit_mmaps_init(&lib->outgoing);

    return lib;

fail:

    nxt_unit_free(NULL, lib);

    return NULL;
}


/*
 * Initializes a context: its mutex, queues, and preallocated buffers, and
 * registers it in lib->contexts while holding the library mutex.  Takes a
 * library reference (released when the context is freed).
 */
static int
nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
    void *data)
{
    int  rc;

    ctx_impl->ctx.data = data;
    ctx_impl->ctx.unit = &lib->unit;

    rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_lib_use(lib);

    pthread_mutex_lock(&lib->mutex);

    nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);

    pthread_mutex_unlock(&lib->mutex);

    ctx_impl->use_count = 1;
    ctx_impl->wait_items = 0;
    ctx_impl->online = 1;
    ctx_impl->ready = 0;

    nxt_queue_init(&ctx_impl->free_req);
    nxt_queue_init(&ctx_impl->free_ws);
    nxt_queue_init(&ctx_impl->active_req);
    nxt_queue_init(&ctx_impl->ready_req);
    nxt_queue_init(&ctx_impl->pending_rbuf);
    nxt_queue_init(&ctx_impl->free_rbuf);

    /* Seed the free lists with the statically embedded objects. */
    ctx_impl->free_buf = NULL;
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
    nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);

    nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
    nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);

    ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;

    ctx_impl->req.req.ctx = &ctx_impl->ctx;
    ctx_impl->req.req.unit = &lib->unit;

    ctx_impl->read_port = NULL;
    ctx_impl->requests.slot = 0;

    return NXT_UNIT_OK;
}


/* Takes a context reference. */
nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
}


/* Drops a context reference; frees the context when the last ref is gone. */
nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
{
    long                 c;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    /* fetch_add returns the previous value; 1 means we held the last ref. */
    c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);

    if (c == 1) {
        nxt_unit_ctx_free(ctx_impl);
    }
}


/* Takes a library reference. */
nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t *lib)
{
    nxt_atomic_fetch_add(&lib->use_count, 1);
}


/*
 * Drops a library reference.  On the last release, tears down all tracked
 * processes and ports, both mmap sets, and the library allocation itself.
 */
nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t *lib)
{
    long                c;
    nxt_unit_process_t  *process;

    c = nxt_atomic_fetch_add(&lib->use_count, -1);

    if (c == 1) {
        /* Pop-and-remove processes one at a time under the mutex. */
        for ( ;; ) {
            pthread_mutex_lock(&lib->mutex);

            process = nxt_unit_process_pop_first(lib);
            if (process == NULL) {
                pthread_mutex_unlock(&lib->mutex);

                break;
            }

            nxt_unit_remove_process(lib, process);
        }

        pthread_mutex_destroy(&lib->mutex);

        if (nxt_fast_path(lib->router_port != NULL)) {
            nxt_unit_port_release(lib->router_port);
        }

        if (nxt_fast_path(lib->shared_port != NULL)) {
            nxt_unit_port_release(lib->shared_port);
        }

        nxt_unit_mmaps_destroy(&lib->incoming);
        nxt_unit_mmaps_destroy(&lib->outgoing);

        nxt_unit_free(NULL, lib);
    }
}


/* Pushes mmap_buf at the head of the intrusive list rooted at *head. */
nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    mmap_buf->next = *head;

    if (mmap_buf->next != NULL) {
        mmap_buf->next->prev = &mmap_buf->next;
    }

    *head = mmap_buf;
    mmap_buf->prev = head;
}


/* Appends mmap_buf at the tail by walking 'next' links from *prev. */
nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
    nxt_unit_mmap_buf_t *mmap_buf)
{
    while (*prev != NULL) {
        prev = &(*prev)->next;
    }

    nxt_unit_mmap_buf_insert(prev, mmap_buf);
}


/*
 * Detaches mmap_buf from its list.  Safe to call on an already-detached
 * buffer only if its prev pointer is NULL.
 */
nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_t  **prev;

    prev = mmap_buf->prev;

    if (mmap_buf->next != NULL) {
        mmap_buf->next->prev = prev;
    }

    if (prev != NULL) {
        *prev = mmap_buf->next;
    }
}


/*
 * Parses the NXT_UNIT_INIT environment variable:
 *
 *   VERSION;STREAM;RPID,RID,RFD;ROUTER_PID,ROUTER_ID,ROUTER_FD;
 *   READ_PID,READ_ID,READ_IN_FD,READ_OUT_FD;LOG_FD,SHM_LIMIT
 *
 * The leading version string must match NXT_VERSION exactly.  Fills in the
 * three port descriptors, the log fd, the ready stream id and the shared
 * memory limit.  Returns NXT_UNIT_ERROR if the variable is missing, the
 * version mismatches, or fewer than 13 fields scan.
 */
static int
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
    nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
    uint32_t *shm_limit)
{
    int       rc;
    int       ready_fd, router_fd, read_in_fd, read_out_fd;
    char      *unit_init, *version_end;
    long      version_length;
    int64_t   ready_pid, router_pid, read_pid;
    uint32_t  ready_stream, router_id, ready_id, read_id;

    unit_init = getenv(NXT_UNIT_INIT_ENV);
    if (nxt_slow_path(unit_init == NULL)) {
        nxt_unit_alert(NULL, "%s is not in the current environment",
                       NXT_UNIT_INIT_ENV);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);

    version_length = nxt_length(NXT_VERSION);

    version_end = strchr(unit_init, ';');
    if (version_end == NULL
        || version_end - unit_init != version_length
        || memcmp(unit_init, NXT_VERSION, version_length) != 0)
    {
        nxt_unit_alert(NULL, "version check error");

        return NXT_UNIT_ERROR;
    }

    rc =
sscanf(version_end + 1,
                "%"PRIu32";"
                "%"PRId64",%"PRIu32",%d;"
                "%"PRId64",%"PRIu32",%d;"
                "%"PRId64",%"PRIu32",%d,%d;"
                "%d,%"PRIu32,
                &ready_stream,
                &ready_pid, &ready_id, &ready_fd,
                &router_pid, &router_id, &router_fd,
                &read_pid, &read_id, &read_in_fd, &read_out_fd,
                log_fd, shm_limit);

    if (nxt_slow_path(rc != 13)) {
        nxt_unit_alert(NULL, "failed to scan variables: %d", rc);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);

    /* The ready port is write-only from our side. */
    ready_port->in_fd = -1;
    ready_port->out_fd = ready_fd;
    ready_port->data = NULL;

    nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);

    /* The router port is write-only from our side. */
    router_port->in_fd = -1;
    router_port->out_fd = router_fd;
    router_port->data = NULL;

    nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);

    read_port->in_fd = read_in_fd;
    read_port->out_fd = read_out_fd;
    read_port->data = NULL;

    *stream = ready_stream;

    return NXT_UNIT_OK;
}


/*
 * Sends _NXT_PORT_MSG_PROCESS_READY on 'ready_fd' for 'stream', passing
 * 'queue_fd' (the shared port queue) as an SCM_RIGHTS ancillary fd.
 * Returns NXT_UNIT_ERROR when the full message could not be sent.
 */
static int
nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    /* Properly aligned storage for one cmsghdr carrying one int fd. */
    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int))];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_PROCESS_READY;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    memset(&cmsg, 0, sizeof(cmsg));

    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;

    /*
     * memcpy() is used instead of simple
     *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s optimization may issue a warning:
     *   dereferencing type-punned pointer will break
strict-aliasing rules
     *
     * Fortunately, GCC with -O1 compiles this nxt_memcpy()
     * in the same simple assignment as in the code above.
     */
    memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));

    res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
                           &cmsg, sizeof(cmsg));
    if (res != sizeof(msg)) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Dispatches one message read into 'rbuf': extracts up to two SCM_RIGHTS
 * fds from the ancillary data, validates the nxt_port_msg_t header, pulls
 * shared-memory payload when the mmap flag is set, and routes by message
 * type.  A zero-size read means the port was closed and triggers quit.
 *
 * On any exit path the 'done' label closes any fds still owned by the
 * message, frees any incoming buffers still attached to it, and releases
 * 'rbuf' unless the result is NXT_UNIT_AGAIN (mmap data not yet mapped;
 * the rbuf stays queued for a retry).
 */
static int
nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
    nxt_unit_request_info_t **preq)
{
    int                  rc;
    pid_t                pid;
    struct cmsghdr       *cm;
    nxt_port_msg_t       *port_msg;
    nxt_unit_impl_t      *lib;
    nxt_unit_recv_msg_t  recv_msg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    recv_msg.fd[0] = -1;
    recv_msg.fd[1] = -1;
    port_msg = (nxt_port_msg_t *) rbuf->buf;
    cm = (struct cmsghdr *) rbuf->oob;

    /* Pick up one or two descriptors passed with the message, if any. */
    if (cm->cmsg_level == SOL_SOCKET
        && cm->cmsg_type == SCM_RIGHTS)
    {
        if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
            memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
        }

        if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
            memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
        }
    }

    recv_msg.incoming_buf = NULL;

    if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
        if (nxt_slow_path(rbuf->size == 0)) {
            nxt_unit_debug(ctx, "read port closed");

            nxt_unit_quit(ctx);
            rc = NXT_UNIT_OK;
            goto done;
        }

        nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);

        rc = NXT_UNIT_ERROR;
        goto done;
    }

    nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
                   port_msg->stream, (int) port_msg->type,
                   recv_msg.fd[0], recv_msg.fd[1]);

    recv_msg.stream = port_msg->stream;
    recv_msg.pid = port_msg->pid;
    recv_msg.reply_port = port_msg->reply_port;
    recv_msg.last = port_msg->last;
    recv_msg.mmap = port_msg->mmap;

    /* Payload immediately follows the fixed header. */
    recv_msg.start = port_msg + 1;
    recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);

    if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
        nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
                       port_msg->stream, (int) port_msg->type);
        rc = NXT_UNIT_ERROR;
        goto done;
    }

    /* Fragmentation is unsupported. */
    if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
        nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
                       port_msg->stream, (int) port_msg->type);
        rc = NXT_UNIT_ERROR;
        goto done;
    }

    if (port_msg->mmap) {
        rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);

        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            if (rc == NXT_UNIT_AGAIN) {
                /* Ownership of the fds moved to the queued rbuf retry. */
                recv_msg.fd[0] = -1;
                recv_msg.fd[1] = -1;
            }

            goto done;
        }
    }

    switch (port_msg->type) {

    case _NXT_PORT_MSG_RPC_READY:
        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_QUIT:
        nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);

        nxt_unit_quit(ctx);
        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_NEW_PORT:
        rc = nxt_unit_process_new_port(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_PORT_ACK:
        rc = nxt_unit_ctx_ready(ctx);
        break;

    case _NXT_PORT_MSG_CHANGE_FILE:
        nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
                       port_msg->stream, recv_msg.fd[0]);

        /* Redirect the log to the freshly received descriptor. */
        if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
            nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
                           port_msg->stream, recv_msg.fd[0], lib->log_fd,
                           strerror(errno), errno);

            rc = NXT_UNIT_ERROR;
            goto done;
        }

        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_MMAP:
        if (nxt_slow_path(recv_msg.fd[0] < 0)) {
            nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
                           port_msg->stream, recv_msg.fd[0]);

            rc = NXT_UNIT_ERROR;
            goto done;
        }

        rc = nxt_unit_incoming_mmap(ctx, port_msg->pid,
recv_msg.fd[0]);
        break;

    case _NXT_PORT_MSG_REQ_HEADERS:
        rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
        break;

    case _NXT_PORT_MSG_REQ_BODY:
        rc = nxt_unit_process_req_body(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_WEBSOCKET:
        rc = nxt_unit_process_websocket(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_REMOVE_PID:
        if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
            nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
                           "(%d != %d)", port_msg->stream, (int) recv_msg.size,
                           (int) sizeof(pid));

            rc = NXT_UNIT_ERROR;
            goto done;
        }

        memcpy(&pid, recv_msg.start, sizeof(pid));

        nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
                       port_msg->stream, (int) pid);

        nxt_unit_remove_pid(lib, pid);

        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_SHM_ACK:
        rc = nxt_unit_process_shm_ack(ctx);
        break;

    default:
        nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
                       port_msg->stream, (int) port_msg->type);

        rc = NXT_UNIT_ERROR;
        goto done;
    }

done:

    /* Close any received descriptors that were not consumed above. */
    if (recv_msg.fd[0] != -1) {
        nxt_unit_close(recv_msg.fd[0]);
    }

    if (recv_msg.fd[1] != -1) {
        nxt_unit_close(recv_msg.fd[1]);
    }

    /* Freeing unlinks the buffer from the list, so the head advances. */
    while (recv_msg.incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
    }

    if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
#if (NXT_DEBUG)
        /* Poison the consumed buffer to catch stale reads in debug builds. */
        memset(rbuf->buf, 0xAC, rbuf->size);
#endif
        nxt_unit_read_buf_release(ctx, rbuf);
    }

    return rc;
}


/*
 * Handles _NXT_PORT_MSG_NEW_PORT: validates the payload and fds, maps the
 * port's queue (an nxt_app_queue_t for the shared port, an nxt_port_queue_t
 * otherwise), and registers the port.  For the shared port the context is
 * also marked ready.  On success fd[0] ownership passes to the new port
 * (signalled by resetting recv_msg->fd[0] to -1).
 */
static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    void                     *mem;
    nxt_unit_impl_t          *lib;
    nxt_unit_port_t          new_port, *port;
    nxt_port_msg_new_port_t  *new_port_msg;

    if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
                      "invalid message size (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->fd[0] < 0)) {
        nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
                       recv_msg->stream, recv_msg->fd[0]);

        return NXT_UNIT_ERROR;
    }

    new_port_msg = recv_msg->start;

    nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
                   recv_msg->stream, (int) new_port_msg->pid,
                   (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
        /* The shared port is read-only and keyed by our own pid. */
        nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);

        new_port.in_fd = recv_msg->fd[0];
        new_port.out_fd = -1;

        mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
                   MAP_SHARED, recv_msg->fd[1], 0);

    } else {
        if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
                          != NXT_UNIT_OK))
        {
            return NXT_UNIT_ERROR;
        }

        nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
                              new_port_msg->id);

        /* A peer port is write-only from our side. */
        new_port.in_fd = -1;
        new_port.out_fd = recv_msg->fd[0];

        mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
                   MAP_SHARED, recv_msg->fd[1], 0);
    }

    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
                       strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    new_port.data = NULL;

    /* fd[0] is now owned by new_port; prevent the caller from closing it. */
    recv_msg->fd[0] = -1;

    port = nxt_unit_add_port(ctx, &new_port, mem);
    if (nxt_slow_path(port == NULL)) {
        return NXT_UNIT_ERROR;
    }

    if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
        lib->shared_port = port;

        return nxt_unit_ctx_ready(ctx);
    }

    nxt_unit_port_release(port);

    return NXT_UNIT_OK;
}


/*
 * Marks the context ready and invokes the optional application
 * ready_handler callback, propagating its return code.
 */
static int
nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    ctx_impl->ready = 1;

    if (lib->callbacks.ready_handler) {
        return lib->callbacks.ready_handler(ctx);
    }

    return NXT_UNIT_OK;
}


/*
 * Handles _NXT_PORT_MSG_REQ_HEADERS: the request metadata must live in
 * shared memory (mmap flag set) and be at least sizeof(nxt_unit_request_t).
 * Allocates a request info object, attaches the incoming buffer chain and
 * preread content to it, and continues with response-port checks and the
 * headers ack (continuation beyond this excerpt).
 */
static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
    nxt_unit_request_info_t **preq)
{
    int                           res;
    nxt_unit_impl_t               *lib;
    nxt_unit_port_id_t            port_id;
    nxt_unit_request_t            *r;
    nxt_unit_mmap_buf_t           *b;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_slow_path(recv_msg->mmap == 0)) {
        nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
                      "%d expected", recv_msg->stream, (int) recv_msg->size,
                      (int) sizeof(nxt_unit_request_t));

        return NXT_UNIT_ERROR;
    }

    req_impl = nxt_unit_request_info_get(ctx);
    if (nxt_slow_path(req_impl == NULL)) {
        nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    req = &req_impl->req;

    req->request = recv_msg->start;

    b = recv_msg->incoming_buf;

    req->request_buf = &b->buf;
    req->response = NULL;
    req->response_buf = NULL;

    r = req->request;

    req->content_length = r->content_length;

    /* Body bytes already present in the headers buffer ("preread"). */
    req->content_buf = req->request_buf;
    req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);

    req_impl->stream = recv_msg->stream;

    /* Continuation of nxt_unit_process_req_headers() from above. */
    req_impl->outgoing_buf = NULL;

    for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
        b->req = req;
    }

    /* "Move" incoming buffer list to req_impl. */
    req_impl->incoming_buf = recv_msg->incoming_buf;
    req_impl->incoming_buf->prev = &req_impl->incoming_buf;
    recv_msg->incoming_buf = NULL;

    /* Take ownership of the optional body fd. */
    req->content_fd = recv_msg->fd[0];
    recv_msg->fd[0] = -1;

    req->response_max_fields = 0;
    req_impl->state = NXT_UNIT_RS_START;
    req_impl->websocket = 0;
    req_impl->in_hash = 0;

    nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
                   (int) r->method_length,
                   (char *) nxt_unit_sptr_get(&r->method),
                   (int) r->target_length,
                   (char *) nxt_unit_sptr_get(&r->target),
                   (int) r->content_length);

    nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);

    /* May return NXT_UNIT_AGAIN if the response port is not ready yet. */
    res = nxt_unit_request_check_response_port(req, &port_id);
    if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    if (nxt_fast_path(res == NXT_UNIT_OK)) {
        res = nxt_unit_send_req_headers_ack(req);
        if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);

            return NXT_UNIT_ERROR;
        }

        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        /* More body bytes expected than were preread with the headers? */
        if (req->content_length
            > (uint64_t) (req->content_buf->end - req->content_buf->free))
        {
            res = nxt_unit_request_hash_add(ctx, req);
            if (nxt_slow_path(res != NXT_UNIT_OK)) {
                nxt_unit_req_warn(req, "failed to add request to hash");

                nxt_unit_request_done(req, NXT_UNIT_ERROR);

                return NXT_UNIT_ERROR;
            }

            /*
             * If application have separate data handler, we may start
             * request processing and process data when it is arrived.
             */
            if (lib->callbacks.data_handler == NULL) {
                return NXT_UNIT_OK;
            }
        }

        if (preq == NULL) {
            lib->callbacks.request_handler(req);

        } else {
            /* Batch mode: hand the request back to the caller instead. */
            *preq = req;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Handle a REQ_BODY message: attach the received body buffers / body fd
 * to the request found by stream id, then invoke the application's data
 * or request handler.  (Function continues below this chunk.)
 */
static int
nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    uint64_t                 l;
    nxt_unit_impl_t          *lib;
    nxt_unit_mmap_buf_t      *b;
    nxt_unit_request_info_t  *req;

    /* recv_msg->last also removes the request from the hash. */
    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
    if (req == NULL) {
        return NXT_UNIT_OK;
    }

    /* Total body bytes available: already-buffered plus newly received. */
    l = req->content_buf->end - req->content_buf->free;

    for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
        b->req = req;
        l += b->buf.end - b->buf.free;
    }

    if (recv_msg->incoming_buf != NULL) {
        /* Find the tail of the request's current buffer chain. */
        b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);

        while (b->next != NULL) {
            b = b->next;
        }

        /* "Move" incoming buffer list to req_impl.
         */
        b->next = recv_msg->incoming_buf;
        b->next->prev = &b->next;

        recv_msg->incoming_buf = NULL;
    }

    /* Take ownership of the optional body fd. */
    req->content_fd = recv_msg->fd[0];
    recv_msg->fd[0] = -1;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (lib->callbacks.data_handler != NULL) {
        lib->callbacks.data_handler(req);

        return NXT_UNIT_OK;
    }

    /* No data handler: run request_handler once the full body is here. */
    if (req->content_fd != -1 || l == req->content_length) {
        lib->callbacks.request_handler(req);
    }

    return NXT_UNIT_OK;
}


/*
 * Locate (or start creating) the port the response should be sent to.
 *
 * Returns:
 *   NXT_UNIT_OK    - port found and ready; req->response_port is set;
 *   NXT_UNIT_AGAIN - port not ready yet; the request is queued on the
 *                    port's awaiting_req list and ctx wait_items is bumped;
 *   NXT_UNIT_ERROR - allocation/lookup failure.
 *
 * Called with lib->mutex unlocked; all hash/queue manipulation happens
 * under that mutex.
 */
static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
    nxt_unit_port_id_t *port_id)
{
    int                           res;
    nxt_unit_ctx_t                *ctx;
    nxt_unit_impl_t               *lib;
    nxt_unit_port_t               *port;
    nxt_unit_process_t            *process;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_port_impl_t          *port_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx = req->ctx;
    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&lib->mutex);

    port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
    /* NOTE: only dereferenced below after the port != NULL check. */
    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    if (nxt_fast_path(port != NULL)) {
        req->response_port = port;

        if (nxt_fast_path(port_impl->ready)) {
            pthread_mutex_unlock(&lib->mutex);

            nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
                           (int) port->id.pid, (int) port->id.id);

            return NXT_UNIT_OK;
        }

        nxt_unit_debug(ctx, "check_response_port: "
                       "port{%d,%d} already requested",
                       (int) port->id.pid, (int) port->id.id);

        req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

        /* Park the request until the port becomes ready. */
        nxt_queue_insert_tail(&port_impl->awaiting_req,
                              &req_impl->port_wait_link);

        pthread_mutex_unlock(&lib->mutex);

        nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);

        return NXT_UNIT_AGAIN;
    }

    /* Unknown port: create a placeholder entry and request the real fd. */
    port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
    if (nxt_slow_path(port_impl == NULL)) {
        nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
                       (int) sizeof(nxt_unit_port_impl_t));

        pthread_mutex_unlock(&lib->mutex);

        return NXT_UNIT_ERROR;
    }

    port = &port_impl->port;

    port->id = *port_id;
    port->in_fd = -1;
    port->out_fd = -1;
    port->data = NULL;

    res = nxt_unit_port_hash_add(&lib->ports, port);
    if (nxt_slow_path(res != NXT_UNIT_OK)) {
        nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
                       port->id.pid, port->id.id);

        pthread_mutex_unlock(&lib->mutex);

        /* port is the first member of port_impl, so this frees port_impl. */
        nxt_unit_free(ctx, port);

        return NXT_UNIT_ERROR;
    }

    process = nxt_unit_process_find(lib, port_id->pid, 0);
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_alert(ctx, "check_response_port: process %d not found",
                       port->id.pid);

        /* remove=1: undo the hash_add above. */
        nxt_unit_port_hash_find(&lib->ports, port_id, 1);

        pthread_mutex_unlock(&lib->mutex);

        nxt_unit_free(ctx, port);

        return NXT_UNIT_ERROR;
    }

    nxt_queue_insert_tail(&process->ports, &port_impl->link);

    port_impl->process = process;
    port_impl->queue = NULL;
    port_impl->from_socket = 0;
    port_impl->socket_rbuf = NULL;

    nxt_queue_init(&port_impl->awaiting_req);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);

    /* One reference for the hash, one for the awaiting request. */
    port_impl->use_count = 2;
    port_impl->ready = 0;

    req->response_port = port;

    pthread_mutex_unlock(&lib->mutex);

    res = nxt_unit_get_port(ctx, port_id);
    if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);

    return NXT_UNIT_AGAIN;
}
/*
 * Send a REQ_HEADERS_ACK message for the request to its response port,
 * telling the router which port this context will reply on.
 * Returns NXT_UNIT_OK or NXT_UNIT_ERROR.
 */
static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
{
    ssize_t                       res;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    memset(&msg, 0, sizeof(nxt_port_msg_t));

    msg.stream = req_impl->stream;
    msg.pid = lib->pid;
    msg.reply_port = ctx_impl->read_port->id.id;
    msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;

    res = nxt_unit_port_send(req->ctx, req->response_port,
                             &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Handle a WEBSOCKET message: build a websocket frame object around the
 * received data and pass it to the application's websocket_handler.
 * A message flagged 'last' closes the websocket and triggers the
 * close_handler (or request teardown when none is registered).
 */
static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    size_t                           hsize;
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *b;
    nxt_unit_callbacks_t             *cb;
    nxt_unit_request_info_t          *req;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    /* recv_msg->last also removes the request from the hash. */
    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
    if (nxt_slow_path(req == NULL)) {
        return NXT_UNIT_OK;
    }

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;

    /* A frame header is at least 2 bytes; anything shorter is skipped. */
    if (cb->websocket_handler && recv_msg->size >= 2) {
        ws_impl = nxt_unit_websocket_frame_get(ctx);
        if (nxt_slow_path(ws_impl == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
                          req_impl->stream);

            return NXT_UNIT_ERROR;
        }

        ws_impl->ws.req = req;

        ws_impl->buf = NULL;

        if (recv_msg->mmap) {
            for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
                b->req = req;
            }

            /* "Move" incoming buffer list to ws_impl. */
            ws_impl->buf = recv_msg->incoming_buf;
            ws_impl->buf->prev = &ws_impl->buf;
            recv_msg->incoming_buf = NULL;

            b = ws_impl->buf;

        } else {
            /* Plain (non-mmap) payload: wrap it in a fresh buffer. */
            b = nxt_unit_mmap_buf_get(ctx);
            if (nxt_slow_path(b == NULL)) {
                nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
                               req_impl->stream);

                nxt_unit_websocket_frame_release(&ws_impl->ws);

                return NXT_UNIT_ERROR;
            }

            b->req = req;
            b->buf.start = recv_msg->start;
            b->buf.free = b->buf.start;
            b->buf.end = b->buf.start + recv_msg->size;

            nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
        }

        /* Parse the frame header in place. */
        ws_impl->ws.header = (void *) b->buf.start;
        ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
            ws_impl->ws.header);

        hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);

        if (ws_impl->ws.header->mask) {
            /* The 4-byte masking key is the last part of the header. */
            ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;

        } else {
            ws_impl->ws.mask = NULL;
        }

        /* Skip the header so content_buf->free points at the payload. */
        b->buf.free += hsize;

        ws_impl->ws.content_buf = &b->buf;
        ws_impl->ws.content_length = ws_impl->ws.payload_len;

        nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
                           "payload_len=%"PRIu64,
                           ws_impl->ws.header->opcode,
                           ws_impl->ws.payload_len);

        cb->websocket_handler(&ws_impl->ws);
    }

    if (recv_msg->last) {
        req_impl->websocket = 0;

        if (cb->close_handler) {
            nxt_unit_req_debug(req, "close_handler");

            cb->close_handler(req);

        } else {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Handle a SHM_ACK message: forward the notification to the optional
 * shm_ack_handler callback.  (Function continues below this chunk.)
 */
static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;
    /* Continuation of nxt_unit_process_shm_ack() from above. */
    if (cb->shm_ack_handler != NULL) {
        cb->shm_ack_handler(ctx);
    }

    return NXT_UNIT_OK;
}


/*
 * Get a request info object for the context: reuse one from the free
 * list or allocate a new one (including request_data_size extra bytes
 * for application data).  The object is placed on the active_req queue.
 * Returns NULL on allocation failure.
 */
static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t               *lib;
    nxt_queue_link_t              *lnk;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_req)) {
        /* Drop the lock while allocating; re-acquire to queue the object. */
        pthread_mutex_unlock(&ctx_impl->mutex);

        req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
                                        + lib->request_data_size);
        if (nxt_slow_path(req_impl == NULL)) {
            return NULL;
        }

        req_impl->req.unit = ctx->unit;
        req_impl->req.ctx = ctx;

        pthread_mutex_lock(&ctx_impl->mutex);

    } else {
        lnk = nxt_queue_first(&ctx_impl->free_req);
        nxt_queue_remove(lnk);

        req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
    }

    nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);

    /* Application data area, if the application requested one. */
    req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;

    return req_impl;
}


/*
 * Release a finished request: drop buffers, close the body fd, release
 * the response port, and return the object to the context free list.
 */
static void
nxt_unit_request_info_release(nxt_unit_request_info_t *req)
{
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    req->response = NULL;
    req->response_buf = NULL;

    if (req_impl->in_hash) {
        /* remove=1: take the request out of the stream hash. */
        nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
    }

    req_impl->websocket = 0;

    while (req_impl->outgoing_buf != NULL) {
        nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
    }

    while (req_impl->incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(req_impl->incoming_buf);
    }

    if (req->content_fd != -1) {
        nxt_unit_close(req->content_fd);

        req->content_fd = -1;
    }

    if (req->response_port != NULL) {
        nxt_unit_port_release(req->response_port);

        req->response_port = NULL;
    }

    req_impl->state = NXT_UNIT_RS_RELEASED;

    pthread_mutex_lock(&ctx_impl->mutex);

    nxt_queue_remove(&req_impl->link);

    nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);
}


/*
 * Destroy a request info object, freeing its memory unless it is the
 * statically embedded one inside the context impl.
 */
static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_queue_remove(&req_impl->link);

    if (req_impl != &ctx_impl->req) {
        nxt_unit_free(&ctx_impl->ctx, req_impl);
    }
}


/*
 * Get a websocket frame object for the context: reuse one from the free
 * list or allocate a new one.  (Function continues below this chunk.)
 */
static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
{
    nxt_queue_link_t                 *lnk;
    nxt_unit_ctx_impl_t              *ctx_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ctx_impl = nxt_container_of(ctx,
                                nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
        if (nxt_slow_path(ws_impl == NULL)) {
            return NULL;
        }

    } else {
        lnk = nxt_queue_first(&ctx_impl->free_ws);
        nxt_queue_remove(lnk);

        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
    }

    ws_impl->ctx_impl = ctx_impl;

    return ws_impl;
}


/*
 * Return a websocket frame object to its context's free list, freeing
 * any buffers still attached to it.
 */
static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    while (ws_impl->buf != NULL) {
        nxt_unit_mmap_buf_free(ws_impl->buf);
    }

    ws->req = NULL;

    pthread_mutex_lock(&ws_impl->ctx_impl->mutex);

    nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);

    pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
}


/*
 * Destroy a websocket frame object: unlink it from whatever queue it is
 * on and free its memory.
 */
static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
    nxt_unit_websocket_frame_impl_t *ws_impl)
{
    nxt_queue_remove(&ws_impl->link);

    nxt_unit_free(ctx, ws_impl);
}


/*
 * Case-insensitive hash of a header field name, matching the hash used
 * by the router (see the magic value note below).
 */
uint16_t
nxt_unit_field_hash(const char *name, size_t name_length)
{
    u_char      ch;
    uint32_t    hash;
    const char  *p, *end;

    hash = 159406; /* Magic value copied from nxt_http_parse.c */
    end = name + name_length;

    for (p = name; p < end; p++) {
        ch = *p;
        hash = (hash << 4) + hash + nxt_lowcase(ch);
    }

    /* Fold the 32-bit hash down to 16 bits. */
    hash = (hash >> 16) ^ hash;

    return hash;
}


/*
 * Reorder request fields in place so that fields with the same name are
 * adjacent, and record the indexes of the well-known content-length,
 * content-type and cookie fields.  Duplicate fields get the same name
 * pointer, which simplifies later grouping by the application.
 */
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
    char                *name;
    uint32_t            i, j;
    nxt_unit_field_t    *fields, f;
    nxt_unit_request_t  *r;

    static nxt_str_t  content_length = nxt_string("content-length");
    static nxt_str_t  content_type = nxt_string("content-type");
    static nxt_str_t  cookie = nxt_string("cookie");

    nxt_unit_req_debug(req, "group_dup_fields");

    r = req->request;
    fields = r->fields;

    for (i = 0; i < r->fields_count; i++) {
        name = nxt_unit_sptr_get(&fields[i].name);

        /* Hash match first, then verify the actual name. */
        switch (fields[i].hash) {
        case NXT_UNIT_HASH_CONTENT_LENGTH:
            if (fields[i].name_length == content_length.length
                && nxt_unit_memcasecmp(name, content_length.start,
                                       content_length.length) == 0)
            {
                r->content_length_field = i;
            }

            break;

        case NXT_UNIT_HASH_CONTENT_TYPE:
            if (fields[i].name_length == content_type.length
                && nxt_unit_memcasecmp(name, content_type.start,
                                       content_type.length) == 0)
            {
                r->content_type_field = i;
            }

            break;

        case NXT_UNIT_HASH_COOKIE:
            if (fields[i].name_length == cookie.length
                && nxt_unit_memcasecmp(name, cookie.start,
                                       cookie.length) == 0)
            {
                r->cookie_field = i;
            }

            break;
        }

        /* Pull every later field with the same name up next to field i. */
        for (j = i + 1; j < r->fields_count; j++) {
            if (fields[i].hash != fields[j].hash
                || fields[i].name_length != fields[j].name_length
                || nxt_unit_memcasecmp(name,
                                       nxt_unit_sptr_get(&fields[j].name),
                                       fields[j].name_length) != 0)
            {
                continue;
            }

            /*
             * Offsets are relative to the field slot, so they must be
             * adjusted as the field moves to slot i + 1.
             */
            f = fields[j];
            f.value.offset += (j - (i + 1)) * sizeof(f);

            while (j > i + 1) {
                fields[j] = fields[j - 1];
                fields[j].name.offset -= sizeof(f);
                fields[j].value.offset -= sizeof(f);
                j--;
            }

            fields[j] = f;

            /* Assign the same name pointer for further grouping simplicity.
             */
            nxt_unit_sptr_set(&fields[j].name, name);

            i++;
        }
    }
}


/*
 * Initialize (or re-initialize) the response for a request: allocate a
 * response buffer sized for max_fields_count fields plus max_fields_size
 * bytes of field data, and set the status.  (Continues below this chunk.)
 */
int
nxt_unit_response_init(nxt_unit_request_info_t *req,
    uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
{
    uint32_t                      buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "init: response already sent");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
                       (int) max_fields_count, (int) max_fields_size);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_debug(req, "duplicate response init");
    }

    /*
     * Each field name and value 0-terminated by libunit,
     * this is the reason of '+ 2' below.
     */
    buf_size = sizeof(nxt_unit_response_t)
               + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
               + max_fields_size;

    if (nxt_slow_path(req->response_buf != NULL)) {
        buf = req->response_buf;

        /* Reuse the existing buffer when it is already big enough. */
        if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
            goto init_response;
        }

        nxt_unit_buf_free(buf);

        req->response_buf = NULL;
        req->response = NULL;
        req->response_max_fields = 0;

        req_impl->state = NXT_UNIT_RS_START;
    }

    buf = nxt_unit_response_buf_alloc(req, buf_size);
    if (nxt_slow_path(buf == NULL)) {
        return NXT_UNIT_ERROR;
    }

init_response:

    memset(buf->start, 0, sizeof(nxt_unit_response_t));

    req->response_buf = buf;

    req->response = (nxt_unit_response_t *) buf->start;
    req->response->status = status;

    /* Reserve room for the field table; field data is appended after it. */
    buf->free = buf->start + sizeof(nxt_unit_response_t)
                + max_fields_count * sizeof(nxt_unit_field_t);
req->response_max_fields = max_fields_count; 2023 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT; 2024 2025 return NXT_UNIT_OK; 2026 } 2027 2028 2029 int 2030 nxt_unit_response_realloc(nxt_unit_request_info_t *req, 2031 uint32_t max_fields_count, uint32_t max_fields_size) 2032 { 2033 char *p; 2034 uint32_t i, buf_size; 2035 nxt_unit_buf_t *buf; 2036 nxt_unit_field_t *f, *src; 2037 nxt_unit_response_t *resp; 2038 nxt_unit_request_info_impl_t *req_impl; 2039 2040 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2041 2042 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2043 nxt_unit_req_warn(req, "realloc: response not init"); 2044 2045 return NXT_UNIT_ERROR; 2046 } 2047 2048 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 2049 nxt_unit_req_warn(req, "realloc: response already sent"); 2050 2051 return NXT_UNIT_ERROR; 2052 } 2053 2054 if (nxt_slow_path(max_fields_count < req->response->fields_count)) { 2055 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small"); 2056 2057 return NXT_UNIT_ERROR; 2058 } 2059 2060 /* 2061 * Each field name and value 0-terminated by libunit, 2062 * this is the reason of '+ 2' below. 
2063 */ 2064 buf_size = sizeof(nxt_unit_response_t) 2065 + max_fields_count * (sizeof(nxt_unit_field_t) + 2) 2066 + max_fields_size; 2067 2068 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size); 2069 2070 buf = nxt_unit_response_buf_alloc(req, buf_size); 2071 if (nxt_slow_path(buf == NULL)) { 2072 nxt_unit_req_warn(req, "realloc: new buf allocation failed"); 2073 return NXT_UNIT_ERROR; 2074 } 2075 2076 resp = (nxt_unit_response_t *) buf->start; 2077 2078 memset(resp, 0, sizeof(nxt_unit_response_t)); 2079 2080 resp->status = req->response->status; 2081 resp->content_length = req->response->content_length; 2082 2083 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t); 2084 f = resp->fields; 2085 2086 for (i = 0; i < req->response->fields_count; i++) { 2087 src = req->response->fields + i; 2088 2089 if (nxt_slow_path(src->skip != 0)) { 2090 continue; 2091 } 2092 2093 if (nxt_slow_path(src->name_length + src->value_length + 2 2094 > (uint32_t) (buf->end - p))) 2095 { 2096 nxt_unit_req_warn(req, "realloc: not enough space for field" 2097 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required", 2098 i, src, src->name_length, src->value_length); 2099 2100 goto fail; 2101 } 2102 2103 nxt_unit_sptr_set(&f->name, p); 2104 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length); 2105 *p++ = '\0'; 2106 2107 nxt_unit_sptr_set(&f->value, p); 2108 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length); 2109 *p++ = '\0'; 2110 2111 f->hash = src->hash; 2112 f->skip = 0; 2113 f->name_length = src->name_length; 2114 f->value_length = src->value_length; 2115 2116 resp->fields_count++; 2117 f++; 2118 } 2119 2120 if (req->response->piggyback_content_length > 0) { 2121 if (nxt_slow_path(req->response->piggyback_content_length 2122 > (uint32_t) (buf->end - p))) 2123 { 2124 nxt_unit_req_warn(req, "realloc: not enought space for content" 2125 " #%"PRIu32", %"PRIu32" required", 2126 i, req->response->piggyback_content_length); 2127 2128 goto fail; 2129 } 
2130 2131 resp->piggyback_content_length = 2132 req->response->piggyback_content_length; 2133 2134 nxt_unit_sptr_set(&resp->piggyback_content, p); 2135 p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content), 2136 req->response->piggyback_content_length); 2137 } 2138 2139 buf->free = p; 2140 2141 nxt_unit_buf_free(req->response_buf); 2142 2143 req->response = resp; 2144 req->response_buf = buf; 2145 req->response_max_fields = max_fields_count; 2146 2147 return NXT_UNIT_OK; 2148 2149 fail: 2150 2151 nxt_unit_buf_free(buf); 2152 2153 return NXT_UNIT_ERROR; 2154 } 2155 2156 2157 int 2158 nxt_unit_response_is_init(nxt_unit_request_info_t *req) 2159 { 2160 nxt_unit_request_info_impl_t *req_impl; 2161 2162 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2163 2164 return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT; 2165 } 2166 2167 2168 int 2169 nxt_unit_response_add_field(nxt_unit_request_info_t *req, 2170 const char *name, uint8_t name_length, 2171 const char *value, uint32_t value_length) 2172 { 2173 nxt_unit_buf_t *buf; 2174 nxt_unit_field_t *f; 2175 nxt_unit_response_t *resp; 2176 nxt_unit_request_info_impl_t *req_impl; 2177 2178 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2179 2180 if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) { 2181 nxt_unit_req_warn(req, "add_field: response not initialized or " 2182 "already sent"); 2183 2184 return NXT_UNIT_ERROR; 2185 } 2186 2187 resp = req->response; 2188 2189 if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) { 2190 nxt_unit_req_warn(req, "add_field: too many response fields (%d)", 2191 (int) resp->fields_count); 2192 2193 return NXT_UNIT_ERROR; 2194 } 2195 2196 buf = req->response_buf; 2197 2198 if (nxt_slow_path(name_length + value_length + 2 2199 > (uint32_t) (buf->end - buf->free))) 2200 { 2201 nxt_unit_req_warn(req, "add_field: response buffer overflow"); 2202 2203 return NXT_UNIT_ERROR; 2204 } 2205 2206 
    /* Continuation of nxt_unit_response_add_field() from above. */
    nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
                       resp->fields_count,
                       (int) name_length, name,
                       (int) value_length, value);

    f = resp->fields + resp->fields_count;

    nxt_unit_sptr_set(&f->name, buf->free);
    buf->free = nxt_cpymem(buf->free, name, name_length);
    *buf->free++ = '\0';

    nxt_unit_sptr_set(&f->value, buf->free);
    buf->free = nxt_cpymem(buf->free, value, value_length);
    *buf->free++ = '\0';

    f->hash = nxt_unit_field_hash(name, name_length);
    f->skip = 0;
    f->name_length = name_length;
    f->value_length = value_length;

    resp->fields_count++;

    return NXT_UNIT_OK;
}


/*
 * Append body bytes to the response buffer as "piggyback" content that
 * is sent together with the headers.
 * Returns NXT_UNIT_OK or NXT_UNIT_ERROR (wrong state or no room).
 */
int
nxt_unit_response_add_content(nxt_unit_request_info_t *req,
    const void* src, uint32_t size)
{
    nxt_unit_buf_t                *buf;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "add_content: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "add_content: response already sent");

        return NXT_UNIT_ERROR;
    }

    buf = req->response_buf;

    if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
        nxt_unit_req_warn(req, "add_content: buffer overflow");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    if (resp->piggyback_content_length == 0) {
        /* First chunk: record where the content starts. */
        nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
        req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
    }

    resp->piggyback_content_length += size;

    buf->free = nxt_cpymem(buf->free, src, size);

    return NXT_UNIT_OK;
}


/*
 * Send the initialized response (headers plus any piggyback content) to
 * the router.  A 101 response to a websocket handshake automatically
 * upgrades the request.  On success the response buffer is consumed.
 * Returns NXT_UNIT_OK or an error/again code from the send path.
 */
int
nxt_unit_response_send(nxt_unit_request_info_t *req)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "send: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "send: response already sent");

        return NXT_UNIT_ERROR;
    }

    if (req->request->websocket_handshake && req->response->status == 101) {
        nxt_unit_response_upgrade(req);
    }

    nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
                       req->response->fields_count,
                       (int) (req->response_buf->free
                              - req->response_buf->start));

    mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);

    rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
        req->response = NULL;
        req->response_buf = NULL;
        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        nxt_unit_mmap_buf_free(mmap_buf);
    }

    return rc;
}


/*
 * Return non-zero when the response headers have been sent.
 */
int
nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
}


/*
 * Allocate an outgoing shared-memory buffer of the requested size for
 * the request's response port.  (Function continues below this chunk.)
 */
nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
        nxt_unit_req_warn(req, "response_buf_alloc: "
                          "requested buffer (%"PRIu32") too big", size);

        return NULL;
    }

    nxt_unit_req_debug(req,
"response_buf_alloc: %"PRIu32, size); 2348 2349 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2350 2351 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); 2352 if (nxt_slow_path(mmap_buf == NULL)) { 2353 nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf"); 2354 2355 return NULL; 2356 } 2357 2358 mmap_buf->req = req; 2359 2360 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); 2361 2362 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, 2363 size, size, mmap_buf, 2364 NULL); 2365 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2366 nxt_unit_mmap_buf_release(mmap_buf); 2367 2368 nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf"); 2369 2370 return NULL; 2371 } 2372 2373 return &mmap_buf->buf; 2374 } 2375 2376 2377 static nxt_unit_mmap_buf_t * 2378 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) 2379 { 2380 nxt_unit_mmap_buf_t *mmap_buf; 2381 nxt_unit_ctx_impl_t *ctx_impl; 2382 2383 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2384 2385 pthread_mutex_lock(&ctx_impl->mutex); 2386 2387 if (ctx_impl->free_buf == NULL) { 2388 pthread_mutex_unlock(&ctx_impl->mutex); 2389 2390 mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t)); 2391 if (nxt_slow_path(mmap_buf == NULL)) { 2392 return NULL; 2393 } 2394 2395 } else { 2396 mmap_buf = ctx_impl->free_buf; 2397 2398 nxt_unit_mmap_buf_unlink(mmap_buf); 2399 2400 pthread_mutex_unlock(&ctx_impl->mutex); 2401 } 2402 2403 mmap_buf->ctx_impl = ctx_impl; 2404 2405 mmap_buf->hdr = NULL; 2406 mmap_buf->free_ptr = NULL; 2407 2408 return mmap_buf; 2409 } 2410 2411 2412 static void 2413 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) 2414 { 2415 nxt_unit_mmap_buf_unlink(mmap_buf); 2416 2417 pthread_mutex_lock(&mmap_buf->ctx_impl->mutex); 2418 2419 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf); 2420 2421 pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex); 2422 } 2423 2424 2425 int 2426 
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
{
    /* Flag is parsed from the request headers by the router. */
    return req->request->websocket_handshake;
}


/*
 * Switches the request into WebSocket mode: registers it in the context's
 * request hash (so incoming frames can find it) and forces status 101.
 * Idempotent; requires an initialized but not yet sent response.
 */
int
nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
{
    int                           rc;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->websocket != 0)) {
        nxt_unit_req_debug(req, "upgrade: already upgraded");

        return NXT_UNIT_OK;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "upgrade: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "upgrade: response already sent");

        return NXT_UNIT_ERROR;
    }

    rc = nxt_unit_request_hash_add(req->ctx, req);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_req_warn(req, "upgrade: failed to add request to hash");

        return NXT_UNIT_ERROR;
    }

    req_impl->websocket = 1;

    req->response->status = 101;

    return NXT_UNIT_OK;
}


/* Returns non-zero when the request has been upgraded to WebSocket. */
int
nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->websocket;
}


/* Maps an extra_data pointer back to its owning request info. */
nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void *data)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);

    return &req_impl->req;
}


/*
 * Sends a filled content buffer (allocated by nxt_unit_response_buf_alloc)
 * to the router.  Headers must already have been sent.  The buffer is
 * consumed (freed) on success; empty buffers are just freed.
 */
int
nxt_unit_buf_send(nxt_unit_buf_t *buf)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = mmap_buf->req;
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "buf_send: %d bytes",
                       (int) (buf->free - buf->start));

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "buf_send: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "buf_send: headers not sent yet");

        return NXT_UNIT_ERROR;
    }

    /* Only transmit when the buffer actually holds data. */
    if (nxt_fast_path(buf->free > buf->start)) {
        rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }
    }

    nxt_unit_mmap_buf_free(mmap_buf);

    return NXT_UNIT_OK;
}


/*
 * Sends the final buffer of a response (last == 1) and releases the
 * request on success; on failure falls back to nxt_unit_request_done()
 * so the router still gets a terminating message.
 */
static void
nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
{
    int                      rc;
    nxt_unit_mmap_buf_t      *mmap_buf;
    nxt_unit_request_info_t  *req;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    req = mmap_buf->req;

    rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
    if (nxt_slow_path(rc == NXT_UNIT_OK)) {
        nxt_unit_mmap_buf_free(mmap_buf);

        nxt_unit_request_info_release(req);

    } else {
        nxt_unit_request_done(req, rc);
    }
}


/*
 * Low-level transmit of one outgoing buffer to the response port.  Data in
 * a shared-memory chunk is passed by reference (mmap message); small plain
 * buffers are sent inline, with the port message header written into the
 * space reserved just before buf->start.  Always releases the buffer's
 * backing storage via the free_buf label.
 */
static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
    nxt_unit_mmap_buf_t *mmap_buf, int last)
{
    struct {
        nxt_port_msg_t       msg;
        nxt_port_mmap_msg_t  mmap_msg;
    } m;

    int                           rc;
    u_char                        *last_used, *first_free;
    ssize_t                       res;
    nxt_chunk_id_t                first_free_chunk;
    nxt_unit_buf_t                *buf;
    nxt_unit_impl_t               *lib;
    nxt_port_mmap_header_t        *hdr;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    buf =
          &mmap_buf->buf;
    hdr = mmap_buf->hdr;

    m.mmap_msg.size = buf->free - buf->start;

    m.msg.stream = req_impl->stream;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_DATA;
    m.msg.last = last != 0;
    /* Shared-memory transfer only when backed by a mmap and non-empty. */
    m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
    m.msg.nf = 0;
    m.msg.mf = 0;
    m.msg.tracking = 0;

    rc = NXT_UNIT_ERROR;

    if (m.msg.mmap) {
        m.mmap_msg.mmap_id = hdr->id;
        m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
                                                     (u_char *) buf->start);

        nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
                       req_impl->stream,
                       (int) m.mmap_msg.mmap_id,
                       (int) m.mmap_msg.chunk_id,
                       (int) m.mmap_msg.size);

        res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
                                 NULL, 0);
        if (nxt_slow_path(res != sizeof(m))) {
            goto free_buf;
        }

        /* The chunks actually used are now owned by the router. */
        last_used = (u_char *) buf->free - 1;
        first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;

        if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
            /*
             * At least one whole chunk is unused: shrink the buffer to the
             * remaining chunks so it can be reused for more data.
             */
            first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);

            buf->start = (char *) first_free;
            buf->free = buf->start;

            if (buf->end < buf->start) {
                buf->end = buf->start;
            }

        } else {
            /* Nothing reusable is left; detach the mmap backing. */
            buf->start = NULL;
            buf->free = NULL;
            buf->end = NULL;

            mmap_buf->hdr = NULL;
        }

        /* Account for chunks handed off to the router. */
        nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
                             (int) m.mmap_msg.chunk_id
                             - (int) first_free_chunk);

        nxt_unit_debug(req->ctx, "allocated_chunks %d",
                       (int) lib->outgoing.allocated_chunks);

    } else {
        /* Plain send: the message header must fit just before the data. */
        if (nxt_slow_path(mmap_buf->plain_ptr == NULL
                          || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
        {
            nxt_unit_alert(req->ctx,
                           "#%"PRIu32": failed to send plain memory buffer"
                           ": no space reserved for message header",
                           req_impl->stream);

            goto free_buf;
        }

        memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));

        nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
                       req_impl->stream,
                       (int) (sizeof(m.msg) + m.mmap_msg.size));

        res = nxt_unit_port_send(req->ctx, req->response_port,
                                 buf->start - sizeof(m.msg),
                                 m.mmap_msg.size + sizeof(m.msg),
                                 NULL, 0);
        if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
            goto free_buf;
        }
    }

    rc = NXT_UNIT_OK;

free_buf:

    /* Release any remaining backing storage, success or failure. */
    nxt_unit_free_outgoing_buf(mmap_buf);

    return rc;
}


/* Public wrapper: frees an outgoing buffer without sending it. */
void
nxt_unit_buf_free(nxt_unit_buf_t *buf)
{
    nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
}


/* Frees the buffer's backing storage and recycles its descriptor. */
static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_free_outgoing_buf(mmap_buf);

    nxt_unit_mmap_buf_release(mmap_buf);
}


/*
 * Returns the buffer's backing storage: either its shared-memory chunks
 * (hdr != NULL) or a heap allocation (free_ptr).  The descriptor itself
 * is left alive for reuse.
 */
static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
{
    if (mmap_buf->hdr != NULL) {
        nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
                              mmap_buf->hdr, mmap_buf->buf.start,
                              mmap_buf->buf.end - mmap_buf->buf.start);

        mmap_buf->hdr = NULL;

        return;
    }

    if (mmap_buf->free_ptr != NULL) {
        nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);

        mmap_buf->free_ptr = NULL;
    }
}


/*
 * Gets a read buffer for incoming port messages and clears its control
 * message (oob) header.
 */
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_ctx_impl_t  *ctx_impl;
    nxt_unit_read_buf_t  *rbuf;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    rbuf = nxt_unit_read_buf_get_impl(ctx_impl);

    pthread_mutex_unlock(&ctx_impl->mutex);

    memset(rbuf->oob, 0, sizeof(struct cmsghdr));

    return rbuf;
}


/* Free-list pop or heap allocation; caller must hold ctx_impl->mutex. */
static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
{
    nxt_queue_link_t     *link;
    nxt_unit_read_buf_t  *rbuf;

    if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
        link = nxt_queue_first(&ctx_impl->free_rbuf);
        nxt_queue_remove(link);

        rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);

        return rbuf;
    }

    rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));

    if (nxt_fast_path(rbuf != NULL)) {
        rbuf->ctx_impl = ctx_impl;
    }

    return rbuf;
}


/* Returns a read buffer to the context's free list. */
static void
nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
    nxt_unit_read_buf_t *rbuf)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);

    pthread_mutex_unlock(&ctx_impl->mutex);
}


/* Returns the next buffer in a chain, or NULL at the end. */
nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t *buf)
{
    nxt_unit_mmap_buf_t  *mmap_buf;

    mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

    if (mmap_buf->next == NULL) {
        return NULL;
    }

    return &mmap_buf->next->buf;
}


/* Largest single outgoing buffer the library can allocate. */
uint32_t
nxt_unit_buf_max(void)
{
    return PORT_MMAP_DATA_SIZE;
}


/* Smallest guaranteed allocation unit (one shared-memory chunk). */
uint32_t
nxt_unit_buf_min(void)
{
    return PORT_MMAP_CHUNK_SIZE;
}


/*
 * Blocking-style write: delegates to the non-blocking variant with
 * min_size == size, then maps its result to NXT_UNIT_OK or an error code
 * (negative results encode errors).
 */
int
nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
    size_t size)
{
    ssize_t  res;

    res = nxt_unit_response_write_nb(req, start, size, size);

    return res < 0 ? -res : NXT_UNIT_OK;
}


/*
 * Writes response content, sending at most `size` and at least `min_size`
 * bytes.  If the headers buffer is still pending, content is first
 * piggybacked onto it.  Returns bytes sent, or a negated NXT_UNIT_* error.
 */
ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
    size_t size, size_t min_size)
{
    int                           rc;
    ssize_t                       sent;
    uint32_t                      part_size, min_part_size, buf_size;
    const char                    *part_start;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    nxt_unit_req_debug(req, "write: %d", (int) size);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    part_start = start;
    sent = 0;

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_alert(req, "write: response not initialized yet");

        return -NXT_UNIT_ERROR;
    }

    /* Check if the response headers have not been sent yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {
        /* Fill the remaining space of the headers buffer first. */
        part_size = req->response_buf->end - req->response_buf->free;
        part_size = nxt_min(size, part_size);

        rc = nxt_unit_response_add_content(req, part_start, part_size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        min_size -= nxt_min(min_size, part_size);
    }

    while (size > 0) {
        part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
        min_part_size = nxt_min(min_size, part_size);
        min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);

        rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
                                       min_part_size, &mmap_buf, local_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
        if (nxt_slow_path(buf_size == 0)) {
            /* Non-blocking mode: no space available right now. */
            return sent;
        }
        part_size = nxt_min(buf_size, part_size);

        mmap_buf.buf.free =
            nxt_cpymem(mmap_buf.buf.free,
                                           part_start, part_size);

        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return -rc;
        }

        size -= part_size;
        part_start += part_size;
        sent += part_size;

        min_size -= nxt_min(min_size, part_size);
    }

    return sent;
}


/*
 * Streams response content pulled from a caller-supplied read callback
 * until read_info->eof.  Content is first piggybacked onto the pending
 * headers buffer (if any), then sent in freshly allocated buffers of
 * read_info->buf_size (capped at PORT_MMAP_DATA_SIZE).
 */
int
nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
    nxt_unit_read_info_t *read_info)
{
    int                           rc;
    ssize_t                       n;
    uint32_t                      buf_size;
    nxt_unit_buf_t                *buf;
    nxt_unit_mmap_buf_t           mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;
    char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_alert(req, "write: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    /* Check if the response headers have not been sent yet. */
    if (nxt_slow_path(req->response_buf != NULL)) {

        /* Enable content in headers buf. */
        rc = nxt_unit_response_add_content(req, "", 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to add piggyback content");

            return rc;
        }

        buf = req->response_buf;

        while (buf->end - buf->free > 0) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                return NXT_UNIT_ERROR;
            }

            /* Manually increase sizes. */
            buf->free += n;
            req->response->piggyback_content_length += n;

            if (read_info->eof) {
                break;
            }
        }

        rc = nxt_unit_response_send(req);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send headers with content");

            return rc;
        }

        if (read_info->eof) {
            return NXT_UNIT_OK;
        }
    }

    while (!read_info->eof) {
        nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
                           read_info->buf_size);

        buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);

        rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                       buf_size, buf_size,
                                       &mmap_buf, local_buf);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

        buf = &mmap_buf.buf;

        /* Fill the buffer completely before sending it. */
        while (!read_info->eof && buf->end > buf->free) {
            n = read_info->read(read_info, buf->free, buf->end - buf->free);
            if (nxt_slow_path(n < 0)) {
                nxt_unit_req_error(req, "Read error");

                nxt_unit_free_outgoing_buf(&mmap_buf);

                return NXT_UNIT_ERROR;
            }

            buf->free += n;
        }

        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            nxt_unit_req_error(req, "Failed to send content");

            return rc;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Reads up to `size` bytes of request body into dst: first from the
 * buffered content chain, then from the spill file (content_fd) if one
 * exists.  Returns the total number of bytes read, or a negative value
 * on read error.
 */
ssize_t
nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
{
    ssize_t  buf_res, res;

    buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
                                dst, size);

    /*
     * NOTE(review): dst is not advanced by buf_res before the read()
     * below, so bytes copied from the buffers would be overwritten if
     * both sources held data at once.  This looks safe only if buffered
     * content and the content_fd spill file are mutually exclusive --
     * confirm against the message-processing code.
     */
    if (buf_res < (ssize_t) size && req->content_fd != -1) {
        res = read(req->content_fd, dst, size);
        if (nxt_slow_path(res < 0)) {
            nxt_unit_req_alert(req, "failed to read content: %s (%d)",
                               strerror(errno), errno);

            return res;
        }

        /* A short read means the spill file is exhausted. */
        if (res < (ssize_t) size) {
            nxt_unit_close(req->content_fd);

            req->content_fd = -1;
        }
        req->content_length -= res;
        size -= res;

        dst = nxt_pointer_to(dst, res);

    } else {
        res = 0;
    }

    return buf_res + res;
}


/*
 * Computes the length of the next line (up to and including '\n') in the
 * request body without consuming it, reading ahead from the spill file
 * into extra buffers when needed.  Returns at most max_size, or -1 on a
 * preread failure.
 */
ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
{
    char                 *p;
    size_t               l_size, b_size;
    nxt_unit_buf_t       *b;
    nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;

    if (req->content_length == 0) {
        return 0;
    }

    l_size = 0;

    b = req->content_buf;

    while (b != NULL) {
        b_size = b->end - b->free;
        p = memchr(b->free, '\n', b_size);

        if (p != NULL) {
            p++;
            l_size += p - b->free;
            break;
        }

        l_size += b_size;

        if (max_size <= l_size) {
            break;
        }

        mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
        /* End of buffered data but more body remains on disk: read ahead. */
        if (mmap_buf->next == NULL
            && req->content_fd != -1
            && l_size < req->content_length)
        {
            preread_buf = nxt_unit_request_preread(req, 16384);
            if (nxt_slow_path(preread_buf == NULL)) {
                return -1;
            }

            nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
        }

        b = nxt_unit_buf_next(b);
    }

    return nxt_min(max_size, l_size);
}


/*
 * Reads up to `size` bytes from the request's spill file into a fresh
 * heap-backed buffer and returns it; closes content_fd when the file is
 * exhausted.  Returns NULL on allocation or read failure.
 */
static nxt_unit_mmap_buf_t *
nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
{
    ssize_t              res;
    nxt_unit_mmap_buf_t  *mmap_buf;

    if (req->content_fd == -1) {
        nxt_unit_req_alert(req, "preread: content_fd == -1");
        return NULL;
    }

    mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
    if (nxt_slow_path(mmap_buf == NULL)) {
        nxt_unit_req_alert(req, "preread: failed to allocate buf");
        return NULL;
    }

    mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size);
    if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
        nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
        nxt_unit_mmap_buf_release(mmap_buf);
        return NULL;
    }

    mmap_buf->plain_ptr = mmap_buf->free_ptr;

    mmap_buf->hdr = NULL;
    mmap_buf->buf.start = mmap_buf->free_ptr;
    mmap_buf->buf.free = mmap_buf->buf.start;
    mmap_buf->buf.end = mmap_buf->buf.start + size;

    res = read(req->content_fd, mmap_buf->free_ptr, size);
    if (res < 0) {
        nxt_unit_req_alert(req, "failed to read content: %s (%d)",
                           strerror(errno), errno);

        nxt_unit_mmap_buf_free(mmap_buf);

        return NULL;
    }

    if (res < (ssize_t) size) {
        nxt_unit_close(req->content_fd);

        req->content_fd = -1;
    }

    nxt_unit_req_debug(req, "preread: read %d", (int) res);

    /* Trim the buffer to what was actually read. */
    mmap_buf->buf.end = mmap_buf->buf.free + res;

    return mmap_buf;
}


/*
 * Copies up to `size` bytes from a buffer chain into dst, advancing the
 * chain head (*b) and decrementing the remaining length (*len).  Returns
 * the number of bytes copied.
 */
static ssize_t
nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
{
    u_char          *p;
    size_t          rest, copy, read;
    nxt_unit_buf_t  *buf, *last_buf;

    p = dst;
    rest = size;

    buf = *b;
    last_buf = buf;

    while (buf != NULL) {
        last_buf = buf;

        copy = buf->end - buf->free;
        copy = nxt_min(rest, copy);

        p = nxt_cpymem(p, buf->free, copy);

        buf->free += copy;
        rest -= copy;

        if (rest == 0) {
            /* Step past a fully consumed buffer before stopping. */
            if (buf->end == buf->free) {
                buf = nxt_unit_buf_next(buf);
            }

            break;
        }

        buf = nxt_unit_buf_next(buf);
    }

    *b = last_buf;

    read = size - rest;

    *len -= read;

    return read;
}


/*
 * Completes request processing.  On success, synthesizes a minimal
 * 200 text/plain response if none was initialized, and sends the final
 * buffer; on error (or send failure) notifies the router with an RPC
 * error message and releases the request.
 */
void
nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
{
    uint32_t                      size;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_unit_req_debug(req, "done: %d", rc);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto skip_response_send;
    }

    if
       (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {

        /* No response was started: emit a default empty 200 reply. */
        size = nxt_length("Content-Type") + nxt_length("text/plain");

        rc = nxt_unit_response_init(req, 200, 1, size);
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }

        rc = nxt_unit_response_add_field(req, "Content-Type",
                                         nxt_length("Content-Type"),
                                         "text/plain",
                                         nxt_length("text/plain"));
        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            goto skip_response_send;
        }
    }

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {

        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        /* Sends the headers buffer with last == 1 and releases req. */
        nxt_unit_buf_send_done(req->response_buf);

        return;
    }

skip_response_send:

    /* Tell the router the stream is finished (or failed). */
    lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);

    msg.stream = req_impl->stream;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
                                   : _NXT_PORT_MSG_RPC_ERROR;
    msg.last = 1;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    (void) nxt_unit_port_send(req->ctx, req->response_port,
                              &msg, sizeof(msg), NULL, 0);

    nxt_unit_request_info_release(req);
}


/* Convenience wrapper: sends a single-fragment WebSocket frame. */
int
nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const void *start, size_t size)
{
    const struct iovec  iov = { (void *) start, size };

    return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
}


/*
 * Sends a WebSocket frame gathered from an iovec array, splitting the
 * payload across multiple outgoing buffers when it exceeds one buffer.
 * The initial allocation reserves up to 10 bytes for the frame header
 * (the maximum unmasked header size).
 */
int
nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
    uint8_t last, const struct iovec *iov, int iovcnt)
{
    int                     i, rc;
    size_t                  l, copy;
    uint32_t                payload_len, buf_size, alloc_size;
    const uint8_t           *b;
    nxt_unit_buf_t          *buf;
    nxt_unit_mmap_buf_t     mmap_buf;
    nxt_websocket_header_t  *wh;
    char                    local_buf[NXT_UNIT_LOCAL_BUF_SIZE];

    payload_len = 0;

    for (i = 0; i < iovcnt; i++) {
        payload_len += iov[i].iov_len;
    }

    buf_size = 10 + payload_len;
    alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);

    rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                   alloc_size, alloc_size,
                                   &mmap_buf, local_buf);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        return rc;
    }

    buf = &mmap_buf.buf;

    /* Zero the first two header bytes before filling in the fields. */
    buf->start[0] = 0;
    buf->start[1] = 0;

    buf_size -= buf->end - buf->start;

    wh = (void *) buf->free;

    buf->free = nxt_websocket_frame_init(wh, payload_len);
    wh->fin = last;
    wh->opcode = opcode;

    for (i = 0; i < iovcnt; i++) {
        b = iov[i].iov_base;
        l = iov[i].iov_len;

        while (l > 0) {
            copy = buf->end - buf->free;
            copy = nxt_min(l, copy);

            buf->free = nxt_cpymem(buf->free, b, copy);
            b += copy;
            l -= copy;

            if (l > 0) {
                /* Current buffer is full: flush it and allocate the next. */
                if (nxt_fast_path(buf->free > buf->start)) {
                    rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);

                    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                        return rc;
                    }
                }

                alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);

                rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                               alloc_size, alloc_size,
                                               &mmap_buf, local_buf);
                if (nxt_slow_path(rc != NXT_UNIT_OK)) {
                    return rc;
                }

                buf_size -= buf->end - buf->start;
            }
        }
    }

    if (buf->free > buf->start) {
        rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
    }

    return rc;
}


/*
 * Reads up to `size` bytes of frame payload into dst and unmasks it in
 * place when the frame carries a mask key, keeping the 4-byte mask phase
 * aligned with the overall payload offset.
 */
ssize_t
nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
    size_t size)
{
    ssize_t   res;
    uint8_t   *b;
    uint64_t  i, d;

    res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
                            dst, size);

    if (ws->mask == NULL) {
        return res;
    }

    b = dst;
    /* Offset of this read within the payload, modulo the mask length. */
    d = (ws->payload_len - ws->content_length - res) % 4;

    for (i = 0; i < (uint64_t) res; i++) {
        b[i] ^= ws->mask[ (i + d) % 4 ];
    }

    return res;
}


/*
 * Copies a frame that lives in shared/receive memory into a private heap
 * buffer so the application can keep it past the message-processing
 * callback.  No-op if the frame already owns its storage.
 */
int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
{
    char                             *b;
    size_t                           size, hsize;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
        return NXT_UNIT_OK;
    }

    size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;

    b = nxt_unit_malloc(ws->req->ctx, size);
    if (nxt_slow_path(b == NULL)) {
        return NXT_UNIT_ERROR;
    }

    memcpy(b, ws_impl->buf->buf.start, size);

    hsize = nxt_websocket_frame_header_size(b);

    /* Repoint the frame at the private copy. */
    ws_impl->buf->buf.start = b;
    ws_impl->buf->buf.free = b + hsize;
    ws_impl->buf->buf.end = b + size;

    ws_impl->buf->free_ptr = b;

    ws_impl->ws.header = (nxt_websocket_header_t *) b;

    /* The mask key (if present) is the last 4 bytes of the header. */
    if (ws_impl->ws.header->mask) {
        ws_impl->ws.mask = (uint8_t *) b + hsize - 4;

    } else {
        ws_impl->ws.mask = NULL;
    }

    return NXT_UNIT_OK;
}


/* Releases a processed WebSocket frame back to the library. */
void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_release(ws);
}


/*
 * Finds (or creates) an outgoing shared-memory segment with a run of at
 * least min_n free chunks, preferring *n chunks.  On shortage it raises
 * the OOSM condition with the router and, unless min_n == 0 (non-blocking
 * callers), waits for a shm ACK and retries.  Returns with the chunks
 * marked busy and accounted in allocated_chunks, or NULL.
 */
static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_chunk_id_t *c, int *n, int min_n)
{
    int                     res, nchunks, i;
    uint32_t                outgoing_size;
    nxt_unit_mmap_t         *mm, *mm_end;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->outgoing.mutex);

retry:

    outgoing_size = lib->outgoing.size;

    mm_end = lib->outgoing.elts + outgoing_size;

    for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
        hdr = mm->hdr;

        /*
         * Skip segments bound to a different port/thread; 0xFFFF marks a
         * segment already shared with the router (usable by anyone).
         */
        if (hdr->sent_over != 0xFFFFu
            && (hdr->sent_over != port->id.id
                || mm->src_thread != pthread_self()))
        {
            continue;
        }

        *c = 0;

        while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
            nchunks = 1;

            /* Try to extend the run to *n contiguous chunks. */
            while (nchunks < *n) {
                res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
                                                       *c + nchunks);

                if (res == 0) {
                    /* Run broken: accept it if it already covers min_n. */
                    if (nchunks >= min_n) {
                        *n = nchunks;

                        goto unlock;
                    }

                    /* Too short: roll back and search past the gap. */
                    for (i = 0; i < nchunks; i++) {
                        nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
                    }

                    *c += nchunks + 1;
                    nchunks = 0;
                    break;
                }

                nchunks++;
            }

            if (nchunks >= min_n) {
                *n = nchunks;

                goto unlock;
            }
        }

        /* Segment exhausted: flag out-of-shared-memory for the router. */
        hdr->oosm = 1;
    }

    if (outgoing_size >= lib->shm_mmap_limit) {
        /* Cannot allocate more shared memory. */
        pthread_mutex_unlock(&lib->outgoing.mutex);

        if (min_n == 0) {
            *n = 0;
        }

        if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
                          >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
        {
            /* Memory allocated by the application, but not sent to router. */
            return NULL;
        }

        /* Notify router about OOSM condition. */

        res = nxt_unit_send_oosm(ctx, port);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            return NULL;
        }

        /* Return if caller can handle OOSM condition.  Non-blocking mode. */

        if (min_n == 0) {
            return NULL;
        }

        nxt_unit_debug(ctx, "oosm: waiting for ACK");

        res = nxt_unit_wait_shm_ack(ctx);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            return NULL;
        }

        nxt_unit_debug(ctx, "oosm: retry");

        pthread_mutex_lock(&lib->outgoing.mutex);

        goto retry;
    }

    *c = 0;
    hdr = nxt_unit_new_mmap(ctx, port, *n);

unlock:

    nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);

    nxt_unit_debug(ctx, "allocated_chunks %d",
                   (int) lib->outgoing.allocated_chunks);

    pthread_mutex_unlock(&lib->outgoing.mutex);

    return hdr;
}


/* Sends an out-of-shared-memory notification to the router port. */
static int
nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = 0;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_OOSM;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Blocks on the context's read port until a shm ACK arrives, queueing any
 * other messages received meanwhile for later processing.  Returns an
 * error on receive failure or when a QUIT message is seen.
 */
static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
{
    int                  res;
    nxt_unit_ctx_impl_t  *ctx_impl;
    nxt_unit_read_buf_t  *rbuf;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    while (1) {
        rbuf = nxt_unit_read_buf_get(ctx);
        if (nxt_slow_path(rbuf == NULL)) {
            return NXT_UNIT_ERROR;
        }

        res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
        if (res == NXT_UNIT_ERROR) {
            nxt_unit_read_buf_release(ctx, rbuf);

            return NXT_UNIT_ERROR;
        }

        if (nxt_unit_is_shm_ack(rbuf)) {
            nxt_unit_read_buf_release(ctx, rbuf);
            break;
        }
        /* Not an ACK: park the message for the normal processing loop. */
        pthread_mutex_lock(&ctx_impl->mutex);

        nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);

        pthread_mutex_unlock(&ctx_impl->mutex);

        if (nxt_unit_is_quit(rbuf)) {
            nxt_unit_debug(ctx, "oosm: quit received");

            return NXT_UNIT_ERROR;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Returns the element at index i of the mmaps array, growing the array
 * (x2 while small, x1.5 afterwards) and zero-initializing new slots as
 * needed.  Returns NULL only on realloc failure.
 */
static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
{
    uint32_t         cap, n;
    nxt_unit_mmap_t  *e;

    if (nxt_fast_path(mmaps->size > i)) {
        return mmaps->elts + i;
    }

    cap = mmaps->cap;

    if (cap == 0) {
        cap = i + 1;
    }

    while (i + 1 > cap) {

        if (cap < 16) {
            cap = cap * 2;

        } else {
            cap = cap + cap / 2;
        }
    }

    if (cap != mmaps->cap) {

        e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
        if (nxt_slow_path(e == NULL)) {
            return NULL;
        }

        mmaps->elts = e;

        for (n = mmaps->cap; n < cap; n++) {
            e = mmaps->elts + n;

            e->hdr = NULL;
            nxt_queue_init(&e->awaiting_rbuf);
        }

        mmaps->cap = cap;
    }

    if (i + 1 > mmaps->size) {
        mmaps->size = i + 1;
    }

    return mmaps->elts + i;
}


/*
 * Creates a new outgoing shared-memory segment: opens an anonymous shm
 * fd, maps it, initializes the chunk maps with the first n chunks busy,
 * and announces the fd to the router.  Called and returns with
 * lib->outgoing.mutex held (temporarily dropped around the send).
 */
static nxt_port_mmap_header_t *
nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
{
    int                     i, fd, rc;
    void                    *mem;
    nxt_unit_mmap_t         *mm;
    nxt_unit_impl_t         *lib;
    nxt_port_mmap_header_t  *hdr;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_alert(ctx, "failed to add mmap to outgoing array");

        return NULL;
    }

    fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
    if (nxt_slow_path(fd == -1)) {
        goto remove_fail;
    }

    mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED,
               fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
                       strerror(errno), errno);

        nxt_unit_close(fd);

        goto remove_fail;
    }

    mm->hdr = mem;
    hdr = mem;

    /* All chunks start free (bitmaps set to all ones). */
    memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
    memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

    hdr->id = lib->outgoing.size - 1;
    hdr->src_pid = lib->pid;
    hdr->dst_pid = port->id.pid;
    hdr->sent_over = port->id.id;
    mm->src_thread = pthread_self();

    /* Mark first n chunk(s) as busy */
    for (i = 0; i < n; i++) {
        nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
    }

    /* Mark the chunk following the last available chunk as busy. */
    nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
    nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);

    /* Drop the lock while passing the fd to the router. */
    pthread_mutex_unlock(&lib->outgoing.mutex);

    rc = nxt_unit_send_mmap(ctx, port, fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        munmap(mem, PORT_MMAP_SIZE);
        hdr = NULL;

    } else {
        nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
                       hdr->id, (int) lib->pid, (int) port->id.pid);
    }

    nxt_unit_close(fd);

    pthread_mutex_lock(&lib->outgoing.mutex);

    if (nxt_fast_path(hdr != NULL)) {
        return hdr;
    }

remove_fail:

    /* Undo the slot reserved in the outgoing array. */
    lib->outgoing.size--;

    return NULL;
}


/*
 * Opens an anonymous shared-memory object of the given size and returns
 * its file descriptor, using the best available mechanism for the
 * platform: memfd_create, shm_open(SHM_ANON), or shm_open + shm_unlink.
 */
static int
nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
{
    int              fd;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
    char  name[64];

    snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
             lib->pid, (void *) pthread_self());
#endif

#if (NXT_HAVE_MEMFD_CREATE)

    fd = syscall(SYS_memfd_create, name,
MFD_CLOEXEC); 3776 if (nxt_slow_path(fd == -1)) { 3777 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name, 3778 strerror(errno), errno); 3779 3780 return -1; 3781 } 3782 3783 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd); 3784 3785 #elif (NXT_HAVE_SHM_OPEN_ANON) 3786 3787 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); 3788 if (nxt_slow_path(fd == -1)) { 3789 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)", 3790 strerror(errno), errno); 3791 3792 return -1; 3793 } 3794 3795 #elif (NXT_HAVE_SHM_OPEN) 3796 3797 /* Just in case. */ 3798 shm_unlink(name); 3799 3800 fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 3801 if (nxt_slow_path(fd == -1)) { 3802 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name, 3803 strerror(errno), errno); 3804 3805 return -1; 3806 } 3807 3808 if (nxt_slow_path(shm_unlink(name) == -1)) { 3809 nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name, 3810 strerror(errno), errno); 3811 } 3812 3813 #else 3814 3815 #error No working shared memory implementation. 3816 3817 #endif 3818 3819 if (nxt_slow_path(ftruncate(fd, size) == -1)) { 3820 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd, 3821 strerror(errno), errno); 3822 3823 nxt_unit_close(fd); 3824 3825 return -1; 3826 } 3827 3828 return fd; 3829 } 3830 3831 3832 static int 3833 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd) 3834 { 3835 ssize_t res; 3836 nxt_port_msg_t msg; 3837 nxt_unit_impl_t *lib; 3838 union { 3839 struct cmsghdr cm; 3840 char space[CMSG_SPACE(sizeof(int))]; 3841 } cmsg; 3842 3843 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3844 3845 msg.stream = 0; 3846 msg.pid = lib->pid; 3847 msg.reply_port = 0; 3848 msg.type = _NXT_PORT_MSG_MMAP; 3849 msg.last = 0; 3850 msg.mmap = 0; 3851 msg.nf = 0; 3852 msg.mf = 0; 3853 msg.tracking = 0; 3854 3855 /* 3856 * Fill all padding fields with 0. 3857 * Code in Go 1.11 validate cmsghdr using padding field as part of len. 
3858 * See Cmsghdr definition and socketControlMessageHeaderAndData function. 3859 */ 3860 memset(&cmsg, 0, sizeof(cmsg)); 3861 3862 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); 3863 cmsg.cm.cmsg_level = SOL_SOCKET; 3864 cmsg.cm.cmsg_type = SCM_RIGHTS; 3865 3866 /* 3867 * memcpy() is used instead of simple 3868 * *(int *) CMSG_DATA(&cmsg.cm) = fd; 3869 * because GCC 4.4 with -O2/3/s optimization may issue a warning: 3870 * dereferencing type-punned pointer will break strict-aliasing rules 3871 * 3872 * Fortunately, GCC with -O1 compiles this nxt_memcpy() 3873 * in the same simple assignment as in the code above. 3874 */ 3875 memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int)); 3876 3877 res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), 3878 &cmsg, sizeof(cmsg)); 3879 if (nxt_slow_path(res != sizeof(msg))) { 3880 return NXT_UNIT_ERROR; 3881 } 3882 3883 return NXT_UNIT_OK; 3884 } 3885 3886 3887 static int 3888 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 3889 uint32_t size, uint32_t min_size, 3890 nxt_unit_mmap_buf_t *mmap_buf, char *local_buf) 3891 { 3892 int nchunks, min_nchunks; 3893 nxt_chunk_id_t c; 3894 nxt_port_mmap_header_t *hdr; 3895 3896 if (size <= NXT_UNIT_MAX_PLAIN_SIZE) { 3897 if (local_buf != NULL) { 3898 mmap_buf->free_ptr = NULL; 3899 mmap_buf->plain_ptr = local_buf; 3900 3901 } else { 3902 mmap_buf->free_ptr = nxt_unit_malloc(ctx, 3903 size + sizeof(nxt_port_msg_t)); 3904 if (nxt_slow_path(mmap_buf->free_ptr == NULL)) { 3905 return NXT_UNIT_ERROR; 3906 } 3907 3908 mmap_buf->plain_ptr = mmap_buf->free_ptr; 3909 } 3910 3911 mmap_buf->hdr = NULL; 3912 mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t); 3913 mmap_buf->buf.free = mmap_buf->buf.start; 3914 mmap_buf->buf.end = mmap_buf->buf.start + size; 3915 3916 nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)", 3917 mmap_buf->buf.start, (int) size); 3918 3919 return NXT_UNIT_OK; 3920 } 3921 3922 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / 
PORT_MMAP_CHUNK_SIZE; 3923 min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 3924 3925 hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks); 3926 if (nxt_slow_path(hdr == NULL)) { 3927 if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) { 3928 mmap_buf->hdr = NULL; 3929 mmap_buf->buf.start = NULL; 3930 mmap_buf->buf.free = NULL; 3931 mmap_buf->buf.end = NULL; 3932 mmap_buf->free_ptr = NULL; 3933 3934 return NXT_UNIT_OK; 3935 } 3936 3937 return NXT_UNIT_ERROR; 3938 } 3939 3940 mmap_buf->hdr = hdr; 3941 mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c); 3942 mmap_buf->buf.free = mmap_buf->buf.start; 3943 mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE; 3944 mmap_buf->free_ptr = NULL; 3945 mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 3946 3947 nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)", 3948 (int) hdr->id, (int) c, 3949 (int) (nchunks * PORT_MMAP_CHUNK_SIZE)); 3950 3951 return NXT_UNIT_OK; 3952 } 3953 3954 3955 static int 3956 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd) 3957 { 3958 int rc; 3959 void *mem; 3960 nxt_queue_t awaiting_rbuf; 3961 struct stat mmap_stat; 3962 nxt_unit_mmap_t *mm; 3963 nxt_unit_impl_t *lib; 3964 nxt_unit_ctx_impl_t *ctx_impl; 3965 nxt_unit_read_buf_t *rbuf; 3966 nxt_port_mmap_header_t *hdr; 3967 3968 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 3969 3970 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid); 3971 3972 if (fstat(fd, &mmap_stat) == -1) { 3973 nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd, 3974 strerror(errno), errno); 3975 3976 return NXT_UNIT_ERROR; 3977 } 3978 3979 mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE, 3980 MAP_SHARED, fd, 0); 3981 if (nxt_slow_path(mem == MAP_FAILED)) { 3982 nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)", 3983 strerror(errno), errno); 3984 3985 return NXT_UNIT_ERROR; 3986 } 3987 3988 
hdr = mem; 3989 3990 if (nxt_slow_path(hdr->src_pid != pid)) { 3991 3992 nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header " 3993 "detected: %d != %d or %d != %d", (int) hdr->src_pid, 3994 (int) pid, (int) hdr->dst_pid, (int) lib->pid); 3995 3996 munmap(mem, PORT_MMAP_SIZE); 3997 3998 return NXT_UNIT_ERROR; 3999 } 4000 4001 nxt_queue_init(&awaiting_rbuf); 4002 4003 pthread_mutex_lock(&lib->incoming.mutex); 4004 4005 mm = nxt_unit_mmap_at(&lib->incoming, hdr->id); 4006 if (nxt_slow_path(mm == NULL)) { 4007 nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array"); 4008 4009 munmap(mem, PORT_MMAP_SIZE); 4010 4011 rc = NXT_UNIT_ERROR; 4012 4013 } else { 4014 mm->hdr = hdr; 4015 4016 hdr->sent_over = 0xFFFFu; 4017 4018 nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf); 4019 nxt_queue_init(&mm->awaiting_rbuf); 4020 4021 rc = NXT_UNIT_OK; 4022 } 4023 4024 pthread_mutex_unlock(&lib->incoming.mutex); 4025 4026 nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) { 4027 4028 ctx_impl = rbuf->ctx_impl; 4029 4030 pthread_mutex_lock(&ctx_impl->mutex); 4031 4032 nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link); 4033 4034 pthread_mutex_unlock(&ctx_impl->mutex); 4035 4036 nxt_atomic_fetch_add(&ctx_impl->wait_items, -1); 4037 4038 nxt_unit_awake_ctx(ctx, ctx_impl); 4039 4040 } nxt_queue_loop; 4041 4042 return rc; 4043 } 4044 4045 4046 static void 4047 nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl) 4048 { 4049 nxt_port_msg_t msg; 4050 4051 if (nxt_fast_path(ctx == &ctx_impl->ctx)) { 4052 return; 4053 } 4054 4055 if (nxt_slow_path(ctx_impl->read_port == NULL 4056 || ctx_impl->read_port->out_fd == -1)) 4057 { 4058 nxt_unit_alert(ctx, "target context read_port is NULL or not writable"); 4059 4060 return; 4061 } 4062 4063 memset(&msg, 0, sizeof(nxt_port_msg_t)); 4064 4065 msg.type = _NXT_PORT_MSG_RPC_READY; 4066 4067 (void) nxt_unit_port_send(ctx, ctx_impl->read_port, 4068 &msg, sizeof(msg), NULL, 0); 4069 } 4070 
/*
 * Initialize an empty shared memory segments registry (used for both
 * lib->incoming and lib->outgoing).
 */
static void
nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
{
    pthread_mutex_init(&mmaps->mutex, NULL);

    mmaps->size = 0;
    mmaps->cap = 0;
    mmaps->elts = NULL;
    mmaps->allocated_chunks = 0;
}


/* Take a reference on 'process'. */
nxt_inline void
nxt_unit_process_use(nxt_unit_process_t *process)
{
    nxt_atomic_fetch_add(&process->use_count, 1);
}


/* Release a reference on 'process'; the last one frees the object. */
nxt_inline void
nxt_unit_process_release(nxt_unit_process_t *process)
{
    long  c;

    c = nxt_atomic_fetch_add(&process->use_count, -1);

    if (c == 1) {
        nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);

        nxt_unit_free(NULL, process);
    }
}


/*
 * Unmap all registered segments and destroy the registry.
 * NOTE(review): elements whose segment never arrived still have
 * hdr == NULL and are passed to munmap() as-is — presumably that call
 * just fails harmlessly; confirm this is intended.
 */
static void
nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
{
    nxt_unit_mmap_t  *mm, *end;

    if (mmaps->elts != NULL) {
        end = mmaps->elts + mmaps->size;

        for (mm = mmaps->elts; mm < end; mm++) {
            munmap(mm->hdr, PORT_MMAP_SIZE);
        }

        nxt_unit_free(NULL, mmaps->elts);
    }

    pthread_mutex_destroy(&mmaps->mutex);
}


/*
 * Resolve incoming mmap 'id' from process 'pid'.  Called with
 * mmaps->mutex held.  On NXT_UNIT_OK the mutex is STILL HELD and *hdr
 * is valid; on any other return value the mutex has been released.
 * If the segment has not arrived yet, 'rbuf' is parked on the
 * segment's waiting queue and NXT_UNIT_AGAIN is returned; the first
 * waiter also asks the router to (re)send the mmap fd.
 */
static int
nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
    pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
    nxt_unit_read_buf_t *rbuf)
{
    int                  res, need_rbuf;
    nxt_unit_mmap_t      *mm;
    nxt_unit_ctx_impl_t  *ctx_impl;

    mm = nxt_unit_mmap_at(mmaps, id);
    if (nxt_slow_path(mm == NULL)) {
        nxt_unit_alert(ctx, "failed to allocate mmap");

        pthread_mutex_unlock(&mmaps->mutex);

        *hdr = NULL;

        return NXT_UNIT_ERROR;
    }

    *hdr = mm->hdr;

    if (nxt_fast_path(*hdr != NULL)) {
        return NXT_UNIT_OK;
    }

    /* Only the first waiter requests the segment from the router. */
    need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);

    nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);

    pthread_mutex_unlock(&mmaps->mutex);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);

    if (need_rbuf) {
        res = nxt_unit_get_mmap(ctx, pid, id);
        if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
            return NXT_UNIT_ERROR;
        }
    }

    return NXT_UNIT_AGAIN;
}


/*
 * Translate the sequence of nxt_port_mmap_msg_t references carried in
 * 'recv_msg' into a chain of buffers pointing directly into incoming
 * shared memory.  May return NXT_UNIT_AGAIN (via
 * nxt_unit_check_rbuf_mmap()) when a referenced segment has not been
 * received yet.
 */
static int
nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
    nxt_unit_read_buf_t *rbuf)
{
    int                     res;
    void                    *start;
    uint32_t                size;
    nxt_unit_impl_t         *lib;
    nxt_unit_mmaps_t        *mmaps;
    nxt_unit_mmap_buf_t     *b, **incoming_tail;
    nxt_port_mmap_msg_t     *mmap_msg, *end;
    nxt_port_mmap_header_t  *hdr;

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    mmap_msg = recv_msg->start;
    end = nxt_pointer_to(recv_msg->start, recv_msg->size);

    incoming_tail = &recv_msg->incoming_buf;

    /* Allocating buffer structures. */
    for (; mmap_msg < end; mmap_msg++) {
        b = nxt_unit_mmap_buf_get(ctx);
        if (nxt_slow_path(b == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
                          recv_msg->stream);

            while (recv_msg->incoming_buf != NULL) {
                nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
            }

            return NXT_UNIT_ERROR;
        }

        nxt_unit_mmap_buf_insert(incoming_tail, b);
        incoming_tail = &b->next;
    }

    b = recv_msg->incoming_buf;
    mmap_msg = recv_msg->start;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    mmaps = &lib->incoming;

    pthread_mutex_lock(&mmaps->mutex);

    for (; mmap_msg < end; mmap_msg++) {
        res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
                                       recv_msg->pid, mmap_msg->mmap_id,
                                       &hdr, rbuf);

        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            /* The mutex was already released by check_rbuf_mmap(). */
            while (recv_msg->incoming_buf != NULL) {
                nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
            }

            return res;
        }

        start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
        size = mmap_msg->size;

        /* The first fragment becomes the message payload itself. */
        if (recv_msg->start == mmap_msg) {
            recv_msg->start = start;
            recv_msg->size = size;
        }

        b->buf.start = start;
        b->buf.free = start;
        b->buf.end = b->buf.start + size;
        b->hdr = hdr;

        b = b->next;

        nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
                       recv_msg->stream,
                       start, (int) size,
                       (int) hdr->src_pid, (int) hdr->dst_pid,
                       (int) hdr->id, (int) mmap_msg->chunk_id,
                       (int) mmap_msg->size);
    }

    pthread_mutex_unlock(&mmaps->mutex);

    return NXT_UNIT_OK;
}


/*
 * Ask the router to (re)send the file descriptor of incoming shared
 * memory segment 'id' from process 'pid'; the reply is delivered to
 * this context's read port.
 */
static int
nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
{
    ssize_t              res;
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    struct {
        nxt_port_msg_t           msg;
        nxt_port_msg_get_mmap_t  get_mmap;
    } m;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    memset(&m.msg, 0, sizeof(nxt_port_msg_t));

    m.msg.pid = lib->pid;
    m.msg.reply_port = ctx_impl->read_port->id.id;
    m.msg.type = _NXT_PORT_MSG_GET_MMAP;

    m.get_mmap.id = id;

    nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);

    res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
    if (nxt_slow_path(res != sizeof(m))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Return the chunks covering [start, start + size) to the segment's
 * free map.  The released memory is poisoned with 0xA5 first.  For
 * outgoing segments the library-wide allocated_chunks counter is
 * decremented; for incoming segments an SHM_ACK is sent if the peer
 * flagged the segment as out-of-shared-memory (hdr->oosm).
 */
static void
nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr,
    void *start, uint32_t size)
{
    int              freed_chunks;
    u_char           *p, *end;
    nxt_chunk_id_t   c;
    nxt_unit_impl_t  *lib;

    /* Poison released memory to catch use-after-free. */
    memset(start, 0xA5, size);

    p = start;
    end = p + size;
    c = nxt_port_mmap_chunk_id(hdr, p);
    freed_chunks = 0;

    while (p < end) {
        nxt_port_mmap_set_chunk_free(hdr->free_map, c);

        p += PORT_MMAP_CHUNK_SIZE;
        c++;
        freed_chunks++;
    }

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (hdr->src_pid == lib->pid && freed_chunks != 0) {
        nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks);

        nxt_unit_debug(ctx, "allocated_chunks %d",
                       (int) lib->outgoing.allocated_chunks);
    }

    /* Ack at most once per oosm flag (cmp_set clears it atomically). */
    if (hdr->dst_pid == lib->pid
        && freed_chunks != 0
        && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
    {
        nxt_unit_send_shm_ack(ctx, hdr->src_pid);
    }
}


/*
 * Send an SHM_ACK message via the router to signal that shared memory
 * chunks have been freed on this side.
 * NOTE(review): the 'pid' parameter is currently unused — the message
 * always goes to lib->router_port; confirm that is intended.
 */
static int
nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
{
    ssize_t          res;
    nxt_port_msg_t   msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    msg.stream = 0;
    msg.pid = lib->pid;
    msg.reply_port = 0;
    msg.type = _NXT_PORT_MSG_SHM_ACK;
    msg.last = 0;
    msg.mmap = 0;
    msg.nf = 0;
    msg.mf = 0;
    msg.tracking = 0;

    res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * lvlhsh test callback for the processes hash: match entries by pid.
 * NOTE(review): 'data' is read through nxt_process_t, while the hash
 * stores nxt_unit_process_t values (see nxt_unit_process_get); this
 * relies on both types exposing a compatible 'pid' member — confirm.
 */
static nxt_int_t
nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_process_t  *process;

    process = data;

    if (lhq->key.length == sizeof(pid_t)
        && *(pid_t *) lhq->key.start == process->pid)
    {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


/* Hash operations for the pid -> process map. */
static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_lvlhsh_pid_test,
    nxt_unit_lvlhsh_alloc,
    nxt_unit_lvlhsh_free,
};


/* Fill an lvlhsh query keyed by 'pid'. */
static inline void
nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
{
    lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
    lhq->key.length = sizeof(*pid);
    lhq->key.start = (u_char *) pid;
    lhq->proto = &lvlhsh_processes_proto;
}


/*
 * Look up the process descriptor for 'pid', creating and inserting it
 * on first sight.  The returned descriptor carries a reference the
 * caller must release; for a new descriptor use_count starts at 2
 * (one for the hash, one for the caller).
 * NOTE(review): lib->processes is accessed without locking here —
 * presumably callers serialize access; confirm.
 */
static nxt_unit_process_t *
nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
{
    nxt_unit_impl_t     *lib;
    nxt_unit_process_t  *process;
    nxt_lvlhsh_query_t  lhq;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    nxt_unit_process_lhq_pid(&lhq, &pid);

    if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
        process = lhq.value;
        nxt_unit_process_use(process);

        return process;
    }

    process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t));
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid);

        return NULL;
    }

    process->pid = pid;
    process->use_count = 2;
    process->next_port_id = 0;
    process->lib = lib;

    nxt_queue_init(&process->ports);

    lhq.replace = 0;
    lhq.value = process;

    switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {

    case NXT_OK:
        break;

    default:
        nxt_unit_alert(ctx, "process %d insert failed", (int) pid);

        nxt_unit_free(ctx, process);
        process = NULL;
        break;
    }

    return process;
}


/*
 * Find the process descriptor for 'pid'.  With 'remove' set the entry
 * is deleted from the hash and returned without taking an extra
 * reference; otherwise a reference is taken for the caller.  Returns
 * NULL when not found.
 */
static nxt_unit_process_t *
nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
{
    int                 rc;
    nxt_lvlhsh_query_t  lhq;

    nxt_unit_process_lhq_pid(&lhq, &pid);

    if (remove) {
        rc = nxt_lvlhsh_delete(&lib->processes, &lhq);

    } else {
        rc = nxt_lvlhsh_find(&lib->processes, &lhq);
    }

    if (rc == NXT_OK) {
        if (!remove) {
            nxt_unit_process_use(lhq.value);
        }

        return lhq.value;
    }

    return NULL;
}


/* Remove and return an arbitrary process from the hash, NULL if empty. */
static nxt_unit_process_t *
nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
{
    return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
}


/*
 * Public API: run the message loop for 'ctx' until the context goes
 * offline or an error occurs (errors trigger nxt_unit_quit()).
 */
int
nxt_unit_run(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_unit_ctx_impl_t  *ctx_impl;

    nxt_unit_ctx_use(ctx);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    rc = NXT_UNIT_OK;

    while (nxt_fast_path(ctx_impl->online)) {
        rc = nxt_unit_run_once_impl(ctx);

        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            nxt_unit_quit(ctx);
            break;
        }
    }

    nxt_unit_ctx_release(ctx);

    return rc;
}


/* Public API: read and process a single message for 'ctx'. */
int
nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{
    int  rc;

    nxt_unit_ctx_use(ctx);

    rc = nxt_unit_run_once_impl(ctx);

    nxt_unit_ctx_release(ctx);

    return rc;
}


/*
 * Read one message, process it, then drain pending read buffers and
 * ready requests.
 */
static int
nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_unit_read_buf_t  *rbuf;

    rbuf = nxt_unit_read_buf_get(ctx);
    if (nxt_slow_path(rbuf == NULL)) {
        return NXT_UNIT_ERROR;
    }

    rc = nxt_unit_read_buf(ctx, rbuf);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_read_buf_release(ctx, rbuf);

        return rc;
    }

    rc = nxt_unit_process_msg(ctx, rbuf, NULL);
    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    rc = nxt_unit_process_pending_rbuf(ctx);
    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    nxt_unit_process_ready_req(ctx);

    return rc;
}


/*
 * Read the next message for 'ctx'.  While the context has outstanding
 * wait_items or is not yet ready, only its own read port is used.
 * Otherwise both the context port queue and the shared application
 * queue are tried first, falling back to poll(2) on the two port fds.
 */
static int
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
{
    int                   nevents, res, err;
    nxt_unit_impl_t       *lib;
    nxt_unit_ctx_impl_t   *ctx_impl;
    nxt_unit_port_impl_t  *port_impl;
    struct pollfd         fds[2];

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) {
        return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
    }

    port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
                                 port);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

retry:

    if (port_impl->from_socket == 0) {
        res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
        if (res == NXT_UNIT_OK) {
            if (nxt_unit_is_read_socket(rbuf)) {
                /* The next message must be read from the socket. */
                port_impl->from_socket++;

                nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
                               (int) ctx_impl->read_port->id.pid,
                               (int) ctx_impl->read_port->id.id,
                               port_impl->from_socket);

            } else {
                nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
                               (int) ctx_impl->read_port->id.pid,
                               (int) ctx_impl->read_port->id.id,
                               (int) rbuf->size);

                return NXT_UNIT_OK;
            }
        }
    }

    res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
    if (res == NXT_UNIT_OK) {
        return NXT_UNIT_OK;
    }

    fds[0].fd = ctx_impl->read_port->in_fd;
    fds[0].events = POLLIN;
    fds[0].revents = 0;

    fds[1].fd = lib->shared_port->in_fd;
    fds[1].events = POLLIN;
    fds[1].revents = 0;

    nevents = poll(fds, 2, -1);
    if (nxt_slow_path(nevents == -1)) {
        err = errno;

        if (err == EINTR) {
            goto retry;
        }

        nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
                       fds[0].fd, fds[1].fd, strerror(err), err);

        rbuf->size = -1;

        return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
    }

    nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
                   fds[0].fd, fds[1].fd, nevents, fds[0].revents,
                   fds[1].revents);

    if ((fds[0].revents & POLLIN) != 0) {
        res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
        if (res == NXT_UNIT_AGAIN) {
            goto retry;
        }

        return res;
    }

    if ((fds[1].revents & POLLIN) != 0) {
        res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
        if (res == NXT_UNIT_AGAIN) {
            goto retry;
        }

        return res;
    }

    nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
                   fds[0].fd, fds[1].fd, nevents, fds[0].revents,
                   fds[1].revents);

    return NXT_UNIT_ERROR;
}


/*
 * Process read buffers queued on this context by other threads
 * (e.g. by nxt_unit_incoming_mmap()).  After the first error the
 * remaining buffers are only released.
 */
static int
nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_queue_t          pending_rbuf;
    nxt_unit_ctx_impl_t  *ctx_impl;
    nxt_unit_read_buf_t  *rbuf;

    nxt_queue_init(&pending_rbuf);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        return NXT_UNIT_OK;
    }

    /* Steal the whole queue so it can be walked unlocked. */
    nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
    nxt_queue_init(&ctx_impl->pending_rbuf);

    pthread_mutex_unlock(&ctx_impl->mutex);

    rc = NXT_UNIT_OK;

    nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {

        if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
            rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);

        } else {
            nxt_unit_read_buf_release(ctx, rbuf);
        }

    } nxt_queue_loop;

    return rc;
}


/*
 * For each request whose headers are ready, acknowledge them to the
 * router, register the request in the hash if more body data is
 * expected, and hand it to the application's request_handler.
 */
static void
nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
{
    int                           res;
    nxt_queue_t                   ready_req;
    nxt_unit_impl_t               *lib;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    nxt_queue_init(&ready_req);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
        pthread_mutex_unlock(&ctx_impl->mutex);

        return;
    }

    /* Steal the whole queue so it can be walked unlocked. */
    nxt_queue_add(&ready_req, &ctx_impl->ready_req);
    nxt_queue_init(&ctx_impl->ready_req);

    pthread_mutex_unlock(&ctx_impl->mutex);

    nxt_queue_each(req_impl, &ready_req,
                   nxt_unit_request_info_impl_t, port_wait_link)
    {
        lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);

        req = &req_impl->req;

        res = nxt_unit_send_req_headers_ack(req);
        if (nxt_slow_path(res != NXT_UNIT_OK)) {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);

            continue;
        }

        /* More body data expected than currently buffered? */
        if (req->content_length
            > (uint64_t) (req->content_buf->end - req->content_buf->free))
        {
            res = nxt_unit_request_hash_add(ctx, req);
            if (nxt_slow_path(res != NXT_UNIT_OK)) {
                nxt_unit_req_warn(req, "failed to add request to hash");

                nxt_unit_request_done(req, NXT_UNIT_ERROR);

                continue;
            }

            /*
             * If the application has a separate data handler, we may
             * start request processing and handle the data when it
             * arrives.
             */
            if (lib->callbacks.data_handler == NULL) {
                continue;
            }
        }

        lib->callbacks.request_handler(&req_impl->req);

    } nxt_queue_loop;
}


/*
 * Public API: message loop that reads exclusively from the context's
 * own read port (no shared-port polling).
 */
int
nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_unit_read_buf_t  *rbuf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    nxt_unit_ctx_use(ctx);

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    rc = NXT_UNIT_OK;

    while (nxt_fast_path(ctx_impl->online)) {
        rbuf = nxt_unit_read_buf_get(ctx);
        if (nxt_slow_path(rbuf == NULL)) {
            rc = NXT_UNIT_ERROR;
            break;
        }

    retry:

        rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
        if (rc == NXT_UNIT_AGAIN) {
            goto retry;
        }

        rc = nxt_unit_process_msg(ctx, rbuf, NULL);
        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            break;
        }

        rc = nxt_unit_process_pending_rbuf(ctx);
        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            break;
        }

        nxt_unit_process_ready_req(ctx);
    }

    nxt_unit_ctx_release(ctx);

    return rc;
}


/* True when 'rbuf' holds a _NXT_PORT_MSG_READ_QUEUE notification. */
nxt_inline int
nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
{
    nxt_port_msg_t  *port_msg;

    if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
        port_msg = (nxt_port_msg_t *) rbuf->buf;

        return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
    }

    return 0;
}


/* True when 'rbuf' is a one-byte _NXT_PORT_MSG_READ_SOCKET marker. */
nxt_inline int
nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
{
    if (nxt_fast_path(rbuf->size == 1)) {
        return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
    }

    return 0;
}


/* True when 'rbuf' holds a _NXT_PORT_MSG_SHM_ACK message. */
nxt_inline int
nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
{
    nxt_port_msg_t  *port_msg;

    if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
        port_msg = (nxt_port_msg_t *) rbuf->buf;

        return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
    }

    return 0;
}


/* True when 'rbuf' holds a _NXT_PORT_MSG_QUIT message. */
nxt_inline int
nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
{
    nxt_port_msg_t  *port_msg;

    if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
        port_msg = (nxt_port_msg_t *) rbuf->buf;

        return port_msg->type == _NXT_PORT_MSG_QUIT;
    }

    return 0;
}


/*
 * Public API: message loop that reads exclusively from the shared
 * application port.
 */
int
nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
{
    int                  rc;
    nxt_unit_impl_t      *lib;
    nxt_unit_read_buf_t  *rbuf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    nxt_unit_ctx_use(ctx);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    rc = NXT_UNIT_OK;

    while (nxt_fast_path(ctx_impl->online)) {
        rbuf = nxt_unit_read_buf_get(ctx);
        if (nxt_slow_path(rbuf == NULL)) {
            rc = NXT_UNIT_ERROR;
            break;
        }

    retry:

        rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
        if (rc == NXT_UNIT_AGAIN) {
            goto retry;
        }

        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            nxt_unit_read_buf_release(ctx, rbuf);
            break;
        }

        rc = nxt_unit_process_msg(ctx, rbuf, NULL);
        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            break;
        }

        rc = nxt_unit_process_pending_rbuf(ctx);
        if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
            break;
        }

        nxt_unit_process_ready_req(ctx);
    }

    nxt_unit_ctx_release(ctx);

    return rc;
}


/*
 * Public API: non-blocking attempt to take one request from the shared
 * application queue.  Returns NULL when nothing is available or the
 * context is offline.
 */
nxt_unit_request_info_t *
nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
{
    int                      rc;
    nxt_unit_impl_t          *lib;
    nxt_unit_read_buf_t      *rbuf;
    nxt_unit_ctx_impl_t      *ctx_impl;
    nxt_unit_request_info_t  *req;

    nxt_unit_ctx_use(ctx);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    req = NULL;

    if (nxt_slow_path(!ctx_impl->online)) {
        goto done;
    }

    rbuf = nxt_unit_read_buf_get(ctx);
    if (nxt_slow_path(rbuf == NULL)) {
        goto done;
    }

    rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
    if (rc != NXT_UNIT_OK) {
        goto done;
    }

    (void) nxt_unit_process_msg(ctx, rbuf, &req);

done:

    nxt_unit_ctx_release(ctx);

    return req;
}


/* True when 'ctx' is the library's main context. */
int
nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    return (ctx == &lib->main_ctx.ctx);
}


/* Public API: process messages arriving on 'port' for 'ctx'. */
int
nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
    int  rc;

    nxt_unit_ctx_use(ctx);

    rc = nxt_unit_process_port_msg_impl(ctx, port);

    nxt_unit_ctx_release(ctx);

    return rc;
}


/*
 * Receive and process messages from 'port' in a loop while the
 * context stays online.
 * NOTE(review): when the context goes offline, the read buffer
 * obtained for the next iteration is not released here — presumably
 * reclaimed later; confirm against nxt_unit_read_buf_get()'s contract.
 */
static int
nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
    int                  rc;
    nxt_unit_impl_t      *lib;
    nxt_unit_read_buf_t  *rbuf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    rbuf = nxt_unit_read_buf_get(ctx);
    if (nxt_slow_path(rbuf == NULL)) {
        return NXT_UNIT_ERROR;
    }

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

retry:

    if (port == lib->shared_port) {
        rc = nxt_unit_shared_port_recv(ctx, port, rbuf);

    } else {
        rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
    }

    if (rc != NXT_UNIT_OK) {
        nxt_unit_read_buf_release(ctx, rbuf);
        return rc;
    }

    rc = nxt_unit_process_msg(ctx, rbuf, NULL);
    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    rc = nxt_unit_process_pending_rbuf(ctx);
    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    nxt_unit_process_ready_req(ctx);

    rbuf = nxt_unit_read_buf_get(ctx);
    if (nxt_slow_path(rbuf == NULL)) {
        return NXT_UNIT_ERROR;
    }

    if (ctx_impl->online) {
        goto retry;
    }

    return rc;
}


/* Public API: release the caller's reference to 'ctx'. */
void
nxt_unit_done(nxt_unit_ctx_t *ctx)
{
    nxt_unit_ctx_release(ctx);
}


/*
 * Public API: allocate a new worker context — create a dedicated read
 * port with an attached shared message queue and announce it to the
 * router.  Returns NULL on failure.
 */
nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
{
    int                   rc, queue_fd;
    void                  *mem;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_t       *port;
    nxt_unit_ctx_impl_t   *new_ctx;
    nxt_unit_port_impl_t  *port_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t)
                              + lib->request_data_size);
    if (nxt_slow_path(new_ctx == NULL)) {
        nxt_unit_alert(ctx, "failed to allocate context");

        return NULL;
    }

    rc = nxt_unit_ctx_init(lib, new_ctx, data);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_free(ctx, new_ctx);

        return NULL;
    }

    queue_fd = -1;

    port = nxt_unit_create_port(&new_ctx->ctx);
    if (nxt_slow_path(port == NULL)) {
        goto fail;
    }

    new_ctx->read_port = port;

    queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
    if (nxt_slow_path(queue_fd == -1)) {
        goto fail;
    }

    mem = mmap(NULL, sizeof(nxt_port_queue_t),
               PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
                       strerror(errno), errno);

        goto fail;
    }

    nxt_port_queue_init(mem);

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
    port_impl->queue = mem;

    rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto fail;
    }

    nxt_unit_close(queue_fd);

    return &new_ctx->ctx;

fail:

    if (queue_fd != -1) {
        nxt_unit_close(queue_fd);
    }

    nxt_unit_ctx_release(&new_ctx->ctx);

    return NULL;
}


/*
 * Free a context and everything it owns: abort active requests,
 * release cached buffer/request/websocket-frame/read-buffer pools, and
 * unlink the context from the library.  (The function continues past
 * this chunk.)
 */
static void
nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
{
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *mmap_buf;
    nxt_unit_read_buf_t              *rbuf;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);

    nxt_queue_each(req_impl, &ctx_impl->active_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_req_warn(&req_impl->req, "active request on ctx free");

        nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);

    } nxt_queue_loop;

    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
    nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);

    while (ctx_impl->free_buf != NULL) {
        mmap_buf = ctx_impl->free_buf;
        nxt_unit_mmap_buf_unlink(mmap_buf);
        nxt_unit_free(&ctx_impl->ctx, mmap_buf);
    }

    nxt_queue_each(req_impl, &ctx_impl->free_req,
                   nxt_unit_request_info_impl_t, link)
    {
        nxt_unit_request_info_free(req_impl);

    } nxt_queue_loop;

    nxt_queue_each(ws_impl, &ctx_impl->free_ws,
                   nxt_unit_websocket_frame_impl_t, link)
    {
        nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);

    } nxt_queue_loop;

    nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link)
    {
        /* The embedded read buffer is freed together with the context. */
        if (rbuf != &ctx_impl->ctx_read_buf) {
            nxt_unit_free(&ctx_impl->ctx, rbuf);
        }
    } nxt_queue_loop;

    pthread_mutex_destroy(&ctx_impl->mutex);

    pthread_mutex_lock(&lib->mutex);

    nxt_queue_remove(&ctx_impl->link);

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_fast_path(ctx_impl->read_port != NULL)) {
        nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
        nxt_unit_port_release(ctx_impl->read_port);
    }

    /* The main context is embedded in the library and not freed here. */
    if (ctx_impl != &lib->main_ctx) {
        nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
    }

    nxt_unit_lib_release(lib);
}


/* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */
#if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
#define NXT_UNIX_SOCKET  SOCK_SEQPACKET
#else
#define NXT_UNIX_SOCKET  SOCK_DGRAM
#endif


/* Initialize a port id and precompute its hash. */
void
nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
{
    nxt_unit_port_hash_id_t  port_hash_id;

    port_hash_id.pid = pid;
    port_hash_id.id = id;

    port_id->pid = pid;
    port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
    port_id->id = id;
}


/*
 * Create a new port backed by an AF_UNIX socketpair and register it
 * with the library.  Returns NULL on failure (sockets are closed).
 */
static nxt_unit_port_t *
nxt_unit_create_port(nxt_unit_ctx_t *ctx)
{
    int                 rc, port_sockets[2];
    nxt_unit_impl_t     *lib;
    nxt_unit_port_t     new_port, *port;
    nxt_unit_process_t  *process;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
    if (nxt_slow_path(rc != 0)) {
        nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
                      strerror(errno), errno);

        return NULL;
    }

    nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
                   port_sockets[0], port_sockets[1]);

    pthread_mutex_lock(&lib->mutex);

    process = nxt_unit_process_get(ctx, lib->pid);
    if (nxt_slow_path(process == NULL)) {
        pthread_mutex_unlock(&lib->mutex);

        nxt_unit_close(port_sockets[0]);
        nxt_unit_close(port_sockets[1]);

        return NULL;
    }

    /* Port ids are allocated sequentially per process (under lib->mutex). */
    nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);

    new_port.in_fd = port_sockets[0];
    new_port.out_fd = port_sockets[1];
    new_port.data = NULL;

    pthread_mutex_unlock(&lib->mutex);

    nxt_unit_process_release(process);

    port = nxt_unit_add_port(ctx, &new_port, NULL);
    if (nxt_slow_path(port == NULL)) {
        nxt_unit_close(port_sockets[0]);
        nxt_unit_close(port_sockets[1]);
    }

    return port;
}


/*
 * Announce a port to 'dst' (typically the router): send a NEW_PORT
 * message carrying the port's write end and the queue fd as SCM_RIGHTS
 * ancillary data.
 */
static int
nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
    nxt_unit_port_t *port, int queue_fd)
{
    ssize_t          res;
    nxt_unit_impl_t  *lib;
    int              fds[2] = { port->out_fd, queue_fd };

    struct {
        nxt_port_msg_t           msg;
        nxt_port_msg_new_port_t  new_port;
    } m;

    union {
        struct cmsghdr  cm;
        char            space[CMSG_SPACE(sizeof(int) * 2)];
    } cmsg;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    m.msg.stream = 0;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_NEW_PORT;
    m.msg.last = 0;
    m.msg.mmap = 0;
    m.msg.nf = 0;
    m.msg.mf = 0;
    m.msg.tracking = 0;

    m.new_port.id = port->id.id;
    m.new_port.pid = port->id.pid;
    m.new_port.type = NXT_PROCESS_APP;
    m.new_port.max_size = 16 * 1024;
    m.new_port.max_share = 64 * 1024;

    memset(&cmsg, 0, sizeof(cmsg));

    cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
    cmsg.cm.cmsg_level = SOL_SOCKET;
    cmsg.cm.cmsg_type = SCM_RIGHTS;

    /*
     * memcpy() is used instead of simple
     *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
     * because GCC 4.4 with -O2/3/s optimization may issue a warning:
     *   dereferencing type-punned pointer will break strict-aliasing rules
     *
     * Fortunately, GCC with -O1 compiles this nxt_memcpy()
     * in the same simple assignment as in the code above.
     */
    memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);

    res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));

    return (res == sizeof(m)) ?
                              NXT_UNIT_OK : NXT_UNIT_ERROR;
}


/* Take one reference on the port. */
nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)
{
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    nxt_atomic_fetch_add(&port_impl->use_count, 1);
}


/*
 * Drop one reference on the port; on the last reference close both
 * descriptors, unmap the shared queue and free the port structure.
 */
nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
{
    long                  c;
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    /* fetch_add returns the previous value: 1 means this was the last ref. */
    c = nxt_atomic_fetch_add(&port_impl->use_count, -1);

    if (c == 1) {
        nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d",
                       (int) port->id.pid, (int) port->id.id,
                       port->in_fd, port->out_fd);

        nxt_unit_process_release(port_impl->process);

        if (port->in_fd != -1) {
            nxt_unit_close(port->in_fd);

            port->in_fd = -1;
        }

        if (port->out_fd != -1) {
            nxt_unit_close(port->out_fd);

            port->out_fd = -1;
        }

        if (port_impl->queue != NULL) {
            /* The shared app port and per-context ports use different
             * queue types, hence different mapping sizes. */
            munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID)
                                     ?
                                     sizeof(nxt_app_queue_t)
                                     : sizeof(nxt_port_queue_t));
        }

        nxt_unit_free(NULL, port_impl);
    }
}


/*
 * Register a port in the library.  If a port with the same id already
 * exists, merge the new descriptors/queue into it; otherwise insert a
 * new entry.  Requests waiting for the port are released once the port
 * becomes usable (after the add_port callback, when one is set).
 */
static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
{
    int                   rc, ready;
    nxt_queue_t           awaiting_req;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_t       *old_port;
    nxt_unit_process_t    *process;
    nxt_unit_port_impl_t  *new_port, *old_port_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&lib->mutex);

    old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);

    if (nxt_slow_path(old_port != NULL)) {
        nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
                       "in_fd %d out_fd %d queue %p",
                       port->id.pid, port->id.id,
                       port->in_fd, port->out_fd, queue);

        /* Adopt whatever the existing entry is missing; close leftovers. */
        if (old_port->data == NULL) {
            old_port->data = port->data;
            port->data = NULL;
        }

        if (old_port->in_fd == -1) {
            old_port->in_fd = port->in_fd;
            port->in_fd = -1;
        }

        if (port->in_fd != -1) {
            nxt_unit_close(port->in_fd);
            port->in_fd = -1;
        }

        if (old_port->out_fd == -1) {
            old_port->out_fd = port->out_fd;
            port->out_fd = -1;
        }

        if (port->out_fd != -1) {
            nxt_unit_close(port->out_fd);
            port->out_fd = -1;
        }

        *port = *old_port;

        nxt_queue_init(&awaiting_req);

        old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);

        if (old_port_impl->queue == NULL) {
            old_port_impl->queue = queue;
        }

        ready = (port->in_fd != -1 || port->out_fd != -1);

        /*
         * Port can be marked as 'ready' only after callbacks.add_port() call.
         * Otherwise, request may try to use the port before callback.
         */
        if (lib->callbacks.add_port == NULL && ready) {
            old_port_impl->ready = ready;

            if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
                nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
                nxt_queue_init(&old_port_impl->awaiting_req);
            }
        }

        pthread_mutex_unlock(&lib->mutex);

        /* The add_port callback runs without the library mutex held. */
        if (lib->callbacks.add_port != NULL && ready) {
            lib->callbacks.add_port(ctx, old_port);

            pthread_mutex_lock(&lib->mutex);

            old_port_impl->ready = ready;

            if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
                nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
                nxt_queue_init(&old_port_impl->awaiting_req);
            }

            pthread_mutex_unlock(&lib->mutex);
        }

        nxt_unit_process_awaiting_req(ctx, &awaiting_req);

        return old_port;
    }

    new_port = NULL;
    ready = 0;

    nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
                   port->id.pid, port->id.id,
                   port->in_fd, port->out_fd, queue);

    process = nxt_unit_process_get(ctx, port->id.pid);
    if (nxt_slow_path(process == NULL)) {
        goto unlock;
    }

    /* Keep the per-process port id allocator ahead of already-seen ids. */
    if (port->id.id != NXT_UNIT_SHARED_PORT_ID
        && port->id.id >= process->next_port_id)
    {
        process->next_port_id = port->id.id + 1;
    }

    new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
    if (nxt_slow_path(new_port == NULL)) {
        nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
                       port->id.pid, port->id.id);

        goto unlock;
    }

    new_port->port = *port;

    rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
                       port->id.pid, port->id.id);

        nxt_unit_free(ctx, new_port);

        new_port = NULL;

        goto unlock;
    }

    nxt_queue_insert_tail(&process->ports, &new_port->link);

    /* One reference for the hash, one for the returned pointer. */
    new_port->use_count = 2;
    new_port->process = process;
    new_port->queue = queue;
    new_port->from_socket = 0;
    new_port->socket_rbuf = NULL;

    nxt_queue_init(&new_port->awaiting_req);

    ready = (port->in_fd != -1 || port->out_fd != -1);

    if (lib->callbacks.add_port == NULL) {
        new_port->ready = ready;

    } else {
        new_port->ready = 0;
    }

    process = NULL;

unlock:

    pthread_mutex_unlock(&lib->mutex);

    if (nxt_slow_path(process != NULL)) {
        nxt_unit_process_release(process);
    }

    if (lib->callbacks.add_port != NULL && new_port != NULL && ready) {
        lib->callbacks.add_port(ctx, &new_port->port);

        nxt_queue_init(&awaiting_req);

        pthread_mutex_lock(&lib->mutex);

        new_port->ready = 1;

        if (!nxt_queue_is_empty(&new_port->awaiting_req)) {
            nxt_queue_add(&awaiting_req, &new_port->awaiting_req);
            nxt_queue_init(&new_port->awaiting_req);
        }

        pthread_mutex_unlock(&lib->mutex);

        nxt_unit_process_awaiting_req(ctx, &awaiting_req);
    }

    return (new_port == NULL) ?
                                NULL : &new_port->port;
}


/*
 * Move requests that were waiting for a port onto their owning
 * context's ready queue and wake that context up.
 */
static void
nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
{
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    nxt_queue_each(req_impl, awaiting_req,
                   nxt_unit_request_info_impl_t, port_wait_link)
    {
        nxt_queue_remove(&req_impl->port_wait_link);

        ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
                                    ctx);

        pthread_mutex_lock(&ctx_impl->mutex);

        nxt_queue_insert_tail(&ctx_impl->ready_req,
                              &req_impl->port_wait_link);

        pthread_mutex_unlock(&ctx_impl->mutex);

        nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);

        nxt_unit_awake_ctx(ctx, ctx_impl);

    } nxt_queue_loop;
}


/* Remove the port from the library hash/list and drop one reference. */
static void
nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
{
    nxt_unit_port_t       *port;
    nxt_unit_port_impl_t  *port_impl;

    pthread_mutex_lock(&lib->mutex);

    port = nxt_unit_remove_port_unsafe(lib, port_id);

    if (nxt_fast_path(port != NULL)) {
        port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

        nxt_queue_remove(&port_impl->link);
    }

    pthread_mutex_unlock(&lib->mutex);

    /* The remove_port callback runs without the library mutex held. */
    if (lib->callbacks.remove_port != NULL && port != NULL) {
        lib->callbacks.remove_port(&lib->unit, port);
    }

    if (nxt_fast_path(port != NULL)) {
        nxt_unit_port_release(port);
    }
}


/* Delete the port from the hash.  Caller must hold lib->mutex. */
static nxt_unit_port_t *
nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
{
    nxt_unit_port_t  *port;

    port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
    if (nxt_slow_path(port == NULL)) {
        nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
                       (int) port_id->pid, (int) port_id->id);

        return NULL;
    }

    nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
                   (int) port_id->pid, (int) port_id->id,
                   port->in_fd, port->out_fd, port->data);

    return port;
}


/*
 * Forget a terminated process: remove it (and all its ports) from the
 * library and notify the application via the remove_pid callback.
 */
static void
nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
{
    nxt_unit_process_t  *process;

    pthread_mutex_lock(&lib->mutex);

    process = nxt_unit_process_find(lib, pid, 1);
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);

        pthread_mutex_unlock(&lib->mutex);

        return;
    }

    /* NB: nxt_unit_remove_process() unlocks lib->mutex. */
    nxt_unit_remove_process(lib, process);

    if (lib->callbacks.remove_pid != NULL) {
        lib->callbacks.remove_pid(&lib->unit, pid);
    }
}


/*
 * Remove all ports of the process and release the process itself.
 * Called with lib->mutex locked; unlocks it before running callbacks.
 */
static void
nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
{
    nxt_queue_t           ports;
    nxt_unit_port_impl_t  *port;

    nxt_queue_init(&ports);

    /* Steal the process' port list so it can be walked unlocked below. */
    nxt_queue_add(&ports, &process->ports);

    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_unit_remove_port_unsafe(lib, &port->port.id);

    } nxt_queue_loop;

    pthread_mutex_unlock(&lib->mutex);

    nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {

        nxt_queue_remove(&port->link);

        if (lib->callbacks.remove_port != NULL) {
            lib->callbacks.remove_port(&lib->unit, &port->port);
        }

        nxt_unit_port_release(&port->port);

    } nxt_queue_loop;

    nxt_unit_process_release(process);
}


/*
 * Mark the context offline; for the main context, also broadcast a QUIT
 * message to every other context's read port.
 */
static void
nxt_unit_quit(nxt_unit_ctx_t *ctx)
{
    nxt_port_msg_t       msg;
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    if (!ctx_impl->online) {
        return;
    }

    ctx_impl->online = 0;

    if (lib->callbacks.quit != NULL) {
        lib->callbacks.quit(ctx);
    }

    if (ctx !=
        &lib->main_ctx.ctx) {
        return;
    }

    memset(&msg, 0, sizeof(nxt_port_msg_t));

    msg.pid = lib->pid;
    msg.type = _NXT_PORT_MSG_QUIT;

    pthread_mutex_lock(&lib->mutex);

    /* Main context: forward QUIT to every other context's read port. */
    nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {

        if (ctx == &ctx_impl->ctx
            || ctx_impl->read_port == NULL
            || ctx_impl->read_port->out_fd == -1)
        {
            continue;
        }

        (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
                                  &msg, sizeof(msg), NULL, 0);

    } nxt_queue_loop;

    pthread_mutex_unlock(&lib->mutex);
}


/*
 * Ask the router for a port: send a GET_PORT message with this
 * context's read port as the reply port.
 */
static int
nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
{
    ssize_t              res;
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    struct {
        nxt_port_msg_t           msg;
        nxt_port_msg_get_port_t  get_port;
    } m;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    memset(&m.msg, 0, sizeof(nxt_port_msg_t));

    m.msg.pid = lib->pid;
    m.msg.reply_port = ctx_impl->read_port->id.id;
    m.msg.type = _NXT_PORT_MSG_GET_PORT;

    m.get_port.id = port_id->id;
    m.get_port.pid = port_id->pid;

    nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
                   (int) port_id->id);

    res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
    if (nxt_slow_path(res != sizeof(m))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Send a message to a port.  Small messages without ancillary data go
 * through the shared memory queue; everything else goes via the socket
 * (or the application's port_send callback), with a READ_SOCKET marker
 * enqueued so the reader preserves ordering.
 */
static ssize_t
nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
    int                   notify;
    ssize_t               ret;
    nxt_int_t             rc;
    nxt_port_msg_t        msg;
    nxt_unit_impl_t       *lib;
    nxt_unit_port_impl_t  *port_impl;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
    if (port_impl->queue != NULL && oob_size == 0
        && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
    {
        rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
        if (nxt_slow_path(rc != NXT_OK)) {
            nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
                           (int) port->id.pid, (int) port->id.id);

            return -1;
        }

        nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
                       (int) port->id.pid, (int) port->id.id,
                       (int) buf_size, notify);

        /* Reader may be idle: wake it with a READ_QUEUE message. */
        if (notify) {
            memcpy(&msg, buf, sizeof(nxt_port_msg_t));

            msg.type = _NXT_PORT_MSG_READ_QUEUE;

            if (lib->callbacks.port_send == NULL) {
                ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
                                       sizeof(nxt_port_msg_t), NULL, 0);

                nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
                               (int) port->id.pid, (int) port->id.id,
                               (int) ret);

            } else {
                ret = lib->callbacks.port_send(ctx, port, &msg,
                                               sizeof(nxt_port_msg_t), NULL, 0);

                nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
                               (int) port->id.pid, (int) port->id.id,
                               (int) ret);
            }

        }

        return buf_size;
    }

    if (port_impl->queue != NULL) {
        /* Message travels via socket: enqueue a READ_SOCKET marker so
         * the reader drains the socket at the right point. */
        msg.type = _NXT_PORT_MSG_READ_SOCKET;

        rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
        if (nxt_slow_path(rc != NXT_OK)) {
            nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
                           (int) port->id.pid, (int) port->id.id);

            return -1;
        }

        nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
                       (int) port->id.pid, (int) port->id.id, notify);
    }

    if (lib->callbacks.port_send != NULL) {
        ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
                                       oob, oob_size);

        nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
                       (int) port->id.pid, (int) port->id.id,
                       (int) ret);

    } else {
        ret = nxt_unit_sendmsg(ctx, port->out_fd, buf,
                               buf_size,
                               oob, oob_size);

        nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
                       (int) port->id.pid, (int) port->id.id,
                       (int) ret);
    }

    return ret;
}


/*
 * sendmsg() wrapper: retries on EINTR and logs failures.
 * Returns the number of bytes sent, or -1 on error.
 */
static ssize_t
nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
    const void *buf, size_t buf_size, const void *oob, size_t oob_size)
{
    int            err;
    ssize_t        res;
    struct iovec   iov[1];
    struct msghdr  msg;

    iov[0].iov_base = (void *) buf;
    iov[0].iov_len = buf_size;

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    msg.msg_flags = 0;
    msg.msg_control = (void *) oob;
    msg.msg_controllen = oob_size;

retry:

    res = sendmsg(fd, &msg, 0);

    if (nxt_slow_path(res == -1)) {
        err = errno;

        if (err == EINTR) {
            goto retry;
        }

        /*
         * FIXME: This should be "alert" after router graceful shutdown
         * implementation.
         */
        nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
                      fd, (int) buf_size, strerror(err), err);

    } else {
        nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
                       (int) res);
    }

    return res;
}


/*
 * Receive the next message for a per-context port, preserving ordering
 * between the shared memory queue and the socket: a READ_SOCKET marker
 * in the queue forces the next message to come from the socket, and a
 * socket message that arrives early is parked in socket_rbuf.
 */
static int
nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_unit_read_buf_t *rbuf)
{
    int                   res, read;
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    read = 0;

retry:

    if (port_impl->from_socket > 0) {
        /* A socket message is due; use the parked one if present. */
        if (port_impl->socket_rbuf != NULL
            && port_impl->socket_rbuf->size > 0)
        {
            port_impl->from_socket--;

            nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
            port_impl->socket_rbuf->size = 0;

            nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
                           (int) port->id.pid, (int) port->id.id,
                           (int) rbuf->size);

            return NXT_UNIT_OK;
        }

    } else {
        res = nxt_unit_port_queue_recv(port, rbuf);

        if (res == NXT_UNIT_OK) {
            if (nxt_unit_is_read_socket(rbuf)) {
                /* Marker only: the real message is on the socket. */
                port_impl->from_socket++;

                nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
                               (int) port->id.pid, (int) port->id.id,
                               port_impl->from_socket);

                goto retry;
            }

            nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
                           (int) port->id.pid, (int) port->id.id,
                           (int) rbuf->size);

            return NXT_UNIT_OK;
        }
    }

    /* Only hit the socket once per call. */
    if (read) {
        return NXT_UNIT_AGAIN;
    }

    res = nxt_unit_port_recv(ctx, port, rbuf);
    if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    read = 1;

    if (nxt_unit_is_read_queue(rbuf)) {
        nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
                       (int) port->id.pid, (int) port->id.id, (int) rbuf->size);

        goto retry;
    }

    nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
                   (int) port->id.pid, (int) port->id.id,
                   (int) rbuf->size);

    if (res == NXT_UNIT_AGAIN) {
        return NXT_UNIT_AGAIN;
    }

    if (port_impl->from_socket > 0) {
        port_impl->from_socket--;

        return NXT_UNIT_OK;
    }

    /* Socket message arrived before its READ_SOCKET marker: park it. */
    nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
                   (int) port->id.pid, (int) port->id.id,
                   (int) rbuf->size);

    if (port_impl->socket_rbuf == NULL) {
        port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);

        if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
            return NXT_UNIT_ERROR;
        }

        port_impl->socket_rbuf->size = 0;
    }

    if (port_impl->socket_rbuf->size > 0) {
        nxt_unit_alert(ctx, "too many port socket messages");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);

    memset(rbuf->oob, 0, sizeof(struct cmsghdr));

    goto retry;
}


/* Copy one read buffer into another (payload, size and oob data). */
nxt_inline void
nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t
                  *src)
{
    memcpy(dst->buf, src->buf, src->size);
    dst->size = src->size;
    memcpy(dst->oob, src->oob, sizeof(src->oob));
}


/*
 * Receive a message for the shared application port: try the shared
 * memory queue first, then fall back to the socket.
 */
static int
nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_unit_read_buf_t *rbuf)
{
    int  res;

retry:

    res = nxt_unit_app_queue_recv(port, rbuf);

    if (res == NXT_UNIT_AGAIN) {
        res = nxt_unit_port_recv(ctx, port, rbuf);
        if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
            return NXT_UNIT_ERROR;
        }

        /* READ_QUEUE notification: the queue has data again. */
        if (nxt_unit_is_read_queue(rbuf)) {
            nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
                           (int) port->id.pid, (int) port->id.id,
                           (int) rbuf->size);

            goto retry;
        }
    }

    return res;
}


/*
 * Read one message from the port's socket (or through the application's
 * port_recv callback) into rbuf, including ancillary data.
 */
static int
nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
    nxt_unit_read_buf_t *rbuf)
{
    int              fd, err;
    struct iovec     iov[1];
    struct msghdr    msg;
    nxt_unit_impl_t  *lib;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (lib->callbacks.port_recv != NULL) {
        rbuf->size = lib->callbacks.port_recv(ctx, port,
                                              rbuf->buf, sizeof(rbuf->buf),
                                              rbuf->oob, sizeof(rbuf->oob));

        nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
                       (int) port->id.pid, (int) port->id.id,
                       (int) rbuf->size);

        if (nxt_slow_path(rbuf->size < 0)) {
            return NXT_UNIT_ERROR;
        }

        return NXT_UNIT_OK;
    }

    iov[0].iov_base = rbuf->buf;
    iov[0].iov_len = sizeof(rbuf->buf);

    msg.msg_name = NULL;
    msg.msg_namelen = 0;
    msg.msg_iov = iov;
    msg.msg_iovlen = 1;
    msg.msg_flags = 0;
    msg.msg_control = rbuf->oob;
    msg.msg_controllen = sizeof(rbuf->oob);

    fd = port->in_fd;

retry:

    rbuf->size = recvmsg(fd, &msg, 0);

    if (nxt_slow_path(rbuf->size == -1)) {
        err = errno;

        if (err == EINTR) {
            goto retry;
        }

        if (err ==
            EAGAIN) {
            nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
                           fd, strerror(err), err);

            return NXT_UNIT_AGAIN;
        }

        nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
                       fd, strerror(err), err);

        return NXT_UNIT_ERROR;
    }

    nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);

    return NXT_UNIT_OK;
}


/* Non-blocking dequeue from a per-context port's shared memory queue. */
static int
nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
{
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);

    /* -1 means the queue is empty. */
    return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
}


/*
 * Dequeue a message from the shared application queue, skipping
 * messages whose reservation was cancelled by the sender.
 */
static int
nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
{
    uint32_t              cookie;
    nxt_port_msg_t        *port_msg;
    nxt_app_queue_t       *queue;
    nxt_unit_port_impl_t  *port_impl;

    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
    queue = port_impl->queue;

retry:

    rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);

    nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);

    if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
        port_msg = (nxt_port_msg_t *) rbuf->buf;

        /* Confirm the message was not cancelled by the sender. */
        if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
            return NXT_UNIT_OK;
        }

        nxt_unit_debug(NULL, "app_queue_recv: message cancelled");

        goto retry;
    }

    return (rbuf->size == -1) ?
                                NXT_UNIT_AGAIN : NXT_UNIT_OK;
}


/* close() wrapper with debug/alert logging; returns close()'s result. */
nxt_inline int
nxt_unit_close(int fd)
{
    int  res;

    res = close(fd);

    if (nxt_slow_path(res == -1)) {
        nxt_unit_alert(NULL, "close(%d) failed: %s (%d)",
                       fd, strerror(errno), errno);

    } else {
        nxt_unit_debug(NULL, "close(%d): %d", fd, res);
    }

    return res;
}


/* Switch the descriptor into blocking mode via FIONBIO. */
static int
nxt_unit_fd_blocking(int fd)
{
    int  nb;

    nb = 0;

    if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
        nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
                       fd, strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/* lvlhsh test function: match a port by its {pid, id} key. */
static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_unit_port_t          *port;
    nxt_unit_port_hash_id_t  *port_id;

    port = data;
    port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;

    if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
        && port_id->pid == port->id.pid
        && port_id->id == port->id.id)
    {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_port_hash_test,
    nxt_unit_lvlhsh_alloc,
    nxt_unit_lvlhsh_free,
};


/*
 * Fill a lvlhsh query for the ports hash; computes and caches the
 * port id hash on first use.
 */
static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
    nxt_unit_port_hash_id_t *port_hash_id,
    nxt_unit_port_id_t *port_id)
{
    port_hash_id->pid = port_id->pid;
    port_hash_id->id = port_id->id;

    if (nxt_fast_path(port_id->hash != 0)) {
        lhq->key_hash = port_id->hash;

    } else {
        lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));

        port_id->hash = lhq->key_hash;

        nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
                       (int) port_id->pid, (int) port_id->id,
                       (int)
                       port_id->hash);
    }

    lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
    lhq->key.start = (u_char *) port_hash_id;
    lhq->proto = &lvlhsh_ports_proto;
    lhq->pool = NULL;
}


/* Insert the port into the ports hash (no replace on duplicate key). */
static int
nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
{
    nxt_int_t                res;
    nxt_lvlhsh_query_t       lhq;
    nxt_unit_port_hash_id_t  port_hash_id;

    nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
    lhq.replace = 0;
    lhq.value = port;

    res = nxt_lvlhsh_insert(port_hash, &lhq);

    switch (res) {

    case NXT_OK:
        return NXT_UNIT_OK;

    default:
        return NXT_UNIT_ERROR;
    }
}


/*
 * Look up (and optionally remove) a port by id.  A plain lookup takes
 * an extra reference on the returned port; a removed port keeps the
 * reference it held in the hash.
 */
static nxt_unit_port_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
    int remove)
{
    nxt_int_t                res;
    nxt_lvlhsh_query_t       lhq;
    nxt_unit_port_hash_id_t  port_hash_id;

    nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);

    if (remove) {
        res = nxt_lvlhsh_delete(port_hash, &lhq);

    } else {
        res = nxt_lvlhsh_find(port_hash, &lhq);
    }

    switch (res) {

    case NXT_OK:
        if (!remove) {
            nxt_unit_port_use(lhq.value);
        }

        return lhq.value;

    default:
        return NULL;
    }
}


/* Request hash entries are keyed by exact stream-id hash: always match. */
static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    return NXT_OK;
}


static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_unit_request_hash_test,
    nxt_unit_lvlhsh_alloc,
    nxt_unit_lvlhsh_free,
};


/*
 * Register the request in its context's hash, keyed by stream id, so
 * later messages for the stream can find it.  Idempotent.
 */
static int
nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
    nxt_unit_request_info_t *req)
{
    uint32_t                      *stream;
    nxt_int_t                     res;
    nxt_lvlhsh_query_t            lhq;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req,
                                nxt_unit_request_info_impl_t, req);
    if (req_impl->in_hash) {
        return NXT_UNIT_OK;
    }

    stream = &req_impl->stream;

    lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
    lhq.key.length = sizeof(*stream);
    lhq.key.start = (u_char *) stream;
    lhq.proto = &lvlhsh_requests_proto;
    lhq.pool = NULL;
    lhq.replace = 0;
    lhq.value = req_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);

    pthread_mutex_unlock(&ctx_impl->mutex);

    switch (res) {

    case NXT_OK:
        req_impl->in_hash = 1;
        return NXT_UNIT_OK;

    default:
        return NXT_UNIT_ERROR;
    }
}


/*
 * Find (and optionally remove) a request by stream id in the context's
 * request hash.  Returns NULL when not found.
 */
static nxt_unit_request_info_t *
nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
{
    nxt_int_t                     res;
    nxt_lvlhsh_query_t            lhq;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
    lhq.key.length = sizeof(stream);
    lhq.key.start = (u_char *) &stream;
    lhq.proto = &lvlhsh_requests_proto;
    lhq.pool = NULL;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (remove) {
        res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);

    } else {
        res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
    }

    pthread_mutex_unlock(&ctx_impl->mutex);

    switch (res) {

    case NXT_OK:
        req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
                                    req);
        if (remove) {
            req_impl->in_hash = 0;
        }

        return lhq.value;

    default:
        return NULL;
    }
}


/* Format and write one log line to the library log fd (or stderr). */
void
nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
{
    int              log_fd, n;
    char             msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t            pid;
    va_list          ap;
    nxt_unit_impl_t  *lib;

    if (nxt_fast_path(ctx != NULL)) {
        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    va_start(ap, fmt);
    p += vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    /* On truncation vsnprintf() reports the untruncated length. */
    if (nxt_slow_path(p > end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
    }
}


/* Like nxt_unit_log() but prefixes the request's stream id. */
void
nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
{
    int                           log_fd, n;
    char                          msg[NXT_MAX_ERROR_STR], *p, *end;
    pid_t                         pid;
    va_list                       ap;
    nxt_unit_impl_t               *lib;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_fast_path(req != NULL)) {
        lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);

        pid = lib->pid;
        log_fd = lib->log_fd;

    } else {
        pid = getpid();
        log_fd = STDERR_FILENO;
    }

    p = msg;
    end = p + sizeof(msg) - 1;

    p = nxt_unit_snprint_prefix(p, end, pid, level);

    if (nxt_fast_path(req != NULL)) {
        req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

        p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
    }

    va_start(ap, fmt);
    p += vsnprintf(p, end - p, fmt, ap);
    va_end(ap);

    if (nxt_slow_path(p > end)) {
        memcpy(end - 5, "[...]", 5);
        p = end;
    }

    *p++ = '\n';

    n = write(log_fd, msg, p - msg);
    if (nxt_slow_path(n < 0)) {
        fprintf(stderr, "Failed to write log: 
%.*s", (int) (p - msg), msg); 6544 } 6545 } 6546 6547 6548 static const char * nxt_unit_log_levels[] = { 6549 "alert", 6550 "error", 6551 "warn", 6552 "notice", 6553 "info", 6554 "debug", 6555 }; 6556 6557 6558 static char * 6559 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level) 6560 { 6561 struct tm tm; 6562 struct timespec ts; 6563 6564 (void) clock_gettime(CLOCK_REALTIME, &ts); 6565 6566 #if (NXT_HAVE_LOCALTIME_R) 6567 (void) localtime_r(&ts.tv_sec, &tm); 6568 #else 6569 tm = *localtime(&ts.tv_sec); 6570 #endif 6571 6572 #if (NXT_DEBUG) 6573 p += snprintf(p, end - p, 6574 "%4d/%02d/%02d %02d:%02d:%02d.%03d ", 6575 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, 6576 tm.tm_hour, tm.tm_min, tm.tm_sec, 6577 (int) ts.tv_nsec / 1000000); 6578 #else 6579 p += snprintf(p, end - p, 6580 "%4d/%02d/%02d %02d:%02d:%02d ", 6581 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, 6582 tm.tm_hour, tm.tm_min, tm.tm_sec); 6583 #endif 6584 6585 p += snprintf(p, end - p, 6586 "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level], 6587 (int) pid, 6588 (uint64_t) (uintptr_t) nxt_thread_get_tid()); 6589 6590 return p; 6591 } 6592 6593 6594 static void * 6595 nxt_unit_lvlhsh_alloc(void *data, size_t size) 6596 { 6597 int err; 6598 void *p; 6599 6600 err = posix_memalign(&p, size, size); 6601 6602 if (nxt_fast_path(err == 0)) { 6603 nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p", 6604 (int) size, (int) size, p); 6605 return p; 6606 } 6607 6608 nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)", 6609 (int) size, (int) size, strerror(err), err); 6610 return NULL; 6611 } 6612 6613 6614 static void 6615 nxt_unit_lvlhsh_free(void *data, void *p) 6616 { 6617 nxt_unit_free(NULL, p); 6618 } 6619 6620 6621 void * 6622 nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size) 6623 { 6624 void *p; 6625 6626 p = malloc(size); 6627 6628 if (nxt_fast_path(p != NULL)) { 6629 #if (NXT_DEBUG_ALLOC) 6630 nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p); 6631 #endif 6632 6633 } else 
{ 6634 nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)", 6635 (int) size, strerror(errno), errno); 6636 } 6637 6638 return p; 6639 } 6640 6641 6642 void 6643 nxt_unit_free(nxt_unit_ctx_t *ctx, void *p) 6644 { 6645 #if (NXT_DEBUG_ALLOC) 6646 nxt_unit_debug(ctx, "free(%p)", p); 6647 #endif 6648 6649 free(p); 6650 } 6651 6652 6653 static int 6654 nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length) 6655 { 6656 u_char c1, c2; 6657 nxt_int_t n; 6658 const u_char *s1, *s2; 6659 6660 s1 = p1; 6661 s2 = p2; 6662 6663 while (length-- != 0) { 6664 c1 = *s1++; 6665 c2 = *s2++; 6666 6667 c1 = nxt_lowcase(c1); 6668 c2 = nxt_lowcase(c2); 6669 6670 n = c1 - c2; 6671 6672 if (n != 0) { 6673 return n; 6674 } 6675 } 6676 6677 return 0; 6678 } 6679