xref: /unit/src/nxt_unit.c (revision 2194:0bce50b93a6a)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "nxt_main.h"
7 #include "nxt_port_memory_int.h"
8 #include "nxt_socket_msg.h"
9 #include "nxt_port_queue.h"
10 #include "nxt_app_queue.h"
11 
12 #include "nxt_unit.h"
13 #include "nxt_unit_request.h"
14 #include "nxt_unit_response.h"
15 #include "nxt_unit_websocket.h"
16 
17 #include "nxt_websocket.h"
18 
19 #if (NXT_HAVE_MEMFD_CREATE)
20 #include <linux/memfd.h>
21 #endif
22 
/*
 * NXT_UNIT_MAX_PLAIN_SIZE is a byte limit for "plain" payloads.
 * NOTE(review): the users of this limit are outside this part of the file;
 * presumably it bounds data sent inline instead of through shared memory --
 * confirm.
 *
 * NXT_UNIT_LOCAL_BUF_SIZE is that limit plus room for one nxt_port_msg_t
 * header.
 */
#define NXT_UNIT_MAX_PLAIN_SIZE  1024
#define NXT_UNIT_LOCAL_BUF_SIZE  \
    (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
26 
/*
 * Quit modes kept in nxt_unit_ctx_impl_t.quit_param and passed to
 * nxt_unit_quit().  A freshly initialised context starts with
 * NXT_QUIT_GRACEFUL.
 */
enum {
    NXT_QUIT_NORMAL   = 0,
    NXT_QUIT_GRACEFUL = 1,
};
31 
32 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
33 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
34 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
35 typedef struct nxt_unit_process_s               nxt_unit_process_t;
36 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
37 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
38 typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
39 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
40 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
41 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
42 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
43 
44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
46     nxt_unit_ctx_impl_t *ctx_impl, void *data);
47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
52     nxt_unit_mmap_buf_t *mmap_buf);
53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
54     nxt_unit_mmap_buf_t *mmap_buf);
55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
57     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
58     int *shared_port_fd, int *shared_queue_fd,
59     int *log_fd, uint32_t *stream, uint32_t *shm_limit,
60     uint32_t *request_limit);
61 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
62     int queue_fd);
63 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
64     nxt_unit_request_info_t **preq);
65 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
66     nxt_unit_recv_msg_t *recv_msg);
67 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
68 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
69     nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
70 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
71     nxt_unit_recv_msg_t *recv_msg);
72 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
73     nxt_unit_port_id_t *port_id);
74 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
75 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
76     nxt_unit_recv_msg_t *recv_msg);
77 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
78 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
79     nxt_unit_ctx_t *ctx);
80 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
81 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
82 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
83     nxt_unit_ctx_t *ctx);
84 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
85 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
86     nxt_unit_websocket_frame_impl_t *ws);
87 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
88 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
89 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
90     nxt_unit_mmap_buf_t *mmap_buf, int last);
91 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
92 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
94 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
95     nxt_unit_ctx_impl_t *ctx_impl);
96 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
97     nxt_unit_read_buf_t *rbuf);
98 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
99     nxt_unit_request_info_t *req, size_t size);
100 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
101     size_t size);
102 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
103     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
104 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
105 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
106 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
107 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
108     nxt_unit_port_t *port, int n);
109 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
110 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
111     int fd);
112 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
113     nxt_unit_port_t *port, uint32_t size,
114     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
115 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
116 
117 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
118     nxt_unit_ctx_impl_t *ctx_impl);
119 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
120 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
121 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
122 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
123 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
124     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
125     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
126 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
127     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
128 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
129 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
130     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
131 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
132 
133 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
135     pid_t pid, int remove);
136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
137 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
138 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
139 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
140 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
141 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
142 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
145 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
146 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
147     nxt_unit_port_t *port);
148 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
149 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
150 
151 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
152     nxt_unit_port_t *port, int queue_fd);
153 
154 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
155 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
156 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
157     nxt_unit_port_t *port, void *queue);
158 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
159     nxt_queue_t *awaiting_req);
160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
161     nxt_unit_port_id_t *port_id);
162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
163     nxt_unit_port_id_t *port_id);
164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
166     nxt_unit_process_t *process);
167 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
168 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
169 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
170     nxt_unit_port_t *port, const void *buf, size_t buf_size,
171     const nxt_send_oob_t *oob);
172 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
173     const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
174 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
175     nxt_unit_read_buf_t *rbuf);
176 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
177     nxt_unit_read_buf_t *src);
178 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
179     nxt_unit_read_buf_t *rbuf);
180 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
181     nxt_unit_read_buf_t *rbuf);
182 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
183     nxt_unit_read_buf_t *rbuf);
184 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
185     nxt_unit_read_buf_t *rbuf);
186 nxt_inline int nxt_unit_close(int fd);
187 static int nxt_unit_fd_blocking(int fd);
188 
189 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
190     nxt_unit_port_t *port);
191 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
192     nxt_unit_port_id_t *port_id, int remove);
193 
194 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
195     nxt_unit_request_info_t *req);
196 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
197     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
198 
199 static char * nxt_unit_snprint_prefix(char *p, const char *end, pid_t pid,
200     int level);
201 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
202 static void nxt_unit_lvlhsh_free(void *data, void *p);
203 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
204 
205 
/*
 * Private buffer descriptor: the public nxt_unit_buf_t plus list linkage and
 * back-pointers to the objects the buffer is associated with.
 */
struct nxt_unit_mmap_buf_s {
    nxt_unit_buf_t           buf;

    /*
     * List linkage.  "next" is the following element; "prev" is the address
     * of the pointer that refers to this element (either the list head or the
     * previous element's "next"), so an element can be unlinked without
     * knowing which list it is on.
     */
    nxt_unit_mmap_buf_t      *next;
    nxt_unit_mmap_buf_t      **prev;

    nxt_port_mmap_header_t   *hdr;
    nxt_unit_request_info_t  *req;
    nxt_unit_ctx_impl_t      *ctx_impl;
    /* NOTE(review): presumably heap storage to free on release -- confirm. */
    char                     *free_ptr;
    char                     *plain_ptr;
};
218 
219 
/*
 * Decoded form of one received port message.
 * NOTE(review): stream/pid/reply_port/last/mmap carry the same names as the
 * nxt_port_msg_t fields set in nxt_unit_ready(); presumably they are copied
 * from the message header -- confirm.
 */
struct nxt_unit_recv_msg_s {
    uint32_t                 stream;
    nxt_pid_t                pid;
    nxt_port_id_t            reply_port;

    uint8_t                  last;      /* 1 bit */
    uint8_t                  mmap;      /* 1 bit */

    /* Payload location and its length. */
    void                     *start;
    uint32_t                 size;

    /* Up to two file descriptors associated with the message. */
    int                      fd[2];

    nxt_unit_mmap_buf_t      *incoming_buf;
};
235 
236 
/*
 * Request state kept in nxt_unit_request_info_impl_t.state.  The values are
 * declared in increasing order starting from NXT_UNIT_RS_START (0).
 */
typedef enum {
    NXT_UNIT_RS_START           = 0,
    NXT_UNIT_RS_RESPONSE_INIT,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
    NXT_UNIT_RS_RESPONSE_SENT,
    NXT_UNIT_RS_RELEASED,
} nxt_unit_req_state_t;
244 
245 
/*
 * Private request record.  The public nxt_unit_request_info_t is the first
 * member; the structure ends with a flexible array member, so the record
 * must be allocated with extra room for it and may only be embedded as the
 * last member of another structure.
 */
struct nxt_unit_request_info_impl_s {
    nxt_unit_request_info_t  req;

    uint32_t                 stream;

    /* Heads of the outgoing and incoming nxt_unit_mmap_buf_t lists. */
    nxt_unit_mmap_buf_t      *outgoing_buf;
    nxt_unit_mmap_buf_t      *incoming_buf;

    nxt_unit_req_state_t     state;
    uint8_t                  websocket;
    uint8_t                  in_hash;

    /*  for nxt_unit_ctx_impl_t.free_req or active_req */
    nxt_queue_link_t         link;
    /*  for nxt_unit_port_impl_t.awaiting_req */
    nxt_queue_link_t         port_wait_link;

    /* Trailing per-request storage; its size is chosen at allocation time. */
    char                     extra_data[];
};
265 
266 
/*
 * Private websocket frame record wrapping the public
 * nxt_unit_websocket_frame_t.
 */
struct nxt_unit_websocket_frame_impl_s {
    nxt_unit_websocket_frame_t  ws;

    nxt_unit_mmap_buf_t         *buf;

    /* NOTE(review): presumably for nxt_unit_ctx_impl_t.free_ws -- confirm. */
    nxt_queue_link_t            link;

    nxt_unit_ctx_impl_t         *ctx_impl;
};
276 
277 
/*
 * Receive buffer with 16384 bytes of inline storage.  Instances are linked
 * into nxt_unit_ctx_impl_t.free_rbuf / pending_rbuf or
 * nxt_unit_mmap_t.awaiting_rbuf through "link".
 */
struct nxt_unit_read_buf_s {
    nxt_queue_link_t              link;
    nxt_unit_ctx_impl_t           *ctx_impl;
    /* NOTE(review): presumably the byte count stored in buf -- confirm. */
    ssize_t                       size;
    nxt_recv_oob_t                oob;
    char                          buf[16384];
};
285 
286 
/*
 * Private per-context state wrapping the public nxt_unit_ctx_t.  Set up by
 * nxt_unit_ctx_init().
 */
struct nxt_unit_ctx_impl_s {
    nxt_unit_ctx_t                ctx;

    /*
     * Reference counter: starts at 1; nxt_unit_ctx_release() calls
     * nxt_unit_ctx_free() when the last reference is dropped.
     */
    nxt_atomic_t                  use_count;
    nxt_atomic_t                  wait_items;

    pthread_mutex_t               mutex;

    nxt_unit_port_t               *read_port;

    /*  for nxt_unit_impl_t.contexts */
    nxt_queue_link_t              link;

    /* Head of the free buffer list; initially holds ctx_buf[0], ctx_buf[1]. */
    nxt_unit_mmap_buf_t           *free_buf;

    /*  of nxt_unit_request_info_impl_t */
    nxt_queue_t                   free_req;

    /*  of nxt_unit_websocket_frame_impl_t */
    nxt_queue_t                   free_ws;

    /*  of nxt_unit_request_info_impl_t */
    nxt_queue_t                   active_req;

    /*  of nxt_unit_request_info_impl_t */
    nxt_lvlhsh_t                  requests;

    /*  of nxt_unit_request_info_impl_t */
    nxt_queue_t                   ready_req;

    /*  of nxt_unit_read_buf_t */
    nxt_queue_t                   pending_rbuf;

    /*  of nxt_unit_read_buf_t */
    nxt_queue_t                   free_rbuf;

    uint8_t                       online;       /* 1 bit */
    uint8_t                       ready;        /* 1 bit */
    /* One of the NXT_QUIT_* values. */
    uint8_t                       quit_param;

    /* Embedded objects that seed free_buf and free_rbuf. */
    nxt_unit_mmap_buf_t           ctx_buf[2];
    nxt_unit_read_buf_t           ctx_read_buf;

    /*
     * Embedded request that seeds free_req.  It ends with a flexible array
     * member and therefore has to stay the last member of this structure.
     */
    nxt_unit_request_info_impl_t  req;
};
331 
332 
/* One element of nxt_unit_mmaps_t.elts: a mapped segment and its waiters. */
struct nxt_unit_mmap_s {
    nxt_port_mmap_header_t   *hdr;
    pthread_t                src_thread;

    /*  of nxt_unit_read_buf_t */
    nxt_queue_t              awaiting_rbuf;
};
340 
341 
/*
 * Collection of nxt_unit_mmap_t elements with its own mutex.
 * NOTE(review): size and cap are presumably the used and allocated element
 * counts of elts -- confirm against nxt_unit_mmap_at().
 */
struct nxt_unit_mmaps_s {
    pthread_mutex_t          mutex;
    uint32_t                 size;
    uint32_t                 cap;
    nxt_atomic_t             allocated_chunks;
    nxt_unit_mmap_t          *elts;
};
349 
350 
/*
 * Private library instance wrapping the public nxt_unit_t.  Allocated by
 * nxt_unit_create() with init->request_data_size extra bytes after the
 * structure.
 */
struct nxt_unit_impl_s {
    nxt_unit_t               unit;
    nxt_unit_callbacks_t     callbacks;

    /*
     * Reference counter: each initialised context takes one reference;
     * nxt_unit_lib_release() frees the instance when the last one is dropped.
     */
    nxt_atomic_t             use_count;
    nxt_atomic_t             request_count;

    uint32_t                 request_data_size;
    /* Shared memory limit expressed in units of PORT_MMAP_DATA_SIZE. */
    uint32_t                 shm_mmap_limit;
    uint32_t                 request_limit;

    pthread_mutex_t          mutex;

    nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
    nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */

    nxt_unit_port_t          *router_port;
    nxt_unit_port_t          *shared_port;

    nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */

    nxt_unit_mmaps_t         incoming;
    nxt_unit_mmaps_t         outgoing;

    pid_t                    pid;
    int                      log_fd;

    /*
     * Must stay the last member: it ends with a request record whose
     * flexible array member occupies the extra bytes of the allocation.
     */
    nxt_unit_ctx_impl_t      main_ctx;
};
380 
381 
/* Private port record wrapping the public nxt_unit_port_t. */
struct nxt_unit_port_impl_s {
    nxt_unit_port_t          port;

    nxt_atomic_t             use_count;

    /*  for nxt_unit_process_t.ports */
    nxt_queue_link_t         link;
    nxt_unit_process_t       *process;

    /*  of nxt_unit_request_info_impl_t */
    nxt_queue_t              awaiting_req;

    int                      ready;

    /* Queue memory handed to nxt_unit_add_port() for this port, if any. */
    void                     *queue;

    int                      from_socket;
    nxt_unit_read_buf_t      *socket_rbuf;
};
401 
402 
/* Per-peer-process record stored in nxt_unit_impl_t.processes. */
struct nxt_unit_process_s {
    pid_t                    pid;

    nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */

    nxt_unit_impl_t          *lib;

    nxt_atomic_t             use_count;

    uint32_t                 next_port_id;
};
414 
415 
/*
 * Port identifier used with the port hash.  Both members are explicitly
 * 32 bits wide to avoid possible alignment padding inside the structure.
 */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;
421 
422 
/*
 * Process id cached by nxt_unit_init(): first set from getpid(), then
 * replaced with the pid of the read port once it is known.
 */
static pid_t  nxt_unit_pid;
424 
425 
426 nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t * init)427 nxt_unit_init(nxt_unit_init_t *init)
428 {
429     int              rc, queue_fd, shared_queue_fd;
430     void             *mem;
431     uint32_t         ready_stream, shm_limit, request_limit;
432     nxt_unit_ctx_t   *ctx;
433     nxt_unit_impl_t  *lib;
434     nxt_unit_port_t  ready_port, router_port, read_port, shared_port;
435 
436     nxt_unit_pid = getpid();
437 
438     lib = nxt_unit_create(init);
439     if (nxt_slow_path(lib == NULL)) {
440         return NULL;
441     }
442 
443     queue_fd = -1;
444     mem = MAP_FAILED;
445     shared_port.out_fd = -1;
446     shared_port.data = NULL;
447 
448     if (init->ready_port.id.pid != 0
449         && init->ready_stream != 0
450         && init->read_port.id.pid != 0)
451     {
452         ready_port = init->ready_port;
453         ready_stream = init->ready_stream;
454         router_port = init->router_port;
455         read_port = init->read_port;
456         lib->log_fd = init->log_fd;
457 
458         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
459                               ready_port.id.id);
460         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
461                               router_port.id.id);
462         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
463                               read_port.id.id);
464 
465         shared_port.in_fd = init->shared_port_fd;
466         shared_queue_fd = init->shared_queue_fd;
467 
468     } else {
469         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
470                                &shared_port.in_fd, &shared_queue_fd,
471                                &lib->log_fd, &ready_stream, &shm_limit,
472                                &request_limit);
473         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
474             goto fail;
475         }
476 
477         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
478                                 / PORT_MMAP_DATA_SIZE;
479         lib->request_limit = request_limit;
480     }
481 
482     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
483         lib->shm_mmap_limit = 1;
484     }
485 
486     lib->pid = read_port.id.pid;
487     nxt_unit_pid = lib->pid;
488 
489     ctx = &lib->main_ctx.ctx;
490 
491     rc = nxt_unit_fd_blocking(router_port.out_fd);
492     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
493         goto fail;
494     }
495 
496     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
497     if (nxt_slow_path(lib->router_port == NULL)) {
498         nxt_unit_alert(NULL, "failed to add router_port");
499 
500         goto fail;
501     }
502 
503     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
504     if (nxt_slow_path(queue_fd == -1)) {
505         goto fail;
506     }
507 
508     mem = mmap(NULL, sizeof(nxt_port_queue_t),
509                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
510     if (nxt_slow_path(mem == MAP_FAILED)) {
511         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
512                        strerror(errno), errno);
513 
514         goto fail;
515     }
516 
517     nxt_port_queue_init(mem);
518 
519     rc = nxt_unit_fd_blocking(read_port.in_fd);
520     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
521         goto fail;
522     }
523 
524     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
525     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
526         nxt_unit_alert(NULL, "failed to add read_port");
527 
528         goto fail;
529     }
530 
531     rc = nxt_unit_fd_blocking(ready_port.out_fd);
532     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
533         goto fail;
534     }
535 
536     nxt_unit_port_id_init(&shared_port.id, read_port.id.pid,
537                           NXT_UNIT_SHARED_PORT_ID);
538 
539     mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
540                MAP_SHARED, shared_queue_fd, 0);
541     if (nxt_slow_path(mem == MAP_FAILED)) {
542         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd,
543                        strerror(errno), errno);
544 
545         goto fail;
546     }
547 
548     nxt_unit_close(shared_queue_fd);
549 
550     lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem);
551     if (nxt_slow_path(lib->shared_port == NULL)) {
552         nxt_unit_alert(NULL, "failed to add shared_port");
553 
554         goto fail;
555     }
556 
557     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
558     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
559         nxt_unit_alert(NULL, "failed to send READY message");
560 
561         goto fail;
562     }
563 
564     nxt_unit_close(ready_port.out_fd);
565     nxt_unit_close(queue_fd);
566 
567     return ctx;
568 
569 fail:
570 
571     if (mem != MAP_FAILED) {
572         munmap(mem, sizeof(nxt_port_queue_t));
573     }
574 
575     if (queue_fd != -1) {
576         nxt_unit_close(queue_fd);
577     }
578 
579     nxt_unit_ctx_release(&lib->main_ctx.ctx);
580 
581     return NULL;
582 }
583 
584 
585 static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t * init)586 nxt_unit_create(nxt_unit_init_t *init)
587 {
588     int              rc;
589     nxt_unit_impl_t  *lib;
590 
591     if (nxt_slow_path(init->callbacks.request_handler == NULL)) {
592         nxt_unit_alert(NULL, "request_handler is NULL");
593 
594         return NULL;
595     }
596 
597     lib = nxt_unit_malloc(NULL,
598                           sizeof(nxt_unit_impl_t) + init->request_data_size);
599     if (nxt_slow_path(lib == NULL)) {
600         nxt_unit_alert(NULL, "failed to allocate unit struct");
601 
602         return NULL;
603     }
604 
605     rc = pthread_mutex_init(&lib->mutex, NULL);
606     if (nxt_slow_path(rc != 0)) {
607         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
608 
609         goto fail;
610     }
611 
612     lib->unit.data = init->data;
613     lib->callbacks = init->callbacks;
614 
615     lib->request_data_size = init->request_data_size;
616     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
617                             / PORT_MMAP_DATA_SIZE;
618     lib->request_limit = init->request_limit;
619 
620     lib->processes.slot = NULL;
621     lib->ports.slot = NULL;
622 
623     lib->log_fd = STDERR_FILENO;
624 
625     nxt_queue_init(&lib->contexts);
626 
627     lib->use_count = 0;
628     lib->request_count = 0;
629     lib->router_port = NULL;
630     lib->shared_port = NULL;
631 
632     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
633     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
634         pthread_mutex_destroy(&lib->mutex);
635         goto fail;
636     }
637 
638     nxt_unit_mmaps_init(&lib->incoming);
639     nxt_unit_mmaps_init(&lib->outgoing);
640 
641     return lib;
642 
643 fail:
644 
645     nxt_unit_free(NULL, lib);
646 
647     return NULL;
648 }
649 
650 
651 static int
nxt_unit_ctx_init(nxt_unit_impl_t * lib,nxt_unit_ctx_impl_t * ctx_impl,void * data)652 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
653     void *data)
654 {
655     int  rc;
656 
657     ctx_impl->ctx.data = data;
658     ctx_impl->ctx.unit = &lib->unit;
659 
660     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
661     if (nxt_slow_path(rc != 0)) {
662         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
663 
664         return NXT_UNIT_ERROR;
665     }
666 
667     nxt_unit_lib_use(lib);
668 
669     pthread_mutex_lock(&lib->mutex);
670 
671     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
672 
673     pthread_mutex_unlock(&lib->mutex);
674 
675     ctx_impl->use_count = 1;
676     ctx_impl->wait_items = 0;
677     ctx_impl->online = 1;
678     ctx_impl->ready = 0;
679     ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
680 
681     nxt_queue_init(&ctx_impl->free_req);
682     nxt_queue_init(&ctx_impl->free_ws);
683     nxt_queue_init(&ctx_impl->active_req);
684     nxt_queue_init(&ctx_impl->ready_req);
685     nxt_queue_init(&ctx_impl->pending_rbuf);
686     nxt_queue_init(&ctx_impl->free_rbuf);
687 
688     ctx_impl->free_buf = NULL;
689     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
690     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
691 
692     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
693     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
694 
695     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
696 
697     ctx_impl->req.req.ctx = &ctx_impl->ctx;
698     ctx_impl->req.req.unit = &lib->unit;
699 
700     ctx_impl->read_port = NULL;
701     ctx_impl->requests.slot = 0;
702 
703     return NXT_UNIT_OK;
704 }
705 
706 
707 nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_t * ctx)708 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
709 {
710     nxt_unit_ctx_impl_t  *ctx_impl;
711 
712     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
713 
714     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
715 }
716 
717 
718 nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_t * ctx)719 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
720 {
721     long                 c;
722     nxt_unit_ctx_impl_t  *ctx_impl;
723 
724     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
725 
726     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
727 
728     if (c == 1) {
729         nxt_unit_ctx_free(ctx_impl);
730     }
731 }
732 
733 
734 nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t * lib)735 nxt_unit_lib_use(nxt_unit_impl_t *lib)
736 {
737     nxt_atomic_fetch_add(&lib->use_count, 1);
738 }
739 
740 
741 nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t * lib)742 nxt_unit_lib_release(nxt_unit_impl_t *lib)
743 {
744     long                c;
745     nxt_unit_process_t  *process;
746 
747     c = nxt_atomic_fetch_add(&lib->use_count, -1);
748 
749     if (c == 1) {
750         for ( ;; ) {
751             pthread_mutex_lock(&lib->mutex);
752 
753             process = nxt_unit_process_pop_first(lib);
754             if (process == NULL) {
755                 pthread_mutex_unlock(&lib->mutex);
756 
757                 break;
758             }
759 
760             nxt_unit_remove_process(lib, process);
761         }
762 
763         pthread_mutex_destroy(&lib->mutex);
764 
765         if (nxt_fast_path(lib->router_port != NULL)) {
766             nxt_unit_port_release(lib->router_port);
767         }
768 
769         if (nxt_fast_path(lib->shared_port != NULL)) {
770             nxt_unit_port_release(lib->shared_port);
771         }
772 
773         nxt_unit_mmaps_destroy(&lib->incoming);
774         nxt_unit_mmaps_destroy(&lib->outgoing);
775 
776         nxt_unit_free(NULL, lib);
777     }
778 }
779 
780 
781 nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t ** head,nxt_unit_mmap_buf_t * mmap_buf)782 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
783     nxt_unit_mmap_buf_t *mmap_buf)
784 {
785     mmap_buf->next = *head;
786 
787     if (mmap_buf->next != NULL) {
788         mmap_buf->next->prev = &mmap_buf->next;
789     }
790 
791     *head = mmap_buf;
792     mmap_buf->prev = head;
793 }
794 
795 
796 nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t ** prev,nxt_unit_mmap_buf_t * mmap_buf)797 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
798     nxt_unit_mmap_buf_t *mmap_buf)
799 {
800     while (*prev != NULL) {
801         prev = &(*prev)->next;
802     }
803 
804     nxt_unit_mmap_buf_insert(prev, mmap_buf);
805 }
806 
807 
808 nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t * mmap_buf)809 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
810 {
811     nxt_unit_mmap_buf_t  **prev;
812 
813     prev = mmap_buf->prev;
814 
815     if (mmap_buf->next != NULL) {
816         mmap_buf->next->prev = prev;
817     }
818 
819     if (prev != NULL) {
820         *prev = mmap_buf->next;
821     }
822 }
823 
824 
825 static int
nxt_unit_read_env(nxt_unit_port_t * ready_port,nxt_unit_port_t * router_port,nxt_unit_port_t * read_port,int * shared_port_fd,int * shared_queue_fd,int * log_fd,uint32_t * stream,uint32_t * shm_limit,uint32_t * request_limit)826 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
827     nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd,
828     int *log_fd, uint32_t *stream,
829     uint32_t *shm_limit, uint32_t *request_limit)
830 {
831     int       rc;
832     int       ready_fd, router_fd, read_in_fd, read_out_fd;
833     char      *unit_init, *version_end, *vars;
834     size_t    version_length;
835     int64_t   ready_pid, router_pid, read_pid;
836     uint32_t  ready_stream, router_id, ready_id, read_id;
837 
838     unit_init = getenv(NXT_UNIT_INIT_ENV);
839     if (nxt_slow_path(unit_init == NULL)) {
840         nxt_unit_alert(NULL, "%s is not in the current environment",
841                        NXT_UNIT_INIT_ENV);
842 
843         return NXT_UNIT_ERROR;
844     }
845 
846     version_end = strchr(unit_init, ';');
847     if (nxt_slow_path(version_end == NULL)) {
848         nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
849                        NXT_UNIT_INIT_ENV, unit_init);
850 
851         return NXT_UNIT_ERROR;
852     }
853 
854     version_length = version_end - unit_init;
855 
856     rc = version_length != nxt_length(NXT_VERSION)
857          || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
858 
859     if (nxt_slow_path(rc != 0)) {
860         nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
861                        "%.*s, while the app was compiled with libunit %s",
862                        (int) version_length, unit_init, NXT_VERSION);
863 
864         return NXT_UNIT_ERROR;
865     }
866 
867     vars = version_end + 1;
868 
869     rc = sscanf(vars,
870                 "%"PRIu32";"
871                 "%"PRId64",%"PRIu32",%d;"
872                 "%"PRId64",%"PRIu32",%d;"
873                 "%"PRId64",%"PRIu32",%d,%d;"
874                 "%d,%d;"
875                 "%d,%"PRIu32",%"PRIu32,
876                 &ready_stream,
877                 &ready_pid, &ready_id, &ready_fd,
878                 &router_pid, &router_id, &router_fd,
879                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
880                 shared_port_fd, shared_queue_fd,
881                 log_fd, shm_limit, request_limit);
882 
883     if (nxt_slow_path(rc == EOF)) {
884         nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
885                        vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
886 
887         return NXT_UNIT_ERROR;
888     }
889 
890     if (nxt_slow_path(rc != 16)) {
891         nxt_unit_alert(NULL, "invalid number of variables in %s env: "
892                        "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars);
893 
894         return NXT_UNIT_ERROR;
895     }
896 
897     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
898 
899     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
900 
901     ready_port->in_fd = -1;
902     ready_port->out_fd = ready_fd;
903     ready_port->data = NULL;
904 
905     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
906 
907     router_port->in_fd = -1;
908     router_port->out_fd = router_fd;
909     router_port->data = NULL;
910 
911     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
912 
913     read_port->in_fd = read_in_fd;
914     read_port->out_fd = read_out_fd;
915     read_port->data = NULL;
916 
917     *stream = ready_stream;
918 
919     return NXT_UNIT_OK;
920 }
921 
922 
923 static int
nxt_unit_ready(nxt_unit_ctx_t * ctx,int ready_fd,uint32_t stream,int queue_fd)924 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
925 {
926     ssize_t          res;
927     nxt_send_oob_t   oob;
928     nxt_port_msg_t   msg;
929     nxt_unit_impl_t  *lib;
930     int              fds[2] = {queue_fd, -1};
931 
932     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
933 
934     msg.stream = stream;
935     msg.pid = lib->pid;
936     msg.reply_port = 0;
937     msg.type = _NXT_PORT_MSG_PROCESS_READY;
938     msg.last = 1;
939     msg.mmap = 0;
940     msg.nf = 0;
941     msg.mf = 0;
942 
943     nxt_socket_msg_oob_init(&oob, fds);
944 
945     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
946     if (res != sizeof(msg)) {
947         return NXT_UNIT_ERROR;
948     }
949 
950     return NXT_UNIT_OK;
951 }
952 
953 
954 static int
nxt_unit_process_msg(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf,nxt_unit_request_info_t ** preq)955 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
956     nxt_unit_request_info_t **preq)
957 {
958     int                  rc;
959     pid_t                pid;
960     uint8_t              quit_param;
961     nxt_port_msg_t       *port_msg;
962     nxt_unit_impl_t      *lib;
963     nxt_unit_recv_msg_t  recv_msg;
964 
965     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
966 
967     recv_msg.incoming_buf = NULL;
968     recv_msg.fd[0] = -1;
969     recv_msg.fd[1] = -1;
970 
971     rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
972     if (nxt_slow_path(rc != NXT_OK)) {
973         nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
974         rc = NXT_UNIT_ERROR;
975         goto done;
976     }
977 
978     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
979         if (nxt_slow_path(rbuf->size == 0)) {
980             nxt_unit_debug(ctx, "read port closed");
981 
982             nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
983             rc = NXT_UNIT_OK;
984             goto done;
985         }
986 
987         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
988 
989         rc = NXT_UNIT_ERROR;
990         goto done;
991     }
992 
993     port_msg = (nxt_port_msg_t *) rbuf->buf;
994 
995     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
996                    port_msg->stream, (int) port_msg->type,
997                    recv_msg.fd[0], recv_msg.fd[1]);
998 
999     recv_msg.stream = port_msg->stream;
1000     recv_msg.pid = port_msg->pid;
1001     recv_msg.reply_port = port_msg->reply_port;
1002     recv_msg.last = port_msg->last;
1003     recv_msg.mmap = port_msg->mmap;
1004 
1005     recv_msg.start = port_msg + 1;
1006     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
1007 
1008     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
1009         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
1010                        port_msg->stream, (int) port_msg->type);
1011         rc = NXT_UNIT_ERROR;
1012         goto done;
1013     }
1014 
1015     /* Fragmentation is unsupported. */
1016     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
1017         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
1018                        port_msg->stream, (int) port_msg->type);
1019         rc = NXT_UNIT_ERROR;
1020         goto done;
1021     }
1022 
1023     if (port_msg->mmap) {
1024         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
1025 
1026         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1027             if (rc == NXT_UNIT_AGAIN) {
1028                 recv_msg.fd[0] = -1;
1029                 recv_msg.fd[1] = -1;
1030             }
1031 
1032             goto done;
1033         }
1034     }
1035 
1036     switch (port_msg->type) {
1037 
1038     case _NXT_PORT_MSG_RPC_READY:
1039         rc = NXT_UNIT_OK;
1040         break;
1041 
1042     case _NXT_PORT_MSG_QUIT:
1043         if (recv_msg.size == sizeof(quit_param)) {
1044             memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1045 
1046         } else {
1047             quit_param = NXT_QUIT_NORMAL;
1048         }
1049 
1050         nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1051                        (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1052 
1053         nxt_unit_quit(ctx, quit_param);
1054 
1055         rc = NXT_UNIT_OK;
1056         break;
1057 
1058     case _NXT_PORT_MSG_NEW_PORT:
1059         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1060         break;
1061 
1062     case _NXT_PORT_MSG_PORT_ACK:
1063         rc = nxt_unit_ctx_ready(ctx);
1064         break;
1065 
1066     case _NXT_PORT_MSG_CHANGE_FILE:
1067         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1068                        port_msg->stream, recv_msg.fd[0]);
1069 
1070         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1071             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1072                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1073                            strerror(errno), errno);
1074 
1075             rc = NXT_UNIT_ERROR;
1076             goto done;
1077         }
1078 
1079         rc = NXT_UNIT_OK;
1080         break;
1081 
1082     case _NXT_PORT_MSG_MMAP:
1083         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1084             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1085                            port_msg->stream, recv_msg.fd[0]);
1086 
1087             rc = NXT_UNIT_ERROR;
1088             goto done;
1089         }
1090 
1091         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1092         break;
1093 
1094     case _NXT_PORT_MSG_REQ_HEADERS:
1095         rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1096         break;
1097 
1098     case _NXT_PORT_MSG_REQ_BODY:
1099         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1100         break;
1101 
1102     case _NXT_PORT_MSG_WEBSOCKET:
1103         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1104         break;
1105 
1106     case _NXT_PORT_MSG_REMOVE_PID:
1107         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1108             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1109                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1110                            (int) sizeof(pid));
1111 
1112             rc = NXT_UNIT_ERROR;
1113             goto done;
1114         }
1115 
1116         memcpy(&pid, recv_msg.start, sizeof(pid));
1117 
1118         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1119                        port_msg->stream, (int) pid);
1120 
1121         nxt_unit_remove_pid(lib, pid);
1122 
1123         rc = NXT_UNIT_OK;
1124         break;
1125 
1126     case _NXT_PORT_MSG_SHM_ACK:
1127         rc = nxt_unit_process_shm_ack(ctx);
1128         break;
1129 
1130     default:
1131         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1132                        port_msg->stream, (int) port_msg->type);
1133 
1134         rc = NXT_UNIT_ERROR;
1135         goto done;
1136     }
1137 
1138 done:
1139 
1140     if (recv_msg.fd[0] != -1) {
1141         nxt_unit_close(recv_msg.fd[0]);
1142     }
1143 
1144     if (recv_msg.fd[1] != -1) {
1145         nxt_unit_close(recv_msg.fd[1]);
1146     }
1147 
1148     while (recv_msg.incoming_buf != NULL) {
1149         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1150     }
1151 
1152     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1153 #if (NXT_DEBUG)
1154         memset(rbuf->buf, 0xAC, rbuf->size);
1155 #endif
1156         nxt_unit_read_buf_release(ctx, rbuf);
1157     }
1158 
1159     return rc;
1160 }
1161 
1162 
1163 static int
nxt_unit_process_new_port(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1164 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1165 {
1166     void                     *mem;
1167     nxt_unit_port_t          new_port, *port;
1168     nxt_port_msg_new_port_t  *new_port_msg;
1169 
1170     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1171         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1172                       "invalid message size (%d)",
1173                       recv_msg->stream, (int) recv_msg->size);
1174 
1175         return NXT_UNIT_ERROR;
1176     }
1177 
1178     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1179         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1180                        recv_msg->stream, recv_msg->fd[0]);
1181 
1182         return NXT_UNIT_ERROR;
1183     }
1184 
1185     new_port_msg = recv_msg->start;
1186 
1187     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1188                    recv_msg->stream, (int) new_port_msg->pid,
1189                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1190 
1191     if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) {
1192         return NXT_UNIT_ERROR;
1193     }
1194 
1195     nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id);
1196 
1197     new_port.in_fd = -1;
1198     new_port.out_fd = recv_msg->fd[0];
1199 
1200     mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1201                MAP_SHARED, recv_msg->fd[1], 0);
1202 
1203     if (nxt_slow_path(mem == MAP_FAILED)) {
1204         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1205                        strerror(errno), errno);
1206 
1207         return NXT_UNIT_ERROR;
1208     }
1209 
1210     new_port.data = NULL;
1211 
1212     recv_msg->fd[0] = -1;
1213 
1214     port = nxt_unit_add_port(ctx, &new_port, mem);
1215     if (nxt_slow_path(port == NULL)) {
1216         return NXT_UNIT_ERROR;
1217     }
1218 
1219     nxt_unit_port_release(port);
1220 
1221     return NXT_UNIT_OK;
1222 }
1223 
1224 
1225 static int
nxt_unit_ctx_ready(nxt_unit_ctx_t * ctx)1226 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1227 {
1228     nxt_unit_impl_t      *lib;
1229     nxt_unit_ctx_impl_t  *ctx_impl;
1230 
1231     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1232 
1233     if (nxt_slow_path(ctx_impl->ready)) {
1234         return NXT_UNIT_OK;
1235     }
1236 
1237     ctx_impl->ready = 1;
1238 
1239     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1240 
1241     /* Call ready_handler() only for main context. */
1242     if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1243         return lib->callbacks.ready_handler(ctx);
1244     }
1245 
1246     if (&lib->main_ctx != ctx_impl) {
1247         /* Check if the main context is already stopped or quit. */
1248         if (nxt_slow_path(!lib->main_ctx.ready)) {
1249             ctx_impl->ready = 0;
1250 
1251             nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1252 
1253             return NXT_UNIT_OK;
1254         }
1255 
1256         if (lib->callbacks.add_port != NULL) {
1257             lib->callbacks.add_port(ctx, lib->shared_port);
1258         }
1259     }
1260 
1261     return NXT_UNIT_OK;
1262 }
1263 
1264 
1265 static int
nxt_unit_process_req_headers(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg,nxt_unit_request_info_t ** preq)1266 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1267     nxt_unit_request_info_t **preq)
1268 {
1269     int                           res;
1270     nxt_unit_impl_t               *lib;
1271     nxt_unit_port_id_t            port_id;
1272     nxt_unit_request_t            *r;
1273     nxt_unit_mmap_buf_t           *b;
1274     nxt_unit_request_info_t       *req;
1275     nxt_unit_request_info_impl_t  *req_impl;
1276 
1277     if (nxt_slow_path(recv_msg->mmap == 0)) {
1278         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1279                       recv_msg->stream);
1280 
1281         return NXT_UNIT_ERROR;
1282     }
1283 
1284     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1285         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1286                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1287                       (int) sizeof(nxt_unit_request_t));
1288 
1289         return NXT_UNIT_ERROR;
1290     }
1291 
1292     req_impl = nxt_unit_request_info_get(ctx);
1293     if (nxt_slow_path(req_impl == NULL)) {
1294         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1295                       recv_msg->stream);
1296 
1297         return NXT_UNIT_ERROR;
1298     }
1299 
1300     req = &req_impl->req;
1301 
1302     req->request = recv_msg->start;
1303 
1304     b = recv_msg->incoming_buf;
1305 
1306     req->request_buf = &b->buf;
1307     req->response = NULL;
1308     req->response_buf = NULL;
1309 
1310     r = req->request;
1311 
1312     req->content_length = r->content_length;
1313 
1314     req->content_buf = req->request_buf;
1315     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1316 
1317     req_impl->stream = recv_msg->stream;
1318 
1319     req_impl->outgoing_buf = NULL;
1320 
1321     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1322         b->req = req;
1323     }
1324 
1325     /* "Move" incoming buffer list to req_impl. */
1326     req_impl->incoming_buf = recv_msg->incoming_buf;
1327     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1328     recv_msg->incoming_buf = NULL;
1329 
1330     req->content_fd = recv_msg->fd[0];
1331     recv_msg->fd[0] = -1;
1332 
1333     req->response_max_fields = 0;
1334     req_impl->state = NXT_UNIT_RS_START;
1335     req_impl->websocket = 0;
1336     req_impl->in_hash = 0;
1337 
1338     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1339                    (int) r->method_length,
1340                    (char *) nxt_unit_sptr_get(&r->method),
1341                    (int) r->target_length,
1342                    (char *) nxt_unit_sptr_get(&r->target),
1343                    (int) r->content_length);
1344 
1345     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1346 
1347     res = nxt_unit_request_check_response_port(req, &port_id);
1348     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1349         return NXT_UNIT_ERROR;
1350     }
1351 
1352     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1353         res = nxt_unit_send_req_headers_ack(req);
1354         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1355             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1356 
1357             return NXT_UNIT_ERROR;
1358         }
1359 
1360         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1361 
1362         if (req->content_length
1363             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1364         {
1365             res = nxt_unit_request_hash_add(ctx, req);
1366             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1367                 nxt_unit_req_warn(req, "failed to add request to hash");
1368 
1369                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1370 
1371                 return NXT_UNIT_ERROR;
1372             }
1373 
1374             /*
1375              * If application have separate data handler, we may start
1376              * request processing and process data when it is arrived.
1377              */
1378             if (lib->callbacks.data_handler == NULL) {
1379                 return NXT_UNIT_OK;
1380             }
1381         }
1382 
1383         if (preq == NULL) {
1384             lib->callbacks.request_handler(req);
1385 
1386         } else {
1387             *preq = req;
1388         }
1389     }
1390 
1391     return NXT_UNIT_OK;
1392 }
1393 
1394 
1395 static int
nxt_unit_process_req_body(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1396 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1397 {
1398     uint64_t                 l;
1399     nxt_unit_impl_t          *lib;
1400     nxt_unit_mmap_buf_t      *b;
1401     nxt_unit_request_info_t  *req;
1402 
1403     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1404     if (req == NULL) {
1405         return NXT_UNIT_OK;
1406     }
1407 
1408     l = req->content_buf->end - req->content_buf->free;
1409 
1410     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1411         b->req = req;
1412         l += b->buf.end - b->buf.free;
1413     }
1414 
1415     if (recv_msg->incoming_buf != NULL) {
1416         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1417 
1418         while (b->next != NULL) {
1419             b = b->next;
1420         }
1421 
1422         /* "Move" incoming buffer list to req_impl. */
1423         b->next = recv_msg->incoming_buf;
1424         b->next->prev = &b->next;
1425 
1426         recv_msg->incoming_buf = NULL;
1427     }
1428 
1429     req->content_fd = recv_msg->fd[0];
1430     recv_msg->fd[0] = -1;
1431 
1432     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1433 
1434     if (lib->callbacks.data_handler != NULL) {
1435         lib->callbacks.data_handler(req);
1436 
1437         return NXT_UNIT_OK;
1438     }
1439 
1440     if (req->content_fd != -1 || l == req->content_length) {
1441         lib->callbacks.request_handler(req);
1442     }
1443 
1444     return NXT_UNIT_OK;
1445 }
1446 
1447 
1448 static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t * req,nxt_unit_port_id_t * port_id)1449 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1450     nxt_unit_port_id_t *port_id)
1451 {
1452     int                           res;
1453     nxt_unit_ctx_t                *ctx;
1454     nxt_unit_impl_t               *lib;
1455     nxt_unit_port_t               *port;
1456     nxt_unit_process_t            *process;
1457     nxt_unit_ctx_impl_t           *ctx_impl;
1458     nxt_unit_port_impl_t          *port_impl;
1459     nxt_unit_request_info_impl_t  *req_impl;
1460 
1461     ctx = req->ctx;
1462     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1463     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1464 
1465     pthread_mutex_lock(&lib->mutex);
1466 
1467     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1468     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1469 
1470     if (nxt_fast_path(port != NULL)) {
1471         req->response_port = port;
1472 
1473         if (nxt_fast_path(port_impl->ready)) {
1474             pthread_mutex_unlock(&lib->mutex);
1475 
1476             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1477                            (int) port->id.pid, (int) port->id.id);
1478 
1479             return NXT_UNIT_OK;
1480         }
1481 
1482         nxt_unit_debug(ctx, "check_response_port: "
1483                        "port{%d,%d} already requested",
1484                        (int) port->id.pid, (int) port->id.id);
1485 
1486         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1487 
1488         nxt_queue_insert_tail(&port_impl->awaiting_req,
1489                               &req_impl->port_wait_link);
1490 
1491         pthread_mutex_unlock(&lib->mutex);
1492 
1493         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1494 
1495         return NXT_UNIT_AGAIN;
1496     }
1497 
1498     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1499     if (nxt_slow_path(port_impl == NULL)) {
1500         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1501                        (int) sizeof(nxt_unit_port_impl_t));
1502 
1503         pthread_mutex_unlock(&lib->mutex);
1504 
1505         return NXT_UNIT_ERROR;
1506     }
1507 
1508     port = &port_impl->port;
1509 
1510     port->id = *port_id;
1511     port->in_fd = -1;
1512     port->out_fd = -1;
1513     port->data = NULL;
1514 
1515     res = nxt_unit_port_hash_add(&lib->ports, port);
1516     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1517         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1518                        port->id.pid, port->id.id);
1519 
1520         pthread_mutex_unlock(&lib->mutex);
1521 
1522         nxt_unit_free(ctx, port);
1523 
1524         return NXT_UNIT_ERROR;
1525     }
1526 
1527     process = nxt_unit_process_find(lib, port_id->pid, 0);
1528     if (nxt_slow_path(process == NULL)) {
1529         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1530                        port->id.pid);
1531 
1532         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1533 
1534         pthread_mutex_unlock(&lib->mutex);
1535 
1536         nxt_unit_free(ctx, port);
1537 
1538         return NXT_UNIT_ERROR;
1539     }
1540 
1541     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1542 
1543     port_impl->process = process;
1544     port_impl->queue = NULL;
1545     port_impl->from_socket = 0;
1546     port_impl->socket_rbuf = NULL;
1547 
1548     nxt_queue_init(&port_impl->awaiting_req);
1549 
1550     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1551 
1552     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1553 
1554     port_impl->use_count = 2;
1555     port_impl->ready = 0;
1556 
1557     req->response_port = port;
1558 
1559     pthread_mutex_unlock(&lib->mutex);
1560 
1561     res = nxt_unit_get_port(ctx, port_id);
1562     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1563         return NXT_UNIT_ERROR;
1564     }
1565 
1566     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1567 
1568     return NXT_UNIT_AGAIN;
1569 }
1570 
1571 
1572 static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t * req)1573 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1574 {
1575     ssize_t                       res;
1576     nxt_port_msg_t                msg;
1577     nxt_unit_impl_t               *lib;
1578     nxt_unit_ctx_impl_t           *ctx_impl;
1579     nxt_unit_request_info_impl_t  *req_impl;
1580 
1581     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1582     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1583     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1584 
1585     memset(&msg, 0, sizeof(nxt_port_msg_t));
1586 
1587     msg.stream = req_impl->stream;
1588     msg.pid = lib->pid;
1589     msg.reply_port = ctx_impl->read_port->id.id;
1590     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1591 
1592     res = nxt_unit_port_send(req->ctx, req->response_port,
1593                              &msg, sizeof(msg), NULL);
1594     if (nxt_slow_path(res != sizeof(msg))) {
1595         return NXT_UNIT_ERROR;
1596     }
1597 
1598     return NXT_UNIT_OK;
1599 }
1600 
1601 
1602 static int
nxt_unit_process_websocket(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1603 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1604 {
1605     size_t                           hsize;
1606     nxt_unit_impl_t                  *lib;
1607     nxt_unit_mmap_buf_t              *b;
1608     nxt_unit_callbacks_t             *cb;
1609     nxt_unit_request_info_t          *req;
1610     nxt_unit_request_info_impl_t     *req_impl;
1611     nxt_unit_websocket_frame_impl_t  *ws_impl;
1612 
1613     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1614     if (nxt_slow_path(req == NULL)) {
1615         return NXT_UNIT_OK;
1616     }
1617 
1618     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1619 
1620     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1621     cb = &lib->callbacks;
1622 
1623     if (cb->websocket_handler && recv_msg->size >= 2) {
1624         ws_impl = nxt_unit_websocket_frame_get(ctx);
1625         if (nxt_slow_path(ws_impl == NULL)) {
1626             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1627                           req_impl->stream);
1628 
1629             return NXT_UNIT_ERROR;
1630         }
1631 
1632         ws_impl->ws.req = req;
1633 
1634         ws_impl->buf = NULL;
1635 
1636         if (recv_msg->mmap) {
1637             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1638                 b->req = req;
1639             }
1640 
1641             /* "Move" incoming buffer list to ws_impl. */
1642             ws_impl->buf = recv_msg->incoming_buf;
1643             ws_impl->buf->prev = &ws_impl->buf;
1644             recv_msg->incoming_buf = NULL;
1645 
1646             b = ws_impl->buf;
1647 
1648         } else {
1649             b = nxt_unit_mmap_buf_get(ctx);
1650             if (nxt_slow_path(b == NULL)) {
1651                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1652                                req_impl->stream);
1653 
1654                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1655 
1656                 return NXT_UNIT_ERROR;
1657             }
1658 
1659             b->req = req;
1660             b->buf.start = recv_msg->start;
1661             b->buf.free = b->buf.start;
1662             b->buf.end = b->buf.start + recv_msg->size;
1663 
1664             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1665         }
1666 
1667         ws_impl->ws.header = (void *) b->buf.start;
1668         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1669             ws_impl->ws.header);
1670 
1671         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1672 
1673         if (ws_impl->ws.header->mask) {
1674             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1675 
1676         } else {
1677             ws_impl->ws.mask = NULL;
1678         }
1679 
1680         b->buf.free += hsize;
1681 
1682         ws_impl->ws.content_buf = &b->buf;
1683         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1684 
1685         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1686                            "payload_len=%"PRIu64,
1687                             ws_impl->ws.header->opcode,
1688                             ws_impl->ws.payload_len);
1689 
1690         cb->websocket_handler(&ws_impl->ws);
1691     }
1692 
1693     if (recv_msg->last) {
1694         if (cb->close_handler) {
1695             nxt_unit_req_debug(req, "close_handler");
1696 
1697             cb->close_handler(req);
1698 
1699         } else {
1700             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1701         }
1702     }
1703 
1704     return NXT_UNIT_OK;
1705 }
1706 
1707 
1708 static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t * ctx)1709 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1710 {
1711     nxt_unit_impl_t       *lib;
1712     nxt_unit_callbacks_t  *cb;
1713 
1714     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1715     cb = &lib->callbacks;
1716 
1717     if (cb->shm_ack_handler != NULL) {
1718         cb->shm_ack_handler(ctx);
1719     }
1720 
1721     return NXT_UNIT_OK;
1722 }
1723 
1724 
1725 static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t * ctx)1726 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1727 {
1728     nxt_unit_impl_t               *lib;
1729     nxt_queue_link_t              *lnk;
1730     nxt_unit_ctx_impl_t           *ctx_impl;
1731     nxt_unit_request_info_impl_t  *req_impl;
1732 
1733     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1734 
1735     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1736 
1737     pthread_mutex_lock(&ctx_impl->mutex);
1738 
1739     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1740         pthread_mutex_unlock(&ctx_impl->mutex);
1741 
1742         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1743                                         + lib->request_data_size);
1744         if (nxt_slow_path(req_impl == NULL)) {
1745             return NULL;
1746         }
1747 
1748         req_impl->req.unit = ctx->unit;
1749         req_impl->req.ctx = ctx;
1750 
1751         pthread_mutex_lock(&ctx_impl->mutex);
1752 
1753     } else {
1754         lnk = nxt_queue_first(&ctx_impl->free_req);
1755         nxt_queue_remove(lnk);
1756 
1757         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1758     }
1759 
1760     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1761 
1762     pthread_mutex_unlock(&ctx_impl->mutex);
1763 
1764     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1765 
1766     return req_impl;
1767 }
1768 
1769 
1770 static void
nxt_unit_request_info_release(nxt_unit_request_info_t * req)1771 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1772 {
1773     nxt_unit_ctx_t                *ctx;
1774     nxt_unit_ctx_impl_t           *ctx_impl;
1775     nxt_unit_request_info_impl_t  *req_impl;
1776 
1777     ctx = req->ctx;
1778     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1779     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1780 
1781     req->response = NULL;
1782     req->response_buf = NULL;
1783 
1784     if (req_impl->in_hash) {
1785         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1786     }
1787 
1788     while (req_impl->outgoing_buf != NULL) {
1789         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1790     }
1791 
1792     while (req_impl->incoming_buf != NULL) {
1793         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1794     }
1795 
1796     if (req->content_fd != -1) {
1797         nxt_unit_close(req->content_fd);
1798 
1799         req->content_fd = -1;
1800     }
1801 
1802     if (req->response_port != NULL) {
1803         nxt_unit_port_release(req->response_port);
1804 
1805         req->response_port = NULL;
1806     }
1807 
1808     req_impl->state = NXT_UNIT_RS_RELEASED;
1809 
1810     pthread_mutex_lock(&ctx_impl->mutex);
1811 
1812     nxt_queue_remove(&req_impl->link);
1813 
1814     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1815 
1816     pthread_mutex_unlock(&ctx_impl->mutex);
1817 
1818     if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1819         nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1820     }
1821 }
1822 
1823 
1824 static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t * req_impl)1825 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1826 {
1827     nxt_unit_ctx_impl_t  *ctx_impl;
1828 
1829     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1830 
1831     nxt_queue_remove(&req_impl->link);
1832 
1833     if (req_impl != &ctx_impl->req) {
1834         nxt_unit_free(&ctx_impl->ctx, req_impl);
1835     }
1836 }
1837 
1838 
1839 static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t * ctx)1840 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1841 {
1842     nxt_queue_link_t                 *lnk;
1843     nxt_unit_ctx_impl_t              *ctx_impl;
1844     nxt_unit_websocket_frame_impl_t  *ws_impl;
1845 
1846     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1847 
1848     pthread_mutex_lock(&ctx_impl->mutex);
1849 
1850     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1851         pthread_mutex_unlock(&ctx_impl->mutex);
1852 
1853         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1854         if (nxt_slow_path(ws_impl == NULL)) {
1855             return NULL;
1856         }
1857 
1858     } else {
1859         lnk = nxt_queue_first(&ctx_impl->free_ws);
1860         nxt_queue_remove(lnk);
1861 
1862         pthread_mutex_unlock(&ctx_impl->mutex);
1863 
1864         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1865     }
1866 
1867     ws_impl->ctx_impl = ctx_impl;
1868 
1869     return ws_impl;
1870 }
1871 
1872 
1873 static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t * ws)1874 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1875 {
1876     nxt_unit_websocket_frame_impl_t  *ws_impl;
1877 
1878     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1879 
1880     while (ws_impl->buf != NULL) {
1881         nxt_unit_mmap_buf_free(ws_impl->buf);
1882     }
1883 
1884     ws->req = NULL;
1885 
1886     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1887 
1888     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1889 
1890     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1891 }
1892 
1893 
1894 static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t * ctx,nxt_unit_websocket_frame_impl_t * ws_impl)1895 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1896     nxt_unit_websocket_frame_impl_t *ws_impl)
1897 {
1898     nxt_queue_remove(&ws_impl->link);
1899 
1900     nxt_unit_free(ctx, ws_impl);
1901 }
1902 
1903 
1904 uint16_t
nxt_unit_field_hash(const char * name,size_t name_length)1905 nxt_unit_field_hash(const char *name, size_t name_length)
1906 {
1907     u_char      ch;
1908     uint32_t    hash;
1909     const char  *p, *end;
1910 
1911     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1912     end = name + name_length;
1913 
1914     for (p = name; p < end; p++) {
1915         ch = *p;
1916         hash = (hash << 4) + hash + nxt_lowcase(ch);
1917     }
1918 
1919     hash = (hash >> 16) ^ hash;
1920 
1921     return hash;
1922 }
1923 
1924 
/*
 * Reorders the request header fields in place so that fields with the same
 * (case-insensitively compared) name become adjacent, and records the
 * indexes of the Content-Length, Content-Type and Cookie fields in the
 * request structure.
 */
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
    char                *name;
    uint32_t            i, j;
    nxt_unit_field_t    *fields, f;
    nxt_unit_request_t  *r;

    static nxt_str_t  content_length = nxt_string("content-length");
    static nxt_str_t  content_type = nxt_string("content-type");
    static nxt_str_t  cookie = nxt_string("cookie");

    nxt_unit_req_debug(req, "group_dup_fields");

    r = req->request;
    fields = r->fields;

    for (i = 0; i < r->fields_count; i++) {
        name = nxt_unit_sptr_get(&fields[i].name);

        /*
         * The hash only pre-selects a candidate; length and a
         * case-insensitive comparison confirm the actual name.
         */
        switch (fields[i].hash) {
        case NXT_UNIT_HASH_CONTENT_LENGTH:
            if (fields[i].name_length == content_length.length
                && nxt_unit_memcasecmp(name, content_length.start,
                                       content_length.length) == 0)
            {
                r->content_length_field = i;
            }

            break;

        case NXT_UNIT_HASH_CONTENT_TYPE:
            if (fields[i].name_length == content_type.length
                && nxt_unit_memcasecmp(name, content_type.start,
                                       content_type.length) == 0)
            {
                r->content_type_field = i;
            }

            break;

        case NXT_UNIT_HASH_COOKIE:
            if (fields[i].name_length == cookie.length
                && nxt_unit_memcasecmp(name, cookie.start,
                                       cookie.length) == 0)
            {
                r->cookie_field = i;
            }

            break;
        }

        /* Look for later fields carrying the same name as field i. */
        for (j = i + 1; j < r->fields_count; j++) {
            if (fields[i].hash != fields[j].hash
                || fields[i].name_length != fields[j].name_length
                || nxt_unit_memcasecmp(name,
                                       nxt_unit_sptr_get(&fields[j].name),
                                       fields[j].name_length) != 0)
            {
                continue;
            }

            /*
             * Field j is a duplicate: it is moved to slot i + 1.  The value
             * offset is corrected by the distance the entry travels; this
             * presumably relies on sptr offsets being relative to the
             * entry's own address (TODO confirm against nxt_unit_sptr.h).
             */
            f = fields[j];
            f.value.offset += (j - (i + 1)) * sizeof(f);

            /*
             * Shift the entries between i + 1 and j one slot towards the
             * end, fixing their offsets by one entry size each.
             */
            while (j > i + 1) {
                fields[j] = fields[j - 1];
                fields[j].name.offset -= sizeof(f);
                fields[j].value.offset -= sizeof(f);
                j--;
            }

            /* Here j == i + 1: the slot right after the group so far. */
            fields[j] = f;

            /* Assign the same name pointer for further grouping simplicity. */
            nxt_unit_sptr_set(&fields[j].name, name);

            /*
             * The group grew by one entry; advancing i makes the next
             * duplicate land after it and lets the scan resume at j + 1.
             */
            i++;
        }
    }
}
2006 
2007 
2008 int
nxt_unit_response_init(nxt_unit_request_info_t * req,uint16_t status,uint32_t max_fields_count,uint32_t max_fields_size)2009 nxt_unit_response_init(nxt_unit_request_info_t *req,
2010     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2011 {
2012     uint32_t                      buf_size;
2013     nxt_unit_buf_t                *buf;
2014     nxt_unit_request_info_impl_t  *req_impl;
2015 
2016     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2017 
2018     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2019         nxt_unit_req_warn(req, "init: response already sent");
2020 
2021         return NXT_UNIT_ERROR;
2022     }
2023 
2024     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2025                        (int) max_fields_count, (int) max_fields_size);
2026 
2027     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2028         nxt_unit_req_debug(req, "duplicate response init");
2029     }
2030 
2031     /*
2032      * Each field name and value 0-terminated by libunit,
2033      * this is the reason of '+ 2' below.
2034      */
2035     buf_size = sizeof(nxt_unit_response_t)
2036                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2037                + max_fields_size;
2038 
2039     if (nxt_slow_path(req->response_buf != NULL)) {
2040         buf = req->response_buf;
2041 
2042         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2043             goto init_response;
2044         }
2045 
2046         nxt_unit_buf_free(buf);
2047 
2048         req->response_buf = NULL;
2049         req->response = NULL;
2050         req->response_max_fields = 0;
2051 
2052         req_impl->state = NXT_UNIT_RS_START;
2053     }
2054 
2055     buf = nxt_unit_response_buf_alloc(req, buf_size);
2056     if (nxt_slow_path(buf == NULL)) {
2057         return NXT_UNIT_ERROR;
2058     }
2059 
2060 init_response:
2061 
2062     memset(buf->start, 0, sizeof(nxt_unit_response_t));
2063 
2064     req->response_buf = buf;
2065 
2066     req->response = (nxt_unit_response_t *) buf->start;
2067     req->response->status = status;
2068 
2069     buf->free = buf->start + sizeof(nxt_unit_response_t)
2070                 + max_fields_count * sizeof(nxt_unit_field_t);
2071 
2072     req->response_max_fields = max_fields_count;
2073     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2074 
2075     return NXT_UNIT_OK;
2076 }
2077 
2078 
2079 int
nxt_unit_response_realloc(nxt_unit_request_info_t * req,uint32_t max_fields_count,uint32_t max_fields_size)2080 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2081     uint32_t max_fields_count, uint32_t max_fields_size)
2082 {
2083     char                          *p;
2084     uint32_t                      i, buf_size;
2085     nxt_unit_buf_t                *buf;
2086     nxt_unit_field_t              *f, *src;
2087     nxt_unit_response_t           *resp;
2088     nxt_unit_request_info_impl_t  *req_impl;
2089 
2090     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2091 
2092     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2093         nxt_unit_req_warn(req, "realloc: response not init");
2094 
2095         return NXT_UNIT_ERROR;
2096     }
2097 
2098     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2099         nxt_unit_req_warn(req, "realloc: response already sent");
2100 
2101         return NXT_UNIT_ERROR;
2102     }
2103 
2104     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2105         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2106 
2107         return NXT_UNIT_ERROR;
2108     }
2109 
2110     /*
2111      * Each field name and value 0-terminated by libunit,
2112      * this is the reason of '+ 2' below.
2113      */
2114     buf_size = sizeof(nxt_unit_response_t)
2115                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2116                + max_fields_size;
2117 
2118     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2119 
2120     buf = nxt_unit_response_buf_alloc(req, buf_size);
2121     if (nxt_slow_path(buf == NULL)) {
2122         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2123         return NXT_UNIT_ERROR;
2124     }
2125 
2126     resp = (nxt_unit_response_t *) buf->start;
2127 
2128     memset(resp, 0, sizeof(nxt_unit_response_t));
2129 
2130     resp->status = req->response->status;
2131     resp->content_length = req->response->content_length;
2132 
2133     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2134     f = resp->fields;
2135 
2136     for (i = 0; i < req->response->fields_count; i++) {
2137         src = req->response->fields + i;
2138 
2139         if (nxt_slow_path(src->skip != 0)) {
2140             continue;
2141         }
2142 
2143         if (nxt_slow_path(src->name_length + src->value_length + 2
2144                           > (uint32_t) (buf->end - p)))
2145         {
2146             nxt_unit_req_warn(req, "realloc: not enough space for field"
2147                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2148                   i, src, src->name_length, src->value_length);
2149 
2150             goto fail;
2151         }
2152 
2153         nxt_unit_sptr_set(&f->name, p);
2154         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2155         *p++ = '\0';
2156 
2157         nxt_unit_sptr_set(&f->value, p);
2158         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2159         *p++ = '\0';
2160 
2161         f->hash = src->hash;
2162         f->skip = 0;
2163         f->name_length = src->name_length;
2164         f->value_length = src->value_length;
2165 
2166         resp->fields_count++;
2167         f++;
2168     }
2169 
2170     if (req->response->piggyback_content_length > 0) {
2171         if (nxt_slow_path(req->response->piggyback_content_length
2172                           > (uint32_t) (buf->end - p)))
2173         {
2174             nxt_unit_req_warn(req, "realloc: not enought space for content"
2175                   " #%"PRIu32", %"PRIu32" required",
2176                   i, req->response->piggyback_content_length);
2177 
2178             goto fail;
2179         }
2180 
2181         resp->piggyback_content_length =
2182                                        req->response->piggyback_content_length;
2183 
2184         nxt_unit_sptr_set(&resp->piggyback_content, p);
2185         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2186                        req->response->piggyback_content_length);
2187     }
2188 
2189     buf->free = p;
2190 
2191     nxt_unit_buf_free(req->response_buf);
2192 
2193     req->response = resp;
2194     req->response_buf = buf;
2195     req->response_max_fields = max_fields_count;
2196 
2197     return NXT_UNIT_OK;
2198 
2199 fail:
2200 
2201     nxt_unit_buf_free(buf);
2202 
2203     return NXT_UNIT_ERROR;
2204 }
2205 
2206 
2207 int
nxt_unit_response_is_init(nxt_unit_request_info_t * req)2208 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2209 {
2210     nxt_unit_request_info_impl_t  *req_impl;
2211 
2212     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2213 
2214     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2215 }
2216 
2217 
2218 int
nxt_unit_response_add_field(nxt_unit_request_info_t * req,const char * name,uint8_t name_length,const char * value,uint32_t value_length)2219 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2220     const char *name, uint8_t name_length,
2221     const char *value, uint32_t value_length)
2222 {
2223     nxt_unit_buf_t                *buf;
2224     nxt_unit_field_t              *f;
2225     nxt_unit_response_t           *resp;
2226     nxt_unit_request_info_impl_t  *req_impl;
2227 
2228     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2229 
2230     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2231         nxt_unit_req_warn(req, "add_field: response not initialized or "
2232                           "already sent");
2233 
2234         return NXT_UNIT_ERROR;
2235     }
2236 
2237     resp = req->response;
2238 
2239     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2240         nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2241                           (int) resp->fields_count);
2242 
2243         return NXT_UNIT_ERROR;
2244     }
2245 
2246     buf = req->response_buf;
2247 
2248     if (nxt_slow_path(name_length + value_length + 2
2249                       > (uint32_t) (buf->end - buf->free)))
2250     {
2251         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2252 
2253         return NXT_UNIT_ERROR;
2254     }
2255 
2256     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2257                        resp->fields_count,
2258                        (int) name_length, name,
2259                        (int) value_length, value);
2260 
2261     f = resp->fields + resp->fields_count;
2262 
2263     nxt_unit_sptr_set(&f->name, buf->free);
2264     buf->free = nxt_cpymem(buf->free, name, name_length);
2265     *buf->free++ = '\0';
2266 
2267     nxt_unit_sptr_set(&f->value, buf->free);
2268     buf->free = nxt_cpymem(buf->free, value, value_length);
2269     *buf->free++ = '\0';
2270 
2271     f->hash = nxt_unit_field_hash(name, name_length);
2272     f->skip = 0;
2273     f->name_length = name_length;
2274     f->value_length = value_length;
2275 
2276     resp->fields_count++;
2277 
2278     return NXT_UNIT_OK;
2279 }
2280 
2281 
2282 int
nxt_unit_response_add_content(nxt_unit_request_info_t * req,const void * src,uint32_t size)2283 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2284     const void* src, uint32_t size)
2285 {
2286     nxt_unit_buf_t                *buf;
2287     nxt_unit_response_t           *resp;
2288     nxt_unit_request_info_impl_t  *req_impl;
2289 
2290     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2291 
2292     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2293         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2294 
2295         return NXT_UNIT_ERROR;
2296     }
2297 
2298     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2299         nxt_unit_req_warn(req, "add_content: response already sent");
2300 
2301         return NXT_UNIT_ERROR;
2302     }
2303 
2304     buf = req->response_buf;
2305 
2306     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2307         nxt_unit_req_warn(req, "add_content: buffer overflow");
2308 
2309         return NXT_UNIT_ERROR;
2310     }
2311 
2312     resp = req->response;
2313 
2314     if (resp->piggyback_content_length == 0) {
2315         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2316         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2317     }
2318 
2319     resp->piggyback_content_length += size;
2320 
2321     buf->free = nxt_cpymem(buf->free, src, size);
2322 
2323     return NXT_UNIT_OK;
2324 }
2325 
2326 
2327 int
nxt_unit_response_send(nxt_unit_request_info_t * req)2328 nxt_unit_response_send(nxt_unit_request_info_t *req)
2329 {
2330     int                           rc;
2331     nxt_unit_mmap_buf_t           *mmap_buf;
2332     nxt_unit_request_info_impl_t  *req_impl;
2333 
2334     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2335 
2336     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2337         nxt_unit_req_warn(req, "send: response is not initialized yet");
2338 
2339         return NXT_UNIT_ERROR;
2340     }
2341 
2342     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2343         nxt_unit_req_warn(req, "send: response already sent");
2344 
2345         return NXT_UNIT_ERROR;
2346     }
2347 
2348     if (req->request->websocket_handshake && req->response->status == 101) {
2349         nxt_unit_response_upgrade(req);
2350     }
2351 
2352     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2353                        req->response->fields_count,
2354                        (int) (req->response_buf->free
2355                               - req->response_buf->start));
2356 
2357     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2358 
2359     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2360     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2361         req->response = NULL;
2362         req->response_buf = NULL;
2363         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2364 
2365         nxt_unit_mmap_buf_free(mmap_buf);
2366     }
2367 
2368     return rc;
2369 }
2370 
2371 
2372 int
nxt_unit_response_is_sent(nxt_unit_request_info_t * req)2373 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2374 {
2375     nxt_unit_request_info_impl_t  *req_impl;
2376 
2377     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2378 
2379     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2380 }
2381 
2382 
2383 nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t * req,uint32_t size)2384 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2385 {
2386     int                           rc;
2387     nxt_unit_mmap_buf_t           *mmap_buf;
2388     nxt_unit_request_info_impl_t  *req_impl;
2389 
2390     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2391         nxt_unit_req_warn(req, "response_buf_alloc: "
2392                           "requested buffer (%"PRIu32") too big", size);
2393 
2394         return NULL;
2395     }
2396 
2397     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2398 
2399     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2400 
2401     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2402     if (nxt_slow_path(mmap_buf == NULL)) {
2403         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2404 
2405         return NULL;
2406     }
2407 
2408     mmap_buf->req = req;
2409 
2410     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2411 
2412     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2413                                    size, size, mmap_buf,
2414                                    NULL);
2415     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2416         nxt_unit_mmap_buf_release(mmap_buf);
2417 
2418         nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2419 
2420         return NULL;
2421     }
2422 
2423     return &mmap_buf->buf;
2424 }
2425 
2426 
2427 static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t * ctx)2428 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2429 {
2430     nxt_unit_mmap_buf_t  *mmap_buf;
2431     nxt_unit_ctx_impl_t  *ctx_impl;
2432 
2433     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2434 
2435     pthread_mutex_lock(&ctx_impl->mutex);
2436 
2437     if (ctx_impl->free_buf == NULL) {
2438         pthread_mutex_unlock(&ctx_impl->mutex);
2439 
2440         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2441         if (nxt_slow_path(mmap_buf == NULL)) {
2442             return NULL;
2443         }
2444 
2445     } else {
2446         mmap_buf = ctx_impl->free_buf;
2447 
2448         nxt_unit_mmap_buf_unlink(mmap_buf);
2449 
2450         pthread_mutex_unlock(&ctx_impl->mutex);
2451     }
2452 
2453     mmap_buf->ctx_impl = ctx_impl;
2454 
2455     mmap_buf->hdr = NULL;
2456     mmap_buf->free_ptr = NULL;
2457 
2458     return mmap_buf;
2459 }
2460 
2461 
2462 static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t * mmap_buf)2463 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2464 {
2465     nxt_unit_mmap_buf_unlink(mmap_buf);
2466 
2467     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2468 
2469     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2470 
2471     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2472 }
2473 
2474 
2475 int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t * req)2476 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2477 {
2478     return req->request->websocket_handshake;
2479 }
2480 
2481 
2482 int
nxt_unit_response_upgrade(nxt_unit_request_info_t * req)2483 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2484 {
2485     int                           rc;
2486     nxt_unit_request_info_impl_t  *req_impl;
2487 
2488     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2489 
2490     if (nxt_slow_path(req_impl->websocket != 0)) {
2491         nxt_unit_req_debug(req, "upgrade: already upgraded");
2492 
2493         return NXT_UNIT_OK;
2494     }
2495 
2496     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2497         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2498 
2499         return NXT_UNIT_ERROR;
2500     }
2501 
2502     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2503         nxt_unit_req_warn(req, "upgrade: response already sent");
2504 
2505         return NXT_UNIT_ERROR;
2506     }
2507 
2508     rc = nxt_unit_request_hash_add(req->ctx, req);
2509     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2510         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2511 
2512         return NXT_UNIT_ERROR;
2513     }
2514 
2515     req_impl->websocket = 1;
2516 
2517     req->response->status = 101;
2518 
2519     return NXT_UNIT_OK;
2520 }
2521 
2522 
2523 int
nxt_unit_response_is_websocket(nxt_unit_request_info_t * req)2524 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2525 {
2526     nxt_unit_request_info_impl_t  *req_impl;
2527 
2528     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2529 
2530     return req_impl->websocket;
2531 }
2532 
2533 
2534 nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void * data)2535 nxt_unit_get_request_info_from_data(void *data)
2536 {
2537     nxt_unit_request_info_impl_t  *req_impl;
2538 
2539     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2540 
2541     return &req_impl->req;
2542 }
2543 
2544 
2545 int
nxt_unit_buf_send(nxt_unit_buf_t * buf)2546 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2547 {
2548     int                           rc;
2549     nxt_unit_mmap_buf_t           *mmap_buf;
2550     nxt_unit_request_info_t       *req;
2551     nxt_unit_request_info_impl_t  *req_impl;
2552 
2553     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2554 
2555     req = mmap_buf->req;
2556     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2557 
2558     nxt_unit_req_debug(req, "buf_send: %d bytes",
2559                        (int) (buf->free - buf->start));
2560 
2561     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2562         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2563 
2564         return NXT_UNIT_ERROR;
2565     }
2566 
2567     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2568         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2569 
2570         return NXT_UNIT_ERROR;
2571     }
2572 
2573     if (nxt_fast_path(buf->free > buf->start)) {
2574         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2575         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2576             return rc;
2577         }
2578     }
2579 
2580     nxt_unit_mmap_buf_free(mmap_buf);
2581 
2582     return NXT_UNIT_OK;
2583 }
2584 
2585 
2586 static void
nxt_unit_buf_send_done(nxt_unit_buf_t * buf)2587 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2588 {
2589     int                      rc;
2590     nxt_unit_mmap_buf_t      *mmap_buf;
2591     nxt_unit_request_info_t  *req;
2592 
2593     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2594 
2595     req = mmap_buf->req;
2596 
2597     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2598     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2599         nxt_unit_mmap_buf_free(mmap_buf);
2600 
2601         nxt_unit_request_info_release(req);
2602 
2603     } else {
2604         nxt_unit_request_done(req, rc);
2605     }
2606 }
2607 
2608 
/*
 * Sends the used part of an outgoing buffer (buf->start .. buf->free) to
 * the response port of the request as a _NXT_PORT_MSG_DATA message.
 *
 * Non-empty buffers backed by a shared memory segment (mmap_buf->hdr is
 * set) are sent as a small descriptor message (mmap id, chunk id, size).
 * Other buffers are sent inline, with the port message header written
 * directly in front of the data.
 *
 * "last" sets the "last" flag of the message.
 *
 * Returns NXT_UNIT_OK on success and NXT_UNIT_ERROR otherwise.  In both
 * cases nxt_unit_free_outgoing_buf() is called for the buffer before
 * returning.
 */
static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
    nxt_unit_mmap_buf_t *mmap_buf, int last)
{
    struct {
        nxt_port_msg_t       msg;
        nxt_port_mmap_msg_t  mmap_msg;
    } m;

    int                           rc;
    u_char                        *last_used, *first_free;
    ssize_t                       res;
    nxt_chunk_id_t                first_free_chunk;
    nxt_unit_buf_t                *buf;
    nxt_unit_impl_t               *lib;
    nxt_port_mmap_header_t        *hdr;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    buf = &mmap_buf->buf;
    hdr = mmap_buf->hdr;

    /* Only the filled part of the buffer is sent. */
    m.mmap_msg.size = buf->free - buf->start;

    m.msg.stream = req_impl->stream;
    m.msg.pid = lib->pid;
    m.msg.reply_port = 0;
    m.msg.type = _NXT_PORT_MSG_DATA;
    m.msg.last = last != 0;
    /* Empty buffers take the plain path below even if hdr is set. */
    m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
    m.msg.nf = 0;
    m.msg.mf = 0;

    /* Assume failure until one of the branches completes. */
    rc = NXT_UNIT_ERROR;

    if (m.msg.mmap) {
        m.mmap_msg.mmap_id = hdr->id;
        m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
                                                     (u_char *) buf->start);

        nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
                       req_impl->stream,
                       (int) m.mmap_msg.mmap_id,
                       (int) m.mmap_msg.chunk_id,
                       (int) m.mmap_msg.size);

        /* The data itself stays in shared memory; only "m" is sent. */
        res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
                                 NULL);
        if (nxt_slow_path(res != sizeof(m))) {
            goto free_buf;
        }

        /* Find the first chunk after the last byte that was sent. */
        last_used = (u_char *) buf->free - 1;
        first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;

        if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
            /*
             * At least PORT_MMAP_CHUNK_SIZE bytes were left unused: the
             * buffer is narrowed to the part starting at the first free
             * chunk, which is what nxt_unit_free_outgoing_buf() will
             * release below.
             */
            first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);

            buf->start = (char *) first_free;
            buf->free = buf->start;

            if (buf->end < buf->start) {
                buf->end = buf->start;
            }

        } else {
            /*
             * No full chunk is left unused: the buffer is detached from
             * the segment, so nxt_unit_free_outgoing_buf() has nothing to
             * release for it.
             */
            buf->start = NULL;
            buf->free = NULL;
            buf->end = NULL;

            mmap_buf->hdr = NULL;
        }

        /*
         * Adds (first sent chunk - first free chunk) to the counter, that
         * is, subtracts the number of chunks covered by the sent data.
         */
        nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
                            (int) m.mmap_msg.chunk_id - (int) first_free_chunk);

        nxt_unit_debug(req->ctx, "allocated_chunks %d",
                       (int) lib->outgoing.allocated_chunks);

    } else {
        /*
         * Plain buffer: there must be at least sizeof(m.msg) bytes between
         * plain_ptr and buf->start to place the message header there.
         */
        if (nxt_slow_path(mmap_buf->plain_ptr == NULL
                          || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
        {
            nxt_unit_alert(req->ctx,
                           "#%"PRIu32": failed to send plain memory buffer"
                           ": no space reserved for message header",
                           req_impl->stream);

            goto free_buf;
        }

        /* Header and data are contiguous and go out in a single send. */
        memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));

        nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
                       req_impl->stream,
                       (int) (sizeof(m.msg) + m.mmap_msg.size));

        res = nxt_unit_port_send(req->ctx, req->response_port,
                                 buf->start - sizeof(m.msg),
                                 m.mmap_msg.size + sizeof(m.msg), NULL);

        if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
            goto free_buf;
        }
    }

    rc = NXT_UNIT_OK;

free_buf:

    /* Reached on both success and failure. */
    nxt_unit_free_outgoing_buf(mmap_buf);

    return rc;
}
2725 
2726 
2727 void
nxt_unit_buf_free(nxt_unit_buf_t * buf)2728 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2729 {
2730     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2731 }
2732 
2733 
/*
 * Frees a buffer completely: first the memory the descriptor refers to,
 * then the descriptor itself, which is unlinked and returned to the free
 * list of its context.
 */
static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_free_outgoing_buf(mmap_buf);

    nxt_unit_mmap_buf_release(mmap_buf);
}
2741 
2742 
2743 static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t * mmap_buf)2744 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2745 {
2746     if (mmap_buf->hdr != NULL) {
2747         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2748                               mmap_buf->hdr, mmap_buf->buf.start,
2749                               mmap_buf->buf.end - mmap_buf->buf.start);
2750 
2751         mmap_buf->hdr = NULL;
2752 
2753         return;
2754     }
2755 
2756     if (mmap_buf->free_ptr != NULL) {
2757         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2758 
2759         mmap_buf->free_ptr = NULL;
2760     }
2761 }
2762 
2763 
2764 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t * ctx)2765 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2766 {
2767     nxt_unit_ctx_impl_t  *ctx_impl;
2768     nxt_unit_read_buf_t  *rbuf;
2769 
2770     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2771 
2772     pthread_mutex_lock(&ctx_impl->mutex);
2773 
2774     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2775 
2776     pthread_mutex_unlock(&ctx_impl->mutex);
2777 
2778     rbuf->oob.size = 0;
2779 
2780     return rbuf;
2781 }
2782 
2783 
2784 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t * ctx_impl)2785 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2786 {
2787     nxt_queue_link_t     *link;
2788     nxt_unit_read_buf_t  *rbuf;
2789 
2790     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2791         link = nxt_queue_first(&ctx_impl->free_rbuf);
2792         nxt_queue_remove(link);
2793 
2794         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2795 
2796         return rbuf;
2797     }
2798 
2799     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2800 
2801     if (nxt_fast_path(rbuf != NULL)) {
2802         rbuf->ctx_impl = ctx_impl;
2803     }
2804 
2805     return rbuf;
2806 }
2807 
2808 
2809 static void
nxt_unit_read_buf_release(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf)2810 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2811     nxt_unit_read_buf_t *rbuf)
2812 {
2813     nxt_unit_ctx_impl_t  *ctx_impl;
2814 
2815     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2816 
2817     pthread_mutex_lock(&ctx_impl->mutex);
2818 
2819     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2820 
2821     pthread_mutex_unlock(&ctx_impl->mutex);
2822 }
2823 
2824 
2825 nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t * buf)2826 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2827 {
2828     nxt_unit_mmap_buf_t  *mmap_buf;
2829 
2830     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2831 
2832     if (mmap_buf->next == NULL) {
2833         return NULL;
2834     }
2835 
2836     return &mmap_buf->next->buf;
2837 }
2838 
2839 
2840 uint32_t
nxt_unit_buf_max(void)2841 nxt_unit_buf_max(void)
2842 {
2843     return PORT_MMAP_DATA_SIZE;
2844 }
2845 
2846 
2847 uint32_t
nxt_unit_buf_min(void)2848 nxt_unit_buf_min(void)
2849 {
2850     return PORT_MMAP_CHUNK_SIZE;
2851 }
2852 
2853 
2854 int
nxt_unit_response_write(nxt_unit_request_info_t * req,const void * start,size_t size)2855 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2856     size_t size)
2857 {
2858     ssize_t  res;
2859 
2860     res = nxt_unit_response_write_nb(req, start, size, size);
2861 
2862     return res < 0 ? -res : NXT_UNIT_OK;
2863 }
2864 
2865 
2866 ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t * req,const void * start,size_t size,size_t min_size)2867 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2868     size_t size, size_t min_size)
2869 {
2870     int                           rc;
2871     ssize_t                       sent;
2872     uint32_t                      part_size, min_part_size, buf_size;
2873     const char                    *part_start;
2874     nxt_unit_mmap_buf_t           mmap_buf;
2875     nxt_unit_request_info_impl_t  *req_impl;
2876     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2877 
2878     nxt_unit_req_debug(req, "write: %d", (int) size);
2879 
2880     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2881 
2882     part_start = start;
2883     sent = 0;
2884 
2885     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2886         nxt_unit_req_alert(req, "write: response not initialized yet");
2887 
2888         return -NXT_UNIT_ERROR;
2889     }
2890 
2891     /* Check if response is not send yet. */
2892     if (nxt_slow_path(req->response_buf != NULL)) {
2893         part_size = req->response_buf->end - req->response_buf->free;
2894         part_size = nxt_min(size, part_size);
2895 
2896         rc = nxt_unit_response_add_content(req, part_start, part_size);
2897         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2898             return -rc;
2899         }
2900 
2901         rc = nxt_unit_response_send(req);
2902         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2903             return -rc;
2904         }
2905 
2906         size -= part_size;
2907         part_start += part_size;
2908         sent += part_size;
2909 
2910         min_size -= nxt_min(min_size, part_size);
2911     }
2912 
2913     while (size > 0) {
2914         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2915         min_part_size = nxt_min(min_size, part_size);
2916         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2917 
2918         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2919                                        min_part_size, &mmap_buf, local_buf);
2920         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2921             return -rc;
2922         }
2923 
2924         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2925         if (nxt_slow_path(buf_size == 0)) {
2926             return sent;
2927         }
2928         part_size = nxt_min(buf_size, part_size);
2929 
2930         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2931                                        part_start, part_size);
2932 
2933         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2934         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2935             return -rc;
2936         }
2937 
2938         size -= part_size;
2939         part_start += part_size;
2940         sent += part_size;
2941 
2942         min_size -= nxt_min(min_size, part_size);
2943     }
2944 
2945     return sent;
2946 }
2947 
2948 
2949 int
nxt_unit_response_write_cb(nxt_unit_request_info_t * req,nxt_unit_read_info_t * read_info)2950 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2951     nxt_unit_read_info_t *read_info)
2952 {
2953     int                           rc;
2954     ssize_t                       n;
2955     uint32_t                      buf_size;
2956     nxt_unit_buf_t                *buf;
2957     nxt_unit_mmap_buf_t           mmap_buf;
2958     nxt_unit_request_info_impl_t  *req_impl;
2959     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2960 
2961     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2962 
2963     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2964         nxt_unit_req_alert(req, "write: response not initialized yet");
2965 
2966         return NXT_UNIT_ERROR;
2967     }
2968 
2969     /* Check if response is not send yet. */
2970     if (nxt_slow_path(req->response_buf != NULL)) {
2971 
2972         /* Enable content in headers buf. */
2973         rc = nxt_unit_response_add_content(req, "", 0);
2974         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2975             nxt_unit_req_error(req, "Failed to add piggyback content");
2976 
2977             return rc;
2978         }
2979 
2980         buf = req->response_buf;
2981 
2982         while (buf->end - buf->free > 0) {
2983             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2984             if (nxt_slow_path(n < 0)) {
2985                 nxt_unit_req_error(req, "Read error");
2986 
2987                 return NXT_UNIT_ERROR;
2988             }
2989 
2990             /* Manually increase sizes. */
2991             buf->free += n;
2992             req->response->piggyback_content_length += n;
2993 
2994             if (read_info->eof) {
2995                 break;
2996             }
2997         }
2998 
2999         rc = nxt_unit_response_send(req);
3000         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3001             nxt_unit_req_error(req, "Failed to send headers with content");
3002 
3003             return rc;
3004         }
3005 
3006         if (read_info->eof) {
3007             return NXT_UNIT_OK;
3008         }
3009     }
3010 
3011     while (!read_info->eof) {
3012         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
3013                            read_info->buf_size);
3014 
3015         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
3016 
3017         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3018                                        buf_size, buf_size,
3019                                        &mmap_buf, local_buf);
3020         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3021             return rc;
3022         }
3023 
3024         buf = &mmap_buf.buf;
3025 
3026         while (!read_info->eof && buf->end > buf->free) {
3027             n = read_info->read(read_info, buf->free, buf->end - buf->free);
3028             if (nxt_slow_path(n < 0)) {
3029                 nxt_unit_req_error(req, "Read error");
3030 
3031                 nxt_unit_free_outgoing_buf(&mmap_buf);
3032 
3033                 return NXT_UNIT_ERROR;
3034             }
3035 
3036             buf->free += n;
3037         }
3038 
3039         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3040         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3041             nxt_unit_req_error(req, "Failed to send content");
3042 
3043             return rc;
3044         }
3045     }
3046 
3047     return NXT_UNIT_OK;
3048 }
3049 
3050 
3051 ssize_t
nxt_unit_request_read(nxt_unit_request_info_t * req,void * dst,size_t size)3052 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3053 {
3054     ssize_t  buf_res, res;
3055 
3056     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3057                                 dst, size);
3058 
3059     if (buf_res < (ssize_t) size && req->content_fd != -1) {
3060         res = read(req->content_fd, dst, size);
3061         if (nxt_slow_path(res < 0)) {
3062             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3063                                strerror(errno), errno);
3064 
3065             return res;
3066         }
3067 
3068         if (res < (ssize_t) size) {
3069             nxt_unit_close(req->content_fd);
3070 
3071             req->content_fd = -1;
3072         }
3073 
3074         req->content_length -= res;
3075 
3076         dst = nxt_pointer_to(dst, res);
3077 
3078     } else {
3079         res = 0;
3080     }
3081 
3082     return buf_res + res;
3083 }
3084 
3085 
3086 ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t * req,size_t max_size)3087 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3088 {
3089     char                 *p;
3090     size_t               l_size, b_size;
3091     nxt_unit_buf_t       *b;
3092     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3093 
3094     if (req->content_length == 0) {
3095         return 0;
3096     }
3097 
3098     l_size = 0;
3099 
3100     b = req->content_buf;
3101 
3102     while (b != NULL) {
3103         b_size = b->end - b->free;
3104         p = memchr(b->free, '\n', b_size);
3105 
3106         if (p != NULL) {
3107             p++;
3108             l_size += p - b->free;
3109             break;
3110         }
3111 
3112         l_size += b_size;
3113 
3114         if (max_size <= l_size) {
3115             break;
3116         }
3117 
3118