xref: /unit/src/nxt_unit.c (revision 2561:0e6d01d0c23b)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "nxt_main.h"
7 #include "nxt_port_memory_int.h"
8 #include "nxt_socket_msg.h"
9 #include "nxt_port_queue.h"
10 #include "nxt_app_queue.h"
11 
12 #include "nxt_unit.h"
13 #include "nxt_unit_request.h"
14 #include "nxt_unit_response.h"
15 #include "nxt_unit_websocket.h"
16 
17 #include "nxt_websocket.h"
18 
19 #if (NXT_HAVE_MEMFD_CREATE)
20 #include <linux/memfd.h>
21 #endif
22 
23 #define NXT_UNIT_MAX_PLAIN_SIZE  1024
24 #define NXT_UNIT_LOCAL_BUF_SIZE  \
25     (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
26 
27 enum {
28     NXT_QUIT_NORMAL   = 0,
29     NXT_QUIT_GRACEFUL = 1,
30 };
31 
32 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
33 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
34 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
35 typedef struct nxt_unit_process_s               nxt_unit_process_t;
36 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
37 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
38 typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
39 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
40 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
41 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
42 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
43 
44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
46     nxt_unit_ctx_impl_t *ctx_impl, void *data);
47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
52     nxt_unit_mmap_buf_t *mmap_buf);
53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
54     nxt_unit_mmap_buf_t *mmap_buf);
55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
57     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
58     int *shared_port_fd, int *shared_queue_fd,
59     int *log_fd, uint32_t *stream, uint32_t *shm_limit,
60     uint32_t *request_limit);
61 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
62     int queue_fd);
63 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
64     nxt_unit_request_info_t **preq);
65 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
66     nxt_unit_recv_msg_t *recv_msg);
67 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
68 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
69     nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
70 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
71     nxt_unit_recv_msg_t *recv_msg);
72 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
73     nxt_unit_port_id_t *port_id);
74 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
75 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
76     nxt_unit_recv_msg_t *recv_msg);
77 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
78 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
79     nxt_unit_ctx_t *ctx);
80 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
81 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
82 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
83     nxt_unit_ctx_t *ctx);
84 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
85 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
86     nxt_unit_websocket_frame_impl_t *ws);
87 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
88 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
89 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
90     nxt_unit_mmap_buf_t *mmap_buf, int last);
91 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
92 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
94 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
95     nxt_unit_ctx_impl_t *ctx_impl);
96 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
97     nxt_unit_read_buf_t *rbuf);
98 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
99     nxt_unit_request_info_t *req, size_t size);
100 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
101     size_t size);
102 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
103     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
104 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
105 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
106 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
107 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
108     nxt_unit_port_t *port, int n);
109 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
110 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
111     int fd);
112 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
113     nxt_unit_port_t *port, uint32_t size,
114     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
115 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
116 
117 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
118     nxt_unit_ctx_impl_t *ctx_impl);
119 static int nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
120 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
121 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
122 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
123 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
124     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
125     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
126 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
127     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
128 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
129 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
130     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
131 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
132 
133 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
135     pid_t pid, int remove);
136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
137 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
138 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
139 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
140 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
141 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
142 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
145 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
146 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
147     nxt_unit_port_t *port);
148 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
149 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
150 
151 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
152     nxt_unit_port_t *port, int queue_fd);
153 
154 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
155 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
156 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
157     nxt_unit_port_t *port, void *queue);
158 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
159     nxt_queue_t *awaiting_req);
160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
161     nxt_unit_port_id_t *port_id);
162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
163     nxt_unit_port_id_t *port_id);
164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
166     nxt_unit_process_t *process);
167 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
168 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
169 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
170     nxt_unit_port_t *port, const void *buf, size_t buf_size,
171     const nxt_send_oob_t *oob);
172 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
173     const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
174 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
175     nxt_unit_read_buf_t *rbuf);
176 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
177     nxt_unit_read_buf_t *src);
178 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
179     nxt_unit_read_buf_t *rbuf);
180 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
181     nxt_unit_read_buf_t *rbuf);
182 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
183     nxt_unit_read_buf_t *rbuf);
184 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
185     nxt_unit_read_buf_t *rbuf);
186 nxt_inline int nxt_unit_close(int fd);
187 static int nxt_unit_fd_blocking(int fd);
188 
189 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
190     nxt_unit_port_t *port);
191 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
192     nxt_unit_port_id_t *port_id, int remove);
193 
194 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
195     nxt_unit_request_info_t *req);
196 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
197     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
198 
199 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid,
200     int level);
201 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
202 static void nxt_unit_lvlhsh_free(void *data, void *p);
203 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
204 
205 
206 struct nxt_unit_mmap_buf_s {
207     nxt_unit_buf_t           buf;
208 
209     nxt_unit_mmap_buf_t      *next;
210     nxt_unit_mmap_buf_t      **prev;
211 
212     nxt_port_mmap_header_t   *hdr;
213     nxt_unit_request_info_t  *req;
214     nxt_unit_ctx_impl_t      *ctx_impl;
215     char                     *free_ptr;
216     char                     *plain_ptr;
217 };
218 
219 
220 struct nxt_unit_recv_msg_s {
221     uint32_t                 stream;
222     nxt_pid_t                pid;
223     nxt_port_id_t            reply_port;
224 
225     uint8_t                  last;      /* 1 bit */
226     uint8_t                  mmap;      /* 1 bit */
227 
228     void                     *start;
229     uint32_t                 size;
230 
231     int                      fd[2];
232 
233     nxt_unit_mmap_buf_t      *incoming_buf;
234 };
235 
236 
237 typedef enum {
238     NXT_UNIT_RS_START           = 0,
239     NXT_UNIT_RS_RESPONSE_INIT,
240     NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
241     NXT_UNIT_RS_RESPONSE_SENT,
242     NXT_UNIT_RS_RELEASED,
243 } nxt_unit_req_state_t;
244 
245 
246 struct nxt_unit_request_info_impl_s {
247     nxt_unit_request_info_t  req;
248 
249     uint32_t                 stream;
250 
251     nxt_unit_mmap_buf_t      *outgoing_buf;
252     nxt_unit_mmap_buf_t      *incoming_buf;
253 
254     nxt_unit_req_state_t     state;
255     uint8_t                  websocket;
256     uint8_t                  in_hash;
257 
258     /*  for nxt_unit_ctx_impl_t.free_req or active_req */
259     nxt_queue_link_t         link;
260     /*  for nxt_unit_port_impl_t.awaiting_req */
261     nxt_queue_link_t         port_wait_link;
262 
263     char                     extra_data[];
264 };
265 
266 
267 struct nxt_unit_websocket_frame_impl_s {
268     nxt_unit_websocket_frame_t  ws;
269 
270     nxt_unit_mmap_buf_t         *buf;
271 
272     nxt_queue_link_t            link;
273 
274     nxt_unit_ctx_impl_t         *ctx_impl;
275 };
276 
277 
278 struct nxt_unit_read_buf_s {
279     nxt_queue_link_t              link;
280     nxt_unit_ctx_impl_t           *ctx_impl;
281     ssize_t                       size;
282     nxt_recv_oob_t                oob;
283     char                          buf[16384];
284 };
285 
286 
287 struct nxt_unit_ctx_impl_s {
288     nxt_unit_ctx_t                ctx;
289 
290     nxt_atomic_t                  use_count;
291     nxt_atomic_t                  wait_items;
292 
293     pthread_mutex_t               mutex;
294 
295     nxt_unit_port_t               *read_port;
296 
297     nxt_queue_link_t              link;
298 
299     nxt_unit_mmap_buf_t           *free_buf;
300 
301     /*  of nxt_unit_request_info_impl_t */
302     nxt_queue_t                   free_req;
303 
304     /*  of nxt_unit_websocket_frame_impl_t */
305     nxt_queue_t                   free_ws;
306 
307     /*  of nxt_unit_request_info_impl_t */
308     nxt_queue_t                   active_req;
309 
310     /*  of nxt_unit_request_info_impl_t */
311     nxt_lvlhsh_t                  requests;
312 
313     /*  of nxt_unit_request_info_impl_t */
314     nxt_queue_t                   ready_req;
315 
316     /*  of nxt_unit_read_buf_t */
317     nxt_queue_t                   pending_rbuf;
318 
319     /*  of nxt_unit_read_buf_t */
320     nxt_queue_t                   free_rbuf;
321 
322     uint8_t                       online;       /* 1 bit */
323     uint8_t                       ready;        /* 1 bit */
324     uint8_t                       quit_param;
325 
326     nxt_unit_mmap_buf_t           ctx_buf[2];
327     nxt_unit_read_buf_t           ctx_read_buf;
328 
329     nxt_unit_request_info_impl_t  req;
330 };
331 
332 
333 struct nxt_unit_mmap_s {
334     nxt_port_mmap_header_t   *hdr;
335     pthread_t                src_thread;
336 
337     /*  of nxt_unit_read_buf_t */
338     nxt_queue_t              awaiting_rbuf;
339 };
340 
341 
342 struct nxt_unit_mmaps_s {
343     pthread_mutex_t          mutex;
344     uint32_t                 size;
345     uint32_t                 cap;
346     nxt_atomic_t             allocated_chunks;
347     nxt_unit_mmap_t          *elts;
348 };
349 
350 
351 struct nxt_unit_impl_s {
352     nxt_unit_t               unit;
353     nxt_unit_callbacks_t     callbacks;
354 
355     nxt_atomic_t             use_count;
356     nxt_atomic_t             request_count;
357 
358     uint32_t                 request_data_size;
359     uint32_t                 shm_mmap_limit;
360     uint32_t                 request_limit;
361 
362     pthread_mutex_t          mutex;
363 
364     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
365     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
366 
367     nxt_unit_port_t          *router_port;
368     nxt_unit_port_t          *shared_port;
369 
370     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
371 
372     nxt_unit_mmaps_t         incoming;
373     nxt_unit_mmaps_t         outgoing;
374 
375     pid_t                    pid;
376     int                      log_fd;
377 
378     nxt_unit_ctx_impl_t      main_ctx;
379 };
380 
381 
382 struct nxt_unit_port_impl_s {
383     nxt_unit_port_t          port;
384 
385     nxt_atomic_t             use_count;
386 
387     /*  for nxt_unit_process_t.ports */
388     nxt_queue_link_t         link;
389     nxt_unit_process_t       *process;
390 
391     /*  of nxt_unit_request_info_impl_t */
392     nxt_queue_t              awaiting_req;
393 
394     int                      ready;
395 
396     void                     *queue;
397 
398     int                      from_socket;
399     nxt_unit_read_buf_t      *socket_rbuf;
400 };
401 
402 
403 struct nxt_unit_process_s {
404     pid_t                    pid;
405 
406     nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */
407 
408     nxt_unit_impl_t          *lib;
409 
410     nxt_atomic_t             use_count;
411 
412     uint32_t                 next_port_id;
413 };
414 
415 
416 /* Explicitly using 32 bit types to avoid possible alignment. */
417 typedef struct {
418     int32_t   pid;
419     uint32_t  id;
420 } nxt_unit_port_hash_id_t;
421 
422 
423 static pid_t  nxt_unit_pid;
424 
425 
426 nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t * init)427 nxt_unit_init(nxt_unit_init_t *init)
428 {
429     int              rc, queue_fd, shared_queue_fd;
430     void             *mem;
431     uint32_t         ready_stream, shm_limit, request_limit;
432     nxt_unit_ctx_t   *ctx;
433     nxt_unit_impl_t  *lib;
434     nxt_unit_port_t  ready_port, router_port, read_port, shared_port;
435 
436     nxt_unit_pid = getpid();
437 
438     lib = nxt_unit_create(init);
439     if (nxt_slow_path(lib == NULL)) {
440         return NULL;
441     }
442 
443     queue_fd = -1;
444     mem = MAP_FAILED;
445     shared_port.out_fd = -1;
446     shared_port.data = NULL;
447 
448     if (init->ready_port.id.pid != 0
449         && init->ready_stream != 0
450         && init->read_port.id.pid != 0)
451     {
452         ready_port = init->ready_port;
453         ready_stream = init->ready_stream;
454         router_port = init->router_port;
455         read_port = init->read_port;
456         lib->log_fd = init->log_fd;
457 
458         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
459                               ready_port.id.id);
460         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
461                               router_port.id.id);
462         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
463                               read_port.id.id);
464 
465         shared_port.in_fd = init->shared_port_fd;
466         shared_queue_fd = init->shared_queue_fd;
467 
468     } else {
469         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
470                                &shared_port.in_fd, &shared_queue_fd,
471                                &lib->log_fd, &ready_stream, &shm_limit,
472                                &request_limit);
473         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
474             goto fail;
475         }
476 
477         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
478                                 / PORT_MMAP_DATA_SIZE;
479         lib->request_limit = request_limit;
480     }
481 
482     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
483         lib->shm_mmap_limit = 1;
484     }
485 
486     lib->pid = read_port.id.pid;
487     nxt_unit_pid = lib->pid;
488 
489     ctx = &lib->main_ctx.ctx;
490 
491     rc = nxt_unit_fd_blocking(router_port.out_fd);
492     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
493         goto fail;
494     }
495 
496     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
497     if (nxt_slow_path(lib->router_port == NULL)) {
498         nxt_unit_alert(NULL, "failed to add router_port");
499 
500         goto fail;
501     }
502 
503     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
504     if (nxt_slow_path(queue_fd == -1)) {
505         goto fail;
506     }
507 
508     mem = mmap(NULL, sizeof(nxt_port_queue_t),
509                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
510     if (nxt_slow_path(mem == MAP_FAILED)) {
511         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
512                        strerror(errno), errno);
513 
514         goto fail;
515     }
516 
517     nxt_port_queue_init(mem);
518 
519     rc = nxt_unit_fd_blocking(read_port.in_fd);
520     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
521         goto fail;
522     }
523 
524     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
525     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
526         nxt_unit_alert(NULL, "failed to add read_port");
527 
528         goto fail;
529     }
530 
531     rc = nxt_unit_fd_blocking(ready_port.out_fd);
532     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
533         goto fail;
534     }
535 
536     nxt_unit_port_id_init(&shared_port.id, read_port.id.pid,
537                           NXT_UNIT_SHARED_PORT_ID);
538 
539     mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
540                MAP_SHARED, shared_queue_fd, 0);
541     if (nxt_slow_path(mem == MAP_FAILED)) {
542         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd,
543                        strerror(errno), errno);
544 
545         goto fail;
546     }
547 
548     nxt_unit_close(shared_queue_fd);
549 
550     lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem);
551     if (nxt_slow_path(lib->shared_port == NULL)) {
552         nxt_unit_alert(NULL, "failed to add shared_port");
553 
554         goto fail;
555     }
556 
557     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
558     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
559         nxt_unit_alert(NULL, "failed to send READY message");
560 
561         goto fail;
562     }
563 
564     nxt_unit_close(ready_port.out_fd);
565     nxt_unit_close(queue_fd);
566 
567     return ctx;
568 
569 fail:
570 
571     if (mem != MAP_FAILED) {
572         munmap(mem, sizeof(nxt_port_queue_t));
573     }
574 
575     if (queue_fd != -1) {
576         nxt_unit_close(queue_fd);
577     }
578 
579     nxt_unit_ctx_release(&lib->main_ctx.ctx);
580 
581     return NULL;
582 }
583 
584 
585 static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t * init)586 nxt_unit_create(nxt_unit_init_t *init)
587 {
588     int              rc;
589     nxt_unit_impl_t  *lib;
590 
591     if (nxt_slow_path(init->callbacks.request_handler == NULL)) {
592         nxt_unit_alert(NULL, "request_handler is NULL");
593 
594         return NULL;
595     }
596 
597     lib = nxt_unit_malloc(NULL,
598                           sizeof(nxt_unit_impl_t) + init->request_data_size);
599     if (nxt_slow_path(lib == NULL)) {
600         nxt_unit_alert(NULL, "failed to allocate unit struct");
601 
602         return NULL;
603     }
604 
605     rc = pthread_mutex_init(&lib->mutex, NULL);
606     if (nxt_slow_path(rc != 0)) {
607         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
608 
609         goto out_unit_free;
610     }
611 
612     lib->unit.data = init->data;
613     lib->callbacks = init->callbacks;
614 
615     lib->request_data_size = init->request_data_size;
616     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
617                             / PORT_MMAP_DATA_SIZE;
618     lib->request_limit = init->request_limit;
619 
620     lib->processes.slot = NULL;
621     lib->ports.slot = NULL;
622 
623     lib->log_fd = STDERR_FILENO;
624 
625     nxt_queue_init(&lib->contexts);
626 
627     lib->use_count = 0;
628     lib->request_count = 0;
629     lib->router_port = NULL;
630     lib->shared_port = NULL;
631 
632     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
633     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
634         goto out_mutex_destroy;
635     }
636 
637     rc = nxt_unit_mmaps_init(&lib->incoming);
638     if (nxt_slow_path(rc != 0)) {
639         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
640 
641         goto out_ctx_free;
642     }
643 
644     rc = nxt_unit_mmaps_init(&lib->outgoing);
645     if (nxt_slow_path(rc != 0)) {
646         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
647 
648         goto out_mmaps_destroy;
649     }
650 
651     return lib;
652 
653 out_mmaps_destroy:
654     nxt_unit_mmaps_destroy(&lib->incoming);
655 
656 out_ctx_free:
657     nxt_unit_ctx_free(&lib->main_ctx);
658 
659 out_mutex_destroy:
660     pthread_mutex_destroy(&lib->mutex);
661 
662 out_unit_free:
663     nxt_unit_free(NULL, lib);
664 
665     return NULL;
666 }
667 
668 
669 static int
nxt_unit_ctx_init(nxt_unit_impl_t * lib,nxt_unit_ctx_impl_t * ctx_impl,void * data)670 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
671     void *data)
672 {
673     int  rc;
674 
675     ctx_impl->ctx.data = data;
676     ctx_impl->ctx.unit = &lib->unit;
677 
678     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
679     if (nxt_slow_path(rc != 0)) {
680         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
681 
682         return NXT_UNIT_ERROR;
683     }
684 
685     nxt_unit_lib_use(lib);
686 
687     pthread_mutex_lock(&lib->mutex);
688 
689     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
690 
691     pthread_mutex_unlock(&lib->mutex);
692 
693     ctx_impl->use_count = 1;
694     ctx_impl->wait_items = 0;
695     ctx_impl->online = 1;
696     ctx_impl->ready = 0;
697     ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
698 
699     nxt_queue_init(&ctx_impl->free_req);
700     nxt_queue_init(&ctx_impl->free_ws);
701     nxt_queue_init(&ctx_impl->active_req);
702     nxt_queue_init(&ctx_impl->ready_req);
703     nxt_queue_init(&ctx_impl->pending_rbuf);
704     nxt_queue_init(&ctx_impl->free_rbuf);
705 
706     ctx_impl->free_buf = NULL;
707     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
708     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
709 
710     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
711     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
712 
713     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
714 
715     ctx_impl->req.req.ctx = &ctx_impl->ctx;
716     ctx_impl->req.req.unit = &lib->unit;
717 
718     ctx_impl->read_port = NULL;
719     ctx_impl->requests.slot = 0;
720 
721     return NXT_UNIT_OK;
722 }
723 
724 
725 nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_t * ctx)726 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
727 {
728     nxt_unit_ctx_impl_t  *ctx_impl;
729 
730     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
731 
732     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
733 }
734 
735 
736 nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_t * ctx)737 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
738 {
739     long                 c;
740     nxt_unit_ctx_impl_t  *ctx_impl;
741 
742     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
743 
744     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
745 
746     if (c == 1) {
747         nxt_unit_ctx_free(ctx_impl);
748     }
749 }
750 
751 
752 nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t * lib)753 nxt_unit_lib_use(nxt_unit_impl_t *lib)
754 {
755     nxt_atomic_fetch_add(&lib->use_count, 1);
756 }
757 
758 
759 nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t * lib)760 nxt_unit_lib_release(nxt_unit_impl_t *lib)
761 {
762     long                c;
763     nxt_unit_process_t  *process;
764 
765     c = nxt_atomic_fetch_add(&lib->use_count, -1);
766 
767     if (c == 1) {
768         for ( ;; ) {
769             pthread_mutex_lock(&lib->mutex);
770 
771             process = nxt_unit_process_pop_first(lib);
772             if (process == NULL) {
773                 pthread_mutex_unlock(&lib->mutex);
774 
775                 break;
776             }
777 
778             nxt_unit_remove_process(lib, process);
779         }
780 
781         pthread_mutex_destroy(&lib->mutex);
782 
783         if (nxt_fast_path(lib->router_port != NULL)) {
784             nxt_unit_port_release(lib->router_port);
785         }
786 
787         if (nxt_fast_path(lib->shared_port != NULL)) {
788             nxt_unit_port_release(lib->shared_port);
789         }
790 
791         nxt_unit_mmaps_destroy(&lib->incoming);
792         nxt_unit_mmaps_destroy(&lib->outgoing);
793 
794         nxt_unit_free(NULL, lib);
795     }
796 }
797 
798 
799 nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t ** head,nxt_unit_mmap_buf_t * mmap_buf)800 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
801     nxt_unit_mmap_buf_t *mmap_buf)
802 {
803     mmap_buf->next = *head;
804 
805     if (mmap_buf->next != NULL) {
806         mmap_buf->next->prev = &mmap_buf->next;
807     }
808 
809     *head = mmap_buf;
810     mmap_buf->prev = head;
811 }
812 
813 
814 nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t ** prev,nxt_unit_mmap_buf_t * mmap_buf)815 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
816     nxt_unit_mmap_buf_t *mmap_buf)
817 {
818     while (*prev != NULL) {
819         prev = &(*prev)->next;
820     }
821 
822     nxt_unit_mmap_buf_insert(prev, mmap_buf);
823 }
824 
825 
826 nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t * mmap_buf)827 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
828 {
829     nxt_unit_mmap_buf_t  **prev;
830 
831     prev = mmap_buf->prev;
832 
833     if (mmap_buf->next != NULL) {
834         mmap_buf->next->prev = prev;
835     }
836 
837     if (prev != NULL) {
838         *prev = mmap_buf->next;
839     }
840 }
841 
842 
843 static int
nxt_unit_read_env(nxt_unit_port_t * ready_port,nxt_unit_port_t * router_port,nxt_unit_port_t * read_port,int * shared_port_fd,int * shared_queue_fd,int * log_fd,uint32_t * stream,uint32_t * shm_limit,uint32_t * request_limit)844 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
845     nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd,
846     int *log_fd, uint32_t *stream,
847     uint32_t *shm_limit, uint32_t *request_limit)
848 {
849     int       rc;
850     int       ready_fd, router_fd, read_in_fd, read_out_fd;
851     char      *unit_init, *version_end, *vars;
852     size_t    version_length;
853     int64_t   ready_pid, router_pid, read_pid;
854     uint32_t  ready_stream, router_id, ready_id, read_id;
855 
856     unit_init = getenv(NXT_UNIT_INIT_ENV);
857     if (nxt_slow_path(unit_init == NULL)) {
858         nxt_unit_alert(NULL, "%s is not in the current environment",
859                        NXT_UNIT_INIT_ENV);
860 
861         return NXT_UNIT_ERROR;
862     }
863 
864     version_end = strchr(unit_init, ';');
865     if (nxt_slow_path(version_end == NULL)) {
866         nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
867                        NXT_UNIT_INIT_ENV, unit_init);
868 
869         return NXT_UNIT_ERROR;
870     }
871 
872     version_length = version_end - unit_init;
873 
874     rc = version_length != nxt_length(NXT_VERSION)
875          || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
876 
877     if (nxt_slow_path(rc != 0)) {
878         nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
879                        "%.*s, while the app was compiled with libunit %s",
880                        (int) version_length, unit_init, NXT_VERSION);
881 
882         return NXT_UNIT_ERROR;
883     }
884 
885     vars = version_end + 1;
886 
887     rc = sscanf(vars,
888                 "%"PRIu32";"
889                 "%"PRId64",%"PRIu32",%d;"
890                 "%"PRId64",%"PRIu32",%d;"
891                 "%"PRId64",%"PRIu32",%d,%d;"
892                 "%d,%d;"
893                 "%d,%"PRIu32",%"PRIu32,
894                 &ready_stream,
895                 &ready_pid, &ready_id, &ready_fd,
896                 &router_pid, &router_id, &router_fd,
897                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
898                 shared_port_fd, shared_queue_fd,
899                 log_fd, shm_limit, request_limit);
900 
901     if (nxt_slow_path(rc == EOF)) {
902         nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
903                        vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
904 
905         return NXT_UNIT_ERROR;
906     }
907 
908     if (nxt_slow_path(rc != 16)) {
909         nxt_unit_alert(NULL, "invalid number of variables in %s env: "
910                        "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars);
911 
912         return NXT_UNIT_ERROR;
913     }
914 
915     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
916 
917     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
918 
919     ready_port->in_fd = -1;
920     ready_port->out_fd = ready_fd;
921     ready_port->data = NULL;
922 
923     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
924 
925     router_port->in_fd = -1;
926     router_port->out_fd = router_fd;
927     router_port->data = NULL;
928 
929     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
930 
931     read_port->in_fd = read_in_fd;
932     read_port->out_fd = read_out_fd;
933     read_port->data = NULL;
934 
935     *stream = ready_stream;
936 
937     return NXT_UNIT_OK;
938 }
939 
940 
941 static int
nxt_unit_ready(nxt_unit_ctx_t * ctx,int ready_fd,uint32_t stream,int queue_fd)942 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
943 {
944     ssize_t          res;
945     nxt_send_oob_t   oob;
946     nxt_port_msg_t   msg;
947     nxt_unit_impl_t  *lib;
948     int              fds[2] = {queue_fd, -1};
949 
950     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
951 
952     msg.stream = stream;
953     msg.pid = lib->pid;
954     msg.reply_port = 0;
955     msg.type = _NXT_PORT_MSG_PROCESS_READY;
956     msg.last = 1;
957     msg.mmap = 0;
958     msg.nf = 0;
959     msg.mf = 0;
960 
961     nxt_socket_msg_oob_init(&oob, fds);
962 
963     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
964     if (res != sizeof(msg)) {
965         return NXT_UNIT_ERROR;
966     }
967 
968     return NXT_UNIT_OK;
969 }
970 
971 
972 static int
nxt_unit_process_msg(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf,nxt_unit_request_info_t ** preq)973 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
974     nxt_unit_request_info_t **preq)
975 {
976     int                  rc;
977     pid_t                pid;
978     uint8_t              quit_param;
979     nxt_port_msg_t       *port_msg;
980     nxt_unit_impl_t      *lib;
981     nxt_unit_recv_msg_t  recv_msg;
982 
983     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
984 
985     recv_msg.incoming_buf = NULL;
986     recv_msg.fd[0] = -1;
987     recv_msg.fd[1] = -1;
988 
989     rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
990     if (nxt_slow_path(rc != NXT_OK)) {
991         nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
992         rc = NXT_UNIT_ERROR;
993         goto done;
994     }
995 
996     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
997         if (nxt_slow_path(rbuf->size == 0)) {
998             nxt_unit_debug(ctx, "read port closed");
999 
1000             nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1001             rc = NXT_UNIT_OK;
1002             goto done;
1003         }
1004 
1005         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
1006 
1007         rc = NXT_UNIT_ERROR;
1008         goto done;
1009     }
1010 
1011     port_msg = (nxt_port_msg_t *) rbuf->buf;
1012 
1013     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
1014                    port_msg->stream, (int) port_msg->type,
1015                    recv_msg.fd[0], recv_msg.fd[1]);
1016 
1017     recv_msg.stream = port_msg->stream;
1018     recv_msg.pid = port_msg->pid;
1019     recv_msg.reply_port = port_msg->reply_port;
1020     recv_msg.last = port_msg->last;
1021     recv_msg.mmap = port_msg->mmap;
1022 
1023     recv_msg.start = port_msg + 1;
1024     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
1025 
1026     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
1027         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
1028                        port_msg->stream, (int) port_msg->type);
1029         rc = NXT_UNIT_ERROR;
1030         goto done;
1031     }
1032 
1033     /* Fragmentation is unsupported. */
1034     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
1035         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
1036                        port_msg->stream, (int) port_msg->type);
1037         rc = NXT_UNIT_ERROR;
1038         goto done;
1039     }
1040 
1041     if (port_msg->mmap) {
1042         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
1043 
1044         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1045             if (rc == NXT_UNIT_AGAIN) {
1046                 recv_msg.fd[0] = -1;
1047                 recv_msg.fd[1] = -1;
1048             }
1049 
1050             goto done;
1051         }
1052     }
1053 
1054     switch (port_msg->type) {
1055 
1056     case _NXT_PORT_MSG_RPC_READY:
1057         rc = NXT_UNIT_OK;
1058         break;
1059 
1060     case _NXT_PORT_MSG_QUIT:
1061         if (recv_msg.size == sizeof(quit_param)) {
1062             memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1063 
1064         } else {
1065             quit_param = NXT_QUIT_NORMAL;
1066         }
1067 
1068         nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1069                        (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1070 
1071         nxt_unit_quit(ctx, quit_param);
1072 
1073         rc = NXT_UNIT_OK;
1074         break;
1075 
1076     case _NXT_PORT_MSG_NEW_PORT:
1077         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1078         break;
1079 
1080     case _NXT_PORT_MSG_PORT_ACK:
1081         rc = nxt_unit_ctx_ready(ctx);
1082         break;
1083 
1084     case _NXT_PORT_MSG_CHANGE_FILE:
1085         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1086                        port_msg->stream, recv_msg.fd[0]);
1087 
1088         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1089             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1090                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1091                            strerror(errno), errno);
1092 
1093             rc = NXT_UNIT_ERROR;
1094             goto done;
1095         }
1096 
1097         rc = NXT_UNIT_OK;
1098         break;
1099 
1100     case _NXT_PORT_MSG_MMAP:
1101         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1102             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1103                            port_msg->stream, recv_msg.fd[0]);
1104 
1105             rc = NXT_UNIT_ERROR;
1106             goto done;
1107         }
1108 
1109         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1110         break;
1111 
1112     case _NXT_PORT_MSG_REQ_HEADERS:
1113         rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1114         break;
1115 
1116     case _NXT_PORT_MSG_REQ_BODY:
1117         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1118         break;
1119 
1120     case _NXT_PORT_MSG_WEBSOCKET:
1121         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1122         break;
1123 
1124     case _NXT_PORT_MSG_REMOVE_PID:
1125         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1126             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1127                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1128                            (int) sizeof(pid));
1129 
1130             rc = NXT_UNIT_ERROR;
1131             goto done;
1132         }
1133 
1134         memcpy(&pid, recv_msg.start, sizeof(pid));
1135 
1136         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1137                        port_msg->stream, (int) pid);
1138 
1139         nxt_unit_remove_pid(lib, pid);
1140 
1141         rc = NXT_UNIT_OK;
1142         break;
1143 
1144     case _NXT_PORT_MSG_SHM_ACK:
1145         rc = nxt_unit_process_shm_ack(ctx);
1146         break;
1147 
1148     default:
1149         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1150                        port_msg->stream, (int) port_msg->type);
1151 
1152         rc = NXT_UNIT_ERROR;
1153         goto done;
1154     }
1155 
1156 done:
1157 
1158     if (recv_msg.fd[0] != -1) {
1159         nxt_unit_close(recv_msg.fd[0]);
1160     }
1161 
1162     if (recv_msg.fd[1] != -1) {
1163         nxt_unit_close(recv_msg.fd[1]);
1164     }
1165 
1166     while (recv_msg.incoming_buf != NULL) {
1167         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1168     }
1169 
1170     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1171 #if (NXT_DEBUG)
1172         memset(rbuf->buf, 0xAC, rbuf->size);
1173 #endif
1174         nxt_unit_read_buf_release(ctx, rbuf);
1175     }
1176 
1177     return rc;
1178 }
1179 
1180 
1181 static int
nxt_unit_process_new_port(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1182 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1183 {
1184     void                     *mem;
1185     nxt_unit_port_t          new_port, *port;
1186     nxt_port_msg_new_port_t  *new_port_msg;
1187 
1188     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1189         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1190                       "invalid message size (%d)",
1191                       recv_msg->stream, (int) recv_msg->size);
1192 
1193         return NXT_UNIT_ERROR;
1194     }
1195 
1196     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1197         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1198                        recv_msg->stream, recv_msg->fd[0]);
1199 
1200         return NXT_UNIT_ERROR;
1201     }
1202 
1203     new_port_msg = recv_msg->start;
1204 
1205     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1206                    recv_msg->stream, (int) new_port_msg->pid,
1207                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1208 
1209     if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) {
1210         return NXT_UNIT_ERROR;
1211     }
1212 
1213     nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id);
1214 
1215     new_port.in_fd = -1;
1216     new_port.out_fd = recv_msg->fd[0];
1217 
1218     mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1219                MAP_SHARED, recv_msg->fd[1], 0);
1220 
1221     if (nxt_slow_path(mem == MAP_FAILED)) {
1222         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1223                        strerror(errno), errno);
1224 
1225         return NXT_UNIT_ERROR;
1226     }
1227 
1228     new_port.data = NULL;
1229 
1230     recv_msg->fd[0] = -1;
1231 
1232     port = nxt_unit_add_port(ctx, &new_port, mem);
1233     if (nxt_slow_path(port == NULL)) {
1234         return NXT_UNIT_ERROR;
1235     }
1236 
1237     nxt_unit_port_release(port);
1238 
1239     return NXT_UNIT_OK;
1240 }
1241 
1242 
1243 static int
nxt_unit_ctx_ready(nxt_unit_ctx_t * ctx)1244 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1245 {
1246     nxt_unit_impl_t      *lib;
1247     nxt_unit_ctx_impl_t  *ctx_impl;
1248 
1249     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1250 
1251     if (nxt_slow_path(ctx_impl->ready)) {
1252         return NXT_UNIT_OK;
1253     }
1254 
1255     ctx_impl->ready = 1;
1256 
1257     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1258 
1259     /* Call ready_handler() only for main context. */
1260     if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1261         return lib->callbacks.ready_handler(ctx);
1262     }
1263 
1264     if (&lib->main_ctx != ctx_impl) {
1265         /* Check if the main context is already stopped or quit. */
1266         if (nxt_slow_path(!lib->main_ctx.ready)) {
1267             ctx_impl->ready = 0;
1268 
1269             nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1270 
1271             return NXT_UNIT_OK;
1272         }
1273 
1274         if (lib->callbacks.add_port != NULL) {
1275             lib->callbacks.add_port(ctx, lib->shared_port);
1276         }
1277     }
1278 
1279     return NXT_UNIT_OK;
1280 }
1281 
1282 
1283 static int
nxt_unit_process_req_headers(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg,nxt_unit_request_info_t ** preq)1284 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1285     nxt_unit_request_info_t **preq)
1286 {
1287     int                           res;
1288     nxt_unit_impl_t               *lib;
1289     nxt_unit_port_id_t            port_id;
1290     nxt_unit_request_t            *r;
1291     nxt_unit_mmap_buf_t           *b;
1292     nxt_unit_request_info_t       *req;
1293     nxt_unit_request_info_impl_t  *req_impl;
1294 
1295     if (nxt_slow_path(recv_msg->mmap == 0)) {
1296         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1297                       recv_msg->stream);
1298 
1299         return NXT_UNIT_ERROR;
1300     }
1301 
1302     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1303         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1304                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1305                       (int) sizeof(nxt_unit_request_t));
1306 
1307         return NXT_UNIT_ERROR;
1308     }
1309 
1310     req_impl = nxt_unit_request_info_get(ctx);
1311     if (nxt_slow_path(req_impl == NULL)) {
1312         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1313                       recv_msg->stream);
1314 
1315         return NXT_UNIT_ERROR;
1316     }
1317 
1318     req = &req_impl->req;
1319 
1320     req->request = recv_msg->start;
1321 
1322     b = recv_msg->incoming_buf;
1323 
1324     req->request_buf = &b->buf;
1325     req->response = NULL;
1326     req->response_buf = NULL;
1327 
1328     r = req->request;
1329 
1330     req->content_length = r->content_length;
1331 
1332     req->content_buf = req->request_buf;
1333     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1334 
1335     req_impl->stream = recv_msg->stream;
1336 
1337     req_impl->outgoing_buf = NULL;
1338 
1339     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1340         b->req = req;
1341     }
1342 
1343     /* "Move" incoming buffer list to req_impl. */
1344     req_impl->incoming_buf = recv_msg->incoming_buf;
1345     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1346     recv_msg->incoming_buf = NULL;
1347 
1348     req->content_fd = recv_msg->fd[0];
1349     recv_msg->fd[0] = -1;
1350 
1351     req->response_max_fields = 0;
1352     req_impl->state = NXT_UNIT_RS_START;
1353     req_impl->websocket = 0;
1354     req_impl->in_hash = 0;
1355 
1356     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1357                    (int) r->method_length,
1358                    (char *) nxt_unit_sptr_get(&r->method),
1359                    (int) r->target_length,
1360                    (char *) nxt_unit_sptr_get(&r->target),
1361                    (int) r->content_length);
1362 
1363     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1364 
1365     res = nxt_unit_request_check_response_port(req, &port_id);
1366     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1367         return NXT_UNIT_ERROR;
1368     }
1369 
1370     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1371         res = nxt_unit_send_req_headers_ack(req);
1372         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1373             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1374 
1375             return NXT_UNIT_ERROR;
1376         }
1377 
1378         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1379 
1380         if (req->content_length
1381             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1382         {
1383             res = nxt_unit_request_hash_add(ctx, req);
1384             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1385                 nxt_unit_req_warn(req, "failed to add request to hash");
1386 
1387                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1388 
1389                 return NXT_UNIT_ERROR;
1390             }
1391 
1392             /*
1393              * If application have separate data handler, we may start
1394              * request processing and process data when it is arrived.
1395              */
1396             if (lib->callbacks.data_handler == NULL) {
1397                 return NXT_UNIT_OK;
1398             }
1399         }
1400 
1401         if (preq == NULL) {
1402             lib->callbacks.request_handler(req);
1403 
1404         } else {
1405             *preq = req;
1406         }
1407     }
1408 
1409     return NXT_UNIT_OK;
1410 }
1411 
1412 
1413 static int
nxt_unit_process_req_body(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1414 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1415 {
1416     uint64_t                 l;
1417     nxt_unit_impl_t          *lib;
1418     nxt_unit_mmap_buf_t      *b;
1419     nxt_unit_request_info_t  *req;
1420 
1421     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1422     if (req == NULL) {
1423         return NXT_UNIT_OK;
1424     }
1425 
1426     l = req->content_buf->end - req->content_buf->free;
1427 
1428     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1429         b->req = req;
1430         l += b->buf.end - b->buf.free;
1431     }
1432 
1433     if (recv_msg->incoming_buf != NULL) {
1434         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1435 
1436         while (b->next != NULL) {
1437             b = b->next;
1438         }
1439 
1440         /* "Move" incoming buffer list to req_impl. */
1441         b->next = recv_msg->incoming_buf;
1442         b->next->prev = &b->next;
1443 
1444         recv_msg->incoming_buf = NULL;
1445     }
1446 
1447     req->content_fd = recv_msg->fd[0];
1448     recv_msg->fd[0] = -1;
1449 
1450     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1451 
1452     if (lib->callbacks.data_handler != NULL) {
1453         lib->callbacks.data_handler(req);
1454 
1455         return NXT_UNIT_OK;
1456     }
1457 
1458     if (req->content_fd != -1 || l == req->content_length) {
1459         lib->callbacks.request_handler(req);
1460     }
1461 
1462     return NXT_UNIT_OK;
1463 }
1464 
1465 
1466 static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t * req,nxt_unit_port_id_t * port_id)1467 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1468     nxt_unit_port_id_t *port_id)
1469 {
1470     int                           res;
1471     nxt_unit_ctx_t                *ctx;
1472     nxt_unit_impl_t               *lib;
1473     nxt_unit_port_t               *port;
1474     nxt_unit_process_t            *process;
1475     nxt_unit_ctx_impl_t           *ctx_impl;
1476     nxt_unit_port_impl_t          *port_impl;
1477     nxt_unit_request_info_impl_t  *req_impl;
1478 
1479     ctx = req->ctx;
1480     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1481     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1482 
1483     pthread_mutex_lock(&lib->mutex);
1484 
1485     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1486     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1487 
1488     if (nxt_fast_path(port != NULL)) {
1489         req->response_port = port;
1490 
1491         if (nxt_fast_path(port_impl->ready)) {
1492             pthread_mutex_unlock(&lib->mutex);
1493 
1494             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1495                            (int) port->id.pid, (int) port->id.id);
1496 
1497             return NXT_UNIT_OK;
1498         }
1499 
1500         nxt_unit_debug(ctx, "check_response_port: "
1501                        "port{%d,%d} already requested",
1502                        (int) port->id.pid, (int) port->id.id);
1503 
1504         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1505 
1506         nxt_queue_insert_tail(&port_impl->awaiting_req,
1507                               &req_impl->port_wait_link);
1508 
1509         pthread_mutex_unlock(&lib->mutex);
1510 
1511         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1512 
1513         return NXT_UNIT_AGAIN;
1514     }
1515 
1516     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1517     if (nxt_slow_path(port_impl == NULL)) {
1518         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1519                        (int) sizeof(nxt_unit_port_impl_t));
1520 
1521         pthread_mutex_unlock(&lib->mutex);
1522 
1523         return NXT_UNIT_ERROR;
1524     }
1525 
1526     port = &port_impl->port;
1527 
1528     port->id = *port_id;
1529     port->in_fd = -1;
1530     port->out_fd = -1;
1531     port->data = NULL;
1532 
1533     res = nxt_unit_port_hash_add(&lib->ports, port);
1534     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1535         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1536                        port->id.pid, port->id.id);
1537 
1538         pthread_mutex_unlock(&lib->mutex);
1539 
1540         nxt_unit_free(ctx, port);
1541 
1542         return NXT_UNIT_ERROR;
1543     }
1544 
1545     process = nxt_unit_process_find(lib, port_id->pid, 0);
1546     if (nxt_slow_path(process == NULL)) {
1547         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1548                        port->id.pid);
1549 
1550         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1551 
1552         pthread_mutex_unlock(&lib->mutex);
1553 
1554         nxt_unit_free(ctx, port);
1555 
1556         return NXT_UNIT_ERROR;
1557     }
1558 
1559     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1560 
1561     port_impl->process = process;
1562     port_impl->queue = NULL;
1563     port_impl->from_socket = 0;
1564     port_impl->socket_rbuf = NULL;
1565 
1566     nxt_queue_init(&port_impl->awaiting_req);
1567 
1568     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1569 
1570     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1571 
1572     port_impl->use_count = 2;
1573     port_impl->ready = 0;
1574 
1575     req->response_port = port;
1576 
1577     pthread_mutex_unlock(&lib->mutex);
1578 
1579     res = nxt_unit_get_port(ctx, port_id);
1580     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1581         return NXT_UNIT_ERROR;
1582     }
1583 
1584     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1585 
1586     return NXT_UNIT_AGAIN;
1587 }
1588 
1589 
1590 static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t * req)1591 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1592 {
1593     ssize_t                       res;
1594     nxt_port_msg_t                msg;
1595     nxt_unit_impl_t               *lib;
1596     nxt_unit_ctx_impl_t           *ctx_impl;
1597     nxt_unit_request_info_impl_t  *req_impl;
1598 
1599     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1600     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1601     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1602 
1603     memset(&msg, 0, sizeof(nxt_port_msg_t));
1604 
1605     msg.stream = req_impl->stream;
1606     msg.pid = lib->pid;
1607     msg.reply_port = ctx_impl->read_port->id.id;
1608     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1609 
1610     res = nxt_unit_port_send(req->ctx, req->response_port,
1611                              &msg, sizeof(msg), NULL);
1612     if (nxt_slow_path(res != sizeof(msg))) {
1613         return NXT_UNIT_ERROR;
1614     }
1615 
1616     return NXT_UNIT_OK;
1617 }
1618 
1619 
1620 static int
nxt_unit_process_websocket(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1621 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1622 {
1623     size_t                           hsize;
1624     nxt_unit_impl_t                  *lib;
1625     nxt_unit_mmap_buf_t              *b;
1626     nxt_unit_callbacks_t             *cb;
1627     nxt_unit_request_info_t          *req;
1628     nxt_unit_request_info_impl_t     *req_impl;
1629     nxt_unit_websocket_frame_impl_t  *ws_impl;
1630 
1631     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1632     if (nxt_slow_path(req == NULL)) {
1633         return NXT_UNIT_OK;
1634     }
1635 
1636     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1637 
1638     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1639     cb = &lib->callbacks;
1640 
1641     if (cb->websocket_handler && recv_msg->size >= 2) {
1642         ws_impl = nxt_unit_websocket_frame_get(ctx);
1643         if (nxt_slow_path(ws_impl == NULL)) {
1644             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1645                           req_impl->stream);
1646 
1647             return NXT_UNIT_ERROR;
1648         }
1649 
1650         ws_impl->ws.req = req;
1651 
1652         ws_impl->buf = NULL;
1653 
1654         if (recv_msg->mmap) {
1655             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1656                 b->req = req;
1657             }
1658 
1659             /* "Move" incoming buffer list to ws_impl. */
1660             ws_impl->buf = recv_msg->incoming_buf;
1661             ws_impl->buf->prev = &ws_impl->buf;
1662             recv_msg->incoming_buf = NULL;
1663 
1664             b = ws_impl->buf;
1665 
1666         } else {
1667             b = nxt_unit_mmap_buf_get(ctx);
1668             if (nxt_slow_path(b == NULL)) {
1669                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1670                                req_impl->stream);
1671 
1672                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1673 
1674                 return NXT_UNIT_ERROR;
1675             }
1676 
1677             b->req = req;
1678             b->buf.start = recv_msg->start;
1679             b->buf.free = b->buf.start;
1680             b->buf.end = b->buf.start + recv_msg->size;
1681 
1682             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1683         }
1684 
1685         ws_impl->ws.header = (void *) b->buf.start;
1686         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1687             ws_impl->ws.header);
1688 
1689         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1690 
1691         if (ws_impl->ws.header->mask) {
1692             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1693 
1694         } else {
1695             ws_impl->ws.mask = NULL;
1696         }
1697 
1698         b->buf.free += hsize;
1699 
1700         ws_impl->ws.content_buf = &b->buf;
1701         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1702 
1703         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1704                            "payload_len=%"PRIu64,
1705                             ws_impl->ws.header->opcode,
1706                             ws_impl->ws.payload_len);
1707 
1708         cb->websocket_handler(&ws_impl->ws);
1709     }
1710 
1711     if (recv_msg->last) {
1712         if (cb->close_handler) {
1713             nxt_unit_req_debug(req, "close_handler");
1714 
1715             cb->close_handler(req);
1716 
1717         } else {
1718             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1719         }
1720     }
1721 
1722     return NXT_UNIT_OK;
1723 }
1724 
1725 
1726 static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t * ctx)1727 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1728 {
1729     nxt_unit_impl_t       *lib;
1730     nxt_unit_callbacks_t  *cb;
1731 
1732     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1733     cb = &lib->callbacks;
1734 
1735     if (cb->shm_ack_handler != NULL) {
1736         cb->shm_ack_handler(ctx);
1737     }
1738 
1739     return NXT_UNIT_OK;
1740 }
1741 
1742 
1743 static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t * ctx)1744 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1745 {
1746     nxt_unit_impl_t               *lib;
1747     nxt_queue_link_t              *lnk;
1748     nxt_unit_ctx_impl_t           *ctx_impl;
1749     nxt_unit_request_info_impl_t  *req_impl;
1750 
1751     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1752 
1753     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1754 
1755     pthread_mutex_lock(&ctx_impl->mutex);
1756 
1757     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1758         pthread_mutex_unlock(&ctx_impl->mutex);
1759 
1760         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1761                                         + lib->request_data_size);
1762         if (nxt_slow_path(req_impl == NULL)) {
1763             return NULL;
1764         }
1765 
1766         req_impl->req.unit = ctx->unit;
1767         req_impl->req.ctx = ctx;
1768 
1769         pthread_mutex_lock(&ctx_impl->mutex);
1770 
1771     } else {
1772         lnk = nxt_queue_first(&ctx_impl->free_req);
1773         nxt_queue_remove(lnk);
1774 
1775         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1776     }
1777 
1778     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1779 
1780     pthread_mutex_unlock(&ctx_impl->mutex);
1781 
1782     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1783 
1784     return req_impl;
1785 }
1786 
1787 
1788 static void
nxt_unit_request_info_release(nxt_unit_request_info_t * req)1789 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1790 {
1791     nxt_unit_ctx_t                *ctx;
1792     nxt_unit_ctx_impl_t           *ctx_impl;
1793     nxt_unit_request_info_impl_t  *req_impl;
1794 
1795     ctx = req->ctx;
1796     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1797     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1798 
1799     req->response = NULL;
1800     req->response_buf = NULL;
1801 
1802     if (req_impl->in_hash) {
1803         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1804     }
1805 
1806     while (req_impl->outgoing_buf != NULL) {
1807         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1808     }
1809 
1810     while (req_impl->incoming_buf != NULL) {
1811         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1812     }
1813 
1814     if (req->content_fd != -1) {
1815         nxt_unit_close(req->content_fd);
1816 
1817         req->content_fd = -1;
1818     }
1819 
1820     if (req->response_port != NULL) {
1821         nxt_unit_port_release(req->response_port);
1822 
1823         req->response_port = NULL;
1824     }
1825 
1826     req_impl->state = NXT_UNIT_RS_RELEASED;
1827 
1828     pthread_mutex_lock(&ctx_impl->mutex);
1829 
1830     nxt_queue_remove(&req_impl->link);
1831 
1832     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1833 
1834     pthread_mutex_unlock(&ctx_impl->mutex);
1835 
1836     if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1837         nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1838     }
1839 }
1840 
1841 
1842 static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t * req_impl)1843 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1844 {
1845     nxt_unit_ctx_impl_t  *ctx_impl;
1846 
1847     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1848 
1849     nxt_queue_remove(&req_impl->link);
1850 
1851     if (req_impl != &ctx_impl->req) {
1852         nxt_unit_free(&ctx_impl->ctx, req_impl);
1853     }
1854 }
1855 
1856 
1857 static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t * ctx)1858 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1859 {
1860     nxt_queue_link_t                 *lnk;
1861     nxt_unit_ctx_impl_t              *ctx_impl;
1862     nxt_unit_websocket_frame_impl_t  *ws_impl;
1863 
1864     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1865 
1866     pthread_mutex_lock(&ctx_impl->mutex);
1867 
1868     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1869         pthread_mutex_unlock(&ctx_impl->mutex);
1870 
1871         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1872         if (nxt_slow_path(ws_impl == NULL)) {
1873             return NULL;
1874         }
1875 
1876     } else {
1877         lnk = nxt_queue_first(&ctx_impl->free_ws);
1878         nxt_queue_remove(lnk);
1879 
1880         pthread_mutex_unlock(&ctx_impl->mutex);
1881 
1882         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1883     }
1884 
1885     ws_impl->ctx_impl = ctx_impl;
1886 
1887     return ws_impl;
1888 }
1889 
1890 
1891 static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t * ws)1892 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1893 {
1894     nxt_unit_websocket_frame_impl_t  *ws_impl;
1895 
1896     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1897 
1898     while (ws_impl->buf != NULL) {
1899         nxt_unit_mmap_buf_free(ws_impl->buf);
1900     }
1901 
1902     ws->req = NULL;
1903 
1904     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1905 
1906     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1907 
1908     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1909 }
1910 
1911 
1912 static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t * ctx,nxt_unit_websocket_frame_impl_t * ws_impl)1913 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1914     nxt_unit_websocket_frame_impl_t *ws_impl)
1915 {
1916     nxt_queue_remove(&ws_impl->link);
1917 
1918     nxt_unit_free(ctx, ws_impl);
1919 }
1920 
1921 
1922 uint16_t
nxt_unit_field_hash(const char * name,size_t name_length)1923 nxt_unit_field_hash(const char *name, size_t name_length)
1924 {
1925     u_char      ch;
1926     uint32_t    hash;
1927     const char  *p, *end;
1928 
1929     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1930     end = name + name_length;
1931 
1932     for (p = name; p < end; p++) {
1933         ch = *p;
1934         hash = (hash << 4) + hash + nxt_lowcase(ch);
1935     }
1936 
1937     hash = (hash >> 16) ^ hash;
1938 
1939     return hash;
1940 }
1941 
1942 
1943 void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t * req)1944 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1945 {
1946     char                *name;
1947     uint32_t            i, j;
1948     nxt_unit_field_t    *fields, f;
1949     nxt_unit_request_t  *r;
1950 
1951     static nxt_str_t  content_length = nxt_string("content-length");
1952     static nxt_str_t  content_type = nxt_string("content-type");
1953     static nxt_str_t  cookie = nxt_string("cookie");
1954 
1955     nxt_unit_req_debug(req, "group_dup_fields");
1956 
1957     r = req->request;
1958     fields = r->fields;
1959 
1960     for (i = 0; i < r->fields_count; i++) {
1961         name = nxt_unit_sptr_get(&fields[i].name);
1962 
1963         switch (fields[i].hash) {
1964         case NXT_UNIT_HASH_CONTENT_LENGTH:
1965             if (fields[i].name_length == content_length.length
1966                 && nxt_unit_memcasecmp(name, content_length.start,
1967                                        content_length.length) == 0)
1968             {
1969                 r->content_length_field = i;
1970             }
1971 
1972             break;
1973 
1974         case NXT_UNIT_HASH_CONTENT_TYPE:
1975             if (fields[i].name_length == content_type.length
1976                 && nxt_unit_memcasecmp(name, content_type.start,
1977                                        content_type.length) == 0)
1978             {
1979                 r->content_type_field = i;
1980             }
1981 
1982             break;
1983 
1984         case NXT_UNIT_HASH_COOKIE:
1985             if (fields[i].name_length == cookie.length
1986                 && nxt_unit_memcasecmp(name, cookie.start,
1987                                        cookie.length) == 0)
1988             {
1989                 r->cookie_field = i;
1990             }
1991 
1992             break;
1993         }
1994 
1995         for (j = i + 1; j < r->fields_count; j++) {
1996             if (fields[i].hash != fields[j].hash
1997                 || fields[i].name_length != fields[j].name_length
1998                 || nxt_unit_memcasecmp(name,
1999                                        nxt_unit_sptr_get(&fields[j].name),
2000                                        fields[j].name_length) != 0)
2001             {
2002                 continue;
2003             }
2004 
2005             f = fields[j];
2006             f.value.offset += (j - (i + 1)) * sizeof(f);
2007 
2008             while (j > i + 1) {
2009                 fields[j] = fields[j - 1];
2010                 fields[j].name.offset -= sizeof(f);
2011                 fields[j].value.offset -= sizeof(f);
2012                 j--;
2013             }
2014 
2015             fields[j] = f;
2016 
2017             /* Assign the same name pointer for further grouping simplicity. */
2018             nxt_unit_sptr_set(&fields[j].name, name);
2019 
2020             i++;
2021         }
2022     }
2023 }
2024 
2025 
2026 int
nxt_unit_response_init(nxt_unit_request_info_t * req,uint16_t status,uint32_t max_fields_count,uint32_t max_fields_size)2027 nxt_unit_response_init(nxt_unit_request_info_t *req,
2028     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2029 {
2030     uint32_t                      buf_size;
2031     nxt_unit_buf_t                *buf;
2032     nxt_unit_request_info_impl_t  *req_impl;
2033 
2034     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2035 
2036     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2037         nxt_unit_req_warn(req, "init: response already sent");
2038 
2039         return NXT_UNIT_ERROR;
2040     }
2041 
2042     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2043                        (int) max_fields_count, (int) max_fields_size);
2044 
2045     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2046         nxt_unit_req_debug(req, "duplicate response init");
2047     }
2048 
2049     /*
2050      * Each field name and value 0-terminated by libunit,
2051      * this is the reason of '+ 2' below.
2052      */
2053     buf_size = sizeof(nxt_unit_response_t)
2054                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2055                + max_fields_size;
2056 
2057     if (nxt_slow_path(req->response_buf != NULL)) {
2058         buf = req->response_buf;
2059 
2060         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2061             goto init_response;
2062         }
2063 
2064         nxt_unit_buf_free(buf);
2065 
2066         req->response_buf = NULL;
2067         req->response = NULL;
2068         req->response_max_fields = 0;
2069 
2070         req_impl->state = NXT_UNIT_RS_START;
2071     }
2072 
2073     buf = nxt_unit_response_buf_alloc(req, buf_size);
2074     if (nxt_slow_path(buf == NULL)) {
2075         return NXT_UNIT_ERROR;
2076     }
2077 
2078 init_response:
2079 
2080     memset(buf->start, 0, sizeof(nxt_unit_response_t));
2081 
2082     req->response_buf = buf;
2083 
2084     req->response = (nxt_unit_response_t *) buf->start;
2085     req->response->status = status;
2086 
2087     buf->free = buf->start + sizeof(nxt_unit_response_t)
2088                 + max_fields_count * sizeof(nxt_unit_field_t);
2089 
2090     req->response_max_fields = max_fields_count;
2091     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2092 
2093     return NXT_UNIT_OK;
2094 }
2095 
2096 
2097 int
nxt_unit_response_realloc(nxt_unit_request_info_t * req,uint32_t max_fields_count,uint32_t max_fields_size)2098 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2099     uint32_t max_fields_count, uint32_t max_fields_size)
2100 {
2101     char                          *p;
2102     uint32_t                      i, buf_size;
2103     nxt_unit_buf_t                *buf;
2104     nxt_unit_field_t              *f, *src;
2105     nxt_unit_response_t           *resp;
2106     nxt_unit_request_info_impl_t  *req_impl;
2107 
2108     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2109 
2110     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2111         nxt_unit_req_warn(req, "realloc: response not init");
2112 
2113         return NXT_UNIT_ERROR;
2114     }
2115 
2116     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2117         nxt_unit_req_warn(req, "realloc: response already sent");
2118 
2119         return NXT_UNIT_ERROR;
2120     }
2121 
2122     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2123         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2124 
2125         return NXT_UNIT_ERROR;
2126     }
2127 
2128     /*
2129      * Each field name and value 0-terminated by libunit,
2130      * this is the reason of '+ 2' below.
2131      */
2132     buf_size = sizeof(nxt_unit_response_t)
2133                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2134                + max_fields_size;
2135 
2136     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2137 
2138     buf = nxt_unit_response_buf_alloc(req, buf_size);
2139     if (nxt_slow_path(buf == NULL)) {
2140         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2141         return NXT_UNIT_ERROR;
2142     }
2143 
2144     resp = (nxt_unit_response_t *) buf->start;
2145 
2146     memset(resp, 0, sizeof(nxt_unit_response_t));
2147 
2148     resp->status = req->response->status;
2149     resp->content_length = req->response->content_length;
2150 
2151     p = buf->start + sizeof(nxt_unit_response_t)
2152         + max_fields_count * sizeof(nxt_unit_field_t);
2153     f = resp->fields;
2154 
2155     for (i = 0; i < req->response->fields_count; i++) {
2156         src = req->response->fields + i;
2157 
2158         if (nxt_slow_path(src->skip != 0)) {
2159             continue;
2160         }
2161 
2162         if (nxt_slow_path(src->name_length + src->value_length + 2
2163                           > (uint32_t) (buf->end - p)))
2164         {
2165             nxt_unit_req_warn(req, "realloc: not enough space for field"
2166                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2167                   i, src, src->name_length, src->value_length);
2168 
2169             goto fail;
2170         }
2171 
2172         nxt_unit_sptr_set(&f->name, p);
2173         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2174         *p++ = '\0';
2175 
2176         nxt_unit_sptr_set(&f->value, p);
2177         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2178         *p++ = '\0';
2179 
2180         f->hash = src->hash;
2181         f->skip = 0;
2182         f->name_length = src->name_length;
2183         f->value_length = src->value_length;
2184 
2185         resp->fields_count++;
2186         f++;
2187     }
2188 
2189     if (req->response->piggyback_content_length > 0) {
2190         if (nxt_slow_path(req->response->piggyback_content_length
2191                           > (uint32_t) (buf->end - p)))
2192         {
2193             nxt_unit_req_warn(req, "realloc: not enought space for content"
2194                   " #%"PRIu32", %"PRIu32" required",
2195                   i, req->response->piggyback_content_length);
2196 
2197             goto fail;
2198         }
2199 
2200         resp->piggyback_content_length =
2201                                        req->response->piggyback_content_length;
2202 
2203         nxt_unit_sptr_set(&resp->piggyback_content, p);
2204         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2205                        req->response->piggyback_content_length);
2206     }
2207 
2208     buf->free = p;
2209 
2210     nxt_unit_buf_free(req->response_buf);
2211 
2212     req->response = resp;
2213     req->response_buf = buf;
2214     req->response_max_fields = max_fields_count;
2215 
2216     return NXT_UNIT_OK;
2217 
2218 fail:
2219 
2220     nxt_unit_buf_free(buf);
2221 
2222     return NXT_UNIT_ERROR;
2223 }
2224 
2225 
2226 int
nxt_unit_response_is_init(nxt_unit_request_info_t * req)2227 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2228 {
2229     nxt_unit_request_info_impl_t  *req_impl;
2230 
2231     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2232 
2233     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2234 }
2235 
2236 
2237 int
nxt_unit_response_add_field(nxt_unit_request_info_t * req,const char * name,uint8_t name_length,const char * value,uint32_t value_length)2238 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2239     const char *name, uint8_t name_length,
2240     const char *value, uint32_t value_length)
2241 {
2242     nxt_unit_buf_t                *buf;
2243     nxt_unit_field_t              *f;
2244     nxt_unit_response_t           *resp;
2245     nxt_unit_request_info_impl_t  *req_impl;
2246 
2247     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2248 
2249     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2250         nxt_unit_req_warn(req, "add_field: response not initialized or "
2251                           "already sent");
2252 
2253         return NXT_UNIT_ERROR;
2254     }
2255 
2256     resp = req->response;
2257 
2258     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2259         nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2260                           (int) resp->fields_count);
2261 
2262         return NXT_UNIT_ERROR;
2263     }
2264 
2265     buf = req->response_buf;
2266 
2267     if (nxt_slow_path(name_length + value_length + 2
2268                       > (uint32_t) (buf->end - buf->free)))
2269     {
2270         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2271 
2272         return NXT_UNIT_ERROR;
2273     }
2274 
2275     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2276                        resp->fields_count,
2277                        (int) name_length, name,
2278                        (int) value_length, value);
2279 
2280     f = resp->fields + resp->fields_count;
2281 
2282     nxt_unit_sptr_set(&f->name, buf->free);
2283     buf->free = nxt_cpymem(buf->free, name, name_length);
2284     *buf->free++ = '\0';
2285 
2286     nxt_unit_sptr_set(&f->value, buf->free);
2287     buf->free = nxt_cpymem(buf->free, value, value_length);
2288     *buf->free++ = '\0';
2289 
2290     f->hash = nxt_unit_field_hash(name, name_length);
2291     f->skip = 0;
2292     f->name_length = name_length;
2293     f->value_length = value_length;
2294 
2295     resp->fields_count++;
2296 
2297     return NXT_UNIT_OK;
2298 }
2299 
2300 
2301 int
nxt_unit_response_add_content(nxt_unit_request_info_t * req,const void * src,uint32_t size)2302 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2303     const void* src, uint32_t size)
2304 {
2305     nxt_unit_buf_t                *buf;
2306     nxt_unit_response_t           *resp;
2307     nxt_unit_request_info_impl_t  *req_impl;
2308 
2309     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2310 
2311     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2312         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2313 
2314         return NXT_UNIT_ERROR;
2315     }
2316 
2317     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2318         nxt_unit_req_warn(req, "add_content: response already sent");
2319 
2320         return NXT_UNIT_ERROR;
2321     }
2322 
2323     buf = req->response_buf;
2324 
2325     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2326         nxt_unit_req_warn(req, "add_content: buffer overflow");
2327 
2328         return NXT_UNIT_ERROR;
2329     }
2330 
2331     resp = req->response;
2332 
2333     if (resp->piggyback_content_length == 0) {
2334         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2335         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2336     }
2337 
2338     resp->piggyback_content_length += size;
2339 
2340     buf->free = nxt_cpymem(buf->free, src, size);
2341 
2342     return NXT_UNIT_OK;
2343 }
2344 
2345 
2346 int
nxt_unit_response_send(nxt_unit_request_info_t * req)2347 nxt_unit_response_send(nxt_unit_request_info_t *req)
2348 {
2349     int                           rc;
2350     nxt_unit_mmap_buf_t           *mmap_buf;
2351     nxt_unit_request_info_impl_t  *req_impl;
2352 
2353     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2354 
2355     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2356         nxt_unit_req_warn(req, "send: response is not initialized yet");
2357 
2358         return NXT_UNIT_ERROR;
2359     }
2360 
2361     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2362         nxt_unit_req_warn(req, "send: response already sent");
2363 
2364         return NXT_UNIT_ERROR;
2365     }
2366 
2367     if (req->request->websocket_handshake && req->response->status == 101) {
2368         nxt_unit_response_upgrade(req);
2369     }
2370 
2371     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2372                        req->response->fields_count,
2373                        (int) (req->response_buf->free
2374                               - req->response_buf->start));
2375 
2376     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2377 
2378     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2379     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2380         req->response = NULL;
2381         req->response_buf = NULL;
2382         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2383 
2384         nxt_unit_mmap_buf_free(mmap_buf);
2385     }
2386 
2387     return rc;
2388 }
2389 
2390 
2391 int
nxt_unit_response_is_sent(nxt_unit_request_info_t * req)2392 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2393 {
2394     nxt_unit_request_info_impl_t  *req_impl;
2395 
2396     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2397 
2398     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2399 }
2400 
2401 
2402 nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t * req,uint32_t size)2403 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2404 {
2405     int                           rc;
2406     nxt_unit_mmap_buf_t           *mmap_buf;
2407     nxt_unit_request_info_impl_t  *req_impl;
2408 
2409     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2410         nxt_unit_req_warn(req, "response_buf_alloc: "
2411                           "requested buffer (%"PRIu32") too big", size);
2412 
2413         return NULL;
2414     }
2415 
2416     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2417 
2418     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2419 
2420     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2421     if (nxt_slow_path(mmap_buf == NULL)) {
2422         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2423 
2424         return NULL;
2425     }
2426 
2427     mmap_buf->req = req;
2428 
2429     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2430 
2431     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2432                                    size, size, mmap_buf,
2433                                    NULL);
2434     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2435         nxt_unit_mmap_buf_release(mmap_buf);
2436 
2437         nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2438 
2439         return NULL;
2440     }
2441 
2442     return &mmap_buf->buf;
2443 }
2444 
2445 
2446 static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t * ctx)2447 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2448 {
2449     nxt_unit_mmap_buf_t  *mmap_buf;
2450     nxt_unit_ctx_impl_t  *ctx_impl;
2451 
2452     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2453 
2454     pthread_mutex_lock(&ctx_impl->mutex);
2455 
2456     if (ctx_impl->free_buf == NULL) {
2457         pthread_mutex_unlock(&ctx_impl->mutex);
2458 
2459         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2460         if (nxt_slow_path(mmap_buf == NULL)) {
2461             return NULL;
2462         }
2463 
2464     } else {
2465         mmap_buf = ctx_impl->free_buf;
2466 
2467         nxt_unit_mmap_buf_unlink(mmap_buf);
2468 
2469         pthread_mutex_unlock(&ctx_impl->mutex);
2470     }
2471 
2472     mmap_buf->ctx_impl = ctx_impl;
2473 
2474     mmap_buf->hdr = NULL;
2475     mmap_buf->free_ptr = NULL;
2476 
2477     return mmap_buf;
2478 }
2479 
2480 
2481 static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t * mmap_buf)2482 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2483 {
2484     nxt_unit_mmap_buf_unlink(mmap_buf);
2485 
2486     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2487 
2488     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2489 
2490     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2491 }
2492 
2493 
2494 int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t * req)2495 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2496 {
2497     return req->request->websocket_handshake;
2498 }
2499 
2500 
2501 int
nxt_unit_response_upgrade(nxt_unit_request_info_t * req)2502 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2503 {
2504     int                           rc;
2505     nxt_unit_request_info_impl_t  *req_impl;
2506 
2507     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2508 
2509     if (nxt_slow_path(req_impl->websocket != 0)) {
2510         nxt_unit_req_debug(req, "upgrade: already upgraded");
2511 
2512         return NXT_UNIT_OK;
2513     }
2514 
2515     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2516         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2517 
2518         return NXT_UNIT_ERROR;
2519     }
2520 
2521     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2522         nxt_unit_req_warn(req, "upgrade: response already sent");
2523 
2524         return NXT_UNIT_ERROR;
2525     }
2526 
2527     rc = nxt_unit_request_hash_add(req->ctx, req);
2528     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2529         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2530 
2531         return NXT_UNIT_ERROR;
2532     }
2533 
2534     req_impl->websocket = 1;
2535 
2536     req->response->status = 101;
2537 
2538     return NXT_UNIT_OK;
2539 }
2540 
2541 
2542 int
nxt_unit_response_is_websocket(nxt_unit_request_info_t * req)2543 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2544 {
2545     nxt_unit_request_info_impl_t  *req_impl;
2546 
2547     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2548 
2549     return req_impl->websocket;
2550 }
2551 
2552 
2553 nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void * data)2554 nxt_unit_get_request_info_from_data(void *data)
2555 {
2556     nxt_unit_request_info_impl_t  *req_impl;
2557 
2558     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2559 
2560     return &req_impl->req;
2561 }
2562 
2563 
2564 int
nxt_unit_buf_send(nxt_unit_buf_t * buf)2565 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2566 {
2567     int                           rc;
2568     nxt_unit_mmap_buf_t           *mmap_buf;
2569     nxt_unit_request_info_t       *req;
2570     nxt_unit_request_info_impl_t  *req_impl;
2571 
2572     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2573 
2574     req = mmap_buf->req;
2575     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2576 
2577     nxt_unit_req_debug(req, "buf_send: %d bytes",
2578                        (int) (buf->free - buf->start));
2579 
2580     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2581         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2582 
2583         return NXT_UNIT_ERROR;
2584     }
2585 
2586     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2587         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2588 
2589         return NXT_UNIT_ERROR;
2590     }
2591 
2592     if (nxt_fast_path(buf->free > buf->start)) {
2593         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2594         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2595             return rc;
2596         }
2597     }
2598 
2599     nxt_unit_mmap_buf_free(mmap_buf);
2600 
2601     return NXT_UNIT_OK;
2602 }
2603 
2604 
2605 static void
nxt_unit_buf_send_done(nxt_unit_buf_t * buf)2606 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2607 {
2608     int                      rc;
2609     nxt_unit_mmap_buf_t      *mmap_buf;
2610     nxt_unit_request_info_t  *req;
2611 
2612     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2613 
2614     req = mmap_buf->req;
2615 
2616     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2617     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2618         nxt_unit_mmap_buf_free(mmap_buf);
2619 
2620         nxt_unit_request_info_release(req);
2621 
2622     } else {
2623         nxt_unit_request_done(req, rc);
2624     }
2625 }
2626 
2627 
2628 static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t * req,nxt_unit_mmap_buf_t * mmap_buf,int last)2629 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2630     nxt_unit_mmap_buf_t *mmap_buf, int last)
2631 {
2632     struct {
2633         nxt_port_msg_t       msg;
2634         nxt_port_mmap_msg_t  mmap_msg;
2635     } m;
2636 
2637     int                           rc;
2638     u_char                        *last_used, *first_free;
2639     ssize_t                       res;
2640     nxt_chunk_id_t                first_free_chunk;
2641     nxt_unit_buf_t                *buf;
2642     nxt_unit_impl_t               *lib;
2643     nxt_port_mmap_header_t        *hdr;
2644     nxt_unit_request_info_impl_t  *req_impl;
2645 
2646     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2647     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2648 
2649     buf = &mmap_buf->buf;
2650     hdr = mmap_buf->hdr;
2651 
2652     m.mmap_msg.size = buf->free - buf->start;
2653 
2654     m.msg.stream = req_impl->stream;
2655     m.msg.pid = lib->pid;
2656     m.msg.reply_port = 0;
2657     m.msg.type = _NXT_PORT_MSG_DATA;
2658     m.msg.last = last != 0;
2659     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2660     m.msg.nf = 0;
2661     m.msg.mf = 0;
2662 
2663     rc = NXT_UNIT_ERROR;
2664 
2665     if (m.msg.mmap) {
2666         m.mmap_msg.mmap_id = hdr->id;
2667         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2668                                                      (u_char *) buf->start);
2669 
2670         nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2671                        req_impl->stream,
2672                        (int) m.mmap_msg.mmap_id,
2673                        (int) m.mmap_msg.chunk_id,
2674                        (int) m.mmap_msg.size);
2675 
2676         res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2677                                  NULL);
2678         if (nxt_slow_path(res != sizeof(m))) {
2679             goto free_buf;
2680         }
2681 
2682         last_used = (u_char *) buf->free - 1;
2683         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2684 
2685         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2686             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2687 
2688             buf->start = (char *) first_free;
2689             buf->free = buf->start;
2690 
2691             if (buf->end < buf->start) {
2692                 buf->end = buf->start;
2693             }
2694 
2695         } else {
2696             buf->start = NULL;
2697             buf->free = NULL;
2698             buf->end = NULL;
2699 
2700             mmap_buf->hdr = NULL;
2701         }
2702 
2703         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2704                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2705 
2706         nxt_unit_debug(req->ctx, "allocated_chunks %d",
2707                        (int) lib->outgoing.allocated_chunks);
2708 
2709     } else {
2710         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2711                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2712         {
2713             nxt_unit_alert(req->ctx,
2714                            "#%"PRIu32": failed to send plain memory buffer"
2715                            ": no space reserved for message header",
2716                            req_impl->stream);
2717 
2718             goto free_buf;
2719         }
2720 
2721         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2722 
2723         nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2724                        req_impl->stream,
2725                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2726 
2727         res = nxt_unit_port_send(req->ctx, req->response_port,
2728                                  buf->start - sizeof(m.msg),
2729                                  m.mmap_msg.size + sizeof(m.msg), NULL);
2730 
2731         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2732             goto free_buf;
2733         }
2734     }
2735 
2736     rc = NXT_UNIT_OK;
2737 
2738 free_buf:
2739 
2740     nxt_unit_free_outgoing_buf(mmap_buf);
2741 
2742     return rc;
2743 }
2744 
2745 
2746 void
nxt_unit_buf_free(nxt_unit_buf_t * buf)2747 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2748 {
2749     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2750 }
2751 
2752 
2753 static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t * mmap_buf)2754 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2755 {
2756     nxt_unit_free_outgoing_buf(mmap_buf);
2757 
2758     nxt_unit_mmap_buf_release(mmap_buf);
2759 }
2760 
2761 
2762 static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t * mmap_buf)2763 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2764 {
2765     if (mmap_buf->hdr != NULL) {
2766         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2767                               mmap_buf->hdr, mmap_buf->buf.start,
2768                               mmap_buf->buf.end - mmap_buf->buf.start);
2769 
2770         mmap_buf->hdr = NULL;
2771 
2772         return;
2773     }
2774 
2775     if (mmap_buf->free_ptr != NULL) {
2776         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2777 
2778         mmap_buf->free_ptr = NULL;
2779     }
2780 }
2781 
2782 
2783 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t * ctx)2784 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2785 {
2786     nxt_unit_ctx_impl_t  *ctx_impl;
2787     nxt_unit_read_buf_t  *rbuf;
2788 
2789     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2790 
2791     pthread_mutex_lock(&ctx_impl->mutex);
2792 
2793     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2794 
2795     pthread_mutex_unlock(&ctx_impl->mutex);
2796 
2797     rbuf->oob.size = 0;
2798 
2799     return rbuf;
2800 }
2801 
2802 
2803 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t * ctx_impl)2804 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2805 {
2806     nxt_queue_link_t     *link;
2807     nxt_unit_read_buf_t  *rbuf;
2808 
2809     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2810         link = nxt_queue_first(&ctx_impl->free_rbuf);
2811         nxt_queue_remove(link);
2812 
2813         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2814 
2815         return rbuf;
2816     }
2817 
2818     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2819 
2820     if (nxt_fast_path(rbuf != NULL)) {
2821         rbuf->ctx_impl = ctx_impl;
2822     }
2823 
2824     return rbuf;
2825 }
2826 
2827 
2828 static void
nxt_unit_read_buf_release(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf)2829 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2830     nxt_unit_read_buf_t *rbuf)
2831 {
2832     nxt_unit_ctx_impl_t  *ctx_impl;
2833 
2834     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2835 
2836     pthread_mutex_lock(&ctx_impl->mutex);
2837 
2838     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2839 
2840     pthread_mutex_unlock(&ctx_impl->mutex);
2841 }
2842 
2843 
2844 nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t * buf)2845 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2846 {
2847     nxt_unit_mmap_buf_t  *mmap_buf;
2848 
2849     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2850 
2851     if (mmap_buf->next == NULL) {
2852         return NULL;
2853     }
2854 
2855     return &mmap_buf->next->buf;
2856 }
2857 
2858 
2859 uint32_t
nxt_unit_buf_max(void)2860 nxt_unit_buf_max(void)
2861 {
2862     return PORT_MMAP_DATA_SIZE;
2863 }
2864 
2865 
2866 uint32_t
nxt_unit_buf_min(void)2867 nxt_unit_buf_min(void)
2868 {
2869     return PORT_MMAP_CHUNK_SIZE;
2870 }
2871 
2872 
2873 int
nxt_unit_response_write(nxt_unit_request_info_t * req,const void * start,size_t size)2874 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2875     size_t size)
2876 {
2877     ssize_t  res;
2878 
2879     res = nxt_unit_response_write_nb(req, start, size, size);
2880 
2881     return res < 0 ? -res : NXT_UNIT_OK;
2882 }
2883 
2884 
2885 ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t * req,const void * start,size_t size,size_t min_size)2886 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2887     size_t size, size_t min_size)
2888 {
2889     int                           rc;
2890     ssize_t                       sent;
2891     uint32_t                      part_size, min_part_size, buf_size;
2892     const char                    *part_start;
2893     nxt_unit_mmap_buf_t           mmap_buf;
2894     nxt_unit_request_info_impl_t  *req_impl;
2895     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2896 
2897     nxt_unit_req_debug(req, "write: %d", (int) size);
2898 
2899     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2900 
2901     part_start = start;
2902     sent = 0;
2903 
2904     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2905         nxt_unit_req_alert(req, "write: response not initialized yet");
2906 
2907         return -NXT_UNIT_ERROR;
2908     }
2909 
2910     /* Check if response is not send yet. */
2911     if (nxt_slow_path(req->response_buf != NULL)) {
2912         part_size = req->response_buf->end - req->response_buf->free;
2913         part_size = nxt_min(size, part_size);
2914 
2915         rc = nxt_unit_response_add_content(req, part_start, part_size);
2916         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2917             return -rc;
2918         }
2919 
2920         rc = nxt_unit_response_send(req);
2921         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2922             return -rc;
2923         }
2924 
2925         size -= part_size;
2926         part_start += part_size;
2927         sent += part_size;
2928 
2929         min_size -= nxt_min(min_size, part_size);
2930     }
2931 
2932     while (size > 0) {
2933         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2934         min_part_size = nxt_min(min_size, part_size);
2935         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2936 
2937         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2938                                        min_part_size, &mmap_buf, local_buf);
2939         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2940             return -rc;
2941         }
2942 
2943         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2944         if (nxt_slow_path(buf_size == 0)) {
2945             return sent;
2946         }
2947         part_size = nxt_min(buf_size, part_size);
2948 
2949         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2950                                        part_start, part_size);
2951 
2952         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2953         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2954             return -rc;
2955         }
2956 
2957         size -= part_size;
2958         part_start += part_size;
2959         sent += part_size;
2960 
2961         min_size -= nxt_min(min_size, part_size);
2962     }
2963 
2964     return sent;
2965 }
2966 
2967 
2968 int
nxt_unit_response_write_cb(nxt_unit_request_info_t * req,nxt_unit_read_info_t * read_info)2969 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2970     nxt_unit_read_info_t *read_info)
2971 {
2972     int                           rc;
2973     ssize_t                       n;
2974     uint32_t                      buf_size;
2975     nxt_unit_buf_t                *buf;
2976     nxt_unit_mmap_buf_t           mmap_buf;
2977     nxt_unit_request_info_impl_t  *req_impl;
2978     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2979 
2980     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2981 
2982     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2983         nxt_unit_req_alert(req, "write: response not initialized yet");
2984 
2985         return NXT_UNIT_ERROR;
2986     }
2987 
2988     /* Check if response is not send yet. */
2989     if (nxt_slow_path(req->response_buf != NULL)) {
2990 
2991         /* Enable content in headers buf. */
2992         rc = nxt_unit_response_add_content(req, "", 0);
2993         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2994             nxt_unit_req_error(req, "Failed to add piggyback content");
2995 
2996             return rc;
2997         }
2998 
2999         buf = req->response_buf;
3000 
3001         while (buf->end - buf->free > 0) {
3002             n = read_info->read(read_info, buf->free, buf->end - buf->free);
3003             if (nxt_slow_path(n < 0)) {
3004                 nxt_unit_req_error(req, "Read error");
3005 
3006                 return NXT_UNIT_ERROR;
3007             }
3008 
3009             /* Manually increase sizes. */
3010             buf->free += n;
3011             req->response->piggyback_content_length += n;
3012 
3013             if (read_info->eof) {
3014                 break;
3015             }
3016         }
3017 
3018         rc = nxt_unit_response_send(req);
3019         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3020             nxt_unit_req_error(req, "Failed to send headers with content");
3021 
3022             return rc;
3023         }
3024 
3025         if (read_info->eof) {
3026             return NXT_UNIT_OK;
3027         }
3028     }
3029 
3030     while (!read_info->eof) {
3031         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
3032                            read_info->buf_size);
3033 
3034         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
3035 
3036         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3037                                        buf_size, buf_size,
3038                                        &mmap_buf, local_buf);
3039         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3040             return rc;
3041         }
3042 
3043         buf = &mmap_buf.buf;
3044 
3045         while (!read_info->eof && buf->end > buf->free) {
3046             n = read_info->read(read_info, buf->free, buf->end - buf->free);
3047             if (nxt_slow_path(n < 0)) {
3048                 nxt_unit_req_error(req, "Read error");
3049 
3050                 nxt_unit_free_outgoing_buf(&mmap_buf);
3051 
3052                 return NXT_UNIT_ERROR;
3053             }
3054 
3055             buf->free += n;
3056         }
3057 
3058         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3059         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3060             nxt_unit_req_error(req, "Failed to send content");
3061 
3062             return rc;
3063         }
3064     }
3065 
3066     return NXT_UNIT_OK;
3067 }
3068 
3069 
3070 ssize_t
nxt_unit_request_read(nxt_unit_request_info_t * req,void * dst,size_t size)3071 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3072 {
3073     ssize_t  buf_res, res;
3074 
3075     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3076                                 dst, size);
3077 
3078     if (buf_res < (ssize_t) size && req->content_fd != -1) {
3079         res = read(req->content_fd, dst, size);
3080         if (nxt_slow_path(res < 0)) {
3081             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3082                                strerror(errno), errno);
3083 
3084             return res;
3085         }
3086 
3087         if (res < (ssize_t) size) {
3088             nxt_unit_close(req->content_fd);
3089 
3090             req->content_fd = -1;
3091         }
3092 
3093         req->content_length -= res;
3094 
3095         dst = nxt_pointer_to(dst, res);
3096 
3097     } else {
3098         res = 0;
3099     }
3100 
3101     return buf_res + res;
3102 }
3103 
3104 
3105 ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t * req,size_t max_size)3106 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3107 {
3108     char                 *p;
3109     size_t               l_size, b_size;
3110     nxt_unit_buf_t       *b;
3111     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3112 
3113     if (req->content_length == 0) {
3114         return 0;
3115     }
3116 
3117     l_size = 0;
3118 
3119     b = req->content_buf;
3120 
3121     while (b != NULL) {
3122         b_size = b->end - b->free;
3123         p = memchr(b->free, '\n', b_size);
3124 
3125         if (p != NULL) {
3126             p++;
3127             l_size += p - b->free;
3128             break;
3129         }
3130 
3131         l_size += b_size;
3132 
3133         if (max_size <= l_size) {
3134             break;
3135         }
3136 
3137         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
3138         if (mmap_buf->next == NULL
3139             && req->content_fd != -1
3140             && l_size < req->content_length)
3141         {
3142             preread_buf = nxt_unit_request_preread(req, 16384);
3143             if (nxt_slow_path(preread_buf == NULL)) {
3144                 return -1;
3145             }
3146 
3147             nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf);
3148         }
3149 
3150         b = nxt_unit_buf_next(b);
3151     }
3152 
3153     return nxt_min(max_size, l_size);
3154 }
3155 
3156 
3157 static nxt_unit_mmap_buf_t *
nxt_unit_request_preread(nxt_unit_request_info_t * req,size_t size)3158 nxt_unit_request_preread(nxt_unit_request_info_t *req, size_t size)
3159 {
3160     ssize_t              res;
3161     nxt_unit_mmap_buf_t  *mmap_buf;
3162 
3163     if (req->content_fd == -1) {
3164         nxt_unit_req_alert(req, "preread: content_fd == -1");
3165         return NULL;
3166     }
3167 
3168     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
3169     if (nxt_slow_path(mmap_buf == NULL)) {
3170         nxt_unit_req_alert(req, "preread: failed to allocate buf");
3171         return NULL;
3172     }
3173 
3174     mmap_buf->free_ptr = nxt_unit_malloc(req->ctx, size);
3175     if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3176         nxt_unit_req_alert(req, "preread: failed to allocate buf memory");
3177         nxt_unit_mmap_buf_release(mmap_buf);
3178         return NULL;
3179     }
3180 
3181     mmap_buf->plain_ptr = mmap_buf->free_ptr;
3182 
3183     mmap_buf->hdr = NULL;
3184     mmap_buf->buf.start = mmap_buf->free_ptr;
3185     mmap_buf->buf.free = mmap_buf->buf.start;
3186     mmap_buf->buf.end = mmap_buf->buf.start + size;
3187 
3188     res = read(req->content_fd, mmap_buf->free_ptr, size);
3189     if (res < 0) {
3190         nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3191                            strerror(errno), errno);
3192 
3193         nxt_unit_mmap_buf_free(mmap_buf);
3194 
3195         return NULL;
3196     }
3197 
3198     if (res < (ssize_t) size) {
3199         nxt_unit_close(req->content_fd);
3200 
3201         req->content_fd = -1;
3202     }
3203 
3204     nxt_unit_req_debug(req, "preread: read %d", (int) res);
3205 
3206     mmap_buf->buf.end = mmap_buf->buf.free + res;
3207 
3208     return mmap_buf;
3209 }
3210 
3211 
3212 static ssize_t
nxt_unit_buf_read(nxt_unit_buf_t ** b,uint64_t * len,void * dst,size_t size)3213 nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
3214 {
3215     u_char          *p;
3216     size_t          rest, copy, read;
3217     nxt_unit_buf_t  *buf, *last_buf;
3218 
3219     p = dst;
3220     rest = size;
3221 
3222     buf = *b;
3223     last_buf = buf;
3224 
3225     while (buf != NULL) {
3226         last_buf = buf;
3227 
3228         copy = buf->end - buf->free;
3229         copy = nxt_min(rest, copy);
3230 
3231         p = nxt_cpymem(p, buf->free, copy);
3232 
3233         buf->free += copy;
3234         rest -= copy;
3235 
3236         if (rest == 0) {
3237             if (buf->end == buf->free) {
3238                 buf = nxt_unit_buf_next(buf);
3239             }
3240 
3241             break;
3242         }
3243 
3244         buf = nxt_unit_buf_next(buf);
3245     }
3246 
3247     *b = last_buf;
3248 
3249     read = size - rest;
3250 
3251     *len -= read;
3252 
3253     return read;
3254 }
3255 
3256 
3257 void
nxt_unit_request_done(nxt_unit_request_info_t * req,int rc)3258 nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
3259 {
3260     uint32_t                      size;
3261     nxt_port_msg_t                msg;
3262     nxt_unit_impl_t               *lib;
3263     nxt_unit_request_info_impl_t  *req_impl;
3264 
3265     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
3266 
3267     nxt_unit_req_debug(req, "done: %d", rc);
3268 
3269     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3270         goto skip_response_send;
3271     }
3272 
3273     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
3274 
3275         size = nxt_length("Content-Type") + nxt_length("text/plain");
3276 
3277         rc = nxt_unit_response_init(req, 200, 1, size);
3278         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3279             goto skip_response_send;
3280         }
3281 
3282         rc = nxt_unit_response_add_field(req, "Content-Type",
3283                                    nxt_length("Content-Type"),
3284                                    "text/plain", nxt_length("text/plain"));
3285         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3286             goto skip_response_send;
3287         }
3288     }
3289 
3290     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
3291 
3292         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
3293 
3294         nxt_unit_buf_send_done(req->response_buf);
3295 
3296         return;
3297     }
3298 
3299 skip_response_send:
3300 
3301     lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
3302 
3303     msg.stream = req_impl->stream;
3304     msg.pid = lib->pid;
3305     msg.reply_port = 0;
3306     msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
3307                                    : _NXT_PORT_MSG_RPC_ERROR;
3308     msg.last = 1;
3309     msg.mmap = 0;
3310     msg.nf = 0;
3311     msg.mf = 0;
3312 
3313     (void) nxt_unit_port_send(req->ctx, req->response_port,
3314                               &msg, sizeof(msg), NULL);
3315 
3316     nxt_unit_request_info_release(req);
3317 }
3318 
3319 
3320 int
nxt_unit_websocket_send(nxt_unit_request_info_t * req,uint8_t opcode,uint8_t last,const void * start,size_t size)3321 nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
3322     uint8_t last, const void *start, size_t size)
3323 {
3324     const struct iovec  iov = { (void *) start, size };
3325 
3326     return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
3327 }
3328 
3329 
3330 int
nxt_unit_websocket_sendv(nxt_unit_request_info_t * req,uint8_t opcode,uint8_t last,const struct iovec * iov,int iovcnt)3331 nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
3332     uint8_t last, const struct iovec *iov, int iovcnt)
3333 {
3334     int                     i, rc;
3335     size_t                  l, copy;
3336     uint32_t                payload_len, buf_size, alloc_size;
3337     const uint8_t           *b;
3338     nxt_unit_buf_t          *buf;
3339     nxt_unit_mmap_buf_t     mmap_buf;
3340     nxt_websocket_header_t  *wh;
3341     char                    local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
3342 
3343     payload_len = 0;
3344 
3345     for (i = 0; i < iovcnt; i++) {
3346         payload_len += iov[i].iov_len;
3347     }
3348 
3349     buf_size = 10 + payload_len;
3350     alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3351 
3352     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3353                                    alloc_size, alloc_size,
3354                                    &mmap_buf, local_buf);
3355     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3356         return rc;
3357     }
3358 
3359     buf = &mmap_buf.buf;
3360 
3361     buf->start[0] = 0;
3362     buf->start[1] = 0;
3363 
3364     buf_size -= buf->end - buf->start;
3365 
3366     wh = (void *) buf->free;
3367 
3368     buf->free = nxt_websocket_frame_init(wh, payload_len);
3369     wh->fin = last;
3370     wh->opcode = opcode;
3371 
3372     for (i = 0; i < iovcnt; i++) {
3373         b = iov[i].iov_base;
3374         l = iov[i].iov_len;
3375 
3376         while (l > 0) {
3377             copy = buf->end - buf->free;
3378             copy = nxt_min(l, copy);
3379 
3380             buf->free = nxt_cpymem(buf->free, b, copy);
3381             b += copy;
3382             l -= copy;
3383 
3384             if (l > 0) {
3385                 if (nxt_fast_path(buf->free > buf->start)) {
3386                     rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3387 
3388                     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3389                         return rc;
3390                     }
3391                 }
3392 
3393                 alloc_size = nxt_min(buf_size, PORT_MMAP_DATA_SIZE);
3394 
3395                 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3396                                                alloc_size, alloc_size,
3397                                                &mmap_buf, local_buf);
3398                 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3399                     return rc;
3400                 }
3401 
3402                 buf_size -= buf->end - buf->start;
3403             }
3404         }
3405     }
3406 
3407     if (buf->free > buf->start) {
3408         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3409     }
3410 
3411     return rc;
3412 }
3413 
3414 
3415 ssize_t
nxt_unit_websocket_read(nxt_unit_websocket_frame_t * ws,void * dst,size_t size)3416 nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
3417     size_t size)
3418 {
3419     ssize_t   res;
3420     uint8_t   *b;
3421     uint64_t  i, d;
3422 
3423     res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
3424                             dst, size);
3425 
3426     if (ws->mask == NULL) {
3427         return res;
3428     }
3429 
3430     b = dst;
3431     d = (ws->payload_len - ws->content_length - res) % 4;
3432 
3433     for (i = 0; i < (uint64_t) res; i++) {
3434         b[i] ^= ws->mask[ (i + d) % 4 ];
3435     }
3436 
3437     return res;
3438 }
3439 
3440 
3441 int
nxt_unit_websocket_retain(nxt_unit_websocket_frame_t * ws)3442 nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
3443 {
3444     char                             *b;
3445     size_t                           size, hsize;
3446     nxt_unit_websocket_frame_impl_t  *ws_impl;
3447 
3448     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
3449 
3450     if (ws_impl->buf->free_ptr != NULL || ws_impl->buf->hdr != NULL) {
3451         return NXT_UNIT_OK;
3452     }
3453 
3454     size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
3455 
3456     b = nxt_unit_malloc(ws->req->ctx, size);
3457     if (nxt_slow_path(b == NULL)) {
3458         return NXT_UNIT_ERROR;
3459     }
3460 
3461     memcpy(b, ws_impl->buf->buf.start, size);
3462 
3463     hsize = nxt_websocket_frame_header_size(b);
3464 
3465     ws_impl->buf->buf.start = b;
3466     ws_impl->buf->buf.free = b + hsize;
3467     ws_impl->buf->buf.end = b + size;
3468 
3469     ws_impl->buf->free_ptr = b;
3470 
3471     ws_impl->ws.header = (nxt_websocket_header_t *) b;
3472 
3473     if (ws_impl->ws.header->mask) {
3474         ws_impl->ws.mask = (uint8_t *) b + hsize - 4;
3475 
3476     } else {
3477         ws_impl->ws.mask = NULL;
3478     }
3479 
3480     return NXT_UNIT_OK;
3481 }
3482 
3483 
3484 void
nxt_unit_websocket_done(nxt_unit_websocket_frame_t * ws)3485 nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
3486 {
3487     nxt_unit_websocket_frame_release(ws);
3488 }
3489 
3490 
3491 static nxt_port_mmap_header_t *
nxt_unit_mmap_get(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_chunk_id_t * c,int * n,int min_n)3492 nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3493     nxt_chunk_id_t *c, int *n, int min_n)
3494 {
3495     int                     res, nchunks, i;
3496     uint32_t                outgoing_size;
3497     nxt_unit_mmap_t         *mm, *mm_end;
3498     nxt_unit_impl_t         *lib;
3499     nxt_port_mmap_header_t  *hdr;
3500 
3501     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3502 
3503     pthread_mutex_lock(&lib->outgoing.mutex);
3504 
3505 retry:
3506 
3507     outgoing_size = lib->outgoing.size;
3508 
3509     mm_end = lib->outgoing.elts + outgoing_size;
3510 
3511     for (mm = lib->outgoing.elts; mm < mm_end; mm++) {
3512         hdr = mm->hdr;
3513 
3514         if (hdr->sent_over != 0xFFFFu
3515             && (hdr->sent_over != port->id.id
3516                 || mm->src_thread != pthread_self()))
3517         {
3518             continue;
3519         }
3520 
3521         *c = 0;
3522 
3523         while (nxt_port_mmap_get_free_chunk(hdr->free_map, c)) {
3524             nchunks = 1;
3525 
3526             while (nchunks < *n) {
3527                 res = nxt_port_mmap_chk_set_chunk_busy(hdr->free_map,
3528                                                        *c + nchunks);
3529 
3530                 if (res == 0) {
3531                     if (nchunks >= min_n) {
3532                         *n = nchunks;
3533 
3534                         goto unlock;
3535                     }
3536 
3537                     for (i = 0; i < nchunks; i++) {
3538                         nxt_port_mmap_set_chunk_free(hdr->free_map, *c + i);
3539                     }
3540 
3541                     *c += nchunks + 1;
3542                     nchunks = 0;
3543                     break;
3544                 }
3545 
3546                 nchunks++;
3547             }
3548 
3549             if (nchunks >= min_n) {
3550                 *n = nchunks;
3551 
3552                 goto unlock;
3553             }
3554         }
3555 
3556         hdr->oosm = 1;
3557     }
3558 
3559     if (outgoing_size >= lib->shm_mmap_limit) {
3560         /* Cannot allocate more shared memory. */
3561         pthread_mutex_unlock(&lib->outgoing.mutex);
3562 
3563         if (min_n == 0) {
3564             *n = 0;
3565         }
3566 
3567         if (nxt_slow_path(lib->outgoing.allocated_chunks + min_n
3568                           >= lib->shm_mmap_limit * PORT_MMAP_CHUNK_COUNT))
3569         {
3570             /* Memory allocated by application, but not send to router. */
3571             return NULL;
3572         }
3573 
3574         /* Notify router about OOSM condition. */
3575 
3576         res = nxt_unit_send_oosm(ctx, port);
3577         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3578             return NULL;
3579         }
3580 
3581         /* Return if caller can handle OOSM condition. Non-blocking mode. */
3582 
3583         if (min_n == 0) {
3584             return NULL;
3585         }
3586 
3587         nxt_unit_debug(ctx, "oosm: waiting for ACK");
3588 
3589         res = nxt_unit_wait_shm_ack(ctx);
3590         if (nxt_slow_path(res != NXT_UNIT_OK)) {
3591             return NULL;
3592         }
3593 
3594         nxt_unit_debug(ctx, "oosm: retry");
3595 
3596         pthread_mutex_lock(&lib->outgoing.mutex);
3597 
3598         goto retry;
3599     }
3600 
3601     *c = 0;
3602     hdr = nxt_unit_new_mmap(ctx, port, *n);
3603 
3604 unlock:
3605 
3606     nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, *n);
3607 
3608     nxt_unit_debug(ctx, "allocated_chunks %d",
3609                    (int) lib->outgoing.allocated_chunks);
3610 
3611     pthread_mutex_unlock(&lib->outgoing.mutex);
3612 
3613     return hdr;
3614 }
3615 
3616 
3617 static int
nxt_unit_send_oosm(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)3618 nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
3619 {
3620     ssize_t          res;
3621     nxt_port_msg_t   msg;
3622     nxt_unit_impl_t  *lib;
3623 
3624     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3625 
3626     msg.stream = 0;
3627     msg.pid = lib->pid;
3628     msg.reply_port = 0;
3629     msg.type = _NXT_PORT_MSG_OOSM;
3630     msg.last = 0;
3631     msg.mmap = 0;
3632     msg.nf = 0;
3633     msg.mf = 0;
3634 
3635     res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL);
3636     if (nxt_slow_path(res != sizeof(msg))) {
3637         return NXT_UNIT_ERROR;
3638     }
3639 
3640     return NXT_UNIT_OK;
3641 }
3642 
3643 
3644 static int
nxt_unit_wait_shm_ack(nxt_unit_ctx_t * ctx)3645 nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx)
3646 {
3647     int                  res;
3648     nxt_unit_ctx_impl_t  *ctx_impl;
3649     nxt_unit_read_buf_t  *rbuf;
3650 
3651     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3652 
3653     while (1) {
3654         rbuf = nxt_unit_read_buf_get(ctx);
3655         if (nxt_slow_path(rbuf == NULL)) {
3656             return NXT_UNIT_ERROR;
3657         }
3658 
3659         do {
3660             res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
3661         } while (res == NXT_UNIT_AGAIN);
3662 
3663         if (res == NXT_UNIT_ERROR) {
3664             nxt_unit_read_buf_release(ctx, rbuf);
3665 
3666             return NXT_UNIT_ERROR;
3667         }
3668 
3669         if (nxt_unit_is_shm_ack(rbuf)) {
3670             nxt_unit_read_buf_release(ctx, rbuf);
3671             break;
3672         }
3673 
3674         pthread_mutex_lock(&ctx_impl->mutex);
3675 
3676         nxt_queue_insert_tail(&ctx_impl->pending_rbuf, &rbuf->link);
3677 
3678         pthread_mutex_unlock(&ctx_impl->mutex);
3679 
3680         if (nxt_unit_is_quit(rbuf)) {
3681             nxt_unit_debug(ctx, "oosm: quit received");
3682 
3683             return NXT_UNIT_ERROR;
3684         }
3685     }
3686 
3687     return NXT_UNIT_OK;
3688 }
3689 
3690 
3691 static nxt_unit_mmap_t *
nxt_unit_mmap_at(nxt_unit_mmaps_t * mmaps,uint32_t i)3692 nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i)
3693 {
3694     uint32_t         cap, n;
3695     nxt_unit_mmap_t  *e;
3696 
3697     if (nxt_fast_path(mmaps->size > i)) {
3698         return mmaps->elts + i;
3699     }
3700 
3701     cap = mmaps->cap;
3702 
3703     if (cap == 0) {
3704         cap = i + 1;
3705     }
3706 
3707     while (i + 1 > cap) {
3708 
3709         if (cap < 16) {
3710             cap = cap * 2;
3711 
3712         } else {
3713             cap = cap + cap / 2;
3714         }
3715     }
3716 
3717     if (cap != mmaps->cap) {
3718 
3719         e = realloc(mmaps->elts, cap * sizeof(nxt_unit_mmap_t));
3720         if (nxt_slow_path(e == NULL)) {
3721             return NULL;
3722         }
3723 
3724         mmaps->elts = e;
3725 
3726         for (n = mmaps->cap; n < cap; n++) {
3727             e = mmaps->elts + n;
3728 
3729             e->hdr = NULL;
3730             nxt_queue_init(&e->awaiting_rbuf);
3731         }
3732 
3733         mmaps->cap = cap;
3734     }
3735 
3736     if (i + 1 > mmaps->size) {
3737         mmaps->size = i + 1;
3738     }
3739 
3740     return mmaps->elts + i;
3741 }
3742 
3743 
3744 static nxt_port_mmap_header_t *
nxt_unit_new_mmap(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,int n)3745 nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int n)
3746 {
3747     int                     i, fd, rc;
3748     void                    *mem;
3749     nxt_unit_mmap_t         *mm;
3750     nxt_unit_impl_t         *lib;
3751     nxt_port_mmap_header_t  *hdr;
3752 
3753     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3754 
3755     mm = nxt_unit_mmap_at(&lib->outgoing, lib->outgoing.size);
3756     if (nxt_slow_path(mm == NULL)) {
3757         nxt_unit_alert(ctx, "failed to add mmap to outgoing array");
3758 
3759         return NULL;
3760     }
3761 
3762     fd = nxt_unit_shm_open(ctx, PORT_MMAP_SIZE);
3763     if (nxt_slow_path(fd == -1)) {
3764         goto remove_fail;
3765     }
3766 
3767     mem = mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
3768     if (nxt_slow_path(mem == MAP_FAILED)) {
3769         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", fd,
3770                        strerror(errno), errno);
3771 
3772         nxt_unit_close(fd);
3773 
3774         goto remove_fail;
3775     }
3776 
3777     mm->hdr = mem;
3778     hdr = mem;
3779 
3780     memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
3781     memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
3782 
3783     hdr->id = lib->outgoing.size - 1;
3784     hdr->src_pid = lib->pid;
3785     hdr->dst_pid = port->id.pid;
3786     hdr->sent_over = port->id.id;
3787     mm->src_thread = pthread_self();
3788 
3789     /* Mark first n chunk(s) as busy */
3790     for (i = 0; i < n; i++) {
3791         nxt_port_mmap_set_chunk_busy(hdr->free_map, i);
3792     }
3793 
3794     /* Mark as busy chunk followed the last available chunk. */
3795     nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
3796     nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
3797 
3798     pthread_mutex_unlock(&lib->outgoing.mutex);
3799 
3800     rc = nxt_unit_send_mmap(ctx, port, fd);
3801     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3802         munmap(mem, PORT_MMAP_SIZE);
3803         hdr = NULL;
3804 
3805     } else {
3806         nxt_unit_debug(ctx, "new mmap #%"PRIu32" created for %d -> %d",
3807                        hdr->id, (int) lib->pid, (int) port->id.pid);
3808     }
3809 
3810     nxt_unit_close(fd);
3811 
3812     pthread_mutex_lock(&lib->outgoing.mutex);
3813 
3814     if (nxt_fast_path(hdr != NULL)) {
3815         return hdr;
3816     }
3817 
3818 remove_fail:
3819 
3820     lib->outgoing.size--;
3821 
3822     return NULL;
3823 }
3824 
3825 
3826 static int
nxt_unit_shm_open(nxt_unit_ctx_t * ctx,size_t size)3827 nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size)
3828 {
3829     int              fd;
3830 
3831 #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
3832     char             name[64];
3833     nxt_unit_impl_t  *lib;
3834 
3835     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3836     snprintf(name, sizeof(name), NXT_SHM_PREFIX "unit.%d.%p",
3837              lib->pid, (void *) (uintptr_t) pthread_self());
3838 #endif
3839 
3840 #if (NXT_HAVE_MEMFD_CREATE)
3841 
3842     fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
3843     if (nxt_slow_path(fd == -1)) {
3844         nxt_unit_alert(ctx, "memfd_create(%s) failed: %s (%d)", name,
3845                        strerror(errno), errno);
3846 
3847         return -1;
3848     }
3849 
3850     nxt_unit_debug(ctx, "memfd_create(%s): %d", name, fd);
3851 
3852 #elif (NXT_HAVE_SHM_OPEN_ANON)
3853 
3854     fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
3855     if (nxt_slow_path(fd == -1)) {
3856         nxt_unit_alert(ctx, "shm_open(SHM_ANON) failed: %s (%d)",
3857                        strerror(errno), errno);
3858 
3859         return -1;
3860     }
3861 
3862 #elif (NXT_HAVE_SHM_OPEN)
3863 
3864     /* Just in case. */
3865     shm_unlink(name);
3866 
3867     fd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
3868     if (nxt_slow_path(fd == -1)) {
3869         nxt_unit_alert(ctx, "shm_open(%s) failed: %s (%d)", name,
3870                        strerror(errno), errno);
3871 
3872         return -1;
3873     }
3874 
3875     if (nxt_slow_path(shm_unlink(name) == -1)) {
3876         nxt_unit_alert(ctx, "shm_unlink(%s) failed: %s (%d)", name,
3877                        strerror(errno), errno);
3878     }
3879 
3880 #else
3881 
3882 #error No working shared memory implementation.
3883 
3884 #endif
3885 
3886     if (nxt_slow_path(ftruncate(fd, size) == -1)) {
3887         nxt_unit_alert(ctx, "ftruncate(%d) failed: %s (%d)", fd,
3888                        strerror(errno), errno);
3889 
3890         nxt_unit_close(fd);
3891 
3892         return -1;
3893     }
3894 
3895     return fd;
3896 }
3897 
3898 
3899 static int
nxt_unit_send_mmap(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,int fd)3900 nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
3901 {
3902     ssize_t          res;
3903     nxt_send_oob_t   oob;
3904     nxt_port_msg_t   msg;
3905     nxt_unit_impl_t  *lib;
3906     int              fds[2] = {fd, -1};
3907 
3908     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3909 
3910     msg.stream = 0;
3911     msg.pid = lib->pid;
3912     msg.reply_port = 0;
3913     msg.type = _NXT_PORT_MSG_MMAP;
3914     msg.last = 0;
3915     msg.mmap = 0;
3916     msg.nf = 0;
3917     msg.mf = 0;
3918 
3919     nxt_socket_msg_oob_init(&oob, fds);
3920 
3921     res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), &oob);
3922     if (nxt_slow_path(res != sizeof(msg))) {
3923         return NXT_UNIT_ERROR;
3924     }
3925 
3926     return NXT_UNIT_OK;
3927 }
3928 
3929 
3930 static int
nxt_unit_get_outgoing_buf(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,uint32_t size,uint32_t min_size,nxt_unit_mmap_buf_t * mmap_buf,char * local_buf)3931 nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
3932     uint32_t size, uint32_t min_size,
3933     nxt_unit_mmap_buf_t *mmap_buf, char *local_buf)
3934 {
3935     int                     nchunks, min_nchunks;
3936     nxt_chunk_id_t          c;
3937     nxt_port_mmap_header_t  *hdr;
3938 
3939     if (size <= NXT_UNIT_MAX_PLAIN_SIZE) {
3940         if (local_buf != NULL) {
3941             mmap_buf->free_ptr = NULL;
3942             mmap_buf->plain_ptr = local_buf;
3943 
3944         } else {
3945             mmap_buf->free_ptr = nxt_unit_malloc(ctx,
3946                                                  size + sizeof(nxt_port_msg_t));
3947             if (nxt_slow_path(mmap_buf->free_ptr == NULL)) {
3948                 return NXT_UNIT_ERROR;
3949             }
3950 
3951             mmap_buf->plain_ptr = mmap_buf->free_ptr;
3952         }
3953 
3954         mmap_buf->hdr = NULL;
3955         mmap_buf->buf.start = mmap_buf->plain_ptr + sizeof(nxt_port_msg_t);
3956         mmap_buf->buf.free = mmap_buf->buf.start;
3957         mmap_buf->buf.end = mmap_buf->buf.start + size;
3958 
3959         nxt_unit_debug(ctx, "outgoing plain buffer allocation: (%p, %d)",
3960                        mmap_buf->buf.start, (int) size);
3961 
3962         return NXT_UNIT_OK;
3963     }
3964 
3965     nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3966     min_nchunks = (min_size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
3967 
3968     hdr = nxt_unit_mmap_get(ctx, port, &c, &nchunks, min_nchunks);
3969     if (nxt_slow_path(hdr == NULL)) {
3970         if (nxt_fast_path(min_nchunks == 0 && nchunks == 0)) {
3971             mmap_buf->hdr = NULL;
3972             mmap_buf->buf.start = NULL;
3973             mmap_buf->buf.free = NULL;
3974             mmap_buf->buf.end = NULL;
3975             mmap_buf->free_ptr = NULL;
3976 
3977             return NXT_UNIT_OK;
3978         }
3979 
3980         return NXT_UNIT_ERROR;
3981     }
3982 
3983     mmap_buf->hdr = hdr;
3984     mmap_buf->buf.start = (char *) nxt_port_mmap_chunk_start(hdr, c);
3985     mmap_buf->buf.free = mmap_buf->buf.start;
3986     mmap_buf->buf.end = mmap_buf->buf.start + nchunks * PORT_MMAP_CHUNK_SIZE;
3987     mmap_buf->free_ptr = NULL;
3988     mmap_buf->ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3989 
3990     nxt_unit_debug(ctx, "outgoing mmap allocation: (%d,%d,%d)",
3991                   (int) hdr->id, (int) c,
3992                   (int) (nchunks * PORT_MMAP_CHUNK_SIZE));
3993 
3994     return NXT_UNIT_OK;
3995 }
3996 
3997 
3998 static int
nxt_unit_incoming_mmap(nxt_unit_ctx_t * ctx,pid_t pid,int fd)3999 nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
4000 {
4001     int                     rc;
4002     void                    *mem;
4003     nxt_queue_t             awaiting_rbuf;
4004     struct stat             mmap_stat;
4005     nxt_unit_mmap_t         *mm;
4006     nxt_unit_impl_t         *lib;
4007     nxt_unit_ctx_impl_t     *ctx_impl;
4008     nxt_unit_read_buf_t     *rbuf;
4009     nxt_port_mmap_header_t  *hdr;
4010 
4011     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4012 
4013     nxt_unit_debug(ctx, "incoming_mmap: fd %d from process %d", fd, (int) pid);
4014 
4015     if (fstat(fd, &mmap_stat) == -1) {
4016         nxt_unit_alert(ctx, "incoming_mmap: fstat(%d) failed: %s (%d)", fd,
4017                        strerror(errno), errno);
4018 
4019         return NXT_UNIT_ERROR;
4020     }
4021 
4022     mem = mmap(NULL, mmap_stat.st_size, PROT_READ | PROT_WRITE,
4023                MAP_SHARED, fd, 0);
4024     if (nxt_slow_path(mem == MAP_FAILED)) {
4025         nxt_unit_alert(ctx, "incoming_mmap: mmap() failed: %s (%d)",
4026                        strerror(errno), errno);
4027 
4028         return NXT_UNIT_ERROR;
4029     }
4030 
4031     hdr = mem;
4032 
4033     if (nxt_slow_path(hdr->src_pid != pid)) {
4034 
4035         nxt_unit_alert(ctx, "incoming_mmap: unexpected pid in mmap header "
4036                        "detected: %d != %d or %d != %d", (int) hdr->src_pid,
4037                        (int) pid, (int) hdr->dst_pid, (int) lib->pid);
4038 
4039         munmap(mem, PORT_MMAP_SIZE);
4040 
4041         return NXT_UNIT_ERROR;
4042     }
4043 
4044     nxt_queue_init(&awaiting_rbuf);
4045 
4046     pthread_mutex_lock(&lib->incoming.mutex);
4047 
4048     mm = nxt_unit_mmap_at(&lib->incoming, hdr->id);
4049     if (nxt_slow_path(mm == NULL)) {
4050         nxt_unit_alert(ctx, "incoming_mmap: failed to add to incoming array");
4051 
4052         munmap(mem, PORT_MMAP_SIZE);
4053 
4054         rc = NXT_UNIT_ERROR;
4055 
4056     } else {
4057         mm->hdr = hdr;
4058 
4059         hdr->sent_over = 0xFFFFu;
4060 
4061         nxt_queue_add(&awaiting_rbuf, &mm->awaiting_rbuf);
4062         nxt_queue_init(&mm->awaiting_rbuf);
4063 
4064         rc = NXT_UNIT_OK;
4065     }
4066 
4067     pthread_mutex_unlock(&lib->incoming.mutex);
4068 
4069     nxt_queue_each(rbuf, &awaiting_rbuf, nxt_unit_read_buf_t, link) {
4070 
4071         ctx_impl = rbuf->ctx_impl;
4072 
4073         pthread_mutex_lock(&ctx_impl->mutex);
4074 
4075         nxt_queue_insert_head(&ctx_impl->pending_rbuf, &rbuf->link);
4076 
4077         pthread_mutex_unlock(&ctx_impl->mutex);
4078 
4079         nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
4080 
4081         nxt_unit_awake_ctx(ctx, ctx_impl);
4082 
4083     } nxt_queue_loop;
4084 
4085     return rc;
4086 }
4087 
4088 
4089 static void
nxt_unit_awake_ctx(nxt_unit_ctx_t * ctx,nxt_unit_ctx_impl_t * ctx_impl)4090 nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl)
4091 {
4092     nxt_port_msg_t  msg;
4093 
4094     if (nxt_fast_path(ctx == &ctx_impl->ctx)) {
4095         return;
4096     }
4097 
4098     if (nxt_slow_path(ctx_impl->read_port == NULL
4099                       || ctx_impl->read_port->out_fd == -1))
4100     {
4101         nxt_unit_alert(ctx, "target context read_port is NULL or not writable");
4102 
4103         return;
4104     }
4105 
4106     memset(&msg, 0, sizeof(nxt_port_msg_t));
4107 
4108     msg.type = _NXT_PORT_MSG_RPC_READY;
4109 
4110     (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
4111                               &msg, sizeof(msg), NULL);
4112 }
4113 
4114 
4115 static int
nxt_unit_mmaps_init(nxt_unit_mmaps_t * mmaps)4116 nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
4117 {
4118     mmaps->size = 0;
4119     mmaps->cap = 0;
4120     mmaps->elts = NULL;
4121     mmaps->allocated_chunks = 0;
4122 
4123     return pthread_mutex_init(&mmaps->mutex, NULL);
4124 }
4125 
4126 
4127 nxt_inline void
nxt_unit_process_use(nxt_unit_process_t * process)4128 nxt_unit_process_use(nxt_unit_process_t *process)
4129 {
4130     nxt_atomic_fetch_add(&process->use_count, 1);
4131 }
4132 
4133 
4134 nxt_inline void
nxt_unit_process_release(nxt_unit_process_t * process)4135 nxt_unit_process_release(nxt_unit_process_t *process)
4136 {
4137     long c;
4138 
4139     c = nxt_atomic_fetch_add(&process->use_count, -1);
4140 
4141     if (c == 1) {
4142         nxt_unit_debug(NULL, "destroy process #%d", (int) process->pid);
4143 
4144         nxt_unit_free(NULL, process);
4145     }
4146 }
4147 
4148 
4149 static void
nxt_unit_mmaps_destroy(nxt_unit_mmaps_t * mmaps)4150 nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps)
4151 {
4152     nxt_unit_mmap_t  *mm, *end;
4153 
4154     if (mmaps->elts != NULL) {
4155         end = mmaps->elts + mmaps->size;
4156 
4157         for (mm = mmaps->elts; mm < end; mm++) {
4158             munmap(mm->hdr, PORT_MMAP_SIZE);
4159         }
4160 
4161         nxt_unit_free(NULL, mmaps->elts);
4162     }
4163 
4164     pthread_mutex_destroy(&mmaps->mutex);
4165 }
4166 
4167 
4168 static int
nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t * ctx,nxt_unit_mmaps_t * mmaps,pid_t pid,uint32_t id,nxt_port_mmap_header_t ** hdr,nxt_unit_read_buf_t * rbuf)4169 nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, nxt_unit_mmaps_t *mmaps,
4170     pid_t pid, uint32_t id, nxt_port_mmap_header_t **hdr,
4171     nxt_unit_read_buf_t *rbuf)
4172 {
4173     int                  res, need_rbuf;
4174     nxt_unit_mmap_t      *mm;
4175     nxt_unit_ctx_impl_t  *ctx_impl;
4176 
4177     mm = nxt_unit_mmap_at(mmaps, id);
4178     if (nxt_slow_path(mm == NULL)) {
4179         nxt_unit_alert(ctx, "failed to allocate mmap");
4180 
4181         pthread_mutex_unlock(&mmaps->mutex);
4182 
4183         *hdr = NULL;
4184 
4185         return NXT_UNIT_ERROR;
4186     }
4187 
4188     *hdr = mm->hdr;
4189 
4190     if (nxt_fast_path(*hdr != NULL)) {
4191         return NXT_UNIT_OK;
4192     }
4193 
4194     need_rbuf = nxt_queue_is_empty(&mm->awaiting_rbuf);
4195 
4196     nxt_queue_insert_tail(&mm->awaiting_rbuf, &rbuf->link);
4197 
4198     pthread_mutex_unlock(&mmaps->mutex);
4199 
4200     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4201 
4202     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
4203 
4204     if (need_rbuf) {
4205         res = nxt_unit_get_mmap(ctx, pid, id);
4206         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
4207             return NXT_UNIT_ERROR;
4208         }
4209     }
4210 
4211     return NXT_UNIT_AGAIN;
4212 }
4213 
4214 
4215 static int
nxt_unit_mmap_read(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg,nxt_unit_read_buf_t * rbuf)4216 nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
4217     nxt_unit_read_buf_t *rbuf)
4218 {
4219     int                     res;
4220     void                    *start;
4221     uint32_t                size;
4222     nxt_unit_impl_t         *lib;
4223     nxt_unit_mmaps_t        *mmaps;
4224     nxt_unit_mmap_buf_t     *b, **incoming_tail;
4225     nxt_port_mmap_msg_t     *mmap_msg, *end;
4226     nxt_port_mmap_header_t  *hdr;
4227 
4228     if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
4229         nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
4230                       recv_msg->stream, (int) recv_msg->size);
4231 
4232         return NXT_UNIT_ERROR;
4233     }
4234 
4235     mmap_msg = recv_msg->start;
4236     end = nxt_pointer_to(recv_msg->start, recv_msg->size);
4237 
4238     incoming_tail = &recv_msg->incoming_buf;
4239 
4240     /* Allocating buffer structures. */
4241     for (; mmap_msg < end; mmap_msg++) {
4242         b = nxt_unit_mmap_buf_get(ctx);
4243         if (nxt_slow_path(b == NULL)) {
4244             nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
4245                           recv_msg->stream);
4246 
4247             while (recv_msg->incoming_buf != NULL) {
4248                 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4249             }
4250 
4251             return NXT_UNIT_ERROR;
4252         }
4253 
4254         nxt_unit_mmap_buf_insert(incoming_tail, b);
4255         incoming_tail = &b->next;
4256     }
4257 
4258     b = recv_msg->incoming_buf;
4259     mmap_msg = recv_msg->start;
4260 
4261     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4262 
4263     mmaps = &lib->incoming;
4264 
4265     pthread_mutex_lock(&mmaps->mutex);
4266 
4267     for (; mmap_msg < end; mmap_msg++) {
4268         res = nxt_unit_check_rbuf_mmap(ctx, mmaps,
4269                                        recv_msg->pid, mmap_msg->mmap_id,
4270                                        &hdr, rbuf);
4271 
4272         if (nxt_slow_path(res != NXT_UNIT_OK)) {
4273             while (recv_msg->incoming_buf != NULL) {
4274                 nxt_unit_mmap_buf_release(recv_msg->incoming_buf);
4275             }
4276 
4277             return res;
4278         }
4279 
4280         start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
4281         size = mmap_msg->size;
4282 
4283         if (recv_msg->start == mmap_msg) {
4284             recv_msg->start = start;
4285             recv_msg->size = size;
4286         }
4287 
4288         b->buf.start = start;
4289         b->buf.free = start;
4290         b->buf.end = b->buf.start + size;
4291         b->hdr = hdr;
4292 
4293         b = b->next;
4294 
4295         nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
4296                        recv_msg->stream,
4297                        start, (int) size,
4298                        (int) hdr->src_pid, (int) hdr->dst_pid,
4299                        (int) hdr->id, (int) mmap_msg->chunk_id,
4300                        (int) mmap_msg->size);
4301     }
4302 
4303     pthread_mutex_unlock(&mmaps->mutex);
4304 
4305     return NXT_UNIT_OK;
4306 }
4307 
4308 
4309 static int
nxt_unit_get_mmap(nxt_unit_ctx_t * ctx,pid_t pid,uint32_t id)4310 nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
4311 {
4312     ssize_t              res;
4313     nxt_unit_impl_t      *lib;
4314     nxt_unit_ctx_impl_t  *ctx_impl;
4315 
4316     struct {
4317         nxt_port_msg_t           msg;
4318         nxt_port_msg_get_mmap_t  get_mmap;
4319     } m;
4320 
4321     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4322     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4323 
4324     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
4325 
4326     m.msg.pid = lib->pid;
4327     m.msg.reply_port = ctx_impl->read_port->id.id;
4328     m.msg.type = _NXT_PORT_MSG_GET_MMAP;
4329 
4330     m.get_mmap.id = id;
4331 
4332     nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
4333 
4334     res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL);
4335     if (nxt_slow_path(res != sizeof(m))) {
4336         return NXT_UNIT_ERROR;
4337     }
4338 
4339     return NXT_UNIT_OK;
4340 }
4341 
4342 
4343 static void
nxt_unit_mmap_release(nxt_unit_ctx_t * ctx,nxt_port_mmap_header_t * hdr,void * start,uint32_t size)4344 nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, nxt_port_mmap_header_t *hdr,
4345     void *start, uint32_t size)
4346 {
4347     int              freed_chunks;
4348     u_char           *p, *end;
4349     nxt_chunk_id_t   c;
4350     nxt_unit_impl_t  *lib;
4351 
4352     memset(start, 0xA5, size);
4353 
4354     p = start;
4355     end = p + size;
4356     c = nxt_port_mmap_chunk_id(hdr, p);
4357     freed_chunks = 0;
4358 
4359     while (p < end) {
4360         nxt_port_mmap_set_chunk_free(hdr->free_map, c);
4361 
4362         p += PORT_MMAP_CHUNK_SIZE;
4363         c++;
4364         freed_chunks++;
4365     }
4366 
4367     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4368 
4369     if (hdr->src_pid == lib->pid && freed_chunks != 0) {
4370         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, -freed_chunks);
4371 
4372         nxt_unit_debug(ctx, "allocated_chunks %d",
4373                        (int) lib->outgoing.allocated_chunks);
4374     }
4375 
4376     if (hdr->dst_pid == lib->pid
4377         && freed_chunks != 0
4378         && nxt_atomic_cmp_set(&hdr->oosm, 1, 0))
4379     {
4380         nxt_unit_send_shm_ack(ctx, hdr->src_pid);
4381     }
4382 }
4383 
4384 
4385 static int
nxt_unit_send_shm_ack(nxt_unit_ctx_t * ctx,pid_t pid)4386 nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
4387 {
4388     ssize_t          res;
4389     nxt_port_msg_t   msg;
4390     nxt_unit_impl_t  *lib;
4391 
4392     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4393 
4394     msg.stream = 0;
4395     msg.pid = lib->pid;
4396     msg.reply_port = 0;
4397     msg.type = _NXT_PORT_MSG_SHM_ACK;
4398     msg.last = 0;
4399     msg.mmap = 0;
4400     msg.nf = 0;
4401     msg.mf = 0;
4402 
4403     res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL);
4404     if (nxt_slow_path(res != sizeof(msg))) {
4405         return NXT_UNIT_ERROR;
4406     }
4407 
4408     return NXT_UNIT_OK;
4409 }
4410 
4411 
4412 static nxt_int_t
nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t * lhq,void * data)4413 nxt_unit_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
4414 {
4415     nxt_process_t  *process;
4416 
4417     process = data;
4418 
4419     if (lhq->key.length == sizeof(pid_t)
4420         && *(pid_t *) lhq->key.start == process->pid)
4421     {
4422         return NXT_OK;
4423     }
4424 
4425     return NXT_DECLINED;
4426 }
4427 
4428 
4429 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
4430     NXT_LVLHSH_DEFAULT,
4431     nxt_unit_lvlhsh_pid_test,
4432     nxt_unit_lvlhsh_alloc,
4433     nxt_unit_lvlhsh_free,
4434 };
4435 
4436 
4437 static inline void
nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t * lhq,pid_t * pid)4438 nxt_unit_process_lhq_pid(nxt_lvlhsh_query_t *lhq, pid_t *pid)
4439 {
4440     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
4441     lhq->key.length = sizeof(*pid);
4442     lhq->key.start = (u_char *) pid;
4443     lhq->proto = &lvlhsh_processes_proto;
4444 }
4445 
4446 
4447 static nxt_unit_process_t *
nxt_unit_process_get(nxt_unit_ctx_t * ctx,pid_t pid)4448 nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid)
4449 {
4450     nxt_unit_impl_t     *lib;
4451     nxt_unit_process_t  *process;
4452     nxt_lvlhsh_query_t  lhq;
4453 
4454     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4455 
4456     nxt_unit_process_lhq_pid(&lhq, &pid);
4457 
4458     if (nxt_lvlhsh_find(&lib->processes, &lhq) == NXT_OK) {
4459         process = lhq.value;
4460         nxt_unit_process_use(process);
4461 
4462         return process;
4463     }
4464 
4465     process = nxt_unit_malloc(ctx, sizeof(nxt_unit_process_t));
4466     if (nxt_slow_path(process == NULL)) {
4467         nxt_unit_alert(ctx, "failed to allocate process for #%d", (int) pid);
4468 
4469         return NULL;
4470     }
4471 
4472     process->pid = pid;
4473     process->use_count = 2;
4474     process->next_port_id = 0;
4475     process->lib = lib;
4476 
4477     nxt_queue_init(&process->ports);
4478 
4479     lhq.replace = 0;
4480     lhq.value = process;
4481 
4482     switch (nxt_lvlhsh_insert(&lib->processes, &lhq)) {
4483 
4484     case NXT_OK:
4485         break;
4486 
4487     default:
4488         nxt_unit_alert(ctx, "process %d insert failed", (int) pid);
4489 
4490         nxt_unit_free(ctx, process);
4491         process = NULL;
4492         break;
4493     }
4494 
4495     return process;
4496 }
4497 
4498 
4499 static nxt_unit_process_t *
nxt_unit_process_find(nxt_unit_impl_t * lib,pid_t pid,int remove)4500 nxt_unit_process_find(nxt_unit_impl_t *lib, pid_t pid, int remove)
4501 {
4502     int                 rc;
4503     nxt_lvlhsh_query_t  lhq;
4504 
4505     nxt_unit_process_lhq_pid(&lhq, &pid);
4506 
4507     if (remove) {
4508         rc = nxt_lvlhsh_delete(&lib->processes, &lhq);
4509 
4510     } else {
4511         rc = nxt_lvlhsh_find(&lib->processes, &lhq);
4512     }
4513 
4514     if (rc == NXT_OK) {
4515         if (!remove) {
4516             nxt_unit_process_use(lhq.value);
4517         }
4518 
4519         return lhq.value;
4520     }
4521 
4522     return NULL;
4523 }
4524 
4525 
4526 static nxt_unit_process_t *
nxt_unit_process_pop_first(nxt_unit_impl_t * lib)4527 nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
4528 {
4529     return nxt_lvlhsh_retrieve(&lib->processes, &lvlhsh_processes_proto, NULL);
4530 }
4531 
4532 
4533 int
nxt_unit_run(nxt_unit_ctx_t * ctx)4534 nxt_unit_run(nxt_unit_ctx_t *ctx)
4535 {
4536     int                  rc;
4537     nxt_unit_ctx_impl_t  *ctx_impl;
4538 
4539     nxt_unit_ctx_use(ctx);
4540 
4541     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4542 
4543     rc = NXT_UNIT_OK;
4544 
4545     while (nxt_fast_path(ctx_impl->online)) {
4546         rc = nxt_unit_run_once_impl(ctx);
4547 
4548         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4549             nxt_unit_quit(ctx, NXT_QUIT_NORMAL);
4550             break;
4551         }
4552     }
4553 
4554     nxt_unit_ctx_release(ctx);
4555 
4556     return rc;
4557 }
4558 
4559 
4560 int
nxt_unit_run_once(nxt_unit_ctx_t * ctx)4561 nxt_unit_run_once(nxt_unit_ctx_t *ctx)
4562 {
4563     int  rc;
4564 
4565     nxt_unit_ctx_use(ctx);
4566 
4567     rc = nxt_unit_run_once_impl(ctx);
4568 
4569     nxt_unit_ctx_release(ctx);
4570 
4571     return rc;
4572 }
4573 
4574 
4575 static int
nxt_unit_run_once_impl(nxt_unit_ctx_t * ctx)4576 nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx)
4577 {
4578     int                  rc;
4579     nxt_unit_read_buf_t  *rbuf;
4580 
4581     rbuf = nxt_unit_read_buf_get(ctx);
4582     if (nxt_slow_path(rbuf == NULL)) {
4583         return NXT_UNIT_ERROR;
4584     }
4585 
4586     rc = nxt_unit_read_buf(ctx, rbuf);
4587     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
4588         nxt_unit_read_buf_release(ctx, rbuf);
4589 
4590         return rc;
4591     }
4592 
4593     rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4594     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4595         return NXT_UNIT_ERROR;
4596     }
4597 
4598     rc = nxt_unit_process_pending_rbuf(ctx);
4599     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4600         return NXT_UNIT_ERROR;
4601     }
4602 
4603     nxt_unit_process_ready_req(ctx);
4604 
4605     return rc;
4606 }
4607 
4608 
4609 static int
nxt_unit_read_buf(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf)4610 nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
4611 {
4612     int                   nevents, res, err;
4613     nxt_uint_t            nfds;
4614     nxt_unit_impl_t       *lib;
4615     nxt_unit_ctx_impl_t   *ctx_impl;
4616     nxt_unit_port_impl_t  *port_impl;
4617     struct pollfd         fds[2];
4618 
4619     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4620 
4621     if (ctx_impl->wait_items > 0 || !nxt_unit_chk_ready(ctx)) {
4622         return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4623     }
4624 
4625     port_impl = nxt_container_of(ctx_impl->read_port, nxt_unit_port_impl_t,
4626                                  port);
4627 
4628     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4629 
4630 retry:
4631 
4632     if (port_impl->from_socket == 0) {
4633         res = nxt_unit_port_queue_recv(ctx_impl->read_port, rbuf);
4634         if (res == NXT_UNIT_OK) {
4635             if (nxt_unit_is_read_socket(rbuf)) {
4636                 port_impl->from_socket++;
4637 
4638                 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
4639                                (int) ctx_impl->read_port->id.pid,
4640                                (int) ctx_impl->read_port->id.id,
4641                                port_impl->from_socket);
4642 
4643             } else {
4644                 nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
4645                                (int) ctx_impl->read_port->id.pid,
4646                                (int) ctx_impl->read_port->id.id,
4647                                (int) rbuf->size);
4648 
4649                 return NXT_UNIT_OK;
4650             }
4651         }
4652     }
4653 
4654     if (nxt_fast_path(nxt_unit_chk_ready(ctx))) {
4655         res = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf);
4656         if (res == NXT_UNIT_OK) {
4657             return NXT_UNIT_OK;
4658         }
4659 
4660         fds[1].fd = lib->shared_port->in_fd;
4661         fds[1].events = POLLIN;
4662 
4663         nfds = 2;
4664 
4665     } else {
4666         nfds = 1;
4667     }
4668 
4669     fds[0].fd = ctx_impl->read_port->in_fd;
4670     fds[0].events = POLLIN;
4671     fds[0].revents = 0;
4672 
4673     fds[1].revents = 0;
4674 
4675     nevents = poll(fds, nfds, -1);
4676     if (nxt_slow_path(nevents == -1)) {
4677         err = errno;
4678 
4679         if (err == EINTR) {
4680             goto retry;
4681         }
4682 
4683         nxt_unit_alert(ctx, "poll(%d,%d) failed: %s (%d)",
4684                        fds[0].fd, fds[1].fd, strerror(err), err);
4685 
4686         rbuf->size = -1;
4687 
4688         return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
4689     }
4690 
4691     nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04X, %04X]",
4692                    fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4693                    fds[1].revents);
4694 
4695     if ((fds[0].revents & POLLIN) != 0) {
4696         res = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4697         if (res == NXT_UNIT_AGAIN) {
4698             goto retry;
4699         }
4700 
4701         return res;
4702     }
4703 
4704     if ((fds[1].revents & POLLIN) != 0) {
4705         res = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4706         if (res == NXT_UNIT_AGAIN) {
4707             goto retry;
4708         }
4709 
4710         return res;
4711     }
4712 
4713     nxt_unit_alert(ctx, "poll(%d,%d): %d unexpected revents [%04uXi, %04uXi]",
4714                    fds[0].fd, fds[1].fd, nevents, fds[0].revents,
4715                    fds[1].revents);
4716 
4717     return NXT_UNIT_ERROR;
4718 }
4719 
4720 
4721 static int
nxt_unit_chk_ready(nxt_unit_ctx_t * ctx)4722 nxt_unit_chk_ready(nxt_unit_ctx_t *ctx)
4723 {
4724     nxt_unit_impl_t      *lib;
4725     nxt_unit_ctx_impl_t  *ctx_impl;
4726 
4727     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4728     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4729 
4730     return (ctx_impl->ready
4731             && (lib->request_limit == 0
4732                 || lib->request_count < lib->request_limit));
4733 }
4734 
4735 
4736 static int
nxt_unit_process_pending_rbuf(nxt_unit_ctx_t * ctx)4737 nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
4738 {
4739     int                  rc;
4740     nxt_queue_t          pending_rbuf;
4741     nxt_unit_ctx_impl_t  *ctx_impl;
4742     nxt_unit_read_buf_t  *rbuf;
4743 
4744     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4745 
4746     pthread_mutex_lock(&ctx_impl->mutex);
4747 
4748     if (nxt_queue_is_empty(&ctx_impl->pending_rbuf)) {
4749         pthread_mutex_unlock(&ctx_impl->mutex);
4750 
4751         return NXT_UNIT_OK;
4752     }
4753 
4754     nxt_queue_init(&pending_rbuf);
4755 
4756     nxt_queue_add(&pending_rbuf, &ctx_impl->pending_rbuf);
4757     nxt_queue_init(&ctx_impl->pending_rbuf);
4758 
4759     pthread_mutex_unlock(&ctx_impl->mutex);
4760 
4761     rc = NXT_UNIT_OK;
4762 
4763     nxt_queue_each(rbuf, &pending_rbuf, nxt_unit_read_buf_t, link) {
4764 
4765         if (nxt_fast_path(rc != NXT_UNIT_ERROR)) {
4766             rc = nxt_unit_process_msg(&ctx_impl->ctx, rbuf, NULL);
4767 
4768         } else {
4769             nxt_unit_read_buf_release(ctx, rbuf);
4770         }
4771 
4772     } nxt_queue_loop;
4773 
4774     if (!ctx_impl->ready) {
4775         nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
4776     }
4777 
4778     return rc;
4779 }
4780 
4781 
4782 static void
nxt_unit_process_ready_req(nxt_unit_ctx_t * ctx)4783 nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx)
4784 {
4785     int                           res;
4786     nxt_queue_t                   ready_req;
4787     nxt_unit_impl_t               *lib;
4788     nxt_unit_ctx_impl_t           *ctx_impl;
4789     nxt_unit_request_info_t       *req;
4790     nxt_unit_request_info_impl_t  *req_impl;
4791 
4792     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4793 
4794     pthread_mutex_lock(&ctx_impl->mutex);
4795 
4796     if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
4797         pthread_mutex_unlock(&ctx_impl->mutex);
4798 
4799         return;
4800     }
4801 
4802     nxt_queue_init(&ready_req);
4803 
4804     nxt_queue_add(&ready_req, &ctx_impl->ready_req);
4805     nxt_queue_init(&ctx_impl->ready_req);
4806 
4807     pthread_mutex_unlock(&ctx_impl->mutex);
4808 
4809     nxt_queue_each(req_impl, &ready_req,
4810                    nxt_unit_request_info_impl_t, port_wait_link)
4811     {
4812         lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
4813 
4814         req = &req_impl->req;
4815 
4816         res = nxt_unit_send_req_headers_ack(req);
4817         if (nxt_slow_path(res != NXT_UNIT_OK)) {
4818             nxt_unit_request_done(req, NXT_UNIT_ERROR);
4819 
4820             continue;
4821         }
4822 
4823         if (req->content_length
4824             > (uint64_t) (req->content_buf->end - req->content_buf->free))
4825         {
4826             res = nxt_unit_request_hash_add(ctx, req);
4827             if (nxt_slow_path(res != NXT_UNIT_OK)) {
4828                 nxt_unit_req_warn(req, "failed to add request to hash");
4829 
4830                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
4831 
4832                 continue;
4833             }
4834 
4835             /*
4836              * If application have separate data handler, we may start
4837              * request processing and process data when it is arrived.
4838              */
4839             if (lib->callbacks.data_handler == NULL) {
4840                 continue;
4841             }
4842         }
4843 
4844         lib->callbacks.request_handler(&req_impl->req);
4845 
4846     } nxt_queue_loop;
4847 }
4848 
4849 
4850 int
nxt_unit_run_ctx(nxt_unit_ctx_t * ctx)4851 nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
4852 {
4853     int                  rc;
4854     nxt_unit_read_buf_t  *rbuf;
4855     nxt_unit_ctx_impl_t  *ctx_impl;
4856 
4857     nxt_unit_ctx_use(ctx);
4858 
4859     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
4860 
4861     rc = NXT_UNIT_OK;
4862 
4863     while (nxt_fast_path(ctx_impl->online)) {
4864         rbuf = nxt_unit_read_buf_get(ctx);
4865         if (nxt_slow_path(rbuf == NULL)) {
4866             rc = NXT_UNIT_ERROR;
4867             break;
4868         }
4869 
4870     retry:
4871 
4872         rc = nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
4873         if (rc == NXT_UNIT_AGAIN) {
4874             goto retry;
4875         }
4876 
4877         rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4878         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4879             break;
4880         }
4881 
4882         rc = nxt_unit_process_pending_rbuf(ctx);
4883         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4884             break;
4885         }
4886 
4887         nxt_unit_process_ready_req(ctx);
4888     }
4889 
4890     nxt_unit_ctx_release(ctx);
4891 
4892     return rc;
4893 }
4894 
4895 
4896 nxt_inline int
nxt_unit_is_read_queue(nxt_unit_read_buf_t * rbuf)4897 nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf)
4898 {
4899     nxt_port_msg_t  *port_msg;
4900 
4901     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4902         port_msg = (nxt_port_msg_t *) rbuf->buf;
4903 
4904         return port_msg->type == _NXT_PORT_MSG_READ_QUEUE;
4905     }
4906 
4907     return 0;
4908 }
4909 
4910 
4911 nxt_inline int
nxt_unit_is_read_socket(nxt_unit_read_buf_t * rbuf)4912 nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf)
4913 {
4914     if (nxt_fast_path(rbuf->size == 1)) {
4915         return rbuf->buf[0] == _NXT_PORT_MSG_READ_SOCKET;
4916     }
4917 
4918     return 0;
4919 }
4920 
4921 
4922 nxt_inline int
nxt_unit_is_shm_ack(nxt_unit_read_buf_t * rbuf)4923 nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf)
4924 {
4925     nxt_port_msg_t  *port_msg;
4926 
4927     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4928         port_msg = (nxt_port_msg_t *) rbuf->buf;
4929 
4930         return port_msg->type == _NXT_PORT_MSG_SHM_ACK;
4931     }
4932 
4933     return 0;
4934 }
4935 
4936 
4937 nxt_inline int
nxt_unit_is_quit(nxt_unit_read_buf_t * rbuf)4938 nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf)
4939 {
4940     nxt_port_msg_t  *port_msg;
4941 
4942     if (nxt_fast_path(rbuf->size == (ssize_t) sizeof(nxt_port_msg_t))) {
4943         port_msg = (nxt_port_msg_t *) rbuf->buf;
4944 
4945         return port_msg->type == _NXT_PORT_MSG_QUIT;
4946     }
4947 
4948     return 0;
4949 }
4950 
4951 
4952 int
nxt_unit_run_shared(nxt_unit_ctx_t * ctx)4953 nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
4954 {
4955     int                  rc;
4956     nxt_unit_impl_t      *lib;
4957     nxt_unit_read_buf_t  *rbuf;
4958 
4959     nxt_unit_ctx_use(ctx);
4960 
4961     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
4962 
4963     rc = NXT_UNIT_OK;
4964 
4965     while (nxt_fast_path(nxt_unit_chk_ready(ctx))) {
4966         rbuf = nxt_unit_read_buf_get(ctx);
4967         if (nxt_slow_path(rbuf == NULL)) {
4968             rc = NXT_UNIT_ERROR;
4969             break;
4970         }
4971 
4972     retry:
4973 
4974         rc = nxt_unit_shared_port_recv(ctx, lib->shared_port, rbuf);
4975         if (rc == NXT_UNIT_AGAIN) {
4976             goto retry;
4977         }
4978 
4979         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4980             nxt_unit_read_buf_release(ctx, rbuf);
4981             break;
4982         }
4983 
4984         rc = nxt_unit_process_msg(ctx, rbuf, NULL);
4985         if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
4986             break;
4987         }
4988     }
4989 
4990     nxt_unit_ctx_release(ctx);
4991 
4992     return rc;
4993 }
4994 
4995 
4996 nxt_unit_request_info_t *
nxt_unit_dequeue_request(nxt_unit_ctx_t * ctx)4997 nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
4998 {
4999     int                      rc;
5000     nxt_unit_impl_t          *lib;
5001     nxt_unit_read_buf_t      *rbuf;
5002     nxt_unit_request_info_t  *req;
5003 
5004     nxt_unit_ctx_use(ctx);
5005 
5006     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5007 
5008     req = NULL;
5009 
5010     if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
5011         goto done;
5012     }
5013 
5014     rbuf = nxt_unit_read_buf_get(ctx);
5015     if (nxt_slow_path(rbuf == NULL)) {
5016         goto done;
5017     }
5018 
5019     rc = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf);
5020     if (rc != NXT_UNIT_OK) {
5021         nxt_unit_read_buf_release(ctx, rbuf);
5022         goto done;
5023     }
5024 
5025     (void) nxt_unit_process_msg(ctx, rbuf, &req);
5026 
5027 done:
5028 
5029     nxt_unit_ctx_release(ctx);
5030 
5031     return req;
5032 }
5033 
5034 
5035 int
nxt_unit_process_port_msg(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)5036 nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5037 {
5038     int  rc;
5039 
5040     nxt_unit_ctx_use(ctx);
5041 
5042     rc = nxt_unit_process_port_msg_impl(ctx, port);
5043 
5044     nxt_unit_ctx_release(ctx);
5045 
5046     return rc;
5047 }
5048 
5049 
5050 static int
nxt_unit_process_port_msg_impl(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)5051 nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
5052 {
5053     int                  rc;
5054     nxt_unit_impl_t      *lib;
5055     nxt_unit_read_buf_t  *rbuf;
5056 
5057     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5058 
5059     if (port == lib->shared_port && !nxt_unit_chk_ready(ctx)) {
5060         return NXT_UNIT_AGAIN;
5061     }
5062 
5063     rbuf = nxt_unit_read_buf_get(ctx);
5064     if (nxt_slow_path(rbuf == NULL)) {
5065         return NXT_UNIT_ERROR;
5066     }
5067 
5068     if (port == lib->shared_port) {
5069         rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
5070 
5071     } else {
5072         rc = nxt_unit_ctx_port_recv(ctx, port, rbuf);
5073     }
5074 
5075     if (rc != NXT_UNIT_OK) {
5076         nxt_unit_read_buf_release(ctx, rbuf);
5077         return rc;
5078     }
5079 
5080     rc = nxt_unit_process_msg(ctx, rbuf, NULL);
5081     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
5082         return NXT_UNIT_ERROR;
5083     }
5084 
5085     rc = nxt_unit_process_pending_rbuf(ctx);
5086     if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
5087         return NXT_UNIT_ERROR;
5088     }
5089 
5090     nxt_unit_process_ready_req(ctx);
5091 
5092     return rc;
5093 }
5094 
5095 
5096 void
nxt_unit_done(nxt_unit_ctx_t * ctx)5097 nxt_unit_done(nxt_unit_ctx_t *ctx)
5098 {
5099     nxt_unit_ctx_release(ctx);
5100 }
5101 
5102 
5103 nxt_unit_ctx_t *
nxt_unit_ctx_alloc(nxt_unit_ctx_t * ctx,void * data)5104 nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
5105 {
5106     int                   rc, queue_fd;
5107     void                  *mem;
5108     nxt_unit_impl_t       *lib;
5109     nxt_unit_port_t       *port;
5110     nxt_unit_ctx_impl_t   *new_ctx;
5111     nxt_unit_port_impl_t  *port_impl;
5112 
5113     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5114 
5115     new_ctx = nxt_unit_malloc(ctx, sizeof(nxt_unit_ctx_impl_t)
5116                                    + lib->request_data_size);
5117     if (nxt_slow_path(new_ctx == NULL)) {
5118         nxt_unit_alert(ctx, "failed to allocate context");
5119 
5120         return NULL;
5121     }
5122 
5123     rc = nxt_unit_ctx_init(lib, new_ctx, data);
5124     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5125         nxt_unit_free(ctx, new_ctx);
5126 
5127         return NULL;
5128     }
5129 
5130     queue_fd = -1;
5131 
5132     port = nxt_unit_create_port(&new_ctx->ctx);
5133     if (nxt_slow_path(port == NULL)) {
5134         goto fail;
5135     }
5136 
5137     new_ctx->read_port = port;
5138 
5139     queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
5140     if (nxt_slow_path(queue_fd == -1)) {
5141         goto fail;
5142     }
5143 
5144     mem = mmap(NULL, sizeof(nxt_port_queue_t),
5145                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
5146     if (nxt_slow_path(mem == MAP_FAILED)) {
5147         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
5148                        strerror(errno), errno);
5149 
5150         goto fail;
5151     }
5152 
5153     nxt_port_queue_init(mem);
5154 
5155     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5156     port_impl->queue = mem;
5157 
5158     rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
5159     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5160         goto fail;
5161     }
5162 
5163     nxt_unit_close(queue_fd);
5164 
5165     return &new_ctx->ctx;
5166 
5167 fail:
5168 
5169     if (queue_fd != -1) {
5170         nxt_unit_close(queue_fd);
5171     }
5172 
5173     nxt_unit_ctx_release(&new_ctx->ctx);
5174 
5175     return NULL;
5176 }
5177 
5178 
5179 static void
nxt_unit_ctx_free(nxt_unit_ctx_impl_t * ctx_impl)5180 nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
5181 {
5182     nxt_unit_impl_t                  *lib;
5183     nxt_unit_mmap_buf_t              *mmap_buf;
5184     nxt_unit_read_buf_t              *rbuf;
5185     nxt_unit_request_info_impl_t     *req_impl;
5186     nxt_unit_websocket_frame_impl_t  *ws_impl;
5187 
5188     lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
5189 
5190     nxt_queue_each(req_impl, &ctx_impl->active_req,
5191                    nxt_unit_request_info_impl_t, link)
5192     {
5193         nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
5194 
5195         nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
5196 
5197     } nxt_queue_loop;
5198 
5199     nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[0]);
5200     nxt_unit_mmap_buf_unlink(&ctx_impl->ctx_buf[1]);
5201 
5202     while (ctx_impl->free_buf != NULL) {
5203         mmap_buf = ctx_impl->free_buf;
5204         nxt_unit_mmap_buf_unlink(mmap_buf);
5205         nxt_unit_free(&ctx_impl->ctx, mmap_buf);
5206     }
5207 
5208     nxt_queue_each(req_impl, &ctx_impl->free_req,
5209                    nxt_unit_request_info_impl_t, link)
5210     {
5211         nxt_unit_request_info_free(req_impl);
5212 
5213     } nxt_queue_loop;
5214 
5215     nxt_queue_each(ws_impl, &ctx_impl->free_ws,
5216                    nxt_unit_websocket_frame_impl_t, link)
5217     {
5218         nxt_unit_websocket_frame_free(&ctx_impl->ctx, ws_impl);
5219 
5220     } nxt_queue_loop;
5221 
5222     nxt_queue_each(rbuf, &ctx_impl->free_rbuf, nxt_unit_read_buf_t, link)
5223     {
5224         if (rbuf != &ctx_impl->ctx_read_buf) {
5225             nxt_unit_free(&ctx_impl->ctx, rbuf);
5226         }
5227     } nxt_queue_loop;
5228 
5229     pthread_mutex_destroy(&ctx_impl->mutex);
5230 
5231     pthread_mutex_lock(&lib->mutex);
5232 
5233     nxt_queue_remove(&ctx_impl->link);
5234 
5235     pthread_mutex_unlock(&lib->mutex);
5236 
5237     if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5238         nxt_unit_remove_port(lib, NULL, &ctx_impl->read_port->id);
5239         nxt_unit_port_release(ctx_impl->read_port);
5240     }
5241 
5242     if (ctx_impl != &lib->main_ctx) {
5243         nxt_unit_free(&lib->main_ctx.ctx, ctx_impl);
5244     }
5245 
5246     nxt_unit_lib_release(lib);
5247 }
5248 
5249 
5250 /* SOCK_SEQPACKET is disabled to test SOCK_DGRAM on all platforms. */
5251 #if (0 || NXT_HAVE_AF_UNIX_SOCK_SEQPACKET)
5252 #define NXT_UNIX_SOCKET  SOCK_SEQPACKET
5253 #else
5254 #define NXT_UNIX_SOCKET  SOCK_DGRAM
5255 #endif
5256 
5257 
5258 void
nxt_unit_port_id_init(nxt_unit_port_id_t * port_id,pid_t pid,uint16_t id)5259 nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id)
5260 {
5261     nxt_unit_port_hash_id_t  port_hash_id;
5262 
5263     port_hash_id.pid = pid;
5264     port_hash_id.id = id;
5265 
5266     port_id->pid = pid;
5267     port_id->hash = nxt_murmur_hash2(&port_hash_id, sizeof(port_hash_id));
5268     port_id->id = id;
5269 }
5270 
5271 
5272 static nxt_unit_port_t *
nxt_unit_create_port(nxt_unit_ctx_t * ctx)5273 nxt_unit_create_port(nxt_unit_ctx_t *ctx)
5274 {
5275     int                 rc, port_sockets[2];
5276     nxt_unit_impl_t     *lib;
5277     nxt_unit_port_t     new_port, *port;
5278     nxt_unit_process_t  *process;
5279 
5280     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5281 
5282     rc = socketpair(AF_UNIX, NXT_UNIX_SOCKET, 0, port_sockets);
5283     if (nxt_slow_path(rc != 0)) {
5284         nxt_unit_warn(ctx, "create_port: socketpair() failed: %s (%d)",
5285                       strerror(errno), errno);
5286 
5287         return NULL;
5288     }
5289 
5290 #if (NXT_HAVE_SOCKOPT_SO_PASSCRED)
5291     int  enable_creds = 1;
5292 
5293     if (nxt_slow_path(setsockopt(port_sockets[0], SOL_SOCKET, SO_PASSCRED,
5294                         &enable_creds, sizeof(enable_creds)) == -1))
5295     {
5296         nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno));
5297         return NULL;
5298     }
5299 
5300     if (nxt_slow_path(setsockopt(port_sockets[1], SOL_SOCKET, SO_PASSCRED,
5301                         &enable_creds, sizeof(enable_creds)) == -1))
5302     {
5303         nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno));
5304         return NULL;
5305     }
5306 #endif
5307 
5308     nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
5309                    port_sockets[0], port_sockets[1]);
5310 
5311     pthread_mutex_lock(&lib->mutex);
5312 
5313     process = nxt_unit_process_get(ctx, lib->pid);
5314     if (nxt_slow_path(process == NULL)) {
5315         pthread_mutex_unlock(&lib->mutex);
5316 
5317         nxt_unit_close(port_sockets[0]);
5318         nxt_unit_close(port_sockets[1]);
5319 
5320         return NULL;
5321     }
5322 
5323     nxt_unit_port_id_init(&new_port.id, lib->pid, process->next_port_id++);
5324 
5325     new_port.in_fd = port_sockets[0];
5326     new_port.out_fd = port_sockets[1];
5327     new_port.data = NULL;
5328 
5329     pthread_mutex_unlock(&lib->mutex);
5330 
5331     nxt_unit_process_release(process);
5332 
5333     port = nxt_unit_add_port(ctx, &new_port, NULL);
5334     if (nxt_slow_path(port == NULL)) {
5335         nxt_unit_close(port_sockets[0]);
5336         nxt_unit_close(port_sockets[1]);
5337     }
5338 
5339     return port;
5340 }
5341 
5342 
5343 static int
nxt_unit_send_port(nxt_unit_ctx_t * ctx,nxt_unit_port_t * dst,nxt_unit_port_t * port,int queue_fd)5344 nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
5345     nxt_unit_port_t *port, int queue_fd)
5346 {
5347     ssize_t          res;
5348     nxt_send_oob_t   oob;
5349     nxt_unit_impl_t  *lib;
5350     int              fds[2] = { port->out_fd, queue_fd };
5351 
5352     struct {
5353         nxt_port_msg_t            msg;
5354         nxt_port_msg_new_port_t   new_port;
5355     } m;
5356 
5357     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5358 
5359     m.msg.stream = 0;
5360     m.msg.pid = lib->pid;
5361     m.msg.reply_port = 0;
5362     m.msg.type = _NXT_PORT_MSG_NEW_PORT;
5363     m.msg.last = 0;
5364     m.msg.mmap = 0;
5365     m.msg.nf = 0;
5366     m.msg.mf = 0;
5367 
5368     m.new_port.id = port->id.id;
5369     m.new_port.pid = port->id.pid;
5370     m.new_port.type = NXT_PROCESS_APP;
5371     m.new_port.max_size = 16 * 1024;
5372     m.new_port.max_share = 64 * 1024;
5373 
5374     nxt_socket_msg_oob_init(&oob, fds);
5375 
5376     res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &oob);
5377 
5378     return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
5379 }
5380 
5381 
nxt_unit_port_use(nxt_unit_port_t * port)5382 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port)
5383 {
5384     nxt_unit_port_impl_t  *port_impl;
5385 
5386     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5387 
5388     nxt_atomic_fetch_add(&port_impl->use_count, 1);
5389 }
5390 
5391 
nxt_unit_port_release(nxt_unit_port_t * port)5392 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
5393 {
5394     long                  c;
5395     nxt_unit_port_impl_t  *port_impl;
5396 
5397     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5398 
5399     c = nxt_atomic_fetch_add(&port_impl->use_count, -1);
5400 
5401     if (c == 1) {
5402         nxt_unit_debug(NULL, "destroy port{%d,%d} in_fd %d out_fd %d",
5403                        (int) port->id.pid, (int) port->id.id,
5404                        port->in_fd, port->out_fd);
5405 
5406         nxt_unit_process_release(port_impl->process);
5407 
5408         if (port->in_fd != -1) {
5409             nxt_unit_close(port->in_fd);
5410 
5411             port->in_fd = -1;
5412         }
5413 
5414         if (port->out_fd != -1) {
5415             nxt_unit_close(port->out_fd);
5416 
5417             port->out_fd = -1;
5418         }
5419 
5420         if (port_impl->queue != NULL) {
5421             munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID)
5422                                      ? sizeof(nxt_app_queue_t)
5423                                      : sizeof(nxt_port_queue_t));
5424         }
5425 
5426         nxt_unit_free(NULL, port_impl);
5427     }
5428 }
5429 
5430 
5431 static nxt_unit_port_t *
nxt_unit_add_port(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,void * queue)5432 nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
5433 {
5434     int                   rc, ready;
5435     nxt_queue_t           awaiting_req;
5436     nxt_unit_impl_t       *lib;
5437     nxt_unit_port_t       *old_port;
5438     nxt_unit_process_t    *process;
5439     nxt_unit_port_impl_t  *new_port, *old_port_impl;
5440 
5441     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5442 
5443     pthread_mutex_lock(&lib->mutex);
5444 
5445     old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
5446 
5447     if (nxt_slow_path(old_port != NULL)) {
5448         nxt_unit_debug(ctx, "add_port: duplicate port{%d,%d} "
5449                             "in_fd %d out_fd %d queue %p",
5450                             port->id.pid, port->id.id,
5451                             port->in_fd, port->out_fd, queue);
5452 
5453         if (old_port->data == NULL) {
5454             old_port->data = port->data;
5455             port->data = NULL;
5456         }
5457 
5458         if (old_port->in_fd == -1) {
5459             old_port->in_fd = port->in_fd;
5460             port->in_fd = -1;
5461         }
5462 
5463         if (port->in_fd != -1) {
5464             nxt_unit_close(port->in_fd);
5465             port->in_fd = -1;
5466         }
5467 
5468         if (old_port->out_fd == -1) {
5469             old_port->out_fd = port->out_fd;
5470             port->out_fd = -1;
5471         }
5472 
5473         if (port->out_fd != -1) {
5474             nxt_unit_close(port->out_fd);
5475             port->out_fd = -1;
5476         }
5477 
5478         *port = *old_port;
5479 
5480         nxt_queue_init(&awaiting_req);
5481 
5482         old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
5483 
5484         if (old_port_impl->queue == NULL) {
5485             old_port_impl->queue = queue;
5486         }
5487 
5488         ready = (port->in_fd != -1 || port->out_fd != -1);
5489 
5490         /*
5491          * Port can be market as 'ready' only after callbacks.add_port() call.
5492          * Otherwise, request may try to use the port before callback.
5493          */
5494         if (lib->callbacks.add_port == NULL && ready) {
5495             old_port_impl->ready = ready;
5496 
5497             if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5498                 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5499                 nxt_queue_init(&old_port_impl->awaiting_req);
5500             }
5501         }
5502 
5503         pthread_mutex_unlock(&lib->mutex);
5504 
5505         if (lib->callbacks.add_port != NULL && ready) {
5506             lib->callbacks.add_port(ctx, old_port);
5507 
5508             pthread_mutex_lock(&lib->mutex);
5509 
5510             old_port_impl->ready = ready;
5511 
5512             if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
5513                 nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
5514                 nxt_queue_init(&old_port_impl->awaiting_req);
5515             }
5516 
5517             pthread_mutex_unlock(&lib->mutex);
5518         }
5519 
5520         nxt_unit_process_awaiting_req(ctx, &awaiting_req);
5521 
5522         return old_port;
5523     }
5524 
5525     new_port = NULL;
5526     ready = 0;
5527 
5528     nxt_unit_debug(ctx, "add_port: port{%d,%d} in_fd %d out_fd %d queue %p",
5529                    port->id.pid, port->id.id,
5530                    port->in_fd, port->out_fd, queue);
5531 
5532     process = nxt_unit_process_get(ctx, port->id.pid);
5533     if (nxt_slow_path(process == NULL)) {
5534         goto unlock;
5535     }
5536 
5537     if (port->id.id != NXT_UNIT_SHARED_PORT_ID
5538         && port->id.id >= process->next_port_id)
5539     {
5540         process->next_port_id = port->id.id + 1;
5541     }
5542 
5543     new_port = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
5544     if (nxt_slow_path(new_port == NULL)) {
5545         nxt_unit_alert(ctx, "add_port: %d,%d malloc() failed",
5546                        port->id.pid, port->id.id);
5547 
5548         goto unlock;
5549     }
5550 
5551     new_port->port = *port;
5552 
5553     rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
5554     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
5555         nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
5556                        port->id.pid, port->id.id);
5557 
5558         nxt_unit_free(ctx, new_port);
5559 
5560         new_port = NULL;
5561 
5562         goto unlock;
5563     }
5564 
5565     nxt_queue_insert_tail(&process->ports, &new_port->link);
5566 
5567     new_port->use_count = 2;
5568     new_port->process = process;
5569     new_port->queue = queue;
5570     new_port->from_socket = 0;
5571     new_port->socket_rbuf = NULL;
5572 
5573     nxt_queue_init(&new_port->awaiting_req);
5574 
5575     ready = (port->in_fd != -1 || port->out_fd != -1);
5576 
5577     if (lib->callbacks.add_port == NULL) {
5578         new_port->ready = ready;
5579 
5580     } else {
5581         new_port->ready = 0;
5582     }
5583 
5584     process = NULL;
5585 
5586 unlock:
5587 
5588     pthread_mutex_unlock(&lib->mutex);
5589 
5590     if (nxt_slow_path(process != NULL)) {
5591         nxt_unit_process_release(process);
5592     }
5593 
5594     if (lib->callbacks.add_port != NULL && new_port != NULL && ready) {
5595         lib->callbacks.add_port(ctx, &new_port->port);
5596 
5597         nxt_queue_init(&awaiting_req);
5598 
5599         pthread_mutex_lock(&lib->mutex);
5600 
5601         new_port->ready = 1;
5602 
5603         if (!nxt_queue_is_empty(&new_port->awaiting_req)) {
5604             nxt_queue_add(&awaiting_req, &new_port->awaiting_req);
5605             nxt_queue_init(&new_port->awaiting_req);
5606         }
5607 
5608         pthread_mutex_unlock(&lib->mutex);
5609 
5610         nxt_unit_process_awaiting_req(ctx, &awaiting_req);
5611     }
5612 
5613     return (new_port == NULL) ? NULL : &new_port->port;
5614 }
5615 
5616 
5617 static void
nxt_unit_process_awaiting_req(nxt_unit_ctx_t * ctx,nxt_queue_t * awaiting_req)5618 nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
5619 {
5620     nxt_unit_ctx_impl_t           *ctx_impl;
5621     nxt_unit_request_info_impl_t  *req_impl;
5622 
5623     nxt_queue_each(req_impl, awaiting_req,
5624                    nxt_unit_request_info_impl_t, port_wait_link)
5625     {
5626         nxt_queue_remove(&req_impl->port_wait_link);
5627 
5628         ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
5629                                     ctx);
5630 
5631         pthread_mutex_lock(&ctx_impl->mutex);
5632 
5633         nxt_queue_insert_tail(&ctx_impl->ready_req,
5634                               &req_impl->port_wait_link);
5635 
5636         pthread_mutex_unlock(&ctx_impl->mutex);
5637 
5638         nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
5639 
5640         nxt_unit_awake_ctx(ctx, ctx_impl);
5641 
5642     } nxt_queue_loop;
5643 }
5644 
5645 
5646 static void
nxt_unit_remove_port(nxt_unit_impl_t * lib,nxt_unit_ctx_t * ctx,nxt_unit_port_id_t * port_id)5647 nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
5648     nxt_unit_port_id_t *port_id)
5649 {
5650     nxt_unit_port_t       *port;
5651     nxt_unit_port_impl_t  *port_impl;
5652 
5653     pthread_mutex_lock(&lib->mutex);
5654 
5655     port = nxt_unit_remove_port_unsafe(lib, port_id);
5656 
5657     if (nxt_fast_path(port != NULL)) {
5658         port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5659 
5660         nxt_queue_remove(&port_impl->link);
5661     }
5662 
5663     pthread_mutex_unlock(&lib->mutex);
5664 
5665     if (lib->callbacks.remove_port != NULL && port != NULL) {
5666         lib->callbacks.remove_port(&lib->unit, ctx, port);
5667     }
5668 
5669     if (nxt_fast_path(port != NULL)) {
5670         nxt_unit_port_release(port);
5671     }
5672 }
5673 
5674 
5675 static nxt_unit_port_t *
nxt_unit_remove_port_unsafe(nxt_unit_impl_t * lib,nxt_unit_port_id_t * port_id)5676 nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
5677 {
5678     nxt_unit_port_t  *port;
5679 
5680     port = nxt_unit_port_hash_find(&lib->ports, port_id, 1);
5681     if (nxt_slow_path(port == NULL)) {
5682         nxt_unit_debug(NULL, "remove_port: port{%d,%d} not found",
5683                        (int) port_id->pid, (int) port_id->id);
5684 
5685         return NULL;
5686     }
5687 
5688     nxt_unit_debug(NULL, "remove_port: port{%d,%d}, fds %d,%d, data %p",
5689                    (int) port_id->pid, (int) port_id->id,
5690                    port->in_fd, port->out_fd, port->data);
5691 
5692     return port;
5693 }
5694 
5695 
5696 static void
nxt_unit_remove_pid(nxt_unit_impl_t * lib,pid_t pid)5697 nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid)
5698 {
5699     nxt_unit_process_t  *process;
5700 
5701     pthread_mutex_lock(&lib->mutex);
5702 
5703     process = nxt_unit_process_find(lib, pid, 1);
5704     if (nxt_slow_path(process == NULL)) {
5705         nxt_unit_debug(NULL, "remove_pid: process %d not found", (int) pid);
5706 
5707         pthread_mutex_unlock(&lib->mutex);
5708 
5709         return;
5710     }
5711 
5712     nxt_unit_remove_process(lib, process);
5713 
5714     if (lib->callbacks.remove_pid != NULL) {
5715         lib->callbacks.remove_pid(&lib->unit, pid);
5716     }
5717 }
5718 
5719 
5720 static void
nxt_unit_remove_process(nxt_unit_impl_t * lib,nxt_unit_process_t * process)5721 nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
5722 {
5723     nxt_queue_t           ports;
5724     nxt_unit_port_impl_t  *port;
5725 
5726     nxt_queue_init(&ports);
5727 
5728     nxt_queue_add(&ports, &process->ports);
5729 
5730     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5731 
5732         nxt_unit_remove_port_unsafe(lib, &port->port.id);
5733 
5734     } nxt_queue_loop;
5735 
5736     pthread_mutex_unlock(&lib->mutex);
5737 
5738     nxt_queue_each(port, &ports, nxt_unit_port_impl_t, link) {
5739 
5740         nxt_queue_remove(&port->link);
5741 
5742         if (lib->callbacks.remove_port != NULL) {
5743             lib->callbacks.remove_port(&lib->unit, NULL, &port->port);
5744         }
5745 
5746         nxt_unit_port_release(&port->port);
5747 
5748     } nxt_queue_loop;
5749 
5750     nxt_unit_process_release(process);
5751 }
5752 
5753 
5754 static void
nxt_unit_quit(nxt_unit_ctx_t * ctx,uint8_t quit_param)5755 nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param)
5756 {
5757     nxt_bool_t                    skip_graceful_broadcast, quit;
5758     nxt_unit_impl_t               *lib;
5759     nxt_unit_ctx_impl_t           *ctx_impl;
5760     nxt_unit_callbacks_t          *cb;
5761     nxt_unit_request_info_t       *req;
5762     nxt_unit_request_info_impl_t  *req_impl;
5763 
5764     struct {
5765         nxt_port_msg_t            msg;
5766         uint8_t                   quit_param;
5767     } nxt_packed m;
5768 
5769     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5770     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5771 
5772     nxt_unit_debug(ctx, "quit: %d/%d/%d", (int) quit_param, ctx_impl->ready,
5773                    ctx_impl->online);
5774 
5775     if (nxt_slow_path(!ctx_impl->online)) {
5776         return;
5777     }
5778 
5779     skip_graceful_broadcast = quit_param == NXT_QUIT_GRACEFUL
5780                               && !ctx_impl->ready;
5781 
5782     cb = &lib->callbacks;
5783 
5784     if (nxt_fast_path(ctx_impl->ready)) {
5785         ctx_impl->ready = 0;
5786 
5787         if (cb->remove_port != NULL) {
5788             cb->remove_port(&lib->unit, ctx, lib->shared_port);
5789         }
5790     }
5791 
5792     if (quit_param == NXT_QUIT_GRACEFUL) {
5793         pthread_mutex_lock(&ctx_impl->mutex);
5794 
5795         quit = nxt_queue_is_empty(&ctx_impl->active_req)
5796                && nxt_queue_is_empty(&ctx_impl->pending_rbuf)
5797                && ctx_impl->wait_items == 0;
5798 
5799         pthread_mutex_unlock(&ctx_impl->mutex);
5800 
5801     } else {
5802         quit = 1;
5803         ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
5804     }
5805 
5806     if (quit) {
5807         ctx_impl->online = 0;
5808 
5809         if (cb->quit != NULL) {
5810             cb->quit(ctx);
5811         }
5812 
5813         nxt_queue_each(req_impl, &ctx_impl->active_req,
5814                        nxt_unit_request_info_impl_t, link)
5815         {
5816             req = &req_impl->req;
5817 
5818             nxt_unit_req_warn(req, "active request on ctx quit");
5819 
5820             if (cb->close_handler) {
5821                 nxt_unit_req_debug(req, "close_handler");
5822 
5823                 cb->close_handler(req);
5824 
5825             } else {
5826                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
5827             }
5828 
5829         } nxt_queue_loop;
5830 
5831         if (nxt_fast_path(ctx_impl->read_port != NULL)) {
5832             nxt_unit_remove_port(lib, ctx, &ctx_impl->read_port->id);
5833         }
5834     }
5835 
5836     if (ctx != &lib->main_ctx.ctx || skip_graceful_broadcast) {
5837         return;
5838     }
5839 
5840     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
5841 
5842     m.msg.pid = lib->pid;
5843     m.msg.type = _NXT_PORT_MSG_QUIT;
5844     m.quit_param = quit_param;
5845 
5846     pthread_mutex_lock(&lib->mutex);
5847 
5848     nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
5849 
5850         if (ctx == &ctx_impl->ctx
5851             || ctx_impl->read_port == NULL
5852             || ctx_impl->read_port->out_fd == -1)
5853         {
5854             continue;
5855         }
5856 
5857         (void) nxt_unit_port_send(ctx, ctx_impl->read_port,
5858                                   &m, sizeof(m), NULL);
5859 
5860     } nxt_queue_loop;
5861 
5862     pthread_mutex_unlock(&lib->mutex);
5863 }
5864 
5865 
5866 static int
nxt_unit_get_port(nxt_unit_ctx_t * ctx,nxt_unit_port_id_t * port_id)5867 nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
5868 {
5869     ssize_t              res;
5870     nxt_unit_impl_t      *lib;
5871     nxt_unit_ctx_impl_t  *ctx_impl;
5872 
5873     struct {
5874         nxt_port_msg_t           msg;
5875         nxt_port_msg_get_port_t  get_port;
5876     } m;
5877 
5878     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5879     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
5880 
5881     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
5882 
5883     m.msg.pid = lib->pid;
5884     m.msg.reply_port = ctx_impl->read_port->id.id;
5885     m.msg.type = _NXT_PORT_MSG_GET_PORT;
5886 
5887     m.get_port.id = port_id->id;
5888     m.get_port.pid = port_id->pid;
5889 
5890     nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
5891                    (int) port_id->id);
5892 
5893     res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL);
5894     if (nxt_slow_path(res != sizeof(m))) {
5895         return NXT_UNIT_ERROR;
5896     }
5897 
5898     return NXT_UNIT_OK;
5899 }
5900 
5901 
5902 static ssize_t
nxt_unit_port_send(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,const void * buf,size_t buf_size,const nxt_send_oob_t * oob)5903 nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
5904     const void *buf, size_t buf_size, const nxt_send_oob_t *oob)
5905 {
5906     int                   notify;
5907     ssize_t               ret;
5908     nxt_int_t             rc;
5909     nxt_port_msg_t        msg;
5910     nxt_unit_impl_t       *lib;
5911     nxt_unit_port_impl_t  *port_impl;
5912 
5913     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
5914 
5915     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
5916     if (port_impl->queue != NULL && (oob == NULL || oob->size == 0)
5917         && buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
5918     {
5919         rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, &notify);
5920         if (nxt_slow_path(rc != NXT_OK)) {
5921             nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5922                            (int) port->id.pid, (int) port->id.id);
5923 
5924             return -1;
5925         }
5926 
5927         nxt_unit_debug(ctx, "port{%d,%d} enqueue %d notify %d",
5928                        (int) port->id.pid, (int) port->id.id,
5929                        (int) buf_size, notify);
5930 
5931         if (notify) {
5932             memcpy(&msg, buf, sizeof(nxt_port_msg_t));
5933 
5934             msg.type = _NXT_PORT_MSG_READ_QUEUE;
5935 
5936             if (lib->callbacks.port_send == NULL) {
5937                 ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
5938                                        sizeof(nxt_port_msg_t), NULL);
5939 
5940                 nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
5941                                (int) port->id.pid, (int) port->id.id,
5942                                (int) ret);
5943 
5944             } else {
5945                 ret = lib->callbacks.port_send(ctx, port, &msg,
5946                                                sizeof(nxt_port_msg_t), NULL, 0);
5947 
5948                 nxt_unit_debug(ctx, "port{%d,%d} sendcb %d read_queue",
5949                                (int) port->id.pid, (int) port->id.id,
5950                                (int) ret);
5951             }
5952 
5953         }
5954 
5955         return buf_size;
5956     }
5957 
5958     if (port_impl->queue != NULL) {
5959         msg.type = _NXT_PORT_MSG_READ_SOCKET;
5960 
5961         rc = nxt_port_queue_send(port_impl->queue, &msg.type, 1, &notify);
5962         if (nxt_slow_path(rc != NXT_OK)) {
5963             nxt_unit_alert(ctx, "port_send: port %d,%d queue overflow",
5964                            (int) port->id.pid, (int) port->id.id);
5965 
5966             return -1;
5967         }
5968 
5969         nxt_unit_debug(ctx, "port{%d,%d} enqueue 1 read_socket notify %d",
5970                        (int) port->id.pid, (int) port->id.id, notify);
5971     }
5972 
5973     if (lib->callbacks.port_send != NULL) {
5974         ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
5975                                        oob != NULL ? oob->buf : NULL,
5976                                        oob != NULL ? oob->size : 0);
5977 
5978         nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
5979                        (int) port->id.pid, (int) port->id.id,
5980                        (int) ret);
5981 
5982     } else {
5983         ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, oob);
5984 
5985         nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
5986                        (int) port->id.pid, (int) port->id.id,
5987                        (int) ret);
5988     }
5989 
5990     return ret;
5991 }
5992 
5993 
5994 static ssize_t
nxt_unit_sendmsg(nxt_unit_ctx_t * ctx,int fd,const void * buf,size_t buf_size,const nxt_send_oob_t * oob)5995 nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
5996     const void *buf, size_t buf_size, const nxt_send_oob_t *oob)
5997 {
5998     int            err;
5999     ssize_t        n;
6000     struct iovec   iov[1];
6001 
6002     iov[0].iov_base = (void *) buf;
6003     iov[0].iov_len = buf_size;
6004 
6005 retry:
6006 
6007     n = nxt_sendmsg(fd, iov, 1, oob);
6008 
6009     if (nxt_slow_path(n == -1)) {
6010         err = errno;
6011 
6012         if (err == EINTR) {
6013             goto retry;
6014         }
6015 
6016         /*
6017          * FIXME: This should be "alert" after router graceful shutdown
6018          * implementation.
6019          */
6020         nxt_unit_warn(ctx, "sendmsg(%d, %d) failed: %s (%d)",
6021                       fd, (int) buf_size, strerror(err), err);
6022 
6023     } else {
6024         nxt_unit_debug(ctx, "sendmsg(%d, %d, %d): %d", fd, (int) buf_size,
6025                        (oob != NULL ? (int) oob->size : 0), (int) n);
6026     }
6027 
6028     return n;
6029 }
6030 
6031 
6032 static int
nxt_unit_ctx_port_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6033 nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6034     nxt_unit_read_buf_t *rbuf)
6035 {
6036     int                   res, read;
6037     nxt_unit_port_impl_t  *port_impl;
6038 
6039     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6040 
6041     read = 0;
6042 
6043 retry:
6044 
6045     if (port_impl->from_socket > 0) {
6046         if (port_impl->socket_rbuf != NULL
6047             && port_impl->socket_rbuf->size > 0)
6048         {
6049             port_impl->from_socket--;
6050 
6051             nxt_unit_rbuf_cpy(rbuf, port_impl->socket_rbuf);
6052             port_impl->socket_rbuf->size = 0;
6053 
6054             nxt_unit_debug(ctx, "port{%d,%d} use suspended message %d",
6055                            (int) port->id.pid, (int) port->id.id,
6056                            (int) rbuf->size);
6057 
6058             return NXT_UNIT_OK;
6059         }
6060 
6061     } else {
6062         res = nxt_unit_port_queue_recv(port, rbuf);
6063 
6064         if (res == NXT_UNIT_OK) {
6065             if (nxt_unit_is_read_socket(rbuf)) {
6066                 port_impl->from_socket++;
6067 
6068                 nxt_unit_debug(ctx, "port{%d,%d} dequeue 1 read_socket %d",
6069                                (int) port->id.pid, (int) port->id.id,
6070                                port_impl->from_socket);
6071 
6072                 goto retry;
6073             }
6074 
6075             nxt_unit_debug(ctx, "port{%d,%d} dequeue %d",
6076                            (int) port->id.pid, (int) port->id.id,
6077                            (int) rbuf->size);
6078 
6079             return NXT_UNIT_OK;
6080         }
6081     }
6082 
6083     if (read) {
6084         return NXT_UNIT_AGAIN;
6085     }
6086 
6087     res = nxt_unit_port_recv(ctx, port, rbuf);
6088     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6089         return NXT_UNIT_ERROR;
6090     }
6091 
6092     read = 1;
6093 
6094     if (nxt_unit_is_read_queue(rbuf)) {
6095         nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
6096                        (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6097 
6098         goto retry;
6099     }
6100 
6101     nxt_unit_debug(ctx, "port{%d,%d} recvmsg %d",
6102                    (int) port->id.pid, (int) port->id.id,
6103                    (int) rbuf->size);
6104 
6105     if (res == NXT_UNIT_AGAIN) {
6106         return NXT_UNIT_AGAIN;
6107     }
6108 
6109     if (port_impl->from_socket > 0) {
6110         port_impl->from_socket--;
6111 
6112         return NXT_UNIT_OK;
6113     }
6114 
6115     nxt_unit_debug(ctx, "port{%d,%d} suspend message %d",
6116                    (int) port->id.pid, (int) port->id.id,
6117                    (int) rbuf->size);
6118 
6119     if (port_impl->socket_rbuf == NULL) {
6120         port_impl->socket_rbuf = nxt_unit_read_buf_get(ctx);
6121 
6122         if (nxt_slow_path(port_impl->socket_rbuf == NULL)) {
6123             return NXT_UNIT_ERROR;
6124         }
6125 
6126         port_impl->socket_rbuf->size = 0;
6127     }
6128 
6129     if (port_impl->socket_rbuf->size > 0) {
6130         nxt_unit_alert(ctx, "too many port socket messages");
6131 
6132         return NXT_UNIT_ERROR;
6133     }
6134 
6135     nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
6136 
6137     rbuf->oob.size = 0;
6138 
6139     goto retry;
6140 }
6141 
6142 
6143 nxt_inline void
nxt_unit_rbuf_cpy(nxt_unit_read_buf_t * dst,nxt_unit_read_buf_t * src)6144 nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
6145 {
6146     memcpy(dst->buf, src->buf, src->size);
6147     dst->size = src->size;
6148     dst->oob.size = src->oob.size;
6149     memcpy(dst->oob.buf, src->oob.buf, src->oob.size);
6150 }
6151 
6152 
6153 static int
nxt_unit_shared_port_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6154 nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6155     nxt_unit_read_buf_t *rbuf)
6156 {
6157     int                   res;
6158     nxt_unit_port_impl_t  *port_impl;
6159 
6160     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6161 
6162 retry:
6163 
6164     res = nxt_unit_app_queue_recv(ctx, port, rbuf);
6165 
6166     if (res == NXT_UNIT_OK) {
6167         return NXT_UNIT_OK;
6168     }
6169 
6170     if (res == NXT_UNIT_AGAIN) {
6171         res = nxt_unit_port_recv(ctx, port, rbuf);
6172         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
6173             return NXT_UNIT_ERROR;
6174         }
6175 
6176         if (nxt_unit_is_read_queue(rbuf)) {
6177             nxt_app_queue_notification_received(port_impl->queue);
6178 
6179             nxt_unit_debug(ctx, "port{%d,%d} recv %d read_queue",
6180                            (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6181 
6182             goto retry;
6183         }
6184     }
6185 
6186     return res;
6187 }
6188 
6189 
6190 static int
nxt_unit_port_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6191 nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6192     nxt_unit_read_buf_t *rbuf)
6193 {
6194     int              fd, err;
6195     size_t           oob_size;
6196     struct iovec     iov[1];
6197     nxt_unit_impl_t  *lib;
6198 
6199     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6200 
6201     if (lib->callbacks.port_recv != NULL) {
6202         oob_size = sizeof(rbuf->oob.buf);
6203 
6204         rbuf->size = lib->callbacks.port_recv(ctx, port,
6205                                               rbuf->buf, sizeof(rbuf->buf),
6206                                               rbuf->oob.buf, &oob_size);
6207 
6208         nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
6209                        (int) port->id.pid, (int) port->id.id, (int) rbuf->size);
6210 
6211         if (nxt_slow_path(rbuf->size < 0)) {
6212             return NXT_UNIT_ERROR;
6213         }
6214 
6215         rbuf->oob.size = oob_size;
6216         return NXT_UNIT_OK;
6217     }
6218 
6219     iov[0].iov_base = rbuf->buf;
6220     iov[0].iov_len = sizeof(rbuf->buf);
6221 
6222     fd = port->in_fd;
6223 
6224 retry:
6225 
6226     rbuf->size = nxt_recvmsg(fd, iov, 1, &rbuf->oob);
6227 
6228     if (nxt_slow_path(rbuf->size == -1)) {
6229         err = errno;
6230 
6231         if (err == EINTR) {
6232             goto retry;
6233         }
6234 
6235         if (err == EAGAIN) {
6236             nxt_unit_debug(ctx, "recvmsg(%d) failed: %s (%d)",
6237                            fd, strerror(err), err);
6238 
6239             return NXT_UNIT_AGAIN;
6240         }
6241 
6242         nxt_unit_alert(ctx, "recvmsg(%d) failed: %s (%d)",
6243                        fd, strerror(err), err);
6244 
6245         return NXT_UNIT_ERROR;
6246     }
6247 
6248     nxt_unit_debug(ctx, "recvmsg(%d): %d", fd, (int) rbuf->size);
6249 
6250     return NXT_UNIT_OK;
6251 }
6252 
6253 
6254 static int
nxt_unit_port_queue_recv(nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6255 nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
6256 {
6257     nxt_unit_port_impl_t  *port_impl;
6258 
6259     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6260 
6261     rbuf->size = nxt_port_queue_recv(port_impl->queue, rbuf->buf);
6262 
6263     return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6264 }
6265 
6266 
6267 static int
nxt_unit_app_queue_recv(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port,nxt_unit_read_buf_t * rbuf)6268 nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
6269     nxt_unit_read_buf_t *rbuf)
6270 {
6271     uint32_t              cookie;
6272     nxt_port_msg_t        *port_msg;
6273     nxt_app_queue_t       *queue;
6274     nxt_unit_impl_t       *lib;
6275     nxt_unit_port_impl_t  *port_impl;
6276 
6277     struct {
6278         nxt_port_msg_t    msg;
6279         uint8_t           quit_param;
6280     } nxt_packed m;
6281 
6282     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
6283     queue = port_impl->queue;
6284 
6285 retry:
6286 
6287     rbuf->size = nxt_app_queue_recv(queue, rbuf->buf, &cookie);
6288 
6289     nxt_unit_debug(NULL, "app_queue_recv: %d", (int) rbuf->size);
6290 
6291     if (rbuf->size >= (ssize_t) sizeof(nxt_port_msg_t)) {
6292         port_msg = (nxt_port_msg_t *) rbuf->buf;
6293 
6294         if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
6295             lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6296 
6297             if (lib->request_limit != 0) {
6298                 nxt_atomic_fetch_add(&lib->request_count, 1);
6299 
6300                 if (nxt_slow_path(lib->request_count >= lib->request_limit)) {
6301                     nxt_unit_debug(ctx, "request limit reached");
6302 
6303                     memset(&m.msg, 0, sizeof(nxt_port_msg_t));
6304 
6305                     m.msg.pid = lib->pid;
6306                     m.msg.type = _NXT_PORT_MSG_QUIT;
6307                     m.quit_param = NXT_QUIT_GRACEFUL;
6308 
6309                     (void) nxt_unit_port_send(ctx, lib->main_ctx.read_port,
6310                                               &m, sizeof(m), NULL);
6311                 }
6312             }
6313 
6314             return NXT_UNIT_OK;
6315         }
6316 
6317         nxt_unit_debug(NULL, "app_queue_recv: message cancelled");
6318 
6319         goto retry;
6320     }
6321 
6322     return (rbuf->size == -1) ? NXT_UNIT_AGAIN : NXT_UNIT_OK;
6323 }
6324 
6325 
6326 nxt_inline int
nxt_unit_close(int fd)6327 nxt_unit_close(int fd)
6328 {
6329     int  res;
6330 
6331     res = close(fd);
6332 
6333     if (nxt_slow_path(res == -1)) {
6334         nxt_unit_alert(NULL, "close(%d) failed: %s (%d)",
6335                        fd, strerror(errno), errno);
6336 
6337     } else {
6338         nxt_unit_debug(NULL, "close(%d): %d", fd, res);
6339     }
6340 
6341     return res;
6342 }
6343 
6344 
6345 static int
nxt_unit_fd_blocking(int fd)6346 nxt_unit_fd_blocking(int fd)
6347 {
6348     int  nb;
6349 
6350     nb = 0;
6351 
6352     if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
6353         nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
6354                        fd, strerror(errno), errno);
6355 
6356         return NXT_UNIT_ERROR;
6357     }
6358 
6359     return NXT_UNIT_OK;
6360 }
6361 
6362 
6363 static nxt_int_t
nxt_unit_port_hash_test(nxt_lvlhsh_query_t * lhq,void * data)6364 nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6365 {
6366     nxt_unit_port_t          *port;
6367     nxt_unit_port_hash_id_t  *port_id;
6368 
6369     port = data;
6370     port_id = (nxt_unit_port_hash_id_t *) lhq->key.start;
6371 
6372     if (lhq->key.length == sizeof(nxt_unit_port_hash_id_t)
6373         && port_id->pid == port->id.pid
6374         && port_id->id == port->id.id)
6375     {
6376         return NXT_OK;
6377     }
6378 
6379     return NXT_DECLINED;
6380 }
6381 
6382 
6383 static const nxt_lvlhsh_proto_t  lvlhsh_ports_proto  nxt_aligned(64) = {
6384     NXT_LVLHSH_DEFAULT,
6385     nxt_unit_port_hash_test,
6386     nxt_unit_lvlhsh_alloc,
6387     nxt_unit_lvlhsh_free,
6388 };
6389 
6390 
6391 static inline void
nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t * lhq,nxt_unit_port_hash_id_t * port_hash_id,nxt_unit_port_id_t * port_id)6392 nxt_unit_port_hash_lhq(nxt_lvlhsh_query_t *lhq,
6393     nxt_unit_port_hash_id_t *port_hash_id,
6394     nxt_unit_port_id_t *port_id)
6395 {
6396     port_hash_id->pid = port_id->pid;
6397     port_hash_id->id = port_id->id;
6398 
6399     if (nxt_fast_path(port_id->hash != 0)) {
6400         lhq->key_hash = port_id->hash;
6401 
6402     } else {
6403         lhq->key_hash = nxt_murmur_hash2(port_hash_id, sizeof(*port_hash_id));
6404 
6405         port_id->hash = lhq->key_hash;
6406 
6407         nxt_unit_debug(NULL, "calculate hash for port_id (%d, %d): %04X",
6408                        (int) port_id->pid, (int) port_id->id,
6409                        (int) port_id->hash);
6410     }
6411 
6412     lhq->key.length = sizeof(nxt_unit_port_hash_id_t);
6413     lhq->key.start = (u_char *) port_hash_id;
6414     lhq->proto = &lvlhsh_ports_proto;
6415     lhq->pool = NULL;
6416 }
6417 
6418 
6419 static int
nxt_unit_port_hash_add(nxt_lvlhsh_t * port_hash,nxt_unit_port_t * port)6420 nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, nxt_unit_port_t *port)
6421 {
6422     nxt_int_t                res;
6423     nxt_lvlhsh_query_t       lhq;
6424     nxt_unit_port_hash_id_t  port_hash_id;
6425 
6426     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, &port->id);
6427     lhq.replace = 0;
6428     lhq.value = port;
6429 
6430     res = nxt_lvlhsh_insert(port_hash, &lhq);
6431 
6432     switch (res) {
6433 
6434     case NXT_OK:
6435         return NXT_UNIT_OK;
6436 
6437     default:
6438         return NXT_UNIT_ERROR;
6439     }
6440 }
6441 
6442 
6443 static nxt_unit_port_t *
nxt_unit_port_hash_find(nxt_lvlhsh_t * port_hash,nxt_unit_port_id_t * port_id,int remove)6444 nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, nxt_unit_port_id_t *port_id,
6445     int remove)
6446 {
6447     nxt_int_t                res;
6448     nxt_lvlhsh_query_t       lhq;
6449     nxt_unit_port_hash_id_t  port_hash_id;
6450 
6451     nxt_unit_port_hash_lhq(&lhq, &port_hash_id, port_id);
6452 
6453     if (remove) {
6454         res = nxt_lvlhsh_delete(port_hash, &lhq);
6455 
6456     } else {
6457         res = nxt_lvlhsh_find(port_hash, &lhq);
6458     }
6459 
6460     switch (res) {
6461 
6462     case NXT_OK:
6463         if (!remove) {
6464             nxt_unit_port_use(lhq.value);
6465         }
6466 
6467         return lhq.value;
6468 
6469     default:
6470         return NULL;
6471     }
6472 }
6473 
6474 
6475 static nxt_int_t
nxt_unit_request_hash_test(nxt_lvlhsh_query_t * lhq,void * data)6476 nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
6477 {
6478     return NXT_OK;
6479 }
6480 
6481 
6482 static const nxt_lvlhsh_proto_t  lvlhsh_requests_proto  nxt_aligned(64) = {
6483     NXT_LVLHSH_DEFAULT,
6484     nxt_unit_request_hash_test,
6485     nxt_unit_lvlhsh_alloc,
6486     nxt_unit_lvlhsh_free,
6487 };
6488 
6489 
6490 static int
nxt_unit_request_hash_add(nxt_unit_ctx_t * ctx,nxt_unit_request_info_t * req)6491 nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
6492     nxt_unit_request_info_t *req)
6493 {
6494     uint32_t                      *stream;
6495     nxt_int_t                     res;
6496     nxt_lvlhsh_query_t            lhq;
6497     nxt_unit_ctx_impl_t           *ctx_impl;
6498     nxt_unit_request_info_impl_t  *req_impl;
6499 
6500     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6501     if (req_impl->in_hash) {
6502         return NXT_UNIT_OK;
6503     }
6504 
6505     stream = &req_impl->stream;
6506 
6507     lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
6508     lhq.key.length = sizeof(*stream);
6509     lhq.key.start = (u_char *) stream;
6510     lhq.proto = &lvlhsh_requests_proto;
6511     lhq.pool = NULL;
6512     lhq.replace = 0;
6513     lhq.value = req_impl;
6514 
6515     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6516 
6517     pthread_mutex_lock(&ctx_impl->mutex);
6518 
6519     res = nxt_lvlhsh_insert(&ctx_impl->requests, &lhq);
6520 
6521     pthread_mutex_unlock(&ctx_impl->mutex);
6522 
6523     switch (res) {
6524 
6525     case NXT_OK:
6526         req_impl->in_hash = 1;
6527         return NXT_UNIT_OK;
6528 
6529     default:
6530         return NXT_UNIT_ERROR;
6531     }
6532 }
6533 
6534 
6535 static nxt_unit_request_info_t *
nxt_unit_request_hash_find(nxt_unit_ctx_t * ctx,uint32_t stream,int remove)6536 nxt_unit_request_hash_find(nxt_unit_ctx_t *ctx, uint32_t stream, int remove)
6537 {
6538     nxt_int_t                     res;
6539     nxt_lvlhsh_query_t            lhq;
6540     nxt_unit_ctx_impl_t           *ctx_impl;
6541     nxt_unit_request_info_impl_t  *req_impl;
6542 
6543     lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
6544     lhq.key.length = sizeof(stream);
6545     lhq.key.start = (u_char *) &stream;
6546     lhq.proto = &lvlhsh_requests_proto;
6547     lhq.pool = NULL;
6548 
6549     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
6550 
6551     pthread_mutex_lock(&ctx_impl->mutex);
6552 
6553     if (remove) {
6554         res = nxt_lvlhsh_delete(&ctx_impl->requests, &lhq);
6555 
6556     } else {
6557         res = nxt_lvlhsh_find(&ctx_impl->requests, &lhq);
6558     }
6559 
6560     pthread_mutex_unlock(&ctx_impl->mutex);
6561 
6562     switch (res) {
6563 
6564     case NXT_OK:
6565         req_impl = nxt_container_of(lhq.value, nxt_unit_request_info_impl_t,
6566                                     req);
6567         if (remove) {
6568             req_impl->in_hash = 0;
6569         }
6570 
6571         return lhq.value;
6572 
6573     default:
6574         return NULL;
6575     }
6576 }
6577 
6578 
6579 void
nxt_unit_log(nxt_unit_ctx_t * ctx,int level,const char * fmt,...)6580 nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
6581 {
6582     int              log_fd, n;
6583     char             msg[NXT_MAX_ERROR_STR], *p, *end;
6584     pid_t            pid;
6585     va_list          ap;
6586     nxt_unit_impl_t  *lib;
6587 
6588     if (nxt_fast_path(ctx != NULL)) {
6589         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
6590 
6591         pid = lib->pid;
6592         log_fd = lib->log_fd;
6593 
6594     } else {
6595         pid = nxt_unit_pid;
6596         log_fd = STDERR_FILENO;
6597     }
6598 
6599     p = msg;
6600     end = p + sizeof(msg) - 1;
6601 
6602     p = nxt_unit_snprint_prefix(p, end, pid, level);
6603 
6604     va_start(ap, fmt);
6605     p += vsnprintf(p, end - p, fmt, ap);
6606     va_end(ap);
6607 
6608     if (nxt_slow_path(p > end)) {
6609         memcpy(end - 5, "[...]", 5);
6610         p = end;
6611     }
6612 
6613     *p++ = '\n';
6614 
6615     n = write(log_fd, msg, p - msg);
6616     if (nxt_slow_path(n < 0)) {
6617         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6618     }
6619 }
6620 
6621 
6622 void
nxt_unit_req_log(nxt_unit_request_info_t * req,int level,const char * fmt,...)6623 nxt_unit_req_log(nxt_unit_request_info_t *req, int level, const char *fmt, ...)
6624 {
6625     int                           log_fd, n;
6626     char                          msg[NXT_MAX_ERROR_STR], *p, *end;
6627     pid_t                         pid;
6628     va_list                       ap;
6629     nxt_unit_impl_t               *lib;
6630     nxt_unit_request_info_impl_t  *req_impl;
6631 
6632     if (nxt_fast_path(req != NULL)) {
6633         lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
6634 
6635         pid = lib->pid;
6636         log_fd = lib->log_fd;
6637 
6638     } else {
6639         pid = nxt_unit_pid;
6640         log_fd = STDERR_FILENO;
6641     }
6642 
6643     p = msg;
6644     end = p + sizeof(msg) - 1;
6645 
6646     p = nxt_unit_snprint_prefix(p, end, pid, level);
6647 
6648     if (nxt_fast_path(req != NULL)) {
6649         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
6650 
6651         p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
6652     }
6653 
6654     va_start(ap, fmt);
6655     p += vsnprintf(p, end - p, fmt, ap);
6656     va_end(ap);
6657 
6658     if (nxt_slow_path(p > end)) {
6659         memcpy(end - 5, "[...]", 5);
6660         p = end;
6661     }
6662 
6663     *p++ = '\n';
6664 
6665     n = write(log_fd, msg, p - msg);
6666     if (nxt_slow_path(n < 0)) {
6667         fprintf(stderr, "Failed to write log: %.*s", (int) (p - msg), msg);
6668     }
6669 }
6670 
6671 
6672 static const char * nxt_unit_log_levels[] = {
6673     "alert",
6674     "error",
6675     "warn",
6676     "notice",
6677     "info",
6678     "debug",
6679 };
6680 
6681 
6682 static char *
nxt_unit_snprint_prefix(char * p,char * end,pid_t pid,int level)6683 nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level)
6684 {
6685     struct tm        tm;
6686     struct timespec  ts;
6687 
6688     (void) clock_gettime(CLOCK_REALTIME, &ts);
6689 
6690 #if (NXT_HAVE_LOCALTIME_R)
6691     (void) localtime_r(&ts.tv_sec, &tm);
6692 #else
6693     tm = *localtime(&ts.tv_sec);
6694 #endif
6695 
6696 #if (NXT_DEBUG)
6697     p += snprintf(p, end - p,
6698                   "%4d/%02d/%02d %02d:%02d:%02d.%03d ",
6699                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6700                   tm.tm_hour, tm.tm_min, tm.tm_sec,
6701                   (int) ts.tv_nsec / 1000000);
6702 #else
6703     p += snprintf(p, end - p,
6704                   "%4d/%02d/%02d %02d:%02d:%02d ",
6705                   tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
6706                   tm.tm_hour, tm.tm_min, tm.tm_sec);
6707 #endif
6708 
6709     p += snprintf(p, end - p,
6710                   "[%s] %d#%"PRIu64" [unit] ", nxt_unit_log_levels[level],
6711                   (int) pid,
6712                   (uint64_t) (uintptr_t) nxt_thread_get_tid());
6713 
6714     return p;
6715 }
6716 
6717 
6718 static void *
nxt_unit_lvlhsh_alloc(void * data,size_t size)6719 nxt_unit_lvlhsh_alloc(void *data, size_t size)
6720 {
6721     int   err;
6722     void  *p;
6723 
6724     err = posix_memalign(&p, size, size);
6725 
6726     if (nxt_fast_path(err == 0)) {
6727         nxt_unit_debug(NULL, "posix_memalign(%d, %d): %p",
6728                        (int) size, (int) size, p);
6729         return p;
6730     }
6731 
6732     nxt_unit_alert(NULL, "posix_memalign(%d, %d) failed: %s (%d)",
6733                    (int) size, (int) size, strerror(err), err);
6734     return NULL;
6735 }
6736 
6737 
6738 static void
nxt_unit_lvlhsh_free(void * data,void * p)6739 nxt_unit_lvlhsh_free(void *data, void *p)
6740 {
6741     nxt_unit_free(NULL, p);
6742 }
6743 
6744 
6745 void *
nxt_unit_malloc(nxt_unit_ctx_t * ctx,size_t size)6746 nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size)
6747 {
6748     void  *p;
6749 
6750     p = malloc(size);
6751 
6752     if (nxt_fast_path(p != NULL)) {
6753 #if (NXT_DEBUG_ALLOC)
6754         nxt_unit_debug(ctx, "malloc(%d): %p", (int) size, p);
6755 #endif
6756 
6757     } else {
6758         nxt_unit_alert(ctx, "malloc(%d) failed: %s (%d)",
6759                        (int) size, strerror(errno), errno);
6760     }
6761 
6762     return p;
6763 }
6764 
6765 
6766 void
nxt_unit_free(nxt_unit_ctx_t * ctx,void * p)6767 nxt_unit_free(nxt_unit_ctx_t *ctx, void *p)
6768 {
6769 #if (NXT_DEBUG_ALLOC)
6770     nxt_unit_debug(ctx, "free(%p)", p);
6771 #endif
6772 
6773     free(p);
6774 }
6775 
6776 
6777 static int
nxt_unit_memcasecmp(const void * p1,const void * p2,size_t length)6778 nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length)
6779 {
6780     u_char        c1, c2;
6781     nxt_int_t     n;
6782     const u_char  *s1, *s2;
6783 
6784     s1 = p1;
6785     s2 = p2;
6786 
6787     while (length-- != 0) {
6788         c1 = *s1++;
6789         c2 = *s2++;
6790 
6791         c1 = nxt_lowcase(c1);
6792         c2 = nxt_lowcase(c2);
6793 
6794         n = c1 - c2;
6795 
6796         if (n != 0) {
6797             return n;
6798         }
6799     }
6800 
6801     return 0;
6802 }
6803