1
2 /*
3 * Copyright (C) NGINX, Inc.
4 */
5
6 #include "nxt_main.h"
7 #include "nxt_port_memory_int.h"
8 #include "nxt_socket_msg.h"
9 #include "nxt_port_queue.h"
10 #include "nxt_app_queue.h"
11
12 #include "nxt_unit.h"
13 #include "nxt_unit_request.h"
14 #include "nxt_unit_response.h"
15 #include "nxt_unit_websocket.h"
16
17 #include "nxt_websocket.h"
18
19 #if (NXT_HAVE_MEMFD_CREATE)
20 #include <linux/memfd.h>
21 #endif
22
23 #define NXT_UNIT_MAX_PLAIN_SIZE 1024
24 #define NXT_UNIT_LOCAL_BUF_SIZE \
25 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
26
27 enum {
28 NXT_QUIT_NORMAL = 0,
29 NXT_QUIT_GRACEFUL = 1,
30 };
31
32 typedef struct nxt_unit_impl_s nxt_unit_impl_t;
33 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
34 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
35 typedef struct nxt_unit_process_s nxt_unit_process_t;
36 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
37 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
38 typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t;
39 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
40 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
41 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
42 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
43
44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
46 nxt_unit_ctx_impl_t *ctx_impl, void *data);
47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
52 nxt_unit_mmap_buf_t *mmap_buf);
53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
54 nxt_unit_mmap_buf_t *mmap_buf);
55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
57 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
58 int *shared_port_fd, int *shared_queue_fd,
59 int *log_fd, uint32_t *stream, uint32_t *shm_limit,
60 uint32_t *request_limit);
61 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
62 int queue_fd);
63 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
64 nxt_unit_request_info_t **preq);
65 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
66 nxt_unit_recv_msg_t *recv_msg);
67 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
68 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
69 nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
70 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
71 nxt_unit_recv_msg_t *recv_msg);
72 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
73 nxt_unit_port_id_t *port_id);
74 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
75 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
76 nxt_unit_recv_msg_t *recv_msg);
77 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
78 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
79 nxt_unit_ctx_t *ctx);
80 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
81 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
82 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
83 nxt_unit_ctx_t *ctx);
84 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
85 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
86 nxt_unit_websocket_frame_impl_t *ws);
87 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
88 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
89 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
90 nxt_unit_mmap_buf_t *mmap_buf, int last);
91 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
92 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
94 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
95 nxt_unit_ctx_impl_t *ctx_impl);
96 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
97 nxt_unit_read_buf_t *rbuf);
98 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
99 nxt_unit_request_info_t *req, size_t size);
100 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
101 size_t size);
102 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
103 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
104 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
105 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
106 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
107 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
108 nxt_unit_port_t *port, int n);
109 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
110 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
111 int fd);
112 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
113 nxt_unit_port_t *port, uint32_t size,
114 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
115 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
116
117 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
118 nxt_unit_ctx_impl_t *ctx_impl);
119 static int nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
120 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
121 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
122 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
123 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
124 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
125 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
126 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
127 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
128 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
129 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
130 nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
131 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
132
133 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
135 pid_t pid, int remove);
136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
137 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
138 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
139 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
140 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
141 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
142 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
145 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
146 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
147 nxt_unit_port_t *port);
148 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
149 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
150
151 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
152 nxt_unit_port_t *port, int queue_fd);
153
154 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
155 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
156 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
157 nxt_unit_port_t *port, void *queue);
158 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
159 nxt_queue_t *awaiting_req);
160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
161 nxt_unit_port_id_t *port_id);
162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
163 nxt_unit_port_id_t *port_id);
164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
166 nxt_unit_process_t *process);
167 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
168 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
169 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
170 nxt_unit_port_t *port, const void *buf, size_t buf_size,
171 const nxt_send_oob_t *oob);
172 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
173 const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
174 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
175 nxt_unit_read_buf_t *rbuf);
176 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
177 nxt_unit_read_buf_t *src);
178 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
179 nxt_unit_read_buf_t *rbuf);
180 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
181 nxt_unit_read_buf_t *rbuf);
182 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
183 nxt_unit_read_buf_t *rbuf);
184 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
185 nxt_unit_read_buf_t *rbuf);
186 nxt_inline int nxt_unit_close(int fd);
187 static int nxt_unit_fd_blocking(int fd);
188
189 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
190 nxt_unit_port_t *port);
191 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
192 nxt_unit_port_id_t *port_id, int remove);
193
194 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
195 nxt_unit_request_info_t *req);
196 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
197 nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
198
199 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid,
200 int level);
201 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
202 static void nxt_unit_lvlhsh_free(void *data, void *p);
203 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
204
205
206 struct nxt_unit_mmap_buf_s {
207 nxt_unit_buf_t buf;
208
209 nxt_unit_mmap_buf_t *next;
210 nxt_unit_mmap_buf_t **prev;
211
212 nxt_port_mmap_header_t *hdr;
213 nxt_unit_request_info_t *req;
214 nxt_unit_ctx_impl_t *ctx_impl;
215 char *free_ptr;
216 char *plain_ptr;
217 };
218
219
220 struct nxt_unit_recv_msg_s {
221 uint32_t stream;
222 nxt_pid_t pid;
223 nxt_port_id_t reply_port;
224
225 uint8_t last; /* 1 bit */
226 uint8_t mmap; /* 1 bit */
227
228 void *start;
229 uint32_t size;
230
231 int fd[2];
232
233 nxt_unit_mmap_buf_t *incoming_buf;
234 };
235
236
237 typedef enum {
238 NXT_UNIT_RS_START = 0,
239 NXT_UNIT_RS_RESPONSE_INIT,
240 NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
241 NXT_UNIT_RS_RESPONSE_SENT,
242 NXT_UNIT_RS_RELEASED,
243 } nxt_unit_req_state_t;
244
245
246 struct nxt_unit_request_info_impl_s {
247 nxt_unit_request_info_t req;
248
249 uint32_t stream;
250
251 nxt_unit_mmap_buf_t *outgoing_buf;
252 nxt_unit_mmap_buf_t *incoming_buf;
253
254 nxt_unit_req_state_t state;
255 uint8_t websocket;
256 uint8_t in_hash;
257
258 /* for nxt_unit_ctx_impl_t.free_req or active_req */
259 nxt_queue_link_t link;
260 /* for nxt_unit_port_impl_t.awaiting_req */
261 nxt_queue_link_t port_wait_link;
262
263 char extra_data[];
264 };
265
266
267 struct nxt_unit_websocket_frame_impl_s {
268 nxt_unit_websocket_frame_t ws;
269
270 nxt_unit_mmap_buf_t *buf;
271
272 nxt_queue_link_t link;
273
274 nxt_unit_ctx_impl_t *ctx_impl;
275 };
276
277
278 struct nxt_unit_read_buf_s {
279 nxt_queue_link_t link;
280 nxt_unit_ctx_impl_t *ctx_impl;
281 ssize_t size;
282 nxt_recv_oob_t oob;
283 char buf[16384];
284 };
285
286
287 struct nxt_unit_ctx_impl_s {
288 nxt_unit_ctx_t ctx;
289
290 nxt_atomic_t use_count;
291 nxt_atomic_t wait_items;
292
293 pthread_mutex_t mutex;
294
295 nxt_unit_port_t *read_port;
296
297 nxt_queue_link_t link;
298
299 nxt_unit_mmap_buf_t *free_buf;
300
301 /* of nxt_unit_request_info_impl_t */
302 nxt_queue_t free_req;
303
304 /* of nxt_unit_websocket_frame_impl_t */
305 nxt_queue_t free_ws;
306
307 /* of nxt_unit_request_info_impl_t */
308 nxt_queue_t active_req;
309
310 /* of nxt_unit_request_info_impl_t */
311 nxt_lvlhsh_t requests;
312
313 /* of nxt_unit_request_info_impl_t */
314 nxt_queue_t ready_req;
315
316 /* of nxt_unit_read_buf_t */
317 nxt_queue_t pending_rbuf;
318
319 /* of nxt_unit_read_buf_t */
320 nxt_queue_t free_rbuf;
321
322 uint8_t online; /* 1 bit */
323 uint8_t ready; /* 1 bit */
324 uint8_t quit_param;
325
326 nxt_unit_mmap_buf_t ctx_buf[2];
327 nxt_unit_read_buf_t ctx_read_buf;
328
329 nxt_unit_request_info_impl_t req;
330 };
331
332
333 struct nxt_unit_mmap_s {
334 nxt_port_mmap_header_t *hdr;
335 pthread_t src_thread;
336
337 /* of nxt_unit_read_buf_t */
338 nxt_queue_t awaiting_rbuf;
339 };
340
341
342 struct nxt_unit_mmaps_s {
343 pthread_mutex_t mutex;
344 uint32_t size;
345 uint32_t cap;
346 nxt_atomic_t allocated_chunks;
347 nxt_unit_mmap_t *elts;
348 };
349
350
351 struct nxt_unit_impl_s {
352 nxt_unit_t unit;
353 nxt_unit_callbacks_t callbacks;
354
355 nxt_atomic_t use_count;
356 nxt_atomic_t request_count;
357
358 uint32_t request_data_size;
359 uint32_t shm_mmap_limit;
360 uint32_t request_limit;
361
362 pthread_mutex_t mutex;
363
364 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */
365 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */
366
367 nxt_unit_port_t *router_port;
368 nxt_unit_port_t *shared_port;
369
370 nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */
371
372 nxt_unit_mmaps_t incoming;
373 nxt_unit_mmaps_t outgoing;
374
375 pid_t pid;
376 int log_fd;
377
378 nxt_unit_ctx_impl_t main_ctx;
379 };
380
381
382 struct nxt_unit_port_impl_s {
383 nxt_unit_port_t port;
384
385 nxt_atomic_t use_count;
386
387 /* for nxt_unit_process_t.ports */
388 nxt_queue_link_t link;
389 nxt_unit_process_t *process;
390
391 /* of nxt_unit_request_info_impl_t */
392 nxt_queue_t awaiting_req;
393
394 int ready;
395
396 void *queue;
397
398 int from_socket;
399 nxt_unit_read_buf_t *socket_rbuf;
400 };
401
402
403 struct nxt_unit_process_s {
404 pid_t pid;
405
406 nxt_queue_t ports; /* of nxt_unit_port_impl_t */
407
408 nxt_unit_impl_t *lib;
409
410 nxt_atomic_t use_count;
411
412 uint32_t next_port_id;
413 };
414
415
416 /* Explicitly using 32 bit types to avoid possible alignment. */
417 typedef struct {
418 int32_t pid;
419 uint32_t id;
420 } nxt_unit_port_hash_id_t;
421
422
423 static pid_t nxt_unit_pid;
424
425
426 nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t * init)427 nxt_unit_init(nxt_unit_init_t *init)
428 {
429 int rc, queue_fd, shared_queue_fd;
430 void *mem;
431 uint32_t ready_stream, shm_limit, request_limit;
432 nxt_unit_ctx_t *ctx;
433 nxt_unit_impl_t *lib;
434 nxt_unit_port_t ready_port, router_port, read_port, shared_port;
435
436 nxt_unit_pid = getpid();
437
438 lib = nxt_unit_create(init);
439 if (nxt_slow_path(lib == NULL)) {
440 return NULL;
441 }
442
443 queue_fd = -1;
444 mem = MAP_FAILED;
445 shared_port.out_fd = -1;
446 shared_port.data = NULL;
447
448 if (init->ready_port.id.pid != 0
449 && init->ready_stream != 0
450 && init->read_port.id.pid != 0)
451 {
452 ready_port = init->ready_port;
453 ready_stream = init->ready_stream;
454 router_port = init->router_port;
455 read_port = init->read_port;
456 lib->log_fd = init->log_fd;
457
458 nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
459 ready_port.id.id);
460 nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
461 router_port.id.id);
462 nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
463 read_port.id.id);
464
465 shared_port.in_fd = init->shared_port_fd;
466 shared_queue_fd = init->shared_queue_fd;
467
468 } else {
469 rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
470 &shared_port.in_fd, &shared_queue_fd,
471 &lib->log_fd, &ready_stream, &shm_limit,
472 &request_limit);
473 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
474 goto fail;
475 }
476
477 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
478 / PORT_MMAP_DATA_SIZE;
479 lib->request_limit = request_limit;
480 }
481
482 if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
483 lib->shm_mmap_limit = 1;
484 }
485
486 lib->pid = read_port.id.pid;
487 nxt_unit_pid = lib->pid;
488
489 ctx = &lib->main_ctx.ctx;
490
491 rc = nxt_unit_fd_blocking(router_port.out_fd);
492 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
493 goto fail;
494 }
495
496 lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
497 if (nxt_slow_path(lib->router_port == NULL)) {
498 nxt_unit_alert(NULL, "failed to add router_port");
499
500 goto fail;
501 }
502
503 queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
504 if (nxt_slow_path(queue_fd == -1)) {
505 goto fail;
506 }
507
508 mem = mmap(NULL, sizeof(nxt_port_queue_t),
509 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
510 if (nxt_slow_path(mem == MAP_FAILED)) {
511 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
512 strerror(errno), errno);
513
514 goto fail;
515 }
516
517 nxt_port_queue_init(mem);
518
519 rc = nxt_unit_fd_blocking(read_port.in_fd);
520 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
521 goto fail;
522 }
523
524 lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
525 if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
526 nxt_unit_alert(NULL, "failed to add read_port");
527
528 goto fail;
529 }
530
531 rc = nxt_unit_fd_blocking(ready_port.out_fd);
532 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
533 goto fail;
534 }
535
536 nxt_unit_port_id_init(&shared_port.id, read_port.id.pid,
537 NXT_UNIT_SHARED_PORT_ID);
538
539 mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
540 MAP_SHARED, shared_queue_fd, 0);
541 if (nxt_slow_path(mem == MAP_FAILED)) {
542 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd,
543 strerror(errno), errno);
544
545 goto fail;
546 }
547
548 nxt_unit_close(shared_queue_fd);
549
550 lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem);
551 if (nxt_slow_path(lib->shared_port == NULL)) {
552 nxt_unit_alert(NULL, "failed to add shared_port");
553
554 goto fail;
555 }
556
557 rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
558 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
559 nxt_unit_alert(NULL, "failed to send READY message");
560
561 goto fail;
562 }
563
564 nxt_unit_close(ready_port.out_fd);
565 nxt_unit_close(queue_fd);
566
567 return ctx;
568
569 fail:
570
571 if (mem != MAP_FAILED) {
572 munmap(mem, sizeof(nxt_port_queue_t));
573 }
574
575 if (queue_fd != -1) {
576 nxt_unit_close(queue_fd);
577 }
578
579 nxt_unit_ctx_release(&lib->main_ctx.ctx);
580
581 return NULL;
582 }
583
584
585 static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t * init)586 nxt_unit_create(nxt_unit_init_t *init)
587 {
588 int rc;
589 nxt_unit_impl_t *lib;
590
591 if (nxt_slow_path(init->callbacks.request_handler == NULL)) {
592 nxt_unit_alert(NULL, "request_handler is NULL");
593
594 return NULL;
595 }
596
597 lib = nxt_unit_malloc(NULL,
598 sizeof(nxt_unit_impl_t) + init->request_data_size);
599 if (nxt_slow_path(lib == NULL)) {
600 nxt_unit_alert(NULL, "failed to allocate unit struct");
601
602 return NULL;
603 }
604
605 rc = pthread_mutex_init(&lib->mutex, NULL);
606 if (nxt_slow_path(rc != 0)) {
607 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
608
609 goto out_unit_free;
610 }
611
612 lib->unit.data = init->data;
613 lib->callbacks = init->callbacks;
614
615 lib->request_data_size = init->request_data_size;
616 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
617 / PORT_MMAP_DATA_SIZE;
618 lib->request_limit = init->request_limit;
619
620 lib->processes.slot = NULL;
621 lib->ports.slot = NULL;
622
623 lib->log_fd = STDERR_FILENO;
624
625 nxt_queue_init(&lib->contexts);
626
627 lib->use_count = 0;
628 lib->request_count = 0;
629 lib->router_port = NULL;
630 lib->shared_port = NULL;
631
632 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
633 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
634 goto out_mutex_destroy;
635 }
636
637 rc = nxt_unit_mmaps_init(&lib->incoming);
638 if (nxt_slow_path(rc != 0)) {
639 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
640
641 goto out_ctx_free;
642 }
643
644 rc = nxt_unit_mmaps_init(&lib->outgoing);
645 if (nxt_slow_path(rc != 0)) {
646 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
647
648 goto out_mmaps_destroy;
649 }
650
651 return lib;
652
653 out_mmaps_destroy:
654 nxt_unit_mmaps_destroy(&lib->incoming);
655
656 out_ctx_free:
657 nxt_unit_ctx_free(&lib->main_ctx);
658
659 out_mutex_destroy:
660 pthread_mutex_destroy(&lib->mutex);
661
662 out_unit_free:
663 nxt_unit_free(NULL, lib);
664
665 return NULL;
666 }
667
668
669 static int
nxt_unit_ctx_init(nxt_unit_impl_t * lib,nxt_unit_ctx_impl_t * ctx_impl,void * data)670 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
671 void *data)
672 {
673 int rc;
674
675 ctx_impl->ctx.data = data;
676 ctx_impl->ctx.unit = &lib->unit;
677
678 rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
679 if (nxt_slow_path(rc != 0)) {
680 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
681
682 return NXT_UNIT_ERROR;
683 }
684
685 nxt_unit_lib_use(lib);
686
687 pthread_mutex_lock(&lib->mutex);
688
689 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
690
691 pthread_mutex_unlock(&lib->mutex);
692
693 ctx_impl->use_count = 1;
694 ctx_impl->wait_items = 0;
695 ctx_impl->online = 1;
696 ctx_impl->ready = 0;
697 ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
698
699 nxt_queue_init(&ctx_impl->free_req);
700 nxt_queue_init(&ctx_impl->free_ws);
701 nxt_queue_init(&ctx_impl->active_req);
702 nxt_queue_init(&ctx_impl->ready_req);
703 nxt_queue_init(&ctx_impl->pending_rbuf);
704 nxt_queue_init(&ctx_impl->free_rbuf);
705
706 ctx_impl->free_buf = NULL;
707 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
708 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
709
710 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
711 nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
712
713 ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
714
715 ctx_impl->req.req.ctx = &ctx_impl->ctx;
716 ctx_impl->req.req.unit = &lib->unit;
717
718 ctx_impl->read_port = NULL;
719 ctx_impl->requests.slot = 0;
720
721 return NXT_UNIT_OK;
722 }
723
724
725 nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_t * ctx)726 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
727 {
728 nxt_unit_ctx_impl_t *ctx_impl;
729
730 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
731
732 nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
733 }
734
735
736 nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_t * ctx)737 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
738 {
739 long c;
740 nxt_unit_ctx_impl_t *ctx_impl;
741
742 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
743
744 c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
745
746 if (c == 1) {
747 nxt_unit_ctx_free(ctx_impl);
748 }
749 }
750
751
752 nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t * lib)753 nxt_unit_lib_use(nxt_unit_impl_t *lib)
754 {
755 nxt_atomic_fetch_add(&lib->use_count, 1);
756 }
757
758
759 nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t * lib)760 nxt_unit_lib_release(nxt_unit_impl_t *lib)
761 {
762 long c;
763 nxt_unit_process_t *process;
764
765 c = nxt_atomic_fetch_add(&lib->use_count, -1);
766
767 if (c == 1) {
768 for ( ;; ) {
769 pthread_mutex_lock(&lib->mutex);
770
771 process = nxt_unit_process_pop_first(lib);
772 if (process == NULL) {
773 pthread_mutex_unlock(&lib->mutex);
774
775 break;
776 }
777
778 nxt_unit_remove_process(lib, process);
779 }
780
781 pthread_mutex_destroy(&lib->mutex);
782
783 if (nxt_fast_path(lib->router_port != NULL)) {
784 nxt_unit_port_release(lib->router_port);
785 }
786
787 if (nxt_fast_path(lib->shared_port != NULL)) {
788 nxt_unit_port_release(lib->shared_port);
789 }
790
791 nxt_unit_mmaps_destroy(&lib->incoming);
792 nxt_unit_mmaps_destroy(&lib->outgoing);
793
794 nxt_unit_free(NULL, lib);
795 }
796 }
797
798
799 nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t ** head,nxt_unit_mmap_buf_t * mmap_buf)800 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
801 nxt_unit_mmap_buf_t *mmap_buf)
802 {
803 mmap_buf->next = *head;
804
805 if (mmap_buf->next != NULL) {
806 mmap_buf->next->prev = &mmap_buf->next;
807 }
808
809 *head = mmap_buf;
810 mmap_buf->prev = head;
811 }
812
813
814 nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t ** prev,nxt_unit_mmap_buf_t * mmap_buf)815 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
816 nxt_unit_mmap_buf_t *mmap_buf)
817 {
818 while (*prev != NULL) {
819 prev = &(*prev)->next;
820 }
821
822 nxt_unit_mmap_buf_insert(prev, mmap_buf);
823 }
824
825
826 nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t * mmap_buf)827 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
828 {
829 nxt_unit_mmap_buf_t **prev;
830
831 prev = mmap_buf->prev;
832
833 if (mmap_buf->next != NULL) {
834 mmap_buf->next->prev = prev;
835 }
836
837 if (prev != NULL) {
838 *prev = mmap_buf->next;
839 }
840 }
841
842
843 static int
nxt_unit_read_env(nxt_unit_port_t * ready_port,nxt_unit_port_t * router_port,nxt_unit_port_t * read_port,int * shared_port_fd,int * shared_queue_fd,int * log_fd,uint32_t * stream,uint32_t * shm_limit,uint32_t * request_limit)844 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
845 nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd,
846 int *log_fd, uint32_t *stream,
847 uint32_t *shm_limit, uint32_t *request_limit)
848 {
849 int rc;
850 int ready_fd, router_fd, read_in_fd, read_out_fd;
851 char *unit_init, *version_end, *vars;
852 size_t version_length;
853 int64_t ready_pid, router_pid, read_pid;
854 uint32_t ready_stream, router_id, ready_id, read_id;
855
856 unit_init = getenv(NXT_UNIT_INIT_ENV);
857 if (nxt_slow_path(unit_init == NULL)) {
858 nxt_unit_alert(NULL, "%s is not in the current environment",
859 NXT_UNIT_INIT_ENV);
860
861 return NXT_UNIT_ERROR;
862 }
863
864 version_end = strchr(unit_init, ';');
865 if (nxt_slow_path(version_end == NULL)) {
866 nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
867 NXT_UNIT_INIT_ENV, unit_init);
868
869 return NXT_UNIT_ERROR;
870 }
871
872 version_length = version_end - unit_init;
873
874 rc = version_length != nxt_length(NXT_VERSION)
875 || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
876
877 if (nxt_slow_path(rc != 0)) {
878 nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
879 "%.*s, while the app was compiled with libunit %s",
880 (int) version_length, unit_init, NXT_VERSION);
881
882 return NXT_UNIT_ERROR;
883 }
884
885 vars = version_end + 1;
886
887 rc = sscanf(vars,
888 "%"PRIu32";"
889 "%"PRId64",%"PRIu32",%d;"
890 "%"PRId64",%"PRIu32",%d;"
891 "%"PRId64",%"PRIu32",%d,%d;"
892 "%d,%d;"
893 "%d,%"PRIu32",%"PRIu32,
894 &ready_stream,
895 &ready_pid, &ready_id, &ready_fd,
896 &router_pid, &router_id, &router_fd,
897 &read_pid, &read_id, &read_in_fd, &read_out_fd,
898 shared_port_fd, shared_queue_fd,
899 log_fd, shm_limit, request_limit);
900
901 if (nxt_slow_path(rc == EOF)) {
902 nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
903 vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
904
905 return NXT_UNIT_ERROR;
906 }
907
908 if (nxt_slow_path(rc != 16)) {
909 nxt_unit_alert(NULL, "invalid number of variables in %s env: "
910 "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars);
911
912 return NXT_UNIT_ERROR;
913 }
914
915 nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
916
917 nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
918
919 ready_port->in_fd = -1;
920 ready_port->out_fd = ready_fd;
921 ready_port->data = NULL;
922
923 nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
924
925 router_port->in_fd = -1;
926 router_port->out_fd = router_fd;
927 router_port->data = NULL;
928
929 nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
930
931 read_port->in_fd = read_in_fd;
932 read_port->out_fd = read_out_fd;
933 read_port->data = NULL;
934
935 *stream = ready_stream;
936
937 return NXT_UNIT_OK;
938 }
939
940
941 static int
nxt_unit_ready(nxt_unit_ctx_t * ctx,int ready_fd,uint32_t stream,int queue_fd)942 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
943 {
944 ssize_t res;
945 nxt_send_oob_t oob;
946 nxt_port_msg_t msg;
947 nxt_unit_impl_t *lib;
948 int fds[2] = {queue_fd, -1};
949
950 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
951
952 msg.stream = stream;
953 msg.pid = lib->pid;
954 msg.reply_port = 0;
955 msg.type = _NXT_PORT_MSG_PROCESS_READY;
956 msg.last = 1;
957 msg.mmap = 0;
958 msg.nf = 0;
959 msg.mf = 0;
960
961 nxt_socket_msg_oob_init(&oob, fds);
962
963 res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
964 if (res != sizeof(msg)) {
965 return NXT_UNIT_ERROR;
966 }
967
968 return NXT_UNIT_OK;
969 }
970
971
972 static int
nxt_unit_process_msg(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf,nxt_unit_request_info_t ** preq)973 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
974 nxt_unit_request_info_t **preq)
975 {
976 int rc;
977 pid_t pid;
978 uint8_t quit_param;
979 nxt_port_msg_t *port_msg;
980 nxt_unit_impl_t *lib;
981 nxt_unit_recv_msg_t recv_msg;
982
983 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
984
985 recv_msg.incoming_buf = NULL;
986 recv_msg.fd[0] = -1;
987 recv_msg.fd[1] = -1;
988
989 rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
990 if (nxt_slow_path(rc != NXT_OK)) {
991 nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
992 rc = NXT_UNIT_ERROR;
993 goto done;
994 }
995
996 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
997 if (nxt_slow_path(rbuf->size == 0)) {
998 nxt_unit_debug(ctx, "read port closed");
999
1000 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1001 rc = NXT_UNIT_OK;
1002 goto done;
1003 }
1004
1005 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
1006
1007 rc = NXT_UNIT_ERROR;
1008 goto done;
1009 }
1010
1011 port_msg = (nxt_port_msg_t *) rbuf->buf;
1012
1013 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
1014 port_msg->stream, (int) port_msg->type,
1015 recv_msg.fd[0], recv_msg.fd[1]);
1016
1017 recv_msg.stream = port_msg->stream;
1018 recv_msg.pid = port_msg->pid;
1019 recv_msg.reply_port = port_msg->reply_port;
1020 recv_msg.last = port_msg->last;
1021 recv_msg.mmap = port_msg->mmap;
1022
1023 recv_msg.start = port_msg + 1;
1024 recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
1025
1026 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
1027 nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
1028 port_msg->stream, (int) port_msg->type);
1029 rc = NXT_UNIT_ERROR;
1030 goto done;
1031 }
1032
1033 /* Fragmentation is unsupported. */
1034 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
1035 nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
1036 port_msg->stream, (int) port_msg->type);
1037 rc = NXT_UNIT_ERROR;
1038 goto done;
1039 }
1040
1041 if (port_msg->mmap) {
1042 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
1043
1044 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1045 if (rc == NXT_UNIT_AGAIN) {
1046 recv_msg.fd[0] = -1;
1047 recv_msg.fd[1] = -1;
1048 }
1049
1050 goto done;
1051 }
1052 }
1053
1054 switch (port_msg->type) {
1055
1056 case _NXT_PORT_MSG_RPC_READY:
1057 rc = NXT_UNIT_OK;
1058 break;
1059
1060 case _NXT_PORT_MSG_QUIT:
1061 if (recv_msg.size == sizeof(quit_param)) {
1062 memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1063
1064 } else {
1065 quit_param = NXT_QUIT_NORMAL;
1066 }
1067
1068 nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1069 (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1070
1071 nxt_unit_quit(ctx, quit_param);
1072
1073 rc = NXT_UNIT_OK;
1074 break;
1075
1076 case _NXT_PORT_MSG_NEW_PORT:
1077 rc = nxt_unit_process_new_port(ctx, &recv_msg);
1078 break;
1079
1080 case _NXT_PORT_MSG_PORT_ACK:
1081 rc = nxt_unit_ctx_ready(ctx);
1082 break;
1083
1084 case _NXT_PORT_MSG_CHANGE_FILE:
1085 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1086 port_msg->stream, recv_msg.fd[0]);
1087
1088 if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1089 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1090 port_msg->stream, recv_msg.fd[0], lib->log_fd,
1091 strerror(errno), errno);
1092
1093 rc = NXT_UNIT_ERROR;
1094 goto done;
1095 }
1096
1097 rc = NXT_UNIT_OK;
1098 break;
1099
1100 case _NXT_PORT_MSG_MMAP:
1101 if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1102 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1103 port_msg->stream, recv_msg.fd[0]);
1104
1105 rc = NXT_UNIT_ERROR;
1106 goto done;
1107 }
1108
1109 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1110 break;
1111
1112 case _NXT_PORT_MSG_REQ_HEADERS:
1113 rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1114 break;
1115
1116 case _NXT_PORT_MSG_REQ_BODY:
1117 rc = nxt_unit_process_req_body(ctx, &recv_msg);
1118 break;
1119
1120 case _NXT_PORT_MSG_WEBSOCKET:
1121 rc = nxt_unit_process_websocket(ctx, &recv_msg);
1122 break;
1123
1124 case _NXT_PORT_MSG_REMOVE_PID:
1125 if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1126 nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1127 "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1128 (int) sizeof(pid));
1129
1130 rc = NXT_UNIT_ERROR;
1131 goto done;
1132 }
1133
1134 memcpy(&pid, recv_msg.start, sizeof(pid));
1135
1136 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1137 port_msg->stream, (int) pid);
1138
1139 nxt_unit_remove_pid(lib, pid);
1140
1141 rc = NXT_UNIT_OK;
1142 break;
1143
1144 case _NXT_PORT_MSG_SHM_ACK:
1145 rc = nxt_unit_process_shm_ack(ctx);
1146 break;
1147
1148 default:
1149 nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1150 port_msg->stream, (int) port_msg->type);
1151
1152 rc = NXT_UNIT_ERROR;
1153 goto done;
1154 }
1155
1156 done:
1157
1158 if (recv_msg.fd[0] != -1) {
1159 nxt_unit_close(recv_msg.fd[0]);
1160 }
1161
1162 if (recv_msg.fd[1] != -1) {
1163 nxt_unit_close(recv_msg.fd[1]);
1164 }
1165
1166 while (recv_msg.incoming_buf != NULL) {
1167 nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1168 }
1169
1170 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1171 #if (NXT_DEBUG)
1172 memset(rbuf->buf, 0xAC, rbuf->size);
1173 #endif
1174 nxt_unit_read_buf_release(ctx, rbuf);
1175 }
1176
1177 return rc;
1178 }
1179
1180
1181 static int
nxt_unit_process_new_port(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1182 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1183 {
1184 void *mem;
1185 nxt_unit_port_t new_port, *port;
1186 nxt_port_msg_new_port_t *new_port_msg;
1187
1188 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1189 nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1190 "invalid message size (%d)",
1191 recv_msg->stream, (int) recv_msg->size);
1192
1193 return NXT_UNIT_ERROR;
1194 }
1195
1196 if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1197 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1198 recv_msg->stream, recv_msg->fd[0]);
1199
1200 return NXT_UNIT_ERROR;
1201 }
1202
1203 new_port_msg = recv_msg->start;
1204
1205 nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1206 recv_msg->stream, (int) new_port_msg->pid,
1207 (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1208
1209 if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) {
1210 return NXT_UNIT_ERROR;
1211 }
1212
1213 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id);
1214
1215 new_port.in_fd = -1;
1216 new_port.out_fd = recv_msg->fd[0];
1217
1218 mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1219 MAP_SHARED, recv_msg->fd[1], 0);
1220
1221 if (nxt_slow_path(mem == MAP_FAILED)) {
1222 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1223 strerror(errno), errno);
1224
1225 return NXT_UNIT_ERROR;
1226 }
1227
1228 new_port.data = NULL;
1229
1230 recv_msg->fd[0] = -1;
1231
1232 port = nxt_unit_add_port(ctx, &new_port, mem);
1233 if (nxt_slow_path(port == NULL)) {
1234 return NXT_UNIT_ERROR;
1235 }
1236
1237 nxt_unit_port_release(port);
1238
1239 return NXT_UNIT_OK;
1240 }
1241
1242
1243 static int
nxt_unit_ctx_ready(nxt_unit_ctx_t * ctx)1244 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1245 {
1246 nxt_unit_impl_t *lib;
1247 nxt_unit_ctx_impl_t *ctx_impl;
1248
1249 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1250
1251 if (nxt_slow_path(ctx_impl->ready)) {
1252 return NXT_UNIT_OK;
1253 }
1254
1255 ctx_impl->ready = 1;
1256
1257 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1258
1259 /* Call ready_handler() only for main context. */
1260 if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1261 return lib->callbacks.ready_handler(ctx);
1262 }
1263
1264 if (&lib->main_ctx != ctx_impl) {
1265 /* Check if the main context is already stopped or quit. */
1266 if (nxt_slow_path(!lib->main_ctx.ready)) {
1267 ctx_impl->ready = 0;
1268
1269 nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1270
1271 return NXT_UNIT_OK;
1272 }
1273
1274 if (lib->callbacks.add_port != NULL) {
1275 lib->callbacks.add_port(ctx, lib->shared_port);
1276 }
1277 }
1278
1279 return NXT_UNIT_OK;
1280 }
1281
1282
1283 static int
nxt_unit_process_req_headers(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg,nxt_unit_request_info_t ** preq)1284 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1285 nxt_unit_request_info_t **preq)
1286 {
1287 int res;
1288 nxt_unit_impl_t *lib;
1289 nxt_unit_port_id_t port_id;
1290 nxt_unit_request_t *r;
1291 nxt_unit_mmap_buf_t *b;
1292 nxt_unit_request_info_t *req;
1293 nxt_unit_request_info_impl_t *req_impl;
1294
1295 if (nxt_slow_path(recv_msg->mmap == 0)) {
1296 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1297 recv_msg->stream);
1298
1299 return NXT_UNIT_ERROR;
1300 }
1301
1302 if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1303 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1304 "%d expected", recv_msg->stream, (int) recv_msg->size,
1305 (int) sizeof(nxt_unit_request_t));
1306
1307 return NXT_UNIT_ERROR;
1308 }
1309
1310 req_impl = nxt_unit_request_info_get(ctx);
1311 if (nxt_slow_path(req_impl == NULL)) {
1312 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1313 recv_msg->stream);
1314
1315 return NXT_UNIT_ERROR;
1316 }
1317
1318 req = &req_impl->req;
1319
1320 req->request = recv_msg->start;
1321
1322 b = recv_msg->incoming_buf;
1323
1324 req->request_buf = &b->buf;
1325 req->response = NULL;
1326 req->response_buf = NULL;
1327
1328 r = req->request;
1329
1330 req->content_length = r->content_length;
1331
1332 req->content_buf = req->request_buf;
1333 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1334
1335 req_impl->stream = recv_msg->stream;
1336
1337 req_impl->outgoing_buf = NULL;
1338
1339 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1340 b->req = req;
1341 }
1342
1343 /* "Move" incoming buffer list to req_impl. */
1344 req_impl->incoming_buf = recv_msg->incoming_buf;
1345 req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1346 recv_msg->incoming_buf = NULL;
1347
1348 req->content_fd = recv_msg->fd[0];
1349 recv_msg->fd[0] = -1;
1350
1351 req->response_max_fields = 0;
1352 req_impl->state = NXT_UNIT_RS_START;
1353 req_impl->websocket = 0;
1354 req_impl->in_hash = 0;
1355
1356 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1357 (int) r->method_length,
1358 (char *) nxt_unit_sptr_get(&r->method),
1359 (int) r->target_length,
1360 (char *) nxt_unit_sptr_get(&r->target),
1361 (int) r->content_length);
1362
1363 nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1364
1365 res = nxt_unit_request_check_response_port(req, &port_id);
1366 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1367 return NXT_UNIT_ERROR;
1368 }
1369
1370 if (nxt_fast_path(res == NXT_UNIT_OK)) {
1371 res = nxt_unit_send_req_headers_ack(req);
1372 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1373 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1374
1375 return NXT_UNIT_ERROR;
1376 }
1377
1378 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1379
1380 if (req->content_length
1381 > (uint64_t) (req->content_buf->end - req->content_buf->free))
1382 {
1383 res = nxt_unit_request_hash_add(ctx, req);
1384 if (nxt_slow_path(res != NXT_UNIT_OK)) {
1385 nxt_unit_req_warn(req, "failed to add request to hash");
1386
1387 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1388
1389 return NXT_UNIT_ERROR;
1390 }
1391
1392 /*
1393 * If application have separate data handler, we may start
1394 * request processing and process data when it is arrived.
1395 */
1396 if (lib->callbacks.data_handler == NULL) {
1397 return NXT_UNIT_OK;
1398 }
1399 }
1400
1401 if (preq == NULL) {
1402 lib->callbacks.request_handler(req);
1403
1404 } else {
1405 *preq = req;
1406 }
1407 }
1408
1409 return NXT_UNIT_OK;
1410 }
1411
1412
1413 static int
nxt_unit_process_req_body(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1414 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1415 {
1416 uint64_t l;
1417 nxt_unit_impl_t *lib;
1418 nxt_unit_mmap_buf_t *b;
1419 nxt_unit_request_info_t *req;
1420
1421 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1422 if (req == NULL) {
1423 return NXT_UNIT_OK;
1424 }
1425
1426 l = req->content_buf->end - req->content_buf->free;
1427
1428 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1429 b->req = req;
1430 l += b->buf.end - b->buf.free;
1431 }
1432
1433 if (recv_msg->incoming_buf != NULL) {
1434 b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1435
1436 while (b->next != NULL) {
1437 b = b->next;
1438 }
1439
1440 /* "Move" incoming buffer list to req_impl. */
1441 b->next = recv_msg->incoming_buf;
1442 b->next->prev = &b->next;
1443
1444 recv_msg->incoming_buf = NULL;
1445 }
1446
1447 req->content_fd = recv_msg->fd[0];
1448 recv_msg->fd[0] = -1;
1449
1450 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1451
1452 if (lib->callbacks.data_handler != NULL) {
1453 lib->callbacks.data_handler(req);
1454
1455 return NXT_UNIT_OK;
1456 }
1457
1458 if (req->content_fd != -1 || l == req->content_length) {
1459 lib->callbacks.request_handler(req);
1460 }
1461
1462 return NXT_UNIT_OK;
1463 }
1464
1465
1466 static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t * req,nxt_unit_port_id_t * port_id)1467 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1468 nxt_unit_port_id_t *port_id)
1469 {
1470 int res;
1471 nxt_unit_ctx_t *ctx;
1472 nxt_unit_impl_t *lib;
1473 nxt_unit_port_t *port;
1474 nxt_unit_process_t *process;
1475 nxt_unit_ctx_impl_t *ctx_impl;
1476 nxt_unit_port_impl_t *port_impl;
1477 nxt_unit_request_info_impl_t *req_impl;
1478
1479 ctx = req->ctx;
1480 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1481 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1482
1483 pthread_mutex_lock(&lib->mutex);
1484
1485 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1486 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1487
1488 if (nxt_fast_path(port != NULL)) {
1489 req->response_port = port;
1490
1491 if (nxt_fast_path(port_impl->ready)) {
1492 pthread_mutex_unlock(&lib->mutex);
1493
1494 nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1495 (int) port->id.pid, (int) port->id.id);
1496
1497 return NXT_UNIT_OK;
1498 }
1499
1500 nxt_unit_debug(ctx, "check_response_port: "
1501 "port{%d,%d} already requested",
1502 (int) port->id.pid, (int) port->id.id);
1503
1504 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1505
1506 nxt_queue_insert_tail(&port_impl->awaiting_req,
1507 &req_impl->port_wait_link);
1508
1509 pthread_mutex_unlock(&lib->mutex);
1510
1511 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1512
1513 return NXT_UNIT_AGAIN;
1514 }
1515
1516 port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1517 if (nxt_slow_path(port_impl == NULL)) {
1518 nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1519 (int) sizeof(nxt_unit_port_impl_t));
1520
1521 pthread_mutex_unlock(&lib->mutex);
1522
1523 return NXT_UNIT_ERROR;
1524 }
1525
1526 port = &port_impl->port;
1527
1528 port->id = *port_id;
1529 port->in_fd = -1;
1530 port->out_fd = -1;
1531 port->data = NULL;
1532
1533 res = nxt_unit_port_hash_add(&lib->ports, port);
1534 if (nxt_slow_path(res != NXT_UNIT_OK)) {
1535 nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1536 port->id.pid, port->id.id);
1537
1538 pthread_mutex_unlock(&lib->mutex);
1539
1540 nxt_unit_free(ctx, port);
1541
1542 return NXT_UNIT_ERROR;
1543 }
1544
1545 process = nxt_unit_process_find(lib, port_id->pid, 0);
1546 if (nxt_slow_path(process == NULL)) {
1547 nxt_unit_alert(ctx, "check_response_port: process %d not found",
1548 port->id.pid);
1549
1550 nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1551
1552 pthread_mutex_unlock(&lib->mutex);
1553
1554 nxt_unit_free(ctx, port);
1555
1556 return NXT_UNIT_ERROR;
1557 }
1558
1559 nxt_queue_insert_tail(&process->ports, &port_impl->link);
1560
1561 port_impl->process = process;
1562 port_impl->queue = NULL;
1563 port_impl->from_socket = 0;
1564 port_impl->socket_rbuf = NULL;
1565
1566 nxt_queue_init(&port_impl->awaiting_req);
1567
1568 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1569
1570 nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1571
1572 port_impl->use_count = 2;
1573 port_impl->ready = 0;
1574
1575 req->response_port = port;
1576
1577 pthread_mutex_unlock(&lib->mutex);
1578
1579 res = nxt_unit_get_port(ctx, port_id);
1580 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1581 return NXT_UNIT_ERROR;
1582 }
1583
1584 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1585
1586 return NXT_UNIT_AGAIN;
1587 }
1588
1589
1590 static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t * req)1591 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1592 {
1593 ssize_t res;
1594 nxt_port_msg_t msg;
1595 nxt_unit_impl_t *lib;
1596 nxt_unit_ctx_impl_t *ctx_impl;
1597 nxt_unit_request_info_impl_t *req_impl;
1598
1599 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1600 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1601 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1602
1603 memset(&msg, 0, sizeof(nxt_port_msg_t));
1604
1605 msg.stream = req_impl->stream;
1606 msg.pid = lib->pid;
1607 msg.reply_port = ctx_impl->read_port->id.id;
1608 msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1609
1610 res = nxt_unit_port_send(req->ctx, req->response_port,
1611 &msg, sizeof(msg), NULL);
1612 if (nxt_slow_path(res != sizeof(msg))) {
1613 return NXT_UNIT_ERROR;
1614 }
1615
1616 return NXT_UNIT_OK;
1617 }
1618
1619
1620 static int
nxt_unit_process_websocket(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1621 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1622 {
1623 size_t hsize;
1624 nxt_unit_impl_t *lib;
1625 nxt_unit_mmap_buf_t *b;
1626 nxt_unit_callbacks_t *cb;
1627 nxt_unit_request_info_t *req;
1628 nxt_unit_request_info_impl_t *req_impl;
1629 nxt_unit_websocket_frame_impl_t *ws_impl;
1630
1631 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1632 if (nxt_slow_path(req == NULL)) {
1633 return NXT_UNIT_OK;
1634 }
1635
1636 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1637
1638 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1639 cb = &lib->callbacks;
1640
1641 if (cb->websocket_handler && recv_msg->size >= 2) {
1642 ws_impl = nxt_unit_websocket_frame_get(ctx);
1643 if (nxt_slow_path(ws_impl == NULL)) {
1644 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1645 req_impl->stream);
1646
1647 return NXT_UNIT_ERROR;
1648 }
1649
1650 ws_impl->ws.req = req;
1651
1652 ws_impl->buf = NULL;
1653
1654 if (recv_msg->mmap) {
1655 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1656 b->req = req;
1657 }
1658
1659 /* "Move" incoming buffer list to ws_impl. */
1660 ws_impl->buf = recv_msg->incoming_buf;
1661 ws_impl->buf->prev = &ws_impl->buf;
1662 recv_msg->incoming_buf = NULL;
1663
1664 b = ws_impl->buf;
1665
1666 } else {
1667 b = nxt_unit_mmap_buf_get(ctx);
1668 if (nxt_slow_path(b == NULL)) {
1669 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1670 req_impl->stream);
1671
1672 nxt_unit_websocket_frame_release(&ws_impl->ws);
1673
1674 return NXT_UNIT_ERROR;
1675 }
1676
1677 b->req = req;
1678 b->buf.start = recv_msg->start;
1679 b->buf.free = b->buf.start;
1680 b->buf.end = b->buf.start + recv_msg->size;
1681
1682 nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1683 }
1684
1685 ws_impl->ws.header = (void *) b->buf.start;
1686 ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1687 ws_impl->ws.header);
1688
1689 hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1690
1691 if (ws_impl->ws.header->mask) {
1692 ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1693
1694 } else {
1695 ws_impl->ws.mask = NULL;
1696 }
1697
1698 b->buf.free += hsize;
1699
1700 ws_impl->ws.content_buf = &b->buf;
1701 ws_impl->ws.content_length = ws_impl->ws.payload_len;
1702
1703 nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1704 "payload_len=%"PRIu64,
1705 ws_impl->ws.header->opcode,
1706 ws_impl->ws.payload_len);
1707
1708 cb->websocket_handler(&ws_impl->ws);
1709 }
1710
1711 if (recv_msg->last) {
1712 if (cb->close_handler) {
1713 nxt_unit_req_debug(req, "close_handler");
1714
1715 cb->close_handler(req);
1716
1717 } else {
1718 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1719 }
1720 }
1721
1722 return NXT_UNIT_OK;
1723 }
1724
1725
1726 static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t * ctx)1727 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1728 {
1729 nxt_unit_impl_t *lib;
1730 nxt_unit_callbacks_t *cb;
1731
1732 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1733 cb = &lib->callbacks;
1734
1735 if (cb->shm_ack_handler != NULL) {
1736 cb->shm_ack_handler(ctx);
1737 }
1738
1739 return NXT_UNIT_OK;
1740 }
1741
1742
1743 static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t * ctx)1744 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1745 {
1746 nxt_unit_impl_t *lib;
1747 nxt_queue_link_t *lnk;
1748 nxt_unit_ctx_impl_t *ctx_impl;
1749 nxt_unit_request_info_impl_t *req_impl;
1750
1751 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1752
1753 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1754
1755 pthread_mutex_lock(&ctx_impl->mutex);
1756
1757 if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1758 pthread_mutex_unlock(&ctx_impl->mutex);
1759
1760 req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1761 + lib->request_data_size);
1762 if (nxt_slow_path(req_impl == NULL)) {
1763 return NULL;
1764 }
1765
1766 req_impl->req.unit = ctx->unit;
1767 req_impl->req.ctx = ctx;
1768
1769 pthread_mutex_lock(&ctx_impl->mutex);
1770
1771 } else {
1772 lnk = nxt_queue_first(&ctx_impl->free_req);
1773 nxt_queue_remove(lnk);
1774
1775 req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1776 }
1777
1778 nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1779
1780 pthread_mutex_unlock(&ctx_impl->mutex);
1781
1782 req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1783
1784 return req_impl;
1785 }
1786
1787
1788 static void
nxt_unit_request_info_release(nxt_unit_request_info_t * req)1789 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1790 {
1791 nxt_unit_ctx_t *ctx;
1792 nxt_unit_ctx_impl_t *ctx_impl;
1793 nxt_unit_request_info_impl_t *req_impl;
1794
1795 ctx = req->ctx;
1796 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1797 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1798
1799 req->response = NULL;
1800 req->response_buf = NULL;
1801
1802 if (req_impl->in_hash) {
1803 nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1804 }
1805
1806 while (req_impl->outgoing_buf != NULL) {
1807 nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1808 }
1809
1810 while (req_impl->incoming_buf != NULL) {
1811 nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1812 }
1813
1814 if (req->content_fd != -1) {
1815 nxt_unit_close(req->content_fd);
1816
1817 req->content_fd = -1;
1818 }
1819
1820 if (req->response_port != NULL) {
1821 nxt_unit_port_release(req->response_port);
1822
1823 req->response_port = NULL;
1824 }
1825
1826 req_impl->state = NXT_UNIT_RS_RELEASED;
1827
1828 pthread_mutex_lock(&ctx_impl->mutex);
1829
1830 nxt_queue_remove(&req_impl->link);
1831
1832 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1833
1834 pthread_mutex_unlock(&ctx_impl->mutex);
1835
1836 if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1837 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1838 }
1839 }
1840
1841
1842 static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t * req_impl)1843 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1844 {
1845 nxt_unit_ctx_impl_t *ctx_impl;
1846
1847 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1848
1849 nxt_queue_remove(&req_impl->link);
1850
1851 if (req_impl != &ctx_impl->req) {
1852 nxt_unit_free(&ctx_impl->ctx, req_impl);
1853 }
1854 }
1855
1856
1857 static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t * ctx)1858 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1859 {
1860 nxt_queue_link_t *lnk;
1861 nxt_unit_ctx_impl_t *ctx_impl;
1862 nxt_unit_websocket_frame_impl_t *ws_impl;
1863
1864 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1865
1866 pthread_mutex_lock(&ctx_impl->mutex);
1867
1868 if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1869 pthread_mutex_unlock(&ctx_impl->mutex);
1870
1871 ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1872 if (nxt_slow_path(ws_impl == NULL)) {
1873 return NULL;
1874 }
1875
1876 } else {
1877 lnk = nxt_queue_first(&ctx_impl->free_ws);
1878 nxt_queue_remove(lnk);
1879
1880 pthread_mutex_unlock(&ctx_impl->mutex);
1881
1882 ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1883 }
1884
1885 ws_impl->ctx_impl = ctx_impl;
1886
1887 return ws_impl;
1888 }
1889
1890
1891 static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t * ws)1892 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1893 {
1894 nxt_unit_websocket_frame_impl_t *ws_impl;
1895
1896 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1897
1898 while (ws_impl->buf != NULL) {
1899 nxt_unit_mmap_buf_free(ws_impl->buf);
1900 }
1901
1902 ws->req = NULL;
1903
1904 pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1905
1906 nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1907
1908 pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1909 }
1910
1911
1912 static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t * ctx,nxt_unit_websocket_frame_impl_t * ws_impl)1913 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1914 nxt_unit_websocket_frame_impl_t *ws_impl)
1915 {
1916 nxt_queue_remove(&ws_impl->link);
1917
1918 nxt_unit_free(ctx, ws_impl);
1919 }
1920
1921
1922 uint16_t
nxt_unit_field_hash(const char * name,size_t name_length)1923 nxt_unit_field_hash(const char *name, size_t name_length)
1924 {
1925 u_char ch;
1926 uint32_t hash;
1927 const char *p, *end;
1928
1929 hash = 159406; /* Magic value copied from nxt_http_parse.c */
1930 end = name + name_length;
1931
1932 for (p = name; p < end; p++) {
1933 ch = *p;
1934 hash = (hash << 4) + hash + nxt_lowcase(ch);
1935 }
1936
1937 hash = (hash >> 16) ^ hash;
1938
1939 return hash;
1940 }
1941
1942
1943 void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t * req)1944 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1945 {
1946 char *name;
1947 uint32_t i, j;
1948 nxt_unit_field_t *fields, f;
1949 nxt_unit_request_t *r;
1950
1951 static nxt_str_t content_length = nxt_string("content-length");
1952 static nxt_str_t content_type = nxt_string("content-type");
1953 static nxt_str_t cookie = nxt_string("cookie");
1954
1955 nxt_unit_req_debug(req, "group_dup_fields");
1956
1957 r = req->request;
1958 fields = r->fields;
1959
1960 for (i = 0; i < r->fields_count; i++) {
1961 name = nxt_unit_sptr_get(&fields[i].name);
1962
1963 switch (fields[i].hash) {
1964 case NXT_UNIT_HASH_CONTENT_LENGTH:
1965 if (fields[i].name_length == content_length.length
1966 && nxt_unit_memcasecmp(name, content_length.start,
1967 content_length.length) == 0)
1968 {
1969 r->content_length_field = i;
1970 }
1971
1972 break;
1973
1974 case NXT_UNIT_HASH_CONTENT_TYPE:
1975 if (fields[i].name_length == content_type.length
1976 && nxt_unit_memcasecmp(name, content_type.start,
1977 content_type.length) == 0)
1978 {
1979 r->content_type_field = i;
1980 }
1981
1982 break;
1983
1984 case NXT_UNIT_HASH_COOKIE:
1985 if (fields[i].name_length == cookie.length
1986 && nxt_unit_memcasecmp(name, cookie.start,
1987 cookie.length) == 0)
1988 {
1989 r->cookie_field = i;
1990 }
1991
1992 break;
1993 }
1994
1995 for (j = i + 1; j < r->fields_count; j++) {
1996 if (fields[i].hash != fields[j].hash
1997 || fields[i].name_length != fields[j].name_length
1998 || nxt_unit_memcasecmp(name,
1999 nxt_unit_sptr_get(&fields[j].name),
2000 fields[j].name_length) != 0)
2001 {
2002 continue;
2003 }
2004
2005 f = fields[j];
2006 f.value.offset += (j - (i + 1)) * sizeof(f);
2007
2008 while (j > i + 1) {
2009 fields[j] = fields[j - 1];
2010 fields[j].name.offset -= sizeof(f);
2011 fields[j].value.offset -= sizeof(f);
2012 j--;
2013 }
2014
2015 fields[j] = f;
2016
2017 /* Assign the same name pointer for further grouping simplicity. */
2018 nxt_unit_sptr_set(&fields[j].name, name);
2019
2020 i++;
2021 }
2022 }
2023 }
2024
2025
2026 int
nxt_unit_response_init(nxt_unit_request_info_t * req,uint16_t status,uint32_t max_fields_count,uint32_t max_fields_size)2027 nxt_unit_response_init(nxt_unit_request_info_t *req,
2028 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2029 {
2030 uint32_t buf_size;
2031 nxt_unit_buf_t *buf;
2032 nxt_unit_request_info_impl_t *req_impl;
2033
2034 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2035
2036 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2037 nxt_unit_req_warn(req, "init: response already sent");
2038
2039 return NXT_UNIT_ERROR;
2040 }
2041
2042 nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2043 (int) max_fields_count, (int) max_fields_size);
2044
2045 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2046 nxt_unit_req_debug(req, "duplicate response init");
2047 }
2048
2049 /*
2050 * Each field name and value 0-terminated by libunit,
2051 * this is the reason of '+ 2' below.
2052 */
2053 buf_size = sizeof(nxt_unit_response_t)
2054 + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2055 + max_fields_size;
2056
2057 if (nxt_slow_path(req->response_buf != NULL)) {
2058 buf = req->response_buf;
2059
2060 if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2061 goto init_response;
2062 }
2063
2064 nxt_unit_buf_free(buf);
2065
2066 req->response_buf = NULL;
2067 req->response = NULL;
2068 req->response_max_fields = 0;
2069
2070 req_impl->state = NXT_UNIT_RS_START;
2071 }
2072
2073 buf = nxt_unit_response_buf_alloc(req, buf_size);
2074 if (nxt_slow_path(buf == NULL)) {
2075 return NXT_UNIT_ERROR;
2076 }
2077
2078 init_response:
2079
2080 memset(buf->start, 0, sizeof(nxt_unit_response_t));
2081
2082 req->response_buf = buf;
2083
2084 req->response = (nxt_unit_response_t *) buf->start;
2085 req->response->status = status;
2086
2087 buf->free = buf->start + sizeof(nxt_unit_response_t)
2088 + max_fields_count * sizeof(nxt_unit_field_t);
2089
2090 req->response_max_fields = max_fields_count;
2091 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2092
2093 return NXT_UNIT_OK;
2094 }
2095
2096
2097 int
nxt_unit_response_realloc(nxt_unit_request_info_t * req,uint32_t max_fields_count,uint32_t max_fields_size)2098 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2099 uint32_t max_fields_count, uint32_t max_fields_size)
2100 {
2101 char *p;
2102 uint32_t i, buf_size;
2103 nxt_unit_buf_t *buf;
2104 nxt_unit_field_t *f, *src;
2105 nxt_unit_response_t *resp;
2106 nxt_unit_request_info_impl_t *req_impl;
2107
2108 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2109
2110 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2111 nxt_unit_req_warn(req, "realloc: response not init");
2112
2113 return NXT_UNIT_ERROR;
2114 }
2115
2116 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2117 nxt_unit_req_warn(req, "realloc: response already sent");
2118
2119 return NXT_UNIT_ERROR;
2120 }
2121
2122 if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2123 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2124
2125 return NXT_UNIT_ERROR;
2126 }
2127
2128 /*
2129 * Each field name and value 0-terminated by libunit,
2130 * this is the reason of '+ 2' below.
2131 */
2132 buf_size = sizeof(nxt_unit_response_t)
2133 + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2134 + max_fields_size;
2135
2136 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2137
2138 buf = nxt_unit_response_buf_alloc(req, buf_size);
2139 if (nxt_slow_path(buf == NULL)) {
2140 nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2141 return NXT_UNIT_ERROR;
2142 }
2143
2144 resp = (nxt_unit_response_t *) buf->start;
2145
2146 memset(resp, 0, sizeof(nxt_unit_response_t));
2147
2148 resp->status = req->response->status;
2149 resp->content_length = req->response->content_length;
2150
2151 p = buf->start + sizeof(nxt_unit_response_t)
2152 + max_fields_count * sizeof(nxt_unit_field_t);
2153 f = resp->fields;
2154
2155 for (i = 0; i < req->response->fields_count; i++) {
2156 src = req->response->fields + i;
2157
2158 if (nxt_slow_path(src->skip != 0)) {
2159 continue;
2160 }
2161
2162 if (nxt_slow_path(src->name_length + src->value_length + 2
2163 > (uint32_t) (buf->end - p)))
2164 {
2165 nxt_unit_req_warn(req, "realloc: not enough space for field"
2166 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2167 i, src, src->name_length, src->value_length);
2168
2169 goto fail;
2170 }
2171
2172 nxt_unit_sptr_set(&f->name, p);
2173 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2174 *p++ = '\0';
2175
2176 nxt_unit_sptr_set(&f->value, p);
2177 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2178 *p++ = '\0';
2179
2180 f->hash = src->hash;
2181 f->skip = 0;
2182 f->name_length = src->name_length;
2183 f->value_length = src->value_length;
2184
2185 resp->fields_count++;
2186 f++;
2187 }
2188
2189 if (req->response->piggyback_content_length > 0) {
2190 if (nxt_slow_path(req->response->piggyback_content_length
2191 > (uint32_t) (buf->end - p)))
2192 {
2193 nxt_unit_req_warn(req, "realloc: not enought space for content"
2194 " #%"PRIu32", %"PRIu32" required",
2195 i, req->response->piggyback_content_length);
2196
2197 goto fail;
2198 }
2199
2200 resp->piggyback_content_length =
2201 req->response->piggyback_content_length;
2202
2203 nxt_unit_sptr_set(&resp->piggyback_content, p);
2204 p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2205 req->response->piggyback_content_length);
2206 }
2207
2208 buf->free = p;
2209
2210 nxt_unit_buf_free(req->response_buf);
2211
2212 req->response = resp;
2213 req->response_buf = buf;
2214 req->response_max_fields = max_fields_count;
2215
2216 return NXT_UNIT_OK;
2217
2218 fail:
2219
2220 nxt_unit_buf_free(buf);
2221
2222 return NXT_UNIT_ERROR;
2223 }
2224
2225
2226 int
nxt_unit_response_is_init(nxt_unit_request_info_t * req)2227 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2228 {
2229 nxt_unit_request_info_impl_t *req_impl;
2230
2231 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2232
2233 return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2234 }
2235
2236
2237 int
nxt_unit_response_add_field(nxt_unit_request_info_t * req,const char * name,uint8_t name_length,const char * value,uint32_t value_length)2238 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2239 const char *name, uint8_t name_length,
2240 const char *value, uint32_t value_length)
2241 {
2242 nxt_unit_buf_t *buf;
2243 nxt_unit_field_t *f;
2244 nxt_unit_response_t *resp;
2245 nxt_unit_request_info_impl_t *req_impl;
2246
2247 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2248
2249 if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2250 nxt_unit_req_warn(req, "add_field: response not initialized or "
2251 "already sent");
2252
2253 return NXT_UNIT_ERROR;
2254 }
2255
2256 resp = req->response;
2257
2258 if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2259 nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2260 (int) resp->fields_count);
2261
2262 return NXT_UNIT_ERROR;
2263 }
2264
2265 buf = req->response_buf;
2266
2267 if (nxt_slow_path(name_length + value_length + 2
2268 > (uint32_t) (buf->end - buf->free)))
2269 {
2270 nxt_unit_req_warn(req, "add_field: response buffer overflow");
2271
2272 return NXT_UNIT_ERROR;
2273 }
2274
2275 nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2276 resp->fields_count,
2277 (int) name_length, name,
2278 (int) value_length, value);
2279
2280 f = resp->fields + resp->fields_count;
2281
2282 nxt_unit_sptr_set(&f->name, buf->free);
2283 buf->free = nxt_cpymem(buf->free, name, name_length);
2284 *buf->free++ = '\0';
2285
2286 nxt_unit_sptr_set(&f->value, buf->free);
2287 buf->free = nxt_cpymem(buf->free, value, value_length);
2288 *buf->free++ = '\0';
2289
2290 f->hash = nxt_unit_field_hash(name, name_length);
2291 f->skip = 0;
2292 f->name_length = name_length;
2293 f->value_length = value_length;
2294
2295 resp->fields_count++;
2296
2297 return NXT_UNIT_OK;
2298 }
2299
2300
2301 int
nxt_unit_response_add_content(nxt_unit_request_info_t * req,const void * src,uint32_t size)2302 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2303 const void* src, uint32_t size)
2304 {
2305 nxt_unit_buf_t *buf;
2306 nxt_unit_response_t *resp;
2307 nxt_unit_request_info_impl_t *req_impl;
2308
2309 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2310
2311 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2312 nxt_unit_req_warn(req, "add_content: response not initialized yet");
2313
2314 return NXT_UNIT_ERROR;
2315 }
2316
2317 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2318 nxt_unit_req_warn(req, "add_content: response already sent");
2319
2320 return NXT_UNIT_ERROR;
2321 }
2322
2323 buf = req->response_buf;
2324
2325 if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2326 nxt_unit_req_warn(req, "add_content: buffer overflow");
2327
2328 return NXT_UNIT_ERROR;
2329 }
2330
2331 resp = req->response;
2332
2333 if (resp->piggyback_content_length == 0) {
2334 nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2335 req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2336 }
2337
2338 resp->piggyback_content_length += size;
2339
2340 buf->free = nxt_cpymem(buf->free, src, size);
2341
2342 return NXT_UNIT_OK;
2343 }
2344
2345
2346 int
nxt_unit_response_send(nxt_unit_request_info_t * req)2347 nxt_unit_response_send(nxt_unit_request_info_t *req)
2348 {
2349 int rc;
2350 nxt_unit_mmap_buf_t *mmap_buf;
2351 nxt_unit_request_info_impl_t *req_impl;
2352
2353 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2354
2355 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2356 nxt_unit_req_warn(req, "send: response is not initialized yet");
2357
2358 return NXT_UNIT_ERROR;
2359 }
2360
2361 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2362 nxt_unit_req_warn(req, "send: response already sent");
2363
2364 return NXT_UNIT_ERROR;
2365 }
2366
2367 if (req->request->websocket_handshake && req->response->status == 101) {
2368 nxt_unit_response_upgrade(req);
2369 }
2370
2371 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2372 req->response->fields_count,
2373 (int) (req->response_buf->free
2374 - req->response_buf->start));
2375
2376 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2377
2378 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2379 if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2380 req->response = NULL;
2381 req->response_buf = NULL;
2382 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2383
2384 nxt_unit_mmap_buf_free(mmap_buf);
2385 }
2386
2387 return rc;
2388 }
2389
2390
2391 int
nxt_unit_response_is_sent(nxt_unit_request_info_t * req)2392 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2393 {
2394 nxt_unit_request_info_impl_t *req_impl;
2395
2396 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2397
2398 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2399 }
2400
2401
2402 nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t * req,uint32_t size)2403 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2404 {
2405 int rc;
2406 nxt_unit_mmap_buf_t *mmap_buf;
2407 nxt_unit_request_info_impl_t *req_impl;
2408
2409 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2410 nxt_unit_req_warn(req, "response_buf_alloc: "
2411 "requested buffer (%"PRIu32") too big", size);
2412
2413 return NULL;
2414 }
2415
2416 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2417
2418 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2419
2420 mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2421 if (nxt_slow_path(mmap_buf == NULL)) {
2422 nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2423
2424 return NULL;
2425 }
2426
2427 mmap_buf->req = req;
2428
2429 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2430
2431 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2432 size, size, mmap_buf,
2433 NULL);
2434 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2435 nxt_unit_mmap_buf_release(mmap_buf);
2436
2437 nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2438
2439 return NULL;
2440 }
2441
2442 return &mmap_buf->buf;
2443 }
2444
2445
2446 static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t * ctx)2447 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2448 {
2449 nxt_unit_mmap_buf_t *mmap_buf;
2450 nxt_unit_ctx_impl_t *ctx_impl;
2451
2452 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2453
2454 pthread_mutex_lock(&ctx_impl->mutex);
2455
2456 if (ctx_impl->free_buf == NULL) {
2457 pthread_mutex_unlock(&ctx_impl->mutex);
2458
2459 mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2460 if (nxt_slow_path(mmap_buf == NULL)) {
2461 return NULL;
2462 }
2463
2464 } else {
2465 mmap_buf = ctx_impl->free_buf;
2466
2467 nxt_unit_mmap_buf_unlink(mmap_buf);
2468
2469 pthread_mutex_unlock(&ctx_impl->mutex);
2470 }
2471
2472 mmap_buf->ctx_impl = ctx_impl;
2473
2474 mmap_buf->hdr = NULL;
2475 mmap_buf->free_ptr = NULL;
2476
2477 return mmap_buf;
2478 }
2479
2480
2481 static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t * mmap_buf)2482 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2483 {
2484 nxt_unit_mmap_buf_unlink(mmap_buf);
2485
2486 pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2487
2488 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2489
2490 pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2491 }
2492
2493
2494 int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t * req)2495 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2496 {
2497 return req->request->websocket_handshake;
2498 }
2499
2500
2501 int
nxt_unit_response_upgrade(nxt_unit_request_info_t * req)2502 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2503 {
2504 int rc;
2505 nxt_unit_request_info_impl_t *req_impl;
2506
2507 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2508
2509 if (nxt_slow_path(req_impl->websocket != 0)) {
2510 nxt_unit_req_debug(req, "upgrade: already upgraded");
2511
2512 return NXT_UNIT_OK;
2513 }
2514
2515 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2516 nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2517
2518 return NXT_UNIT_ERROR;
2519 }
2520
2521 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2522 nxt_unit_req_warn(req, "upgrade: response already sent");
2523
2524 return NXT_UNIT_ERROR;
2525 }
2526
2527 rc = nxt_unit_request_hash_add(req->ctx, req);
2528 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2529 nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2530
2531 return NXT_UNIT_ERROR;
2532 }
2533
2534 req_impl->websocket = 1;
2535
2536 req->response->status = 101;
2537
2538 return NXT_UNIT_OK;
2539 }
2540
2541
2542 int
nxt_unit_response_is_websocket(nxt_unit_request_info_t * req)2543 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2544 {
2545 nxt_unit_request_info_impl_t *req_impl;
2546
2547 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2548
2549 return req_impl->websocket;
2550 }
2551
2552
2553 nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void * data)2554 nxt_unit_get_request_info_from_data(void *data)
2555 {
2556 nxt_unit_request_info_impl_t *req_impl;
2557
2558 req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2559
2560 return &req_impl->req;
2561 }
2562
2563
2564 int
nxt_unit_buf_send(nxt_unit_buf_t * buf)2565 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2566 {
2567 int rc;
2568 nxt_unit_mmap_buf_t *mmap_buf;
2569 nxt_unit_request_info_t *req;
2570 nxt_unit_request_info_impl_t *req_impl;
2571
2572 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2573
2574 req = mmap_buf->req;
2575 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2576
2577 nxt_unit_req_debug(req, "buf_send: %d bytes",
2578 (int) (buf->free - buf->start));
2579
2580 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2581 nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2582
2583 return NXT_UNIT_ERROR;
2584 }
2585
2586 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2587 nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2588
2589 return NXT_UNIT_ERROR;
2590 }
2591
2592 if (nxt_fast_path(buf->free > buf->start)) {
2593 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2594 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2595 return rc;
2596 }
2597 }
2598
2599 nxt_unit_mmap_buf_free(mmap_buf);
2600
2601 return NXT_UNIT_OK;
2602 }
2603
2604
2605 static void
nxt_unit_buf_send_done(nxt_unit_buf_t * buf)2606 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2607 {
2608 int rc;
2609 nxt_unit_mmap_buf_t *mmap_buf;
2610 nxt_unit_request_info_t *req;
2611
2612 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2613
2614 req = mmap_buf->req;
2615
2616 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2617 if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2618 nxt_unit_mmap_buf_free(mmap_buf);
2619
2620 nxt_unit_request_info_release(req);
2621
2622 } else {
2623 nxt_unit_request_done(req, rc);
2624 }
2625 }
2626
2627
2628 static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t * req,nxt_unit_mmap_buf_t * mmap_buf,int last)2629 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2630 nxt_unit_mmap_buf_t *mmap_buf, int last)
2631 {
2632 struct {
2633 nxt_port_msg_t msg;
2634 nxt_port_mmap_msg_t mmap_msg;
2635 } m;
2636
2637 int rc;
2638 u_char *last_used, *first_free;
2639 ssize_t res;
2640 nxt_chunk_id_t first_free_chunk;
2641 nxt_unit_buf_t *buf;
2642 nxt_unit_impl_t *lib;
2643 nxt_port_mmap_header_t *hdr;
2644 nxt_unit_request_info_impl_t *req_impl;
2645
2646 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2647 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2648
2649 buf = &mmap_buf->buf;
2650 hdr = mmap_buf->hdr;
2651
2652 m.mmap_msg.size = buf->free - buf->start;
2653
2654 m.msg.stream = req_impl->stream;
2655 m.msg.pid = lib->pid;
2656 m.msg.reply_port = 0;
2657 m.msg.type = _NXT_PORT_MSG_DATA;
2658 m.msg.last = last != 0;
2659 m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2660 m.msg.nf = 0;
2661 m.msg.mf = 0;
2662
2663 rc = NXT_UNIT_ERROR;
2664
2665 if (m.msg.mmap) {
2666 m.mmap_msg.mmap_id = hdr->id;
2667 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2668 (u_char *) buf->start);
2669
2670 nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2671 req_impl->stream,
2672 (int) m.mmap_msg.mmap_id,
2673 (int) m.mmap_msg.chunk_id,
2674 (int) m.mmap_msg.size);
2675
2676 res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2677 NULL);
2678 if (nxt_slow_path(res != sizeof(m))) {
2679 goto free_buf;
2680 }
2681
2682 last_used = (u_char *) buf->free - 1;
2683 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2684
2685 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2686 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2687
2688 buf->start = (char *) first_free;
2689 buf->free = buf->start;
2690
2691 if (buf->end < buf->start) {
2692 buf->end = buf->start;
2693 }
2694
2695 } else {
2696 buf->start = NULL;
2697 buf->free = NULL;
2698 buf->end = NULL;
2699
2700 mmap_buf->hdr = NULL;
2701 }
2702
2703 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2704 (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2705
2706 nxt_unit_debug(req->ctx, "allocated_chunks %d",
2707 (int) lib->outgoing.allocated_chunks);
2708
2709 } else {
2710 if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2711 || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2712 {
2713 nxt_unit_alert(req->ctx,
2714 "#%"PRIu32": failed to send plain memory buffer"
2715 ": no space reserved for message header",
2716 req_impl->stream);
2717
2718 goto free_buf;
2719 }
2720
2721 memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2722
2723 nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2724 req_impl->stream,
2725 (int) (sizeof(m.msg) + m.mmap_msg.size));
2726
2727 res = nxt_unit_port_send(req->ctx, req->response_port,
2728 buf->start - sizeof(m.msg),
2729 m.mmap_msg.size + sizeof(m.msg), NULL);
2730
2731 if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2732 goto free_buf;
2733 }
2734 }
2735
2736 rc = NXT_UNIT_OK;
2737
2738 free_buf:
2739
2740 nxt_unit_free_outgoing_buf(mmap_buf);
2741
2742 return rc;
2743 }
2744
2745
2746 void
nxt_unit_buf_free(nxt_unit_buf_t * buf)2747 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2748 {
2749 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2750 }
2751
2752
2753 static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t * mmap_buf)2754 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2755 {
2756 nxt_unit_free_outgoing_buf(mmap_buf);
2757
2758 nxt_unit_mmap_buf_release(mmap_buf);
2759 }
2760
2761
2762 static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t * mmap_buf)2763 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2764 {
2765 if (mmap_buf->hdr != NULL) {
2766 nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2767 mmap_buf->hdr, mmap_buf->buf.start,
2768 mmap_buf->buf.end - mmap_buf->buf.start);
2769
2770 mmap_buf->hdr = NULL;
2771
2772 return;
2773 }
2774
2775 if (mmap_buf->free_ptr != NULL) {
2776 nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2777
2778 mmap_buf->free_ptr = NULL;
2779 }
2780 }
2781
2782
2783 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t * ctx)2784 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2785 {
2786 nxt_unit_ctx_impl_t *ctx_impl;
2787 nxt_unit_read_buf_t *rbuf;
2788
2789 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2790
2791 pthread_mutex_lock(&ctx_impl->mutex);
2792
2793 rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2794
2795 pthread_mutex_unlock(&ctx_impl->mutex);
2796
2797 rbuf->oob.size = 0;
2798
2799 return rbuf;
2800 }
2801
2802
2803 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t * ctx_impl)2804 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2805 {
2806 nxt_queue_link_t *link;
2807 nxt_unit_read_buf_t *rbuf;
2808
2809 if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2810 link = nxt_queue_first(&ctx_impl->free_rbuf);
2811 nxt_queue_remove(link);
2812
2813 rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2814
2815 return rbuf;
2816 }
2817
2818 rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2819
2820 if (nxt_fast_path(rbuf != NULL)) {
2821 rbuf->ctx_impl = ctx_impl;
2822 }
2823
2824 return rbuf;
2825 }
2826
2827
2828 static void
nxt_unit_read_buf_release(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf)2829 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2830 nxt_unit_read_buf_t *rbuf)
2831 {
2832 nxt_unit_ctx_impl_t *ctx_impl;
2833
2834 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2835
2836 pthread_mutex_lock(&ctx_impl->mutex);
2837
2838 nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2839
2840 pthread_mutex_unlock(&ctx_impl->mutex);
2841 }
2842
2843
2844 nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t * buf)2845 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2846 {
2847 nxt_unit_mmap_buf_t *mmap_buf;
2848
2849 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2850
2851 if (mmap_buf->next == NULL) {
2852 return NULL;
2853 }
2854
2855 return &mmap_buf->next->buf;
2856 }
2857
2858
2859 uint32_t
nxt_unit_buf_max(void)2860 nxt_unit_buf_max(void)
2861 {
2862 return PORT_MMAP_DATA_SIZE;
2863 }
2864
2865
2866 uint32_t
nxt_unit_buf_min(void)2867 nxt_unit_buf_min(void)
2868 {
2869 return PORT_MMAP_CHUNK_SIZE;
2870 }
2871
2872
2873 int
nxt_unit_response_write(nxt_unit_request_info_t * req,const void * start,size_t size)2874 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2875 size_t size)
2876 {
2877 ssize_t res;
2878
2879 res = nxt_unit_response_write_nb(req, start, size, size);
2880
2881 return res < 0 ? -res : NXT_UNIT_OK;
2882 }
2883
2884
2885 ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t * req,const void * start,size_t size,size_t min_size)2886 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2887 size_t size, size_t min_size)
2888 {
2889 int rc;
2890 ssize_t sent;
2891 uint32_t part_size, min_part_size, buf_size;
2892 const char *part_start;
2893 nxt_unit_mmap_buf_t mmap_buf;
2894 nxt_unit_request_info_impl_t *req_impl;
2895 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2896
2897 nxt_unit_req_debug(req, "write: %d", (int) size);
2898
2899 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2900
2901 part_start = start;
2902 sent = 0;
2903
2904 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2905 nxt_unit_req_alert(req, "write: response not initialized yet");
2906
2907 return -NXT_UNIT_ERROR;
2908 }
2909
2910 /* Check if response is not send yet. */
2911 if (nxt_slow_path(req->response_buf != NULL)) {
2912 part_size = req->response_buf->end - req->response_buf->free;
2913 part_size = nxt_min(size, part_size);
2914
2915 rc = nxt_unit_response_add_content(req, part_start, part_size);
2916 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2917 return -rc;
2918 }
2919
2920 rc = nxt_unit_response_send(req);
2921 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2922 return -rc;
2923 }
2924
2925 size -= part_size;
2926 part_start += part_size;
2927 sent += part_size;
2928
2929 min_size -= nxt_min(min_size, part_size);
2930 }
2931
2932 while (size > 0) {
2933 part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2934 min_part_size = nxt_min(min_size, part_size);
2935 min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2936
2937 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2938 min_part_size, &mmap_buf, local_buf);
2939 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2940 return -rc;
2941 }
2942
2943 buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2944 if (nxt_slow_path(buf_size == 0)) {
2945 return sent;
2946 }
2947 part_size = nxt_min(buf_size, part_size);
2948
2949 mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2950 part_start, part_size);
2951
2952 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2953 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2954 return -rc;
2955 }
2956
2957 size -= part_size;
2958 part_start += part_size;
2959 sent += part_size;
2960
2961 min_size -= nxt_min(min_size, part_size);
2962 }
2963
2964 return sent;
2965 }
2966
2967
2968 int
nxt_unit_response_write_cb(nxt_unit_request_info_t * req,nxt_unit_read_info_t * read_info)2969 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2970 nxt_unit_read_info_t *read_info)
2971 {
2972 int rc;
2973 ssize_t n;
2974 uint32_t buf_size;
2975 nxt_unit_buf_t *buf;
2976 nxt_unit_mmap_buf_t mmap_buf;
2977 nxt_unit_request_info_impl_t *req_impl;
2978 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2979
2980 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2981
2982 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2983 nxt_unit_req_alert(req, "write: response not initialized yet");
2984
2985 return NXT_UNIT_ERROR;
2986 }
2987
2988 /* Check if response is not send yet. */
2989 if (nxt_slow_path(req->response_buf != NULL)) {
2990
2991 /* Enable content in headers buf. */
2992 rc = nxt_unit_response_add_content(req, "", 0);
2993 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2994 nxt_unit_req_error(req, "Failed to add piggyback content");
2995
2996 return rc;
2997 }
2998
2999 buf = req->response_buf;
3000
3001 while (buf->end - buf->free > 0) {
3002 n = read_info->read(read_info, buf->free, buf->end - buf->free);
3003 if (nxt_slow_path(n < 0)) {
3004 nxt_unit_req_error(req, "Read error");
3005
3006 return NXT_UNIT_ERROR;
3007 }
3008
3009 /* Manually increase sizes. */
3010 buf->free += n;
3011 req->response->piggyback_content_length += n;
3012
3013 if (read_info->eof) {
3014 break;
3015 }
3016 }
3017
3018 rc = nxt_unit_response_send(req);
3019 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3020 nxt_unit_req_error(req, "Failed to send headers with content");
3021
3022 return rc;
3023 }
3024
3025 if (read_info->eof) {
3026 return NXT_UNIT_OK;
3027 }
3028 }
3029
3030 while (!read_info->eof) {
3031 nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
3032 read_info->buf_size);
3033
3034 buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
3035
3036 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3037 buf_size, buf_size,
3038 &mmap_buf, local_buf);
3039 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3040 return rc;
3041 }
3042
3043 buf = &mmap_buf.buf;
3044
3045 while (!read_info->eof && buf->end > buf->free) {
3046 n = read_info->read(read_info, buf->free, buf->end - buf->free);
3047 if (nxt_slow_path(n < 0)) {
3048 nxt_unit_req_error(req, "Read error");
3049
3050 nxt_unit_free_outgoing_buf(&mmap_buf);
3051
3052 return NXT_UNIT_ERROR;
3053 }
3054
3055 buf->free += n;
3056 }
3057
3058 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3059 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3060 nxt_unit_req_error(req, "Failed to send content");
3061
3062 return rc;
3063 }
3064 }
3065
3066 return NXT_UNIT_OK;
3067 }
3068
3069
3070 ssize_t
nxt_unit_request_read(nxt_unit_request_info_t * req,void * dst,size_t size)3071 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3072 {
3073 ssize_t buf_res, res;
3074
3075 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3076 dst, size);
3077
3078 if (buf_res < (ssize_t) size && req->content_fd != -1) {
3079 res = read(req->content_fd, dst, size);
3080 if (nxt_slow_path(res < 0)) {
3081 nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3082 strerror(errno), errno);
3083
3084 return res;
3085 }
3086
3087 if (res < (ssize_t) size) {
3088 nxt_unit_close(req->content_fd);
3089
3090 req->content_fd = -1;
3091 }
3092
3093 req->content_length -= res;
3094
3095 dst = nxt_pointer_to(dst, res);
3096
3097 } else {
3098 res = 0;
3099 }
3100
3101 return buf_res + res;
3102 }
3103
3104
3105 ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t * req,size_t max_size)3106 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3107 {
3108 char *p;
3109 size_t l_size, b_size;
3110 nxt_unit_buf_t *b;
3111 nxt_unit_mmap_buf_t *mmap_buf, *preread_buf;
3112
3113 if (req->content_length == 0) {
3114 return 0;
3115 }
3116
3117 l_size = 0;
3118
3119 b = req->content_buf;
3120
3121 while (b != NULL) {
3122 b_size = b->end - b->free;
3123 p = memchr(b->free, '\n', b_size);
3124
3125 if (p != NULL) {
3126 p++;
3127 l_size += p - b->free;
3128 break;
3129 }
3130
3131 l_size += b_size;
3132
3133 if (max_size <= l_size) {
3134 break;
3135 }
3136
3137 mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
3138 if (mmap_buf->next == NULL
3139 && req->content_fd != -1
3140 && l_size < req->content_length)
3141 {
3142 preread_buf = nxt_unit_request_preread(req, 16384);
3143 if (nxt_slow_path(preread_buf == NULL)) {
3144 return -1;
3145 }
3146
3147 nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
3148 }
3149
3150 b = nxt_unit_buf_next(b);
3151 }
3152
3153 return nxt_min(max_size, l_size);
3154 }
3155
3156
3157 static nxt_unit_mmap_buf_t *
nxt_unit_request_preread(nxt_unit_request_info_t * req,size_t size)3158 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
3159 {
3160 ssize_t res;
3161 nxt_unit_mmap_buf_t *mmap_buf;
3162
3163 if (req->content_fd == -1) {
3164 nxt_unit_req_alert(req, "preread: content_fd == -1");
3165 return NULL;
3166 }
3167
3168 mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
3169 if (nxt_slow_path(mmap_buf == NULL)) {
3170 nxt_unit_req_alert(req, "preread: failed to allocate buf");
3171 return NULL;
3172 }
3173
3174 mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size);
3175 if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3176 nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
3177 nxt_unit_mmap_buf_release(mmap_buf);
3178 return NULL;
3179 }
3180
3181 mmap_buf->plain_ptr = mmap_buf->free_ptr;
3182
3183 mmap_buf->hdr = NULL;
3184 mmap_buf->buf.start = mmap_buf->free_ptr;
3185 mmap_buf->buf.free = mmap_buf->buf.start;
3186 mmap_buf->buf.end = mmap_buf->buf.start + size;
3187
3188 res = read(req->content_fd, mmap_buf->free_ptr, size);
3189 if (res < 0) {
3190 nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3191 strerror(errno), errno);
3192
3193 nxt_unit_mmap_buf_free(mmap_buf);
3194
3195 return NULL;
3196 }
3197
3198 if (res < (ssize_t) size) {
3199 nxt_unit_close(req->content_fd);
3200
3201 req->content_fd = -1;
3202 }
3203
3204 nxt_unit_req_debug(req, "preread: read %d", (int) res);
3205
3206 mmap_buf->buf.end = mmap_buf->buf.free + res;
3207
3208 return mmap_buf;
3209 }
3210
3211
3212 static ssize_t
nxt_unit_buf_read(nxt_unit_buf_t ** b,uint64_t * len,void * dst,size_t size)3213 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
3214 {
3215 u_char *p;
3216 size_t rest, copy, read;
3217 nxt_unit_buf_t *buf, *last_buf;
3218
3219 p = dst;
3220 rest = size;
3221
3222 buf = *b;
3223 last_buf = buf;
3224
3225 while (buf != NULL) {
3226 last_buf = buf;
3227
3228 copy = buf->end - buf->free;
3229 copy = nxt_min(rest, copy);
3230
3231 p = nxt_cpymem(p, buf->free, copy);
3232
3233 buf->free += copy;
3234 rest -= copy;
3235
3236 if (rest == 0) {
3237 if (buf->end == buf->free) {
3238 buf = nxt_unit_buf_next(buf);
3239 }
3240
3241 break;
3242 }
3243
3244 buf = nxt_unit_buf_next(buf);
3245 }
3246
3247 *b = last_buf;
3248
3249 read = size - rest;
3250
3251 *len -= read;
3252
3253 return read;
3254 }
3255
3256
3257 void
nxt_unit_request_done(nxt_unit_request_info_t * req,int rc)3258 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
3259 {
3260 uint32_t size;
3261 nxt_port_msg_t msg;
3262 nxt_unit_impl_t *lib;
3263 nxt_unit_request_info_impl_t *req_impl;
3264
3265 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
3266
3267 nxt_unit_req_debug(req, "done: %d", rc);
3268
3269 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3270 goto skip_response_send;
3271 }
3272
3273 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
3274
3275 size = nxt_length("Content-Type") + nxt_length("text/plain");
3276
3277 rc = nxt_unit_response_init(req, 200, 1, size);
3278 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3279 goto skip_response_send;
3280 }
3281
3282 rc = nxt_unit_response_add_field(req, "Content-Type",
3283 nxt_length("Content-Type"),
3284 "text/plain", nxt_length("text/plain"));
3285 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3286 goto skip_response_send;
3287 }
3288 }
3289
3290 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
3291
3292 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
3293
3294 nxt_unit_buf_send_done(req->response_buf);
3295
3296 return;
3297 }
3298
3299 skip_response_send:
3300
3301 lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
3302
3303 msg.stream = req_impl->stream;
3304 msg.pid = lib->pid;
3305 msg.reply_port = 0;
3306 msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
3307 : _NXT_PORT_MSG_RPC_ERROR;
3308 msg.last = 1;
3309 msg.mmap = 0;
3310 msg.nf = 0;
3311 msg.mf = 0;
3312
3313 (void) nxt_unit_port_send(req->ctx, req->response_port,
3314 &msg, sizeof(msg), NULL);
3315
3316 nxt_unit_request_info_release(req);
3317 }
3318
3319
3320 int
nxt_unit_websocket_send(nxt_unit_request_info_t * req,uint8_t opcode,uint8_t last,const void * start,size_t size)3321 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
3322 uint8_t last, const void *start, size_t size)
3323 {
3324 const struct iovec iov = { (void *) start, size };
3325
3326 return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
3327 }
3328
3329
3330 int
nxt_unit_websocket_sendv(nxt_unit_request_info_t * req,uint8_t opcode,uint8_t last,const struct iovec * iov,int iovcnt)3331 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
3332 uint8_t last, const struct iovec *iov, int iovcnt)
3333 {
3334 int i, rc;
3335 size_t l, copy;
3336 uint32_t payload_len, buf_size, alloc_size;
3337 const uint8_t *b;
3338 nxt_unit_buf_t *buf;
3339 nxt_unit_mmap_buf_t mmap_buf;
3340 nxt_websocket_header_t *wh;
3341 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
3342
3343 payload_len = 0;
3344
3345 for (i = 0; i < iovcnt; i++) {
3346 payload_len += iov[i].iov_len;
3347 }
3348
3349 buf_size = 10 + payload_len;
3350 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3351
3352 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3353 alloc_size, alloc_size,
3354 &mmap_buf, local_buf);
3355 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3356 return rc;
3357 }
3358
3359 buf = &mmap_buf.buf;
3360
3361 buf->start[0] = 0;
3362 buf->start[1] = 0;
3363
3364 buf_size -= buf->end - buf->start;
3365
3366 wh = (void *) buf->free;
3367
3368 buf->free = nxt_websocket_frame_init(wh, payload_len);
3369 wh->fin = last;
3370 wh->opcode = opcode;
3371
3372 for (i = 0; i < iovcnt; i++) {
3373 b = iov[i].iov_base;
3374 l = iov[i].iov_len;
3375
3376 while (l > 0) {
3377 copy = buf->end - buf->free;
3378 copy = nxt_min(l, copy);
3379
3380 buf->free = nxt_cpymem(buf->free, b, copy);
3381 b += copy;
3382 l -= copy;
3383
3384 if (l > 0) {
3385 if (nxt_fast_path(buf->free > buf->start)) {
3386 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3387
3388 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3389 return rc;
3390 }
3391 }
3392
3393 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3394
3395 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3396 alloc_size, alloc_size,
3397 &mmap_buf, local_buf);
3398 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3399 return rc;
3400 }
3401
3402 buf_size -= buf->end - buf->start;
3403 }
3404 }
3405 }
3406
3407 if (buf->free > buf->start) {
3408 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3409 }
3410
3411 return rc;
3412 }
3413
3414
3415 ssize_t
nxt_unit_websocket_read(nxt_unit_websocket_frame_t * ws,void * dst,size_t size)3416 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
3417 size_t size)
3418 {
3419 ssize_t res;
3420 uint8_t *b;
3421 uint64_t i, d;
3422
3423 res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
3424 dst, size);
3425
3426 if (ws->mask == NULL) {
3427 return res;
3428 }
3429
3430 b = dst;
3431 d = (ws->payload_len - ws->content_length - res) % 4;
3432
3433 for (i = 0; i < (uint64_t) res; i++) {
3434 b[i] ^= ws->mask[ (i + d) % 4 ];
3435 }
3436
3437 return res;
3438 }
3439
3440
3441 int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t * ws)3442 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
3443 {
3444 char *b;
3445 size_t size, hsize;
3446 nxt_unit_websocket_frame_impl_t *ws_impl;
3447
3448 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
3449
3450 if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
3451 return NXT_UNIT_OK;
3452 }
3453
3454 size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
3455
3456 b = nxt_unit_malloc(ws->req->ctx, size);
3457 if (nxt_slow_path(b == NULL)) {
3458 return NXT_UNIT_ERROR;
3459 }
3460
3461 memcpy(b, ws_impl->buf->buf.start, size);
3462
3463 hsize = nxt_websocket_frame_header_size(b);
3464
3465 ws_impl->buf->buf.start = b;
3466 ws_impl->buf->buf.free = b + hsize;
3467 ws_impl->buf->buf.end = b + size;
3468
3469 ws_impl->buf->free_ptr = b;
3470
3471 ws_impl->ws.header = (nxt_websocket_header_t *) b;
3472
3473 if (ws_impl->ws.header->mask) {
3474 ws_impl->ws.mask = (uint8_t *) b + hsize - 4;
3475
3476 } else {
3477 ws_impl->ws.mask = NULL;
3478 }
3479
3480 return NXT_UNIT_OK;
3481 }
3482
3483
3484 void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t * ws)3485 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
3486 {
3487 nxt_unit_websocket_frame_release(ws);
3488 }
3489
3490
3491 static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_chunk_id_t * c,int * n,int min_n)3492 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3493 nxt_chunk_id_t *c, int *n, int min_n)
3494 {
3495 int res, nchunks, i;
3496 uint32_t outgoing_size;
3497 nxt_unit_mmap_t *mm, *mm_end;
3498 nxt_unit_impl_t *lib;
3499 nxt_port_mmap_header_t *hdr;
3500
3501 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3502
3503 pthread_mutex_lock(&lib->outgoing.mutex);
3504
3505 retry:
3506
3507 outgoing_size = lib->outgoing.size;
3508
3509 mm_end = lib->outgoing.elts + outgoing_size;
3510
3511 for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
3512 hdr = mm->hdr;
3513
3514 if (hdr->sent_over != 0xFFFFu
3515 && (hdr->sent_over != port->id.id
3516 || mm->src_thread != pthread_self()))
3517 {
3518 continue;
3519 }
3520
3521 *c = 0;
3522
3523 while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
3524 nchunks = 1;
3525
3526 while (nchunks < *n) {
3527 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
3528 *c + nchunks);
3529
3530 if (res == 0) {
3531 if (nchunks >= min_n) {
3532 *n = nchunks;
3533
3534 goto unlock;
3535 }
3536
3537 for (i = 0; i < nchunks; i++) {
3538 nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
3539 }
3540
3541 *c += nchunks + 1;
3542 nchunks = 0;
3543 break;
3544 }
3545
3546 nchunks++;
3547 }
3548
3549 if (nchunks >= min_n) {
3550 *n = nchunks;
3551
3552 goto unlock;
3553 }
3554 }
3555
3556 hdr->oosm = 1;
3557 }
3558
3559 if (outgoing_size >= lib->shm_mmap_limit) {
3560 /* Cannot allocate more shared memory. */
3561 pthread_mutex_unlock(&lib->outgoing.mutex);
3562
3563 if (min_n == 0) {
3564 *n = 0;
3565 }
3566
3567 if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
3568 >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
3569 {
3570 /* Memory allocated by application, but not send to router. */
3571 return NULL;
3572 }
3573
3574 /* Notify router about OOSM condition. */
3575
3576 res = nxt_unit_send_oosm(ctx, port);
3577 if (nxt_slow_path(res != NXT_UNIT_OK)) {
3578 return NULL;
3579 }
3580
3581 /* Return if caller can handle OOSM condition. Non-blocking mode. */
3582
3583 if (min_n == 0) {
3584 return NULL;
3585 }
3586
3587 nxt_unit_debug(ctx, "oosm: waiting for ACK");
3588
3589 res = nxt_unit_wait_shm_ack(ctx);
3590 if (nxt_slow_path(res != NXT_UNIT_OK)) {
3591 return NULL;
3592 }
3593
3594 nxt_unit_debug(ctx, "oosm: retry");
3595
3596 pthread_mutex_lock(&lib->outgoing.mutex);
3597
3598 goto retry;
3599 }
3600
3601 *c = 0;
3602 hdr = nxt_unit_new_mmap(ctx, port, *n);
3603
3604 unlock:
3605
3606 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);
3607
3608 nxt_unit_debug(ctx, "allocated_chunks %d",
3609 (int) lib->outgoing.allocated_chunks);
3610
3611 pthread_mutex_unlock(&lib->outgoing.mutex);
3612
3613 return hdr;
3614 }
3615
3616
3617 static int
nxt_unit_send_oosm(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)3618 nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
3619 {
3620 ssize_t res;
3621 nxt_port_msg_t msg;
3622 nxt_unit_impl_t *lib;
3623
3624 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3625
3626 msg.stream = 0;
3627 msg.pid = lib->pid;
3628 msg.reply_port = 0;
3629 msg.type = _NXT_PORT_MSG_OOSM;
3630 msg.last = 0;
3631 msg.mmap = 0;
3632 msg.nf = 0;
3633 msg.mf = 0;
3634
3635 res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL);
3636 if (nxt_slow_path(res != sizeof(msg))) {
3637 return NXT_UNIT_ERROR;
3638 }
3639
3640 return NXT_UNIT_OK;
3641 }
3642
3643
3644 static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t * ctx)3645 nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
3646 {
3647 int res;
3648 nxt_unit_ctx_impl_t *ctx_impl;
3649 nxt_unit_read_buf_t *rbuf;
3650
3651 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3652
3653 while (1) {
3654 rbuf = nxt_unit_read_buf_get(ctx);
3655 if (nxt_slow_path(rbuf == NULL)) {
3656 return NXT_UNIT_ERROR;
3657 }
3658
3659 do {
3660 res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
3661 } while (res == NXT_UNIT_AGAIN);
3662
3663 if (res == NXT_UNIT_ERROR) {
3664 nxt_unit_read_buf_release(ctx, rbuf);
3665
3666 return NXT_UNIT_ERROR;
3667 }
3668
3669 if (nxt_unit_is_shm_ack(rbuf)) {
3670 nxt_unit_read_buf_release(ctx, rbuf);
3671 break;
3672 }
3673
3674 pthread_mutex_lock(&ctx_impl->mutex);
3675
3676 nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
3677
3678 pthread_mutex_unlock(&ctx_impl->mutex);
3679
3680 if (nxt_unit_is_quit(rbuf)) {
3681 nxt_unit_debug(ctx, "oosm: quit received");
3682
3683 return NXT_UNIT_ERROR;
3684 }
3685 }
3686
3687 return NXT_UNIT_OK;
3688 }
3689
3690
3691 static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t * mmaps,uint32_t i)3692 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
3693 {
3694 uint32_t cap, n;
3695 nxt_unit_mmap_t *e;
3696
3697 if (nxt_fast_path(mmaps->size > i)) {
3698 return mmaps->elts + i;
3699 }
3700
3701 cap = mmaps->cap;
3702
3703 if (cap == 0) {
3704 cap = i + 1;
3705 }
3706
3707 while (i + 1 > cap) {
3708
3709 if (cap < 16) {
3710 cap = cap * 2;
3711
3712 } else {
3713 cap = cap + cap / 2;
3714 }
3715 }
3716
3717 if (cap != mmaps->cap) {
3718
3719 e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
3720 if (nxt_slow_path(e == NULL)) {
3721 return NULL;
3722 }
3723
3724 mmaps->elts = e;
3725
3726 for (n = mmaps->cap; n < cap; n++) {
3727 e = mmaps->elts + n;
3728
3729 e->hdr = NULL;
3730 nxt_queue_init(&e->awaiting_rbuf);
3731 }
3732
3733 mmaps->cap = cap;
3734 }
3735
3736 if (i + 1 > mmaps->size) {
3737 mmaps->size = i + 1;
3738 }
3739
3740 return mmaps->elts + i;
3741 }
3742
3743
3744 static nxt_port_mmap_header_t *
nxt_unit_new_mmap(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,int n)3745 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
3746 {
3747 int i, fd, rc;
3748 void *mem;
3749 nxt_unit_mmap_t *mm;
3750 nxt_unit_impl_t *lib;
3751 nxt_port_mmap_header_t *hdr;
3752
3753 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3754
3755 mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
3756 if (nxt_slow_path(mm == NULL)) {
3757 nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
3758
3759 return NULL;
3760 }
3761
3762 fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
3763 if (nxt_slow_path(fd == -1)) {
3764 goto remove_fail;
3765 }
3766
3767 mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3768 if (nxt_slow_path(mem == MAP_FAILED)) {
3769 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3770 strerror(errno), errno);
3771
3772 nxt_unit_close(fd);
3773
3774 goto remove_fail;
3775 }
3776
3777 mm->hdr = mem;
3778 hdr = mem;
3779
3780 memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
3781 memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
3782
3783 hdr->id = lib->outgoing.size - 1;
3784 hdr->src_pid = lib->pid;
3785 hdr->dst_pid = port->id.pid;
3786 hdr->sent_over = port->id.id;
3787 mm->src_thread = pthread_self();
3788
3789 /* Mark first n chunk(s) as busy */
3790 for (i = 0; i < n; i++) {
3791 nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
3792 }
3793
3794 /* Mark as busy chunk followed the last available chunk. */
3795 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
3796 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
3797
3798 pthread_mutex_unlock(&lib->outgoing.mutex);
3799
3800 rc = nxt_unit_send_mmap(ctx, port, fd);
3801 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3802 munmap(mem, PORT_MMAP_SIZE);
3803 hdr = NULL;
3804
3805 } else {
3806 nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
3807 hdr->id, (int) lib->pid, (int) port->id.pid);
3808 }
3809
3810 nxt_unit_close(fd);
3811
3812 pthread_mutex_lock(&lib->outgoing.mutex);
3813
3814 if (nxt_fast_path(hdr != NULL)) {
3815 return hdr;
3816 }
3817
3818 remove_fail:
3819
3820 lib->outgoing.size--;
3821
3822 return NULL;
3823 }
3824
3825
3826 static int
nxt_unit_shm_open(nxt_unit_ctx_t * ctx,size_t size)3827 nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
3828 {
3829 int fd;
3830
3831 #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
3832 char name[64];
3833 nxt_unit_impl_t *lib;
3834
3835 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3836 snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
3837 lib->pid, (void *) (uintptr_t) pthread_self());
3838 #endif
3839
3840 #if (NXT_HAVE_MEMFD_CREATE)
3841
3842 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
3843 if (nxt_slow_path(fd == -1)) {
3844 nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
3845 strerror(errno), errno);
3846
3847 return -1;
3848 }
3849
3850 nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
3851
3852 #elif (NXT_HAVE_SHM_OPEN_ANON)
3853
3854 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
3855 if (nxt_slow_path(fd == -1)) {
3856 nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
3857 strerror(errno), errno);
3858
3859 return -1;
3860 }
3861
3862 #elif (NXT_HAVE_SHM_OPEN)
3863
3864 /* Just in case. */
3865 shm_unlink(name);
3866
3867 fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
3868 if (nxt_slow_path(fd == -1)) {
3869 nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
3870 strerror(errno), errno);
3871
3872 return -1;
3873 }
3874
3875 if (nxt_slow_path(shm_unlink(name) == -1)) {
3876 nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
3877 strerror(errno), errno);
3878 }
3879
3880 #else
3881
3882 #error No working shared memory implementation.
3883
3884 #endif
3885
3886 if (nxt_slow_path(ftruncate(fd, size) == -1)) {
3887 nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3888 strerror(errno), errno);
3889
3890 nxt_unit_close(fd);
3891
3892 return -1;
3893 }
3894
3895 return fd;
3896 }
3897
3898
3899 static int
nxt_unit_send_mmap(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,int fd)3900 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
3901 {
3902 ssize_t res;
3903 nxt_send_oob_t oob;
3904 nxt_port_msg_t msg;
3905 nxt_unit_impl_t *lib;
3906 int fds[2] = {fd, -1};
3907
3908 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3909
3910 msg.stream = 0;
3911 msg.pid = lib->pid;
3912 msg.reply_port = 0;
3913 msg.type = _NXT_PORT_MSG_MMAP;
3914 msg.last = 0;
3915 msg.mmap = 0;
3916 msg.nf = 0;
3917 msg.mf = 0;
3918
3919 nxt_socket_msg_oob_init(&oob, fds);
3920
3921 res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), &oob);
3922 if (nxt_slow_path(res != sizeof(msg))) {
3923 return NXT_UNIT_ERROR;
3924 }
3925
3926 return NXT_UNIT_OK;
3927 }
3928
3929
3930 static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,uint32_t size,uint32_t min_size,nxt_unit_mmap_buf_t * mmap_buf,char * local_buf)3931 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3932 uint32_t size, uint32_t min_size,
3933 nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
3934 {
3935 int nchunks, min_nchunks;
3936 nxt_chunk_id_t c;
3937 nxt_port_mmap_header_t *hdr;
3938
3939 if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
3940 if (local_buf != NULL) {
3941 mmap_buf->free_ptr = NULL;
3942 mmap_buf->plain_ptr = local_buf;
3943
3944 } else {
3945 mmap_buf->free_ptr = nxt_unit_malloc(ctx,
3946 size + sizeof(nxt_port_msg_t));
3947 if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3948 return NXT_UNIT_ERROR;
3949 }
3950
3951 mmap_buf->plain_ptr = mmap_buf->free_ptr;
3952 }
3953
3954 mmap_buf->hdr = NULL;
3955 mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
3956 mmap_buf->buf.free = mmap_buf->buf.start;
3957 mmap_buf->buf.end = mmap_buf->buf.start + size;
3958
3959 nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
3960 mmap_buf->buf.start, (int) size);
3961
3962 return NXT_UNIT_OK;
3963 }
3964
3965 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3966 min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3967
3968 hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks);
3969 if (nxt_slow_path(hdr == NULL)) {
3970 if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
3971 mmap_buf->hdr = NULL;
3972 mmap_buf->buf.start = NULL;
3973 mmap_buf->buf.free = NULL;
3974 mmap_buf->buf.end = NULL;
3975 mmap_buf->free_ptr = NULL;
3976
3977 return NXT_UNIT_OK;
3978 }
3979
3980 return NXT_UNIT_ERROR;
3981 }
3982
3983 mmap_buf->hdr = hdr;
3984 mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
3985 mmap_buf->buf.free = mmap_buf->buf.start;
3986 mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
3987 mmap_buf->free_ptr = NULL;
3988 mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3989
3990 nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
3991 (int) hdr->id, (int) c,
3992 (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
3993
3994 return NXT_UNIT_OK;
3995 }
3996
3997
3998 static int
nxt_unit_incoming_mmap(nxt_unit_ctx_t * ctx,pid_t pid,int fd)3999 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
4000 {
4001 int rc;
4002 void *mem;
4003 nxt_queue_t awaiting_rbuf;
4004 struct stat mmap_stat;
4005 nxt_unit_mmap_t *mm;
4006 nxt_unit_impl_t *lib;
4007 nxt_unit_ctx_impl_t *ctx_impl;
4008 nxt_unit_read_buf_t *rbuf;
4009 nxt_port_mmap_header_t *hdr;
4010
4011 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4012
4013 nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
4014
4015 if (fstat(fd, &mmap_stat) == -1) {
4016 nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
4017 strerror(errno), errno);
4018
4019 return NXT_UNIT_ERROR;
4020 }
4021
4022 mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
4023 MAP_SHARED, fd, 0);
4024 if (nxt_slow_path(mem == MAP_FAILED)) {
4025 nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)",
4026 strerror(errno), errno);
4027
4028 return NXT_UNIT_ERROR;
4029 }
4030
4031 hdr = mem;
4032
4033 if (nxt_slow_path(hdr->src_pid != pid)) {
4034
4035 nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header "
4036 "detected: %d != %d or %d != %d", (int) hdr->src_pid,
4037 (int) pid, (int) hdr->dst_pid, (int) lib->pid);
4038
4039 munmap(mem, PORT_MMAP_SIZE);
4040
4041 return NXT_UNIT_ERROR;
4042 }
4043
4044 nxt_queue_init(&awaiting_rbuf);
4045
4046 pthread_mutex_lock(&lib->incoming.mutex);
4047
4048 mm = nxt_unit_mmap_at(&lib->incoming, hdr->id);
4049 if (nxt_slow_path(mm == NULL)) {
4050 nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array");
4051
4052 munmap(mem, PORT_MMAP_SIZE);
4053
4054 rc = NXT_UNIT_ERROR;
4055
4056 } else {
4057 mm->hdr = hdr;
4058
4059 hdr->sent_over = 0xFFFFu;
4060
4061 nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf);
4062 nxt_queue_init(&mm->awaiting_rbuf);
4063
4064 rc = NXT_UNIT_OK;
4065 }
4066
4067 pthread_mutex_unlock(&lib->incoming.mutex);
4068
4069 nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
4070
4071 ctx_impl = rbuf->ctx_impl;
4072
4073 pthread_mutex_lock(&ctx_impl->mutex);
4074
4075 nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link);
4076
4077 pthread_mutex_unlock(&ctx_impl->mutex);
4078
4079 nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
4080
4081 nxt_unit_awake_ctx(ctx, ctx_impl);
4082
4083 } nxt_queue_loop;
4084
4085 return rc;
4086 }
4087
4088
4089 static void
nxt_unit_awake_ctx(nxt_unit_ctx_t * ctx,nxt_unit_ctx_impl_t * ctx_impl)4090 nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl)
4091 {
4092 nxt_port_msg_t msg;
4093
4094 if (nxt_fast_path(ctx == &ctx_impl->ctx)) {
4095 return;
4096 }
4097
4098 if (nxt_slow_path(ctx_impl->read_port == NULL
4099 || ctx_impl->read_port->out_fd == -1))
4100 {
4101 nxt_unit_alert(ctx, "target context read_port is NULL or not writable");
4102
4103 return;
4104 }
4105
4106 memset(&msg, 0, sizeof(nxt_port_msg_t));
4107
4108 msg.type = _NXT_PORT_MSG_RPC_READY;
4109
4110 (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
4111 &msg, sizeof(msg), NULL);
4112 }
4113
4114
4115 static int
nxt_unit_mmaps_init(nxt_unit_mmaps_t * mmaps)4116 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
4117 {
4118 mmaps->size = 0;
4119 mmaps->cap = 0;
4120 mmaps->elts = NULL;
4121 mmaps->allocated_chunks = 0;
4122
4123 return pthread_mutex_init(&mmaps->mutex, NULL);
4124 }
4125
4126
4127 nxt_inline void
nxt_unit_process_use(nxt_unit_process_t * process)4128 nxt_unit_process_use(nxt_unit_process_t *process)
4129 {
4130 nxt_atomic_fetch_add(&process->use_count, 1);
4131 }
4132
4133
4134 nxt_inline void
nxt_unit_process_release(nxt_unit_process_t * process)4135 nxt_unit_process_release(nxt_unit_process_t *process)
4136 {
4137 long c;
4138
4139 c = nxt_atomic_fetch_add(&process->use_count, -1);
4140
4141 if (c == 1) {
4142 nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
4143
4144 nxt_unit_free(NULL, process);
4145 }
4146 }
4147
4148
4149 static void
nxt_unit_mmaps_destroy(nxt_unit_mmaps_t * mmaps)4150 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
4151 {
4152 nxt_unit_mmap_t *mm, *end;
4153
4154 if (mmaps->elts != NULL) {
4155 end = mmaps->elts + mmaps->size;
4156
4157 for (mm = mmaps->elts; mm < end; mm++) {
4158 munmap(mm->hdr, PORT_MMAP_SIZE);
4159 }
4160
4161 nxt_unit_free(NULL, mmaps->elts);
4162 }
4163
4164 pthread_mutex_destroy(&mmaps->mutex);
4165 }
4166
4167
4168 static int
nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t * ctx,nxt_unit_mmaps_t * mmaps,pid_t pid,uint32_t id,nxt_port_mmap_header_t ** hdr,nxt_unit_read_buf_t * rbuf)4169 nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
4170 pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
4171 nxt_unit_read_buf_t *rbuf)
4172 {
4173 int res, need_rbuf;
4174 nxt_unit_mmap_t *mm;
4175 nxt_unit_ctx_impl_t *ctx_impl;
4176
4177 mm = nxt_unit_mmap_at(mmaps, id);
4178 if (nxt_slow_path(mm == NULL)) {
4179 nxt_unit_alert(ctx, "failed to allocate mmap");
4180
4181 pthread_mutex_unlock(&mmaps->mutex);
4182
4183 *hdr = NULL;
4184
4185 return NXT_UNIT_ERROR;
4186 }
4187
4188 *hdr = mm->hdr;
4189
4190 if (nxt_fast_path(*hdr != NULL)) {
4191 return NXT_UNIT_OK;
4192 }
4193
4194 need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);
4195
4196 nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);
4197
4198 pthread_mutex_unlock(&mmaps->mutex);
4199
4200 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4201
4202 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
4203
4204 if (need_rbuf) {
4205 res = nxt_unit_get_mmap(ctx, pid, id);
4206 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
4207 return NXT_UNIT_ERROR;
4208 }
4209 }
4210
4211 return NXT_UNIT_AGAIN;
4212 }
4213
4214
4215 static int
nxt_unit_mmap_read(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg,nxt_unit_read_buf_t * rbuf)4216 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
4217 nxt_unit_read_buf_t *rbuf)
4218 {
4219 int res;
4220 void *start;
4221 uint32_t size;
4222 nxt_unit_impl_t *lib;
4223 nxt_unit_mmaps_t *mmaps;
4224 nxt_unit_mmap_buf_t *b, **incoming_tail;
4225 nxt_port_mmap_msg_t *mmap_msg, *end;
4226 nxt_port_mmap_header_t *hdr;
4227
4228 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
4229 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
4230 recv_msg->stream, (int) recv_msg->size);
4231
4232 return NXT_UNIT_ERROR;
4233 }
4234
4235 mmap_msg = recv_msg->start;
4236 end = nxt_pointer_to(recv_msg->start, recv_msg->size);
4237
4238 incoming_tail = &recv_msg->incoming_buf;
4239
4240 /* Allocating buffer structures. */
4241 for (; mmap_msg < end; mmap_msg++) {
4242 b = nxt_unit_mmap_buf_get(ctx);
4243 if (nxt_slow_path(b == NULL)) {
4244 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
4245 recv_msg->stream);
4246
4247 while (recv_msg->incoming_buf != NULL) {
4248 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4249 }
4250
4251 return NXT_UNIT_ERROR;
4252 }
4253
4254 nxt_unit_mmap_buf_insert(incoming_tail, b);
4255 incoming_tail = &b->next;
4256 }
4257
4258 b = recv_msg->incoming_buf;
4259 mmap_msg = recv_msg->start;
4260
4261 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4262
4263 mmaps = &lib->incoming;
4264
4265 pthread_mutex_lock(&mmaps->mutex);
4266
4267 for (; mmap_msg < end; mmap_msg++) {
4268 res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
4269 recv_msg->pid, mmap_msg->mmap_id,
4270 &hdr, rbuf);
4271
4272 if (nxt_slow_path(res != NXT_UNIT_OK)) {
4273 while (recv_msg->incoming_buf != NULL) {
4274 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4275 }
4276
4277 return res;
4278 }
4279
4280 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
4281 size = mmap_msg->size;
4282
4283 if (recv_msg->start == mmap_msg) {
4284 recv_msg->start = start;
4285 recv_msg->size = size;
4286 }
4287
4288 b->buf.start = start;
4289 b->buf.free = start;
4290 b->buf.end = b->buf.start + size;
4291 b->hdr = hdr;
4292
4293 b = b->next;
4294
4295 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
4296 recv_msg->stream,
4297 start, (int) size,
4298 (int) hdr->src_pid, (int) hdr->dst_pid,
4299 (int) hdr->id, (int) mmap_msg->chunk_id,
4300 (int) mmap_msg->size);
4301 }
4302
4303 pthread_mutex_unlock(&mmaps->mutex);
4304
4305 return NXT_UNIT_OK;
4306 }
4307
4308
4309 static int
nxt_unit_get_mmap(nxt_unit_ctx_t * ctx,pid_t pid,uint32_t id)4310 nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
4311 {
4312 ssize_t res;
4313 nxt_unit_impl_t *lib;
4314 nxt_unit_ctx_impl_t *ctx_impl;
4315
4316 struct {
4317 nxt_port_msg_t msg;
4318 nxt_port_msg_get_mmap_t get_mmap;
4319 } m;
4320
4321 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4322 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4323
4324 memset(&m.msg, 0, sizeof(nxt_port_msg_t));
4325
4326 m.msg.pid = lib->pid;
4327 m.msg.reply_port = ctx_impl->read_port->id.id;
4328 m.msg.type = _NXT_PORT_MSG_GET_MMAP;
4329
4330 m.get_mmap.id = id;
4331
4332 nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
4333
4334 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL);
4335 if (nxt_slow_path(res != sizeof(m))) {
4336 return NXT_UNIT_ERROR;
4337 }
4338
4339 return NXT_UNIT_OK;
4340 }
4341
4342
4343 static void
nxt_unit_mmap_release(nxt_unit_ctx_t * ctx,nxt_port_mmap_header_t * hdr,void * start,uint32_t size)4344 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr,
4345 void *start, uint32_t size)
4346 {
4347 int freed_chunks;
4348 u_char *p, *end;
4349 nxt_chunk_id_t c;
4350 nxt_unit_impl_t *lib;
4351
4352 memset(start, 0xA5, size);
4353
4354 p = start;
4355 end = p + size;
4356 c = nxt_port_mmap_chunk_id(hdr, p);
4357 freed_chunks = 0;
4358
4359 while (p < end) {
4360 nxt_port_mmap_set_chunk_free(hdr->free_map, c);
4361
4362 p += PORT_MMAP_CHUNK_SIZE;
4363 c++;
4364 freed_chunks++;
4365 }
4366
4367 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4368
4369 if (hdr->src_pid == lib->pid && freed_chunks != 0) {
4370 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks);
4371
4372 nxt_unit_debug(ctx, "allocated_chunks %d",
4373 (int) lib->outgoing.allocated_chunks);
4374 }
4375
4376 if (hdr->dst_pid == lib->pid
4377 && freed_chunks != 0
4378 && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
4379 {
4380 nxt_unit_send_shm_ack(ctx, hdr->src_pid);
4381 }
4382 }
4383
4384
4385 static int
nxt_unit_send_shm_ack(nxt_unit_ctx_t * ctx,pid_t pid)4386 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
4387 {
4388 ssize_t res;
4389 nxt_port_msg_t msg;
4390 nxt_unit_impl_t *lib;
4391
4392 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4393
4394 msg.stream = 0;
4395 msg.pid = lib->pid;
4396 msg.reply_port = 0;
4397 msg.type = _NXT_PORT_MSG_SHM_ACK;
4398 msg.last = 0;
4399 msg.mmap = 0;
4400 msg.nf = 0;
4401 msg.mf = 0;
4402
4403 res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL);
4404 if (nxt_slow_path(res != sizeof(msg))) {
4405 return NXT_UNIT_ERROR;
4406 }
4407
4408 return NXT_UNIT_OK;
4409 }
4410
4411
4412 static nxt_int_t
nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t * lhq,void * data)4413 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
4414 {
4415 nxt_process_t *process;
4416
4417 process = data;
4418
4419 if (lhq->key.length == sizeof(pid_t)
4420 && *(pid_t *) lhq->key.start == process->pid)
4421 {
4422 return NXT_OK;
4423 }
4424
4425 return NXT_DECLINED;
4426 }
4427
4428
4429 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
4430 NXT_LVLHSH_DEFAULT,
4431 nxt_unit_lvlhsh_pid_test,
4432 nxt_unit_lvlhsh_alloc,
4433 nxt_unit_lvlhsh_free,
4434 };
4435
4436
4437 static inline void
nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t * lhq,pid_t * pid)4438 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
4439 {
4440 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
4441 lhq->key.length = sizeof(*pid);
4442 lhq->key.start = (u_char *) pid;
4443 lhq->proto = &lvlhsh_processes_proto;
4444 }
4445
4446
4447 static nxt_unit_process_t *
nxt_unit_process_get(nxt_unit_ctx_t * ctx,pid_t pid)4448 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
4449 {
4450 nxt_unit_impl_t *lib;
4451 nxt_unit_process_t *process;
4452 nxt_lvlhsh_query_t lhq;
4453
4454 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4455
4456 nxt_unit_process_lhq_pid(&lhq, &pid);
4457
4458 if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
4459 process = lhq.value;
4460 nxt_unit_process_use(process);
4461
4462 return process;
4463 }
4464
4465 process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t));
4466 if (nxt_slow_path(process == NULL)) {
4467 nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid);
4468
4469 return NULL;
4470 }
4471
4472 process->pid = pid;
4473 process->use_count = 2;
4474 process->next_port_id = 0;
4475 process->lib = lib;
4476
4477 nxt_queue_init(&process->ports);
4478
4479 lhq.replace = 0;
4480 lhq.value = process;
4481
4482 switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
4483
4484 case NXT_OK:
4485 break;
4486
4487 default:
4488 nxt_unit_alert(ctx, "process %d insert failed", (int) pid);
4489
4490 nxt_unit_free(ctx, process);
4491 process = NULL;
4492 break;
4493 }
4494
4495 return process;
4496 }
4497
4498
4499 static nxt_unit_process_t *
nxt_unit_process_find(nxt_unit_impl_t * lib,pid_t pid,int remove)4500 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
4501 {
4502 int rc;
4503 nxt_lvlhsh_query_t lhq;
4504
4505 nxt_unit_process_lhq_pid(&lhq, &pid);
4506
4507 if (remove) {
4508 rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
4509
4510 } else {
4511 rc = nxt_lvlhsh_find(&lib->processes, &lhq);
4512 }
4513
4514 if (rc == NXT_OK) {
4515 if (!remove) {
4516 nxt_unit_process_use(lhq.value);
4517 }
4518
4519 return lhq.value;
4520 }
4521
4522 return NULL;
4523 }
4524
4525
4526 static nxt_unit_process_t *
nxt_unit_process_pop_first(nxt_unit_impl_t * lib)4527 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
4528 {
4529 return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
4530 }
4531
4532
4533 int
nxt_unit_run(nxt_unit_ctx_t * ctx)4534 nxt_unit_run(nxt_unit_ctx_t *ctx)
4535 {
4536 int rc;
4537 nxt_unit_ctx_impl_t *ctx_impl;
4538
4539 nxt_unit_ctx_use(ctx);
4540
4541 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4542
4543 rc = NXT_UNIT_OK;
4544
4545 while (nxt_fast_path(ctx_impl->online)) {
4546 rc = nxt_unit_run_once_impl(ctx);
4547
4548 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4549 nxt_unit_quit(ctx, NXT_QUIT_NORMAL);
4550 break;
4551 }
4552 }
4553
4554 nxt_unit_ctx_release(ctx);
4555
4556 return rc;
4557 }
4558
4559
4560 int
nxt_unit_run_once(nxt_unit_ctx_t * ctx)4561 nxt_unit_run_once(nxt_unit_ctx_t *ctx)
4562 {
4563 int rc;
4564
4565 nxt_unit_ctx_use(ctx);
4566
4567 rc = nxt_unit_run_once_impl(ctx);
4568
4569 nxt_unit_ctx_release(ctx);
4570
4571 return rc;
4572 }
4573
4574
4575 static int
nxt_unit_run_once_impl(nxt_unit_ctx_t * ctx)4576 nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
4577 {
4578 int rc;
4579 nxt_unit_read_buf_t *rbuf;
4580
4581 rbuf = nxt_unit_read_buf_get(ctx);
4582 if (nxt_slow_path(rbuf == NULL)) {
4583 return NXT_UNIT_ERROR;
4584 }
4585
4586 rc = nxt_unit_read_buf(ctx, rbuf);
4587 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4588 nxt_unit_read_buf_release(ctx, rbuf);
4589
4590 return rc;
4591 }
4592
4593 rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4594 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4595 return NXT_UNIT_ERROR;
4596 }
4597
4598 rc = nxt_unit_process_pending_rbuf(ctx);
4599 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4600 return NXT_UNIT_ERROR;
4601 }
4602
4603 nxt_unit_process_ready_req(ctx);
4604
4605 return rc;
4606 }
4607
4608
4609 static int
nxt_unit_read_buf(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf)4610 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4611 {
4612 int nevents, res, err;
4613 nxt_uint_t nfds;
4614 nxt_unit_impl_t *lib;
4615 nxt_unit_ctx_impl_t *ctx_impl;
4616 nxt_unit_port_impl_t *port_impl;
4617 struct pollfd fds[2];
4618
4619 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4620
4621 if (ctx_impl->wait_items > 0 || !nxt_unit_chk_ready(ctx)) {
4622 return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4623 }
4624
4625 port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
4626 port);
4627
4628 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4629
4630 retry:
4631
4632 if (port_impl->from_socket == 0) {
4633 res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
4634 if (res == NXT_UNIT_OK) {
4635 if (nxt_unit_is_read_socket(rbuf)) {
4636 port_impl->from_socket++;
4637
4638 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
4639 (int) ctx_impl->read_port->id.pid,
4640 (int) ctx_impl->read_port->id.id,
4641 port_impl->from_socket);
4642
4643 } else {
4644 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
4645 (int) ctx_impl->read_port->id.pid,
4646 (int) ctx_impl->read_port->id.id,
4647 (int) rbuf->size);
4648
4649 return NXT_UNIT_OK;
4650 }
4651 }
4652 }
4653
4654 if (nxt_fast_path(nxt_unit_chk_ready(ctx))) {
4655 res = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf);
4656 if (res == NXT_UNIT_OK) {
4657 return NXT_UNIT_OK;
4658 }
4659
4660 fds[1].fd = lib->shared_port->in_fd;
4661 fds[1].events = POLLIN;
4662
4663 nfds = 2;
4664
4665 } else {
4666 nfds = 1;
4667 }
4668
4669 fds[0].fd = ctx_impl->read_port->in_fd;
4670 fds[0].events = POLLIN;
4671 fds[0].revents = 0;
4672
4673 fds[1].revents = 0;
4674
4675 nevents = poll(fds, nfds, -1);
4676 if (nxt_slow_path(nevents == -1)) {
4677 err = errno;
4678
4679 if (err == EINTR) {
4680 goto retry;
4681 }
4682
4683 nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
4684 fds[0].fd, fds[1].fd, strerror(err), err);
4685
4686 rbuf->size = -1;
4687
4688 return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
4689 }
4690
4691 nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04X, %04X]",
4692 fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4693 fds[1].revents);
4694
4695 if ((fds[0].revents & POLLIN) != 0) {
4696 res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4697 if (res == NXT_UNIT_AGAIN) {
4698 goto retry;
4699 }
4700
4701 return res;
4702 }
4703
4704 if ((fds[1].revents & POLLIN) != 0) {
4705 res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4706 if (res == NXT_UNIT_AGAIN) {
4707 goto retry;
4708 }
4709
4710 return res;
4711 }
4712
4713 nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
4714 fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4715 fds[1].revents);
4716
4717 return NXT_UNIT_ERROR;
4718 }
4719
4720
4721 static int
nxt_unit_chk_ready(nxt_unit_ctx_t * ctx)4722 nxt_unit_chk_ready(nxt_unit_ctx_t *ctx)
4723 {
4724 nxt_unit_impl_t *lib;
4725 nxt_unit_ctx_impl_t *ctx_impl;
4726
4727 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4728 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4729
4730 return (ctx_impl->ready
4731 && (lib->request_limit == 0
4732 || lib->request_count < lib->request_limit));
4733 }
4734
4735
4736 static int
nxt_unit_process_pending_rbuf(nxt_unit_ctx_t * ctx)4737 nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
4738 {
4739 int rc;
4740 nxt_queue_t pending_rbuf;
4741 nxt_unit_ctx_impl_t *ctx_impl;
4742 nxt_unit_read_buf_t *rbuf;
4743
4744 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4745
4746 pthread_mutex_lock(&ctx_impl->mutex);
4747
4748 if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
4749 pthread_mutex_unlock(&ctx_impl->mutex);
4750
4751 return NXT_UNIT_OK;
4752 }
4753
4754 nxt_queue_init(&pending_rbuf);
4755
4756 nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
4757 nxt_queue_init(&ctx_impl->pending_rbuf);
4758
4759 pthread_mutex_unlock(&ctx_impl->mutex);
4760
4761 rc = NXT_UNIT_OK;
4762
4763 nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
4764
4765 if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
4766 rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);
4767
4768 } else {
4769 nxt_unit_read_buf_release(ctx, rbuf);
4770 }
4771
4772 } nxt_queue_loop;
4773
4774 if (!ctx_impl->ready) {
4775 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
4776 }
4777
4778 return rc;
4779 }
4780
4781
4782 static void
nxt_unit_process_ready_req(nxt_unit_ctx_t * ctx)4783 nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
4784 {
4785 int res;
4786 nxt_queue_t ready_req;
4787 nxt_unit_impl_t *lib;
4788 nxt_unit_ctx_impl_t *ctx_impl;
4789 nxt_unit_request_info_t *req;
4790 nxt_unit_request_info_impl_t *req_impl;
4791
4792 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4793
4794 pthread_mutex_lock(&ctx_impl->mutex);
4795
4796 if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
4797 pthread_mutex_unlock(&ctx_impl->mutex);
4798
4799 return;
4800 }
4801
4802 nxt_queue_init(&ready_req);
4803
4804 nxt_queue_add(&ready_req, &ctx_impl->ready_req);
4805 nxt_queue_init(&ctx_impl->ready_req);
4806
4807 pthread_mutex_unlock(&ctx_impl->mutex);
4808
4809 nxt_queue_each(req_impl, &ready_req,
4810 nxt_unit_request_info_impl_t, port_wait_link)
4811 {
4812 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
4813
4814 req = &req_impl->req;
4815
4816 res = nxt_unit_send_req_headers_ack(req);
4817 if (nxt_slow_path(res != NXT_UNIT_OK)) {
4818 nxt_unit_request_done(req, NXT_UNIT_ERROR);
4819
4820 continue;
4821 }
4822
4823 if (req->content_length
4824 > (uint64_t) (req->content_buf->end - req->content_buf->free))
4825 {
4826 res = nxt_unit_request_hash_add(ctx, req);
4827 if (nxt_slow_path(res != NXT_UNIT_OK)) {
4828 nxt_unit_req_warn(req, "failed to add request to hash");
4829
4830 nxt_unit_request_done(req, NXT_UNIT_ERROR);
4831
4832 continue;
4833 }
4834
4835 /*
4836 * If application have separate data handler, we may start
4837 * request processing and process data when it is arrived.
4838 */
4839 if (lib->callbacks.data_handler == NULL) {
4840 continue;
4841 }
4842 }
4843
4844 lib->callbacks.request_handler(&req_impl->req);
4845
4846 } nxt_queue_loop;
4847 }
4848
4849
4850 int
nxt_unit_run_ctx(nxt_unit_ctx_t * ctx)4851 nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
4852 {
4853 int rc;
4854 nxt_unit_read_buf_t *rbuf;
4855 nxt_unit_ctx_impl_t *ctx_impl;
4856
4857 nxt_unit_ctx_use(ctx);
4858
4859 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4860
4861 rc = NXT_UNIT_OK;
4862
4863 while (nxt_fast_path(ctx_impl->online)) {
4864 rbuf = nxt_unit_read_buf_get(ctx);
4865 if (nxt_slow_path(rbuf == NULL)) {
4866 rc = NXT_UNIT_ERROR;
4867 break;
4868 }
4869
4870 retry:
4871
4872 rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4873 if (rc == NXT_UNIT_AGAIN) {
4874 goto retry;
4875 }
4876
4877 rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4878 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4879 break;
4880 }
4881
4882 rc = nxt_unit_process_pending_rbuf(ctx);
4883 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4884 break;
4885 }
4886
4887 nxt_unit_process_ready_req(ctx);
4888 }
4889
4890 nxt_unit_ctx_release(ctx);
4891
4892 return rc;
4893 }
4894
4895
4896 nxt_inline int
nxt_unit_is_read_queue(nxt_unit_read_buf_t * rbuf)4897 nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
4898 {
4899 nxt_port_msg_t *port_msg;
4900
4901 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4902 port_msg = (nxt_port_msg_t *) rbuf->buf;
4903
4904 return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
4905 }
4906
4907 return 0;
4908 }
4909
4910
4911 nxt_inline int
nxt_unit_is_read_socket(nxt_unit_read_buf_t * rbuf)4912 nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
4913 {
4914 if (nxt_fast_path(rbuf->size == 1)) {
4915 return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
4916 }
4917
4918 return 0;
4919 }
4920
4921
4922 nxt_inline int
nxt_unit_is_shm_ack(nxt_unit_read_buf_t * rbuf)4923 nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
4924 {
4925 nxt_port_msg_t *port_msg;
4926
4927 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4928 port_msg = (nxt_port_msg_t *) rbuf->buf;
4929
4930 return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
4931 }
4932
4933 return 0;
4934 }
4935
4936
4937 nxt_inline int
nxt_unit_is_quit(nxt_unit_read_buf_t * rbuf)4938 nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
4939 {
4940 nxt_port_msg_t *port_msg;
4941
4942 if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4943 port_msg = (nxt_port_msg_t *) rbuf->buf;
4944
4945 return port_msg->type == _NXT_PORT_MSG_QUIT;
4946 }
4947
4948 return 0;
4949 }
4950
4951
4952 int
nxt_unit_run_shared(nxt_unit_ctx_t * ctx)4953 nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
4954 {
4955 int rc;
4956 nxt_unit_impl_t *lib;
4957 nxt_unit_read_buf_t *rbuf;
4958
4959 nxt_unit_ctx_use(ctx);
4960
4961 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4962
4963 rc = NXT_UNIT_OK;
4964
4965 while (nxt_fast_path(nxt_unit_chk_ready(ctx))) {
4966 rbuf = nxt_unit_read_buf_get(ctx);
4967 if (nxt_slow_path(rbuf == NULL)) {
4968 rc = NXT_UNIT_ERROR;
4969 break;
4970 }
4971
4972 retry:
4973
4974 rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4975 if (rc == NXT_UNIT_AGAIN) {
4976 goto retry;
4977 }
4978
4979 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4980 nxt_unit_read_buf_release(ctx, rbuf);
4981 break;
4982 }
4983
4984 rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4985 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4986 break;
4987 }
4988 }
4989
4990 nxt_unit_ctx_release(ctx);
4991
4992 return rc;
4993 }
4994
4995
4996 nxt_unit_request_info_t *
nxt_unit_dequeue_request(nxt_unit_ctx_t * ctx)4997 nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
4998 {
4999 int rc;
5000 nxt_unit_impl_t *lib;
5001 nxt_unit_read_buf_t *rbuf;
5002 nxt_unit_request_info_t *req;
5003
5004 nxt_unit_ctx_use(ctx);
5005
5006 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5007
5008 req = NULL;
5009
5010 if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
5011 goto done;
5012 }
5013
5014 rbuf = nxt_unit_read_buf_get(ctx);
5015 if (nxt_slow_path(rbuf == NULL)) {
5016 goto done;
5017 }
5018
5019 rc = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf);
5020 if (rc != NXT_UNIT_OK) {
5021 nxt_unit_read_buf_release(ctx, rbuf);
5022 goto done;
5023 }
5024
5025 (void) nxt_unit_process_msg(ctx, rbuf, &req);
5026
5027 done:
5028
5029 nxt_unit_ctx_release(ctx);
5030
5031 return req;
5032 }
5033
5034
5035 int
nxt_unit_process_port_msg(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)5036 nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5037 {
5038 int rc;
5039
5040 nxt_unit_ctx_use(ctx);
5041
5042 rc = nxt_unit_process_port_msg_impl(ctx, port);
5043
5044 nxt_unit_ctx_release(ctx);
5045
5046 return rc;
5047 }
5048
5049
5050 static int
nxt_unit_process_port_msg_impl(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)5051 nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5052 {
5053 int rc;
5054 nxt_unit_impl_t *lib;
5055 nxt_unit_read_buf_t *rbuf;
5056
5057 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5058
5059 if (port == lib->shared_port && !nxt_unit_chk_ready(ctx)) {
5060 return NXT_UNIT_AGAIN;
5061 }
5062
5063 rbuf = nxt_unit_read_buf_get(ctx);
5064 if (nxt_slow_path(rbuf == NULL)) {
5065 return NXT_UNIT_ERROR;
5066 }
5067
5068 if (port == lib->shared_port) {
5069 rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
5070
5071 } else {
5072 rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
5073 }
5074
5075 if (rc != NXT_UNIT_OK) {
5076 nxt_unit_read_buf_release(ctx, rbuf);
5077 return rc;
5078 }
5079
5080 rc = nxt_unit_process_msg(ctx, rbuf, NULL);
5081 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
5082 return NXT_UNIT_ERROR;
5083 }
5084
5085 rc = nxt_unit_process_pending_rbuf(ctx);
5086 if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
5087 return NXT_UNIT_ERROR;
5088 }
5089
5090 nxt_unit_process_ready_req(ctx);
5091
5092 return rc;
5093 }
5094
5095
5096 void
nxt_unit_done(nxt_unit_ctx_t * ctx)5097 nxt_unit_done(nxt_unit_ctx_t *ctx)
5098 {
5099 nxt_unit_ctx_release(ctx);
5100 }
5101
5102
5103 nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t * ctx,void * data)5104 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
5105 {
5106 int rc, queue_fd;
5107 void *mem;
5108 nxt_unit_impl_t *lib;
5109 nxt_unit_port_t *port;
5110 nxt_unit_ctx_impl_t *new_ctx;
5111 nxt_unit_port_impl_t *port_impl;
5112
5113 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5114
5115 new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t)
5116 + lib->request_data_size);
5117 if (nxt_slow_path(new_ctx == NULL)) {
5118 nxt_unit_alert(ctx, "failed to allocate context");
5119
5120 return NULL;
5121 }
5122
5123 rc = nxt_unit_ctx_init(lib, new_ctx, data);
5124 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5125 nxt_unit_free(ctx, new_ctx);
5126
5127 return NULL;
5128 }
5129
5130 queue_fd = -1;
5131
5132 port = nxt_unit_create_port(&new_ctx->ctx);
5133 if (nxt_slow_path(port == NULL)) {
5134 goto fail;
5135 }
5136
5137 new_ctx->read_port = port;
5138
5139 queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
5140 if (nxt_slow_path(queue_fd == -1)) {
5141 goto fail;
5142 }
5143
5144 mem = mmap(NULL, sizeof(nxt_port_queue_t),
5145 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
5146 if (nxt_slow_path(mem == MAP_FAILED)) {
5147 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
5148 strerror(errno), errno);
5149
5150 goto fail;
5151 }
5152
5153 nxt_port_queue_init(mem);
5154
5155 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5156 port_impl->queue = mem;
5157
5158 rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
5159 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5160 goto fail;
5161 }
5162
5163 nxt_unit_close(queue_fd);
5164
5165 return &new_ctx->ctx;
5166
5167 fail:
5168
5169 if (queue_fd != -1) {
5170 nxt_unit_close(queue_fd);
5171 }
5172
5173 nxt_unit_ctx_release(&new_ctx->ctx);
5174
5175 return NULL;
5176 }
5177
5178
5179 static void
nxt_unit_ctx_free(nxt_unit_ctx_impl_t * ctx_impl)5180 nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
5181 {
5182 nxt_unit_impl_t *lib;
5183 nxt_unit_mmap_buf_t *mmap_buf;
5184 nxt_unit_read_buf_t *rbuf;
5185 nxt_unit_request_info_impl_t *req_impl;
5186 nxt_unit_websocket_frame_impl_t *ws_impl;
5187
5188 lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
5189
5190 nxt_queue_each(req_impl, &ctx_impl->active_req,
5191 nxt_unit_request_info_impl_t, link)
5192 {
5193 nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
5194
5195 nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
5196
5197 } nxt_queue_loop;
5198
5199 nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
5200 nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);
5201
5202 while (ctx_impl->free_buf != NULL) {
5203 mmap_buf = ctx_impl->free_buf;
5204 nxt_unit_mmap_buf_unlink(mmap_buf);
5205 nxt_unit_free(&ctx_impl->ctx, mmap_buf);
5206 }
5207
5208 nxt_queue_each(req_impl, &ctx_impl->free_req,
5209 nxt_unit_request_info_impl_t, link)
5210 {
5211 nxt_unit_request_info_free(req_impl);
5212
5213 } nxt_queue_loop;
5214
5215 nxt_queue_each(ws_impl, &ctx_impl->free_ws,
5216 nxt_unit_websocket_frame_impl_t, link)
5217 {
5218 nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);
5219
5220 } nxt_queue_loop;
5221
5222 nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link)
5223 {
5224 if (rbuf != &ctx_impl->ctx_read_buf) {
5225 nxt_unit_free(&ctx_impl->ctx, rbuf);
5226 }
5227 } nxt_queue_loop;
5228
5229 pthread_mutex_destroy(&ctx_impl->mutex);
5230
5231 pthread_mutex_lock(&lib->mutex);
5232
5233 nxt_queue_remove(&ctx_impl->link);
5234
5235 pthread_mutex_unlock(&lib->mutex);
5236
5237 if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5238 nxt_unit_remove_port(lib, NULL, &ctx_impl->read_port->id);
5239 nxt_unit_port_release(ctx_impl->read_port);
5240 }
5241
5242 if (ctx_impl != &lib->main_ctx) {
5243 nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
5244 }
5245
5246 nxt_unit_lib_release(lib);
5247 }
5248
5249
5250 /* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */
5251 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
5252 #define NXT_UNIX_SOCKET SOCK_SEQPACKET
5253 #else
5254 #define NXT_UNIX_SOCKET SOCK_DGRAM
5255 #endif
5256
5257
5258 void
nxt_unit_port_id_init(nxt_unit_port_id_t * port_id,pid_t pid,uint16_t id)5259 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
5260 {
5261 nxt_unit_port_hash_id_t port_hash_id;
5262
5263 port_hash_id.pid = pid;
5264 port_hash_id.id = id;
5265
5266 port_id->pid = pid;
5267 port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
5268 port_id->id = id;
5269 }
5270
5271
5272 static nxt_unit_port_t *
nxt_unit_create_port(nxt_unit_ctx_t * ctx)5273 nxt_unit_create_port(nxt_unit_ctx_t *ctx)
5274 {
5275 int rc, port_sockets[2];
5276 nxt_unit_impl_t *lib;
5277 nxt_unit_port_t new_port, *port;
5278 nxt_unit_process_t *process;
5279
5280 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5281
5282 rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
5283 if (nxt_slow_path(rc != 0)) {
5284 nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
5285 strerror(errno), errno);
5286
5287 return NULL;
5288 }
5289
5290 #if (NXT_HAVE_SOCKOPT_SO_PASSCRED)
5291 int enable_creds = 1;
5292
5293 if (nxt_slow_path(setsockopt(port_sockets[0], SOL_SOCKET, SO_PASSCRED,
5294 &enable_creds, sizeof(enable_creds)) == -1))
5295 {
5296 nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno));
5297 return NULL;
5298 }
5299
5300 if (nxt_slow_path(setsockopt(port_sockets[1], SOL_SOCKET, SO_PASSCRED,
5301 &enable_creds, sizeof(enable_creds)) == -1))
5302 {
5303 nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno));
5304 return NULL;
5305 }
5306 #endif
5307
5308 nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
5309 port_sockets[0], port_sockets[1]);
5310
5311 pthread_mutex_lock(&lib->mutex);
5312
5313 process = nxt_unit_process_get(ctx, lib->pid);
5314 if (nxt_slow_path(process == NULL)) {
5315 pthread_mutex_unlock(&lib->mutex);
5316
5317 nxt_unit_close(port_sockets[0]);
5318 nxt_unit_close(port_sockets[1]);
5319
5320 return NULL;
5321 }
5322
5323 nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
5324
5325 new_port.in_fd = port_sockets[0];
5326 new_port.out_fd = port_sockets[1];
5327 new_port.data = NULL;
5328
5329 pthread_mutex_unlock(&lib->mutex);
5330
5331 nxt_unit_process_release(process);
5332
5333 port = nxt_unit_add_port(ctx, &new_port, NULL);
5334 if (nxt_slow_path(port == NULL)) {
5335 nxt_unit_close(port_sockets[0]);
5336 nxt_unit_close(port_sockets[1]);
5337 }
5338
5339 return port;
5340 }
5341
5342
5343 static int
nxt_unit_send_port(nxt_unit_ctx_t * ctx,nxt_unit_port_t * dst,nxt_unit_port_t * port,int queue_fd)5344 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
5345 nxt_unit_port_t *port, int queue_fd)
5346 {
5347 ssize_t res;
5348 nxt_send_oob_t oob;
5349 nxt_unit_impl_t *lib;
5350 int fds[2] = { port->out_fd, queue_fd };
5351
5352 struct {
5353 nxt_port_msg_t msg;
5354 nxt_port_msg_new_port_t new_port;
5355 } m;
5356
5357 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5358
5359 m.msg.stream = 0;
5360 m.msg.pid = lib->pid;
5361 m.msg.reply_port = 0;
5362 m.msg.type = _NXT_PORT_MSG_NEW_PORT;
5363 m.msg.last = 0;
5364 m.msg.mmap = 0;
5365 m.msg.nf = 0;
5366 m.msg.mf = 0;
5367
5368 m.new_port.id = port->id.id;
5369 m.new_port.pid = port->id.pid;
5370 m.new_port.type = NXT_PROCESS_APP;
5371 m.new_port.max_size = 16 * 1024;
5372 m.new_port.max_share = 64 * 1024;
5373
5374 nxt_socket_msg_oob_init(&oob, fds);
5375
5376 res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &oob);
5377
5378 return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
5379 }
5380
5381
nxt_unit_port_use(nxt_unit_port_t * port)5382 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)
5383 {
5384 nxt_unit_port_impl_t *port_impl;
5385
5386 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5387
5388 nxt_atomic_fetch_add(&port_impl->use_count, 1);
5389 }
5390
5391
nxt_unit_port_release(nxt_unit_port_t * port)5392 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
5393 {
5394 long c;
5395 nxt_unit_port_impl_t *port_impl;
5396
5397 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5398
5399 c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
5400
5401 if (c == 1) {
5402 nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d",
5403 (int) port->id.pid, (int) port->id.id,
5404 port->in_fd, port->out_fd);
5405
5406 nxt_unit_process_release(port_impl->process);
5407
5408 if (port->in_fd != -1) {
5409 nxt_unit_close(port->in_fd);
5410
5411 port->in_fd = -1;
5412 }
5413
5414 if (port->out_fd != -1) {
5415 nxt_unit_close(port->out_fd);
5416
5417 port->out_fd = -1;
5418 }
5419
5420 if (port_impl->queue != NULL) {
5421 munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID)
5422 ? sizeof(nxt_app_queue_t)
5423 : sizeof(nxt_port_queue_t));
5424 }
5425
5426 nxt_unit_free(NULL, port_impl);
5427 }
5428 }
5429
5430
5431 static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,void * queue)5432 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
5433 {
5434 int rc, ready;
5435 nxt_queue_t awaiting_req;
5436 nxt_unit_impl_t *lib;
5437 nxt_unit_port_t *old_port;
5438 nxt_unit_process_t *process;
5439 nxt_unit_port_impl_t *new_port, *old_port_impl;
5440
5441 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5442
5443 pthread_mutex_lock(&lib->mutex);
5444
5445 old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
5446
5447 if (nxt_slow_path(old_port != NULL)) {
5448 nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
5449 "in_fd %d out_fd %d queue %p",
5450 port->id.pid, port->id.id,
5451 port->in_fd, port->out_fd, queue);
5452
5453 if (old_port->data == NULL) {
5454 old_port->data = port->data;
5455 port->data = NULL;
5456 }
5457
5458 if (old_port->in_fd == -1) {
5459 old_port->in_fd = port->in_fd;
5460 port->in_fd = -1;
5461 }
5462
5463 if (port->in_fd != -1) {
5464 nxt_unit_close(port->in_fd);
5465 port->in_fd = -1;
5466 }
5467
5468 if (old_port->out_fd == -1) {
5469 old_port->out_fd = port->out_fd;
5470 port->out_fd = -1;
5471 }
5472
5473 if (port->out_fd != -1) {
5474 nxt_unit_close(port->out_fd);
5475 port->out_fd = -1;
5476 }
5477
5478 *port = *old_port;
5479
5480 nxt_queue_init(&awaiting_req);
5481
5482 old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
5483
5484 if (old_port_impl->queue == NULL) {
5485 old_port_impl->queue = queue;
5486 }
5487
5488 ready = (port->in_fd != -1 || port->out_fd != -1);
5489
5490 /*
5491 * Port can be market as 'ready' only after callbacks.add_port() call.
5492 * Otherwise, request may try to use the port before callback.
5493 */
5494 if (lib->callbacks.add_port == NULL && ready) {
5495 old_port_impl->ready = ready;
5496
5497 if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5498 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5499 nxt_queue_init(&old_port_impl->awaiting_req);
5500 }
5501 }
5502
5503 pthread_mutex_unlock(&lib->mutex);
5504
5505 if (lib->callbacks.add_port != NULL && ready) {
5506 lib->callbacks.add_port(ctx, old_port);
5507
5508 pthread_mutex_lock(&lib->mutex);
5509
5510 old_port_impl->ready = ready;
5511
5512 if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5513 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5514 nxt_queue_init(&old_port_impl->awaiting_req);
5515 }
5516
5517 pthread_mutex_unlock(&lib->mutex);
5518 }
5519
5520 nxt_unit_process_awaiting_req(ctx, &awaiting_req);
5521
5522 return old_port;
5523 }
5524
5525 new_port = NULL;
5526 ready = 0;
5527
5528 nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
5529 port->id.pid, port->id.id,
5530 port->in_fd, port->out_fd, queue);
5531
5532 process = nxt_unit_process_get(ctx, port->id.pid);
5533 if (nxt_slow_path(process == NULL)) {
5534 goto unlock;
5535 }
5536
5537 if (port->id.id != NXT_UNIT_SHARED_PORT_ID
5538 && port->id.id >= process->next_port_id)
5539 {
5540 process->next_port_id = port->id.id + 1;
5541 }
5542
5543 new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
5544 if (nxt_slow_path(new_port == NULL)) {
5545 nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
5546 port->id.pid, port->id.id);
5547
5548 goto unlock;
5549 }
5550
5551 new_port->port = *port;
5552
5553 rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
5554 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5555 nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
5556 port->id.pid, port->id.id);
5557
5558 nxt_unit_free(ctx, new_port);
5559
5560 new_port = NULL;
5561
5562 goto unlock;
5563 }
5564
5565 nxt_queue_insert_tail(&process->ports, &new_port->link);
5566
5567 new_port->use_count = 2;
5568 new_port->process = process;
5569 new_port->queue = queue;
5570 new_port->from_socket = 0;
5571 new_port->socket_rbuf = NULL;
5572
5573 nxt_queue_init(&new_port->awaiting_req);
5574
5575 ready = (port->in_fd != -1 || port->out_fd != -1);
5576
5577 if (lib->callbacks.add_port == NULL) {
5578 new_port->ready = ready;
5579
5580 } else {
5581 new_port->ready = 0;
5582 }
5583
5584 process = NULL;
5585
5586 unlock:
5587
5588 pthread_mutex_unlock(&lib->mutex);
5589
5590 if (nxt_slow_path(process != NULL)) {
5591 nxt_unit_process_release(process);
5592 }
5593
5594 if (lib->callbacks.add_port != NULL && new_port != NULL && ready) {
5595 lib->callbacks.add_port(ctx, &new_port->port);
5596
5597 nxt_queue_init(&awaiting_req);
5598
5599 pthread_mutex_lock(&lib->mutex);
5600
5601 new_port->ready = 1;
5602
5603 if (!nxt_queue_is_empty(&new_port->awaiting_req)) {
5604 nxt_queue_add(&awaiting_req, &new_port->awaiting_req);
5605 nxt_queue_init(&new_port->awaiting_req);
5606 }
5607
5608 pthread_mutex_unlock(&lib->mutex);
5609
5610 nxt_unit_process_awaiting_req(ctx, &awaiting_req);
5611 }
5612
5613 return (new_port == NULL) ? NULL : &new_port->port;
5614 }
5615
5616
5617 static void
nxt_unit_process_awaiting_req(nxt_unit_ctx_t * ctx,nxt_queue_t * awaiting_req)5618 nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
5619 {
5620 nxt_unit_ctx_impl_t *ctx_impl;
5621 nxt_unit_request_info_impl_t *req_impl;
5622
5623 nxt_queue_each(req_impl, awaiting_req,
5624 nxt_unit_request_info_impl_t, port_wait_link)
5625 {
5626 nxt_queue_remove(&req_impl->port_wait_link);
5627
5628 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
5629 ctx);
5630
5631 pthread_mutex_lock(&ctx_impl->mutex);
5632
5633 nxt_queue_insert_tail(&ctx_impl->ready_req,
5634 &req_impl->port_wait_link);
5635
5636 pthread_mutex_unlock(&ctx_impl->mutex);
5637
5638 nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
5639
5640 nxt_unit_awake_ctx(ctx, ctx_impl);
5641
5642 } nxt_queue_loop;
5643 }
5644
5645
5646 static void
nxt_unit_remove_port(nxt_unit_impl_t * lib,nxt_unit_ctx_t * ctx,nxt_unit_port_id_t * port_id)5647 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
5648 nxt_unit_port_id_t *port_id)
5649 {
5650 nxt_unit_port_t *port;
5651 nxt_unit_port_impl_t *port_impl;
5652
5653 pthread_mutex_lock(&lib->mutex);
5654
5655 port = nxt_unit_remove_port_unsafe(lib, port_id);
5656
5657 if (nxt_fast_path(port != NULL)) {
5658 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5659
5660 nxt_queue_remove(&port_impl->link);
5661 }
5662
5663 pthread_mutex_unlock(&lib->mutex);
5664
5665 if (lib->callbacks.remove_port != NULL && port != NULL) {
5666 lib->callbacks.remove_port(&lib->unit, ctx, port);
5667 }
5668
5669 if (nxt_fast_path(port != NULL)) {
5670 nxt_unit_port_release(port);
5671 }
5672 }
5673
5674
5675 static nxt_unit_port_t *
nxt_unit_remove_port_unsafe(nxt_unit_impl_t * lib,nxt_unit_port_id_t * port_id)5676 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5677 {
5678 nxt_unit_port_t *port;
5679
5680 port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
5681 if (nxt_slow_path(port == NULL)) {
5682 nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
5683 (int) port_id->pid, (int) port_id->id);
5684
5685 return NULL;
5686 }
5687
5688 nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
5689 (int) port_id->pid, (int) port_id->id,
5690 port->in_fd, port->out_fd, port->data);
5691
5692 return port;
5693 }
5694
5695
5696 static void
nxt_unit_remove_pid(nxt_unit_impl_t * lib,pid_t pid)5697 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
5698 {
5699 nxt_unit_process_t *process;
5700
5701 pthread_mutex_lock(&lib->mutex);
5702
5703 process = nxt_unit_process_find(lib, pid, 1);
5704 if (nxt_slow_path(process == NULL)) {
5705 nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);
5706
5707 pthread_mutex_unlock(&lib->mutex);
5708
5709 return;
5710 }
5711
5712 nxt_unit_remove_process(lib, process);
5713
5714 if (lib->callbacks.remove_pid != NULL) {
5715 lib->callbacks.remove_pid(&lib->unit, pid);
5716 }
5717 }
5718
5719
5720 static void
nxt_unit_remove_process(nxt_unit_impl_t * lib,nxt_unit_process_t * process)5721 nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
5722 {
5723 nxt_queue_t ports;
5724 nxt_unit_port_impl_t *port;
5725
5726 nxt_queue_init(&ports);
5727
5728 nxt_queue_add(&ports, &process->ports);
5729
5730 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5731
5732 nxt_unit_remove_port_unsafe(lib, &port->port.id);
5733
5734 } nxt_queue_loop;
5735
5736 pthread_mutex_unlock(&lib->mutex);
5737
5738 nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5739
5740 nxt_queue_remove(&port->link);
5741
5742 if (lib->callbacks.remove_port != NULL) {
5743 lib->callbacks.remove_port(&lib->unit, NULL, &port->port);
5744 }
5745
5746 nxt_unit_port_release(&port->port);
5747
5748 } nxt_queue_loop;
5749
5750 nxt_unit_process_release(process);
5751 }
5752
5753
5754 static void
nxt_unit_quit(nxt_unit_ctx_t * ctx,uint8_t quit_param)5755 nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param)
5756 {
5757 nxt_bool_t skip_graceful_broadcast, quit;
5758 nxt_unit_impl_t *lib;
5759 nxt_unit_ctx_impl_t *ctx_impl;
5760 nxt_unit_callbacks_t *cb;
5761 nxt_unit_request_info_t *req;
5762 nxt_unit_request_info_impl_t *req_impl;
5763
5764 struct {
5765 nxt_port_msg_t msg;
5766 uint8_t quit_param;
5767 } nxt_packed m;
5768
5769 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5770 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5771
5772 nxt_unit_debug(ctx, "quit: %d/%d/%d", (int) quit_param, ctx_impl->ready,
5773 ctx_impl->online);
5774
5775 if (nxt_slow_path(!ctx_impl->online)) {
5776 return;
5777 }
5778
5779 skip_graceful_broadcast = quit_param == NXT_QUIT_GRACEFUL
5780 && !ctx_impl->ready;
5781
5782 cb = &lib->callbacks;
5783
5784 if (nxt_fast_path(ctx_impl->ready)) {
5785 ctx_impl->ready = 0;
5786
5787 if (cb->remove_port != NULL) {
5788 cb->remove_port(&lib->unit, ctx, lib->shared_port);
5789 }
5790 }
5791
5792 if (quit_param == NXT_QUIT_GRACEFUL) {
5793 pthread_mutex_lock(&ctx_impl->mutex);
5794
5795 quit = nxt_queue_is_empty(&ctx_impl->active_req)
5796 && nxt_queue_is_empty(&ctx_impl->pending_rbuf)
5797 && ctx_impl->wait_items == 0;
5798
5799 pthread_mutex_unlock(&ctx_impl->mutex);
5800
5801 } else {
5802 quit = 1;
5803 ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
5804 }
5805
5806 if (quit) {
5807 ctx_impl->online = 0;
5808
5809 if (cb->quit != NULL) {
5810 cb->quit(ctx);
5811 }
5812
5813 nxt_queue_each(req_impl, &ctx_impl->active_req,
5814 nxt_unit_request_info_impl_t, link)
5815 {
5816 req = &req_impl->req;
5817
5818 nxt_unit_req_warn(req, "active request on ctx quit");
5819
5820 if (cb->close_handler) {
5821 nxt_unit_req_debug(req, "close_handler");
5822
5823 cb->close_handler(req);
5824
5825 } else {
5826 nxt_unit_request_done(req, NXT_UNIT_ERROR);
5827 }
5828
5829 } nxt_queue_loop;
5830
5831 if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5832 nxt_unit_remove_port(lib, ctx, &ctx_impl->read_port->id);
5833 }
5834 }
5835
5836 if (ctx != &lib->main_ctx.ctx || skip_graceful_broadcast) {
5837 return;
5838 }
5839
5840 memset(&m.msg, 0, sizeof(nxt_port_msg_t));
5841
5842 m.msg.pid = lib->pid;
5843 m.msg.type = _NXT_PORT_MSG_QUIT;
5844 m.quit_param = quit_param;
5845
5846 pthread_mutex_lock(&lib->mutex);
5847
5848 nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
5849
5850 if (ctx == &ctx_impl->ctx
5851 || ctx_impl->read_port == NULL
5852 || ctx_impl->read_port->out_fd == -1)
5853 {
5854 continue;
5855 }
5856
5857 (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
5858 &m, sizeof(m), NULL);
5859
5860 } nxt_queue_loop;
5861
5862 pthread_mutex_unlock(&lib->mutex);
5863 }
5864
5865
5866 static int
nxt_unit_get_port(nxt_unit_ctx_t * ctx,nxt_unit_port_id_t * port_id)5867 nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
5868 {
5869 ssize_t res;
5870 nxt_unit_impl_t *lib;
5871 nxt_unit_ctx_impl_t *ctx_impl;
5872
5873 struct {
5874 nxt_port_msg_t msg;
5875 nxt_port_msg_get_port_t get_port;
5876 } m;
5877
5878 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5879 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5880
5881 memset(&m.msg, 0, sizeof(nxt_port_msg_t));
5882
5883 m.msg.pid = lib->pid;
5884 m.msg.reply_port = ctx_impl->read_port->id.id;
5885 m.msg.type = _NXT_PORT_MSG_GET_PORT;
5886
5887 m.get_port.id = port_id->id;
5888 m.get_port.pid = port_id->pid;
5889
5890 nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
5891 (int) port_id->id);
5892
5893 res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL);
5894 if (nxt_slow_path(res != sizeof(m))) {
5895 return NXT_UNIT_ERROR;
5896 }
5897
5898 return NXT_UNIT_OK;
5899 }
5900
5901
5902 static ssize_t
nxt_unit_port_send(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,const void * buf,size_t buf_size,const nxt_send_oob_t * oob)5903 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5904 const void *buf, size_t buf_size, const nxt_send_oob_t *oob)
5905 {
5906 int notify;
5907 ssize_t ret;
5908 nxt_int_t rc;
5909 nxt_port_msg_t msg;
5910 nxt_unit_impl_t *lib;
5911 nxt_unit_port_impl_t *port_impl;
5912
5913 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5914
5915 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5916 if (port_impl->queue != NULL && (oob == NULL || oob->size == 0)
5917 && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
5918 {
5919 rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, ¬ify);
5920 if (nxt_slow_path(rc != NXT_OK)) {
5921 nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5922 (int) port->id.pid, (int) port->id.id);
5923
5924 return -1;
5925 }
5926
5927 nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
5928 (int) port->id.pid, (int) port->id.id,
5929 (int) buf_size, notify);
5930
5931 if (notify) {
5932 memcpy(&msg, buf, sizeof(nxt_port_msg_t));
5933
5934 msg.type = _NXT_PORT_MSG_READ_QUEUE;
5935
5936 if (lib->callbacks.port_send == NULL) {
5937 ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
5938 sizeof(nxt_port_msg_t), NULL);
5939
5940 nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
5941 (int) port->id.pid, (int) port->id.id,
5942 (int) ret);
5943
5944 } else {
5945 ret = lib->callbacks.port_send(ctx, port, &msg,
5946 sizeof(nxt_port_msg_t), NULL, 0);
5947
5948 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
5949 (int) port->id.pid, (int) port->id.id,
5950 (int) ret);
5951 }
5952
5953 }
5954
5955 return buf_size;
5956 }
5957
5958 if (port_impl->queue != NULL) {
5959 msg.type = _NXT_PORT_MSG_READ_SOCKET;
5960
5961 rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, ¬ify);
5962 if (nxt_slow_path(rc != NXT_OK)) {
5963 nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5964 (int) port->id.pid, (int) port->id.id);
5965
5966 return -1;
5967 }
5968
5969 nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
5970 (int) port->id.pid, (int) port->id.id, notify);
5971 }
5972
5973 if (lib->callbacks.port_send != NULL) {
5974 ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
5975 oob != NULL ? oob->buf : NULL,
5976 oob != NULL ? oob->size : 0);
5977
5978 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
5979 (int) port->id.pid, (int) port->id.id,
5980 (int) ret);
5981
5982 } else {
5983 ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, oob);
5984
5985 nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
5986 (int) port->id.pid, (int) port->id.id,
5987 (int) ret);
5988 }
5989
5990 return ret;
5991 }
5992
5993
5994 static ssize_t
nxt_unit_sendmsg(nxt_unit_ctx_t * ctx,int fd,const void * buf,size_t buf_size,const nxt_send_oob_t * oob)5995 nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
5996 const void *buf, size_t buf_size, const nxt_send_oob_t *oob)
5997 {
5998 int err;
5999 ssize_t n;
6000 struct iovec iov[1];
6001
6002 iov[0].iov_base = (void *) buf;
6003 iov[0].iov_len = buf_size;
6004
6005 retry:
6006
6007 n = nxt_sendmsg(fd, iov, 1, oob);
6008
6009 if (nxt_slow_path(n == -1)) {
6010 err = errno;
6011
6012 if (err == EINTR) {
6013 goto retry;
6014 }
6015
6016 /*
6017 * FIXME: This should be "alert" after router graceful shutdown
6018 * implementation.
6019 */
6020 nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
6021 fd, (int) buf_size, strerror(err), err);
6022
6023 } else {
6024 nxt_unit_debug(ctx, "sendmsg(%d, %d, %d): %d", fd, (int) buf_size,
6025 (oob != NULL ? (int) oob->size : 0), (int) n);
6026 }
6027
6028 return n;
6029 }
6030
6031
6032 static int
nxt_unit_ctx_port_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6033 nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6034 nxt_unit_read_buf_t *rbuf)
6035 {
6036 int res, read;
6037 nxt_unit_port_impl_t *port_impl;
6038
6039 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6040
6041 read = 0;
6042
6043 retry:
6044
6045 if (port_impl->from_socket > 0) {
6046 if (port_impl->socket_rbuf != NULL
6047 && port_impl->socket_rbuf->size > 0)
6048 {
6049 port_impl->from_socket--;
6050
6051 nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
6052 port_impl->socket_rbuf->size = 0;
6053
6054 nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
6055 (int) port->id.pid, (int) port->id.id,
6056 (int) rbuf->size);
6057
6058 return NXT_UNIT_OK;
6059 }
6060
6061 } else {
6062 res = nxt_unit_port_queue_recv(port, rbuf);
6063
6064 if (res == NXT_UNIT_OK) {
6065 if (nxt_unit_is_read_socket(rbuf)) {
6066 port_impl->from_socket++;
6067
6068 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
6069 (int) port->id.pid, (int) port->id.id,
6070 port_impl->from_socket);
6071
6072 goto retry;
6073 }
6074
6075 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
6076 (int) port->id.pid, (int) port->id.id,
6077 (int) rbuf->size);
6078
6079 return NXT_UNIT_OK;
6080 }
6081 }
6082
6083 if (read) {
6084 return NXT_UNIT_AGAIN;
6085 }
6086
6087 res = nxt_unit_port_recv(ctx, port, rbuf);
6088 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6089 return NXT_UNIT_ERROR;
6090 }
6091
6092 read = 1;
6093
6094 if (nxt_unit_is_read_queue(rbuf)) {
6095 nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
6096 (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6097
6098 goto retry;
6099 }
6100
6101 nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
6102 (int) port->id.pid, (int) port->id.id,
6103 (int) rbuf->size);
6104
6105 if (res == NXT_UNIT_AGAIN) {
6106 return NXT_UNIT_AGAIN;
6107 }
6108
6109 if (port_impl->from_socket > 0) {
6110 port_impl->from_socket--;
6111
6112 return NXT_UNIT_OK;
6113 }
6114
6115 nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
6116 (int) port->id.pid, (int) port->id.id,
6117 (int) rbuf->size);
6118
6119 if (port_impl->socket_rbuf == NULL) {
6120 port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);
6121
6122 if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
6123 return NXT_UNIT_ERROR;
6124 }
6125
6126 port_impl->socket_rbuf->size = 0;
6127 }
6128
6129 if (port_impl->socket_rbuf->size > 0) {
6130 nxt_unit_alert(ctx, "too many port socket messages");
6131
6132 return NXT_UNIT_ERROR;
6133 }
6134
6135 nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
6136
6137 rbuf->oob.size = 0;
6138
6139 goto retry;
6140 }
6141
6142
6143 nxt_inline void
nxt_unit_rbuf_cpy(nxt_unit_read_buf_t * dst,nxt_unit_read_buf_t * src)6144 nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
6145 {
6146 memcpy(dst->buf, src->buf, src->size);
6147 dst->size = src->size;
6148 dst->oob.size = src->oob.size;
6149 memcpy(dst->oob.buf, src->oob.buf, src->oob.size);
6150 }
6151
6152
6153 static int
nxt_unit_shared_port_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6154 nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6155 nxt_unit_read_buf_t *rbuf)
6156 {
6157 int res;
6158 nxt_unit_port_impl_t *port_impl;
6159
6160 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6161
6162 retry:
6163
6164 res = nxt_unit_app_queue_recv(ctx, port, rbuf);
6165
6166 if (res == NXT_UNIT_OK) {
6167 return NXT_UNIT_OK;
6168 }
6169
6170 if (res == NXT_UNIT_AGAIN) {
6171 res = nxt_unit_port_recv(ctx, port, rbuf);
6172 if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6173 return NXT_UNIT_ERROR;
6174 }
6175
6176 if (nxt_unit_is_read_queue(rbuf)) {
6177 nxt_app_queue_notification_received(port_impl->queue);
6178
6179 nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
6180 (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6181
6182 goto retry;
6183 }
6184 }
6185
6186 return res;
6187 }
6188
6189
6190 static int
nxt_unit_port_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6191 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6192 nxt_unit_read_buf_t *rbuf)
6193 {
6194 int fd, err;
6195 size_t oob_size;
6196 struct iovec iov[1];
6197 nxt_unit_impl_t *lib;
6198
6199 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6200
6201 if (lib->callbacks.port_recv != NULL) {
6202 oob_size = sizeof(rbuf->oob.buf);
6203
6204 rbuf->size = lib->callbacks.port_recv(ctx, port,
6205 rbuf->buf, sizeof(rbuf->buf),
6206 rbuf->oob.buf, &oob_size);
6207
6208 nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
6209 (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6210
6211 if (nxt_slow_path(rbuf->size < 0)) {
6212 return NXT_UNIT_ERROR;
6213 }
6214
6215 rbuf->oob.size = oob_size;
6216 return NXT_UNIT_OK;
6217 }
6218
6219 iov[0].iov_base = rbuf->buf;
6220 iov[0].iov_len = sizeof(rbuf->buf);
6221
6222 fd = port->in_fd;
6223
6224 retry:
6225
6226 rbuf->size = nxt_recvmsg(fd, iov, 1, &rbuf->oob);
6227
6228 if (nxt_slow_path(rbuf->size == -1)) {
6229 err = errno;
6230
6231 if (err == EINTR) {
6232 goto retry;
6233 }
6234
6235 if (err == EAGAIN) {
6236 nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
6237 fd, strerror(err), err);
6238
6239 return NXT_UNIT_AGAIN;
6240 }
6241
6242 nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
6243 fd, strerror(err), err);
6244
6245 return NXT_UNIT_ERROR;
6246 }
6247
6248 nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
6249
6250 return NXT_UNIT_OK;
6251 }
6252
6253
6254 static int
nxt_unit_port_queue_recv(nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6255 nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
6256 {
6257 nxt_unit_port_impl_t *port_impl;
6258
6259 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6260
6261 rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
6262
6263 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6264 }
6265
6266
6267 static int
nxt_unit_app_queue_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6268 nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6269 nxt_unit_read_buf_t *rbuf)
6270 {
6271 uint32_t cookie;
6272 nxt_port_msg_t *port_msg;
6273 nxt_app_queue_t *queue;
6274 nxt_unit_impl_t *lib;
6275 nxt_unit_port_impl_t *port_impl;
6276
6277 struct {
6278 nxt_port_msg_t msg;
6279 uint8_t quit_param;
6280 } nxt_packed m;
6281
6282 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6283 queue = port_impl->queue;
6284
6285 retry:
6286
6287 rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
6288
6289 nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
6290
6291 if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
6292 port_msg = (nxt_port_msg_t *) rbuf->buf;
6293
6294 if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
6295 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6296
6297 if (lib->request_limit != 0) {
6298 nxt_atomic_fetch_add(&lib->request_count, 1);
6299
6300 if (nxt_slow_path(lib->request_count >= lib->request_limit)) {
6301 nxt_unit_debug(ctx, "request limit reached");
6302
6303 memset(&m.msg, 0, sizeof(nxt_port_msg_t));
6304
6305 m.msg.pid = lib->pid;
6306 m.msg.type = _NXT_PORT_MSG_QUIT;
6307 m.quit_param = NXT_QUIT_GRACEFUL;
6308
6309 (void) nxt_unit_port_send(ctx, lib->main_ctx.read_port,
6310 &m, sizeof(m), NULL);
6311 }
6312 }
6313
6314 return NXT_UNIT_OK;
6315 }
6316
6317 nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
6318
6319 goto retry;
6320 }
6321
6322 return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6323 }
6324
6325
6326 nxt_inline int
nxt_unit_close(int fd)6327 nxt_unit_close(int fd)
6328 {
6329 int res;
6330
6331 res = close(fd);
6332
6333 if (nxt_slow_path(res == -1)) {
6334 nxt_unit_alert(NULL, "close(%d) failed: %s (%d)",
6335 fd, strerror(errno), errno);
6336
6337 } else {
6338 nxt_unit_debug(NULL, "close(%d): %d", fd, res);
6339 }
6340
6341 return res;
6342 }
6343
6344
6345 static int
nxt_unit_fd_blocking(int fd)6346 nxt_unit_fd_blocking(int fd)
6347 {
6348 int nb;
6349
6350 nb = 0;
6351
6352 if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
6353 nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
6354 fd, strerror(errno), errno);
6355
6356 return NXT_UNIT_ERROR;
6357 }
6358
6359 return NXT_UNIT_OK;
6360 }
6361
6362
6363 static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t * lhq,void * data)6364 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6365 {
6366 nxt_unit_port_t *port;
6367 nxt_unit_port_hash_id_t *port_id;
6368
6369 port = data;
6370 port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
6371
6372 if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
6373 && port_id->pid == port->id.pid
6374 && port_id->id == port->id.id)
6375 {
6376 return NXT_OK;
6377 }
6378
6379 return NXT_DECLINED;
6380 }
6381
6382
6383 static const nxt_lvlhsh_proto_t lvlhsh_ports_proto nxt_aligned(64) = {
6384 NXT_LVLHSH_DEFAULT,
6385 nxt_unit_port_hash_test,
6386 nxt_unit_lvlhsh_alloc,
6387 nxt_unit_lvlhsh_free,
6388 };
6389
6390
6391 static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t * lhq,nxt_unit_port_hash_id_t * port_hash_id,nxt_unit_port_id_t * port_id)6392 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
6393 nxt_unit_port_hash_id_t *port_hash_id,
6394 nxt_unit_port_id_t *port_id)
6395 {
6396 port_hash_id->pid = port_id->pid;
6397 port_hash_id->id = port_id->id;
6398
6399 if (nxt_fast_path(port_id->hash != 0)) {
6400 lhq->key_hash = port_id->hash;
6401
6402 } else {
6403 lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));
6404
6405 port_id->hash = lhq->key_hash;
6406
6407 nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
6408 (int) port_id->pid, (int) port_id->id,
6409 (int) port_id->hash);
6410 }
6411
6412 lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
6413 lhq->key.start = (u_char *) port_hash_id;
6414 lhq->proto = &lvlhsh_ports_proto;
6415 lhq->pool = NULL;
6416 }
6417
6418
6419 static int
nxt_unit_port_hash_add(nxt_lvlhsh_t * port_hash,nxt_unit_port_t * port)6420 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
6421 {
6422 nxt_int_t res;
6423 nxt_lvlhsh_query_t lhq;
6424 nxt_unit_port_hash_id_t port_hash_id;
6425
6426 nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
6427 lhq.replace = 0;
6428 lhq.value = port;
6429
6430 res = nxt_lvlhsh_insert(port_hash, &lhq);
6431
6432 switch (res) {
6433
6434 case NXT_OK:
6435 return NXT_UNIT_OK;
6436
6437 default:
6438 return NXT_UNIT_ERROR;
6439 }
6440 }
6441
6442
6443 static nxt_unit_port_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t * port_hash,nxt_unit_port_id_t * port_id,int remove)6444 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
6445 int remove)
6446 {
6447 nxt_int_t res;
6448 nxt_lvlhsh_query_t lhq;
6449 nxt_unit_port_hash_id_t port_hash_id;
6450
6451 nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
6452
6453 if (remove) {
6454 res = nxt_lvlhsh_delete(port_hash, &lhq);
6455
6456 } else {
6457 res = nxt_lvlhsh_find(port_hash, &lhq);
6458 }
6459
6460 switch (res) {
6461
6462 case NXT_OK:
6463 if (!remove) {
6464 nxt_unit_port_use(lhq.value);
6465 }
6466
6467 return lhq.value;
6468
6469 default:
6470 return NULL;
6471 }
6472 }
6473
6474
6475 static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t * lhq,void * data)6476 nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6477 {
6478 return NXT_OK;
6479 }
6480
6481
6482 static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = {
6483 NXT_LVLHSH_DEFAULT,
6484 nxt_unit_request_hash_test,
6485 nxt_unit_lvlhsh_alloc,
6486 nxt_unit_lvlhsh_free,
6487 };
6488
6489
6490 static int
nxt_unit_request_hash_add(nxt_unit_ctx_t * ctx,nxt_unit_request_info_t * req)6491 nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
6492 nxt_unit_request_info_t *req)
6493 {
6494 uint32_t *stream;
6495 nxt_int_t res;
6496 nxt_lvlhsh_query_t lhq;
6497 nxt_unit_ctx_impl_t *ctx_impl;
6498 nxt_unit_request_info_impl_t *req_impl;
6499
6500 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6501 if (req_impl->in_hash) {
6502 return NXT_UNIT_OK;
6503 }
6504
6505 stream = &req_impl->stream;
6506
6507 lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
6508 lhq.key.length = sizeof(*stream);
6509 lhq.key.start = (u_char *) stream;
6510 lhq.proto = &lvlhsh_requests_proto;
6511 lhq.pool = NULL;
6512 lhq.replace = 0;
6513 lhq.value = req_impl;
6514
6515 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6516
6517 pthread_mutex_lock(&ctx_impl->mutex);
6518
6519 res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);
6520
6521 pthread_mutex_unlock(&ctx_impl->mutex);
6522
6523 switch (res) {
6524
6525 case NXT_OK:
6526 req_impl->in_hash = 1;
6527 return NXT_UNIT_OK;
6528
6529 default:
6530 return NXT_UNIT_ERROR;
6531 }
6532 }
6533
6534
6535 static nxt_unit_request_info_t *
nxt_unit_request_hash_find(nxt_unit_ctx_t * ctx,uint32_t stream,int remove)6536 nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
6537 {
6538 nxt_int_t res;
6539 nxt_lvlhsh_query_t lhq;
6540 nxt_unit_ctx_impl_t *ctx_impl;
6541 nxt_unit_request_info_impl_t *req_impl;
6542
6543 lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
6544 lhq.key.length = sizeof(stream);
6545 lhq.key.start = (u_char *) &stream;
6546 lhq.proto = &lvlhsh_requests_proto;
6547 lhq.pool = NULL;
6548
6549 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6550
6551 pthread_mutex_lock(&ctx_impl->mutex);
6552
6553 if (remove) {
6554 res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
6555
6556 } else {
6557 res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
6558 }
6559
6560 pthread_mutex_unlock(&ctx_impl->mutex);
6561
6562 switch (res) {
6563
6564 case NXT_OK:
6565 req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
6566 req);
6567 if (remove) {
6568 req_impl->in_hash = 0;
6569 }
6570
6571 return lhq.value;
6572
6573 default:
6574 return NULL;
6575 }
6576 }
6577
6578
6579 void
nxt_unit_log(nxt_unit_ctx_t * ctx,int level,const char * fmt,...)6580 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
6581 {
6582 int log_fd, n;
6583 char msg[NXT_MAX_ERROR_STR], *p, *end;
6584 pid_t pid;
6585 va_list ap;
6586 nxt_unit_impl_t *lib;
6587
6588 if (nxt_fast_path(ctx != NULL)) {
6589 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6590
6591 pid = lib->pid;
6592 log_fd = lib->log_fd;
6593
6594 } else {
6595 pid = nxt_unit_pid;
6596 log_fd = STDERR_FILENO;
6597 }
6598
6599 p = msg;
6600 end = p + sizeof(msg) - 1;
6601
6602 p = nxt_unit_snprint_prefix(p, end, pid, level);
6603
6604 va_start(ap, fmt);
6605 p += vsnprintf(p, end - p, fmt, ap);
6606 va_end(ap);
6607
6608 if (nxt_slow_path(p > end)) {
6609 memcpy(end - 5, "[...]", 5);
6610 p = end;
6611 }
6612
6613 *p++ = '\n';
6614
6615 n = write(log_fd, msg, p - msg);
6616 if (nxt_slow_path(n < 0)) {
6617 fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6618 }
6619 }
6620
6621
6622 void
nxt_unit_req_log(nxt_unit_request_info_t * req,int level,const char * fmt,...)6623 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
6624 {
6625 int log_fd, n;
6626 char msg[NXT_MAX_ERROR_STR], *p, *end;
6627 pid_t pid;
6628 va_list ap;
6629 nxt_unit_impl_t *lib;
6630 nxt_unit_request_info_impl_t *req_impl;
6631
6632 if (nxt_fast_path(req != NULL)) {
6633 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
6634
6635 pid = lib->pid;
6636 log_fd = lib->log_fd;
6637
6638 } else {
6639 pid = nxt_unit_pid;
6640 log_fd = STDERR_FILENO;
6641 }
6642
6643 p = msg;
6644 end = p + sizeof(msg) - 1;
6645
6646 p = nxt_unit_snprint_prefix(p, end, pid, level);
6647
6648 if (nxt_fast_path(req != NULL)) {
6649 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6650
6651 p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
6652 }
6653
6654 va_start(ap, fmt);
6655 p += vsnprintf(p, end - p, fmt, ap);
6656 va_end(ap);
6657
6658 if (nxt_slow_path(p > end)) {
6659 memcpy(end - 5, "[...]", 5);
6660 p = end;
6661 }
6662
6663 *p++ = '\n';
6664
6665 n = write(log_fd, msg, p - msg);
6666 if (nxt_slow_path(n < 0)) {
6667 fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6668 }
6669 }
6670
6671
6672 static const char * nxt_unit_log_levels[] = {
6673 "alert",
6674 "error",
6675 "warn",
6676 "notice",
6677 "info",
6678 "debug",
6679 };
6680
6681
6682 static char *
nxt_unit_snprint_prefix(char * p,char * end,pid_t pid,int level)6683 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
6684 {
6685 struct tm tm;
6686 struct timespec ts;
6687
6688 (void) clock_gettime(CLOCK_REALTIME, &ts);
6689
6690 #if (NXT_HAVE_LOCALTIME_R)
6691 (void) localtime_r(&ts.tv_sec, &tm);
6692 #else
6693 tm = *localtime(&ts.tv_sec);
6694 #endif
6695
6696 #if (NXT_DEBUG)
6697 p += snprintf(p, end - p,
6698 "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
6699 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6700 tm.tm_hour, tm.tm_min, tm.tm_sec,
6701 (int) ts.tv_nsec / 1000000);
6702 #else
6703 p += snprintf(p, end - p,
6704 "%4d/%02d/%02d %02d:%02d:%02d ",
6705 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6706 tm.tm_hour, tm.tm_min, tm.tm_sec);
6707 #endif
6708
6709 p += snprintf(p, end - p,
6710 "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
6711 (int) pid,
6712 (uint64_t) (uintptr_t) nxt_thread_get_tid());
6713
6714 return p;
6715 }
6716
6717
6718 static void *
nxt_unit_lvlhsh_alloc(void * data,size_t size)6719 nxt_unit_lvlhsh_alloc(void *data, size_t size)
6720 {
6721 int err;
6722 void *p;
6723
6724 err = posix_memalign(&p, size, size);
6725
6726 if (nxt_fast_path(err == 0)) {
6727 nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p",
6728 (int) size, (int) size, p);
6729 return p;
6730 }
6731
6732 nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)",
6733 (int) size, (int) size, strerror(err), err);
6734 return NULL;
6735 }
6736
6737
6738 static void
nxt_unit_lvlhsh_free(void * data,void * p)6739 nxt_unit_lvlhsh_free(void *data, void *p)
6740 {
6741 nxt_unit_free(NULL, p);
6742 }
6743
6744
6745 void *
nxt_unit_malloc(nxt_unit_ctx_t * ctx,size_t size)6746 nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size)
6747 {
6748 void *p;
6749
6750 p = malloc(size);
6751
6752 if (nxt_fast_path(p != NULL)) {
6753 #if (NXT_DEBUG_ALLOC)
6754 nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p);
6755 #endif
6756
6757 } else {
6758 nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)",
6759 (int) size, strerror(errno), errno);
6760 }
6761
6762 return p;
6763 }
6764
6765
6766 void
nxt_unit_free(nxt_unit_ctx_t * ctx,void * p)6767 nxt_unit_free(nxt_unit_ctx_t *ctx, void *p)
6768 {
6769 #if (NXT_DEBUG_ALLOC)
6770 nxt_unit_debug(ctx, "free(%p)", p);
6771 #endif
6772
6773 free(p);
6774 }
6775
6776
6777 static int
nxt_unit_memcasecmp(const void * p1,const void * p2,size_t length)6778 nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length)
6779 {
6780 u_char c1, c2;
6781 nxt_int_t n;
6782 const u_char *s1, *s2;
6783
6784 s1 = p1;
6785 s2 = p2;
6786
6787 while (length-- != 0) {
6788 c1 = *s1++;
6789 c2 = *s2++;
6790
6791 c1 = nxt_lowcase(c1);
6792 c2 = nxt_lowcase(c2);
6793
6794 n = c1 - c2;
6795
6796 if (n != 0) {
6797 return n;
6798 }
6799 }
6800
6801 return 0;
6802 }
6803