xref: /unit/src/nxt_unit.c (revision 2217:8019e6c650f6)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "nxt_main.h"
7 #include "nxt_port_memory_int.h"
8 #include "nxt_socket_msg.h"
9 #include "nxt_port_queue.h"
10 #include "nxt_app_queue.h"
11 
12 #include "nxt_unit.h"
13 #include "nxt_unit_request.h"
14 #include "nxt_unit_response.h"
15 #include "nxt_unit_websocket.h"
16 
17 #include "nxt_websocket.h"
18 
19 #if (NXT_HAVE_MEMFD_CREATE)
20 #include <linux/memfd.h>
21 #endif
22 
23 #define NXT_UNIT_MAX_PLAIN_SIZE  1024
24 #define NXT_UNIT_LOCAL_BUF_SIZE  \
25     (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
26 
27 enum {
28     NXT_QUIT_NORMAL   = 0,
29     NXT_QUIT_GRACEFUL = 1,
30 };
31 
32 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
33 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
34 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
35 typedef struct nxt_unit_process_s               nxt_unit_process_t;
36 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
37 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
38 typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
39 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
40 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
41 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
42 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
43 
44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
46     nxt_unit_ctx_impl_t *ctx_impl, void *data);
47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
52     nxt_unit_mmap_buf_t *mmap_buf);
53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
54     nxt_unit_mmap_buf_t *mmap_buf);
55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
57     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
58     int *shared_port_fd, int *shared_queue_fd,
59     int *log_fd, uint32_t *stream, uint32_t *shm_limit,
60     uint32_t *request_limit);
61 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
62     int queue_fd);
63 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
64     nxt_unit_request_info_t **preq);
65 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
66     nxt_unit_recv_msg_t *recv_msg);
67 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
68 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
69     nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
70 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
71     nxt_unit_recv_msg_t *recv_msg);
72 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
73     nxt_unit_port_id_t *port_id);
74 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
75 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
76     nxt_unit_recv_msg_t *recv_msg);
77 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
78 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
79     nxt_unit_ctx_t *ctx);
80 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
81 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
82 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
83     nxt_unit_ctx_t *ctx);
84 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
85 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
86     nxt_unit_websocket_frame_impl_t *ws);
87 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
88 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
89 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
90     nxt_unit_mmap_buf_t *mmap_buf, int last);
91 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
92 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
94 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
95     nxt_unit_ctx_impl_t *ctx_impl);
96 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
97     nxt_unit_read_buf_t *rbuf);
98 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
99     nxt_unit_request_info_t *req, size_t size);
100 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
101     size_t size);
102 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
103     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
104 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
105 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
106 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
107 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
108     nxt_unit_port_t *port, int n);
109 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
110 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
111     int fd);
112 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
113     nxt_unit_port_t *port, uint32_t size,
114     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
115 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
116 
117 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
118     nxt_unit_ctx_impl_t *ctx_impl);
119 static int nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
120 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
121 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
122 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
123 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
124     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
125     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
126 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
127     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
128 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
129 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
130     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
131 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
132 
133 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
135     pid_t pid, int remove);
136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
137 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
138 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
139 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
140 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
141 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
142 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
145 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
146 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
147     nxt_unit_port_t *port);
148 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
149 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
150 
151 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
152     nxt_unit_port_t *port, int queue_fd);
153 
154 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
155 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
156 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
157     nxt_unit_port_t *port, void *queue);
158 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
159     nxt_queue_t *awaiting_req);
160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
161     nxt_unit_port_id_t *port_id);
162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
163     nxt_unit_port_id_t *port_id);
164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
166     nxt_unit_process_t *process);
167 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
168 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
169 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
170     nxt_unit_port_t *port, const void *buf, size_t buf_size,
171     const nxt_send_oob_t *oob);
172 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
173     const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
174 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
175     nxt_unit_read_buf_t *rbuf);
176 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
177     nxt_unit_read_buf_t *src);
178 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
179     nxt_unit_read_buf_t *rbuf);
180 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
181     nxt_unit_read_buf_t *rbuf);
182 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
183     nxt_unit_read_buf_t *rbuf);
184 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
185     nxt_unit_read_buf_t *rbuf);
186 nxt_inline int nxt_unit_close(int fd);
187 static int nxt_unit_fd_blocking(int fd);
188 
189 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
190     nxt_unit_port_t *port);
191 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
192     nxt_unit_port_id_t *port_id, int remove);
193 
194 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
195     nxt_unit_request_info_t *req);
196 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
197     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
198 
199 static char * nxt_unit_snprint_prefix(char *p, const char *end, pid_t pid,
200     int level);
201 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
202 static void nxt_unit_lvlhsh_free(void *data, void *p);
203 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
204 
205 
206 struct nxt_unit_mmap_buf_s {
207     nxt_unit_buf_t           buf;
208 
209     nxt_unit_mmap_buf_t      *next;
210     nxt_unit_mmap_buf_t      **prev;
211 
212     nxt_port_mmap_header_t   *hdr;
213     nxt_unit_request_info_t  *req;
214     nxt_unit_ctx_impl_t      *ctx_impl;
215     char                     *free_ptr;
216     char                     *plain_ptr;
217 };
218 
219 
220 struct nxt_unit_recv_msg_s {
221     uint32_t                 stream;
222     nxt_pid_t                pid;
223     nxt_port_id_t            reply_port;
224 
225     uint8_t                  last;      /* 1 bit */
226     uint8_t                  mmap;      /* 1 bit */
227 
228     void                     *start;
229     uint32_t                 size;
230 
231     int                      fd[2];
232 
233     nxt_unit_mmap_buf_t      *incoming_buf;
234 };
235 
236 
237 typedef enum {
238     NXT_UNIT_RS_START           = 0,
239     NXT_UNIT_RS_RESPONSE_INIT,
240     NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
241     NXT_UNIT_RS_RESPONSE_SENT,
242     NXT_UNIT_RS_RELEASED,
243 } nxt_unit_req_state_t;
244 
245 
246 struct nxt_unit_request_info_impl_s {
247     nxt_unit_request_info_t  req;
248 
249     uint32_t                 stream;
250 
251     nxt_unit_mmap_buf_t      *outgoing_buf;
252     nxt_unit_mmap_buf_t      *incoming_buf;
253 
254     nxt_unit_req_state_t     state;
255     uint8_t                  websocket;
256     uint8_t                  in_hash;
257 
258     /*  for nxt_unit_ctx_impl_t.free_req or active_req */
259     nxt_queue_link_t         link;
260     /*  for nxt_unit_port_impl_t.awaiting_req */
261     nxt_queue_link_t         port_wait_link;
262 
263     char                     extra_data[];
264 };
265 
266 
267 struct nxt_unit_websocket_frame_impl_s {
268     nxt_unit_websocket_frame_t  ws;
269 
270     nxt_unit_mmap_buf_t         *buf;
271 
272     nxt_queue_link_t            link;
273 
274     nxt_unit_ctx_impl_t         *ctx_impl;
275 };
276 
277 
278 struct nxt_unit_read_buf_s {
279     nxt_queue_link_t              link;
280     nxt_unit_ctx_impl_t           *ctx_impl;
281     ssize_t                       size;
282     nxt_recv_oob_t                oob;
283     char                          buf[16384];
284 };
285 
286 
287 struct nxt_unit_ctx_impl_s {
288     nxt_unit_ctx_t                ctx;
289 
290     nxt_atomic_t                  use_count;
291     nxt_atomic_t                  wait_items;
292 
293     pthread_mutex_t               mutex;
294 
295     nxt_unit_port_t               *read_port;
296 
297     nxt_queue_link_t              link;
298 
299     nxt_unit_mmap_buf_t           *free_buf;
300 
301     /*  of nxt_unit_request_info_impl_t */
302     nxt_queue_t                   free_req;
303 
304     /*  of nxt_unit_websocket_frame_impl_t */
305     nxt_queue_t                   free_ws;
306 
307     /*  of nxt_unit_request_info_impl_t */
308     nxt_queue_t                   active_req;
309 
310     /*  of nxt_unit_request_info_impl_t */
311     nxt_lvlhsh_t                  requests;
312 
313     /*  of nxt_unit_request_info_impl_t */
314     nxt_queue_t                   ready_req;
315 
316     /*  of nxt_unit_read_buf_t */
317     nxt_queue_t                   pending_rbuf;
318 
319     /*  of nxt_unit_read_buf_t */
320     nxt_queue_t                   free_rbuf;
321 
322     uint8_t                       online;       /* 1 bit */
323     uint8_t                       ready;        /* 1 bit */
324     uint8_t                       quit_param;
325 
326     nxt_unit_mmap_buf_t           ctx_buf[2];
327     nxt_unit_read_buf_t           ctx_read_buf;
328 
329     nxt_unit_request_info_impl_t  req;
330 };
331 
332 
333 struct nxt_unit_mmap_s {
334     nxt_port_mmap_header_t   *hdr;
335     pthread_t                src_thread;
336 
337     /*  of nxt_unit_read_buf_t */
338     nxt_queue_t              awaiting_rbuf;
339 };
340 
341 
342 struct nxt_unit_mmaps_s {
343     pthread_mutex_t          mutex;
344     uint32_t                 size;
345     uint32_t                 cap;
346     nxt_atomic_t             allocated_chunks;
347     nxt_unit_mmap_t          *elts;
348 };
349 
350 
351 struct nxt_unit_impl_s {
352     nxt_unit_t               unit;
353     nxt_unit_callbacks_t     callbacks;
354 
355     nxt_atomic_t             use_count;
356     nxt_atomic_t             request_count;
357 
358     uint32_t                 request_data_size;
359     uint32_t                 shm_mmap_limit;
360     uint32_t                 request_limit;
361 
362     pthread_mutex_t          mutex;
363 
364     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
365     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
366 
367     nxt_unit_port_t          *router_port;
368     nxt_unit_port_t          *shared_port;
369 
370     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
371 
372     nxt_unit_mmaps_t         incoming;
373     nxt_unit_mmaps_t         outgoing;
374 
375     pid_t                    pid;
376     int                      log_fd;
377 
378     nxt_unit_ctx_impl_t      main_ctx;
379 };
380 
381 
382 struct nxt_unit_port_impl_s {
383     nxt_unit_port_t          port;
384 
385     nxt_atomic_t             use_count;
386 
387     /*  for nxt_unit_process_t.ports */
388     nxt_queue_link_t         link;
389     nxt_unit_process_t       *process;
390 
391     /*  of nxt_unit_request_info_impl_t */
392     nxt_queue_t              awaiting_req;
393 
394     int                      ready;
395 
396     void                     *queue;
397 
398     int                      from_socket;
399     nxt_unit_read_buf_t      *socket_rbuf;
400 };
401 
402 
403 struct nxt_unit_process_s {
404     pid_t                    pid;
405 
406     nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */
407 
408     nxt_unit_impl_t          *lib;
409 
410     nxt_atomic_t             use_count;
411 
412     uint32_t                 next_port_id;
413 };
414 
415 
416 /* Explicitly using 32 bit types to avoid possible alignment. */
417 typedef struct {
418     int32_t   pid;
419     uint32_t  id;
420 } nxt_unit_port_hash_id_t;
421 
422 
423 static pid_t  nxt_unit_pid;
424 
425 
426 nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t * init)427 nxt_unit_init(nxt_unit_init_t *init)
428 {
429     int              rc, queue_fd, shared_queue_fd;
430     void             *mem;
431     uint32_t         ready_stream, shm_limit, request_limit;
432     nxt_unit_ctx_t   *ctx;
433     nxt_unit_impl_t  *lib;
434     nxt_unit_port_t  ready_port, router_port, read_port, shared_port;
435 
436     nxt_unit_pid = getpid();
437 
438     lib = nxt_unit_create(init);
439     if (nxt_slow_path(lib == NULL)) {
440         return NULL;
441     }
442 
443     queue_fd = -1;
444     mem = MAP_FAILED;
445     shared_port.out_fd = -1;
446     shared_port.data = NULL;
447 
448     if (init->ready_port.id.pid != 0
449         && init->ready_stream != 0
450         && init->read_port.id.pid != 0)
451     {
452         ready_port = init->ready_port;
453         ready_stream = init->ready_stream;
454         router_port = init->router_port;
455         read_port = init->read_port;
456         lib->log_fd = init->log_fd;
457 
458         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
459                               ready_port.id.id);
460         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
461                               router_port.id.id);
462         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
463                               read_port.id.id);
464 
465         shared_port.in_fd = init->shared_port_fd;
466         shared_queue_fd = init->shared_queue_fd;
467 
468     } else {
469         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
470                                &shared_port.in_fd, &shared_queue_fd,
471                                &lib->log_fd, &ready_stream, &shm_limit,
472                                &request_limit);
473         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
474             goto fail;
475         }
476 
477         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
478                                 / PORT_MMAP_DATA_SIZE;
479         lib->request_limit = request_limit;
480     }
481 
482     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
483         lib->shm_mmap_limit = 1;
484     }
485 
486     lib->pid = read_port.id.pid;
487     nxt_unit_pid = lib->pid;
488 
489     ctx = &lib->main_ctx.ctx;
490 
491     rc = nxt_unit_fd_blocking(router_port.out_fd);
492     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
493         goto fail;
494     }
495 
496     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
497     if (nxt_slow_path(lib->router_port == NULL)) {
498         nxt_unit_alert(NULL, "failed to add router_port");
499 
500         goto fail;
501     }
502 
503     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
504     if (nxt_slow_path(queue_fd == -1)) {
505         goto fail;
506     }
507 
508     mem = mmap(NULL, sizeof(nxt_port_queue_t),
509                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
510     if (nxt_slow_path(mem == MAP_FAILED)) {
511         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
512                        strerror(errno), errno);
513 
514         goto fail;
515     }
516 
517     nxt_port_queue_init(mem);
518 
519     rc = nxt_unit_fd_blocking(read_port.in_fd);
520     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
521         goto fail;
522     }
523 
524     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
525     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
526         nxt_unit_alert(NULL, "failed to add read_port");
527 
528         goto fail;
529     }
530 
531     rc = nxt_unit_fd_blocking(ready_port.out_fd);
532     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
533         goto fail;
534     }
535 
536     nxt_unit_port_id_init(&shared_port.id, read_port.id.pid,
537                           NXT_UNIT_SHARED_PORT_ID);
538 
539     mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
540                MAP_SHARED, shared_queue_fd, 0);
541     if (nxt_slow_path(mem == MAP_FAILED)) {
542         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd,
543                        strerror(errno), errno);
544 
545         goto fail;
546     }
547 
548     nxt_unit_close(shared_queue_fd);
549 
550     lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem);
551     if (nxt_slow_path(lib->shared_port == NULL)) {
552         nxt_unit_alert(NULL, "failed to add shared_port");
553 
554         goto fail;
555     }
556 
557     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
558     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
559         nxt_unit_alert(NULL, "failed to send READY message");
560 
561         goto fail;
562     }
563 
564     nxt_unit_close(ready_port.out_fd);
565     nxt_unit_close(queue_fd);
566 
567     return ctx;
568 
569 fail:
570 
571     if (mem != MAP_FAILED) {
572         munmap(mem, sizeof(nxt_port_queue_t));
573     }
574 
575     if (queue_fd != -1) {
576         nxt_unit_close(queue_fd);
577     }
578 
579     nxt_unit_ctx_release(&lib->main_ctx.ctx);
580 
581     return NULL;
582 }
583 
584 
585 static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t * init)586 nxt_unit_create(nxt_unit_init_t *init)
587 {
588     int              rc;
589     nxt_unit_impl_t  *lib;
590 
591     if (nxt_slow_path(init->callbacks.request_handler == NULL)) {
592         nxt_unit_alert(NULL, "request_handler is NULL");
593 
594         return NULL;
595     }
596 
597     lib = nxt_unit_malloc(NULL,
598                           sizeof(nxt_unit_impl_t) + init->request_data_size);
599     if (nxt_slow_path(lib == NULL)) {
600         nxt_unit_alert(NULL, "failed to allocate unit struct");
601 
602         return NULL;
603     }
604 
605     rc = pthread_mutex_init(&lib->mutex, NULL);
606     if (nxt_slow_path(rc != 0)) {
607         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
608 
609         goto out_unit_free;
610     }
611 
612     lib->unit.data = init->data;
613     lib->callbacks = init->callbacks;
614 
615     lib->request_data_size = init->request_data_size;
616     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
617                             / PORT_MMAP_DATA_SIZE;
618     lib->request_limit = init->request_limit;
619 
620     lib->processes.slot = NULL;
621     lib->ports.slot = NULL;
622 
623     lib->log_fd = STDERR_FILENO;
624 
625     nxt_queue_init(&lib->contexts);
626 
627     lib->use_count = 0;
628     lib->request_count = 0;
629     lib->router_port = NULL;
630     lib->shared_port = NULL;
631 
632     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
633     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
634         goto out_mutex_destroy;
635     }
636 
637     rc = nxt_unit_mmaps_init(&lib->incoming);
638     if (nxt_slow_path(rc != 0)) {
639         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
640 
641         goto out_ctx_free;
642     }
643 
644     rc = nxt_unit_mmaps_init(&lib->outgoing);
645     if (nxt_slow_path(rc != 0)) {
646         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
647 
648         goto out_mmaps_destroy;
649     }
650 
651     return lib;
652 
653 out_mmaps_destroy:
654     nxt_unit_mmaps_destroy(&lib->incoming);
655 
656 out_ctx_free:
657     nxt_unit_ctx_free(&lib->main_ctx);
658 
659 out_mutex_destroy:
660     pthread_mutex_destroy(&lib->mutex);
661 
662 out_unit_free:
663     nxt_unit_free(NULL, lib);
664 
665     return NULL;
666 }
667 
668 
669 static int
nxt_unit_ctx_init(nxt_unit_impl_t * lib,nxt_unit_ctx_impl_t * ctx_impl,void * data)670 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
671     void *data)
672 {
673     int  rc;
674 
675     ctx_impl->ctx.data = data;
676     ctx_impl->ctx.unit = &lib->unit;
677 
678     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
679     if (nxt_slow_path(rc != 0)) {
680         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
681 
682         return NXT_UNIT_ERROR;
683     }
684 
685     nxt_unit_lib_use(lib);
686 
687     pthread_mutex_lock(&lib->mutex);
688 
689     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
690 
691     pthread_mutex_unlock(&lib->mutex);
692 
693     ctx_impl->use_count = 1;
694     ctx_impl->wait_items = 0;
695     ctx_impl->online = 1;
696     ctx_impl->ready = 0;
697     ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
698 
699     nxt_queue_init(&ctx_impl->free_req);
700     nxt_queue_init(&ctx_impl->free_ws);
701     nxt_queue_init(&ctx_impl->active_req);
702     nxt_queue_init(&ctx_impl->ready_req);
703     nxt_queue_init(&ctx_impl->pending_rbuf);
704     nxt_queue_init(&ctx_impl->free_rbuf);
705 
706     ctx_impl->free_buf = NULL;
707     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
708     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
709 
710     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
711     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
712 
713     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
714 
715     ctx_impl->req.req.ctx = &ctx_impl->ctx;
716     ctx_impl->req.req.unit = &lib->unit;
717 
718     ctx_impl->read_port = NULL;
719     ctx_impl->requests.slot = 0;
720 
721     return NXT_UNIT_OK;
722 }
723 
724 
725 nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_t * ctx)726 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
727 {
728     nxt_unit_ctx_impl_t  *ctx_impl;
729 
730     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
731 
732     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
733 }
734 
735 
736 nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_t * ctx)737 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
738 {
739     long                 c;
740     nxt_unit_ctx_impl_t  *ctx_impl;
741 
742     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
743 
744     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
745 
746     if (c == 1) {
747         nxt_unit_ctx_free(ctx_impl);
748     }
749 }
750 
751 
752 nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t * lib)753 nxt_unit_lib_use(nxt_unit_impl_t *lib)
754 {
755     nxt_atomic_fetch_add(&lib->use_count, 1);
756 }
757 
758 
759 nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t * lib)760 nxt_unit_lib_release(nxt_unit_impl_t *lib)
761 {
762     long                c;
763     nxt_unit_process_t  *process;
764 
765     c = nxt_atomic_fetch_add(&lib->use_count, -1);
766 
767     if (c == 1) {
768         for ( ;; ) {
769             pthread_mutex_lock(&lib->mutex);
770 
771             process = nxt_unit_process_pop_first(lib);
772             if (process == NULL) {
773                 pthread_mutex_unlock(&lib->mutex);
774 
775                 break;
776             }
777 
778             nxt_unit_remove_process(lib, process);
779         }
780 
781         pthread_mutex_destroy(&lib->mutex);
782 
783         if (nxt_fast_path(lib->router_port != NULL)) {
784             nxt_unit_port_release(lib->router_port);
785         }
786 
787         if (nxt_fast_path(lib->shared_port != NULL)) {
788             nxt_unit_port_release(lib->shared_port);
789         }
790 
791         nxt_unit_mmaps_destroy(&lib->incoming);
792         nxt_unit_mmaps_destroy(&lib->outgoing);
793 
794         nxt_unit_free(NULL, lib);
795     }
796 }
797 
798 
799 nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t ** head,nxt_unit_mmap_buf_t * mmap_buf)800 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
801     nxt_unit_mmap_buf_t *mmap_buf)
802 {
803     mmap_buf->next = *head;
804 
805     if (mmap_buf->next != NULL) {
806         mmap_buf->next->prev = &mmap_buf->next;
807     }
808 
809     *head = mmap_buf;
810     mmap_buf->prev = head;
811 }
812 
813 
814 nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t ** prev,nxt_unit_mmap_buf_t * mmap_buf)815 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
816     nxt_unit_mmap_buf_t *mmap_buf)
817 {
818     while (*prev != NULL) {
819         prev = &(*prev)->next;
820     }
821 
822     nxt_unit_mmap_buf_insert(prev, mmap_buf);
823 }
824 
825 
826 nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t * mmap_buf)827 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
828 {
829     nxt_unit_mmap_buf_t  **prev;
830 
831     prev = mmap_buf->prev;
832 
833     if (mmap_buf->next != NULL) {
834         mmap_buf->next->prev = prev;
835     }
836 
837     if (prev != NULL) {
838         *prev = mmap_buf->next;
839     }
840 }
841 
842 
843 static int
nxt_unit_read_env(nxt_unit_port_t * ready_port,nxt_unit_port_t * router_port,nxt_unit_port_t * read_port,int * shared_port_fd,int * shared_queue_fd,int * log_fd,uint32_t * stream,uint32_t * shm_limit,uint32_t * request_limit)844 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
845     nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd,
846     int *log_fd, uint32_t *stream,
847     uint32_t *shm_limit, uint32_t *request_limit)
848 {
849     int       rc;
850     int       ready_fd, router_fd, read_in_fd, read_out_fd;
851     char      *unit_init, *version_end, *vars;
852     size_t    version_length;
853     int64_t   ready_pid, router_pid, read_pid;
854     uint32_t  ready_stream, router_id, ready_id, read_id;
855 
856     unit_init = getenv(NXT_UNIT_INIT_ENV);
857     if (nxt_slow_path(unit_init == NULL)) {
858         nxt_unit_alert(NULL, "%s is not in the current environment",
859                        NXT_UNIT_INIT_ENV);
860 
861         return NXT_UNIT_ERROR;
862     }
863 
864     version_end = strchr(unit_init, ';');
865     if (nxt_slow_path(version_end == NULL)) {
866         nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
867                        NXT_UNIT_INIT_ENV, unit_init);
868 
869         return NXT_UNIT_ERROR;
870     }
871 
872     version_length = version_end - unit_init;
873 
874     rc = version_length != nxt_length(NXT_VERSION)
875          || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
876 
877     if (nxt_slow_path(rc != 0)) {
878         nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
879                        "%.*s, while the app was compiled with libunit %s",
880                        (int) version_length, unit_init, NXT_VERSION);
881 
882         return NXT_UNIT_ERROR;
883     }
884 
885     vars = version_end + 1;
886 
887     rc = sscanf(vars,
888                 "%"PRIu32";"
889                 "%"PRId64",%"PRIu32",%d;"
890                 "%"PRId64",%"PRIu32",%d;"
891                 "%"PRId64",%"PRIu32",%d,%d;"
892                 "%d,%d;"
893                 "%d,%"PRIu32",%"PRIu32,
894                 &ready_stream,
895                 &ready_pid, &ready_id, &ready_fd,
896                 &router_pid, &router_id, &router_fd,
897                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
898                 shared_port_fd, shared_queue_fd,
899                 log_fd, shm_limit, request_limit);
900 
901     if (nxt_slow_path(rc == EOF)) {
902         nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
903                        vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
904 
905         return NXT_UNIT_ERROR;
906     }
907 
908     if (nxt_slow_path(rc != 16)) {
909         nxt_unit_alert(NULL, "invalid number of variables in %s env: "
910                        "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars);
911 
912         return NXT_UNIT_ERROR;
913     }
914 
915     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
916 
917     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
918 
919     ready_port->in_fd = -1;
920     ready_port->out_fd = ready_fd;
921     ready_port->data = NULL;
922 
923     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
924 
925     router_port->in_fd = -1;
926     router_port->out_fd = router_fd;
927     router_port->data = NULL;
928 
929     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
930 
931     read_port->in_fd = read_in_fd;
932     read_port->out_fd = read_out_fd;
933     read_port->data = NULL;
934 
935     *stream = ready_stream;
936 
937     return NXT_UNIT_OK;
938 }
939 
940 
941 static int
nxt_unit_ready(nxt_unit_ctx_t * ctx,int ready_fd,uint32_t stream,int queue_fd)942 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
943 {
944     ssize_t          res;
945     nxt_send_oob_t   oob;
946     nxt_port_msg_t   msg;
947     nxt_unit_impl_t  *lib;
948     int              fds[2] = {queue_fd, -1};
949 
950     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
951 
952     msg.stream = stream;
953     msg.pid = lib->pid;
954     msg.reply_port = 0;
955     msg.type = _NXT_PORT_MSG_PROCESS_READY;
956     msg.last = 1;
957     msg.mmap = 0;
958     msg.nf = 0;
959     msg.mf = 0;
960 
961     nxt_socket_msg_oob_init(&oob, fds);
962 
963     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
964     if (res != sizeof(msg)) {
965         return NXT_UNIT_ERROR;
966     }
967 
968     return NXT_UNIT_OK;
969 }
970 
971 
972 static int
nxt_unit_process_msg(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf,nxt_unit_request_info_t ** preq)973 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
974     nxt_unit_request_info_t **preq)
975 {
976     int                  rc;
977     pid_t                pid;
978     uint8_t              quit_param;
979     nxt_port_msg_t       *port_msg;
980     nxt_unit_impl_t      *lib;
981     nxt_unit_recv_msg_t  recv_msg;
982 
983     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
984 
985     recv_msg.incoming_buf = NULL;
986     recv_msg.fd[0] = -1;
987     recv_msg.fd[1] = -1;
988 
989     rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
990     if (nxt_slow_path(rc != NXT_OK)) {
991         nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
992         rc = NXT_UNIT_ERROR;
993         goto done;
994     }
995 
996     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
997         if (nxt_slow_path(rbuf->size == 0)) {
998             nxt_unit_debug(ctx, "read port closed");
999 
1000             nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1001             rc = NXT_UNIT_OK;
1002             goto done;
1003         }
1004 
1005         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
1006 
1007         rc = NXT_UNIT_ERROR;
1008         goto done;
1009     }
1010 
1011     port_msg = (nxt_port_msg_t *) rbuf->buf;
1012 
1013     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
1014                    port_msg->stream, (int) port_msg->type,
1015                    recv_msg.fd[0], recv_msg.fd[1]);
1016 
1017     recv_msg.stream = port_msg->stream;
1018     recv_msg.pid = port_msg->pid;
1019     recv_msg.reply_port = port_msg->reply_port;
1020     recv_msg.last = port_msg->last;
1021     recv_msg.mmap = port_msg->mmap;
1022 
1023     recv_msg.start = port_msg + 1;
1024     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
1025 
1026     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
1027         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
1028                        port_msg->stream, (int) port_msg->type);
1029         rc = NXT_UNIT_ERROR;
1030         goto done;
1031     }
1032 
1033     /* Fragmentation is unsupported. */
1034     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
1035         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
1036                        port_msg->stream, (int) port_msg->type);
1037         rc = NXT_UNIT_ERROR;
1038         goto done;
1039     }
1040 
1041     if (port_msg->mmap) {
1042         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
1043 
1044         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1045             if (rc == NXT_UNIT_AGAIN) {
1046                 recv_msg.fd[0] = -1;
1047                 recv_msg.fd[1] = -1;
1048             }
1049 
1050             goto done;
1051         }
1052     }
1053 
1054     switch (port_msg->type) {
1055 
1056     case _NXT_PORT_MSG_RPC_READY:
1057         rc = NXT_UNIT_OK;
1058         break;
1059 
1060     case _NXT_PORT_MSG_QUIT:
1061         if (recv_msg.size == sizeof(quit_param)) {
1062             memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1063 
1064         } else {
1065             quit_param = NXT_QUIT_NORMAL;
1066         }
1067 
1068         nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1069                        (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1070 
1071         nxt_unit_quit(ctx, quit_param);
1072 
1073         rc = NXT_UNIT_OK;
1074         break;
1075 
1076     case _NXT_PORT_MSG_NEW_PORT:
1077         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1078         break;
1079 
1080     case _NXT_PORT_MSG_PORT_ACK:
1081         rc = nxt_unit_ctx_ready(ctx);
1082         break;
1083 
1084     case _NXT_PORT_MSG_CHANGE_FILE:
1085         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1086                        port_msg->stream, recv_msg.fd[0]);
1087 
1088         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1089             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1090                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1091                            strerror(errno), errno);
1092 
1093             rc = NXT_UNIT_ERROR;
1094             goto done;
1095         }
1096 
1097         rc = NXT_UNIT_OK;
1098         break;
1099 
1100     case _NXT_PORT_MSG_MMAP:
1101         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1102             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1103                            port_msg->stream, recv_msg.fd[0]);
1104 
1105             rc = NXT_UNIT_ERROR;
1106             goto done;
1107         }
1108 
1109         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1110         break;
1111 
1112     case _NXT_PORT_MSG_REQ_HEADERS:
1113         rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1114         break;
1115 
1116     case _NXT_PORT_MSG_REQ_BODY:
1117         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1118         break;
1119 
1120     case _NXT_PORT_MSG_WEBSOCKET:
1121         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1122         break;
1123 
1124     case _NXT_PORT_MSG_REMOVE_PID:
1125         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1126             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1127                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1128                            (int) sizeof(pid));
1129 
1130             rc = NXT_UNIT_ERROR;
1131             goto done;
1132         }
1133 
1134         memcpy(&pid, recv_msg.start, sizeof(pid));
1135 
1136         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1137                        port_msg->stream, (int) pid);
1138 
1139         nxt_unit_remove_pid(lib, pid);
1140 
1141         rc = NXT_UNIT_OK;
1142         break;
1143 
1144     case _NXT_PORT_MSG_SHM_ACK:
1145         rc = nxt_unit_process_shm_ack(ctx);
1146         break;
1147 
1148     default:
1149         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1150                        port_msg->stream, (int) port_msg->type);
1151 
1152         rc = NXT_UNIT_ERROR;
1153         goto done;
1154     }
1155 
1156 done:
1157 
1158     if (recv_msg.fd[0] != -1) {
1159         nxt_unit_close(recv_msg.fd[0]);
1160     }
1161 
1162     if (recv_msg.fd[1] != -1) {
1163         nxt_unit_close(recv_msg.fd[1]);
1164     }
1165 
1166     while (recv_msg.incoming_buf != NULL) {
1167         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1168     }
1169 
1170     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1171 #if (NXT_DEBUG)
1172         memset(rbuf->buf, 0xAC, rbuf->size);
1173 #endif
1174         nxt_unit_read_buf_release(ctx, rbuf);
1175     }
1176 
1177     return rc;
1178 }
1179 
1180 
1181 static int
nxt_unit_process_new_port(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1182 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1183 {
1184     void                     *mem;
1185     nxt_unit_port_t          new_port, *port;
1186     nxt_port_msg_new_port_t  *new_port_msg;
1187 
1188     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1189         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1190                       "invalid message size (%d)",
1191                       recv_msg->stream, (int) recv_msg->size);
1192 
1193         return NXT_UNIT_ERROR;
1194     }
1195 
1196     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1197         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1198                        recv_msg->stream, recv_msg->fd[0]);
1199 
1200         return NXT_UNIT_ERROR;
1201     }
1202 
1203     new_port_msg = recv_msg->start;
1204 
1205     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1206                    recv_msg->stream, (int) new_port_msg->pid,
1207                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1208 
1209     if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) {
1210         return NXT_UNIT_ERROR;
1211     }
1212 
1213     nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id);
1214 
1215     new_port.in_fd = -1;
1216     new_port.out_fd = recv_msg->fd[0];
1217 
1218     mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1219                MAP_SHARED, recv_msg->fd[1], 0);
1220 
1221     if (nxt_slow_path(mem == MAP_FAILED)) {
1222         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1223                        strerror(errno), errno);
1224 
1225         return NXT_UNIT_ERROR;
1226     }
1227 
1228     new_port.data = NULL;
1229 
1230     recv_msg->fd[0] = -1;
1231 
1232     port = nxt_unit_add_port(ctx, &new_port, mem);
1233     if (nxt_slow_path(port == NULL)) {
1234         return NXT_UNIT_ERROR;
1235     }
1236 
1237     nxt_unit_port_release(port);
1238 
1239     return NXT_UNIT_OK;
1240 }
1241 
1242 
1243 static int
nxt_unit_ctx_ready(nxt_unit_ctx_t * ctx)1244 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1245 {
1246     nxt_unit_impl_t      *lib;
1247     nxt_unit_ctx_impl_t  *ctx_impl;
1248 
1249     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1250 
1251     if (nxt_slow_path(ctx_impl->ready)) {
1252         return NXT_UNIT_OK;
1253     }
1254 
1255     ctx_impl->ready = 1;
1256 
1257     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1258 
1259     /* Call ready_handler() only for main context. */
1260     if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1261         return lib->callbacks.ready_handler(ctx);
1262     }
1263 
1264     if (&lib->main_ctx != ctx_impl) {
1265         /* Check if the main context is already stopped or quit. */
1266         if (nxt_slow_path(!lib->main_ctx.ready)) {
1267             ctx_impl->ready = 0;
1268 
1269             nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1270 
1271             return NXT_UNIT_OK;
1272         }
1273 
1274         if (lib->callbacks.add_port != NULL) {
1275             lib->callbacks.add_port(ctx, lib->shared_port);
1276         }
1277     }
1278 
1279     return NXT_UNIT_OK;
1280 }
1281 
1282 
1283 static int
nxt_unit_process_req_headers(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg,nxt_unit_request_info_t ** preq)1284 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1285     nxt_unit_request_info_t **preq)
1286 {
1287     int                           res;
1288     nxt_unit_impl_t               *lib;
1289     nxt_unit_port_id_t            port_id;
1290     nxt_unit_request_t            *r;
1291     nxt_unit_mmap_buf_t           *b;
1292     nxt_unit_request_info_t       *req;
1293     nxt_unit_request_info_impl_t  *req_impl;
1294 
1295     if (nxt_slow_path(recv_msg->mmap == 0)) {
1296         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1297                       recv_msg->stream);
1298 
1299         return NXT_UNIT_ERROR;
1300     }
1301 
1302     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1303         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1304                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1305                       (int) sizeof(nxt_unit_request_t));
1306 
1307         return NXT_UNIT_ERROR;
1308     }
1309 
1310     req_impl = nxt_unit_request_info_get(ctx);
1311     if (nxt_slow_path(req_impl == NULL)) {
1312         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1313                       recv_msg->stream);
1314 
1315         return NXT_UNIT_ERROR;
1316     }
1317 
1318     req = &req_impl->req;
1319 
1320     req->request = recv_msg->start;
1321 
1322     b = recv_msg->incoming_buf;
1323 
1324     req->request_buf = &b->buf;
1325     req->response = NULL;
1326     req->response_buf = NULL;
1327 
1328     r = req->request;
1329 
1330     req->content_length = r->content_length;
1331 
1332     req->content_buf = req->request_buf;
1333     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1334 
1335     req_impl->stream = recv_msg->stream;
1336 
1337     req_impl->outgoing_buf = NULL;
1338 
1339     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1340         b->req = req;
1341     }
1342 
1343     /* "Move" incoming buffer list to req_impl. */
1344     req_impl->incoming_buf = recv_msg->incoming_buf;
1345     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1346     recv_msg->incoming_buf = NULL;
1347 
1348     req->content_fd = recv_msg->fd[0];
1349     recv_msg->fd[0] = -1;
1350 
1351     req->response_max_fields = 0;
1352     req_impl->state = NXT_UNIT_RS_START;
1353     req_impl->websocket = 0;
1354     req_impl->in_hash = 0;
1355 
1356     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1357                    (int) r->method_length,
1358                    (char *) nxt_unit_sptr_get(&r->method),
1359                    (int) r->target_length,
1360                    (char *) nxt_unit_sptr_get(&r->target),
1361                    (int) r->content_length);
1362 
1363     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1364 
1365     res = nxt_unit_request_check_response_port(req, &port_id);
1366     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1367         return NXT_UNIT_ERROR;
1368     }
1369 
1370     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1371         res = nxt_unit_send_req_headers_ack(req);
1372         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1373             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1374 
1375             return NXT_UNIT_ERROR;
1376         }
1377 
1378         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1379 
1380         if (req->content_length
1381             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1382         {
1383             res = nxt_unit_request_hash_add(ctx, req);
1384             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1385                 nxt_unit_req_warn(req, "failed to add request to hash");
1386 
1387                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1388 
1389                 return NXT_UNIT_ERROR;
1390             }
1391 
1392             /*
1393              * If application have separate data handler, we may start
1394              * request processing and process data when it is arrived.
1395              */
1396             if (lib->callbacks.data_handler == NULL) {
1397                 return NXT_UNIT_OK;
1398             }
1399         }
1400 
1401         if (preq == NULL) {
1402             lib->callbacks.request_handler(req);
1403 
1404         } else {
1405             *preq = req;
1406         }
1407     }
1408 
1409     return NXT_UNIT_OK;
1410 }
1411 
1412 
1413 static int
nxt_unit_process_req_body(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1414 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1415 {
1416     uint64_t                 l;
1417     nxt_unit_impl_t          *lib;
1418     nxt_unit_mmap_buf_t      *b;
1419     nxt_unit_request_info_t  *req;
1420 
1421     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1422     if (req == NULL) {
1423         return NXT_UNIT_OK;
1424     }
1425 
1426     l = req->content_buf->end - req->content_buf->free;
1427 
1428     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1429         b->req = req;
1430         l += b->buf.end - b->buf.free;
1431     }
1432 
1433     if (recv_msg->incoming_buf != NULL) {
1434         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1435 
1436         while (b->next != NULL) {
1437             b = b->next;
1438         }
1439 
1440         /* "Move" incoming buffer list to req_impl. */
1441         b->next = recv_msg->incoming_buf;
1442         b->next->prev = &b->next;
1443 
1444         recv_msg->incoming_buf = NULL;
1445     }
1446 
1447     req->content_fd = recv_msg->fd[0];
1448     recv_msg->fd[0] = -1;
1449 
1450     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1451 
1452     if (lib->callbacks.data_handler != NULL) {
1453         lib->callbacks.data_handler(req);
1454 
1455         return NXT_UNIT_OK;
1456     }
1457 
1458     if (req->content_fd != -1 || l == req->content_length) {
1459         lib->callbacks.request_handler(req);
1460     }
1461 
1462     return NXT_UNIT_OK;
1463 }
1464 
1465 
1466 static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t * req,nxt_unit_port_id_t * port_id)1467 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1468     nxt_unit_port_id_t *port_id)
1469 {
1470     int                           res;
1471     nxt_unit_ctx_t                *ctx;
1472     nxt_unit_impl_t               *lib;
1473     nxt_unit_port_t               *port;
1474     nxt_unit_process_t            *process;
1475     nxt_unit_ctx_impl_t           *ctx_impl;
1476     nxt_unit_port_impl_t          *port_impl;
1477     nxt_unit_request_info_impl_t  *req_impl;
1478 
1479     ctx = req->ctx;
1480     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1481     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1482 
1483     pthread_mutex_lock(&lib->mutex);
1484 
1485     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1486     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1487 
1488     if (nxt_fast_path(port != NULL)) {
1489         req->response_port = port;
1490 
1491         if (nxt_fast_path(port_impl->ready)) {
1492             pthread_mutex_unlock(&lib->mutex);
1493 
1494             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1495                            (int) port->id.pid, (int) port->id.id);
1496 
1497             return NXT_UNIT_OK;
1498         }
1499 
1500         nxt_unit_debug(ctx, "check_response_port: "
1501                        "port{%d,%d} already requested",
1502                        (int) port->id.pid, (int) port->id.id);
1503 
1504         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1505 
1506         nxt_queue_insert_tail(&port_impl->awaiting_req,
1507                               &req_impl->port_wait_link);
1508 
1509         pthread_mutex_unlock(&lib->mutex);
1510 
1511         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1512 
1513         return NXT_UNIT_AGAIN;
1514     }
1515 
1516     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1517     if (nxt_slow_path(port_impl == NULL)) {
1518         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1519                        (int) sizeof(nxt_unit_port_impl_t));
1520 
1521         pthread_mutex_unlock(&lib->mutex);
1522 
1523         return NXT_UNIT_ERROR;
1524     }
1525 
1526     port = &port_impl->port;
1527 
1528     port->id = *port_id;
1529     port->in_fd = -1;
1530     port->out_fd = -1;
1531     port->data = NULL;
1532 
1533     res = nxt_unit_port_hash_add(&lib->ports, port);
1534     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1535         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1536                        port->id.pid, port->id.id);
1537 
1538         pthread_mutex_unlock(&lib->mutex);
1539 
1540         nxt_unit_free(ctx, port);
1541 
1542         return NXT_UNIT_ERROR;
1543     }
1544 
1545     process = nxt_unit_process_find(lib, port_id->pid, 0);
1546     if (nxt_slow_path(process == NULL)) {
1547         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1548                        port->id.pid);
1549 
1550         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1551 
1552         pthread_mutex_unlock(&lib->mutex);
1553 
1554         nxt_unit_free(ctx, port);
1555 
1556         return NXT_UNIT_ERROR;
1557     }
1558 
1559     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1560 
1561     port_impl->process = process;
1562     port_impl->queue = NULL;
1563     port_impl->from_socket = 0;
1564     port_impl->socket_rbuf = NULL;
1565 
1566     nxt_queue_init(&port_impl->awaiting_req);
1567 
1568     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1569 
1570     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1571 
1572     port_impl->use_count = 2;
1573     port_impl->ready = 0;
1574 
1575     req->response_port = port;
1576 
1577     pthread_mutex_unlock(&lib->mutex);
1578 
1579     res = nxt_unit_get_port(ctx, port_id);
1580     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1581         return NXT_UNIT_ERROR;
1582     }
1583 
1584     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1585 
1586     return NXT_UNIT_AGAIN;
1587 }
1588 
1589 
1590 static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t * req)1591 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1592 {
1593     ssize_t                       res;
1594     nxt_port_msg_t                msg;
1595     nxt_unit_impl_t               *lib;
1596     nxt_unit_ctx_impl_t           *ctx_impl;
1597     nxt_unit_request_info_impl_t  *req_impl;
1598 
1599     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1600     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1601     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1602 
1603     memset(&msg, 0, sizeof(nxt_port_msg_t));
1604 
1605     msg.stream = req_impl->stream;
1606     msg.pid = lib->pid;
1607     msg.reply_port = ctx_impl->read_port->id.id;
1608     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1609 
1610     res = nxt_unit_port_send(req->ctx, req->response_port,
1611                              &msg, sizeof(msg), NULL);
1612     if (nxt_slow_path(res != sizeof(msg))) {
1613         return NXT_UNIT_ERROR;
1614     }
1615 
1616     return NXT_UNIT_OK;
1617 }
1618 
1619 
1620 static int
nxt_unit_process_websocket(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1621 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1622 {
1623     size_t                           hsize;
1624     nxt_unit_impl_t                  *lib;
1625     nxt_unit_mmap_buf_t              *b;
1626     nxt_unit_callbacks_t             *cb;
1627     nxt_unit_request_info_t          *req;
1628     nxt_unit_request_info_impl_t     *req_impl;
1629     nxt_unit_websocket_frame_impl_t  *ws_impl;
1630 
1631     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1632     if (nxt_slow_path(req == NULL)) {
1633         return NXT_UNIT_OK;
1634     }
1635 
1636     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1637 
1638     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1639     cb = &lib->callbacks;
1640 
1641     if (cb->websocket_handler && recv_msg->size >= 2) {
1642         ws_impl = nxt_unit_websocket_frame_get(ctx);
1643         if (nxt_slow_path(ws_impl == NULL)) {
1644             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1645                           req_impl->stream);
1646 
1647             return NXT_UNIT_ERROR;
1648         }
1649 
1650         ws_impl->ws.req = req;
1651 
1652         ws_impl->buf = NULL;
1653 
1654         if (recv_msg->mmap) {
1655             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1656                 b->req = req;
1657             }
1658 
1659             /* "Move" incoming buffer list to ws_impl. */
1660             ws_impl->buf = recv_msg->incoming_buf;
1661             ws_impl->buf->prev = &ws_impl->buf;
1662             recv_msg->incoming_buf = NULL;
1663 
1664             b = ws_impl->buf;
1665 
1666         } else {
1667             b = nxt_unit_mmap_buf_get(ctx);
1668             if (nxt_slow_path(b == NULL)) {
1669                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1670                                req_impl->stream);
1671 
1672                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1673 
1674                 return NXT_UNIT_ERROR;
1675             }
1676 
1677             b->req = req;
1678             b->buf.start = recv_msg->start;
1679             b->buf.free = b->buf.start;
1680             b->buf.end = b->buf.start + recv_msg->size;
1681 
1682             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1683         }
1684 
1685         ws_impl->ws.header = (void *) b->buf.start;
1686         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1687             ws_impl->ws.header);
1688 
1689         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1690 
1691         if (ws_impl->ws.header->mask) {
1692             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1693 
1694         } else {
1695             ws_impl->ws.mask = NULL;
1696         }
1697 
1698         b->buf.free += hsize;
1699 
1700         ws_impl->ws.content_buf = &b->buf;
1701         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1702 
1703         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1704                            "payload_len=%"PRIu64,
1705                             ws_impl->ws.header->opcode,
1706                             ws_impl->ws.payload_len);
1707 
1708         cb->websocket_handler(&ws_impl->ws);
1709     }
1710 
1711     if (recv_msg->last) {
1712         if (cb->close_handler) {
1713             nxt_unit_req_debug(req, "close_handler");
1714 
1715             cb->close_handler(req);
1716 
1717         } else {
1718             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1719         }
1720     }
1721 
1722     return NXT_UNIT_OK;
1723 }
1724 
1725 
1726 static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t * ctx)1727 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1728 {
1729     nxt_unit_impl_t       *lib;
1730     nxt_unit_callbacks_t  *cb;
1731 
1732     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1733     cb = &lib->callbacks;
1734 
1735     if (cb->shm_ack_handler != NULL) {
1736         cb->shm_ack_handler(ctx);
1737     }
1738 
1739     return NXT_UNIT_OK;
1740 }
1741 
1742 
1743 static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t * ctx)1744 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1745 {
1746     nxt_unit_impl_t               *lib;
1747     nxt_queue_link_t              *lnk;
1748     nxt_unit_ctx_impl_t           *ctx_impl;
1749     nxt_unit_request_info_impl_t  *req_impl;
1750 
1751     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1752 
1753     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1754 
1755     pthread_mutex_lock(&ctx_impl->mutex);
1756 
1757     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1758         pthread_mutex_unlock(&ctx_impl->mutex);
1759 
1760         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1761                                         + lib->request_data_size);
1762         if (nxt_slow_path(req_impl == NULL)) {
1763             return NULL;
1764         }
1765 
1766         req_impl->req.unit = ctx->unit;
1767         req_impl->req.ctx = ctx;
1768 
1769         pthread_mutex_lock(&ctx_impl->mutex);
1770 
1771     } else {
1772         lnk = nxt_queue_first(&ctx_impl->free_req);
1773         nxt_queue_remove(lnk);
1774 
1775         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1776     }
1777 
1778     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1779 
1780     pthread_mutex_unlock(&ctx_impl->mutex);
1781 
1782     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1783 
1784     return req_impl;
1785 }
1786 
1787 
1788 static void
nxt_unit_request_info_release(nxt_unit_request_info_t * req)1789 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1790 {
1791     nxt_unit_ctx_t                *ctx;
1792     nxt_unit_ctx_impl_t           *ctx_impl;
1793     nxt_unit_request_info_impl_t  *req_impl;
1794 
1795     ctx = req->ctx;
1796     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1797     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1798 
1799     req->response = NULL;
1800     req->response_buf = NULL;
1801 
1802     if (req_impl->in_hash) {
1803         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1804     }
1805 
1806     while (req_impl->outgoing_buf != NULL) {
1807         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1808     }
1809 
1810     while (req_impl->incoming_buf != NULL) {
1811         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1812     }
1813 
1814     if (req->content_fd != -1) {
1815         nxt_unit_close(req->content_fd);
1816 
1817         req->content_fd = -1;
1818     }
1819 
1820     if (req->response_port != NULL) {
1821         nxt_unit_port_release(req->response_port);
1822 
1823         req->response_port = NULL;
1824     }
1825 
1826     req_impl->state = NXT_UNIT_RS_RELEASED;
1827 
1828     pthread_mutex_lock(&ctx_impl->mutex);
1829 
1830     nxt_queue_remove(&req_impl->link);
1831 
1832     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1833 
1834     pthread_mutex_unlock(&ctx_impl->mutex);
1835 
1836     if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1837         nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1838     }
1839 }
1840 
1841 
1842 static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t * req_impl)1843 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1844 {
1845     nxt_unit_ctx_impl_t  *ctx_impl;
1846 
1847     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1848 
1849     nxt_queue_remove(&req_impl->link);
1850 
1851     if (req_impl != &ctx_impl->req) {
1852         nxt_unit_free(&ctx_impl->ctx, req_impl);
1853     }
1854 }
1855 
1856 
1857 static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t * ctx)1858 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1859 {
1860     nxt_queue_link_t                 *lnk;
1861     nxt_unit_ctx_impl_t              *ctx_impl;
1862     nxt_unit_websocket_frame_impl_t  *ws_impl;
1863 
1864     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1865 
1866     pthread_mutex_lock(&ctx_impl->mutex);
1867 
1868     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1869         pthread_mutex_unlock(&ctx_impl->mutex);
1870 
1871         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1872         if (nxt_slow_path(ws_impl == NULL)) {
1873             return NULL;
1874         }
1875 
1876     } else {
1877         lnk = nxt_queue_first(&ctx_impl->free_ws);
1878         nxt_queue_remove(lnk);
1879 
1880         pthread_mutex_unlock(&ctx_impl->mutex);
1881 
1882         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1883     }
1884 
1885     ws_impl->ctx_impl = ctx_impl;
1886 
1887     return ws_impl;
1888 }
1889 
1890 
1891 static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t * ws)1892 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1893 {
1894     nxt_unit_websocket_frame_impl_t  *ws_impl;
1895 
1896     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1897 
1898     while (ws_impl->buf != NULL) {
1899         nxt_unit_mmap_buf_free(ws_impl->buf);
1900     }
1901 
1902     ws->req = NULL;
1903 
1904     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1905 
1906     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1907 
1908     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1909 }
1910 
1911 
1912 static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t * ctx,nxt_unit_websocket_frame_impl_t * ws_impl)1913 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1914     nxt_unit_websocket_frame_impl_t *ws_impl)
1915 {
1916     nxt_queue_remove(&ws_impl->link);
1917 
1918     nxt_unit_free(ctx, ws_impl);
1919 }
1920 
1921 
1922 uint16_t
nxt_unit_field_hash(const char * name,size_t name_length)1923 nxt_unit_field_hash(const char *name, size_t name_length)
1924 {
1925     u_char      ch;
1926     uint32_t    hash;
1927     const char  *p, *end;
1928 
1929     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1930     end = name + name_length;
1931 
1932     for (p = name; p < end; p++) {
1933         ch = *p;
1934         hash = (hash << 4) + hash + nxt_lowcase(ch);
1935     }
1936 
1937     hash = (hash >> 16) ^ hash;
1938 
1939     return hash;
1940 }
1941 
1942 
1943 void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t * req)1944 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1945 {
1946     char                *name;
1947     uint32_t            i, j;
1948     nxt_unit_field_t    *fields, f;
1949     nxt_unit_request_t  *r;
1950 
1951     static nxt_str_t  content_length = nxt_string("content-length");
1952     static nxt_str_t  content_type = nxt_string("content-type");
1953     static nxt_str_t  cookie = nxt_string("cookie");
1954 
1955     nxt_unit_req_debug(req, "group_dup_fields");
1956 
1957     r = req->request;
1958     fields = r->fields;
1959 
1960     for (i = 0; i < r->fields_count; i++) {
1961         name = nxt_unit_sptr_get(&fields[i].name);
1962 
1963         switch (fields[i].hash) {
1964         case NXT_UNIT_HASH_CONTENT_LENGTH:
1965             if (fields[i].name_length == content_length.length
1966                 && nxt_unit_memcasecmp(name, content_length.start,
1967                                        content_length.length) == 0)
1968             {
1969                 r->content_length_field = i;
1970             }
1971 
1972             break;
1973 
1974         case NXT_UNIT_HASH_CONTENT_TYPE:
1975             if (fields[i].name_length == content_type.length
1976                 && nxt_unit_memcasecmp(name, content_type.start,
1977                                        content_type.length) == 0)
1978             {
1979                 r->content_type_field = i;
1980             }
1981 
1982             break;
1983 
1984         case NXT_UNIT_HASH_COOKIE:
1985             if (fields[i].name_length == cookie.length
1986                 && nxt_unit_memcasecmp(name, cookie.start,
1987                                        cookie.length) == 0)
1988             {
1989                 r->cookie_field = i;
1990             }
1991 
1992             break;
1993         }
1994 
1995         for (j = i + 1; j < r->fields_count; j++) {
1996             if (fields[i].hash != fields[j].hash
1997                 || fields[i].name_length != fields[j].name_length
1998                 || nxt_unit_memcasecmp(name,
1999                                        nxt_unit_sptr_get(&fields[j].name),
2000                                        fields[j].name_length) != 0)
2001             {
2002                 continue;
2003             }
2004 
2005             f = fields[j];
2006             f.value.offset += (j - (i + 1)) * sizeof(f);
2007 
2008             while (j > i + 1) {
2009                 fields[j] = fields[j - 1];
2010                 fields[j].name.offset -= sizeof(f);
2011                 fields[j].value.offset -= sizeof(f);
2012                 j--;
2013             }
2014 
2015             fields[j] = f;
2016 
2017             /* Assign the same name pointer for further grouping simplicity. */
2018             nxt_unit_sptr_set(&fields[j].name, name);
2019 
2020             i++;
2021         }
2022     }
2023 }
2024 
2025 
2026 int
nxt_unit_response_init(nxt_unit_request_info_t * req,uint16_t status,uint32_t max_fields_count,uint32_t max_fields_size)2027 nxt_unit_response_init(nxt_unit_request_info_t *req,
2028     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2029 {
2030     uint32_t                      buf_size;
2031     nxt_unit_buf_t                *buf;
2032     nxt_unit_request_info_impl_t  *req_impl;
2033 
2034     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2035 
2036     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2037         nxt_unit_req_warn(req, "init: response already sent");
2038 
2039         return NXT_UNIT_ERROR;
2040     }
2041 
2042     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2043                        (int) max_fields_count, (int) max_fields_size);
2044 
2045     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2046         nxt_unit_req_debug(req, "duplicate response init");
2047     }
2048 
2049     /*
2050      * Each field name and value 0-terminated by libunit,
2051      * this is the reason of '+ 2' below.
2052      */
2053     buf_size = sizeof(nxt_unit_response_t)
2054                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2055                + max_fields_size;
2056 
2057     if (nxt_slow_path(req->response_buf != NULL)) {
2058         buf = req->response_buf;
2059 
2060         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2061             goto init_response;
2062         }
2063 
2064         nxt_unit_buf_free(buf);
2065 
2066         req->response_buf = NULL;
2067         req->response = NULL;
2068         req->response_max_fields = 0;
2069 
2070         req_impl->state = NXT_UNIT_RS_START;
2071     }
2072 
2073     buf = nxt_unit_response_buf_alloc(req, buf_size);
2074     if (nxt_slow_path(buf == NULL)) {
2075         return NXT_UNIT_ERROR;
2076     }
2077 
2078 init_response:
2079 
2080     memset(buf->start, 0, sizeof(nxt_unit_response_t));
2081 
2082     req->response_buf = buf;
2083 
2084     req->response = (nxt_unit_response_t *) buf->start;
2085     req->response->status = status;
2086 
2087     buf->free = buf->start + sizeof(nxt_unit_response_t)
2088                 + max_fields_count * sizeof(nxt_unit_field_t);
2089 
2090     req->response_max_fields = max_fields_count;
2091     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2092 
2093     return NXT_UNIT_OK;
2094 }
2095 
2096 
2097 int
nxt_unit_response_realloc(nxt_unit_request_info_t * req,uint32_t max_fields_count,uint32_t max_fields_size)2098 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2099     uint32_t max_fields_count, uint32_t max_fields_size)
2100 {
2101     char                          *p;
2102     uint32_t                      i, buf_size;
2103     nxt_unit_buf_t                *buf;
2104     nxt_unit_field_t              *f, *src;
2105     nxt_unit_response_t           *resp;
2106     nxt_unit_request_info_impl_t  *req_impl;
2107 
2108     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2109 
2110     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2111         nxt_unit_req_warn(req, "realloc: response not init");
2112 
2113         return NXT_UNIT_ERROR;
2114     }
2115 
2116     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2117         nxt_unit_req_warn(req, "realloc: response already sent");
2118 
2119         return NXT_UNIT_ERROR;
2120     }
2121 
2122     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2123         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2124 
2125         return NXT_UNIT_ERROR;
2126     }
2127 
2128     /*
2129      * Each field name and value 0-terminated by libunit,
2130      * this is the reason of '+ 2' below.
2131      */
2132     buf_size = sizeof(nxt_unit_response_t)
2133                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2134                + max_fields_size;
2135 
2136     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2137 
2138     buf = nxt_unit_response_buf_alloc(req, buf_size);
2139     if (nxt_slow_path(buf == NULL)) {
2140         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2141         return NXT_UNIT_ERROR;
2142     }
2143 
2144     resp = (nxt_unit_response_t *) buf->start;
2145 
2146     memset(resp, 0, sizeof(nxt_unit_response_t));
2147 
2148     resp->status = req->response->status;
2149     resp->content_length = req->response->content_length;
2150 
2151     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2152     f = resp->fields;
2153 
2154     for (i = 0; i < req->response->fields_count; i++) {
2155         src = req->response->fields + i;
2156 
2157         if (nxt_slow_path(src->skip != 0)) {
2158             continue;
2159         }
2160 
2161         if (nxt_slow_path(src->name_length + src->value_length + 2
2162                           > (uint32_t) (buf->end - p)))
2163         {
2164             nxt_unit_req_warn(req, "realloc: not enough space for field"
2165                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2166                   i, src, src->name_length, src->value_length);
2167 
2168             goto fail;
2169         }
2170 
2171         nxt_unit_sptr_set(&f->name, p);
2172         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2173         *p++ = '\0';
2174 
2175         nxt_unit_sptr_set(&f->value, p);
2176         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2177         *p++ = '\0';
2178 
2179         f->hash = src->hash;
2180         f->skip = 0;
2181         f->name_length = src->name_length;
2182         f->value_length = src->value_length;
2183 
2184         resp->fields_count++;
2185         f++;
2186     }
2187 
2188     if (req->response->piggyback_content_length > 0) {
2189         if (nxt_slow_path(req->response->piggyback_content_length
2190                           > (uint32_t) (buf->end - p)))
2191         {
2192             nxt_unit_req_warn(req, "realloc: not enought space for content"
2193                   " #%"PRIu32", %"PRIu32" required",
2194                   i, req->response->piggyback_content_length);
2195 
2196             goto fail;
2197         }
2198 
2199         resp->piggyback_content_length =
2200                                        req->response->piggyback_content_length;
2201 
2202         nxt_unit_sptr_set(&resp->piggyback_content, p);
2203         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2204                        req->response->piggyback_content_length);
2205     }
2206 
2207     buf->free = p;
2208 
2209     nxt_unit_buf_free(req->response_buf);
2210 
2211     req->response = resp;
2212     req->response_buf = buf;
2213     req->response_max_fields = max_fields_count;
2214 
2215     return NXT_UNIT_OK;
2216 
2217 fail:
2218 
2219     nxt_unit_buf_free(buf);
2220 
2221     return NXT_UNIT_ERROR;
2222 }
2223 
2224 
2225 int
nxt_unit_response_is_init(nxt_unit_request_info_t * req)2226 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2227 {
2228     nxt_unit_request_info_impl_t  *req_impl;
2229 
2230     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2231 
2232     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2233 }
2234 
2235 
2236 int
nxt_unit_response_add_field(nxt_unit_request_info_t * req,const char * name,uint8_t name_length,const char * value,uint32_t value_length)2237 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2238     const char *name, uint8_t name_length,
2239     const char *value, uint32_t value_length)
2240 {
2241     nxt_unit_buf_t                *buf;
2242     nxt_unit_field_t              *f;
2243     nxt_unit_response_t           *resp;
2244     nxt_unit_request_info_impl_t  *req_impl;
2245 
2246     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2247 
2248     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2249         nxt_unit_req_warn(req, "add_field: response not initialized or "
2250                           "already sent");
2251 
2252         return NXT_UNIT_ERROR;
2253     }
2254 
2255     resp = req->response;
2256 
2257     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2258         nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2259                           (int) resp->fields_count);
2260 
2261         return NXT_UNIT_ERROR;
2262     }
2263 
2264     buf = req->response_buf;
2265 
2266     if (nxt_slow_path(name_length + value_length + 2
2267                       > (uint32_t) (buf->end - buf->free)))
2268     {
2269         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2270 
2271         return NXT_UNIT_ERROR;
2272     }
2273 
2274     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2275                        resp->fields_count,
2276                        (int) name_length, name,
2277                        (int) value_length, value);
2278 
2279     f = resp->fields + resp->fields_count;
2280 
2281     nxt_unit_sptr_set(&f->name, buf->free);
2282     buf->free = nxt_cpymem(buf->free, name, name_length);
2283     *buf->free++ = '\0';
2284 
2285     nxt_unit_sptr_set(&f->value, buf->free);
2286     buf->free = nxt_cpymem(buf->free, value, value_length);
2287     *buf->free++ = '\0';
2288 
2289     f->hash = nxt_unit_field_hash(name, name_length);
2290     f->skip = 0;
2291     f->name_length = name_length;
2292     f->value_length = value_length;
2293 
2294     resp->fields_count++;
2295 
2296     return NXT_UNIT_OK;
2297 }
2298 
2299 
2300 int
nxt_unit_response_add_content(nxt_unit_request_info_t * req,const void * src,uint32_t size)2301 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2302     const void* src, uint32_t size)
2303 {
2304     nxt_unit_buf_t                *buf;
2305     nxt_unit_response_t           *resp;
2306     nxt_unit_request_info_impl_t  *req_impl;
2307 
2308     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2309 
2310     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2311         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2312 
2313         return NXT_UNIT_ERROR;
2314     }
2315 
2316     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2317         nxt_unit_req_warn(req, "add_content: response already sent");
2318 
2319         return NXT_UNIT_ERROR;
2320     }
2321 
2322     buf = req->response_buf;
2323 
2324     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2325         nxt_unit_req_warn(req, "add_content: buffer overflow");
2326 
2327         return NXT_UNIT_ERROR;
2328     }
2329 
2330     resp = req->response;
2331 
2332     if (resp->piggyback_content_length == 0) {
2333         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2334         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2335     }
2336 
2337     resp->piggyback_content_length += size;
2338 
2339     buf->free = nxt_cpymem(buf->free, src, size);
2340 
2341     return NXT_UNIT_OK;
2342 }
2343 
2344 
2345 int
nxt_unit_response_send(nxt_unit_request_info_t * req)2346 nxt_unit_response_send(nxt_unit_request_info_t *req)
2347 {
2348     int                           rc;
2349     nxt_unit_mmap_buf_t           *mmap_buf;
2350     nxt_unit_request_info_impl_t  *req_impl;
2351 
2352     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2353 
2354     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2355         nxt_unit_req_warn(req, "send: response is not initialized yet");
2356 
2357         return NXT_UNIT_ERROR;
2358     }
2359 
2360     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2361         nxt_unit_req_warn(req, "send: response already sent");
2362 
2363         return NXT_UNIT_ERROR;
2364     }
2365 
2366     if (req->request->websocket_handshake && req->response->status == 101) {
2367         nxt_unit_response_upgrade(req);
2368     }
2369 
2370     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2371                        req->response->fields_count,
2372                        (int) (req->response_buf->free
2373                               - req->response_buf->start));
2374 
2375     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2376 
2377     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2378     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2379         req->response = NULL;
2380         req->response_buf = NULL;
2381         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2382 
2383         nxt_unit_mmap_buf_free(mmap_buf);
2384     }
2385 
2386     return rc;
2387 }
2388 
2389 
2390 int
nxt_unit_response_is_sent(nxt_unit_request_info_t * req)2391 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2392 {
2393     nxt_unit_request_info_impl_t  *req_impl;
2394 
2395     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2396 
2397     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2398 }
2399 
2400 
2401 nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t * req,uint32_t size)2402 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2403 {
2404     int                           rc;
2405     nxt_unit_mmap_buf_t           *mmap_buf;
2406     nxt_unit_request_info_impl_t  *req_impl;
2407 
2408     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2409         nxt_unit_req_warn(req, "response_buf_alloc: "
2410                           "requested buffer (%"PRIu32") too big", size);
2411 
2412         return NULL;
2413     }
2414 
2415     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2416 
2417     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2418 
2419     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2420     if (nxt_slow_path(mmap_buf == NULL)) {
2421         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2422 
2423         return NULL;
2424     }
2425 
2426     mmap_buf->req = req;
2427 
2428     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2429 
2430     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2431                                    size, size, mmap_buf,
2432                                    NULL);
2433     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2434         nxt_unit_mmap_buf_release(mmap_buf);
2435 
2436         nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2437 
2438         return NULL;
2439     }
2440 
2441     return &mmap_buf->buf;
2442 }
2443 
2444 
2445 static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t * ctx)2446 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2447 {
2448     nxt_unit_mmap_buf_t  *mmap_buf;
2449     nxt_unit_ctx_impl_t  *ctx_impl;
2450 
2451     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2452 
2453     pthread_mutex_lock(&ctx_impl->mutex);
2454 
2455     if (ctx_impl->free_buf == NULL) {
2456         pthread_mutex_unlock(&ctx_impl->mutex);
2457 
2458         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2459         if (nxt_slow_path(mmap_buf == NULL)) {
2460             return NULL;
2461         }
2462 
2463     } else {
2464         mmap_buf = ctx_impl->free_buf;
2465 
2466         nxt_unit_mmap_buf_unlink(mmap_buf);
2467 
2468         pthread_mutex_unlock(&ctx_impl->mutex);
2469     }
2470 
2471     mmap_buf->ctx_impl = ctx_impl;
2472 
2473     mmap_buf->hdr = NULL;
2474     mmap_buf->free_ptr = NULL;
2475 
2476     return mmap_buf;
2477 }
2478 
2479 
2480 static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t * mmap_buf)2481 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2482 {
2483     nxt_unit_mmap_buf_unlink(mmap_buf);
2484 
2485     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2486 
2487     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2488 
2489     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2490 }
2491 
2492 
2493 int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t * req)2494 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2495 {
2496     return req->request->websocket_handshake;
2497 }
2498 
2499 
2500 int
nxt_unit_response_upgrade(nxt_unit_request_info_t * req)2501 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2502 {
2503     int                           rc;
2504     nxt_unit_request_info_impl_t  *req_impl;
2505 
2506     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2507 
2508     if (nxt_slow_path(req_impl->websocket != 0)) {
2509         nxt_unit_req_debug(req, "upgrade: already upgraded");
2510 
2511         return NXT_UNIT_OK;
2512     }
2513 
2514     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2515         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2516 
2517         return NXT_UNIT_ERROR;
2518     }
2519 
2520     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2521         nxt_unit_req_warn(req, "upgrade: response already sent");
2522 
2523         return NXT_UNIT_ERROR;
2524     }
2525 
2526     rc = nxt_unit_request_hash_add(req->ctx, req);
2527     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2528         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2529 
2530         return NXT_UNIT_ERROR;
2531     }
2532 
2533     req_impl->websocket = 1;
2534 
2535     req->response->status = 101;
2536 
2537     return NXT_UNIT_OK;
2538 }
2539 
2540 
2541 int
nxt_unit_response_is_websocket(nxt_unit_request_info_t * req)2542 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2543 {
2544     nxt_unit_request_info_impl_t  *req_impl;
2545 
2546     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2547 
2548     return req_impl->websocket;
2549 }
2550 
2551 
2552 nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void * data)2553 nxt_unit_get_request_info_from_data(void *data)
2554 {
2555     nxt_unit_request_info_impl_t  *req_impl;
2556 
2557     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2558 
2559     return &req_impl->req;
2560 }
2561 
2562 
2563 int
nxt_unit_buf_send(nxt_unit_buf_t * buf)2564 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2565 {
2566     int                           rc;
2567     nxt_unit_mmap_buf_t           *mmap_buf;
2568     nxt_unit_request_info_t       *req;
2569     nxt_unit_request_info_impl_t  *req_impl;
2570 
2571     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2572 
2573     req = mmap_buf->req;
2574     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2575 
2576     nxt_unit_req_debug(req, "buf_send: %d bytes",
2577                        (int) (buf->free - buf->start));
2578 
2579     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2580         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2581 
2582         return NXT_UNIT_ERROR;
2583     }
2584 
2585     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2586         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2587 
2588         return NXT_UNIT_ERROR;
2589     }
2590 
2591     if (nxt_fast_path(buf->free > buf->start)) {
2592         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2593         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2594             return rc;
2595         }
2596     }
2597 
2598     nxt_unit_mmap_buf_free(mmap_buf);
2599 
2600     return NXT_UNIT_OK;
2601 }
2602 
2603 
2604 static void
nxt_unit_buf_send_done(nxt_unit_buf_t * buf)2605 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2606 {
2607     int                      rc;
2608     nxt_unit_mmap_buf_t      *mmap_buf;
2609     nxt_unit_request_info_t  *req;
2610 
2611     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2612 
2613     req = mmap_buf->req;
2614 
2615     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2616     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2617         nxt_unit_mmap_buf_free(mmap_buf);
2618 
2619         nxt_unit_request_info_release(req);
2620 
2621     } else {
2622         nxt_unit_request_done(req, rc);
2623     }
2624 }
2625 
2626 
2627 static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t * req,nxt_unit_mmap_buf_t * mmap_buf,int last)2628 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2629     nxt_unit_mmap_buf_t *mmap_buf, int last)
2630 {
2631     struct {
2632         nxt_port_msg_t       msg;
2633         nxt_port_mmap_msg_t  mmap_msg;
2634     } m;
2635 
2636     int                           rc;
2637     u_char                        *last_used, *first_free;
2638     ssize_t                       res;
2639     nxt_chunk_id_t                first_free_chunk;
2640     nxt_unit_buf_t                *buf;
2641     nxt_unit_impl_t               *lib;
2642     nxt_port_mmap_header_t        *hdr;
2643     nxt_unit_request_info_impl_t  *req_impl;
2644 
2645     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2646     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2647 
2648     buf = &mmap_buf->buf;
2649     hdr = mmap_buf->hdr;
2650 
2651     m.mmap_msg.size = buf->free - buf->start;
2652 
2653     m.msg.stream = req_impl->stream;
2654     m.msg.pid = lib->pid;
2655     m.msg.reply_port = 0;
2656     m.msg.type = _NXT_PORT_MSG_DATA;
2657     m.msg.last = last != 0;
2658     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2659     m.msg.nf = 0;
2660     m.msg.mf = 0;
2661 
2662     rc = NXT_UNIT_ERROR;
2663 
2664     if (m.msg.mmap) {
2665         m.mmap_msg.mmap_id = hdr->id;
2666         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2667                                                      (u_char *) buf->start);
2668 
2669         nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2670                        req_impl->stream,
2671                        (int) m.mmap_msg.mmap_id,
2672                        (int) m.mmap_msg.chunk_id,
2673                        (int) m.mmap_msg.size);
2674 
2675         res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2676                                  NULL);
2677         if (nxt_slow_path(res != sizeof(m))) {
2678             goto free_buf;
2679         }
2680 
2681         last_used = (u_char *) buf->free - 1;
2682         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2683 
2684         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2685             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2686 
2687             buf->start = (char *) first_free;
2688             buf->free = buf->start;
2689 
2690             if (buf->end < buf->start) {
2691                 buf->end = buf->start;
2692             }
2693 
2694         } else {
2695             buf->start = NULL;
2696             buf->free = NULL;
2697             buf->end = NULL;
2698 
2699             mmap_buf->hdr = NULL;
2700         }
2701 
2702         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2703                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2704 
2705         nxt_unit_debug(req->ctx, "allocated_chunks %d",
2706                        (int) lib->outgoing.allocated_chunks);
2707 
2708     } else {
2709         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2710                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2711         {
2712             nxt_unit_alert(req->ctx,
2713                            "#%"PRIu32": failed to send plain memory buffer"
2714                            ": no space reserved for message header",
2715                            req_impl->stream);
2716 
2717             goto free_buf;
2718         }
2719 
2720         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2721 
2722         nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2723                        req_impl->stream,
2724                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2725 
2726         res = nxt_unit_port_send(req->ctx, req->response_port,
2727                                  buf->start - sizeof(m.msg),
2728                                  m.mmap_msg.size + sizeof(m.msg), NULL);
2729 
2730         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2731             goto free_buf;
2732         }
2733     }
2734 
2735     rc = NXT_UNIT_OK;
2736 
2737 free_buf:
2738 
2739     nxt_unit_free_outgoing_buf(mmap_buf);
2740 
2741     return rc;
2742 }
2743 
2744 
2745 void
nxt_unit_buf_free(nxt_unit_buf_t * buf)2746 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2747 {
2748     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2749 }
2750 
2751 
2752 static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t * mmap_buf)2753 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2754 {
2755     nxt_unit_free_outgoing_buf(mmap_buf);
2756 
2757     nxt_unit_mmap_buf_release(mmap_buf);
2758 }
2759 
2760 
2761 static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t * mmap_buf)2762 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2763 {
2764     if (mmap_buf->hdr != NULL) {
2765         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2766                               mmap_buf->hdr, mmap_buf->buf.start,
2767                               mmap_buf->buf.end - mmap_buf->buf.start);
2768 
2769         mmap_buf->hdr = NULL;
2770 
2771         return;
2772     }
2773 
2774     if (mmap_buf->free_ptr != NULL) {
2775         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2776 
2777         mmap_buf->free_ptr = NULL;
2778     }
2779 }
2780 
2781 
2782 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t * ctx)2783 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2784 {
2785     nxt_unit_ctx_impl_t  *ctx_impl;
2786     nxt_unit_read_buf_t  *rbuf;
2787 
2788     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2789 
2790     pthread_mutex_lock(&ctx_impl->mutex);
2791 
2792     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2793 
2794     pthread_mutex_unlock(&ctx_impl->mutex);
2795 
2796     rbuf->oob.size = 0;
2797 
2798     return rbuf;
2799 }
2800 
2801 
2802 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t * ctx_impl)2803 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2804 {
2805     nxt_queue_link_t     *link;
2806     nxt_unit_read_buf_t  *rbuf;
2807 
2808     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2809         link = nxt_queue_first(&ctx_impl->free_rbuf);
2810         nxt_queue_remove(link);
2811 
2812         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2813 
2814         return rbuf;
2815     }
2816 
2817     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2818 
2819     if (nxt_fast_path(rbuf != NULL)) {
2820         rbuf->ctx_impl = ctx_impl;
2821     }
2822 
2823     return rbuf;
2824 }
2825 
2826 
2827 static void
nxt_unit_read_buf_release(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf)2828 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2829     nxt_unit_read_buf_t *rbuf)
2830 {
2831     nxt_unit_ctx_impl_t  *ctx_impl;
2832 
2833     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2834 
2835     pthread_mutex_lock(&ctx_impl->mutex);
2836 
2837     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2838 
2839     pthread_mutex_unlock(&ctx_impl->mutex);
2840 }
2841 
2842 
2843 nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t * buf)2844 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2845 {
2846     nxt_unit_mmap_buf_t  *mmap_buf;
2847 
2848     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2849 
2850     if (mmap_buf->next == NULL) {
2851         return NULL;
2852     }
2853 
2854     return &mmap_buf->next->buf;
2855 }
2856 
2857 
2858 uint32_t
nxt_unit_buf_max(void)2859 nxt_unit_buf_max(void)
2860 {
2861     return PORT_MMAP_DATA_SIZE;
2862 }
2863 
2864 
2865 uint32_t
nxt_unit_buf_min(void)2866 nxt_unit_buf_min(void)
2867 {
2868     return PORT_MMAP_CHUNK_SIZE;
2869 }
2870 
2871 
2872 int
nxt_unit_response_write(nxt_unit_request_info_t * req,const void * start,size_t size)2873 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2874     size_t size)
2875 {
2876     ssize_t  res;
2877 
2878     res = nxt_unit_response_write_nb(req, start, size, size);
2879 
2880     return res < 0 ? -res : NXT_UNIT_OK;
2881 }
2882 
2883 
2884 ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t * req,const void * start,size_t size,size_t min_size)2885 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2886     size_t size, size_t min_size)
2887 {
2888     int                           rc;
2889     ssize_t                       sent;
2890     uint32_t                      part_size, min_part_size, buf_size;
2891     const char                    *part_start;
2892     nxt_unit_mmap_buf_t           mmap_buf;
2893     nxt_unit_request_info_impl_t  *req_impl;
2894     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2895 
2896     nxt_unit_req_debug(req, "write: %d", (int) size);
2897 
2898     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2899 
2900     part_start = start;
2901     sent = 0;
2902 
2903     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2904         nxt_unit_req_alert(req, "write: response not initialized yet");
2905 
2906         return -NXT_UNIT_ERROR;
2907     }
2908 
2909     /* Check if response is not send yet. */
2910     if (nxt_slow_path(req->response_buf != NULL)) {
2911         part_size = req->response_buf->end - req->response_buf->free;
2912         part_size = nxt_min(size, part_size);
2913 
2914         rc = nxt_unit_response_add_content(req, part_start, part_size);
2915         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2916             return -rc;
2917         }
2918 
2919         rc = nxt_unit_response_send(req);
2920         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2921             return -rc;
2922         }
2923 
2924         size -= part_size;
2925         part_start += part_size;
2926         sent += part_size;
2927 
2928         min_size -= nxt_min(min_size, part_size);
2929     }
2930 
2931     while (size > 0) {
2932         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2933         min_part_size = nxt_min(min_size, part_size);
2934         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2935 
2936         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2937                                        min_part_size, &mmap_buf, local_buf);
2938         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2939             return -rc;
2940         }
2941 
2942         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2943         if (nxt_slow_path(buf_size == 0)) {
2944             return sent;
2945         }
2946         part_size = nxt_min(buf_size, part_size);
2947 
2948         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2949                                        part_start, part_size);
2950 
2951         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2952         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2953             return -rc;
2954         }
2955 
2956         size -= part_size;
2957         part_start += part_size;
2958         sent += part_size;
2959 
2960         min_size -= nxt_min(min_size, part_size);
2961     }
2962 
2963     return sent;
2964 }
2965 
2966 
2967 int
nxt_unit_response_write_cb(nxt_unit_request_info_t * req,nxt_unit_read_info_t * read_info)2968 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2969     nxt_unit_read_info_t *read_info)
2970 {
2971     int                           rc;
2972     ssize_t                       n;
2973     uint32_t                      buf_size;
2974     nxt_unit_buf_t                *buf;
2975     nxt_unit_mmap_buf_t           mmap_buf;
2976     nxt_unit_request_info_impl_t  *req_impl;
2977     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2978 
2979     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2980 
2981     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2982         nxt_unit_req_alert(req, "write: response not initialized yet");
2983 
2984         return NXT_UNIT_ERROR;
2985     }
2986 
2987     /* Check if response is not send yet. */
2988     if (nxt_slow_path(req->response_buf != NULL)) {
2989 
2990         /* Enable content in headers buf. */
2991         rc = nxt_unit_response_add_content(req, "", 0);
2992         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2993             nxt_unit_req_error(req, "Failed to add piggyback content");
2994 
2995             return rc;
2996         }
2997 
2998         buf = req->response_buf;
2999 
3000         while (buf->end - buf->free > 0) {
3001             n = read_info->read(read_info, buf->free, buf->end - buf->free);
3002             if (nxt_slow_path(n < 0)) {
3003                 nxt_unit_req_error(req, "Read error");
3004 
3005                 return NXT_UNIT_ERROR;
3006             }
3007 
3008             /* Manually increase sizes. */
3009             buf->free += n;
3010             req->response->piggyback_content_length += n;
3011 
3012             if (read_info->eof) {
3013                 break;
3014             }
3015         }
3016 
3017         rc = nxt_unit_response_send(req);
3018         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3019             nxt_unit_req_error(req, "Failed to send headers with content");
3020 
3021             return rc;
3022         }
3023 
3024         if (read_info->eof) {
3025             return NXT_UNIT_OK;
3026         }
3027     }
3028 
3029     while (!read_info->eof) {
3030         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
3031                            read_info->buf_size);
3032 
3033         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
3034 
3035         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3036                                        buf_size, buf_size,
3037                                        &mmap_buf, local_buf);
3038         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3039             return rc;
3040         }
3041 
3042         buf = &mmap_buf.buf;
3043 
3044         while (!read_info->eof && buf->end > buf->free) {
3045             n = read_info->read(read_info, buf->free, buf->end - buf->free);
3046             if (nxt_slow_path(n < 0)) {
3047                 nxt_unit_req_error(req, "Read error");
3048 
3049                 nxt_unit_free_outgoing_buf(&mmap_buf);
3050 
3051                 return NXT_UNIT_ERROR;
3052             }
3053 
3054             buf->free += n;
3055         }
3056 
3057         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3058         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3059             nxt_unit_req_error(req, "Failed to send content");
3060 
3061             return rc;
3062         }
3063     }
3064 
3065     return NXT_UNIT_OK;
3066 }
3067 
3068 
3069 ssize_t
nxt_unit_request_read(nxt_unit_request_info_t * req,void * dst,size_t size)3070 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3071 {
3072     ssize_t  buf_res, res;
3073 
3074     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3075                                 dst, size);
3076 
3077     if (buf_res < (ssize_t) size && req->content_fd != -1) {
3078         res = read(req->content_fd, dst, size);
3079         if (nxt_slow_path(res < 0)) {
3080             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3081                                strerror(errno), errno);
3082 
3083             return res;
3084         }
3085 
3086         if (res < (ssize_t) size) {
3087             nxt_unit_close(req->content_fd);
3088 
3089             req->content_fd = -1;
3090         }
3091 
3092         req->content_length -= res;
3093 
3094         dst = nxt_pointer_to(dst, res);
3095 
3096     } else {
3097         res = 0;
3098     }
3099 
3100     return buf_res + res;
3101 }
3102 
3103 
3104 ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t * req,size_t max_size)3105 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3106 {
3107     char                 *p;
3108     size_t               l_size, b_size;
3109     nxt_unit_buf_t       *b;
3110     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3111 
3112     if (req->content_length == 0) {
3113         return 0;
3114     }
3115 
3116     l_size = 0;
3117 
3118