nxt_unit.c (1553:c3fad601f58b) nxt_unit.c (1555:1d84b9e4b459)
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"
9#include "nxt_port_memory_int.h"
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"
9#include "nxt_port_memory_int.h"
10#include "nxt_port_queue.h"
11#include "nxt_app_queue.h"
10
11#include "nxt_unit.h"
12#include "nxt_unit_request.h"
13#include "nxt_unit_response.h"
14#include "nxt_unit_websocket.h"
15
16#include "nxt_websocket.h"
17

--- 27 unchanged lines hidden (view full) ---

45nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
46 nxt_unit_mmap_buf_t *mmap_buf);
47nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
48 nxt_unit_mmap_buf_t *mmap_buf);
49nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
50static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
51 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
52 int *log_fd, uint32_t *stream, uint32_t *shm_limit);
12
13#include "nxt_unit.h"
14#include "nxt_unit_request.h"
15#include "nxt_unit_response.h"
16#include "nxt_unit_websocket.h"
17
18#include "nxt_websocket.h"
19

--- 27 unchanged lines hidden (view full) ---

47nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
48 nxt_unit_mmap_buf_t *mmap_buf);
49nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
50 nxt_unit_mmap_buf_t *mmap_buf);
51nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
52static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
53 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
54 int *log_fd, uint32_t *stream, uint32_t *shm_limit);
53static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream);
55static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
56 int queue_fd);
54static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
55static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
56 nxt_unit_recv_msg_t *recv_msg);
57static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
58 nxt_unit_recv_msg_t *recv_msg);
57static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
58static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
59 nxt_unit_recv_msg_t *recv_msg);
60static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
61 nxt_unit_recv_msg_t *recv_msg);
62static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
63 nxt_unit_recv_msg_t *recv_msg);
59static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
60 nxt_unit_port_id_t *port_id);
61static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
62static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
63 nxt_unit_recv_msg_t *recv_msg);
64static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
65static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
66 nxt_unit_ctx_t *ctx);

--- 20 unchanged lines hidden (view full) ---

87 size_t size);
88static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
89 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
90static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
91static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
92static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
93static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
94 nxt_unit_port_t *port, int n);
64static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
65 nxt_unit_port_id_t *port_id);
66static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
67static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
68 nxt_unit_recv_msg_t *recv_msg);
69static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
70static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
71 nxt_unit_ctx_t *ctx);

--- 20 unchanged lines hidden (view full) ---

92 size_t size);
93static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
94 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
95static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
96static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
97static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
98static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
99 nxt_unit_port_t *port, int n);
100static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
95static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
96 int fd);
97static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
98 nxt_unit_port_t *port, uint32_t size,
99 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
100static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
101
102static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
103nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
104nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
105static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
101static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
102 int fd);
103static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
104 nxt_unit_port_t *port, uint32_t size,
105 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
106static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
107
108static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
109nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
110nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
111static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
106static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
107 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
108static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
109 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
110 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
111static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
112 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
113static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
114static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
115 nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
116static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
117
118static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
119 pid_t pid);
120static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
121 pid_t pid, int remove);
122static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
123static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
124static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
125static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
126static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
112static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
113 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
114 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
115static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
116 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
117static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
118static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
119 nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
120static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
121
122static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_impl_t *lib,
123 pid_t pid);
124static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
125 pid_t pid, int remove);
126static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
127static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
128static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
129static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
130static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
131nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
132nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
133nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
134nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
127static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
128 nxt_unit_port_t *port);
129static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
130static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
131
132static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
135static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
136 nxt_unit_port_t *port);
137static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
138static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
139
140static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
133 nxt_unit_port_t *port);
141 nxt_unit_port_t *port, int queue_fd);
134
135nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
136nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
137static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
142
143nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
144nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
145static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
138 nxt_unit_port_t *port);
146 nxt_unit_port_t *port, void *queue);
139static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
140 nxt_unit_port_id_t *port_id);
141static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
142 nxt_unit_port_id_t *port_id);
143static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
144static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
145 nxt_unit_process_t *process);
146static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
147static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
148static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
149 nxt_unit_port_t *port, const void *buf, size_t buf_size,
150 const void *oob, size_t oob_size);
151static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
152 const void *buf, size_t buf_size, const void *oob, size_t oob_size);
147static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
148 nxt_unit_port_id_t *port_id);
149static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
150 nxt_unit_port_id_t *port_id);
151static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
152static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
153 nxt_unit_process_t *process);
154static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
155static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
156static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
157 nxt_unit_port_t *port, const void *buf, size_t buf_size,
158 const void *oob, size_t oob_size);
159static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
160 const void *buf, size_t buf_size, const void *oob, size_t oob_size);
161static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
162 nxt_unit_read_buf_t *rbuf);
163nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
164 nxt_unit_read_buf_t *src);
165static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
166 nxt_unit_read_buf_t *rbuf);
153static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
154 nxt_unit_read_buf_t *rbuf);
167static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
168 nxt_unit_read_buf_t *rbuf);
169static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
170 nxt_unit_read_buf_t *rbuf);
171static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
172 nxt_unit_read_buf_t *rbuf);
155
156static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
157 nxt_unit_port_t *port);
158static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
159 nxt_unit_port_id_t *port_id, int remove);
160
173
174static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
175 nxt_unit_port_t *port);
176static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
177 nxt_unit_port_id_t *port_id, int remove);
178
161static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
162 nxt_unit_request_info_impl_t *req_impl);
163static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
164 nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);
179static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
180 nxt_unit_request_info_t *req);
181static nxt_unit_request_info_t *nxt_unit_request_hash_find(
182 nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
165
166static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
167
168
169struct nxt_unit_mmap_buf_s {
170 nxt_unit_buf_t buf;
171
172 nxt_unit_mmap_buf_t *next;

--- 39 unchanged lines hidden (view full) ---

212
213 uint32_t stream;
214
215 nxt_unit_mmap_buf_t *outgoing_buf;
216 nxt_unit_mmap_buf_t *incoming_buf;
217
218 nxt_unit_req_state_t state;
219 uint8_t websocket;
183
184static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
185
186
187struct nxt_unit_mmap_buf_s {
188 nxt_unit_buf_t buf;
189
190 nxt_unit_mmap_buf_t *next;

--- 39 unchanged lines hidden (view full) ---

230
231 uint32_t stream;
232
233 nxt_unit_mmap_buf_t *outgoing_buf;
234 nxt_unit_mmap_buf_t *incoming_buf;
235
236 nxt_unit_req_state_t state;
237 uint8_t websocket;
238 uint8_t in_hash;
220
221 /* for nxt_unit_ctx_impl_t.free_req or active_req */
222 nxt_queue_link_t link;
223 /* for nxt_unit_port_impl_t.awaiting_req */
224 nxt_queue_link_t port_wait_link;
225
226 char extra_data[];
227};

--- 116 unchanged lines hidden (view full) ---

344 /* for nxt_unit_process_t.ports */
345 nxt_queue_link_t link;
346 nxt_unit_process_t *process;
347
348 /* of nxt_unit_request_info_impl_t */
349 nxt_queue_t awaiting_req;
350
351 int ready;
239
240 /* for nxt_unit_ctx_impl_t.free_req or active_req */
241 nxt_queue_link_t link;
242 /* for nxt_unit_port_impl_t.awaiting_req */
243 nxt_queue_link_t port_wait_link;
244
245 char extra_data[];
246};

--- 116 unchanged lines hidden (view full) ---

363 /* for nxt_unit_process_t.ports */
364 nxt_queue_link_t link;
365 nxt_unit_process_t *process;
366
367 /* of nxt_unit_request_info_impl_t */
368 nxt_queue_t awaiting_req;
369
370 int ready;
371
372 void *queue;
373
374 int from_socket;
375 nxt_unit_read_buf_t *socket_rbuf;
352};
353
354
355struct nxt_unit_process_s {
356 pid_t pid;
357
358 nxt_queue_t ports; /* of nxt_unit_port_impl_t */
359

--- 10 unchanged lines hidden (view full) ---

370 int32_t pid;
371 uint32_t id;
372} nxt_unit_port_hash_id_t;
373
374
375nxt_unit_ctx_t *
376nxt_unit_init(nxt_unit_init_t *init)
377{
376};
377
378
379struct nxt_unit_process_s {
380 pid_t pid;
381
382 nxt_queue_t ports; /* of nxt_unit_port_impl_t */
383

--- 10 unchanged lines hidden (view full) ---

394 int32_t pid;
395 uint32_t id;
396} nxt_unit_port_hash_id_t;
397
398
399nxt_unit_ctx_t *
400nxt_unit_init(nxt_unit_init_t *init)
401{
378 int rc;
402 int rc, queue_fd;
403 void *mem;
379 uint32_t ready_stream, shm_limit;
380 nxt_unit_ctx_t *ctx;
381 nxt_unit_impl_t *lib;
382 nxt_unit_port_t ready_port, router_port, read_port;
383
384 lib = nxt_unit_create(init);
385 if (nxt_slow_path(lib == NULL)) {
386 return NULL;
387 }
388
404 uint32_t ready_stream, shm_limit;
405 nxt_unit_ctx_t *ctx;
406 nxt_unit_impl_t *lib;
407 nxt_unit_port_t ready_port, router_port, read_port;
408
409 lib = nxt_unit_create(init);
410 if (nxt_slow_path(lib == NULL)) {
411 return NULL;
412 }
413
414 queue_fd = -1;
415
389 if (init->ready_port.id.pid != 0
390 && init->ready_stream != 0
391 && init->read_port.id.pid != 0)
392 {
393 ready_port = init->ready_port;
394 ready_stream = init->ready_stream;
395 router_port = init->router_port;
396 read_port = init->read_port;

--- 20 unchanged lines hidden (view full) ---

417 if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
418 lib->shm_mmap_limit = 1;
419 }
420
421 lib->pid = read_port.id.pid;
422
423 ctx = &lib->main_ctx.ctx;
424
416 if (init->ready_port.id.pid != 0
417 && init->ready_stream != 0
418 && init->read_port.id.pid != 0)
419 {
420 ready_port = init->ready_port;
421 ready_stream = init->ready_stream;
422 router_port = init->router_port;
423 read_port = init->read_port;

--- 20 unchanged lines hidden (view full) ---

444 if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
445 lib->shm_mmap_limit = 1;
446 }
447
448 lib->pid = read_port.id.pid;
449
450 ctx = &lib->main_ctx.ctx;
451
425 lib->router_port = nxt_unit_add_port(ctx, &router_port);
452 lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
426 if (nxt_slow_path(lib->router_port == NULL)) {
427 nxt_unit_alert(NULL, "failed to add router_port");
428
429 goto fail;
430 }
431
453 if (nxt_slow_path(lib->router_port == NULL)) {
454 nxt_unit_alert(NULL, "failed to add router_port");
455
456 goto fail;
457 }
458
432 lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port);
459 queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
460 if (nxt_slow_path(queue_fd == -1)) {
461 goto fail;
462 }
463
464 mem = mmap(NULL, sizeof(nxt_port_queue_t),
465 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
466 if (nxt_slow_path(mem == MAP_FAILED)) {
467 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
468 strerror(errno), errno);
469
470 goto fail;
471 }
472
473 nxt_port_queue_init(mem);
474
475 lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
433 if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
434 nxt_unit_alert(NULL, "failed to add read_port");
435
476 if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
477 nxt_unit_alert(NULL, "failed to add read_port");
478
479 munmap(mem, sizeof(nxt_port_queue_t));
480
436 goto fail;
437 }
438
481 goto fail;
482 }
483
439 rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream);
484 rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
440 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
441 nxt_unit_alert(NULL, "failed to send READY message");
442
485 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
486 nxt_unit_alert(NULL, "failed to send READY message");
487
488 munmap(mem, sizeof(nxt_port_queue_t));
489
443 goto fail;
444 }
445
446 close(ready_port.out_fd);
490 goto fail;
491 }
492
493 close(ready_port.out_fd);
494 close(queue_fd);
447
448 return ctx;
449
450fail:
451
495
496 return ctx;
497
498fail:
499
500 if (queue_fd != -1) {
501 close(queue_fd);
502 }
503
452 nxt_unit_ctx_release(&lib->main_ctx.ctx);
453
454 return NULL;
455}
456
457
458static nxt_unit_impl_t *
459nxt_unit_create(nxt_unit_init_t *init)

--- 32 unchanged lines hidden (view full) ---

492 nxt_queue_init(&lib->contexts);
493
494 lib->use_count = 0;
495 lib->router_port = NULL;
496 lib->shared_port = NULL;
497
498 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
499 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
504 nxt_unit_ctx_release(&lib->main_ctx.ctx);
505
506 return NULL;
507}
508
509
510static nxt_unit_impl_t *
511nxt_unit_create(nxt_unit_init_t *init)

--- 32 unchanged lines hidden (view full) ---

544 nxt_queue_init(&lib->contexts);
545
546 lib->use_count = 0;
547 lib->router_port = NULL;
548 lib->shared_port = NULL;
549
550 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
551 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
552 pthread_mutex_destroy(&lib->mutex);
500 goto fail;
501 }
502
503 cb = &lib->callbacks;
504
505 if (cb->request_handler == NULL) {
506 nxt_unit_alert(NULL, "request_handler is NULL");
507
553 goto fail;
554 }
555
556 cb = &lib->callbacks;
557
558 if (cb->request_handler == NULL) {
559 nxt_unit_alert(NULL, "request_handler is NULL");
560
561 pthread_mutex_destroy(&lib->mutex);
508 goto fail;
509 }
510
511 nxt_unit_mmaps_init(&lib->incoming);
512 nxt_unit_mmaps_init(&lib->outgoing);
513
514 return lib;
515

--- 244 unchanged lines hidden (view full) ---

760
761 *stream = ready_stream;
762
763 return NXT_UNIT_OK;
764}
765
766
767static int
562 goto fail;
563 }
564
565 nxt_unit_mmaps_init(&lib->incoming);
566 nxt_unit_mmaps_init(&lib->outgoing);
567
568 return lib;
569

--- 244 unchanged lines hidden (view full) ---

814
815 *stream = ready_stream;
816
817 return NXT_UNIT_OK;
818}
819
820
821static int
768nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream)
822nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
769{
770 ssize_t res;
771 nxt_port_msg_t msg;
772 nxt_unit_impl_t *lib;
773
823{
824 ssize_t res;
825 nxt_port_msg_t msg;
826 nxt_unit_impl_t *lib;
827
828 union {
829 struct cmsghdr cm;
830 char space[CMSG_SPACE(sizeof(int))];
831 } cmsg;
832
774 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
775
776 msg.stream = stream;
777 msg.pid = lib->pid;
778 msg.reply_port = 0;
779 msg.type = _NXT_PORT_MSG_PROCESS_READY;
780 msg.last = 1;
781 msg.mmap = 0;
782 msg.nf = 0;
783 msg.mf = 0;
784 msg.tracking = 0;
785
833 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
834
835 msg.stream = stream;
836 msg.pid = lib->pid;
837 msg.reply_port = 0;
838 msg.type = _NXT_PORT_MSG_PROCESS_READY;
839 msg.last = 1;
840 msg.mmap = 0;
841 msg.nf = 0;
842 msg.mf = 0;
843 msg.tracking = 0;
844
786 res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), NULL, 0);
845 memset(&cmsg, 0, sizeof(cmsg));
846
847 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
848 cmsg.cm.cmsg_level = SOL_SOCKET;
849 cmsg.cm.cmsg_type = SCM_RIGHTS;
850
851 /*
852 * memcpy() is used instead of simple
853 * *(int *) CMSG_DATA(&cmsg.cm) = fd;
854 * because GCC 4.4 with -O2/3/s optimization may issue a warning:
855 * dereferencing type-punned pointer will break strict-aliasing rules
856 *
857 * Fortunately, GCC with -O1 compiles this nxt_memcpy()
858 * in the same simple assignment as in the code above.
859 */
860 memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));
861
862 res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
863 &cmsg, sizeof(cmsg));
787 if (res != sizeof(msg)) {
788 return NXT_UNIT_ERROR;
789 }
790
791 return NXT_UNIT_OK;
792}
793
794

--- 38 unchanged lines hidden (view full) ---

833
834 goto fail;
835 }
836
837 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
838 goto fail;
839 }
840
864 if (res != sizeof(msg)) {
865 return NXT_UNIT_ERROR;
866 }
867
868 return NXT_UNIT_OK;
869}
870
871

--- 38 unchanged lines hidden (view full) ---

910
911 goto fail;
912 }
913
914 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
915 goto fail;
916 }
917
918 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd %d fd2 %d",
919 port_msg->stream, (int) port_msg->type,
920 recv_msg.fd, recv_msg.fd2);
921
841 recv_msg.stream = port_msg->stream;
842 recv_msg.pid = port_msg->pid;
843 recv_msg.reply_port = port_msg->reply_port;
844 recv_msg.last = port_msg->last;
845 recv_msg.mmap = port_msg->mmap;
846
847 recv_msg.start = port_msg + 1;
848 recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
849
850 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
851 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
852 port_msg->stream, (int) port_msg->type);
853 goto fail;
854 }
855
922 recv_msg.stream = port_msg->stream;
923 recv_msg.pid = port_msg->pid;
924 recv_msg.reply_port = port_msg->reply_port;
925 recv_msg.last = port_msg->last;
926 recv_msg.mmap = port_msg->mmap;
927
928 recv_msg.start = port_msg + 1;
929 recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
930
931 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
932 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
933 port_msg->stream, (int) port_msg->type);
934 goto fail;
935 }
936
856 if (port_msg->tracking) {
857 rc = nxt_unit_tracking_read(ctx, &recv_msg, rbuf);
858
859 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
860 if (rc == NXT_UNIT_AGAIN) {
861 recv_msg.fd = -1;
862 recv_msg.fd2 = -1;
863 }
864
865 goto fail;
866 }
867 }
868
869 /* Fragmentation is unsupported. */
870 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
871 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
872 port_msg->stream, (int) port_msg->type);
873 goto fail;
874 }
875
876 if (port_msg->mmap) {

--- 47 unchanged lines hidden (view full) ---

924
925 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
926 break;
927
928 case _NXT_PORT_MSG_REQ_HEADERS:
929 rc = nxt_unit_process_req_headers(ctx, &recv_msg);
930 break;
931
937 /* Fragmentation is unsupported. */
938 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
939 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
940 port_msg->stream, (int) port_msg->type);
941 goto fail;
942 }
943
944 if (port_msg->mmap) {

--- 47 unchanged lines hidden (view full) ---

992
993 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
994 break;
995
996 case _NXT_PORT_MSG_REQ_HEADERS:
997 rc = nxt_unit_process_req_headers(ctx, &recv_msg);
998 break;
999
1000 case _NXT_PORT_MSG_REQ_BODY:
1001 rc = nxt_unit_process_req_body(ctx, &recv_msg);
1002 break;
1003
932 case _NXT_PORT_MSG_WEBSOCKET:
933 rc = nxt_unit_process_websocket(ctx, &recv_msg);
934 break;
935
936 case _NXT_PORT_MSG_REMOVE_PID:
937 if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
938 nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
939 "(%d != %d)", port_msg->stream, (int) recv_msg.size,

--- 47 unchanged lines hidden (view full) ---

987 return rc;
988}
989
990
991static int
992nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
993{
994 int nb;
1004 case _NXT_PORT_MSG_WEBSOCKET:
1005 rc = nxt_unit_process_websocket(ctx, &recv_msg);
1006 break;
1007
1008 case _NXT_PORT_MSG_REMOVE_PID:
1009 if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1010 nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
1011 "(%d != %d)", port_msg->stream, (int) recv_msg.size,

--- 47 unchanged lines hidden (view full) ---

1059 return rc;
1060}
1061
1062
1063static int
1064nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1065{
1066 int nb;
1067 void *mem;
995 nxt_unit_impl_t *lib;
996 nxt_unit_port_t new_port, *port;
997 nxt_port_msg_new_port_t *new_port_msg;
998
999 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1000 nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1001 "invalid message size (%d)",
1002 recv_msg->stream, (int) recv_msg->size);

--- 5 unchanged lines hidden (view full) ---

1008 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1009 recv_msg->stream, recv_msg->fd);
1010
1011 return NXT_UNIT_ERROR;
1012 }
1013
1014 new_port_msg = recv_msg->start;
1015
1068 nxt_unit_impl_t *lib;
1069 nxt_unit_port_t new_port, *port;
1070 nxt_port_msg_new_port_t *new_port_msg;
1071
1072 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1073 nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1074 "invalid message size (%d)",
1075 recv_msg->stream, (int) recv_msg->size);

--- 5 unchanged lines hidden (view full) ---

1081 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1082 recv_msg->stream, recv_msg->fd);
1083
1084 return NXT_UNIT_ERROR;
1085 }
1086
1087 new_port_msg = recv_msg->start;
1088
1016 nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
1089 nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd %d fd2 %d",
1017 recv_msg->stream, (int) new_port_msg->pid,
1090 recv_msg->stream, (int) new_port_msg->pid,
1018 (int) new_port_msg->id, recv_msg->fd);
1091 (int) new_port_msg->id, recv_msg->fd, recv_msg->fd2);
1019
1020 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1021
1022 if (new_port_msg->id == (nxt_port_id_t) -1) {
1023 nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1024
1025 new_port.in_fd = recv_msg->fd;
1026 new_port.out_fd = -1;
1027
1092
1093 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1094
1095 if (new_port_msg->id == (nxt_port_id_t) -1) {
1096 nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1097
1098 new_port.in_fd = recv_msg->fd;
1099 new_port.out_fd = -1;
1100
1101 mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
1102 MAP_SHARED, recv_msg->fd2, 0);
1103
1028 } else {
1029 nb = 0;
1030
1031 if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
1032 nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
1033 "failed: %s (%d)",
1034 recv_msg->stream, recv_msg->fd, strerror(errno), errno);
1035
1036 return NXT_UNIT_ERROR;
1037 }
1038
1039 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1040 new_port_msg->id);
1041
1042 new_port.in_fd = -1;
1043 new_port.out_fd = recv_msg->fd;
1104 } else {
1105 nb = 0;
1106
1107 if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
1108 nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
1109 "failed: %s (%d)",
1110 recv_msg->stream, recv_msg->fd, strerror(errno), errno);
1111
1112 return NXT_UNIT_ERROR;
1113 }
1114
1115 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1116 new_port_msg->id);
1117
1118 new_port.in_fd = -1;
1119 new_port.out_fd = recv_msg->fd;
1120
1121 mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1122 MAP_SHARED, recv_msg->fd2, 0);
1044 }
1045
1123 }
1124
1125 if (nxt_slow_path(mem == MAP_FAILED)) {
1126 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd2,
1127 strerror(errno), errno);
1046
1128
1129 return NXT_UNIT_ERROR;
1130 }
1131
1047 new_port.data = NULL;
1048
1049 recv_msg->fd = -1;
1050
1132 new_port.data = NULL;
1133
1134 recv_msg->fd = -1;
1135
1051 port = nxt_unit_add_port(ctx, &new_port);
1136 port = nxt_unit_add_port(ctx, &new_port, mem);
1052 if (nxt_slow_path(port == NULL)) {
1053 return NXT_UNIT_ERROR;
1054 }
1055
1056 if (new_port_msg->id == (nxt_port_id_t) -1) {
1057 lib->shared_port = port;
1058
1059 } else {

--- 69 unchanged lines hidden (view full) ---

1129 recv_msg->incoming_buf = NULL;
1130
1131 req->content_fd = recv_msg->fd;
1132 recv_msg->fd = -1;
1133
1134 req->response_max_fields = 0;
1135 req_impl->state = NXT_UNIT_RS_START;
1136 req_impl->websocket = 0;
1137 if (nxt_slow_path(port == NULL)) {
1138 return NXT_UNIT_ERROR;
1139 }
1140
1141 if (new_port_msg->id == (nxt_port_id_t) -1) {
1142 lib->shared_port = port;
1143
1144 } else {

--- 69 unchanged lines hidden (view full) ---

1214 recv_msg->incoming_buf = NULL;
1215
1216 req->content_fd = recv_msg->fd;
1217 recv_msg->fd = -1;
1218
1219 req->response_max_fields = 0;
1220 req_impl->state = NXT_UNIT_RS_START;
1221 req_impl->websocket = 0;
1222 req_impl->in_hash = 0;
1137
1138 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1139 (int) r->method_length,
1140 (char *) nxt_unit_sptr_get(&r->method),
1141 (int) r->target_length,
1142 (char *) nxt_unit_sptr_get(&r->target),
1143 (int) r->content_length);
1144
1145 nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1146
1147 res = nxt_unit_request_check_response_port(req, &port_id);
1148 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1149 return NXT_UNIT_ERROR;
1150 }
1151
1152 if (nxt_fast_path(res == NXT_UNIT_OK)) {
1153 res = nxt_unit_send_req_headers_ack(req);
1223
1224 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1225 (int) r->method_length,
1226 (char *) nxt_unit_sptr_get(&r->method),
1227 (int) r->target_length,
1228 (char *) nxt_unit_sptr_get(&r->target),
1229 (int) r->content_length);
1230
1231 nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1232
1233 res = nxt_unit_request_check_response_port(req, &port_id);
1234 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1235 return NXT_UNIT_ERROR;
1236 }
1237
1238 if (nxt_fast_path(res == NXT_UNIT_OK)) {
1239 res = nxt_unit_send_req_headers_ack(req);
1154 if (nxt_slow_path(res != NXT_UNIT_OK)) {
1155 return res;
1240 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1241 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1242
1243 return NXT_UNIT_ERROR;
1156 }
1157
1158 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1159
1244 }
1245
1246 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1247
1248 if (req->content_length
1249 > (uint64_t) (req->content_buf->end - req->content_buf->free))
1250 {
1251 res = nxt_unit_request_hash_add(ctx, req);
1252 if (nxt_slow_path(res != NXT_UNIT_OK)) {
1253 nxt_unit_req_warn(req, "failed to add request to hash");
1254
1255 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1256
1257 return NXT_UNIT_ERROR;
1258 }
1259
1260 /*
1261 * If application have separate data handler, we may start
1262 * request processing and process data when it is arrived.
1263 */
1264 if (lib->callbacks.data_handler == NULL) {
1265 return NXT_UNIT_OK;
1266 }
1267 }
1268
1160 lib->callbacks.request_handler(req);
1161 }
1162
1163 return NXT_UNIT_OK;
1164}
1165
1166
1167static int
1269 lib->callbacks.request_handler(req);
1270 }
1271
1272 return NXT_UNIT_OK;
1273}
1274
1275
1276static int
1277nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1278{
1279 uint64_t l;
1280 nxt_unit_impl_t *lib;
1281 nxt_unit_mmap_buf_t *b;
1282 nxt_unit_request_info_t *req;
1283
1284 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1285 if (req == NULL) {
1286 return NXT_UNIT_OK;
1287 }
1288
1289 l = req->content_buf->end - req->content_buf->free;
1290
1291 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1292 b->req = req;
1293 l += b->buf.end - b->buf.free;
1294 }
1295
1296 if (recv_msg->incoming_buf != NULL) {
1297 b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1298
1299 /* "Move" incoming buffer list to req_impl. */
1300 nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf);
1301 recv_msg->incoming_buf = NULL;
1302 }
1303
1304 req->content_fd = recv_msg->fd;
1305 recv_msg->fd = -1;
1306
1307 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1308
1309 if (lib->callbacks.data_handler != NULL) {
1310 lib->callbacks.data_handler(req);
1311
1312 return NXT_UNIT_OK;
1313 }
1314
1315 if (req->content_fd != -1 || l == req->content_length) {
1316 lib->callbacks.request_handler(req);
1317 }
1318
1319 return NXT_UNIT_OK;
1320}
1321
1322
1323static int
1168nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1169 nxt_unit_port_id_t *port_id)
1170{
1171 int res;
1172 nxt_unit_ctx_t *ctx;
1173 nxt_unit_impl_t *lib;
1174 nxt_unit_port_t *port;
1175 nxt_unit_process_t *process;

--- 79 unchanged lines hidden (view full) ---

1255 free(port);
1256
1257 return NXT_UNIT_ERROR;
1258 }
1259
1260 nxt_queue_insert_tail(&process->ports, &port_impl->link);
1261
1262 port_impl->process = process;
1324nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1325 nxt_unit_port_id_t *port_id)
1326{
1327 int res;
1328 nxt_unit_ctx_t *ctx;
1329 nxt_unit_impl_t *lib;
1330 nxt_unit_port_t *port;
1331 nxt_unit_process_t *process;

--- 79 unchanged lines hidden (view full) ---

1411 free(port);
1412
1413 return NXT_UNIT_ERROR;
1414 }
1415
1416 nxt_queue_insert_tail(&process->ports, &port_impl->link);
1417
1418 port_impl->process = process;
1419 port_impl->queue = NULL;
1420 port_impl->from_socket = 0;
1421 port_impl->socket_rbuf = NULL;
1263
1264 nxt_queue_init(&port_impl->awaiting_req);
1265
1266 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1267
1268 nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1269
1270 port_impl->use_count = 2;

--- 45 unchanged lines hidden (view full) ---

1316
1317
1318static int
1319nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1320{
1321 size_t hsize;
1322 nxt_unit_impl_t *lib;
1323 nxt_unit_mmap_buf_t *b;
1422
1423 nxt_queue_init(&port_impl->awaiting_req);
1424
1425 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1426
1427 nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1428
1429 port_impl->use_count = 2;

--- 45 unchanged lines hidden (view full) ---

1475
1476
1477static int
1478nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1479{
1480 size_t hsize;
1481 nxt_unit_impl_t *lib;
1482 nxt_unit_mmap_buf_t *b;
1324 nxt_unit_ctx_impl_t *ctx_impl;
1325 nxt_unit_callbacks_t *cb;
1326 nxt_unit_request_info_t *req;
1327 nxt_unit_request_info_impl_t *req_impl;
1328 nxt_unit_websocket_frame_impl_t *ws_impl;
1329
1483 nxt_unit_callbacks_t *cb;
1484 nxt_unit_request_info_t *req;
1485 nxt_unit_request_info_impl_t *req_impl;
1486 nxt_unit_websocket_frame_impl_t *ws_impl;
1487
1330 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1331
1332 req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
1333 recv_msg->last);
1334 if (req_impl == NULL) {
1488 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1489 if (nxt_slow_path(req == NULL)) {
1335 return NXT_UNIT_OK;
1336 }
1337
1490 return NXT_UNIT_OK;
1491 }
1492
1338 req = &req_impl->req;
1493 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1339
1340 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1341 cb = &lib->callbacks;
1342
1343 if (cb->websocket_handler && recv_msg->size >= 2) {
1344 ws_impl = nxt_unit_websocket_frame_get(ctx);
1345 if (nxt_slow_path(ws_impl == NULL)) {
1346 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",

--- 149 unchanged lines hidden (view full) ---

1496 nxt_unit_request_info_impl_t *req_impl;
1497
1498 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1499 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1500
1501 req->response = NULL;
1502 req->response_buf = NULL;
1503
1494
1495 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1496 cb = &lib->callbacks;
1497
1498 if (cb->websocket_handler && recv_msg->size >= 2) {
1499 ws_impl = nxt_unit_websocket_frame_get(ctx);
1500 if (nxt_slow_path(ws_impl == NULL)) {
1501 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",

--- 149 unchanged lines hidden (view full) ---

1651 nxt_unit_request_info_impl_t *req_impl;
1652
1653 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1654 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1655
1656 req->response = NULL;
1657 req->response_buf = NULL;
1658
1504 if (req_impl->websocket) {
1505 nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
1506
1507 req_impl->websocket = 0;
1659 if (req_impl->in_hash) {
1660 nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1508 }
1509
1661 }
1662
1663 req_impl->websocket = 0;
1664
1510 while (req_impl->outgoing_buf != NULL) {
1511 nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1512 }
1513
1514 while (req_impl->incoming_buf != NULL) {
1515 nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1516 }
1517

--- 647 unchanged lines hidden (view full) ---

2165 return req->request->websocket_handshake;
2166}
2167
2168
2169int
2170nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2171{
2172 int rc;
1665 while (req_impl->outgoing_buf != NULL) {
1666 nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1667 }
1668
1669 while (req_impl->incoming_buf != NULL) {
1670 nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1671 }
1672

--- 647 unchanged lines hidden (view full) ---

2320 return req->request->websocket_handshake;
2321}
2322
2323
2324int
2325nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2326{
2327 int rc;
2173 nxt_unit_ctx_impl_t *ctx_impl;
2174 nxt_unit_request_info_impl_t *req_impl;
2175
2176 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2177
2178 if (nxt_slow_path(req_impl->websocket != 0)) {
2179 nxt_unit_req_debug(req, "upgrade: already upgraded");
2180
2181 return NXT_UNIT_OK;

--- 6 unchanged lines hidden (view full) ---

2188 }
2189
2190 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2191 nxt_unit_req_warn(req, "upgrade: response already sent");
2192
2193 return NXT_UNIT_ERROR;
2194 }
2195
2328 nxt_unit_request_info_impl_t *req_impl;
2329
2330 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2331
2332 if (nxt_slow_path(req_impl->websocket != 0)) {
2333 nxt_unit_req_debug(req, "upgrade: already upgraded");
2334
2335 return NXT_UNIT_OK;

--- 6 unchanged lines hidden (view full) ---

2342 }
2343
2344 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2345 nxt_unit_req_warn(req, "upgrade: response already sent");
2346
2347 return NXT_UNIT_ERROR;
2348 }
2349
2196 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
2197
2198 rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
2350 rc = nxt_unit_request_hash_add(req->ctx, req);
2199 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2200 nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2201
2202 return NXT_UNIT_ERROR;
2203 }
2204
2205 req_impl->websocket = 1;
2206

--- 254 unchanged lines hidden (view full) ---

2461 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2462
2463 pthread_mutex_lock(&ctx_impl->mutex);
2464
2465 rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2466
2467 pthread_mutex_unlock(&ctx_impl->mutex);
2468
2351 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2352 nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2353
2354 return NXT_UNIT_ERROR;
2355 }
2356
2357 req_impl->websocket = 1;
2358

--- 254 unchanged lines hidden (view full) ---

2613 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2614
2615 pthread_mutex_lock(&ctx_impl->mutex);
2616
2617 rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2618
2619 pthread_mutex_unlock(&ctx_impl->mutex);
2620
2621 memset(rbuf->oob, 0, sizeof(struct cmsghdr));
2622
2469 return rbuf;
2470}
2471
2472
2473static nxt_unit_read_buf_t *
2474nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2475{
2476 nxt_queue_link_t *link;

--- 82 unchanged lines hidden (view full) ---

2559 int rc;
2560 ssize_t sent;
2561 uint32_t part_size, min_part_size, buf_size;
2562 const char *part_start;
2563 nxt_unit_mmap_buf_t mmap_buf;
2564 nxt_unit_request_info_impl_t *req_impl;
2565 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2566
2623 return rbuf;
2624}
2625
2626
2627static nxt_unit_read_buf_t *
2628nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2629{
2630 nxt_queue_link_t *link;

--- 82 unchanged lines hidden (view full) ---

2713 int rc;
2714 ssize_t sent;
2715 uint32_t part_size, min_part_size, buf_size;
2716 const char *part_start;
2717 nxt_unit_mmap_buf_t mmap_buf;
2718 nxt_unit_request_info_impl_t *req_impl;
2719 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2720
2721 nxt_unit_req_debug(req, "write: %d", (int) size);
2722
2567 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2568
2569 part_start = start;
2570 sent = 0;
2571
2572 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2573 nxt_unit_req_alert(req, "write: response not initialized yet");
2574

--- 163 unchanged lines hidden (view full) ---

2738ssize_t
2739nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
2740{
2741 ssize_t buf_res, res;
2742
2743 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
2744 dst, size);
2745
2723 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2724
2725 part_start = start;
2726 sent = 0;
2727
2728 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2729 nxt_unit_req_alert(req, "write: response not initialized yet");
2730

--- 163 unchanged lines hidden (view full) ---

2894ssize_t
2895nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
2896{
2897 ssize_t buf_res, res;
2898
2899 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
2900 dst, size);
2901
2902 nxt_unit_req_debug(req, "read: %d", (int) buf_res);
2903
2746 if (buf_res < (ssize_t) size && req->content_fd != -1) {
2747 res = read(req->content_fd, dst, size);
2904 if (buf_res < (ssize_t) size && req->content_fd != -1) {
2905 res = read(req->content_fd, dst, size);
2748 if (res < 0) {
2906 if (nxt_slow_path(res < 0)) {
2749 nxt_unit_req_alert(req, "failed to read content: %s (%d)",
2750 strerror(errno), errno);
2751
2752 return res;
2753 }
2754
2755 if (res < (ssize_t) size) {
2756 close(req->content_fd);

--- 539 unchanged lines hidden (view full) ---

3296
3297 return NXT_UNIT_OK;
3298}
3299
3300
3301static int
3302nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
3303{
2907 nxt_unit_req_alert(req, "failed to read content: %s (%d)",
2908 strerror(errno), errno);
2909
2910 return res;
2911 }
2912
2913 if (res < (ssize_t) size) {
2914 close(req->content_fd);

--- 539 unchanged lines hidden (view full) ---

3454
3455 return NXT_UNIT_OK;
3456}
3457
3458
3459static int
3460nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
3461{
3304 nxt_port_msg_t *port_msg;
3462 int res;
3305 nxt_unit_ctx_impl_t *ctx_impl;
3306 nxt_unit_read_buf_t *rbuf;
3307
3308 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3309
3310 while (1) {
3311 rbuf = nxt_unit_read_buf_get(ctx);
3312 if (nxt_slow_path(rbuf == NULL)) {
3313 return NXT_UNIT_ERROR;
3314 }
3315
3463 nxt_unit_ctx_impl_t *ctx_impl;
3464 nxt_unit_read_buf_t *rbuf;
3465
3466 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3467
3468 while (1) {
3469 rbuf = nxt_unit_read_buf_get(ctx);
3470 if (nxt_slow_path(rbuf == NULL)) {
3471 return NXT_UNIT_ERROR;
3472 }
3473
3316 memset(rbuf->oob, 0, sizeof(struct cmsghdr));
3317
3318 nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
3319
3320 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
3474 res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
3475 if (res == NXT_UNIT_ERROR) {
3321 nxt_unit_read_buf_release(ctx, rbuf);
3322
3323 return NXT_UNIT_ERROR;
3324 }
3325
3476 nxt_unit_read_buf_release(ctx, rbuf);
3477
3478 return NXT_UNIT_ERROR;
3479 }
3480
3326 port_msg = (nxt_port_msg_t *) rbuf->buf;
3327
3328 if (port_msg->type == _NXT_PORT_MSG_SHM_ACK) {
3481 if (nxt_unit_is_shm_ack(rbuf)) {
3329 nxt_unit_read_buf_release(ctx, rbuf);
3482 nxt_unit_read_buf_release(ctx, rbuf);
3330
3331 break;
3332 }
3333
3334 pthread_mutex_lock(&ctx_impl->mutex);
3335
3336 nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
3337
3338 pthread_mutex_unlock(&ctx_impl->mutex);
3339
3483 break;
3484 }
3485
3486 pthread_mutex_lock(&ctx_impl->mutex);
3487
3488 nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
3489
3490 pthread_mutex_unlock(&ctx_impl->mutex);
3491
3340 if (port_msg->type == _NXT_PORT_MSG_QUIT) {
3492 if (nxt_unit_is_quit(rbuf)) {
3341 nxt_unit_debug(ctx, "oosm: quit received");
3342
3343 return NXT_UNIT_ERROR;
3344 }
3345 }
3346
3347 return NXT_UNIT_OK;
3348}

--- 52 unchanged lines hidden (view full) ---

3401}
3402
3403
3404static nxt_port_mmap_header_t *
3405nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
3406{
3407 int i, fd, rc;
3408 void *mem;
3493 nxt_unit_debug(ctx, "oosm: quit received");
3494
3495 return NXT_UNIT_ERROR;
3496 }
3497 }
3498
3499 return NXT_UNIT_OK;
3500}

--- 52 unchanged lines hidden (view full) ---

3553}
3554
3555
3556static nxt_port_mmap_header_t *
3557nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
3558{
3559 int i, fd, rc;
3560 void *mem;
3409 char name[64];
3410 nxt_unit_mmap_t *mm;
3411 nxt_unit_impl_t *lib;
3412 nxt_port_mmap_header_t *hdr;
3413
3414 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3415
3416 mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
3417 if (nxt_slow_path(mm == NULL)) {
3418 nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
3419
3420 return NULL;
3421 }
3422
3561 nxt_unit_mmap_t *mm;
3562 nxt_unit_impl_t *lib;
3563 nxt_port_mmap_header_t *hdr;
3564
3565 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3566
3567 mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
3568 if (nxt_slow_path(mm == NULL)) {
3569 nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
3570
3571 return NULL;
3572 }
3573
3423 snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
3424 lib->pid, (void *) pthread_self());
3425
3426#if (NXT_HAVE_MEMFD_CREATE)
3427
3428 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
3574 fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
3429 if (nxt_slow_path(fd == -1)) {
3575 if (nxt_slow_path(fd == -1)) {
3430 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
3431 strerror(errno), errno);
3432
3433 goto remove_fail;
3434 }
3435
3576 goto remove_fail;
3577 }
3578
3436 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
3437
3438#elif (NXT_HAVE_SHM_OPEN_ANON)
3439
3440 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
3441 if (nxt_slow_path(fd == -1)) {
3442 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
3443 strerror(errno), errno);
3444
3445 goto remove_fail;
3446 }
3447
3448#elif (NXT_HAVE_SHM_OPEN)
3449
3450 /* Just in case. */
3451 shm_unlink(name);
3452
3453 fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
3454 if (nxt_slow_path(fd == -1)) {
3455 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
3456 strerror(errno), errno);
3457
3458 goto remove_fail;
3459 }
3460
3461 if (nxt_slow_path(shm_unlink(name) == -1)) {
3462 nxt_unit_warn(ctx, "shm_unlink(%s) failed: %s (%d)", name,
3463 strerror(errno), errno);
3464 }
3465
3466#else
3467
3468#error No working shared memory implementation.
3469
3470#endif
3471
3472 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
3473 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3474 strerror(errno), errno);
3475
3476 goto remove_fail;
3477 }
3478
3479 mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3480 if (nxt_slow_path(mem == MAP_FAILED)) {
3481 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3482 strerror(errno), errno);
3483
3579 mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3580 if (nxt_slow_path(mem == MAP_FAILED)) {
3581 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3582 strerror(errno), errno);
3583
3584 close(fd);
3585
3484 goto remove_fail;
3485 }
3486
3487 mm->hdr = mem;
3488 hdr = mem;
3489
3490 memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
3491 memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

--- 36 unchanged lines hidden (view full) ---

3528
3529 lib->outgoing.size--;
3530
3531 return NULL;
3532}
3533
3534
3535static int
3586 goto remove_fail;
3587 }
3588
3589 mm->hdr = mem;
3590 hdr = mem;
3591
3592 memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
3593 memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));

--- 36 unchanged lines hidden (view full) ---

3630
3631 lib->outgoing.size--;
3632
3633 return NULL;
3634}
3635
3636
3637static int
3638nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
3639{
3640 int fd;
3641 nxt_unit_impl_t *lib;
3642
3643 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3644
3645#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
3646 char name[64];
3647
3648 snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
3649 lib->pid, (void *) pthread_self());
3650#endif
3651
3652#if (NXT_HAVE_MEMFD_CREATE)
3653
3654 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
3655 if (nxt_slow_path(fd == -1)) {
3656 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
3657 strerror(errno), errno);
3658
3659 return -1;
3660 }
3661
3662 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
3663
3664#elif (NXT_HAVE_SHM_OPEN_ANON)
3665
3666 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
3667 if (nxt_slow_path(fd == -1)) {
3668 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
3669 strerror(errno), errno);
3670
3671 return -1;
3672 }
3673
3674#elif (NXT_HAVE_SHM_OPEN)
3675
3676 /* Just in case. */
3677 shm_unlink(name);
3678
3679 fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
3680 if (nxt_slow_path(fd == -1)) {
3681 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
3682 strerror(errno), errno);
3683
3684 return -1;
3685 }
3686
3687 if (nxt_slow_path(shm_unlink(name) == -1)) {
3688 nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
3689 strerror(errno), errno);
3690 }
3691
3692#else
3693
3694#error No working shared memory implementation.
3695
3696#endif
3697
3698 if (nxt_slow_path(ftruncate(fd, size) == -1)) {
3699 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3700 strerror(errno), errno);
3701
3702 close(fd);
3703
3704 return -1;
3705 }
3706
3707 return fd;
3708}
3709
3710
3711static int
3536nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
3537{
3538 ssize_t res;
3539 nxt_port_msg_t msg;
3540 nxt_unit_impl_t *lib;
3541 union {
3542 struct cmsghdr cm;
3543 char space[CMSG_SPACE(sizeof(int))];

--- 248 unchanged lines hidden (view full) ---

3792 free(mmaps->elts);
3793 }
3794
3795 pthread_mutex_destroy(&mmaps->mutex);
3796}
3797
3798
3799static int
3712nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
3713{
3714 ssize_t res;
3715 nxt_port_msg_t msg;
3716 nxt_unit_impl_t *lib;
3717 union {
3718 struct cmsghdr cm;
3719 char space[CMSG_SPACE(sizeof(int))];

--- 248 unchanged lines hidden (view full) ---

3968 free(mmaps->elts);
3969 }
3970
3971 pthread_mutex_destroy(&mmaps->mutex);
3972}
3973
3974
3975static int
3800nxt_unit_tracking_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
3801 nxt_unit_read_buf_t *rbuf)
3802{
3803 int res;
3804 nxt_chunk_id_t c;
3805 nxt_unit_impl_t *lib;
3806 nxt_port_mmap_header_t *hdr;
3807 nxt_port_mmap_tracking_msg_t *tracking_msg;
3808
3809 if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
3810 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
3811 recv_msg->stream, (int) recv_msg->size);
3812
3813 return NXT_UNIT_ERROR;
3814 }
3815
3816 tracking_msg = recv_msg->start;
3817
3818 recv_msg->start = tracking_msg + 1;
3819 recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);
3820
3821 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3822
3823 pthread_mutex_lock(&lib->incoming.mutex);
3824
3825 res = nxt_unit_check_rbuf_mmap(ctx, &lib->incoming,
3826 recv_msg->pid, tracking_msg->mmap_id,
3827 &hdr, rbuf);
3828
3829 if (nxt_slow_path(res != NXT_UNIT_OK)) {
3830 return res;
3831 }
3832
3833 c = tracking_msg->tracking_id;
3834 res = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
3835
3836 if (res == 0) {
3837 nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
3838 recv_msg->stream);
3839
3840 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
3841
3842 res = NXT_UNIT_CANCELLED;
3843
3844 } else {
3845 res = NXT_UNIT_OK;
3846 }
3847
3848 pthread_mutex_unlock(&lib->incoming.mutex);
3849
3850 return res;
3851}
3852
3853
3854static int
3855nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
3856 pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
3857 nxt_unit_read_buf_t *rbuf)
3858{
3859 int res, need_rbuf;
3860 nxt_unit_mmap_t *mm;
3861 nxt_unit_ctx_impl_t *ctx_impl;
3862

--- 286 unchanged lines hidden (view full) ---

4149 process = malloc(sizeof(nxt_unit_process_t));
4150 if (nxt_slow_path(process == NULL)) {
4151 nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid);
4152
4153 return NULL;
4154 }
4155
4156 process->pid = pid;
3976nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
3977 pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
3978 nxt_unit_read_buf_t *rbuf)
3979{
3980 int res, need_rbuf;
3981 nxt_unit_mmap_t *mm;
3982 nxt_unit_ctx_impl_t *ctx_impl;
3983

--- 286 unchanged lines hidden (view full) ---

4270 process = malloc(sizeof(nxt_unit_process_t));
4271 if (nxt_slow_path(process == NULL)) {
4272 nxt_unit_alert(NULL, "failed to allocate process for #%d", (int) pid);
4273
4274 return NULL;
4275 }
4276
4277 process->pid = pid;
4157 process->use_count = 1;
4278 process->use_count = 2;
4158 process->next_port_id = 0;
4159 process->lib = lib;
4160
4161 nxt_queue_init(&process->ports);
4162
4163 lhq.replace = 0;
4164 lhq.value = process;
4165

--- 5 unchanged lines hidden (view full) ---

4171 default:
4172 nxt_unit_alert(NULL, "process %d insert failed", (int) pid);
4173
4174 free(process);
4175 process = NULL;
4176 break;
4177 }
4178
4279 process->next_port_id = 0;
4280 process->lib = lib;
4281
4282 nxt_queue_init(&process->ports);
4283
4284 lhq.replace = 0;
4285 lhq.value = process;
4286

--- 5 unchanged lines hidden (view full) ---

4292 default:
4293 nxt_unit_alert(NULL, "process %d insert failed", (int) pid);
4294
4295 free(process);
4296 process = NULL;
4297 break;
4298 }
4299
4179 nxt_unit_process_use(process);
4180
4181 return process;
4182}
4183
4184
4185static nxt_unit_process_t *
4186nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
4187{
4188 int rc;

--- 99 unchanged lines hidden (view full) ---

4288
4289 return rc;
4290}
4291
4292
4293static int
4294nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4295{
4300 return process;
4301}
4302
4303
4304static nxt_unit_process_t *
4305nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
4306{
4307 int rc;

--- 99 unchanged lines hidden (view full) ---

4407
4408 return rc;
4409}
4410
4411
4412static int
4413nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4414{
4296 int res, err;
4297 nxt_unit_impl_t *lib;
4298 nxt_unit_ctx_impl_t *ctx_impl;
4299 struct pollfd fds[2];
4415 int nevents, res, err;
4416 nxt_unit_impl_t *lib;
4417 nxt_unit_ctx_impl_t *ctx_impl;
4418 nxt_unit_port_impl_t *port_impl;
4419 struct pollfd fds[2];
4300
4301 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4302 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4303
4420
4421 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4422 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4423
4304 memset(rbuf->oob, 0, sizeof(struct cmsghdr));
4305
4306 if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) {
4424 if (ctx_impl->wait_items > 0 || lib->shared_port == NULL) {
4307 return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
4425
4426 return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4308 }
4309
4427 }
4428
4429 port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
4430 port);
4431
4310retry:
4311
4432retry:
4433
4434 if (port_impl->from_socket == 0) {
4435 res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
4436 if (res == NXT_UNIT_OK) {
4437 if (nxt_unit_is_read_socket(rbuf)) {
4438 port_impl->from_socket++;
4439
4440 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
4441 (int) ctx_impl->read_port->id.pid,
4442 (int) ctx_impl->read_port->id.id,
4443 port_impl->from_socket);
4444
4445 } else {
4446 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
4447 (int) ctx_impl->read_port->id.pid,
4448 (int) ctx_impl->read_port->id.id,
4449 (int) rbuf->size);
4450
4451 return NXT_UNIT_OK;
4452 }
4453 }
4454 }
4455
4456 res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
4457 if (res == NXT_UNIT_OK) {
4458 return NXT_UNIT_OK;
4459 }
4460
4312 fds[0].fd = ctx_impl->read_port->in_fd;
4313 fds[0].events = POLLIN;
4314 fds[0].revents = 0;
4315
4316 fds[1].fd = lib->shared_port->in_fd;
4317 fds[1].events = POLLIN;
4318 fds[1].revents = 0;
4319
4461 fds[0].fd = ctx_impl->read_port->in_fd;
4462 fds[0].events = POLLIN;
4463 fds[0].revents = 0;
4464
4465 fds[1].fd = lib->shared_port->in_fd;
4466 fds[1].events = POLLIN;
4467 fds[1].revents = 0;
4468
4320 res = poll(fds, 2, -1);
4321 if (nxt_slow_path(res < 0)) {
4469 nevents = poll(fds, 2, -1);
4470 if (nxt_slow_path(nevents == -1)) {
4322 err = errno;
4323
4324 if (err == EINTR) {
4325 goto retry;
4326 }
4327
4471 err = errno;
4472
4473 if (err == EINTR) {
4474 goto retry;
4475 }
4476
4328 nxt_unit_alert(ctx, "poll() failed: %s (%d)",
4329 strerror(err), err);
4477 nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
4478 fds[0].fd, fds[1].fd, strerror(err), err);
4330
4331 rbuf->size = -1;
4332
4333 return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
4334 }
4335
4479
4480 rbuf->size = -1;
4481
4482 return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
4483 }
4484
4485 nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
4486 fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4487 fds[1].revents);
4488
4336 if ((fds[0].revents & POLLIN) != 0) {
4489 if ((fds[0].revents & POLLIN) != 0) {
4337 return nxt_unit_port_recv(ctx, ctx_impl->read_port, rbuf);
4490 res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4491 if (res == NXT_UNIT_AGAIN) {
4492 goto retry;
4493 }
4494
4495 return res;
4338 }
4339
4340 if ((fds[1].revents & POLLIN) != 0) {
4496 }
4497
4498 if ((fds[1].revents & POLLIN) != 0) {
4341 return nxt_unit_port_recv(ctx, lib->shared_port, rbuf);
4499 res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4500 if (res == NXT_UNIT_AGAIN) {
4501 goto retry;
4502 }
4503
4504 return res;
4342 }
4343
4505 }
4506
4344 rbuf->size = -1;
4507 nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
4508 fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4509 fds[1].revents);
4345
4346 return NXT_UNIT_ERROR;
4347}
4348
4349
4350static int
4351nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
4352{

--- 34 unchanged lines hidden (view full) ---

4387
4388 return rc;
4389}
4390
4391
4392static void
4393nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
4394{
4510
4511 return NXT_UNIT_ERROR;
4512}
4513
4514
4515static int
4516nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
4517{

--- 34 unchanged lines hidden (view full) ---

4552
4553 return rc;
4554}
4555
4556
4557static void
4558nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
4559{
4560 int res;
4395 nxt_queue_t ready_req;
4396 nxt_unit_impl_t *lib;
4397 nxt_unit_ctx_impl_t *ctx_impl;
4561 nxt_queue_t ready_req;
4562 nxt_unit_impl_t *lib;
4563 nxt_unit_ctx_impl_t *ctx_impl;
4564 nxt_unit_request_info_t *req;
4398 nxt_unit_request_info_impl_t *req_impl;
4399
4400 nxt_queue_init(&ready_req);
4401
4402 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4403
4404 pthread_mutex_lock(&ctx_impl->mutex);
4405

--- 8 unchanged lines hidden (view full) ---

4414
4415 pthread_mutex_unlock(&ctx_impl->mutex);
4416
4417 nxt_queue_each(req_impl, &ready_req,
4418 nxt_unit_request_info_impl_t, port_wait_link)
4419 {
4420 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
4421
4565 nxt_unit_request_info_impl_t *req_impl;
4566
4567 nxt_queue_init(&ready_req);
4568
4569 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4570
4571 pthread_mutex_lock(&ctx_impl->mutex);
4572

--- 8 unchanged lines hidden (view full) ---

4581
4582 pthread_mutex_unlock(&ctx_impl->mutex);
4583
4584 nxt_queue_each(req_impl, &ready_req,
4585 nxt_unit_request_info_impl_t, port_wait_link)
4586 {
4587 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
4588
4422 (void) nxt_unit_send_req_headers_ack(&req_impl->req);
4589 req = &req_impl->req;
4423
4590
4591 res = nxt_unit_send_req_headers_ack(req);
4592 if (nxt_slow_path(res != NXT_UNIT_OK)) {
4593 nxt_unit_request_done(req, NXT_UNIT_ERROR);
4594
4595 continue;
4596 }
4597
4598 if (req->content_length
4599 > (uint64_t) (req->content_buf->end - req->content_buf->free))
4600 {
4601 res = nxt_unit_request_hash_add(ctx, req);
4602 if (nxt_slow_path(res != NXT_UNIT_OK)) {
4603 nxt_unit_req_warn(req, "failed to add request to hash");
4604
4605 nxt_unit_request_done(req, NXT_UNIT_ERROR);
4606
4607 continue;
4608 }
4609
4610 /*
4611 * If application have separate data handler, we may start
4612 * request processing and process data when it is arrived.
4613 */
4614 if (lib->callbacks.data_handler == NULL) {
4615 continue;
4616 }
4617 }
4618
4424 lib->callbacks.request_handler(&req_impl->req);
4425
4426 } nxt_queue_loop;
4427}
4428
4429
4430int
4431nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
4432{
4433 int rc;
4434 nxt_unit_impl_t *lib;
4619 lib->callbacks.request_handler(&req_impl->req);
4620
4621 } nxt_queue_loop;
4622}
4623
4624
4625int
4626nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
4627{
4628 int rc;
4629 nxt_unit_impl_t *lib;
4630 nxt_unit_read_buf_t *rbuf;
4435 nxt_unit_ctx_impl_t *ctx_impl;
4436
4437 nxt_unit_ctx_use(ctx);
4438
4439 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4440 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4441
4442 rc = NXT_UNIT_OK;
4443
4444 while (nxt_fast_path(lib->online)) {
4631 nxt_unit_ctx_impl_t *ctx_impl;
4632
4633 nxt_unit_ctx_use(ctx);
4634
4635 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4636 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4637
4638 rc = NXT_UNIT_OK;
4639
4640 while (nxt_fast_path(lib->online)) {
4445 rc = nxt_unit_process_port_msg_impl(ctx, ctx_impl->read_port);
4641 rbuf = nxt_unit_read_buf_get(ctx);
4642 if (nxt_slow_path(rbuf == NULL)) {
4643 rc = NXT_UNIT_ERROR;
4644 break;
4645 }
4446
4646
4647 retry:
4648
4649 rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4650 if (rc == NXT_UNIT_AGAIN) {
4651 goto retry;
4652 }
4653
4654 rc = nxt_unit_process_msg(ctx, rbuf);
4447 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4448 break;
4449 }
4655 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4656 break;
4657 }
4658
4659 rc = nxt_unit_process_pending_rbuf(ctx);
4660 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4661 break;
4662 }
4663
4664 nxt_unit_process_ready_req(ctx);
4450 }
4451
4452 nxt_unit_ctx_release(ctx);
4453
4454 return rc;
4455}
4456
4457
4665 }
4666
4667 nxt_unit_ctx_release(ctx);
4668
4669 return rc;
4670}
4671
4672
4673nxt_inline int
4674nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
4675{
4676 nxt_port_msg_t *port_msg;
4677
4678 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4679 port_msg = (nxt_port_msg_t *) rbuf->buf;
4680
4681 return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
4682 }
4683
4684 return 0;
4685}
4686
4687
4688nxt_inline int
4689nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
4690{
4691 if (nxt_fast_path(rbuf->size == 1)) {
4692 return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
4693 }
4694
4695 return 0;
4696}
4697
4698
4699nxt_inline int
4700nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
4701{
4702 nxt_port_msg_t *port_msg;
4703
4704 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4705 port_msg = (nxt_port_msg_t *) rbuf->buf;
4706
4707 return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
4708 }
4709
4710 return 0;
4711}
4712
4713
4714nxt_inline int
4715nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
4716{
4717 nxt_port_msg_t *port_msg;
4718
4719 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4720 port_msg = (nxt_port_msg_t *) rbuf->buf;
4721
4722 return port_msg->type == _NXT_PORT_MSG_QUIT;
4723 }
4724
4725 return 0;
4726}
4727
4728
4458int
4459nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
4460{
4729int
4730nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
4731{
4461 int rc;
4462 nxt_unit_impl_t *lib;
4732 int rc;
4733 nxt_unit_impl_t *lib;
4734 nxt_unit_read_buf_t *rbuf;
4463
4464 nxt_unit_ctx_use(ctx);
4465
4466 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4467 rc = NXT_UNIT_OK;
4468
4469 while (nxt_fast_path(lib->online)) {
4735
4736 nxt_unit_ctx_use(ctx);
4737
4738 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4739 rc = NXT_UNIT_OK;
4740
4741 while (nxt_fast_path(lib->online)) {
4470 rc = nxt_unit_process_port_msg_impl(ctx, lib->shared_port);
4742 rbuf = nxt_unit_read_buf_get(ctx);
4743 if (nxt_slow_path(rbuf == NULL)) {
4744 rc = NXT_UNIT_ERROR;
4745 break;
4746 }
4471
4747
4748 retry:
4749
4750 rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4751 if (rc == NXT_UNIT_AGAIN) {
4752 goto retry;
4753 }
4754
4472 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4755 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4756 nxt_unit_read_buf_release(ctx, rbuf);
4473 break;
4474 }
4757 break;
4758 }
4759
4760 rc = nxt_unit_process_msg(ctx, rbuf);
4761 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4762 break;
4763 }
4764
4765 rc = nxt_unit_process_pending_rbuf(ctx);
4766 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4767 break;
4768 }
4769
4770 nxt_unit_process_ready_req(ctx);
4475 }
4476
4477 nxt_unit_ctx_release(ctx);
4478
4479 return rc;
4480}
4481
4482

--- 11 unchanged lines hidden (view full) ---

4494 return rc;
4495}
4496
4497
4498static int
4499nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
4500{
4501 int rc;
4771 }
4772
4773 nxt_unit_ctx_release(ctx);
4774
4775 return rc;
4776}
4777
4778

--- 11 unchanged lines hidden (view full) ---

4790 return rc;
4791}
4792
4793
4794static int
4795nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
4796{
4797 int rc;
4798 nxt_unit_impl_t *lib;
4502 nxt_unit_read_buf_t *rbuf;
4503
4504 rbuf = nxt_unit_read_buf_get(ctx);
4505 if (nxt_slow_path(rbuf == NULL)) {
4506 return NXT_UNIT_ERROR;
4507 }
4508
4799 nxt_unit_read_buf_t *rbuf;
4800
4801 rbuf = nxt_unit_read_buf_get(ctx);
4802 if (nxt_slow_path(rbuf == NULL)) {
4803 return NXT_UNIT_ERROR;
4804 }
4805
4509 memset(rbuf->oob, 0, sizeof(struct cmsghdr));
4806 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4510
4807
4511 rc = nxt_unit_port_recv(ctx, port, rbuf);
4512 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4808retry:
4809
4810 if (port == lib->shared_port) {
4811 rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
4812
4813 } else {
4814 rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
4815 }
4816
4817 if (rc != NXT_UNIT_OK) {
4513 nxt_unit_read_buf_release(ctx, rbuf);
4514 return rc;
4515 }
4516
4517 rc = nxt_unit_process_msg(ctx, rbuf);
4518 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4519 return NXT_UNIT_ERROR;
4520 }
4521
4522 rc = nxt_unit_process_pending_rbuf(ctx);
4523 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4524 return NXT_UNIT_ERROR;
4525 }
4526
4527 nxt_unit_process_ready_req(ctx);
4528
4818 nxt_unit_read_buf_release(ctx, rbuf);
4819 return rc;
4820 }
4821
4822 rc = nxt_unit_process_msg(ctx, rbuf);
4823 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4824 return NXT_UNIT_ERROR;
4825 }
4826
4827 rc = nxt_unit_process_pending_rbuf(ctx);
4828 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4829 return NXT_UNIT_ERROR;
4830 }
4831
4832 nxt_unit_process_ready_req(ctx);
4833
4834 rbuf = nxt_unit_read_buf_get(ctx);
4835 if (nxt_slow_path(rbuf == NULL)) {
4836 return NXT_UNIT_ERROR;
4837 }
4838
4839 if (lib->online) {
4840 goto retry;
4841 }
4842
4529 return rc;
4530}
4531
4532
4533void
4534nxt_unit_done(nxt_unit_ctx_t *ctx)
4535{
4536 nxt_unit_ctx_release(ctx);
4537}
4538
4539
4540nxt_unit_ctx_t *
4541nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
4542{
4843 return rc;
4844}
4845
4846
4847void
4848nxt_unit_done(nxt_unit_ctx_t *ctx)
4849{
4850 nxt_unit_ctx_release(ctx);
4851}
4852
4853
4854nxt_unit_ctx_t *
4855nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
4856{
4543 int rc;
4544 nxt_unit_impl_t *lib;
4545 nxt_unit_port_t *port;
4546 nxt_unit_ctx_impl_t *new_ctx;
4857 int rc, queue_fd;
4858 void *mem;
4859 nxt_unit_impl_t *lib;
4860 nxt_unit_port_t *port;
4861 nxt_unit_ctx_impl_t *new_ctx;
4862 nxt_unit_port_impl_t *port_impl;
4547
4548 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4549
4550 new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
4551 if (nxt_slow_path(new_ctx == NULL)) {
4552 nxt_unit_alert(ctx, "failed to allocate context");
4553
4554 return NULL;
4555 }
4556
4863
4864 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4865
4866 new_ctx = malloc(sizeof(nxt_unit_ctx_impl_t) + lib->request_data_size);
4867 if (nxt_slow_path(new_ctx == NULL)) {
4868 nxt_unit_alert(ctx, "failed to allocate context");
4869
4870 return NULL;
4871 }
4872
4873 rc = nxt_unit_ctx_init(lib, new_ctx, data);
4874 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4875 free(new_ctx);
4876
4877 return NULL;
4878 }
4879
4880 queue_fd = -1;
4881
4557 port = nxt_unit_create_port(ctx);
4558 if (nxt_slow_path(port == NULL)) {
4882 port = nxt_unit_create_port(ctx);
4883 if (nxt_slow_path(port == NULL)) {
4559 free(new_ctx);
4884 goto fail;
4885 }
4560
4886
4561 return NULL;
4887 new_ctx->read_port = port;
4888
4889 queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
4890 if (nxt_slow_path(queue_fd == -1)) {
4891 goto fail;
4562 }
4563
4892 }
4893
4564 rc = nxt_unit_send_port(ctx, lib->router_port, port);
4565 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4894 mem = mmap(NULL, sizeof(nxt_port_queue_t),
4895 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
4896 if (nxt_slow_path(mem == MAP_FAILED)) {
4897 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
4898 strerror(errno), errno);
4899
4566 goto fail;
4567 }
4568
4900 goto fail;
4901 }
4902
4569 rc = nxt_unit_ctx_init(lib, new_ctx, data);
4903 nxt_port_queue_init(mem);
4904
4905 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
4906 port_impl->queue = mem;
4907
4908 rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd);
4570 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4571 goto fail;
4572 }
4573
4909 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4910 goto fail;
4911 }
4912
4574 new_ctx->read_port = port;
4913 close(queue_fd);
4575
4576 return &new_ctx->ctx;
4577
4578fail:
4579
4914
4915 return &new_ctx->ctx;
4916
4917fail:
4918
4580 nxt_unit_remove_port(lib, &port->id);
4581 nxt_unit_port_release(port);
4919 if (queue_fd != -1) {
4920 close(queue_fd);
4921 }
4582
4922
4583 free(new_ctx);
4923 nxt_unit_ctx_release(&new_ctx->ctx);
4584
4585 return NULL;
4586}
4587
4588
4589static void
4590nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
4591{

--- 36 unchanged lines hidden (view full) ---

4628
4629 } nxt_queue_loop;
4630
4631 pthread_mutex_destroy(&ctx_impl->mutex);
4632
4633 nxt_queue_remove(&ctx_impl->link);
4634
4635 if (nxt_fast_path(ctx_impl->read_port != NULL)) {
4924
4925 return NULL;
4926}
4927
4928
4929static void
4930nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
4931{

--- 36 unchanged lines hidden (view full) ---

4968
4969 } nxt_queue_loop;
4970
4971 pthread_mutex_destroy(&ctx_impl->mutex);
4972
4973 nxt_queue_remove(&ctx_impl->link);
4974
4975 if (nxt_fast_path(ctx_impl->read_port != NULL)) {
4976 nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
4636 nxt_unit_port_release(ctx_impl->read_port);
4637 }
4638
4639 if (ctx_impl != &lib->main_ctx) {
4640 free(ctx_impl);
4641 }
4642
4643 nxt_unit_lib_release(lib);

--- 60 unchanged lines hidden (view full) ---

4704 new_port.in_fd = port_sockets[0];
4705 new_port.out_fd = port_sockets[1];
4706 new_port.data = NULL;
4707
4708 pthread_mutex_unlock(&lib->mutex);
4709
4710 nxt_unit_process_release(process);
4711
4977 nxt_unit_port_release(ctx_impl->read_port);
4978 }
4979
4980 if (ctx_impl != &lib->main_ctx) {
4981 free(ctx_impl);
4982 }
4983
4984 nxt_unit_lib_release(lib);

--- 60 unchanged lines hidden (view full) ---

5045 new_port.in_fd = port_sockets[0];
5046 new_port.out_fd = port_sockets[1];
5047 new_port.data = NULL;
5048
5049 pthread_mutex_unlock(&lib->mutex);
5050
5051 nxt_unit_process_release(process);
5052
4712 port = nxt_unit_add_port(ctx, &new_port);
5053 port = nxt_unit_add_port(ctx, &new_port, NULL);
4713 if (nxt_slow_path(port == NULL)) {
5054 if (nxt_slow_path(port == NULL)) {
4714 nxt_unit_alert(ctx, "create_port: add_port() failed");
4715
4716 close(port_sockets[0]);
4717 close(port_sockets[1]);
4718 }
4719
4720 return port;
4721}
4722
4723
4724static int
4725nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
5055 close(port_sockets[0]);
5056 close(port_sockets[1]);
5057 }
5058
5059 return port;
5060}
5061
5062
5063static int
5064nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
4726 nxt_unit_port_t *port)
5065 nxt_unit_port_t *port, int queue_fd)
4727{
4728 ssize_t res;
4729 nxt_unit_impl_t *lib;
5066{
5067 ssize_t res;
5068 nxt_unit_impl_t *lib;
5069 int fds[2] = { port->out_fd, queue_fd };
4730
4731 struct {
4732 nxt_port_msg_t msg;
4733 nxt_port_msg_new_port_t new_port;
4734 } m;
4735
4736 union {
4737 struct cmsghdr cm;
5070
5071 struct {
5072 nxt_port_msg_t msg;
5073 nxt_port_msg_new_port_t new_port;
5074 } m;
5075
5076 union {
5077 struct cmsghdr cm;
4738 char space[CMSG_SPACE(sizeof(int))];
5078 char space[CMSG_SPACE(sizeof(int) * 2)];
4739 } cmsg;
4740
4741 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4742
4743 m.msg.stream = 0;
4744 m.msg.pid = lib->pid;
4745 m.msg.reply_port = 0;
4746 m.msg.type = _NXT_PORT_MSG_NEW_PORT;

--- 6 unchanged lines hidden (view full) ---

4753 m.new_port.id = port->id.id;
4754 m.new_port.pid = port->id.pid;
4755 m.new_port.type = NXT_PROCESS_APP;
4756 m.new_port.max_size = 16 * 1024;
4757 m.new_port.max_share = 64 * 1024;
4758
4759 memset(&cmsg, 0, sizeof(cmsg));
4760
5079 } cmsg;
5080
5081 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5082
5083 m.msg.stream = 0;
5084 m.msg.pid = lib->pid;
5085 m.msg.reply_port = 0;
5086 m.msg.type = _NXT_PORT_MSG_NEW_PORT;

--- 6 unchanged lines hidden (view full) ---

5093 m.new_port.id = port->id.id;
5094 m.new_port.pid = port->id.pid;
5095 m.new_port.type = NXT_PROCESS_APP;
5096 m.new_port.max_size = 16 * 1024;
5097 m.new_port.max_share = 64 * 1024;
5098
5099 memset(&cmsg, 0, sizeof(cmsg));
5100
4761 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
5101 cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
4762 cmsg.cm.cmsg_level = SOL_SOCKET;
4763 cmsg.cm.cmsg_type = SCM_RIGHTS;
4764
4765 /*
4766 * memcpy() is used instead of simple
4767 * *(int *) CMSG_DATA(&cmsg.cm) = fd;
4768 * because GCC 4.4 with -O2/3/s optimization may issue a warning:
4769 * dereferencing type-punned pointer will break strict-aliasing rules
4770 *
4771 * Fortunately, GCC with -O1 compiles this nxt_memcpy()
4772 * in the same simple assignment as in the code above.
4773 */
5102 cmsg.cm.cmsg_level = SOL_SOCKET;
5103 cmsg.cm.cmsg_type = SCM_RIGHTS;
5104
5105 /*
5106 * memcpy() is used instead of simple
5107 * *(int *) CMSG_DATA(&cmsg.cm) = fd;
5108 * because GCC 4.4 with -O2/3/s optimization may issue a warning:
5109 * dereferencing type-punned pointer will break strict-aliasing rules
5110 *
5111 * Fortunately, GCC with -O1 compiles this nxt_memcpy()
5112 * in the same simple assignment as in the code above.
5113 */
4774 memcpy(CMSG_DATA(&cmsg.cm), &port->out_fd, sizeof(int));
5114 memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);
4775
4776 res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
4777
4778 return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
4779}
4780
4781
4782nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)

--- 11 unchanged lines hidden (view full) ---

4794 long c;
4795 nxt_unit_port_impl_t *port_impl;
4796
4797 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
4798
4799 c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
4800
4801 if (c == 1) {
5115
5116 res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
5117
5118 return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
5119}
5120
5121
5122nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)

--- 11 unchanged lines hidden (view full) ---

5134 long c;
5135 nxt_unit_port_impl_t *port_impl;
5136
5137 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5138
5139 c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
5140
5141 if (c == 1) {
4802 nxt_unit_debug(NULL, "destroy port %d,%d",
5142 nxt_unit_debug(NULL, "destroy port{%d,%d}",
4803 (int) port->id.pid, (int) port->id.id);
4804
4805 nxt_unit_process_release(port_impl->process);
4806
4807 if (port->in_fd != -1) {
4808 close(port->in_fd);
4809
4810 port->in_fd = -1;
4811 }
4812
4813 if (port->out_fd != -1) {
4814 close(port->out_fd);
4815
4816 port->out_fd = -1;
4817 }
4818
5143 (int) port->id.pid, (int) port->id.id);
5144
5145 nxt_unit_process_release(port_impl->process);
5146
5147 if (port->in_fd != -1) {
5148 close(port->in_fd);
5149
5150 port->in_fd = -1;
5151 }
5152
5153 if (port->out_fd != -1) {
5154 close(port->out_fd);
5155
5156 port->out_fd = -1;
5157 }
5158
5159 if (port->in_fd != -1) {
5160 close(port->in_fd);
5161
5162 port->in_fd = -1;
5163 }
5164
5165 if (port->out_fd != -1) {
5166 close(port->out_fd);
5167
5168 port->out_fd = -1;
5169 }
5170
5171 if (port_impl->queue != NULL) {
5172 munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1)
5173 ? sizeof(nxt_app_queue_t)
5174 : sizeof(nxt_port_queue_t));
5175 }
5176
4819 free(port_impl);
4820 }
4821}
4822
4823
4824static nxt_unit_port_t *
5177 free(port_impl);
5178 }
5179}
5180
5181
5182static nxt_unit_port_t *
4825nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5183nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
4826{
4827 int rc;
4828 nxt_queue_t awaiting_req;
4829 nxt_unit_impl_t *lib;
4830 nxt_unit_port_t *old_port;
4831 nxt_unit_process_t *process;
4832 nxt_unit_ctx_impl_t *ctx_impl;
4833 nxt_unit_port_impl_t *new_port, *old_port_impl;
4834 nxt_unit_request_info_impl_t *req_impl;
4835
4836 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4837
4838 pthread_mutex_lock(&lib->mutex);
4839
4840 old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
4841
4842 if (nxt_slow_path(old_port != NULL)) {
5184{
5185 int rc;
5186 nxt_queue_t awaiting_req;
5187 nxt_unit_impl_t *lib;
5188 nxt_unit_port_t *old_port;
5189 nxt_unit_process_t *process;
5190 nxt_unit_ctx_impl_t *ctx_impl;
5191 nxt_unit_port_impl_t *new_port, *old_port_impl;
5192 nxt_unit_request_info_impl_t *req_impl;
5193
5194 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5195
5196 pthread_mutex_lock(&lib->mutex);
5197
5198 old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
5199
5200 if (nxt_slow_path(old_port != NULL)) {
4843 nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d",
4844 port->id.pid, port->id.id,
4845 port->in_fd, port->out_fd);
5201 nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
5202 "in_fd %d out_fd %d queue %p",
5203 port->id.pid, port->id.id,
5204 port->in_fd, port->out_fd, queue);
4846
4847 if (old_port->data == NULL) {
4848 old_port->data = port->data;
4849 port->data = NULL;
4850 }
4851
4852 if (old_port->in_fd == -1) {
4853 old_port->in_fd = port->in_fd;

--- 16 unchanged lines hidden (view full) ---

4870 }
4871
4872 *port = *old_port;
4873
4874 nxt_queue_init(&awaiting_req);
4875
4876 old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
4877
5205
5206 if (old_port->data == NULL) {
5207 old_port->data = port->data;
5208 port->data = NULL;
5209 }
5210
5211 if (old_port->in_fd == -1) {
5212 old_port->in_fd = port->in_fd;

--- 16 unchanged lines hidden (view full) ---

5229 }
5230
5231 *port = *old_port;
5232
5233 nxt_queue_init(&awaiting_req);
5234
5235 old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
5236
5237 if (old_port_impl->queue == NULL) {
5238 old_port_impl->queue = queue;
5239 }
5240
4878 if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
4879 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
4880 nxt_queue_init(&old_port_impl->awaiting_req);
4881 }
4882
4883 old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1);
4884
4885 pthread_mutex_unlock(&lib->mutex);

--- 23 unchanged lines hidden (view full) ---

4909
4910 } nxt_queue_loop;
4911
4912 return old_port;
4913 }
4914
4915 new_port = NULL;
4916
5241 if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5242 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5243 nxt_queue_init(&old_port_impl->awaiting_req);
5244 }
5245
5246 old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1);
5247
5248 pthread_mutex_unlock(&lib->mutex);

--- 23 unchanged lines hidden (view full) ---

5272
5273 } nxt_queue_loop;
5274
5275 return old_port;
5276 }
5277
5278 new_port = NULL;
5279
4917 nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
5280 nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
4918 port->id.pid, port->id.id,
5281 port->id.pid, port->id.id,
4919 port->in_fd, port->out_fd);
5282 port->in_fd, port->out_fd, queue);
4920
4921 process = nxt_unit_process_get(lib, port->id.pid);
4922 if (nxt_slow_path(process == NULL)) {
4923 goto unlock;
4924 }
4925
4926 if (port->id.id >= process->next_port_id) {
4927 process->next_port_id = port->id.id + 1;
4928 }
4929
4930 new_port = malloc(sizeof(nxt_unit_port_impl_t));
4931 if (nxt_slow_path(new_port == NULL)) {
5283
5284 process = nxt_unit_process_get(lib, port->id.pid);
5285 if (nxt_slow_path(process == NULL)) {
5286 goto unlock;
5287 }
5288
5289 if (port->id.id >= process->next_port_id) {
5290 process->next_port_id = port->id.id + 1;
5291 }
5292
5293 new_port = malloc(sizeof(nxt_unit_port_impl_t));
5294 if (nxt_slow_path(new_port == NULL)) {
5295 nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
5296 port->id.pid, port->id.id);
5297
4932 goto unlock;
4933 }
4934
4935 new_port->port = *port;
4936
4937 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
4938 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4939 nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",

--- 6 unchanged lines hidden (view full) ---

4946 goto unlock;
4947 }
4948
4949 nxt_queue_insert_tail(&process->ports, &new_port->link);
4950
4951 new_port->use_count = 2;
4952 new_port->process = process;
4953 new_port->ready = (port->in_fd != -1 || port->out_fd != -1);
5298 goto unlock;
5299 }
5300
5301 new_port->port = *port;
5302
5303 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
5304 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5305 nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",

--- 6 unchanged lines hidden (view full) ---

5312 goto unlock;
5313 }
5314
5315 nxt_queue_insert_tail(&process->ports, &new_port->link);
5316
5317 new_port->use_count = 2;
5318 new_port->process = process;
5319 new_port->ready = (port->in_fd != -1 || port->out_fd != -1);
5320 new_port->queue = queue;
5321 new_port->from_socket = 0;
5322 new_port->socket_rbuf = NULL;
4954
4955 nxt_queue_init(&new_port->awaiting_req);
4956
4957 process = NULL;
4958
4959unlock:
4960
4961 pthread_mutex_unlock(&lib->mutex);

--- 43 unchanged lines hidden (view full) ---

5005
5006static nxt_unit_port_t *
5007nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5008{
5009 nxt_unit_port_t *port;
5010
5011 port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
5012 if (nxt_slow_path(port == NULL)) {
5323
5324 nxt_queue_init(&new_port->awaiting_req);
5325
5326 process = NULL;
5327
5328unlock:
5329
5330 pthread_mutex_unlock(&lib->mutex);

--- 43 unchanged lines hidden (view full) ---

5374
5375static nxt_unit_port_t *
5376nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5377{
5378 nxt_unit_port_t *port;
5379
5380 port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
5381 if (nxt_slow_path(port == NULL)) {
5013 nxt_unit_debug(NULL, "remove_port: port %d,%d not found",
5382 nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
5014 (int) port_id->pid, (int) port_id->id);
5015
5016 return NULL;
5017 }
5018
5383 (int) port_id->pid, (int) port_id->id);
5384
5385 return NULL;
5386 }
5387
5019 nxt_unit_debug(NULL, "remove_port: port %d,%d, fds %d,%d, data %p",
5388 nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
5020 (int) port_id->pid, (int) port_id->id,
5021 port->in_fd, port->out_fd, port->data);
5022
5023 return port;
5024}
5025
5026
5027static void

--- 56 unchanged lines hidden (view full) ---

5084
5085static void
5086nxt_unit_quit(nxt_unit_ctx_t *ctx)
5087{
5088 nxt_unit_impl_t *lib;
5089
5090 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5091
5389 (int) port_id->pid, (int) port_id->id,
5390 port->in_fd, port->out_fd, port->data);
5391
5392 return port;
5393}
5394
5395
5396static void

--- 56 unchanged lines hidden (view full) ---

5453
5454static void
5455nxt_unit_quit(nxt_unit_ctx_t *ctx)
5456{
5457 nxt_unit_impl_t *lib;
5458
5459 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5460
5092 lib->online = 0;
5461 if (lib->online) {
5462 lib->online = 0;
5093
5463
5094 if (lib->callbacks.quit != NULL) {
5095 lib->callbacks.quit(ctx);
5464 if (lib->callbacks.quit != NULL) {
5465 lib->callbacks.quit(ctx);
5466 }
5096 }
5097}
5098
5099
5100static int
5101nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
5102{
5103 ssize_t res;

--- 28 unchanged lines hidden (view full) ---

5132 return NXT_UNIT_OK;
5133}
5134
5135
5136static ssize_t
5137nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5138 const void *buf, size_t buf_size, const void *oob, size_t oob_size)
5139{
5467 }
5468}
5469
5470
5471static int
5472nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
5473{
5474 ssize_t res;

--- 28 unchanged lines hidden (view full) ---

5503 return NXT_UNIT_OK;
5504}
5505
5506
5507static ssize_t
5508nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5509 const void *buf, size_t buf_size, const void *oob, size_t oob_size)
5510{
5140 nxt_unit_impl_t *lib;
5511 int notify;
5512 ssize_t ret;
5513 nxt_int_t rc;
5514 nxt_port_msg_t msg;
5515 nxt_unit_impl_t *lib;
5516 nxt_unit_port_impl_t *port_impl;
5141
5517
5142 nxt_unit_debug(ctx, "port_send: port %d,%d fd %d",
5143 (int) port->id.pid, (int) port->id.id, port->out_fd);
5144
5145 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5146
5518 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5519
5520 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5521 if (port_impl->queue != NULL && oob_size == 0
5522 && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
5523 {
5524 rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
5525 if (nxt_slow_path(rc != NXT_OK)) {
5526 nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5527 (int) port->id.pid, (int) port->id.id);
5528
5529 return -1;
5530 }
5531
5532 nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
5533 (int) port->id.pid, (int) port->id.id,
5534 (int) buf_size, notify);
5535
5536 if (notify) {
5537 memcpy(&msg, buf, sizeof(nxt_port_msg_t));
5538
5539 msg.type = _NXT_PORT_MSG_READ_QUEUE;
5540
5541 if (lib->callbacks.port_send == NULL) {
5542 ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
5543 sizeof(nxt_port_msg_t), NULL, 0);
5544
5545 nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
5546 (int) port->id.pid, (int) port->id.id,
5547 (int) ret);
5548
5549 } else {
5550 ret = lib->callbacks.port_send(ctx, port, &msg,
5551 sizeof(nxt_port_msg_t), NULL, 0);
5552
5553 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
5554 (int) port->id.pid, (int) port->id.id,
5555 (int) ret);
5556 }
5557
5558 }
5559
5560 return buf_size;
5561 }
5562
5563 if (port_impl->queue != NULL) {
5564 msg.type = _NXT_PORT_MSG_READ_SOCKET;
5565
5566 rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
5567 if (nxt_slow_path(rc != NXT_OK)) {
5568 nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5569 (int) port->id.pid, (int) port->id.id);
5570
5571 return -1;
5572 }
5573
5574 nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
5575 (int) port->id.pid, (int) port->id.id, notify);
5576 }
5577
5147 if (lib->callbacks.port_send != NULL) {
5578 if (lib->callbacks.port_send != NULL) {
5148 return lib->callbacks.port_send(ctx, port, buf, buf_size,
5149 oob, oob_size);
5579 ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
5580 oob, oob_size);
5581
5582 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
5583 (int) port->id.pid, (int) port->id.id,
5584 (int) ret);
5585
5586 } else {
5587 ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
5588 oob, oob_size);
5589
5590 nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
5591 (int) port->id.pid, (int) port->id.id,
5592 (int) ret);
5150 }
5151
5593 }
5594
5152 return nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
5153 oob, oob_size);
5595 return ret;
5154}
5155
5156
5157static ssize_t
5158nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
5159 const void *buf, size_t buf_size, const void *oob, size_t oob_size)
5160{
5596}
5597
5598
5599static ssize_t
5600nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
5601 const void *buf, size_t buf_size, const void *oob, size_t oob_size)
5602{
5603 int err;
5161 ssize_t res;
5162 struct iovec iov[1];
5163 struct msghdr msg;
5164
5165 iov[0].iov_base = (void *) buf;
5166 iov[0].iov_len = buf_size;
5167
5168 msg.msg_name = NULL;

--- 4 unchanged lines hidden (view full) ---

5173 msg.msg_control = (void *) oob;
5174 msg.msg_controllen = oob_size;
5175
5176retry:
5177
5178 res = sendmsg(fd, &msg, 0);
5179
5180 if (nxt_slow_path(res == -1)) {
5604 ssize_t res;
5605 struct iovec iov[1];
5606 struct msghdr msg;
5607
5608 iov[0].iov_base = (void *) buf;
5609 iov[0].iov_len = buf_size;
5610
5611 msg.msg_name = NULL;

--- 4 unchanged lines hidden (view full) ---

5616 msg.msg_control = (void *) oob;
5617 msg.msg_controllen = oob_size;
5618
5619retry:
5620
5621 res = sendmsg(fd, &msg, 0);
5622
5623 if (nxt_slow_path(res == -1)) {
5181 if (errno == EINTR) {
5624 err = errno;
5625
5626 if (err == EINTR) {
5182 goto retry;
5183 }
5184
5185 /*
5186 * FIXME: This should be "alert" after router graceful shutdown
5187 * implementation.
5188 */
5189 nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
5627 goto retry;
5628 }
5629
5630 /*
5631 * FIXME: This should be "alert" after router graceful shutdown
5632 * implementation.
5633 */
5634 nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
5190 fd, (int) buf_size, strerror(errno), errno);
5635 fd, (int) buf_size, strerror(err), err);
5191
5192 } else {
5193 nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
5194 (int) res);
5195 }
5196
5197 return res;
5198}
5199
5200
5201static int
5636
5637 } else {
5638 nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
5639 (int) res);
5640 }
5641
5642 return res;
5643}
5644
5645
5646static int
5647nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5648 nxt_unit_read_buf_t *rbuf)
5649{
5650 int res, read;
5651 nxt_unit_port_impl_t *port_impl;
5652
5653 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5654
5655 read = 0;
5656
5657retry:
5658
5659 if (port_impl->from_socket > 0) {
5660 if (port_impl->socket_rbuf != NULL
5661 && port_impl->socket_rbuf->size > 0)
5662 {
5663 port_impl->from_socket--;
5664
5665 nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
5666 port_impl->socket_rbuf->size = 0;
5667
5668 nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
5669 (int) port->id.pid, (int) port->id.id,
5670 (int) rbuf->size);
5671
5672 return NXT_UNIT_OK;
5673 }
5674
5675 } else {
5676 res = nxt_unit_port_queue_recv(port, rbuf);
5677
5678 if (res == NXT_UNIT_OK) {
5679 if (nxt_unit_is_read_socket(rbuf)) {
5680 port_impl->from_socket++;
5681
5682 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
5683 (int) port->id.pid, (int) port->id.id,
5684 port_impl->from_socket);
5685
5686 goto retry;
5687 }
5688
5689 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
5690 (int) port->id.pid, (int) port->id.id,
5691 (int) rbuf->size);
5692
5693 return NXT_UNIT_OK;
5694 }
5695 }
5696
5697 if (read) {
5698 return NXT_UNIT_AGAIN;
5699 }
5700
5701 res = nxt_unit_port_recv(ctx, port, rbuf);
5702 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
5703 return NXT_UNIT_ERROR;
5704 }
5705
5706 read = 1;
5707
5708 if (nxt_unit_is_read_queue(rbuf)) {
5709 nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
5710 (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
5711
5712 if (port_impl->from_socket) {
5713 nxt_unit_warn(ctx, "port protocol warning: READ_QUEUE after READ_SOCKET");
5714 }
5715
5716 goto retry;
5717 }
5718
5719 nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
5720 (int) port->id.pid, (int) port->id.id,
5721 (int) rbuf->size);
5722
5723 if (res == NXT_UNIT_AGAIN) {
5724 return NXT_UNIT_AGAIN;
5725 }
5726
5727 if (port_impl->from_socket > 0) {
5728 port_impl->from_socket--;
5729
5730 return NXT_UNIT_OK;
5731 }
5732
5733 nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
5734 (int) port->id.pid, (int) port->id.id,
5735 (int) rbuf->size);
5736
5737 if (port_impl->socket_rbuf == NULL) {
5738 port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);
5739
5740 if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
5741 return NXT_UNIT_ERROR;
5742 }
5743
5744 port_impl->socket_rbuf->size = 0;
5745 }
5746
5747 if (port_impl->socket_rbuf->size > 0) {
5748 nxt_unit_alert(ctx, "too many port socket messages");
5749
5750 return NXT_UNIT_ERROR;
5751 }
5752
5753 nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
5754
5755 memset(rbuf->oob, 0, sizeof(struct cmsghdr));
5756
5757 goto retry;
5758}
5759
5760
5761nxt_inline void
5762nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
5763{
5764 memcpy(dst->buf, src->buf, src->size);
5765 dst->size = src->size;
5766 memcpy(dst->oob, src->oob, sizeof(src->oob));
5767}
5768
5769
5770static int
5771nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5772 nxt_unit_read_buf_t *rbuf)
5773{
5774 int res;
5775
5776retry:
5777
5778 res = nxt_unit_app_queue_recv(port, rbuf);
5779
5780 if (res == NXT_UNIT_AGAIN) {
5781 res = nxt_unit_port_recv(ctx, port, rbuf);
5782 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
5783 return NXT_UNIT_ERROR;
5784 }
5785
5786 if (nxt_unit_is_read_queue(rbuf)) {
5787 nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
5788 (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
5789
5790 goto retry;
5791 }
5792 }
5793
5794 return res;
5795}
5796
5797
5798static int
5202nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5203 nxt_unit_read_buf_t *rbuf)
5204{
5205 int fd, err;
5206 struct iovec iov[1];
5207 struct msghdr msg;
5208 nxt_unit_impl_t *lib;
5209
5210 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5211
5212 if (lib->callbacks.port_recv != NULL) {
5213 rbuf->size = lib->callbacks.port_recv(ctx, port,
5214 rbuf->buf, sizeof(rbuf->buf),
5215 rbuf->oob, sizeof(rbuf->oob));
5216
5799nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5800 nxt_unit_read_buf_t *rbuf)
5801{
5802 int fd, err;
5803 struct iovec iov[1];
5804 struct msghdr msg;
5805 nxt_unit_impl_t *lib;
5806
5807 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5808
5809 if (lib->callbacks.port_recv != NULL) {
5810 rbuf->size = lib->callbacks.port_recv(ctx, port,
5811 rbuf->buf, sizeof(rbuf->buf),
5812 rbuf->oob, sizeof(rbuf->oob));
5813
5814 nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
5815 (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
5816
5217 if (nxt_slow_path(rbuf->size < 0)) {
5218 return NXT_UNIT_ERROR;
5219 }
5220
5221 return NXT_UNIT_OK;
5222 }
5223
5224 iov[0].iov_base = rbuf->buf;

--- 17 unchanged lines hidden (view full) ---

5242 err = errno;
5243
5244 if (err == EINTR) {
5245 goto retry;
5246 }
5247
5248 if (err == EAGAIN) {
5249 nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
5817 if (nxt_slow_path(rbuf->size < 0)) {
5818 return NXT_UNIT_ERROR;
5819 }
5820
5821 return NXT_UNIT_OK;
5822 }
5823
5824 iov[0].iov_base = rbuf->buf;

--- 17 unchanged lines hidden (view full) ---

5842 err = errno;
5843
5844 if (err == EINTR) {
5845 goto retry;
5846 }
5847
5848 if (err == EAGAIN) {
5849 nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
5250 fd, strerror(errno), errno);
5850 fd, strerror(err), err);
5251
5252 return NXT_UNIT_AGAIN;
5253 }
5254
5255 nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
5851
5852 return NXT_UNIT_AGAIN;
5853 }
5854
5855 nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
5256 fd, strerror(errno), errno);
5856 fd, strerror(err), err);
5257
5258 return NXT_UNIT_ERROR;
5259 }
5260
5261 nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
5262
5263 return NXT_UNIT_OK;
5264}
5265
5266
5857
5858 return NXT_UNIT_ERROR;
5859 }
5860
5861 nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
5862
5863 return NXT_UNIT_OK;
5864}
5865
5866
5867static int
5868nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
5869{
5870 nxt_unit_port_impl_t *port_impl;
5871
5872 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5873
5874 rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
5875
5876 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
5877}
5878
5879
5880static int
5881nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
5882{
5883 uint32_t cookie;
5884 nxt_port_msg_t *port_msg;
5885 nxt_app_queue_t *queue;
5886 nxt_unit_port_impl_t *port_impl;
5887
5888 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5889 queue = port_impl->queue;
5890
5891retry:
5892
5893 rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
5894
5895 nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
5896
5897 if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
5898 port_msg = (nxt_port_msg_t *) rbuf->buf;
5899
5900 if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
5901 return NXT_UNIT_OK;
5902 }
5903
5904 nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
5905
5906 goto retry;
5907 }
5908
5909 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
5910}
5911
5912
5267static nxt_int_t
5268nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
5269{
5270 nxt_unit_port_t *port;
5271 nxt_unit_port_hash_id_t *port_id;
5272
5273 port = data;
5274 port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;

--- 112 unchanged lines hidden (view full) ---

5387 NXT_LVLHSH_DEFAULT,
5388 nxt_unit_request_hash_test,
5389 nxt_lvlhsh_alloc,
5390 nxt_lvlhsh_free,
5391};
5392
5393
5394static int
5913static nxt_int_t
5914nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
5915{
5916 nxt_unit_port_t *port;
5917 nxt_unit_port_hash_id_t *port_id;
5918
5919 port = data;
5920 port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;

--- 112 unchanged lines hidden (view full) ---

6033 NXT_LVLHSH_DEFAULT,
6034 nxt_unit_request_hash_test,
6035 nxt_lvlhsh_alloc,
6036 nxt_lvlhsh_free,
6037};
6038
6039
6040static int
5395nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
5396 nxt_unit_request_info_impl_t *req_impl)
6041nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
6042 nxt_unit_request_info_t *req)
5397{
6043{
5398 uint32_t *stream;
5399 nxt_int_t res;
5400 nxt_lvlhsh_query_t lhq;
6044 uint32_t *stream;
6045 nxt_int_t res;
6046 nxt_lvlhsh_query_t lhq;
6047 nxt_unit_ctx_impl_t *ctx_impl;
6048 nxt_unit_request_info_impl_t *req_impl;
5401
6049
6050 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6051 if (req_impl->in_hash) {
6052 return NXT_UNIT_OK;
6053 }
6054
5402 stream = &req_impl->stream;
5403
5404 lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
5405 lhq.key.length = sizeof(*stream);
5406 lhq.key.start = (u_char *) stream;
5407 lhq.proto = &lvlhsh_requests_proto;
5408 lhq.pool = NULL;
5409 lhq.replace = 0;
5410 lhq.value = req_impl;
5411
6055 stream = &req_impl->stream;
6056
6057 lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
6058 lhq.key.length = sizeof(*stream);
6059 lhq.key.start = (u_char *) stream;
6060 lhq.proto = &lvlhsh_requests_proto;
6061 lhq.pool = NULL;
6062 lhq.replace = 0;
6063 lhq.value = req_impl;
6064
5412 res = nxt_lvlhsh_insert(request_hash, &lhq);
6065 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5413
6066
6067 pthread_mutex_lock(&ctx_impl->mutex);
6068
6069 res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);
6070
6071 pthread_mutex_unlock(&ctx_impl->mutex);
6072
5414 switch (res) {
5415
5416 case NXT_OK:
6073 switch (res) {
6074
6075 case NXT_OK:
6076 req_impl->in_hash = 1;
5417 return NXT_UNIT_OK;
5418
5419 default:
5420 return NXT_UNIT_ERROR;
5421 }
5422}
5423
5424
6077 return NXT_UNIT_OK;
6078
6079 default:
6080 return NXT_UNIT_ERROR;
6081 }
6082}
6083
6084
5425static nxt_unit_request_info_impl_t *
5426nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
5427 int remove)
6085static nxt_unit_request_info_t *
6086nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
5428{
6087{
5429 nxt_int_t res;
5430 nxt_lvlhsh_query_t lhq;
6088 nxt_int_t res;
6089 nxt_lvlhsh_query_t lhq;
6090 nxt_unit_ctx_impl_t *ctx_impl;
6091 nxt_unit_request_info_impl_t *req_impl;
5431
5432 lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
5433 lhq.key.length = sizeof(stream);
5434 lhq.key.start = (u_char *) &stream;
5435 lhq.proto = &lvlhsh_requests_proto;
5436 lhq.pool = NULL;
5437
6092
6093 lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
6094 lhq.key.length = sizeof(stream);
6095 lhq.key.start = (u_char *) &stream;
6096 lhq.proto = &lvlhsh_requests_proto;
6097 lhq.pool = NULL;
6098
6099 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6100
6101 pthread_mutex_lock(&ctx_impl->mutex);
6102
5438 if (remove) {
6103 if (remove) {
5439 res = nxt_lvlhsh_delete(request_hash, &lhq);
6104 res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
5440
5441 } else {
6105
6106 } else {
5442 res = nxt_lvlhsh_find(request_hash, &lhq);
6107 res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
5443 }
5444
6108 }
6109
6110 pthread_mutex_unlock(&ctx_impl->mutex);
6111
5445 switch (res) {
5446
5447 case NXT_OK:
6112 switch (res) {
6113
6114 case NXT_OK:
6115 req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
6116 req);
6117 req_impl->in_hash = 0;
6118
5448 return lhq.value;
5449
5450 default:
5451 return NULL;
5452 }
5453}
5454
5455

--- 165 unchanged lines hidden ---
6119 return lhq.value;
6120
6121 default:
6122 return NULL;
6123 }
6124}
6125
6126

--- 165 unchanged lines hidden ---