xref: /unit/src/nxt_unit.c (revision 1980:43553aa72111)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <stdlib.h>
7 
8 #include "nxt_main.h"
9 #include "nxt_port_memory_int.h"
10 #include "nxt_port_queue.h"
11 #include "nxt_app_queue.h"
12 
13 #include "nxt_unit.h"
14 #include "nxt_unit_request.h"
15 #include "nxt_unit_response.h"
16 #include "nxt_unit_websocket.h"
17 
18 #include "nxt_websocket.h"
19 
20 #if (NXT_HAVE_MEMFD_CREATE)
21 #include <linux/memfd.h>
22 #endif
23 
/*
 * NXT_UNIT_MAX_PLAIN_SIZE is a 1024 byte limit; NXT_UNIT_LOCAL_BUF_SIZE is
 * that limit plus room for one nxt_port_msg_t header.
 * NOTE(review): presumably the largest payload sent inline ("plain")
 * rather than through shared memory -- confirm at the use sites.
 */
24 #define NXT_UNIT_MAX_PLAIN_SIZE  1024
25 #define NXT_UNIT_LOCAL_BUF_SIZE  \
26     (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
27 
/*
 * Quit modes.  A value of this enum is stored in
 * nxt_unit_ctx_impl_t.quit_param and passed to nxt_unit_quit().
 */
28 enum {
29     NXT_QUIT_NORMAL   = 0,
30     NXT_QUIT_GRACEFUL = 1,
31 };
32 
/*
 * Forward typedefs for the implementation structures; the structures
 * themselves are defined further down in this file.
 */
33 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
34 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
35 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
36 typedef struct nxt_unit_process_s               nxt_unit_process_t;
37 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
38 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
39 typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
40 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
41 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
42 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
43 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
44 
/*
 * Forward declarations of the file-local helpers (static or nxt_inline),
 * so that they can be used before their definitions.
 */
45 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
46 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
47     nxt_unit_ctx_impl_t *ctx_impl, void *data);
48 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
50 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
52 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
53     nxt_unit_mmap_buf_t *mmap_buf);
54 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
55     nxt_unit_mmap_buf_t *mmap_buf);
56 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
57 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
58     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
59     int *log_fd, uint32_t *stream, uint32_t *shm_limit,
60     uint32_t *request_limit);
61 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
62     int queue_fd);
63 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
64     nxt_unit_request_info_t **preq);
65 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
66     nxt_unit_recv_msg_t *recv_msg);
67 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
68 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
69     nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
70 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
71     nxt_unit_recv_msg_t *recv_msg);
72 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
73     nxt_unit_port_id_t *port_id);
74 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
75 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
76     nxt_unit_recv_msg_t *recv_msg);
77 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
78 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
79     nxt_unit_ctx_t *ctx);
80 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
81 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
82 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
83     nxt_unit_ctx_t *ctx);
84 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
85 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
86     nxt_unit_websocket_frame_impl_t *ws);
87 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
88 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
89 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
90     nxt_unit_mmap_buf_t *mmap_buf, int last);
91 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
92 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
94 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
95     nxt_unit_ctx_impl_t *ctx_impl);
96 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
97     nxt_unit_read_buf_t *rbuf);
98 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
99     nxt_unit_request_info_t *req, size_t size);
100 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
101     size_t size);
102 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
103     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
104 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
105 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
106 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
107 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
108     nxt_unit_port_t *port, int n);
109 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
110 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
111     int fd);
112 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
113     nxt_unit_port_t *port, uint32_t size,
114     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
115 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
116 
117 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
118     nxt_unit_ctx_impl_t *ctx_impl);
119 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
120 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
121 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
122 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
123 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
124     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
125     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
126 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
127     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
128 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
129 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
130     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
131 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
132 
133 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
135     pid_t pid, int remove);
136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
137 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
138 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
139 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
140 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
141 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
142 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
145 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
146 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
147     nxt_unit_port_t *port);
148 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
149 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
150 
151 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
152     nxt_unit_port_t *port, int queue_fd);
153 
154 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
155 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
156 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
157     nxt_unit_port_t *port, void *queue);
158 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
159     nxt_queue_t *awaiting_req);
160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
161     nxt_unit_port_id_t *port_id);
162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
163     nxt_unit_port_id_t *port_id);
164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
166     nxt_unit_process_t *process);
167 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
168 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
169 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
170     nxt_unit_port_t *port, const void *buf, size_t buf_size,
171     const void *oob, size_t oob_size);
172 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
173     const void *buf, size_t buf_size, const void *oob, size_t oob_size);
174 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
175     nxt_unit_read_buf_t *rbuf);
176 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
177     nxt_unit_read_buf_t *src);
178 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
179     nxt_unit_read_buf_t *rbuf);
180 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
181     nxt_unit_read_buf_t *rbuf);
182 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
183     nxt_unit_read_buf_t *rbuf);
184 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
185     nxt_unit_read_buf_t *rbuf);
186 nxt_inline int nxt_unit_close(int fd);
187 static int nxt_unit_fd_blocking(int fd);
188 
189 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
190     nxt_unit_port_t *port);
191 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
192     nxt_unit_port_id_t *port_id, int remove);
193 
194 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
195     nxt_unit_request_info_t *req);
196 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
197     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
198 
199 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
200 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
201 static void nxt_unit_lvlhsh_free(void *data, void *p);
202 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
203 
204 
/*
 * Buffer descriptor that embeds the public nxt_unit_buf_t as its first
 * member.
 *
 * Descriptors are chained through next/prev.  As maintained by
 * nxt_unit_mmap_buf_insert() and nxt_unit_mmap_buf_unlink(), prev holds
 * the address of the pointer that refers to this element: either the list
 * head or the preceding element's next field.
 */
205 struct nxt_unit_mmap_buf_s {
206     nxt_unit_buf_t           buf;
207 
208     nxt_unit_mmap_buf_t      *next;
209     nxt_unit_mmap_buf_t      **prev;
210 
    /*
     * NOTE(review): hdr presumably identifies the shared memory segment
     * backing buf, and free_ptr/plain_ptr presumably track heap memory
     * used when the data is not in shared memory -- confirm at use sites.
     */
211     nxt_port_mmap_header_t   *hdr;
212     nxt_unit_request_info_t  *req;
213     nxt_unit_ctx_impl_t      *ctx_impl;
214     char                     *free_ptr;
215     char                     *plain_ptr;
216 };
217 
218 
/*
 * Working state for one received port message while it is processed.
 *
 * nxt_unit_process_msg() presets both fd[] entries to -1 and fills them
 * from an SCM_RIGHTS control message carrying one or two descriptors; it
 * also starts with incoming_buf set to NULL.
 */
219 struct nxt_unit_recv_msg_s {
220     uint32_t                 stream;
221     nxt_pid_t                pid;
222     nxt_port_id_t            reply_port;
223 
224     uint8_t                  last;      /* 1 bit */
225     uint8_t                  mmap;      /* 1 bit */
226 
227     void                     *start;
228     uint32_t                 size;
229 
230     int                      fd[2];
231 
232     nxt_unit_mmap_buf_t      *incoming_buf;
233 };
234 
235 
/*
 * Request processing states; the current value is kept in
 * nxt_unit_request_info_impl_t.state.  Enumerators are consecutive,
 * starting from NXT_UNIT_RS_START = 0.
 */
236 typedef enum {
237     NXT_UNIT_RS_START           = 0,
238     NXT_UNIT_RS_RESPONSE_INIT,
239     NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
240     NXT_UNIT_RS_RESPONSE_SENT,
241     NXT_UNIT_RS_RELEASED,
242 } nxt_unit_req_state_t;
243 
244 
/*
 * Implementation side of a request: the public nxt_unit_request_info_t is
 * embedded as the first member, followed by library-private state.
 *
 * extra_data is a flexible array member and must remain last.
 * NOTE(review): presumably holds the per-request application data whose
 * size is nxt_unit_impl_t.request_data_size -- confirm at allocation sites.
 */
245 struct nxt_unit_request_info_impl_s {
246     nxt_unit_request_info_t  req;
247 
248     uint32_t                 stream;
249 
250     nxt_unit_mmap_buf_t      *outgoing_buf;
251     nxt_unit_mmap_buf_t      *incoming_buf;
252 
253     nxt_unit_req_state_t     state;
254     uint8_t                  websocket;
255     uint8_t                  in_hash;
256 
257     /*  for nxt_unit_ctx_impl_t.free_req or active_req */
258     nxt_queue_link_t         link;
259     /*  for nxt_unit_port_impl_t.awaiting_req */
260     nxt_queue_link_t         port_wait_link;
261 
262     char                     extra_data[];
263 };
264 
265 
/*
 * Implementation side of a WebSocket frame: the public
 * nxt_unit_websocket_frame_t is embedded as the first member.
 * NOTE(review): link is presumably used for nxt_unit_ctx_impl_t.free_ws
 * (documented there as a queue of this type) -- confirm.
 */
266 struct nxt_unit_websocket_frame_impl_s {
267     nxt_unit_websocket_frame_t  ws;
268 
269     nxt_unit_mmap_buf_t         *buf;
270 
271     nxt_queue_link_t            link;
272 
273     nxt_unit_ctx_impl_t         *ctx_impl;
274 };
275 
276 
/*
 * Receive buffer for one port message.
 *
 * nxt_unit_process_msg() interprets the start of buf as nxt_port_msg_t and
 * the start of oob as struct cmsghdr.  size is signed; that function
 * treats 0 as "read port closed" and any other value smaller than
 * sizeof(nxt_port_msg_t) as an error.
 */
277 struct nxt_unit_read_buf_s {
278     nxt_queue_link_t              link;
279     nxt_unit_ctx_impl_t           *ctx_impl;
280     ssize_t                       size;
281     char                          buf[16384];
282     char                          oob[256];
283 };
284 
285 
/*
 * Per-context state; the public nxt_unit_ctx_t is embedded as the first
 * member and the implementation is recovered with nxt_container_of().
 *
 * use_count is a reference counter: nxt_unit_ctx_init() starts it at 1 and
 * nxt_unit_ctx_release() calls nxt_unit_ctx_free() when it drops to zero.
 *
 * ctx_buf[], ctx_read_buf and req are embedded objects which
 * nxt_unit_ctx_init() places on free_buf, free_rbuf and free_req.
 * req must stay the last member because it ends with a flexible array.
 */
286 struct nxt_unit_ctx_impl_s {
287     nxt_unit_ctx_t                ctx;
288 
289     nxt_atomic_t                  use_count;
290     nxt_atomic_t                  wait_items;
291 
292     pthread_mutex_t               mutex;
293 
294     nxt_unit_port_t               *read_port;
295 
    /* for nxt_unit_impl_t.contexts */
296     nxt_queue_link_t              link;
297 
298     nxt_unit_mmap_buf_t           *free_buf;
299 
300     /*  of nxt_unit_request_info_impl_t */
301     nxt_queue_t                   free_req;
302 
303     /*  of nxt_unit_websocket_frame_impl_t */
304     nxt_queue_t                   free_ws;
305 
306     /*  of nxt_unit_request_info_impl_t */
307     nxt_queue_t                   active_req;
308 
309     /*  of nxt_unit_request_info_impl_t */
310     nxt_lvlhsh_t                  requests;
311 
312     /*  of nxt_unit_request_info_impl_t */
313     nxt_queue_t                   ready_req;
314 
315     /*  of nxt_unit_read_buf_t */
316     nxt_queue_t                   pending_rbuf;
317 
318     /*  of nxt_unit_read_buf_t */
319     nxt_queue_t                   free_rbuf;
320 
321     uint8_t                       online;       /* 1 bit */
322     uint8_t                       ready;        /* 1 bit */
    /* NXT_QUIT_NORMAL or NXT_QUIT_GRACEFUL */
323     uint8_t                       quit_param;
324 
325     nxt_unit_mmap_buf_t           ctx_buf[2];
326     nxt_unit_read_buf_t           ctx_read_buf;
327 
328     nxt_unit_request_info_impl_t  req;
329 };
330 
331 
/*
 * One element of nxt_unit_mmaps_t.elts.
 * NOTE(review): hdr presumably points at the mapped shared memory segment
 * header and src_thread at the thread that created it -- confirm.
 */
332 struct nxt_unit_mmap_s {
333     nxt_port_mmap_header_t   *hdr;
334     pthread_t                src_thread;
335 
336     /*  of nxt_unit_read_buf_t */
337     nxt_queue_t              awaiting_rbuf;
338 };
339 
340 
/*
 * Collection of nxt_unit_mmap_t elements.  The library keeps two of them
 * (nxt_unit_impl_t.incoming and .outgoing), set up with
 * nxt_unit_mmaps_init() and torn down with nxt_unit_mmaps_destroy().
 * NOTE(review): size/cap presumably are the used and allocated element
 * counts of elts, guarded by mutex -- confirm.
 */
341 struct nxt_unit_mmaps_s {
342     pthread_mutex_t          mutex;
343     uint32_t                 size;
344     uint32_t                 cap;
345     nxt_atomic_t             allocated_chunks;
346     nxt_unit_mmap_t          *elts;
347 };
348 
349 
/*
 * Library instance; the public nxt_unit_t is embedded as the first member.
 *
 * nxt_unit_create() allocates it with request_data_size extra bytes after
 * the structure.  main_ctx must stay the last member: it ends with a
 * request object whose final member is a flexible array.
 *
 * use_count is a reference counter; nxt_unit_lib_release() destroys and
 * frees the instance when the last reference is dropped.
 */
350 struct nxt_unit_impl_s {
351     nxt_unit_t               unit;
352     nxt_unit_callbacks_t     callbacks;
353 
354     nxt_atomic_t             use_count;
355     nxt_atomic_t             request_count;
356 
357     uint32_t                 request_data_size;
    /* limit in units of PORT_MMAP_DATA_SIZE; nxt_unit_init() forces >= 1 */
358     uint32_t                 shm_mmap_limit;
359     uint32_t                 request_limit;
360 
361     pthread_mutex_t          mutex;
362 
363     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
364     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
365 
366     nxt_unit_port_t          *router_port;
367     nxt_unit_port_t          *shared_port;
368 
369     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
370 
371     nxt_unit_mmaps_t         incoming;
372     nxt_unit_mmaps_t         outgoing;
373 
    /* nxt_unit_init() sets pid from the read port id */
374     pid_t                    pid;
    /* STDERR_FILENO until nxt_unit_init() overrides it */
375     int                      log_fd;
376 
377     nxt_unit_ctx_impl_t      main_ctx;
378 };
379 
380 
/*
 * Implementation side of a port; the public nxt_unit_port_t is embedded as
 * the first member.
 * NOTE(review): queue presumably is the shared memory queue passed to
 * nxt_unit_add_port() (nxt_unit_init() passes a mapped nxt_port_queue_t
 * for the read port and NULL for the router port) -- confirm.
 */
381 struct nxt_unit_port_impl_s {
382     nxt_unit_port_t          port;
383 
384     nxt_atomic_t             use_count;
385 
386     /*  for nxt_unit_process_t.ports */
387     nxt_queue_link_t         link;
388     nxt_unit_process_t       *process;
389 
390     /*  of nxt_unit_request_info_impl_t */
391     nxt_queue_t              awaiting_req;
392 
393     int                      ready;
394 
395     void                     *queue;
396 
397     int                      from_socket;
398     nxt_unit_read_buf_t      *socket_rbuf;
399 };
400 
401 
/*
 * Per-process record, stored in nxt_unit_impl_t.processes.
 * NOTE(review): presumably one record per peer pid, owning the list of
 * ports that belong to that process -- confirm.
 */
402 struct nxt_unit_process_s {
403     pid_t                    pid;
404 
405     nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */
406 
407     nxt_unit_impl_t          *lib;
408 
409     nxt_atomic_t             use_count;
410 
411     uint32_t                 next_port_id;
412 };
413 
414 
415 /*
 * Explicitly using 32-bit types for both members so that the structure
 * contains no padding.
 * NOTE(review): presumably required because the structure is hashed or
 * compared as raw bytes for the port hash -- confirm.
 */
416 typedef struct {
417     int32_t   pid;
418     uint32_t  id;
419 } nxt_unit_port_hash_id_t;
420 
421 
422 nxt_unit_ctx_t *
423 nxt_unit_init(nxt_unit_init_t *init)
424 {
425     int              rc, queue_fd;
426     void             *mem;
427     uint32_t         ready_stream, shm_limit, request_limit;
428     nxt_unit_ctx_t   *ctx;
429     nxt_unit_impl_t  *lib;
430     nxt_unit_port_t  ready_port, router_port, read_port;
431 
432     lib = nxt_unit_create(init);
433     if (nxt_slow_path(lib == NULL)) {
434         return NULL;
435     }
436 
437     queue_fd = -1;
438     mem = MAP_FAILED;
439 
440     if (init->ready_port.id.pid != 0
441         && init->ready_stream != 0
442         && init->read_port.id.pid != 0)
443     {
444         ready_port = init->ready_port;
445         ready_stream = init->ready_stream;
446         router_port = init->router_port;
447         read_port = init->read_port;
448         lib->log_fd = init->log_fd;
449 
450         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
451                               ready_port.id.id);
452         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
453                               router_port.id.id);
454         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
455                               read_port.id.id);
456 
457     } else {
458         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
459                                &lib->log_fd, &ready_stream, &shm_limit,
460                                &request_limit);
461         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
462             goto fail;
463         }
464 
465         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
466                                 / PORT_MMAP_DATA_SIZE;
467         lib->request_limit = request_limit;
468     }
469 
470     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
471         lib->shm_mmap_limit = 1;
472     }
473 
474     lib->pid = read_port.id.pid;
475 
476     ctx = &lib->main_ctx.ctx;
477 
478     rc = nxt_unit_fd_blocking(router_port.out_fd);
479     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
480         goto fail;
481     }
482 
483     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
484     if (nxt_slow_path(lib->router_port == NULL)) {
485         nxt_unit_alert(NULL, "failed to add router_port");
486 
487         goto fail;
488     }
489 
490     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
491     if (nxt_slow_path(queue_fd == -1)) {
492         goto fail;
493     }
494 
495     mem = mmap(NULL, sizeof(nxt_port_queue_t),
496                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
497     if (nxt_slow_path(mem == MAP_FAILED)) {
498         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
499                        strerror(errno), errno);
500 
501         goto fail;
502     }
503 
504     nxt_port_queue_init(mem);
505 
506     rc = nxt_unit_fd_blocking(read_port.in_fd);
507     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
508         goto fail;
509     }
510 
511     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
512     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
513         nxt_unit_alert(NULL, "failed to add read_port");
514 
515         goto fail;
516     }
517 
518     rc = nxt_unit_fd_blocking(ready_port.out_fd);
519     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
520         goto fail;
521     }
522 
523     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
524     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
525         nxt_unit_alert(NULL, "failed to send READY message");
526 
527         goto fail;
528     }
529 
530     nxt_unit_close(ready_port.out_fd);
531     nxt_unit_close(queue_fd);
532 
533     return ctx;
534 
535 fail:
536 
537     if (mem != MAP_FAILED) {
538         munmap(mem, sizeof(nxt_port_queue_t));
539     }
540 
541     if (queue_fd != -1) {
542         nxt_unit_close(queue_fd);
543     }
544 
545     nxt_unit_ctx_release(&lib->main_ctx.ctx);
546 
547     return NULL;
548 }
549 
550 
551 static nxt_unit_impl_t *
552 nxt_unit_create(nxt_unit_init_t *init)
553 {
554     int                   rc;
555     nxt_unit_impl_t       *lib;
556     nxt_unit_callbacks_t  *cb;
557 
558     lib = nxt_unit_malloc(NULL,
559                           sizeof(nxt_unit_impl_t) + init->request_data_size);
560     if (nxt_slow_path(lib == NULL)) {
561         nxt_unit_alert(NULL, "failed to allocate unit struct");
562 
563         return NULL;
564     }
565 
566     rc = pthread_mutex_init(&lib->mutex, NULL);
567     if (nxt_slow_path(rc != 0)) {
568         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
569 
570         goto fail;
571     }
572 
573     lib->unit.data = init->data;
574     lib->callbacks = init->callbacks;
575 
576     lib->request_data_size = init->request_data_size;
577     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
578                             / PORT_MMAP_DATA_SIZE;
579     lib->request_limit = init->request_limit;
580 
581     lib->processes.slot = NULL;
582     lib->ports.slot = NULL;
583 
584     lib->log_fd = STDERR_FILENO;
585 
586     nxt_queue_init(&lib->contexts);
587 
588     lib->use_count = 0;
589     lib->request_count = 0;
590     lib->router_port = NULL;
591     lib->shared_port = NULL;
592 
593     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
594     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
595         pthread_mutex_destroy(&lib->mutex);
596         goto fail;
597     }
598 
599     cb = &lib->callbacks;
600 
601     if (cb->request_handler == NULL) {
602         nxt_unit_alert(NULL, "request_handler is NULL");
603 
604         pthread_mutex_destroy(&lib->mutex);
605         goto fail;
606     }
607 
608     nxt_unit_mmaps_init(&lib->incoming);
609     nxt_unit_mmaps_init(&lib->outgoing);
610 
611     return lib;
612 
613 fail:
614 
615     nxt_unit_free(NULL, lib);
616 
617     return NULL;
618 }
619 
620 
621 static int
622 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
623     void *data)
624 {
625     int  rc;
626 
627     ctx_impl->ctx.data = data;
628     ctx_impl->ctx.unit = &lib->unit;
629 
630     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
631     if (nxt_slow_path(rc != 0)) {
632         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
633 
634         return NXT_UNIT_ERROR;
635     }
636 
637     nxt_unit_lib_use(lib);
638 
639     pthread_mutex_lock(&lib->mutex);
640 
641     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
642 
643     pthread_mutex_unlock(&lib->mutex);
644 
645     ctx_impl->use_count = 1;
646     ctx_impl->wait_items = 0;
647     ctx_impl->online = 1;
648     ctx_impl->ready = 0;
649     ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
650 
651     nxt_queue_init(&ctx_impl->free_req);
652     nxt_queue_init(&ctx_impl->free_ws);
653     nxt_queue_init(&ctx_impl->active_req);
654     nxt_queue_init(&ctx_impl->ready_req);
655     nxt_queue_init(&ctx_impl->pending_rbuf);
656     nxt_queue_init(&ctx_impl->free_rbuf);
657 
658     ctx_impl->free_buf = NULL;
659     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
660     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
661 
662     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
663     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
664 
665     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
666 
667     ctx_impl->req.req.ctx = &ctx_impl->ctx;
668     ctx_impl->req.req.unit = &lib->unit;
669 
670     ctx_impl->read_port = NULL;
671     ctx_impl->requests.slot = 0;
672 
673     return NXT_UNIT_OK;
674 }
675 
676 
677 nxt_inline void
678 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
679 {
680     nxt_unit_ctx_impl_t  *ctx_impl;
681 
682     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
683 
684     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
685 }
686 
687 
688 nxt_inline void
689 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
690 {
691     long                 c;
692     nxt_unit_ctx_impl_t  *ctx_impl;
693 
694     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
695 
696     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
697 
698     if (c == 1) {
699         nxt_unit_ctx_free(ctx_impl);
700     }
701 }
702 
703 
704 nxt_inline void
705 nxt_unit_lib_use(nxt_unit_impl_t *lib)
706 {
707     nxt_atomic_fetch_add(&lib->use_count, 1);
708 }
709 
710 
711 nxt_inline void
712 nxt_unit_lib_release(nxt_unit_impl_t *lib)
713 {
714     long                c;
715     nxt_unit_process_t  *process;
716 
717     c = nxt_atomic_fetch_add(&lib->use_count, -1);
718 
719     if (c == 1) {
720         for ( ;; ) {
721             pthread_mutex_lock(&lib->mutex);
722 
723             process = nxt_unit_process_pop_first(lib);
724             if (process == NULL) {
725                 pthread_mutex_unlock(&lib->mutex);
726 
727                 break;
728             }
729 
730             nxt_unit_remove_process(lib, process);
731         }
732 
733         pthread_mutex_destroy(&lib->mutex);
734 
735         if (nxt_fast_path(lib->router_port != NULL)) {
736             nxt_unit_port_release(lib->router_port);
737         }
738 
739         if (nxt_fast_path(lib->shared_port != NULL)) {
740             nxt_unit_port_release(lib->shared_port);
741         }
742 
743         nxt_unit_mmaps_destroy(&lib->incoming);
744         nxt_unit_mmaps_destroy(&lib->outgoing);
745 
746         nxt_unit_free(NULL, lib);
747     }
748 }
749 
750 
751 nxt_inline void
752 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
753     nxt_unit_mmap_buf_t *mmap_buf)
754 {
755     mmap_buf->next = *head;
756 
757     if (mmap_buf->next != NULL) {
758         mmap_buf->next->prev = &mmap_buf->next;
759     }
760 
761     *head = mmap_buf;
762     mmap_buf->prev = head;
763 }
764 
765 
766 nxt_inline void
767 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
768     nxt_unit_mmap_buf_t *mmap_buf)
769 {
770     while (*prev != NULL) {
771         prev = &(*prev)->next;
772     }
773 
774     nxt_unit_mmap_buf_insert(prev, mmap_buf);
775 }
776 
777 
778 nxt_inline void
779 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
780 {
781     nxt_unit_mmap_buf_t  **prev;
782 
783     prev = mmap_buf->prev;
784 
785     if (mmap_buf->next != NULL) {
786         mmap_buf->next->prev = prev;
787     }
788 
789     if (prev != NULL) {
790         *prev = mmap_buf->next;
791     }
792 }
793 
794 
795 static int
796 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
797     nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
798     uint32_t *shm_limit, uint32_t *request_limit)
799 {
800     int       rc;
801     int       ready_fd, router_fd, read_in_fd, read_out_fd;
802     char      *unit_init, *version_end, *vars;
803     size_t    version_length;
804     int64_t   ready_pid, router_pid, read_pid;
805     uint32_t  ready_stream, router_id, ready_id, read_id;
806 
807     unit_init = getenv(NXT_UNIT_INIT_ENV);
808     if (nxt_slow_path(unit_init == NULL)) {
809         nxt_unit_alert(NULL, "%s is not in the current environment",
810                        NXT_UNIT_INIT_ENV);
811 
812         return NXT_UNIT_ERROR;
813     }
814 
815     version_end = strchr(unit_init, ';');
816     if (nxt_slow_path(version_end == NULL)) {
817         nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
818                        NXT_UNIT_INIT_ENV, unit_init);
819 
820         return NXT_UNIT_ERROR;
821     }
822 
823     version_length = version_end - unit_init;
824 
825     rc = version_length != nxt_length(NXT_VERSION)
826          || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
827 
828     if (nxt_slow_path(rc != 0)) {
829         nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
830                        "%.*s, while the app was compiled with libunit %s",
831                        (int) version_length, unit_init, NXT_VERSION);
832 
833         return NXT_UNIT_ERROR;
834     }
835 
836     vars = version_end + 1;
837 
838     rc = sscanf(vars,
839                 "%"PRIu32";"
840                 "%"PRId64",%"PRIu32",%d;"
841                 "%"PRId64",%"PRIu32",%d;"
842                 "%"PRId64",%"PRIu32",%d,%d;"
843                 "%d,%"PRIu32",%"PRIu32,
844                 &ready_stream,
845                 &ready_pid, &ready_id, &ready_fd,
846                 &router_pid, &router_id, &router_fd,
847                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
848                 log_fd, shm_limit, request_limit);
849 
850     if (nxt_slow_path(rc == EOF)) {
851         nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
852                        vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
853 
854         return NXT_UNIT_ERROR;
855     }
856 
857     if (nxt_slow_path(rc != 14)) {
858         nxt_unit_alert(NULL, "invalid number of variables in %s env: "
859                        "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars);
860 
861         return NXT_UNIT_ERROR;
862     }
863 
864     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
865 
866     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
867 
868     ready_port->in_fd = -1;
869     ready_port->out_fd = ready_fd;
870     ready_port->data = NULL;
871 
872     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
873 
874     router_port->in_fd = -1;
875     router_port->out_fd = router_fd;
876     router_port->data = NULL;
877 
878     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
879 
880     read_port->in_fd = read_in_fd;
881     read_port->out_fd = read_out_fd;
882     read_port->data = NULL;
883 
884     *stream = ready_stream;
885 
886     return NXT_UNIT_OK;
887 }
888 
889 
890 static int
891 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
892 {
893     ssize_t          res;
894     nxt_port_msg_t   msg;
895     nxt_unit_impl_t  *lib;
896 
897     union {
898         struct cmsghdr  cm;
899         char            space[CMSG_SPACE(sizeof(int))];
900     } cmsg;
901 
902     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
903 
904     msg.stream = stream;
905     msg.pid = lib->pid;
906     msg.reply_port = 0;
907     msg.type = _NXT_PORT_MSG_PROCESS_READY;
908     msg.last = 1;
909     msg.mmap = 0;
910     msg.nf = 0;
911     msg.mf = 0;
912     msg.tracking = 0;
913 
914     memset(&cmsg, 0, sizeof(cmsg));
915 
916     cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
917     cmsg.cm.cmsg_level = SOL_SOCKET;
918     cmsg.cm.cmsg_type = SCM_RIGHTS;
919 
920     /*
921      * memcpy() is used instead of simple
922      *   *(int *) CMSG_DATA(&cmsg.cm) = fd;
923      * because GCC 4.4 with -O2/3/s optimization may issue a warning:
924      *   dereferencing type-punned pointer will break strict-aliasing rules
925      *
926      * Fortunately, GCC with -O1 compiles this nxt_memcpy()
927      * in the same simple assignment as in the code above.
928      */
929     memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));
930 
931     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
932                            &cmsg, sizeof(cmsg));
933     if (res != sizeof(msg)) {
934         return NXT_UNIT_ERROR;
935     }
936 
937     return NXT_UNIT_OK;
938 }
939 
940 
941 static int
942 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
943     nxt_unit_request_info_t **preq)
944 {
945     int                  rc;
946     pid_t                pid;
947     uint8_t              quit_param;
948     struct cmsghdr       *cm;
949     nxt_port_msg_t       *port_msg;
950     nxt_unit_impl_t      *lib;
951     nxt_unit_recv_msg_t  recv_msg;
952 
953     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
954 
955     recv_msg.fd[0] = -1;
956     recv_msg.fd[1] = -1;
957     port_msg = (nxt_port_msg_t *) rbuf->buf;
958     cm = (struct cmsghdr *) rbuf->oob;
959 
960     if (cm->cmsg_level == SOL_SOCKET
961         && cm->cmsg_type == SCM_RIGHTS)
962     {
963         if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
964             memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
965         }
966 
967         if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
968             memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
969         }
970     }
971 
972     recv_msg.incoming_buf = NULL;
973 
974     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
975         if (nxt_slow_path(rbuf->size == 0)) {
976             nxt_unit_debug(ctx, "read port closed");
977 
978             nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
979             rc = NXT_UNIT_OK;
980             goto done;
981         }
982 
983         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
984 
985         rc = NXT_UNIT_ERROR;
986         goto done;
987     }
988 
989     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
990                    port_msg->stream, (int) port_msg->type,
991                    recv_msg.fd[0], recv_msg.fd[1]);
992 
993     recv_msg.stream = port_msg->stream;
994     recv_msg.pid = port_msg->pid;
995     recv_msg.reply_port = port_msg->reply_port;
996     recv_msg.last = port_msg->last;
997     recv_msg.mmap = port_msg->mmap;
998 
999     recv_msg.start = port_msg + 1;
1000     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
1001 
1002     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
1003         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
1004                        port_msg->stream, (int) port_msg->type);
1005         rc = NXT_UNIT_ERROR;
1006         goto done;
1007     }
1008 
1009     /* Fragmentation is unsupported. */
1010     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
1011         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
1012                        port_msg->stream, (int) port_msg->type);
1013         rc = NXT_UNIT_ERROR;
1014         goto done;
1015     }
1016 
1017     if (port_msg->mmap) {
1018         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
1019 
1020         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1021             if (rc == NXT_UNIT_AGAIN) {
1022                 recv_msg.fd[0] = -1;
1023                 recv_msg.fd[1] = -1;
1024             }
1025 
1026             goto done;
1027         }
1028     }
1029 
1030     switch (port_msg->type) {
1031 
1032     case _NXT_PORT_MSG_RPC_READY:
1033         rc = NXT_UNIT_OK;
1034         break;
1035 
1036     case _NXT_PORT_MSG_QUIT:
1037         if (recv_msg.size == sizeof(quit_param)) {
1038             memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1039 
1040         } else {
1041             quit_param = NXT_QUIT_NORMAL;
1042         }
1043 
1044         nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1045                        (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1046 
1047         nxt_unit_quit(ctx, quit_param);
1048 
1049         rc = NXT_UNIT_OK;
1050         break;
1051 
1052     case _NXT_PORT_MSG_NEW_PORT:
1053         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1054         break;
1055 
1056     case _NXT_PORT_MSG_PORT_ACK:
1057         rc = nxt_unit_ctx_ready(ctx);
1058         break;
1059 
1060     case _NXT_PORT_MSG_CHANGE_FILE:
1061         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1062                        port_msg->stream, recv_msg.fd[0]);
1063 
1064         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1065             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1066                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1067                            strerror(errno), errno);
1068 
1069             rc = NXT_UNIT_ERROR;
1070             goto done;
1071         }
1072 
1073         rc = NXT_UNIT_OK;
1074         break;
1075 
1076     case _NXT_PORT_MSG_MMAP:
1077         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1078             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1079                            port_msg->stream, recv_msg.fd[0]);
1080 
1081             rc = NXT_UNIT_ERROR;
1082             goto done;
1083         }
1084 
1085         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1086         break;
1087 
1088     case _NXT_PORT_MSG_REQ_HEADERS:
1089         rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1090         break;
1091 
1092     case _NXT_PORT_MSG_REQ_BODY:
1093         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1094         break;
1095 
1096     case _NXT_PORT_MSG_WEBSOCKET:
1097         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1098         break;
1099 
1100     case _NXT_PORT_MSG_REMOVE_PID:
1101         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1102             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1103                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1104                            (int) sizeof(pid));
1105 
1106             rc = NXT_UNIT_ERROR;
1107             goto done;
1108         }
1109 
1110         memcpy(&pid, recv_msg.start, sizeof(pid));
1111 
1112         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1113                        port_msg->stream, (int) pid);
1114 
1115         nxt_unit_remove_pid(lib, pid);
1116 
1117         rc = NXT_UNIT_OK;
1118         break;
1119 
1120     case _NXT_PORT_MSG_SHM_ACK:
1121         rc = nxt_unit_process_shm_ack(ctx);
1122         break;
1123 
1124     default:
1125         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1126                        port_msg->stream, (int) port_msg->type);
1127 
1128         rc = NXT_UNIT_ERROR;
1129         goto done;
1130     }
1131 
1132 done:
1133 
1134     if (recv_msg.fd[0] != -1) {
1135         nxt_unit_close(recv_msg.fd[0]);
1136     }
1137 
1138     if (recv_msg.fd[1] != -1) {
1139         nxt_unit_close(recv_msg.fd[1]);
1140     }
1141 
1142     while (recv_msg.incoming_buf != NULL) {
1143         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1144     }
1145 
1146     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1147 #if (NXT_DEBUG)
1148         memset(rbuf->buf, 0xAC, rbuf->size);
1149 #endif
1150         nxt_unit_read_buf_release(ctx, rbuf);
1151     }
1152 
1153     return rc;
1154 }
1155 
1156 
1157 static int
1158 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1159 {
1160     void                     *mem;
1161     nxt_unit_impl_t          *lib;
1162     nxt_unit_port_t          new_port, *port;
1163     nxt_port_msg_new_port_t  *new_port_msg;
1164 
1165     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1166         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1167                       "invalid message size (%d)",
1168                       recv_msg->stream, (int) recv_msg->size);
1169 
1170         return NXT_UNIT_ERROR;
1171     }
1172 
1173     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1174         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1175                        recv_msg->stream, recv_msg->fd[0]);
1176 
1177         return NXT_UNIT_ERROR;
1178     }
1179 
1180     new_port_msg = recv_msg->start;
1181 
1182     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1183                    recv_msg->stream, (int) new_port_msg->pid,
1184                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1185 
1186     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1187 
1188     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1189         nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
1190 
1191         new_port.in_fd = recv_msg->fd[0];
1192         new_port.out_fd = -1;
1193 
1194         mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
1195                    MAP_SHARED, recv_msg->fd[1], 0);
1196 
1197     } else {
1198         if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
1199                           != NXT_UNIT_OK))
1200         {
1201             return NXT_UNIT_ERROR;
1202         }
1203 
1204         nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
1205                               new_port_msg->id);
1206 
1207         new_port.in_fd = -1;
1208         new_port.out_fd = recv_msg->fd[0];
1209 
1210         mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1211                    MAP_SHARED, recv_msg->fd[1], 0);
1212     }
1213 
1214     if (nxt_slow_path(mem == MAP_FAILED)) {
1215         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1216                        strerror(errno), errno);
1217 
1218         return NXT_UNIT_ERROR;
1219     }
1220 
1221     new_port.data = NULL;
1222 
1223     recv_msg->fd[0] = -1;
1224 
1225     port = nxt_unit_add_port(ctx, &new_port, mem);
1226     if (nxt_slow_path(port == NULL)) {
1227         return NXT_UNIT_ERROR;
1228     }
1229 
1230     if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
1231         lib->shared_port = port;
1232 
1233         return nxt_unit_ctx_ready(ctx);
1234     }
1235 
1236     nxt_unit_port_release(port);
1237 
1238     return NXT_UNIT_OK;
1239 }
1240 
1241 
1242 static int
1243 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1244 {
1245     nxt_unit_impl_t      *lib;
1246     nxt_unit_ctx_impl_t  *ctx_impl;
1247 
1248     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1249 
1250     if (nxt_slow_path(ctx_impl->ready)) {
1251         return NXT_UNIT_OK;
1252     }
1253 
1254     ctx_impl->ready = 1;
1255 
1256     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1257 
1258     /* Call ready_handler() only for main context. */
1259     if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1260         return lib->callbacks.ready_handler(ctx);
1261     }
1262 
1263     if (&lib->main_ctx != ctx_impl) {
1264         /* Check if the main context is already stopped or quit. */
1265         if (nxt_slow_path(!lib->main_ctx.ready)) {
1266             ctx_impl->ready = 0;
1267 
1268             nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1269 
1270             return NXT_UNIT_OK;
1271         }
1272 
1273         if (lib->callbacks.add_port != NULL) {
1274             lib->callbacks.add_port(ctx, lib->shared_port);
1275         }
1276     }
1277 
1278     return NXT_UNIT_OK;
1279 }
1280 
1281 
1282 static int
1283 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1284     nxt_unit_request_info_t **preq)
1285 {
1286     int                           res;
1287     nxt_unit_impl_t               *lib;
1288     nxt_unit_port_id_t            port_id;
1289     nxt_unit_request_t            *r;
1290     nxt_unit_mmap_buf_t           *b;
1291     nxt_unit_request_info_t       *req;
1292     nxt_unit_request_info_impl_t  *req_impl;
1293 
1294     if (nxt_slow_path(recv_msg->mmap == 0)) {
1295         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1296                       recv_msg->stream);
1297 
1298         return NXT_UNIT_ERROR;
1299     }
1300 
1301     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1302         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1303                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1304                       (int) sizeof(nxt_unit_request_t));
1305 
1306         return NXT_UNIT_ERROR;
1307     }
1308 
1309     req_impl = nxt_unit_request_info_get(ctx);
1310     if (nxt_slow_path(req_impl == NULL)) {
1311         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1312                       recv_msg->stream);
1313 
1314         return NXT_UNIT_ERROR;
1315     }
1316 
1317     req = &req_impl->req;
1318 
1319     req->request = recv_msg->start;
1320 
1321     b = recv_msg->incoming_buf;
1322 
1323     req->request_buf = &b->buf;
1324     req->response = NULL;
1325     req->response_buf = NULL;
1326 
1327     r = req->request;
1328 
1329     req->content_length = r->content_length;
1330 
1331     req->content_buf = req->request_buf;
1332     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1333 
1334     req_impl->stream = recv_msg->stream;
1335 
1336     req_impl->outgoing_buf = NULL;
1337 
1338     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1339         b->req = req;
1340     }
1341 
1342     /* "Move" incoming buffer list to req_impl. */
1343     req_impl->incoming_buf = recv_msg->incoming_buf;
1344     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1345     recv_msg->incoming_buf = NULL;
1346 
1347     req->content_fd = recv_msg->fd[0];
1348     recv_msg->fd[0] = -1;
1349 
1350     req->response_max_fields = 0;
1351     req_impl->state = NXT_UNIT_RS_START;
1352     req_impl->websocket = 0;
1353     req_impl->in_hash = 0;
1354 
1355     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1356                    (int) r->method_length,
1357                    (char *) nxt_unit_sptr_get(&r->method),
1358                    (int) r->target_length,
1359                    (char *) nxt_unit_sptr_get(&r->target),
1360                    (int) r->content_length);
1361 
1362     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1363 
1364     res = nxt_unit_request_check_response_port(req, &port_id);
1365     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1366         return NXT_UNIT_ERROR;
1367     }
1368 
1369     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1370         res = nxt_unit_send_req_headers_ack(req);
1371         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1372             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1373 
1374             return NXT_UNIT_ERROR;
1375         }
1376 
1377         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1378 
1379         if (req->content_length
1380             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1381         {
1382             res = nxt_unit_request_hash_add(ctx, req);
1383             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1384                 nxt_unit_req_warn(req, "failed to add request to hash");
1385 
1386                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1387 
1388                 return NXT_UNIT_ERROR;
1389             }
1390 
1391             /*
1392              * If application have separate data handler, we may start
1393              * request processing and process data when it is arrived.
1394              */
1395             if (lib->callbacks.data_handler == NULL) {
1396                 return NXT_UNIT_OK;
1397             }
1398         }
1399 
1400         if (preq == NULL) {
1401             lib->callbacks.request_handler(req);
1402 
1403         } else {
1404             *preq = req;
1405         }
1406     }
1407 
1408     return NXT_UNIT_OK;
1409 }
1410 
1411 
1412 static int
1413 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1414 {
1415     uint64_t                 l;
1416     nxt_unit_impl_t          *lib;
1417     nxt_unit_mmap_buf_t      *b;
1418     nxt_unit_request_info_t  *req;
1419 
1420     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1421     if (req == NULL) {
1422         return NXT_UNIT_OK;
1423     }
1424 
1425     l = req->content_buf->end - req->content_buf->free;
1426 
1427     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1428         b->req = req;
1429         l += b->buf.end - b->buf.free;
1430     }
1431 
1432     if (recv_msg->incoming_buf != NULL) {
1433         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1434 
1435         while (b->next != NULL) {
1436             b = b->next;
1437         }
1438 
1439         /* "Move" incoming buffer list to req_impl. */
1440         b->next = recv_msg->incoming_buf;
1441         b->next->prev = &b->next;
1442 
1443         recv_msg->incoming_buf = NULL;
1444     }
1445 
1446     req->content_fd = recv_msg->fd[0];
1447     recv_msg->fd[0] = -1;
1448 
1449     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1450 
1451     if (lib->callbacks.data_handler != NULL) {
1452         lib->callbacks.data_handler(req);
1453 
1454         return NXT_UNIT_OK;
1455     }
1456 
1457     if (req->content_fd != -1 || l == req->content_length) {
1458         lib->callbacks.request_handler(req);
1459     }
1460 
1461     return NXT_UNIT_OK;
1462 }
1463 
1464 
1465 static int
1466 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1467     nxt_unit_port_id_t *port_id)
1468 {
1469     int                           res;
1470     nxt_unit_ctx_t                *ctx;
1471     nxt_unit_impl_t               *lib;
1472     nxt_unit_port_t               *port;
1473     nxt_unit_process_t            *process;
1474     nxt_unit_ctx_impl_t           *ctx_impl;
1475     nxt_unit_port_impl_t          *port_impl;
1476     nxt_unit_request_info_impl_t  *req_impl;
1477 
1478     ctx = req->ctx;
1479     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1480     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1481 
1482     pthread_mutex_lock(&lib->mutex);
1483 
1484     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1485     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1486 
1487     if (nxt_fast_path(port != NULL)) {
1488         req->response_port = port;
1489 
1490         if (nxt_fast_path(port_impl->ready)) {
1491             pthread_mutex_unlock(&lib->mutex);
1492 
1493             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1494                            (int) port->id.pid, (int) port->id.id);
1495 
1496             return NXT_UNIT_OK;
1497         }
1498 
1499         nxt_unit_debug(ctx, "check_response_port: "
1500                        "port{%d,%d} already requested",
1501                        (int) port->id.pid, (int) port->id.id);
1502 
1503         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1504 
1505         nxt_queue_insert_tail(&port_impl->awaiting_req,
1506                               &req_impl->port_wait_link);
1507 
1508         pthread_mutex_unlock(&lib->mutex);
1509 
1510         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1511 
1512         return NXT_UNIT_AGAIN;
1513     }
1514 
1515     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1516     if (nxt_slow_path(port_impl == NULL)) {
1517         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1518                        (int) sizeof(nxt_unit_port_impl_t));
1519 
1520         pthread_mutex_unlock(&lib->mutex);
1521 
1522         return NXT_UNIT_ERROR;
1523     }
1524 
1525     port = &port_impl->port;
1526 
1527     port->id = *port_id;
1528     port->in_fd = -1;
1529     port->out_fd = -1;
1530     port->data = NULL;
1531 
1532     res = nxt_unit_port_hash_add(&lib->ports, port);
1533     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1534         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1535                        port->id.pid, port->id.id);
1536 
1537         pthread_mutex_unlock(&lib->mutex);
1538 
1539         nxt_unit_free(ctx, port);
1540 
1541         return NXT_UNIT_ERROR;
1542     }
1543 
1544     process = nxt_unit_process_find(lib, port_id->pid, 0);
1545     if (nxt_slow_path(process == NULL)) {
1546         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1547                        port->id.pid);
1548 
1549         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1550 
1551         pthread_mutex_unlock(&lib->mutex);
1552 
1553         nxt_unit_free(ctx, port);
1554 
1555         return NXT_UNIT_ERROR;
1556     }
1557 
1558     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1559 
1560     port_impl->process = process;
1561     port_impl->queue = NULL;
1562     port_impl->from_socket = 0;
1563     port_impl->socket_rbuf = NULL;
1564 
1565     nxt_queue_init(&port_impl->awaiting_req);
1566 
1567     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1568 
1569     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1570 
1571     port_impl->use_count = 2;
1572     port_impl->ready = 0;
1573 
1574     req->response_port = port;
1575 
1576     pthread_mutex_unlock(&lib->mutex);
1577 
1578     res = nxt_unit_get_port(ctx, port_id);
1579     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1580         return NXT_UNIT_ERROR;
1581     }
1582 
1583     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1584 
1585     return NXT_UNIT_AGAIN;
1586 }
1587 
1588 
1589 static int
1590 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1591 {
1592     ssize_t                       res;
1593     nxt_port_msg_t                msg;
1594     nxt_unit_impl_t               *lib;
1595     nxt_unit_ctx_impl_t           *ctx_impl;
1596     nxt_unit_request_info_impl_t  *req_impl;
1597 
1598     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1599     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1600     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1601 
1602     memset(&msg, 0, sizeof(nxt_port_msg_t));
1603 
1604     msg.stream = req_impl->stream;
1605     msg.pid = lib->pid;
1606     msg.reply_port = ctx_impl->read_port->id.id;
1607     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1608 
1609     res = nxt_unit_port_send(req->ctx, req->response_port,
1610                              &msg, sizeof(msg), NULL, 0);
1611     if (nxt_slow_path(res != sizeof(msg))) {
1612         return NXT_UNIT_ERROR;
1613     }
1614 
1615     return NXT_UNIT_OK;
1616 }
1617 
1618 
1619 static int
1620 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1621 {
1622     size_t                           hsize;
1623     nxt_unit_impl_t                  *lib;
1624     nxt_unit_mmap_buf_t              *b;
1625     nxt_unit_callbacks_t             *cb;
1626     nxt_unit_request_info_t          *req;
1627     nxt_unit_request_info_impl_t     *req_impl;
1628     nxt_unit_websocket_frame_impl_t  *ws_impl;
1629 
1630     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1631     if (nxt_slow_path(req == NULL)) {
1632         return NXT_UNIT_OK;
1633     }
1634 
1635     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1636 
1637     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1638     cb = &lib->callbacks;
1639 
1640     if (cb->websocket_handler && recv_msg->size >= 2) {
1641         ws_impl = nxt_unit_websocket_frame_get(ctx);
1642         if (nxt_slow_path(ws_impl == NULL)) {
1643             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1644                           req_impl->stream);
1645 
1646             return NXT_UNIT_ERROR;
1647         }
1648 
1649         ws_impl->ws.req = req;
1650 
1651         ws_impl->buf = NULL;
1652 
1653         if (recv_msg->mmap) {
1654             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1655                 b->req = req;
1656             }
1657 
1658             /* "Move" incoming buffer list to ws_impl. */
1659             ws_impl->buf = recv_msg->incoming_buf;
1660             ws_impl->buf->prev = &ws_impl->buf;
1661             recv_msg->incoming_buf = NULL;
1662 
1663             b = ws_impl->buf;
1664 
1665         } else {
1666             b = nxt_unit_mmap_buf_get(ctx);
1667             if (nxt_slow_path(b == NULL)) {
1668                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1669                                req_impl->stream);
1670 
1671                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1672 
1673                 return NXT_UNIT_ERROR;
1674             }
1675 
1676             b->req = req;
1677             b->buf.start = recv_msg->start;
1678             b->buf.free = b->buf.start;
1679             b->buf.end = b->buf.start + recv_msg->size;
1680 
1681             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1682         }
1683 
1684         ws_impl->ws.header = (void *) b->buf.start;
1685         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1686             ws_impl->ws.header);
1687 
1688         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1689 
1690         if (ws_impl->ws.header->mask) {
1691             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1692 
1693         } else {
1694             ws_impl->ws.mask = NULL;
1695         }
1696 
1697         b->buf.free += hsize;
1698 
1699         ws_impl->ws.content_buf = &b->buf;
1700         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1701 
1702         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1703                            "payload_len=%"PRIu64,
1704                             ws_impl->ws.header->opcode,
1705                             ws_impl->ws.payload_len);
1706 
1707         cb->websocket_handler(&ws_impl->ws);
1708     }
1709 
1710     if (recv_msg->last) {
1711         if (cb->close_handler) {
1712             nxt_unit_req_debug(req, "close_handler");
1713 
1714             cb->close_handler(req);
1715 
1716         } else {
1717             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1718         }
1719     }
1720 
1721     return NXT_UNIT_OK;
1722 }
1723 
1724 
1725 static int
1726 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1727 {
1728     nxt_unit_impl_t       *lib;
1729     nxt_unit_callbacks_t  *cb;
1730 
1731     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1732     cb = &lib->callbacks;
1733 
1734     if (cb->shm_ack_handler != NULL) {
1735         cb->shm_ack_handler(ctx);
1736     }
1737 
1738     return NXT_UNIT_OK;
1739 }
1740 
1741 
1742 static nxt_unit_request_info_impl_t *
1743 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1744 {
1745     nxt_unit_impl_t               *lib;
1746     nxt_queue_link_t              *lnk;
1747     nxt_unit_ctx_impl_t           *ctx_impl;
1748     nxt_unit_request_info_impl_t  *req_impl;
1749 
1750     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1751 
1752     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1753 
1754     pthread_mutex_lock(&ctx_impl->mutex);
1755 
1756     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1757         pthread_mutex_unlock(&ctx_impl->mutex);
1758 
1759         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1760                                         + lib->request_data_size);
1761         if (nxt_slow_path(req_impl == NULL)) {
1762             return NULL;
1763         }
1764 
1765         req_impl->req.unit = ctx->unit;
1766         req_impl->req.ctx = ctx;
1767 
1768         pthread_mutex_lock(&ctx_impl->mutex);
1769 
1770     } else {
1771         lnk = nxt_queue_first(&ctx_impl->free_req);
1772         nxt_queue_remove(lnk);
1773 
1774         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1775     }
1776 
1777     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1778 
1779     pthread_mutex_unlock(&ctx_impl->mutex);
1780 
1781     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1782 
1783     return req_impl;
1784 }
1785 
1786 
1787 static void
1788 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1789 {
1790     nxt_unit_ctx_t                *ctx;
1791     nxt_unit_ctx_impl_t           *ctx_impl;
1792     nxt_unit_request_info_impl_t  *req_impl;
1793 
1794     ctx = req->ctx;
1795     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1796     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1797 
1798     req->response = NULL;
1799     req->response_buf = NULL;
1800 
1801     if (req_impl->in_hash) {
1802         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1803     }
1804 
1805     while (req_impl->outgoing_buf != NULL) {
1806         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1807     }
1808 
1809     while (req_impl->incoming_buf != NULL) {
1810         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1811     }
1812 
1813     if (req->content_fd != -1) {
1814         nxt_unit_close(req->content_fd);
1815 
1816         req->content_fd = -1;
1817     }
1818 
1819     if (req->response_port != NULL) {
1820         nxt_unit_port_release(req->response_port);
1821 
1822         req->response_port = NULL;
1823     }
1824 
1825     req_impl->state = NXT_UNIT_RS_RELEASED;
1826 
1827     pthread_mutex_lock(&ctx_impl->mutex);
1828 
1829     nxt_queue_remove(&req_impl->link);
1830 
1831     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1832 
1833     pthread_mutex_unlock(&ctx_impl->mutex);
1834 
1835     if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1836         nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1837     }
1838 }
1839 
1840 
1841 static void
1842 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1843 {
1844     nxt_unit_ctx_impl_t  *ctx_impl;
1845 
1846     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1847 
1848     nxt_queue_remove(&req_impl->link);
1849 
1850     if (req_impl != &ctx_impl->req) {
1851         nxt_unit_free(&ctx_impl->ctx, req_impl);
1852     }
1853 }
1854 
1855 
1856 static nxt_unit_websocket_frame_impl_t *
1857 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1858 {
1859     nxt_queue_link_t                 *lnk;
1860     nxt_unit_ctx_impl_t              *ctx_impl;
1861     nxt_unit_websocket_frame_impl_t  *ws_impl;
1862 
1863     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1864 
1865     pthread_mutex_lock(&ctx_impl->mutex);
1866 
1867     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1868         pthread_mutex_unlock(&ctx_impl->mutex);
1869 
1870         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1871         if (nxt_slow_path(ws_impl == NULL)) {
1872             return NULL;
1873         }
1874 
1875     } else {
1876         lnk = nxt_queue_first(&ctx_impl->free_ws);
1877         nxt_queue_remove(lnk);
1878 
1879         pthread_mutex_unlock(&ctx_impl->mutex);
1880 
1881         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1882     }
1883 
1884     ws_impl->ctx_impl = ctx_impl;
1885 
1886     return ws_impl;
1887 }
1888 
1889 
1890 static void
1891 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1892 {
1893     nxt_unit_websocket_frame_impl_t  *ws_impl;
1894 
1895     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1896 
1897     while (ws_impl->buf != NULL) {
1898         nxt_unit_mmap_buf_free(ws_impl->buf);
1899     }
1900 
1901     ws->req = NULL;
1902 
1903     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1904 
1905     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1906 
1907     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1908 }
1909 
1910 
1911 static void
1912 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1913     nxt_unit_websocket_frame_impl_t *ws_impl)
1914 {
1915     nxt_queue_remove(&ws_impl->link);
1916 
1917     nxt_unit_free(ctx, ws_impl);
1918 }
1919 
1920 
1921 uint16_t
1922 nxt_unit_field_hash(const char *name, size_t name_length)
1923 {
1924     u_char      ch;
1925     uint32_t    hash;
1926     const char  *p, *end;
1927 
1928     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1929     end = name + name_length;
1930 
1931     for (p = name; p < end; p++) {
1932         ch = *p;
1933         hash = (hash << 4) + hash + nxt_lowcase(ch);
1934     }
1935 
1936     hash = (hash >> 16) ^ hash;
1937 
1938     return hash;
1939 }
1940 
1941 
/*
 * Reorders the request's field array in place so that fields carrying the
 * same name (same hash, same length, case-insensitive match) end up next
 * to each other, directly after the first occurrence.
 *
 * While scanning, the indexes of the "content-length", "content-type" and
 * "cookie" fields are stored into the request (content_length_field,
 * content_type_field, cookie_field).
 *
 * NOTE(review): the offset adjustments below presumably rely on the sptr
 * offsets being relative to the address of the sptr itself, so moving an
 * array entry requires compensating its offsets -- confirm against
 * nxt_unit_sptr_get()/nxt_unit_sptr_set().
 */
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
    char                *name;
    uint32_t            i, j;
    nxt_unit_field_t    *fields, f;
    nxt_unit_request_t  *r;

    static nxt_str_t  content_length = nxt_string("content-length");
    static nxt_str_t  content_type = nxt_string("content-type");
    static nxt_str_t  cookie = nxt_string("cookie");

    nxt_unit_req_debug(req, "group_dup_fields");

    r = req->request;
    fields = r->fields;

    for (i = 0; i < r->fields_count; i++) {
        name = nxt_unit_sptr_get(&fields[i].name);

        /*
         * The hash only preselects a candidate; the name length and the
         * name itself are compared as well before the index is recorded.
         */
        switch (fields[i].hash) {
        case NXT_UNIT_HASH_CONTENT_LENGTH:
            if (fields[i].name_length == content_length.length
                && nxt_unit_memcasecmp(name, content_length.start,
                                       content_length.length) == 0)
            {
                r->content_length_field = i;
            }

            break;

        case NXT_UNIT_HASH_CONTENT_TYPE:
            if (fields[i].name_length == content_type.length
                && nxt_unit_memcasecmp(name, content_type.start,
                                       content_type.length) == 0)
            {
                r->content_type_field = i;
            }

            break;

        case NXT_UNIT_HASH_COOKIE:
            if (fields[i].name_length == cookie.length
                && nxt_unit_memcasecmp(name, cookie.start,
                                       cookie.length) == 0)
            {
                r->cookie_field = i;
            }

            break;
        }

        /* Look for later fields with the same name as field i. */
        for (j = i + 1; j < r->fields_count; j++) {
            if (fields[i].hash != fields[j].hash
                || fields[i].name_length != fields[j].name_length
                || nxt_unit_memcasecmp(name,
                                       nxt_unit_sptr_get(&fields[j].name),
                                       fields[j].name_length) != 0)
            {
                continue;
            }

            /*
             * Field j duplicates field i.  Save it; it is going to be
             * stored (j - (i + 1)) entries earlier, at index i + 1, so its
             * value offset is increased by that many entry sizes.  Its name
             * offset is not adjusted because the name is re-pointed below.
             */
            f = fields[j];
            f.value.offset += (j - (i + 1)) * sizeof(f);

            /*
             * Shift the entries i + 1 .. j - 1 one slot towards the end of
             * the array; each of them moves forward by one entry size, so
             * both of its offsets are decreased accordingly.
             */
            while (j > i + 1) {
                fields[j] = fields[j - 1];
                fields[j].name.offset -= sizeof(f);
                fields[j].value.offset -= sizeof(f);
                j--;
            }

            /* Here j == i + 1: the slot right after the group so far. */
            fields[j] = f;

            /* Assign the same name pointer for further grouping simplicity. */
            nxt_unit_sptr_set(&fields[j].name, name);

            /*
             * The group grew by one entry; advance i so that the next
             * duplicate is placed after it and the outer loop continues
             * past the whole group.
             */
            i++;
        }
    }
}
2023 
2024 
2025 int
2026 nxt_unit_response_init(nxt_unit_request_info_t *req,
2027     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2028 {
2029     uint32_t                      buf_size;
2030     nxt_unit_buf_t                *buf;
2031     nxt_unit_request_info_impl_t  *req_impl;
2032 
2033     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2034 
2035     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2036         nxt_unit_req_warn(req, "init: response already sent");
2037 
2038         return NXT_UNIT_ERROR;
2039     }
2040 
2041     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2042                        (int) max_fields_count, (int) max_fields_size);
2043 
2044     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2045         nxt_unit_req_debug(req, "duplicate response init");
2046     }
2047 
2048     /*
2049      * Each field name and value 0-terminated by libunit,
2050      * this is the reason of '+ 2' below.
2051      */
2052     buf_size = sizeof(nxt_unit_response_t)
2053                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2054                + max_fields_size;
2055 
2056     if (nxt_slow_path(req->response_buf != NULL)) {
2057         buf = req->response_buf;
2058 
2059         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2060             goto init_response;
2061         }
2062 
2063         nxt_unit_buf_free(buf);
2064 
2065         req->response_buf = NULL;
2066         req->response = NULL;
2067         req->response_max_fields = 0;
2068 
2069         req_impl->state = NXT_UNIT_RS_START;
2070     }
2071 
2072     buf = nxt_unit_response_buf_alloc(req, buf_size);
2073     if (nxt_slow_path(buf == NULL)) {
2074         return NXT_UNIT_ERROR;
2075     }
2076 
2077 init_response:
2078 
2079     memset(buf->start, 0, sizeof(nxt_unit_response_t));
2080 
2081     req->response_buf = buf;
2082 
2083     req->response = (nxt_unit_response_t *) buf->start;
2084     req->response->status = status;
2085 
2086     buf->free = buf->start + sizeof(nxt_unit_response_t)
2087                 + max_fields_count * sizeof(nxt_unit_field_t);
2088 
2089     req->response_max_fields = max_fields_count;
2090     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2091 
2092     return NXT_UNIT_OK;
2093 }
2094 
2095 
2096 int
2097 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2098     uint32_t max_fields_count, uint32_t max_fields_size)
2099 {
2100     char                          *p;
2101     uint32_t                      i, buf_size;
2102     nxt_unit_buf_t                *buf;
2103     nxt_unit_field_t              *f, *src;
2104     nxt_unit_response_t           *resp;
2105     nxt_unit_request_info_impl_t  *req_impl;
2106 
2107     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2108 
2109     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2110         nxt_unit_req_warn(req, "realloc: response not init");
2111 
2112         return NXT_UNIT_ERROR;
2113     }
2114 
2115     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2116         nxt_unit_req_warn(req, "realloc: response already sent");
2117 
2118         return NXT_UNIT_ERROR;
2119     }
2120 
2121     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2122         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2123 
2124         return NXT_UNIT_ERROR;
2125     }
2126 
2127     /*
2128      * Each field name and value 0-terminated by libunit,
2129      * this is the reason of '+ 2' below.
2130      */
2131     buf_size = sizeof(nxt_unit_response_t)
2132                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2133                + max_fields_size;
2134 
2135     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2136 
2137     buf = nxt_unit_response_buf_alloc(req, buf_size);
2138     if (nxt_slow_path(buf == NULL)) {
2139         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2140         return NXT_UNIT_ERROR;
2141     }
2142 
2143     resp = (nxt_unit_response_t *) buf->start;
2144 
2145     memset(resp, 0, sizeof(nxt_unit_response_t));
2146 
2147     resp->status = req->response->status;
2148     resp->content_length = req->response->content_length;
2149 
2150     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2151     f = resp->fields;
2152 
2153     for (i = 0; i < req->response->fields_count; i++) {
2154         src = req->response->fields + i;
2155 
2156         if (nxt_slow_path(src->skip != 0)) {
2157             continue;
2158         }
2159 
2160         if (nxt_slow_path(src->name_length + src->value_length + 2
2161                           > (uint32_t) (buf->end - p)))
2162         {
2163             nxt_unit_req_warn(req, "realloc: not enough space for field"
2164                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2165                   i, src, src->name_length, src->value_length);
2166 
2167             goto fail;
2168         }
2169 
2170         nxt_unit_sptr_set(&f->name, p);
2171         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2172         *p++ = '\0';
2173 
2174         nxt_unit_sptr_set(&f->value, p);
2175         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2176         *p++ = '\0';
2177 
2178         f->hash = src->hash;
2179         f->skip = 0;
2180         f->name_length = src->name_length;
2181         f->value_length = src->value_length;
2182 
2183         resp->fields_count++;
2184         f++;
2185     }
2186 
2187     if (req->response->piggyback_content_length > 0) {
2188         if (nxt_slow_path(req->response->piggyback_content_length
2189                           > (uint32_t) (buf->end - p)))
2190         {
2191             nxt_unit_req_warn(req, "realloc: not enought space for content"
2192                   " #%"PRIu32", %"PRIu32" required",
2193                   i, req->response->piggyback_content_length);
2194 
2195             goto fail;
2196         }
2197 
2198         resp->piggyback_content_length =
2199                                        req->response->piggyback_content_length;
2200 
2201         nxt_unit_sptr_set(&resp->piggyback_content, p);
2202         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2203                        req->response->piggyback_content_length);
2204     }
2205 
2206     buf->free = p;
2207 
2208     nxt_unit_buf_free(req->response_buf);
2209 
2210     req->response = resp;
2211     req->response_buf = buf;
2212     req->response_max_fields = max_fields_count;
2213 
2214     return NXT_UNIT_OK;
2215 
2216 fail:
2217 
2218     nxt_unit_buf_free(buf);
2219 
2220     return NXT_UNIT_ERROR;
2221 }
2222 
2223 
2224 int
2225 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2226 {
2227     nxt_unit_request_info_impl_t  *req_impl;
2228 
2229     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2230 
2231     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2232 }
2233 
2234 
2235 int
2236 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2237     const char *name, uint8_t name_length,
2238     const char *value, uint32_t value_length)
2239 {
2240     nxt_unit_buf_t                *buf;
2241     nxt_unit_field_t              *f;
2242     nxt_unit_response_t           *resp;
2243     nxt_unit_request_info_impl_t  *req_impl;
2244 
2245     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2246 
2247     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2248         nxt_unit_req_warn(req, "add_field: response not initialized or "
2249                           "already sent");
2250 
2251         return NXT_UNIT_ERROR;
2252     }
2253 
2254     resp = req->response;
2255 
2256     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2257         nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2258                           (int) resp->fields_count);
2259 
2260         return NXT_UNIT_ERROR;
2261     }
2262 
2263     buf = req->response_buf;
2264 
2265     if (nxt_slow_path(name_length + value_length + 2
2266                       > (uint32_t) (buf->end - buf->free)))
2267     {
2268         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2269 
2270         return NXT_UNIT_ERROR;
2271     }
2272 
2273     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2274                        resp->fields_count,
2275                        (int) name_length, name,
2276                        (int) value_length, value);
2277 
2278     f = resp->fields + resp->fields_count;
2279 
2280     nxt_unit_sptr_set(&f->name, buf->free);
2281     buf->free = nxt_cpymem(buf->free, name, name_length);
2282     *buf->free++ = '\0';
2283 
2284     nxt_unit_sptr_set(&f->value, buf->free);
2285     buf->free = nxt_cpymem(buf->free, value, value_length);
2286     *buf->free++ = '\0';
2287 
2288     f->hash = nxt_unit_field_hash(name, name_length);
2289     f->skip = 0;
2290     f->name_length = name_length;
2291     f->value_length = value_length;
2292 
2293     resp->fields_count++;
2294 
2295     return NXT_UNIT_OK;
2296 }
2297 
2298 
2299 int
2300 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2301     const void* src, uint32_t size)
2302 {
2303     nxt_unit_buf_t                *buf;
2304     nxt_unit_response_t           *resp;
2305     nxt_unit_request_info_impl_t  *req_impl;
2306 
2307     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2308 
2309     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2310         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2311 
2312         return NXT_UNIT_ERROR;
2313     }
2314 
2315     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2316         nxt_unit_req_warn(req, "add_content: response already sent");
2317 
2318         return NXT_UNIT_ERROR;
2319     }
2320 
2321     buf = req->response_buf;
2322 
2323     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2324         nxt_unit_req_warn(req, "add_content: buffer overflow");
2325 
2326         return NXT_UNIT_ERROR;
2327     }
2328 
2329     resp = req->response;
2330 
2331     if (resp->piggyback_content_length == 0) {
2332         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2333         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2334     }
2335 
2336     resp->piggyback_content_length += size;
2337 
2338     buf->free = nxt_cpymem(buf->free, src, size);
2339 
2340     return NXT_UNIT_OK;
2341 }
2342 
2343 
2344 int
2345 nxt_unit_response_send(nxt_unit_request_info_t *req)
2346 {
2347     int                           rc;
2348     nxt_unit_mmap_buf_t           *mmap_buf;
2349     nxt_unit_request_info_impl_t  *req_impl;
2350 
2351     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2352 
2353     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2354         nxt_unit_req_warn(req, "send: response is not initialized yet");
2355 
2356         return NXT_UNIT_ERROR;
2357     }
2358 
2359     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2360         nxt_unit_req_warn(req, "send: response already sent");
2361 
2362         return NXT_UNIT_ERROR;
2363     }
2364 
2365     if (req->request->websocket_handshake && req->response->status == 101) {
2366         nxt_unit_response_upgrade(req);
2367     }
2368 
2369     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2370                        req->response->fields_count,
2371                        (int) (req->response_buf->free
2372                               - req->response_buf->start));
2373 
2374     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2375 
2376     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2377     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2378         req->response = NULL;
2379         req->response_buf = NULL;
2380         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2381 
2382         nxt_unit_mmap_buf_free(mmap_buf);
2383     }
2384 
2385     return rc;
2386 }
2387 
2388 
2389 int
2390 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2391 {
2392     nxt_unit_request_info_impl_t  *req_impl;
2393 
2394     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2395 
2396     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2397 }
2398 
2399 
2400 nxt_unit_buf_t *
2401 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2402 {
2403     int                           rc;
2404     nxt_unit_mmap_buf_t           *mmap_buf;
2405     nxt_unit_request_info_impl_t  *req_impl;
2406 
2407     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2408         nxt_unit_req_warn(req, "response_buf_alloc: "
2409                           "requested buffer (%"PRIu32") too big", size);
2410 
2411         return NULL;
2412     }
2413 
2414     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2415 
2416     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2417 
2418     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2419     if (nxt_slow_path(mmap_buf == NULL)) {
2420         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2421 
2422         return NULL;
2423     }
2424 
2425     mmap_buf->req = req;
2426 
2427     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2428 
2429     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2430                                    size, size, mmap_buf,
2431                                    NULL);
2432     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2433         nxt_unit_mmap_buf_release(mmap_buf);
2434 
2435         nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2436 
2437         return NULL;
2438     }
2439 
2440     return &mmap_buf->buf;
2441 }
2442 
2443 
2444 static nxt_unit_mmap_buf_t *
2445 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2446 {
2447     nxt_unit_mmap_buf_t  *mmap_buf;
2448     nxt_unit_ctx_impl_t  *ctx_impl;
2449 
2450     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2451 
2452     pthread_mutex_lock(&ctx_impl->mutex);
2453 
2454     if (ctx_impl->free_buf == NULL) {
2455         pthread_mutex_unlock(&ctx_impl->mutex);
2456 
2457         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2458         if (nxt_slow_path(mmap_buf == NULL)) {
2459             return NULL;
2460         }
2461 
2462     } else {
2463         mmap_buf = ctx_impl->free_buf;
2464 
2465         nxt_unit_mmap_buf_unlink(mmap_buf);
2466 
2467         pthread_mutex_unlock(&ctx_impl->mutex);
2468     }
2469 
2470     mmap_buf->ctx_impl = ctx_impl;
2471 
2472     mmap_buf->hdr = NULL;
2473     mmap_buf->free_ptr = NULL;
2474 
2475     return mmap_buf;
2476 }
2477 
2478 
2479 static void
2480 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2481 {
2482     nxt_unit_mmap_buf_unlink(mmap_buf);
2483 
2484     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2485 
2486     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2487 
2488     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2489 }
2490 
2491 
2492 int
2493 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2494 {
2495     return req->request->websocket_handshake;
2496 }
2497 
2498 
2499 int
2500 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2501 {
2502     int                           rc;
2503     nxt_unit_request_info_impl_t  *req_impl;
2504 
2505     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2506 
2507     if (nxt_slow_path(req_impl->websocket != 0)) {
2508         nxt_unit_req_debug(req, "upgrade: already upgraded");
2509 
2510         return NXT_UNIT_OK;
2511     }
2512 
2513     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2514         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2515 
2516         return NXT_UNIT_ERROR;
2517     }
2518 
2519     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2520         nxt_unit_req_warn(req, "upgrade: response already sent");
2521 
2522         return NXT_UNIT_ERROR;
2523     }
2524 
2525     rc = nxt_unit_request_hash_add(req->ctx, req);
2526     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2527         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2528 
2529         return NXT_UNIT_ERROR;
2530     }
2531 
2532     req_impl->websocket = 1;
2533 
2534     req->response->status = 101;
2535 
2536     return NXT_UNIT_OK;
2537 }
2538 
2539 
2540 int
2541 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2542 {
2543     nxt_unit_request_info_impl_t  *req_impl;
2544 
2545     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2546 
2547     return req_impl->websocket;
2548 }
2549 
2550 
2551 nxt_unit_request_info_t *
2552 nxt_unit_get_request_info_from_data(void *data)
2553 {
2554     nxt_unit_request_info_impl_t  *req_impl;
2555 
2556     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2557 
2558     return &req_impl->req;
2559 }
2560 
2561 
2562 int
2563 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2564 {
2565     int                           rc;
2566     nxt_unit_mmap_buf_t           *mmap_buf;
2567     nxt_unit_request_info_t       *req;
2568     nxt_unit_request_info_impl_t  *req_impl;
2569 
2570     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2571 
2572     req = mmap_buf->req;
2573     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2574 
2575     nxt_unit_req_debug(req, "buf_send: %d bytes",
2576                        (int) (buf->free - buf->start));
2577 
2578     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2579         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2580 
2581         return NXT_UNIT_ERROR;
2582     }
2583 
2584     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2585         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2586 
2587         return NXT_UNIT_ERROR;
2588     }
2589 
2590     if (nxt_fast_path(buf->free > buf->start)) {
2591         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2592         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2593             return rc;
2594         }
2595     }
2596 
2597     nxt_unit_mmap_buf_free(mmap_buf);
2598 
2599     return NXT_UNIT_OK;
2600 }
2601 
2602 
2603 static void
2604 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2605 {
2606     int                      rc;
2607     nxt_unit_mmap_buf_t      *mmap_buf;
2608     nxt_unit_request_info_t  *req;
2609 
2610     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2611 
2612     req = mmap_buf->req;
2613 
2614     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2615     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2616         nxt_unit_mmap_buf_free(mmap_buf);
2617 
2618         nxt_unit_request_info_release(req);
2619 
2620     } else {
2621         nxt_unit_request_done(req, rc);
2622     }
2623 }
2624 
2625 
/*
 * Sends the filled part of a buffer (buf->start .. buf->free) to the
 * response port of the request as a _NXT_PORT_MSG_DATA message.
 *
 * req       request the data belongs to; its stream id goes into the
 *           message header;
 * mmap_buf  buffer to send;
 * last      non-zero to set the 'last' flag of the message.
 *
 * Two transports are used:
 *  - if the buffer has a shared memory header (hdr != NULL) and is not
 *    empty, only a descriptor (mmap id, chunk id, size) is sent;
 *  - otherwise the data is sent inline, prefixed with the message header
 *    that is copied into the space right before buf->start.
 *
 * nxt_unit_free_outgoing_buf() is called on the buffer before returning,
 * both on success and on failure.
 *
 * Returns NXT_UNIT_OK or NXT_UNIT_ERROR.
 */
static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
    nxt_unit_mmap_buf_t *mmap_buf, int last)
{
    struct {
        nxt_port_msg_t       msg;
        nxt_port_mmap_msg_t  mmap_msg;
    } m;

    int                           rc;
    u_char                        *last_used, *first_free;
    ssize_t                       res;
    nxt_chunk_id_t                first_free_chunk;
    nxt_unit_buf_t                *buf;
    nxt_unit_impl_t               *lib;
    nxt_port_mmap_header_t        *hdr;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    buf = &mmap_buf->buf;
    hdr = mmap_buf->hdr;

    m.mmap_msg.size = buf->free - buf->start;

    m.msg.stream = req_impl->stream;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_DATA;
    m.msg.last = last != 0;
    /* An empty buffer is always sent as a plain (header only) message. */
    m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
    m.msg.nf = 0;
    m.msg.mf = 0;
    m.msg.tracking = 0;

    /* Assume failure until one of the branches below completes. */
    rc = NXT_UNIT_ERROR;

    if (m.msg.mmap) {
        m.mmap_msg.mmap_id = hdr->id;
        m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
                                                     (u_char *) buf->start);

        nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
                       req_impl->stream,
                       (int) m.mmap_msg.mmap_id,
                       (int) m.mmap_msg.chunk_id,
                       (int) m.mmap_msg.size);

        res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
                                 NULL, 0);
        if (nxt_slow_path(res != sizeof(m))) {
            goto free_buf;
        }

        /*
         * The chunks holding the sent data are no longer tracked by this
         * buffer; find the first chunk after the last sent byte.
         */
        last_used = (u_char *) buf->free - 1;
        first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;

        if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
            /*
             * At least one chunk worth of space is left unused: rebase the
             * buffer so that it covers only the remaining space, which the
             * cleanup at the end of the function releases.
             */
            first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);

            buf->start = (char *) first_free;
            buf->free = buf->start;

            if (buf->end < buf->start) {
                buf->end = buf->start;
            }

        } else {
            /* Nothing to give back: detach the buffer from the segment. */
            buf->start = NULL;
            buf->free = NULL;
            buf->end = NULL;

            mmap_buf->hdr = NULL;
        }

        /* Subtracts the number of chunks that carried the sent data. */
        nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
                            (int) m.mmap_msg.chunk_id - (int) first_free_chunk);

        nxt_unit_debug(req->ctx, "allocated_chunks %d",
                       (int) lib->outgoing.allocated_chunks);

    } else {
        /*
         * Plain send: the message header is written into the bytes just
         * before buf->start, so plain_ptr must be set and must leave at
         * least sizeof(m.msg) bytes in front of the data.
         */
        if (nxt_slow_path(mmap_buf->plain_ptr == NULL
                          || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
        {
            nxt_unit_alert(req->ctx,
                           "#%"PRIu32": failed to send plain memory buffer"
                           ": no space reserved for message header",
                           req_impl->stream);

            goto free_buf;
        }

        memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));

        nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
                       req_impl->stream,
                       (int) (sizeof(m.msg) + m.mmap_msg.size));

        res = nxt_unit_port_send(req->ctx, req->response_port,
                                 buf->start - sizeof(m.msg),
                                 m.mmap_msg.size + sizeof(m.msg),
                                 NULL, 0);
        if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
            goto free_buf;
        }
    }

    rc = NXT_UNIT_OK;

free_buf:

    /* Reached on success as well as on failure. */
    nxt_unit_free_outgoing_buf(mmap_buf);

    return rc;
}
2743 
2744 
2745 void
2746 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2747 {
2748     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2749 }
2750 
2751 
2752 static void
2753 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2754 {
2755     nxt_unit_free_outgoing_buf(mmap_buf);
2756 
2757     nxt_unit_mmap_buf_release(mmap_buf);
2758 }
2759 
2760 
2761 static void
2762 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2763 {
2764     if (mmap_buf->hdr != NULL) {
2765         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2766                               mmap_buf->hdr, mmap_buf->buf.start,
2767                               mmap_buf->buf.end - mmap_buf->buf.start);
2768 
2769         mmap_buf->hdr = NULL;
2770 
2771         return;
2772     }
2773 
2774     if (mmap_buf->free_ptr != NULL) {
2775         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2776 
2777         mmap_buf->free_ptr = NULL;
2778     }
2779 }
2780 
2781 
2782 static nxt_unit_read_buf_t *
2783 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2784 {
2785     nxt_unit_ctx_impl_t  *ctx_impl;
2786     nxt_unit_read_buf_t  *rbuf;
2787 
2788     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2789 
2790     pthread_mutex_lock(&ctx_impl->mutex);
2791 
2792     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2793 
2794     pthread_mutex_unlock(&ctx_impl->mutex);
2795 
2796     memset(rbuf->oob, 0, sizeof(struct cmsghdr));
2797 
2798     return rbuf;
2799 }
2800 
2801 
2802 static nxt_unit_read_buf_t *
2803 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2804 {
2805     nxt_queue_link_t     *link;
2806     nxt_unit_read_buf_t  *rbuf;
2807 
2808     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2809         link = nxt_queue_first(&ctx_impl->free_rbuf);
2810         nxt_queue_remove(link);
2811 
2812         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2813 
2814         return rbuf;
2815     }
2816 
2817     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2818 
2819     if (nxt_fast_path(rbuf != NULL)) {
2820         rbuf->ctx_impl = ctx_impl;
2821     }
2822 
2823     return rbuf;
2824 }
2825 
2826 
2827 static void
2828 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2829     nxt_unit_read_buf_t *rbuf)
2830 {
2831     nxt_unit_ctx_impl_t  *ctx_impl;
2832 
2833     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2834 
2835     pthread_mutex_lock(&ctx_impl->mutex);
2836 
2837     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2838 
2839     pthread_mutex_unlock(&ctx_impl->mutex);
2840 }
2841 
2842 
2843 nxt_unit_buf_t *
2844 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2845 {
2846     nxt_unit_mmap_buf_t  *mmap_buf;
2847 
2848     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2849 
2850     if (mmap_buf->next == NULL) {
2851         return NULL;
2852     }
2853 
2854     return &mmap_buf->next->buf;
2855 }
2856 
2857 
2858 uint32_t
2859 nxt_unit_buf_max(void)
2860 {
2861     return PORT_MMAP_DATA_SIZE;
2862 }
2863 
2864 
2865 uint32_t
2866 nxt_unit_buf_min(void)
2867 {
2868     return PORT_MMAP_CHUNK_SIZE;
2869 }
2870 
2871 
2872 int
2873 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2874     size_t size)
2875 {
2876     ssize_t  res;
2877 
2878     res = nxt_unit_response_write_nb(req, start, size, size);
2879 
2880     return res < 0 ? -res : NXT_UNIT_OK;
2881 }
2882 
2883 
2884 ssize_t
2885 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2886     size_t size, size_t min_size)
2887 {
2888     int                           rc;
2889     ssize_t                       sent;
2890     uint32_t                      part_size, min_part_size, buf_size;
2891     const char                    *part_start;
2892     nxt_unit_mmap_buf_t           mmap_buf;
2893     nxt_unit_request_info_impl_t  *req_impl;
2894     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2895 
2896     nxt_unit_req_debug(req, "write: %d", (int) size);
2897 
2898     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2899 
2900     part_start = start;
2901     sent = 0;
2902 
2903     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2904         nxt_unit_req_alert(req, "write: response not initialized yet");
2905 
2906         return -NXT_UNIT_ERROR;
2907     }
2908 
2909     /* Check if response is not send yet. */
2910     if (nxt_slow_path(req->response_buf != NULL)) {
2911         part_size = req->response_buf->end - req->response_buf->free;
2912         part_size = nxt_min(size, part_size);
2913 
2914         rc = nxt_unit_response_add_content(req, part_start, part_size);
2915         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2916             return -rc;
2917         }
2918 
2919         rc = nxt_unit_response_send(req);
2920         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2921             return -rc;
2922         }
2923 
2924         size -= part_size;
2925         part_start += part_size;
2926         sent += part_size;
2927 
2928         min_size -= nxt_min(min_size, part_size);
2929     }
2930 
2931     while (size > 0) {
2932         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2933         min_part_size = nxt_min(min_size, part_size);
2934         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2935 
2936         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2937                                        min_part_size, &mmap_buf, local_buf);
2938         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2939             return -rc;
2940         }
2941 
2942         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2943         if (nxt_slow_path(buf_size == 0)) {
2944             return sent;
2945         }
2946         part_size = nxt_min(buf_size, part_size);
2947 
2948         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2949                                        part_start, part_size);
2950 
2951         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2952         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2953             return -rc;
2954         }
2955 
2956         size -= part_size;
2957         part_start += part_size;
2958         sent += part_size;
2959 
2960         min_size -= nxt_min(min_size, part_size);
2961     }
2962 
2963     return sent;
2964 }
2965 
2966 
2967 int
2968 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2969     nxt_unit_read_info_t *read_info)
2970 {
2971     int                           rc;
2972     ssize_t                       n;
2973     uint32_t                      buf_size;
2974     nxt_unit_buf_t                *buf;
2975     nxt_unit_mmap_buf_t           mmap_buf;
2976     nxt_unit_request_info_impl_t  *req_impl;
2977     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2978 
2979     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2980 
2981     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2982         nxt_unit_req_alert(req, "write: response not initialized yet");
2983 
2984         return NXT_UNIT_ERROR;
2985     }
2986 
2987     /* Check if response is not send yet. */
2988     if (nxt_slow_path(req->response_buf != NULL)) {
2989 
2990         /* Enable content in headers buf. */
2991         rc = nxt_unit_response_add_content(req, "", 0);
2992         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2993             nxt_unit_req_error(req, "Failed to add piggyback content");
2994 
2995             return rc;
2996         }
2997 
2998         buf = req->response_buf;
2999 
3000         while (buf->end - buf->free > 0) {
3001             n = read_info->read(read_info, buf->free, buf->end - buf->free);
3002             if (nxt_slow_path(n < 0)) {
3003                 nxt_unit_req_error(req, "Read error");
3004 
3005                 return NXT_UNIT_ERROR;
3006             }
3007 
3008             /* Manually increase sizes. */
3009             buf->free += n;
3010             req->response->piggyback_content_length += n;
3011 
3012             if (read_info->eof) {
3013                 break;
3014             }
3015         }
3016 
3017         rc = nxt_unit_response_send(req);
3018         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3019             nxt_unit_req_error(req, "Failed to send headers with content");
3020 
3021             return rc;
3022         }
3023 
3024         if (read_info->eof) {
3025             return NXT_UNIT_OK;
3026         }
3027     }
3028 
3029     while (!read_info->eof) {
3030         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
3031                            read_info->buf_size);
3032 
3033         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
3034 
3035         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3036                                        buf_size, buf_size,
3037                                        &mmap_buf, local_buf);
3038         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3039             return rc;
3040         }
3041 
3042         buf = &mmap_buf.buf;
3043 
3044         while (!read_info->eof && buf->end > buf->free) {
3045             n = read_info->read(read_info, buf->free, buf->end - buf->free);
3046             if (nxt_slow_path(n < 0)) {
3047                 nxt_unit_req_error(req, "Read error");
3048 
3049                 nxt_unit_free_outgoing_buf(&mmap_buf);
3050 
3051                 return NXT_UNIT_ERROR;
3052             }
3053 
3054             buf->free += n;
3055         }
3056 
3057         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3058         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3059             nxt_unit_req_error(req, "Failed to send content");
3060 
3061             return rc;
3062         }
3063     }
3064 
3065     return NXT_UNIT_OK;
3066 }
3067 
3068 
3069 ssize_t
3070 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3071 {
3072     ssize_t  buf_res, res;
3073 
3074     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3075                                 dst, size);
3076 
3077     if (buf_res < (ssize_t) size && req->content_fd != -1) {
3078         res = read(req->content_fd, dst, size);
3079         if (nxt_slow_path(res < 0)) {
3080             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3081                                strerror(errno), errno);
3082 
3083             return res;
3084         }
3085 
3086         if (res < (ssize_t) size) {
3087             nxt_unit_close(req->content_fd);
3088 
3089             req->content_fd = -1;
3090         }
3091 
3092         req->content_length -= res;
3093         size -= res;
3094 
3095         dst = nxt_pointer_to(dst, res);
3096 
3097     } else {
3098         res = 0;
3099     }
3100 
3101     return buf_res + res;
3102 }
3103 
3104 
3105 ssize_t
3106 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3107 {
3108     char                 *p;
3109     size_t               l_size, b_size;
3110     nxt_unit_buf_t       *b;
3111     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3112 
3113     if (req->content_length == 0) {
3114         return 0;
3115     }
3116 
3117     l_size = 0;
3118 
3119     b = req->content_buf;
3120 
3121     while (b != NULL) {
3122         b_size = b->end - b->free;
3123         p = memchr(b->free, '\n', b_size);
3124 
3125         if (p != NULL) {
3126             p++;
3127             l_size += p - b->free;
3128             break;
3129         }
3130 
3131         l_size += b_size;
3132 
3133         if (max_size <= l_size) {
3134             break;
3135         }
3136 
3137         mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf);
3138         if (mmap_buf->next == NULL
3139             && req->content_fd != -1
3140             && l_size < req->content_length)
3141         {
3142             preread_buf = nxt_unit_request_preread(