xref: /unit/src/nxt_unit.c (revision 2143:52dda4e05868)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "nxt_main.h"
7 #include "nxt_port_memory_int.h"
8 #include "nxt_socket_msg.h"
9 #include "nxt_port_queue.h"
10 #include "nxt_app_queue.h"
11 
12 #include "nxt_unit.h"
13 #include "nxt_unit_request.h"
14 #include "nxt_unit_response.h"
15 #include "nxt_unit_websocket.h"
16 
17 #include "nxt_websocket.h"
18 
19 #if (NXT_HAVE_MEMFD_CREATE)
20 #include <linux/memfd.h>
21 #endif
22 
/*
 * NXT_UNIT_LOCAL_BUF_SIZE is NXT_UNIT_MAX_PLAIN_SIZE bytes plus the size
 * of one nxt_port_msg_t.
 */
#define NXT_UNIT_MAX_PLAIN_SIZE  1024
#define NXT_UNIT_LOCAL_BUF_SIZE                                               \
    (sizeof(nxt_port_msg_t) + NXT_UNIT_MAX_PLAIN_SIZE)
26 
/*
 * Quit modes.  nxt_unit_ctx_init() stores NXT_QUIT_GRACEFUL as the initial
 * nxt_unit_ctx_impl_t.quit_param.
 */
enum {
    NXT_QUIT_NORMAL = 0,
    NXT_QUIT_GRACEFUL,          /* == 1 */
};
31 
32 typedef struct nxt_unit_impl_s                  nxt_unit_impl_t;
33 typedef struct nxt_unit_mmap_s                  nxt_unit_mmap_t;
34 typedef struct nxt_unit_mmaps_s                 nxt_unit_mmaps_t;
35 typedef struct nxt_unit_process_s               nxt_unit_process_t;
36 typedef struct nxt_unit_mmap_buf_s              nxt_unit_mmap_buf_t;
37 typedef struct nxt_unit_recv_msg_s              nxt_unit_recv_msg_t;
38 typedef struct nxt_unit_read_buf_s              nxt_unit_read_buf_t;
39 typedef struct nxt_unit_ctx_impl_s              nxt_unit_ctx_impl_t;
40 typedef struct nxt_unit_port_impl_s             nxt_unit_port_impl_t;
41 typedef struct nxt_unit_request_info_impl_s     nxt_unit_request_info_impl_t;
42 typedef struct nxt_unit_websocket_frame_impl_s  nxt_unit_websocket_frame_impl_t;
43 
44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib,
46     nxt_unit_ctx_impl_t *ctx_impl, void *data);
47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx);
48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx);
49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib);
50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib);
51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
52     nxt_unit_mmap_buf_t *mmap_buf);
53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
54     nxt_unit_mmap_buf_t *mmap_buf);
55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
57     nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
58     int *shared_port_fd, int *shared_queue_fd,
59     int *log_fd, uint32_t *stream, uint32_t *shm_limit,
60     uint32_t *request_limit);
61 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
62     int queue_fd);
63 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
64     nxt_unit_request_info_t **preq);
65 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
66     nxt_unit_recv_msg_t *recv_msg);
67 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx);
68 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
69     nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq);
70 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx,
71     nxt_unit_recv_msg_t *recv_msg);
72 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
73     nxt_unit_port_id_t *port_id);
74 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req);
75 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
76     nxt_unit_recv_msg_t *recv_msg);
77 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
78 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
79     nxt_unit_ctx_t *ctx);
80 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
81 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
82 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
83     nxt_unit_ctx_t *ctx);
84 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
85 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
86     nxt_unit_websocket_frame_impl_t *ws);
87 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
88 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
89 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
90     nxt_unit_mmap_buf_t *mmap_buf, int last);
91 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
92 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf);
93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx);
94 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl(
95     nxt_unit_ctx_impl_t *ctx_impl);
96 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
97     nxt_unit_read_buf_t *rbuf);
98 static nxt_unit_mmap_buf_t *nxt_unit_request_preread(
99     nxt_unit_request_info_t *req, size_t size);
100 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
101     size_t size);
102 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
103     nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n);
104 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
105 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx);
106 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
107 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
108     nxt_unit_port_t *port, int n);
109 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size);
110 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
111     int fd);
112 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
113     nxt_unit_port_t *port, uint32_t size,
114     uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
115 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
116 
117 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
118     nxt_unit_ctx_impl_t *ctx_impl);
119 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
120 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
121 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
122 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
123 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx,
124     nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id,
125     nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf);
126 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
127     nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf);
128 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id);
129 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx,
130     nxt_port_mmap_header_t *hdr, void *start, uint32_t size);
131 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid);
132 
133 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid);
134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
135     pid_t pid, int remove);
136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
137 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
138 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
139 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
140 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
141 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
142 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
143 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf);
144 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf);
145 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf);
146 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx,
147     nxt_unit_port_t *port);
148 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
149 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
150 
151 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
152     nxt_unit_port_t *port, int queue_fd);
153 
154 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port);
155 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port);
156 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
157     nxt_unit_port_t *port, void *queue);
158 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
159     nxt_queue_t *awaiting_req);
160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
161     nxt_unit_port_id_t *port_id);
162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
163     nxt_unit_port_id_t *port_id);
164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
166     nxt_unit_process_t *process);
167 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
168 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
169 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
170     nxt_unit_port_t *port, const void *buf, size_t buf_size,
171     const nxt_send_oob_t *oob);
172 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
173     const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
174 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
175     nxt_unit_read_buf_t *rbuf);
176 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
177     nxt_unit_read_buf_t *src);
178 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
179     nxt_unit_read_buf_t *rbuf);
180 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
181     nxt_unit_read_buf_t *rbuf);
182 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
183     nxt_unit_read_buf_t *rbuf);
184 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
185     nxt_unit_read_buf_t *rbuf);
186 nxt_inline int nxt_unit_close(int fd);
187 static int nxt_unit_fd_blocking(int fd);
188 
189 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
190     nxt_unit_port_t *port);
191 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
192     nxt_unit_port_id_t *port_id, int remove);
193 
194 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx,
195     nxt_unit_request_info_t *req);
196 static nxt_unit_request_info_t *nxt_unit_request_hash_find(
197     nxt_unit_ctx_t *ctx, uint32_t stream, int remove);
198 
199 static char * nxt_unit_snprint_prefix(char *p, const char *end, pid_t pid,
200     int level);
201 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size);
202 static void nxt_unit_lvlhsh_free(void *data, void *p);
203 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length);
204 
205 
206 struct nxt_unit_mmap_buf_s {
207     nxt_unit_buf_t           buf;
208 
209     nxt_unit_mmap_buf_t      *next;
210     nxt_unit_mmap_buf_t      **prev;
211 
212     nxt_port_mmap_header_t   *hdr;
213     nxt_unit_request_info_t  *req;
214     nxt_unit_ctx_impl_t      *ctx_impl;
215     char                     *free_ptr;
216     char                     *plain_ptr;
217 };
218 
219 
220 struct nxt_unit_recv_msg_s {
221     uint32_t                 stream;
222     nxt_pid_t                pid;
223     nxt_port_id_t            reply_port;
224 
225     uint8_t                  last;      /* 1 bit */
226     uint8_t                  mmap;      /* 1 bit */
227 
228     void                     *start;
229     uint32_t                 size;
230 
231     int                      fd[2];
232 
233     nxt_unit_mmap_buf_t      *incoming_buf;
234 };
235 
236 
/*  Request states kept in nxt_unit_request_info_impl_t.state. */
typedef enum {
    NXT_UNIT_RS_START                = 0,
    NXT_UNIT_RS_RESPONSE_INIT        = 1,
    NXT_UNIT_RS_RESPONSE_HAS_CONTENT = 2,
    NXT_UNIT_RS_RESPONSE_SENT        = 3,
    NXT_UNIT_RS_RELEASED             = 4,
} nxt_unit_req_state_t;
244 
245 
246 struct nxt_unit_request_info_impl_s {
247     nxt_unit_request_info_t  req;
248 
249     uint32_t                 stream;
250 
251     nxt_unit_mmap_buf_t      *outgoing_buf;
252     nxt_unit_mmap_buf_t      *incoming_buf;
253 
254     nxt_unit_req_state_t     state;
255     uint8_t                  websocket;
256     uint8_t                  in_hash;
257 
258     /*  for nxt_unit_ctx_impl_t.free_req or active_req */
259     nxt_queue_link_t         link;
260     /*  for nxt_unit_port_impl_t.awaiting_req */
261     nxt_queue_link_t         port_wait_link;
262 
263     char                     extra_data[];
264 };
265 
266 
267 struct nxt_unit_websocket_frame_impl_s {
268     nxt_unit_websocket_frame_t  ws;
269 
270     nxt_unit_mmap_buf_t         *buf;
271 
272     nxt_queue_link_t            link;
273 
274     nxt_unit_ctx_impl_t         *ctx_impl;
275 };
276 
277 
278 struct nxt_unit_read_buf_s {
279     nxt_queue_link_t              link;
280     nxt_unit_ctx_impl_t           *ctx_impl;
281     ssize_t                       size;
282     nxt_recv_oob_t                oob;
283     char                          buf[16384];
284 };
285 
286 
287 struct nxt_unit_ctx_impl_s {
288     nxt_unit_ctx_t                ctx;
289 
290     nxt_atomic_t                  use_count;
291     nxt_atomic_t                  wait_items;
292 
293     pthread_mutex_t               mutex;
294 
295     nxt_unit_port_t               *read_port;
296 
297     nxt_queue_link_t              link;
298 
299     nxt_unit_mmap_buf_t           *free_buf;
300 
301     /*  of nxt_unit_request_info_impl_t */
302     nxt_queue_t                   free_req;
303 
304     /*  of nxt_unit_websocket_frame_impl_t */
305     nxt_queue_t                   free_ws;
306 
307     /*  of nxt_unit_request_info_impl_t */
308     nxt_queue_t                   active_req;
309 
310     /*  of nxt_unit_request_info_impl_t */
311     nxt_lvlhsh_t                  requests;
312 
313     /*  of nxt_unit_request_info_impl_t */
314     nxt_queue_t                   ready_req;
315 
316     /*  of nxt_unit_read_buf_t */
317     nxt_queue_t                   pending_rbuf;
318 
319     /*  of nxt_unit_read_buf_t */
320     nxt_queue_t                   free_rbuf;
321 
322     uint8_t                       online;       /* 1 bit */
323     uint8_t                       ready;        /* 1 bit */
324     uint8_t                       quit_param;
325 
326     nxt_unit_mmap_buf_t           ctx_buf[2];
327     nxt_unit_read_buf_t           ctx_read_buf;
328 
329     nxt_unit_request_info_impl_t  req;
330 };
331 
332 
333 struct nxt_unit_mmap_s {
334     nxt_port_mmap_header_t   *hdr;
335     pthread_t                src_thread;
336 
337     /*  of nxt_unit_read_buf_t */
338     nxt_queue_t              awaiting_rbuf;
339 };
340 
341 
342 struct nxt_unit_mmaps_s {
343     pthread_mutex_t          mutex;
344     uint32_t                 size;
345     uint32_t                 cap;
346     nxt_atomic_t             allocated_chunks;
347     nxt_unit_mmap_t          *elts;
348 };
349 
350 
351 struct nxt_unit_impl_s {
352     nxt_unit_t               unit;
353     nxt_unit_callbacks_t     callbacks;
354 
355     nxt_atomic_t             use_count;
356     nxt_atomic_t             request_count;
357 
358     uint32_t                 request_data_size;
359     uint32_t                 shm_mmap_limit;
360     uint32_t                 request_limit;
361 
362     pthread_mutex_t          mutex;
363 
364     nxt_lvlhsh_t             processes;        /* of nxt_unit_process_t */
365     nxt_lvlhsh_t             ports;            /* of nxt_unit_port_impl_t */
366 
367     nxt_unit_port_t          *router_port;
368     nxt_unit_port_t          *shared_port;
369 
370     nxt_queue_t              contexts;         /* of nxt_unit_ctx_impl_t */
371 
372     nxt_unit_mmaps_t         incoming;
373     nxt_unit_mmaps_t         outgoing;
374 
375     pid_t                    pid;
376     int                      log_fd;
377 
378     nxt_unit_ctx_impl_t      main_ctx;
379 };
380 
381 
382 struct nxt_unit_port_impl_s {
383     nxt_unit_port_t          port;
384 
385     nxt_atomic_t             use_count;
386 
387     /*  for nxt_unit_process_t.ports */
388     nxt_queue_link_t         link;
389     nxt_unit_process_t       *process;
390 
391     /*  of nxt_unit_request_info_impl_t */
392     nxt_queue_t              awaiting_req;
393 
394     int                      ready;
395 
396     void                     *queue;
397 
398     int                      from_socket;
399     nxt_unit_read_buf_t      *socket_rbuf;
400 };
401 
402 
403 struct nxt_unit_process_s {
404     pid_t                    pid;
405 
406     nxt_queue_t              ports;            /* of nxt_unit_port_impl_t */
407 
408     nxt_unit_impl_t          *lib;
409 
410     nxt_atomic_t             use_count;
411 
412     uint32_t                 next_port_id;
413 };
414 
415 
/*
 * Both members are explicitly 32 bits wide so that the structure does not
 * pick up alignment padding.
 */
typedef struct {
    int32_t   pid;
    uint32_t  id;
} nxt_unit_port_hash_id_t;
421 
422 
/*
 * File-scope process id: nxt_unit_init() sets it to getpid() first and
 * later to the pid of the read port.
 */
static pid_t  nxt_unit_pid;
424 
425 
426 nxt_unit_ctx_t *
nxt_unit_init(nxt_unit_init_t * init)427 nxt_unit_init(nxt_unit_init_t *init)
428 {
429     int              rc, queue_fd, shared_queue_fd;
430     void             *mem;
431     uint32_t         ready_stream, shm_limit, request_limit;
432     nxt_unit_ctx_t   *ctx;
433     nxt_unit_impl_t  *lib;
434     nxt_unit_port_t  ready_port, router_port, read_port, shared_port;
435 
436     nxt_unit_pid = getpid();
437 
438     lib = nxt_unit_create(init);
439     if (nxt_slow_path(lib == NULL)) {
440         return NULL;
441     }
442 
443     queue_fd = -1;
444     mem = MAP_FAILED;
445     shared_port.out_fd = -1;
446     shared_port.data = NULL;
447 
448     if (init->ready_port.id.pid != 0
449         && init->ready_stream != 0
450         && init->read_port.id.pid != 0)
451     {
452         ready_port = init->ready_port;
453         ready_stream = init->ready_stream;
454         router_port = init->router_port;
455         read_port = init->read_port;
456         lib->log_fd = init->log_fd;
457 
458         nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid,
459                               ready_port.id.id);
460         nxt_unit_port_id_init(&router_port.id, router_port.id.pid,
461                               router_port.id.id);
462         nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
463                               read_port.id.id);
464 
465         shared_port.in_fd = init->shared_port_fd;
466         shared_queue_fd = init->shared_queue_fd;
467 
468     } else {
469         rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
470                                &shared_port.in_fd, &shared_queue_fd,
471                                &lib->log_fd, &ready_stream, &shm_limit,
472                                &request_limit);
473         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
474             goto fail;
475         }
476 
477         lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
478                                 / PORT_MMAP_DATA_SIZE;
479         lib->request_limit = request_limit;
480     }
481 
482     if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
483         lib->shm_mmap_limit = 1;
484     }
485 
486     lib->pid = read_port.id.pid;
487     nxt_unit_pid = lib->pid;
488 
489     ctx = &lib->main_ctx.ctx;
490 
491     rc = nxt_unit_fd_blocking(router_port.out_fd);
492     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
493         goto fail;
494     }
495 
496     lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
497     if (nxt_slow_path(lib->router_port == NULL)) {
498         nxt_unit_alert(NULL, "failed to add router_port");
499 
500         goto fail;
501     }
502 
503     queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
504     if (nxt_slow_path(queue_fd == -1)) {
505         goto fail;
506     }
507 
508     mem = mmap(NULL, sizeof(nxt_port_queue_t),
509                PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0);
510     if (nxt_slow_path(mem == MAP_FAILED)) {
511         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd,
512                        strerror(errno), errno);
513 
514         goto fail;
515     }
516 
517     nxt_port_queue_init(mem);
518 
519     rc = nxt_unit_fd_blocking(read_port.in_fd);
520     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
521         goto fail;
522     }
523 
524     lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
525     if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
526         nxt_unit_alert(NULL, "failed to add read_port");
527 
528         goto fail;
529     }
530 
531     rc = nxt_unit_fd_blocking(ready_port.out_fd);
532     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
533         goto fail;
534     }
535 
536     nxt_unit_port_id_init(&shared_port.id, read_port.id.pid,
537                           NXT_UNIT_SHARED_PORT_ID);
538 
539     mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
540                MAP_SHARED, shared_queue_fd, 0);
541     if (nxt_slow_path(mem == MAP_FAILED)) {
542         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd,
543                        strerror(errno), errno);
544 
545         goto fail;
546     }
547 
548     nxt_unit_close(shared_queue_fd);
549 
550     lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem);
551     if (nxt_slow_path(lib->shared_port == NULL)) {
552         nxt_unit_alert(NULL, "failed to add shared_port");
553 
554         goto fail;
555     }
556 
557     rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
558     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
559         nxt_unit_alert(NULL, "failed to send READY message");
560 
561         goto fail;
562     }
563 
564     nxt_unit_close(ready_port.out_fd);
565     nxt_unit_close(queue_fd);
566 
567     return ctx;
568 
569 fail:
570 
571     if (mem != MAP_FAILED) {
572         munmap(mem, sizeof(nxt_port_queue_t));
573     }
574 
575     if (queue_fd != -1) {
576         nxt_unit_close(queue_fd);
577     }
578 
579     nxt_unit_ctx_release(&lib->main_ctx.ctx);
580 
581     return NULL;
582 }
583 
584 
585 static nxt_unit_impl_t *
nxt_unit_create(nxt_unit_init_t * init)586 nxt_unit_create(nxt_unit_init_t *init)
587 {
588     int                   rc;
589     nxt_unit_impl_t       *lib;
590     nxt_unit_callbacks_t  *cb;
591 
592     lib = nxt_unit_malloc(NULL,
593                           sizeof(nxt_unit_impl_t) + init->request_data_size);
594     if (nxt_slow_path(lib == NULL)) {
595         nxt_unit_alert(NULL, "failed to allocate unit struct");
596 
597         return NULL;
598     }
599 
600     rc = pthread_mutex_init(&lib->mutex, NULL);
601     if (nxt_slow_path(rc != 0)) {
602         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
603 
604         goto fail;
605     }
606 
607     lib->unit.data = init->data;
608     lib->callbacks = init->callbacks;
609 
610     lib->request_data_size = init->request_data_size;
611     lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
612                             / PORT_MMAP_DATA_SIZE;
613     lib->request_limit = init->request_limit;
614 
615     lib->processes.slot = NULL;
616     lib->ports.slot = NULL;
617 
618     lib->log_fd = STDERR_FILENO;
619 
620     nxt_queue_init(&lib->contexts);
621 
622     lib->use_count = 0;
623     lib->request_count = 0;
624     lib->router_port = NULL;
625     lib->shared_port = NULL;
626 
627     rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data);
628     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
629         pthread_mutex_destroy(&lib->mutex);
630         goto fail;
631     }
632 
633     cb = &lib->callbacks;
634 
635     if (cb->request_handler == NULL) {
636         nxt_unit_alert(NULL, "request_handler is NULL");
637 
638         pthread_mutex_destroy(&lib->mutex);
639         goto fail;
640     }
641 
642     nxt_unit_mmaps_init(&lib->incoming);
643     nxt_unit_mmaps_init(&lib->outgoing);
644 
645     return lib;
646 
647 fail:
648 
649     nxt_unit_free(NULL, lib);
650 
651     return NULL;
652 }
653 
654 
655 static int
nxt_unit_ctx_init(nxt_unit_impl_t * lib,nxt_unit_ctx_impl_t * ctx_impl,void * data)656 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
657     void *data)
658 {
659     int  rc;
660 
661     ctx_impl->ctx.data = data;
662     ctx_impl->ctx.unit = &lib->unit;
663 
664     rc = pthread_mutex_init(&ctx_impl->mutex, NULL);
665     if (nxt_slow_path(rc != 0)) {
666         nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc);
667 
668         return NXT_UNIT_ERROR;
669     }
670 
671     nxt_unit_lib_use(lib);
672 
673     pthread_mutex_lock(&lib->mutex);
674 
675     nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
676 
677     pthread_mutex_unlock(&lib->mutex);
678 
679     ctx_impl->use_count = 1;
680     ctx_impl->wait_items = 0;
681     ctx_impl->online = 1;
682     ctx_impl->ready = 0;
683     ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
684 
685     nxt_queue_init(&ctx_impl->free_req);
686     nxt_queue_init(&ctx_impl->free_ws);
687     nxt_queue_init(&ctx_impl->active_req);
688     nxt_queue_init(&ctx_impl->ready_req);
689     nxt_queue_init(&ctx_impl->pending_rbuf);
690     nxt_queue_init(&ctx_impl->free_rbuf);
691 
692     ctx_impl->free_buf = NULL;
693     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
694     nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
695 
696     nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
697     nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link);
698 
699     ctx_impl->ctx_read_buf.ctx_impl = ctx_impl;
700 
701     ctx_impl->req.req.ctx = &ctx_impl->ctx;
702     ctx_impl->req.req.unit = &lib->unit;
703 
704     ctx_impl->read_port = NULL;
705     ctx_impl->requests.slot = 0;
706 
707     return NXT_UNIT_OK;
708 }
709 
710 
711 nxt_inline void
nxt_unit_ctx_use(nxt_unit_ctx_t * ctx)712 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx)
713 {
714     nxt_unit_ctx_impl_t  *ctx_impl;
715 
716     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
717 
718     nxt_atomic_fetch_add(&ctx_impl->use_count, 1);
719 }
720 
721 
722 nxt_inline void
nxt_unit_ctx_release(nxt_unit_ctx_t * ctx)723 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx)
724 {
725     long                 c;
726     nxt_unit_ctx_impl_t  *ctx_impl;
727 
728     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
729 
730     c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1);
731 
732     if (c == 1) {
733         nxt_unit_ctx_free(ctx_impl);
734     }
735 }
736 
737 
738 nxt_inline void
nxt_unit_lib_use(nxt_unit_impl_t * lib)739 nxt_unit_lib_use(nxt_unit_impl_t *lib)
740 {
741     nxt_atomic_fetch_add(&lib->use_count, 1);
742 }
743 
744 
745 nxt_inline void
nxt_unit_lib_release(nxt_unit_impl_t * lib)746 nxt_unit_lib_release(nxt_unit_impl_t *lib)
747 {
748     long                c;
749     nxt_unit_process_t  *process;
750 
751     c = nxt_atomic_fetch_add(&lib->use_count, -1);
752 
753     if (c == 1) {
754         for ( ;; ) {
755             pthread_mutex_lock(&lib->mutex);
756 
757             process = nxt_unit_process_pop_first(lib);
758             if (process == NULL) {
759                 pthread_mutex_unlock(&lib->mutex);
760 
761                 break;
762             }
763 
764             nxt_unit_remove_process(lib, process);
765         }
766 
767         pthread_mutex_destroy(&lib->mutex);
768 
769         if (nxt_fast_path(lib->router_port != NULL)) {
770             nxt_unit_port_release(lib->router_port);
771         }
772 
773         if (nxt_fast_path(lib->shared_port != NULL)) {
774             nxt_unit_port_release(lib->shared_port);
775         }
776 
777         nxt_unit_mmaps_destroy(&lib->incoming);
778         nxt_unit_mmaps_destroy(&lib->outgoing);
779 
780         nxt_unit_free(NULL, lib);
781     }
782 }
783 
784 
785 nxt_inline void
nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t ** head,nxt_unit_mmap_buf_t * mmap_buf)786 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
787     nxt_unit_mmap_buf_t *mmap_buf)
788 {
789     mmap_buf->next = *head;
790 
791     if (mmap_buf->next != NULL) {
792         mmap_buf->next->prev = &mmap_buf->next;
793     }
794 
795     *head = mmap_buf;
796     mmap_buf->prev = head;
797 }
798 
799 
800 nxt_inline void
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t ** prev,nxt_unit_mmap_buf_t * mmap_buf)801 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
802     nxt_unit_mmap_buf_t *mmap_buf)
803 {
804     while (*prev != NULL) {
805         prev = &(*prev)->next;
806     }
807 
808     nxt_unit_mmap_buf_insert(prev, mmap_buf);
809 }
810 
811 
812 nxt_inline void
nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t * mmap_buf)813 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
814 {
815     nxt_unit_mmap_buf_t  **prev;
816 
817     prev = mmap_buf->prev;
818 
819     if (mmap_buf->next != NULL) {
820         mmap_buf->next->prev = prev;
821     }
822 
823     if (prev != NULL) {
824         *prev = mmap_buf->next;
825     }
826 }
827 
828 
829 static int
nxt_unit_read_env(nxt_unit_port_t * ready_port,nxt_unit_port_t * router_port,nxt_unit_port_t * read_port,int * shared_port_fd,int * shared_queue_fd,int * log_fd,uint32_t * stream,uint32_t * shm_limit,uint32_t * request_limit)830 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
831     nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd,
832     int *log_fd, uint32_t *stream,
833     uint32_t *shm_limit, uint32_t *request_limit)
834 {
835     int       rc;
836     int       ready_fd, router_fd, read_in_fd, read_out_fd;
837     char      *unit_init, *version_end, *vars;
838     size_t    version_length;
839     int64_t   ready_pid, router_pid, read_pid;
840     uint32_t  ready_stream, router_id, ready_id, read_id;
841 
842     unit_init = getenv(NXT_UNIT_INIT_ENV);
843     if (nxt_slow_path(unit_init == NULL)) {
844         nxt_unit_alert(NULL, "%s is not in the current environment",
845                        NXT_UNIT_INIT_ENV);
846 
847         return NXT_UNIT_ERROR;
848     }
849 
850     version_end = strchr(unit_init, ';');
851     if (nxt_slow_path(version_end == NULL)) {
852         nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"",
853                        NXT_UNIT_INIT_ENV, unit_init);
854 
855         return NXT_UNIT_ERROR;
856     }
857 
858     version_length = version_end - unit_init;
859 
860     rc = version_length != nxt_length(NXT_VERSION)
861          || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION));
862 
863     if (nxt_slow_path(rc != 0)) {
864         nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version "
865                        "%.*s, while the app was compiled with libunit %s",
866                        (int) version_length, unit_init, NXT_VERSION);
867 
868         return NXT_UNIT_ERROR;
869     }
870 
871     vars = version_end + 1;
872 
873     rc = sscanf(vars,
874                 "%"PRIu32";"
875                 "%"PRId64",%"PRIu32",%d;"
876                 "%"PRId64",%"PRIu32",%d;"
877                 "%"PRId64",%"PRIu32",%d,%d;"
878                 "%d,%d;"
879                 "%d,%"PRIu32",%"PRIu32,
880                 &ready_stream,
881                 &ready_pid, &ready_id, &ready_fd,
882                 &router_pid, &router_id, &router_fd,
883                 &read_pid, &read_id, &read_in_fd, &read_out_fd,
884                 shared_port_fd, shared_queue_fd,
885                 log_fd, shm_limit, request_limit);
886 
887     if (nxt_slow_path(rc == EOF)) {
888         nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
889                        vars, strerror(errno), errno, NXT_UNIT_INIT_ENV);
890 
891         return NXT_UNIT_ERROR;
892     }
893 
894     if (nxt_slow_path(rc != 16)) {
895         nxt_unit_alert(NULL, "invalid number of variables in %s env: "
896                        "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars);
897 
898         return NXT_UNIT_ERROR;
899     }
900 
901     nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init);
902 
903     nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id);
904 
905     ready_port->in_fd = -1;
906     ready_port->out_fd = ready_fd;
907     ready_port->data = NULL;
908 
909     nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id);
910 
911     router_port->in_fd = -1;
912     router_port->out_fd = router_fd;
913     router_port->data = NULL;
914 
915     nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id);
916 
917     read_port->in_fd = read_in_fd;
918     read_port->out_fd = read_out_fd;
919     read_port->data = NULL;
920 
921     *stream = ready_stream;
922 
923     return NXT_UNIT_OK;
924 }
925 
926 
927 static int
nxt_unit_ready(nxt_unit_ctx_t * ctx,int ready_fd,uint32_t stream,int queue_fd)928 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
929 {
930     ssize_t          res;
931     nxt_send_oob_t   oob;
932     nxt_port_msg_t   msg;
933     nxt_unit_impl_t  *lib;
934     int              fds[2] = {queue_fd, -1};
935 
936     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
937 
938     msg.stream = stream;
939     msg.pid = lib->pid;
940     msg.reply_port = 0;
941     msg.type = _NXT_PORT_MSG_PROCESS_READY;
942     msg.last = 1;
943     msg.mmap = 0;
944     msg.nf = 0;
945     msg.mf = 0;
946 
947     nxt_socket_msg_oob_init(&oob, fds);
948 
949     res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
950     if (res != sizeof(msg)) {
951         return NXT_UNIT_ERROR;
952     }
953 
954     return NXT_UNIT_OK;
955 }
956 
957 
958 static int
nxt_unit_process_msg(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf,nxt_unit_request_info_t ** preq)959 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
960     nxt_unit_request_info_t **preq)
961 {
962     int                  rc;
963     pid_t                pid;
964     uint8_t              quit_param;
965     nxt_port_msg_t       *port_msg;
966     nxt_unit_impl_t      *lib;
967     nxt_unit_recv_msg_t  recv_msg;
968 
969     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
970 
971     recv_msg.incoming_buf = NULL;
972     recv_msg.fd[0] = -1;
973     recv_msg.fd[1] = -1;
974 
975     rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
976     if (nxt_slow_path(rc != NXT_OK)) {
977         nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
978         rc = NXT_UNIT_ERROR;
979         goto done;
980     }
981 
982     if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) {
983         if (nxt_slow_path(rbuf->size == 0)) {
984             nxt_unit_debug(ctx, "read port closed");
985 
986             nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
987             rc = NXT_UNIT_OK;
988             goto done;
989         }
990 
991         nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size);
992 
993         rc = NXT_UNIT_ERROR;
994         goto done;
995     }
996 
997     port_msg = (nxt_port_msg_t *) rbuf->buf;
998 
999     nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
1000                    port_msg->stream, (int) port_msg->type,
1001                    recv_msg.fd[0], recv_msg.fd[1]);
1002 
1003     recv_msg.stream = port_msg->stream;
1004     recv_msg.pid = port_msg->pid;
1005     recv_msg.reply_port = port_msg->reply_port;
1006     recv_msg.last = port_msg->last;
1007     recv_msg.mmap = port_msg->mmap;
1008 
1009     recv_msg.start = port_msg + 1;
1010     recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t);
1011 
1012     if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
1013         nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)",
1014                        port_msg->stream, (int) port_msg->type);
1015         rc = NXT_UNIT_ERROR;
1016         goto done;
1017     }
1018 
1019     /* Fragmentation is unsupported. */
1020     if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
1021         nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)",
1022                        port_msg->stream, (int) port_msg->type);
1023         rc = NXT_UNIT_ERROR;
1024         goto done;
1025     }
1026 
1027     if (port_msg->mmap) {
1028         rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf);
1029 
1030         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1031             if (rc == NXT_UNIT_AGAIN) {
1032                 recv_msg.fd[0] = -1;
1033                 recv_msg.fd[1] = -1;
1034             }
1035 
1036             goto done;
1037         }
1038     }
1039 
1040     switch (port_msg->type) {
1041 
1042     case _NXT_PORT_MSG_RPC_READY:
1043         rc = NXT_UNIT_OK;
1044         break;
1045 
1046     case _NXT_PORT_MSG_QUIT:
1047         if (recv_msg.size == sizeof(quit_param)) {
1048             memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
1049 
1050         } else {
1051             quit_param = NXT_QUIT_NORMAL;
1052         }
1053 
1054         nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
1055                        (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
1056 
1057         nxt_unit_quit(ctx, quit_param);
1058 
1059         rc = NXT_UNIT_OK;
1060         break;
1061 
1062     case _NXT_PORT_MSG_NEW_PORT:
1063         rc = nxt_unit_process_new_port(ctx, &recv_msg);
1064         break;
1065 
1066     case _NXT_PORT_MSG_PORT_ACK:
1067         rc = nxt_unit_ctx_ready(ctx);
1068         break;
1069 
1070     case _NXT_PORT_MSG_CHANGE_FILE:
1071         nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
1072                        port_msg->stream, recv_msg.fd[0]);
1073 
1074         if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
1075             nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
1076                            port_msg->stream, recv_msg.fd[0], lib->log_fd,
1077                            strerror(errno), errno);
1078 
1079             rc = NXT_UNIT_ERROR;
1080             goto done;
1081         }
1082 
1083         rc = NXT_UNIT_OK;
1084         break;
1085 
1086     case _NXT_PORT_MSG_MMAP:
1087         if (nxt_slow_path(recv_msg.fd[0] < 0)) {
1088             nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
1089                            port_msg->stream, recv_msg.fd[0]);
1090 
1091             rc = NXT_UNIT_ERROR;
1092             goto done;
1093         }
1094 
1095         rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
1096         break;
1097 
1098     case _NXT_PORT_MSG_REQ_HEADERS:
1099         rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
1100         break;
1101 
1102     case _NXT_PORT_MSG_REQ_BODY:
1103         rc = nxt_unit_process_req_body(ctx, &recv_msg);
1104         break;
1105 
1106     case _NXT_PORT_MSG_WEBSOCKET:
1107         rc = nxt_unit_process_websocket(ctx, &recv_msg);
1108         break;
1109 
1110     case _NXT_PORT_MSG_REMOVE_PID:
1111         if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
1112             nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
1113                            "(%d != %d)", port_msg->stream, (int) recv_msg.size,
1114                            (int) sizeof(pid));
1115 
1116             rc = NXT_UNIT_ERROR;
1117             goto done;
1118         }
1119 
1120         memcpy(&pid, recv_msg.start, sizeof(pid));
1121 
1122         nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
1123                        port_msg->stream, (int) pid);
1124 
1125         nxt_unit_remove_pid(lib, pid);
1126 
1127         rc = NXT_UNIT_OK;
1128         break;
1129 
1130     case _NXT_PORT_MSG_SHM_ACK:
1131         rc = nxt_unit_process_shm_ack(ctx);
1132         break;
1133 
1134     default:
1135         nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
1136                        port_msg->stream, (int) port_msg->type);
1137 
1138         rc = NXT_UNIT_ERROR;
1139         goto done;
1140     }
1141 
1142 done:
1143 
1144     if (recv_msg.fd[0] != -1) {
1145         nxt_unit_close(recv_msg.fd[0]);
1146     }
1147 
1148     if (recv_msg.fd[1] != -1) {
1149         nxt_unit_close(recv_msg.fd[1]);
1150     }
1151 
1152     while (recv_msg.incoming_buf != NULL) {
1153         nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
1154     }
1155 
1156     if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
1157 #if (NXT_DEBUG)
1158         memset(rbuf->buf, 0xAC, rbuf->size);
1159 #endif
1160         nxt_unit_read_buf_release(ctx, rbuf);
1161     }
1162 
1163     return rc;
1164 }
1165 
1166 
1167 static int
nxt_unit_process_new_port(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1168 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1169 {
1170     void                     *mem;
1171     nxt_unit_port_t          new_port, *port;
1172     nxt_port_msg_new_port_t  *new_port_msg;
1173 
1174     if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
1175         nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
1176                       "invalid message size (%d)",
1177                       recv_msg->stream, (int) recv_msg->size);
1178 
1179         return NXT_UNIT_ERROR;
1180     }
1181 
1182     if (nxt_slow_path(recv_msg->fd[0] < 0)) {
1183         nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
1184                        recv_msg->stream, recv_msg->fd[0]);
1185 
1186         return NXT_UNIT_ERROR;
1187     }
1188 
1189     new_port_msg = recv_msg->start;
1190 
1191     nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
1192                    recv_msg->stream, (int) new_port_msg->pid,
1193                    (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
1194 
1195     if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) {
1196         return NXT_UNIT_ERROR;
1197     }
1198 
1199     nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id);
1200 
1201     new_port.in_fd = -1;
1202     new_port.out_fd = recv_msg->fd[0];
1203 
1204     mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
1205                MAP_SHARED, recv_msg->fd[1], 0);
1206 
1207     if (nxt_slow_path(mem == MAP_FAILED)) {
1208         nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
1209                        strerror(errno), errno);
1210 
1211         return NXT_UNIT_ERROR;
1212     }
1213 
1214     new_port.data = NULL;
1215 
1216     recv_msg->fd[0] = -1;
1217 
1218     port = nxt_unit_add_port(ctx, &new_port, mem);
1219     if (nxt_slow_path(port == NULL)) {
1220         return NXT_UNIT_ERROR;
1221     }
1222 
1223     nxt_unit_port_release(port);
1224 
1225     return NXT_UNIT_OK;
1226 }
1227 
1228 
1229 static int
nxt_unit_ctx_ready(nxt_unit_ctx_t * ctx)1230 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
1231 {
1232     nxt_unit_impl_t      *lib;
1233     nxt_unit_ctx_impl_t  *ctx_impl;
1234 
1235     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1236 
1237     if (nxt_slow_path(ctx_impl->ready)) {
1238         return NXT_UNIT_OK;
1239     }
1240 
1241     ctx_impl->ready = 1;
1242 
1243     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1244 
1245     /* Call ready_handler() only for main context. */
1246     if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
1247         return lib->callbacks.ready_handler(ctx);
1248     }
1249 
1250     if (&lib->main_ctx != ctx_impl) {
1251         /* Check if the main context is already stopped or quit. */
1252         if (nxt_slow_path(!lib->main_ctx.ready)) {
1253             ctx_impl->ready = 0;
1254 
1255             nxt_unit_quit(ctx, lib->main_ctx.quit_param);
1256 
1257             return NXT_UNIT_OK;
1258         }
1259 
1260         if (lib->callbacks.add_port != NULL) {
1261             lib->callbacks.add_port(ctx, lib->shared_port);
1262         }
1263     }
1264 
1265     return NXT_UNIT_OK;
1266 }
1267 
1268 
1269 static int
nxt_unit_process_req_headers(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg,nxt_unit_request_info_t ** preq)1270 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
1271     nxt_unit_request_info_t **preq)
1272 {
1273     int                           res;
1274     nxt_unit_impl_t               *lib;
1275     nxt_unit_port_id_t            port_id;
1276     nxt_unit_request_t            *r;
1277     nxt_unit_mmap_buf_t           *b;
1278     nxt_unit_request_info_t       *req;
1279     nxt_unit_request_info_impl_t  *req_impl;
1280 
1281     if (nxt_slow_path(recv_msg->mmap == 0)) {
1282         nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
1283                       recv_msg->stream);
1284 
1285         return NXT_UNIT_ERROR;
1286     }
1287 
1288     if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
1289         nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
1290                       "%d expected", recv_msg->stream, (int) recv_msg->size,
1291                       (int) sizeof(nxt_unit_request_t));
1292 
1293         return NXT_UNIT_ERROR;
1294     }
1295 
1296     req_impl = nxt_unit_request_info_get(ctx);
1297     if (nxt_slow_path(req_impl == NULL)) {
1298         nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
1299                       recv_msg->stream);
1300 
1301         return NXT_UNIT_ERROR;
1302     }
1303 
1304     req = &req_impl->req;
1305 
1306     req->request = recv_msg->start;
1307 
1308     b = recv_msg->incoming_buf;
1309 
1310     req->request_buf = &b->buf;
1311     req->response = NULL;
1312     req->response_buf = NULL;
1313 
1314     r = req->request;
1315 
1316     req->content_length = r->content_length;
1317 
1318     req->content_buf = req->request_buf;
1319     req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
1320 
1321     req_impl->stream = recv_msg->stream;
1322 
1323     req_impl->outgoing_buf = NULL;
1324 
1325     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1326         b->req = req;
1327     }
1328 
1329     /* "Move" incoming buffer list to req_impl. */
1330     req_impl->incoming_buf = recv_msg->incoming_buf;
1331     req_impl->incoming_buf->prev = &req_impl->incoming_buf;
1332     recv_msg->incoming_buf = NULL;
1333 
1334     req->content_fd = recv_msg->fd[0];
1335     recv_msg->fd[0] = -1;
1336 
1337     req->response_max_fields = 0;
1338     req_impl->state = NXT_UNIT_RS_START;
1339     req_impl->websocket = 0;
1340     req_impl->in_hash = 0;
1341 
1342     nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
1343                    (int) r->method_length,
1344                    (char *) nxt_unit_sptr_get(&r->method),
1345                    (int) r->target_length,
1346                    (char *) nxt_unit_sptr_get(&r->target),
1347                    (int) r->content_length);
1348 
1349     nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
1350 
1351     res = nxt_unit_request_check_response_port(req, &port_id);
1352     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1353         return NXT_UNIT_ERROR;
1354     }
1355 
1356     if (nxt_fast_path(res == NXT_UNIT_OK)) {
1357         res = nxt_unit_send_req_headers_ack(req);
1358         if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1359             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1360 
1361             return NXT_UNIT_ERROR;
1362         }
1363 
1364         lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1365 
1366         if (req->content_length
1367             > (uint64_t) (req->content_buf->end - req->content_buf->free))
1368         {
1369             res = nxt_unit_request_hash_add(ctx, req);
1370             if (nxt_slow_path(res != NXT_UNIT_OK)) {
1371                 nxt_unit_req_warn(req, "failed to add request to hash");
1372 
1373                 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1374 
1375                 return NXT_UNIT_ERROR;
1376             }
1377 
1378             /*
1379              * If application have separate data handler, we may start
1380              * request processing and process data when it is arrived.
1381              */
1382             if (lib->callbacks.data_handler == NULL) {
1383                 return NXT_UNIT_OK;
1384             }
1385         }
1386 
1387         if (preq == NULL) {
1388             lib->callbacks.request_handler(req);
1389 
1390         } else {
1391             *preq = req;
1392         }
1393     }
1394 
1395     return NXT_UNIT_OK;
1396 }
1397 
1398 
1399 static int
nxt_unit_process_req_body(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1400 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1401 {
1402     uint64_t                 l;
1403     nxt_unit_impl_t          *lib;
1404     nxt_unit_mmap_buf_t      *b;
1405     nxt_unit_request_info_t  *req;
1406 
1407     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1408     if (req == NULL) {
1409         return NXT_UNIT_OK;
1410     }
1411 
1412     l = req->content_buf->end - req->content_buf->free;
1413 
1414     for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1415         b->req = req;
1416         l += b->buf.end - b->buf.free;
1417     }
1418 
1419     if (recv_msg->incoming_buf != NULL) {
1420         b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
1421 
1422         while (b->next != NULL) {
1423             b = b->next;
1424         }
1425 
1426         /* "Move" incoming buffer list to req_impl. */
1427         b->next = recv_msg->incoming_buf;
1428         b->next->prev = &b->next;
1429 
1430         recv_msg->incoming_buf = NULL;
1431     }
1432 
1433     req->content_fd = recv_msg->fd[0];
1434     recv_msg->fd[0] = -1;
1435 
1436     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1437 
1438     if (lib->callbacks.data_handler != NULL) {
1439         lib->callbacks.data_handler(req);
1440 
1441         return NXT_UNIT_OK;
1442     }
1443 
1444     if (req->content_fd != -1 || l == req->content_length) {
1445         lib->callbacks.request_handler(req);
1446     }
1447 
1448     return NXT_UNIT_OK;
1449 }
1450 
1451 
1452 static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t * req,nxt_unit_port_id_t * port_id)1453 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
1454     nxt_unit_port_id_t *port_id)
1455 {
1456     int                           res;
1457     nxt_unit_ctx_t                *ctx;
1458     nxt_unit_impl_t               *lib;
1459     nxt_unit_port_t               *port;
1460     nxt_unit_process_t            *process;
1461     nxt_unit_ctx_impl_t           *ctx_impl;
1462     nxt_unit_port_impl_t          *port_impl;
1463     nxt_unit_request_info_impl_t  *req_impl;
1464 
1465     ctx = req->ctx;
1466     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1467     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1468 
1469     pthread_mutex_lock(&lib->mutex);
1470 
1471     port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
1472     port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
1473 
1474     if (nxt_fast_path(port != NULL)) {
1475         req->response_port = port;
1476 
1477         if (nxt_fast_path(port_impl->ready)) {
1478             pthread_mutex_unlock(&lib->mutex);
1479 
1480             nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
1481                            (int) port->id.pid, (int) port->id.id);
1482 
1483             return NXT_UNIT_OK;
1484         }
1485 
1486         nxt_unit_debug(ctx, "check_response_port: "
1487                        "port{%d,%d} already requested",
1488                        (int) port->id.pid, (int) port->id.id);
1489 
1490         req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1491 
1492         nxt_queue_insert_tail(&port_impl->awaiting_req,
1493                               &req_impl->port_wait_link);
1494 
1495         pthread_mutex_unlock(&lib->mutex);
1496 
1497         nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1498 
1499         return NXT_UNIT_AGAIN;
1500     }
1501 
1502     port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
1503     if (nxt_slow_path(port_impl == NULL)) {
1504         nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
1505                        (int) sizeof(nxt_unit_port_impl_t));
1506 
1507         pthread_mutex_unlock(&lib->mutex);
1508 
1509         return NXT_UNIT_ERROR;
1510     }
1511 
1512     port = &port_impl->port;
1513 
1514     port->id = *port_id;
1515     port->in_fd = -1;
1516     port->out_fd = -1;
1517     port->data = NULL;
1518 
1519     res = nxt_unit_port_hash_add(&lib->ports, port);
1520     if (nxt_slow_path(res != NXT_UNIT_OK)) {
1521         nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
1522                        port->id.pid, port->id.id);
1523 
1524         pthread_mutex_unlock(&lib->mutex);
1525 
1526         nxt_unit_free(ctx, port);
1527 
1528         return NXT_UNIT_ERROR;
1529     }
1530 
1531     process = nxt_unit_process_find(lib, port_id->pid, 0);
1532     if (nxt_slow_path(process == NULL)) {
1533         nxt_unit_alert(ctx, "check_response_port: process %d not found",
1534                        port->id.pid);
1535 
1536         nxt_unit_port_hash_find(&lib->ports, port_id, 1);
1537 
1538         pthread_mutex_unlock(&lib->mutex);
1539 
1540         nxt_unit_free(ctx, port);
1541 
1542         return NXT_UNIT_ERROR;
1543     }
1544 
1545     nxt_queue_insert_tail(&process->ports, &port_impl->link);
1546 
1547     port_impl->process = process;
1548     port_impl->queue = NULL;
1549     port_impl->from_socket = 0;
1550     port_impl->socket_rbuf = NULL;
1551 
1552     nxt_queue_init(&port_impl->awaiting_req);
1553 
1554     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1555 
1556     nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
1557 
1558     port_impl->use_count = 2;
1559     port_impl->ready = 0;
1560 
1561     req->response_port = port;
1562 
1563     pthread_mutex_unlock(&lib->mutex);
1564 
1565     res = nxt_unit_get_port(ctx, port_id);
1566     if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
1567         return NXT_UNIT_ERROR;
1568     }
1569 
1570     nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
1571 
1572     return NXT_UNIT_AGAIN;
1573 }
1574 
1575 
1576 static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t * req)1577 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
1578 {
1579     ssize_t                       res;
1580     nxt_port_msg_t                msg;
1581     nxt_unit_impl_t               *lib;
1582     nxt_unit_ctx_impl_t           *ctx_impl;
1583     nxt_unit_request_info_impl_t  *req_impl;
1584 
1585     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
1586     ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1587     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1588 
1589     memset(&msg, 0, sizeof(nxt_port_msg_t));
1590 
1591     msg.stream = req_impl->stream;
1592     msg.pid = lib->pid;
1593     msg.reply_port = ctx_impl->read_port->id.id;
1594     msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
1595 
1596     res = nxt_unit_port_send(req->ctx, req->response_port,
1597                              &msg, sizeof(msg), NULL);
1598     if (nxt_slow_path(res != sizeof(msg))) {
1599         return NXT_UNIT_ERROR;
1600     }
1601 
1602     return NXT_UNIT_OK;
1603 }
1604 
1605 
1606 static int
nxt_unit_process_websocket(nxt_unit_ctx_t * ctx,nxt_unit_recv_msg_t * recv_msg)1607 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
1608 {
1609     size_t                           hsize;
1610     nxt_unit_impl_t                  *lib;
1611     nxt_unit_mmap_buf_t              *b;
1612     nxt_unit_callbacks_t             *cb;
1613     nxt_unit_request_info_t          *req;
1614     nxt_unit_request_info_impl_t     *req_impl;
1615     nxt_unit_websocket_frame_impl_t  *ws_impl;
1616 
1617     req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
1618     if (nxt_slow_path(req == NULL)) {
1619         return NXT_UNIT_OK;
1620     }
1621 
1622     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1623 
1624     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1625     cb = &lib->callbacks;
1626 
1627     if (cb->websocket_handler && recv_msg->size >= 2) {
1628         ws_impl = nxt_unit_websocket_frame_get(ctx);
1629         if (nxt_slow_path(ws_impl == NULL)) {
1630             nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
1631                           req_impl->stream);
1632 
1633             return NXT_UNIT_ERROR;
1634         }
1635 
1636         ws_impl->ws.req = req;
1637 
1638         ws_impl->buf = NULL;
1639 
1640         if (recv_msg->mmap) {
1641             for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
1642                 b->req = req;
1643             }
1644 
1645             /* "Move" incoming buffer list to ws_impl. */
1646             ws_impl->buf = recv_msg->incoming_buf;
1647             ws_impl->buf->prev = &ws_impl->buf;
1648             recv_msg->incoming_buf = NULL;
1649 
1650             b = ws_impl->buf;
1651 
1652         } else {
1653             b = nxt_unit_mmap_buf_get(ctx);
1654             if (nxt_slow_path(b == NULL)) {
1655                 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
1656                                req_impl->stream);
1657 
1658                 nxt_unit_websocket_frame_release(&ws_impl->ws);
1659 
1660                 return NXT_UNIT_ERROR;
1661             }
1662 
1663             b->req = req;
1664             b->buf.start = recv_msg->start;
1665             b->buf.free = b->buf.start;
1666             b->buf.end = b->buf.start + recv_msg->size;
1667 
1668             nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
1669         }
1670 
1671         ws_impl->ws.header = (void *) b->buf.start;
1672         ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
1673             ws_impl->ws.header);
1674 
1675         hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
1676 
1677         if (ws_impl->ws.header->mask) {
1678             ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
1679 
1680         } else {
1681             ws_impl->ws.mask = NULL;
1682         }
1683 
1684         b->buf.free += hsize;
1685 
1686         ws_impl->ws.content_buf = &b->buf;
1687         ws_impl->ws.content_length = ws_impl->ws.payload_len;
1688 
1689         nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
1690                            "payload_len=%"PRIu64,
1691                             ws_impl->ws.header->opcode,
1692                             ws_impl->ws.payload_len);
1693 
1694         cb->websocket_handler(&ws_impl->ws);
1695     }
1696 
1697     if (recv_msg->last) {
1698         if (cb->close_handler) {
1699             nxt_unit_req_debug(req, "close_handler");
1700 
1701             cb->close_handler(req);
1702 
1703         } else {
1704             nxt_unit_request_done(req, NXT_UNIT_ERROR);
1705         }
1706     }
1707 
1708     return NXT_UNIT_OK;
1709 }
1710 
1711 
1712 static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t * ctx)1713 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
1714 {
1715     nxt_unit_impl_t       *lib;
1716     nxt_unit_callbacks_t  *cb;
1717 
1718     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1719     cb = &lib->callbacks;
1720 
1721     if (cb->shm_ack_handler != NULL) {
1722         cb->shm_ack_handler(ctx);
1723     }
1724 
1725     return NXT_UNIT_OK;
1726 }
1727 
1728 
1729 static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t * ctx)1730 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1731 {
1732     nxt_unit_impl_t               *lib;
1733     nxt_queue_link_t              *lnk;
1734     nxt_unit_ctx_impl_t           *ctx_impl;
1735     nxt_unit_request_info_impl_t  *req_impl;
1736 
1737     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1738 
1739     lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1740 
1741     pthread_mutex_lock(&ctx_impl->mutex);
1742 
1743     if (nxt_queue_is_empty(&ctx_impl->free_req)) {
1744         pthread_mutex_unlock(&ctx_impl->mutex);
1745 
1746         req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
1747                                         + lib->request_data_size);
1748         if (nxt_slow_path(req_impl == NULL)) {
1749             return NULL;
1750         }
1751 
1752         req_impl->req.unit = ctx->unit;
1753         req_impl->req.ctx = ctx;
1754 
1755         pthread_mutex_lock(&ctx_impl->mutex);
1756 
1757     } else {
1758         lnk = nxt_queue_first(&ctx_impl->free_req);
1759         nxt_queue_remove(lnk);
1760 
1761         req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
1762     }
1763 
1764     nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);
1765 
1766     pthread_mutex_unlock(&ctx_impl->mutex);
1767 
1768     req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;
1769 
1770     return req_impl;
1771 }
1772 
1773 
1774 static void
nxt_unit_request_info_release(nxt_unit_request_info_t * req)1775 nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1776 {
1777     nxt_unit_ctx_t                *ctx;
1778     nxt_unit_ctx_impl_t           *ctx_impl;
1779     nxt_unit_request_info_impl_t  *req_impl;
1780 
1781     ctx = req->ctx;
1782     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1783     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1784 
1785     req->response = NULL;
1786     req->response_buf = NULL;
1787 
1788     if (req_impl->in_hash) {
1789         nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
1790     }
1791 
1792     while (req_impl->outgoing_buf != NULL) {
1793         nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1794     }
1795 
1796     while (req_impl->incoming_buf != NULL) {
1797         nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1798     }
1799 
1800     if (req->content_fd != -1) {
1801         nxt_unit_close(req->content_fd);
1802 
1803         req->content_fd = -1;
1804     }
1805 
1806     if (req->response_port != NULL) {
1807         nxt_unit_port_release(req->response_port);
1808 
1809         req->response_port = NULL;
1810     }
1811 
1812     req_impl->state = NXT_UNIT_RS_RELEASED;
1813 
1814     pthread_mutex_lock(&ctx_impl->mutex);
1815 
1816     nxt_queue_remove(&req_impl->link);
1817 
1818     nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1819 
1820     pthread_mutex_unlock(&ctx_impl->mutex);
1821 
1822     if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
1823         nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
1824     }
1825 }
1826 
1827 
1828 static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t * req_impl)1829 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1830 {
1831     nxt_unit_ctx_impl_t  *ctx_impl;
1832 
1833     ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1834 
1835     nxt_queue_remove(&req_impl->link);
1836 
1837     if (req_impl != &ctx_impl->req) {
1838         nxt_unit_free(&ctx_impl->ctx, req_impl);
1839     }
1840 }
1841 
1842 
1843 static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t * ctx)1844 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1845 {
1846     nxt_queue_link_t                 *lnk;
1847     nxt_unit_ctx_impl_t              *ctx_impl;
1848     nxt_unit_websocket_frame_impl_t  *ws_impl;
1849 
1850     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1851 
1852     pthread_mutex_lock(&ctx_impl->mutex);
1853 
1854     if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1855         pthread_mutex_unlock(&ctx_impl->mutex);
1856 
1857         ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
1858         if (nxt_slow_path(ws_impl == NULL)) {
1859             return NULL;
1860         }
1861 
1862     } else {
1863         lnk = nxt_queue_first(&ctx_impl->free_ws);
1864         nxt_queue_remove(lnk);
1865 
1866         pthread_mutex_unlock(&ctx_impl->mutex);
1867 
1868         ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1869     }
1870 
1871     ws_impl->ctx_impl = ctx_impl;
1872 
1873     return ws_impl;
1874 }
1875 
1876 
1877 static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t * ws)1878 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1879 {
1880     nxt_unit_websocket_frame_impl_t  *ws_impl;
1881 
1882     ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1883 
1884     while (ws_impl->buf != NULL) {
1885         nxt_unit_mmap_buf_free(ws_impl->buf);
1886     }
1887 
1888     ws->req = NULL;
1889 
1890     pthread_mutex_lock(&ws_impl->ctx_impl->mutex);
1891 
1892     nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1893 
1894     pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
1895 }
1896 
1897 
1898 static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t * ctx,nxt_unit_websocket_frame_impl_t * ws_impl)1899 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
1900     nxt_unit_websocket_frame_impl_t *ws_impl)
1901 {
1902     nxt_queue_remove(&ws_impl->link);
1903 
1904     nxt_unit_free(ctx, ws_impl);
1905 }
1906 
1907 
1908 uint16_t
nxt_unit_field_hash(const char * name,size_t name_length)1909 nxt_unit_field_hash(const char *name, size_t name_length)
1910 {
1911     u_char      ch;
1912     uint32_t    hash;
1913     const char  *p, *end;
1914 
1915     hash = 159406; /* Magic value copied from nxt_http_parse.c */
1916     end = name + name_length;
1917 
1918     for (p = name; p < end; p++) {
1919         ch = *p;
1920         hash = (hash << 4) + hash + nxt_lowcase(ch);
1921     }
1922 
1923     hash = (hash >> 16) ^ hash;
1924 
1925     return hash;
1926 }
1927 
1928 
1929 void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t * req)1930 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
1931 {
1932     char                *name;
1933     uint32_t            i, j;
1934     nxt_unit_field_t    *fields, f;
1935     nxt_unit_request_t  *r;
1936 
1937     static nxt_str_t  content_length = nxt_string("content-length");
1938     static nxt_str_t  content_type = nxt_string("content-type");
1939     static nxt_str_t  cookie = nxt_string("cookie");
1940 
1941     nxt_unit_req_debug(req, "group_dup_fields");
1942 
1943     r = req->request;
1944     fields = r->fields;
1945 
1946     for (i = 0; i < r->fields_count; i++) {
1947         name = nxt_unit_sptr_get(&fields[i].name);
1948 
1949         switch (fields[i].hash) {
1950         case NXT_UNIT_HASH_CONTENT_LENGTH:
1951             if (fields[i].name_length == content_length.length
1952                 && nxt_unit_memcasecmp(name, content_length.start,
1953                                        content_length.length) == 0)
1954             {
1955                 r->content_length_field = i;
1956             }
1957 
1958             break;
1959 
1960         case NXT_UNIT_HASH_CONTENT_TYPE:
1961             if (fields[i].name_length == content_type.length
1962                 && nxt_unit_memcasecmp(name, content_type.start,
1963                                        content_type.length) == 0)
1964             {
1965                 r->content_type_field = i;
1966             }
1967 
1968             break;
1969 
1970         case NXT_UNIT_HASH_COOKIE:
1971             if (fields[i].name_length == cookie.length
1972                 && nxt_unit_memcasecmp(name, cookie.start,
1973                                        cookie.length) == 0)
1974             {
1975                 r->cookie_field = i;
1976             }
1977 
1978             break;
1979         }
1980 
1981         for (j = i + 1; j < r->fields_count; j++) {
1982             if (fields[i].hash != fields[j].hash
1983                 || fields[i].name_length != fields[j].name_length
1984                 || nxt_unit_memcasecmp(name,
1985                                        nxt_unit_sptr_get(&fields[j].name),
1986                                        fields[j].name_length) != 0)
1987             {
1988                 continue;
1989             }
1990 
1991             f = fields[j];
1992             f.value.offset += (j - (i + 1)) * sizeof(f);
1993 
1994             while (j > i + 1) {
1995                 fields[j] = fields[j - 1];
1996                 fields[j].name.offset -= sizeof(f);
1997                 fields[j].value.offset -= sizeof(f);
1998                 j--;
1999             }
2000 
2001             fields[j] = f;
2002 
2003             /* Assign the same name pointer for further grouping simplicity. */
2004             nxt_unit_sptr_set(&fields[j].name, name);
2005 
2006             i++;
2007         }
2008     }
2009 }
2010 
2011 
2012 int
nxt_unit_response_init(nxt_unit_request_info_t * req,uint16_t status,uint32_t max_fields_count,uint32_t max_fields_size)2013 nxt_unit_response_init(nxt_unit_request_info_t *req,
2014     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
2015 {
2016     uint32_t                      buf_size;
2017     nxt_unit_buf_t                *buf;
2018     nxt_unit_request_info_impl_t  *req_impl;
2019 
2020     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2021 
2022     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2023         nxt_unit_req_warn(req, "init: response already sent");
2024 
2025         return NXT_UNIT_ERROR;
2026     }
2027 
2028     nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
2029                        (int) max_fields_count, (int) max_fields_size);
2030 
2031     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
2032         nxt_unit_req_debug(req, "duplicate response init");
2033     }
2034 
2035     /*
2036      * Each field name and value 0-terminated by libunit,
2037      * this is the reason of '+ 2' below.
2038      */
2039     buf_size = sizeof(nxt_unit_response_t)
2040                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2041                + max_fields_size;
2042 
2043     if (nxt_slow_path(req->response_buf != NULL)) {
2044         buf = req->response_buf;
2045 
2046         if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
2047             goto init_response;
2048         }
2049 
2050         nxt_unit_buf_free(buf);
2051 
2052         req->response_buf = NULL;
2053         req->response = NULL;
2054         req->response_max_fields = 0;
2055 
2056         req_impl->state = NXT_UNIT_RS_START;
2057     }
2058 
2059     buf = nxt_unit_response_buf_alloc(req, buf_size);
2060     if (nxt_slow_path(buf == NULL)) {
2061         return NXT_UNIT_ERROR;
2062     }
2063 
2064 init_response:
2065 
2066     memset(buf->start, 0, sizeof(nxt_unit_response_t));
2067 
2068     req->response_buf = buf;
2069 
2070     req->response = (nxt_unit_response_t *) buf->start;
2071     req->response->status = status;
2072 
2073     buf->free = buf->start + sizeof(nxt_unit_response_t)
2074                 + max_fields_count * sizeof(nxt_unit_field_t);
2075 
2076     req->response_max_fields = max_fields_count;
2077     req_impl->state = NXT_UNIT_RS_RESPONSE_INIT;
2078 
2079     return NXT_UNIT_OK;
2080 }
2081 
2082 
2083 int
nxt_unit_response_realloc(nxt_unit_request_info_t * req,uint32_t max_fields_count,uint32_t max_fields_size)2084 nxt_unit_response_realloc(nxt_unit_request_info_t *req,
2085     uint32_t max_fields_count, uint32_t max_fields_size)
2086 {
2087     char                          *p;
2088     uint32_t                      i, buf_size;
2089     nxt_unit_buf_t                *buf;
2090     nxt_unit_field_t              *f, *src;
2091     nxt_unit_response_t           *resp;
2092     nxt_unit_request_info_impl_t  *req_impl;
2093 
2094     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2095 
2096     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2097         nxt_unit_req_warn(req, "realloc: response not init");
2098 
2099         return NXT_UNIT_ERROR;
2100     }
2101 
2102     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2103         nxt_unit_req_warn(req, "realloc: response already sent");
2104 
2105         return NXT_UNIT_ERROR;
2106     }
2107 
2108     if (nxt_slow_path(max_fields_count < req->response->fields_count)) {
2109         nxt_unit_req_warn(req, "realloc: new max_fields_count is too small");
2110 
2111         return NXT_UNIT_ERROR;
2112     }
2113 
2114     /*
2115      * Each field name and value 0-terminated by libunit,
2116      * this is the reason of '+ 2' below.
2117      */
2118     buf_size = sizeof(nxt_unit_response_t)
2119                + max_fields_count * (sizeof(nxt_unit_field_t) + 2)
2120                + max_fields_size;
2121 
2122     nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size);
2123 
2124     buf = nxt_unit_response_buf_alloc(req, buf_size);
2125     if (nxt_slow_path(buf == NULL)) {
2126         nxt_unit_req_warn(req, "realloc: new buf allocation failed");
2127         return NXT_UNIT_ERROR;
2128     }
2129 
2130     resp = (nxt_unit_response_t *) buf->start;
2131 
2132     memset(resp, 0, sizeof(nxt_unit_response_t));
2133 
2134     resp->status = req->response->status;
2135     resp->content_length = req->response->content_length;
2136 
2137     p = buf->start + max_fields_count * sizeof(nxt_unit_field_t);
2138     f = resp->fields;
2139 
2140     for (i = 0; i < req->response->fields_count; i++) {
2141         src = req->response->fields + i;
2142 
2143         if (nxt_slow_path(src->skip != 0)) {
2144             continue;
2145         }
2146 
2147         if (nxt_slow_path(src->name_length + src->value_length + 2
2148                           > (uint32_t) (buf->end - p)))
2149         {
2150             nxt_unit_req_warn(req, "realloc: not enough space for field"
2151                   " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required",
2152                   i, src, src->name_length, src->value_length);
2153 
2154             goto fail;
2155         }
2156 
2157         nxt_unit_sptr_set(&f->name, p);
2158         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length);
2159         *p++ = '\0';
2160 
2161         nxt_unit_sptr_set(&f->value, p);
2162         p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length);
2163         *p++ = '\0';
2164 
2165         f->hash = src->hash;
2166         f->skip = 0;
2167         f->name_length = src->name_length;
2168         f->value_length = src->value_length;
2169 
2170         resp->fields_count++;
2171         f++;
2172     }
2173 
2174     if (req->response->piggyback_content_length > 0) {
2175         if (nxt_slow_path(req->response->piggyback_content_length
2176                           > (uint32_t) (buf->end - p)))
2177         {
2178             nxt_unit_req_warn(req, "realloc: not enought space for content"
2179                   " #%"PRIu32", %"PRIu32" required",
2180                   i, req->response->piggyback_content_length);
2181 
2182             goto fail;
2183         }
2184 
2185         resp->piggyback_content_length =
2186                                        req->response->piggyback_content_length;
2187 
2188         nxt_unit_sptr_set(&resp->piggyback_content, p);
2189         p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content),
2190                        req->response->piggyback_content_length);
2191     }
2192 
2193     buf->free = p;
2194 
2195     nxt_unit_buf_free(req->response_buf);
2196 
2197     req->response = resp;
2198     req->response_buf = buf;
2199     req->response_max_fields = max_fields_count;
2200 
2201     return NXT_UNIT_OK;
2202 
2203 fail:
2204 
2205     nxt_unit_buf_free(buf);
2206 
2207     return NXT_UNIT_ERROR;
2208 }
2209 
2210 
2211 int
nxt_unit_response_is_init(nxt_unit_request_info_t * req)2212 nxt_unit_response_is_init(nxt_unit_request_info_t *req)
2213 {
2214     nxt_unit_request_info_impl_t  *req_impl;
2215 
2216     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2217 
2218     return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT;
2219 }
2220 
2221 
2222 int
nxt_unit_response_add_field(nxt_unit_request_info_t * req,const char * name,uint8_t name_length,const char * value,uint32_t value_length)2223 nxt_unit_response_add_field(nxt_unit_request_info_t *req,
2224     const char *name, uint8_t name_length,
2225     const char *value, uint32_t value_length)
2226 {
2227     nxt_unit_buf_t                *buf;
2228     nxt_unit_field_t              *f;
2229     nxt_unit_response_t           *resp;
2230     nxt_unit_request_info_impl_t  *req_impl;
2231 
2232     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2233 
2234     if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) {
2235         nxt_unit_req_warn(req, "add_field: response not initialized or "
2236                           "already sent");
2237 
2238         return NXT_UNIT_ERROR;
2239     }
2240 
2241     resp = req->response;
2242 
2243     if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) {
2244         nxt_unit_req_warn(req, "add_field: too many response fields (%d)",
2245                           (int) resp->fields_count);
2246 
2247         return NXT_UNIT_ERROR;
2248     }
2249 
2250     buf = req->response_buf;
2251 
2252     if (nxt_slow_path(name_length + value_length + 2
2253                       > (uint32_t) (buf->end - buf->free)))
2254     {
2255         nxt_unit_req_warn(req, "add_field: response buffer overflow");
2256 
2257         return NXT_UNIT_ERROR;
2258     }
2259 
2260     nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
2261                        resp->fields_count,
2262                        (int) name_length, name,
2263                        (int) value_length, value);
2264 
2265     f = resp->fields + resp->fields_count;
2266 
2267     nxt_unit_sptr_set(&f->name, buf->free);
2268     buf->free = nxt_cpymem(buf->free, name, name_length);
2269     *buf->free++ = '\0';
2270 
2271     nxt_unit_sptr_set(&f->value, buf->free);
2272     buf->free = nxt_cpymem(buf->free, value, value_length);
2273     *buf->free++ = '\0';
2274 
2275     f->hash = nxt_unit_field_hash(name, name_length);
2276     f->skip = 0;
2277     f->name_length = name_length;
2278     f->value_length = value_length;
2279 
2280     resp->fields_count++;
2281 
2282     return NXT_UNIT_OK;
2283 }
2284 
2285 
2286 int
nxt_unit_response_add_content(nxt_unit_request_info_t * req,const void * src,uint32_t size)2287 nxt_unit_response_add_content(nxt_unit_request_info_t *req,
2288     const void* src, uint32_t size)
2289 {
2290     nxt_unit_buf_t                *buf;
2291     nxt_unit_response_t           *resp;
2292     nxt_unit_request_info_impl_t  *req_impl;
2293 
2294     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2295 
2296     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2297         nxt_unit_req_warn(req, "add_content: response not initialized yet");
2298 
2299         return NXT_UNIT_ERROR;
2300     }
2301 
2302     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2303         nxt_unit_req_warn(req, "add_content: response already sent");
2304 
2305         return NXT_UNIT_ERROR;
2306     }
2307 
2308     buf = req->response_buf;
2309 
2310     if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
2311         nxt_unit_req_warn(req, "add_content: buffer overflow");
2312 
2313         return NXT_UNIT_ERROR;
2314     }
2315 
2316     resp = req->response;
2317 
2318     if (resp->piggyback_content_length == 0) {
2319         nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
2320         req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
2321     }
2322 
2323     resp->piggyback_content_length += size;
2324 
2325     buf->free = nxt_cpymem(buf->free, src, size);
2326 
2327     return NXT_UNIT_OK;
2328 }
2329 
2330 
2331 int
nxt_unit_response_send(nxt_unit_request_info_t * req)2332 nxt_unit_response_send(nxt_unit_request_info_t *req)
2333 {
2334     int                           rc;
2335     nxt_unit_mmap_buf_t           *mmap_buf;
2336     nxt_unit_request_info_impl_t  *req_impl;
2337 
2338     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2339 
2340     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2341         nxt_unit_req_warn(req, "send: response is not initialized yet");
2342 
2343         return NXT_UNIT_ERROR;
2344     }
2345 
2346     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2347         nxt_unit_req_warn(req, "send: response already sent");
2348 
2349         return NXT_UNIT_ERROR;
2350     }
2351 
2352     if (req->request->websocket_handshake && req->response->status == 101) {
2353         nxt_unit_response_upgrade(req);
2354     }
2355 
2356     nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
2357                        req->response->fields_count,
2358                        (int) (req->response_buf->free
2359                               - req->response_buf->start));
2360 
2361     mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
2362 
2363     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2364     if (nxt_fast_path(rc == NXT_UNIT_OK)) {
2365         req->response = NULL;
2366         req->response_buf = NULL;
2367         req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
2368 
2369         nxt_unit_mmap_buf_free(mmap_buf);
2370     }
2371 
2372     return rc;
2373 }
2374 
2375 
2376 int
nxt_unit_response_is_sent(nxt_unit_request_info_t * req)2377 nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
2378 {
2379     nxt_unit_request_info_impl_t  *req_impl;
2380 
2381     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2382 
2383     return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
2384 }
2385 
2386 
2387 nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t * req,uint32_t size)2388 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
2389 {
2390     int                           rc;
2391     nxt_unit_mmap_buf_t           *mmap_buf;
2392     nxt_unit_request_info_impl_t  *req_impl;
2393 
2394     if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
2395         nxt_unit_req_warn(req, "response_buf_alloc: "
2396                           "requested buffer (%"PRIu32") too big", size);
2397 
2398         return NULL;
2399     }
2400 
2401     nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
2402 
2403     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2404 
2405     mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
2406     if (nxt_slow_path(mmap_buf == NULL)) {
2407         nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");
2408 
2409         return NULL;
2410     }
2411 
2412     mmap_buf->req = req;
2413 
2414     nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
2415 
2416     rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
2417                                    size, size, mmap_buf,
2418                                    NULL);
2419     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2420         nxt_unit_mmap_buf_release(mmap_buf);
2421 
2422         nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");
2423 
2424         return NULL;
2425     }
2426 
2427     return &mmap_buf->buf;
2428 }
2429 
2430 
2431 static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t * ctx)2432 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
2433 {
2434     nxt_unit_mmap_buf_t  *mmap_buf;
2435     nxt_unit_ctx_impl_t  *ctx_impl;
2436 
2437     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2438 
2439     pthread_mutex_lock(&ctx_impl->mutex);
2440 
2441     if (ctx_impl->free_buf == NULL) {
2442         pthread_mutex_unlock(&ctx_impl->mutex);
2443 
2444         mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
2445         if (nxt_slow_path(mmap_buf == NULL)) {
2446             return NULL;
2447         }
2448 
2449     } else {
2450         mmap_buf = ctx_impl->free_buf;
2451 
2452         nxt_unit_mmap_buf_unlink(mmap_buf);
2453 
2454         pthread_mutex_unlock(&ctx_impl->mutex);
2455     }
2456 
2457     mmap_buf->ctx_impl = ctx_impl;
2458 
2459     mmap_buf->hdr = NULL;
2460     mmap_buf->free_ptr = NULL;
2461 
2462     return mmap_buf;
2463 }
2464 
2465 
2466 static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t * mmap_buf)2467 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
2468 {
2469     nxt_unit_mmap_buf_unlink(mmap_buf);
2470 
2471     pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);
2472 
2473     nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
2474 
2475     pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
2476 }
2477 
2478 
2479 int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t * req)2480 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
2481 {
2482     return req->request->websocket_handshake;
2483 }
2484 
2485 
2486 int
nxt_unit_response_upgrade(nxt_unit_request_info_t * req)2487 nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
2488 {
2489     int                           rc;
2490     nxt_unit_request_info_impl_t  *req_impl;
2491 
2492     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2493 
2494     if (nxt_slow_path(req_impl->websocket != 0)) {
2495         nxt_unit_req_debug(req, "upgrade: already upgraded");
2496 
2497         return NXT_UNIT_OK;
2498     }
2499 
2500     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2501         nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
2502 
2503         return NXT_UNIT_ERROR;
2504     }
2505 
2506     if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
2507         nxt_unit_req_warn(req, "upgrade: response already sent");
2508 
2509         return NXT_UNIT_ERROR;
2510     }
2511 
2512     rc = nxt_unit_request_hash_add(req->ctx, req);
2513     if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2514         nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
2515 
2516         return NXT_UNIT_ERROR;
2517     }
2518 
2519     req_impl->websocket = 1;
2520 
2521     req->response->status = 101;
2522 
2523     return NXT_UNIT_OK;
2524 }
2525 
2526 
2527 int
nxt_unit_response_is_websocket(nxt_unit_request_info_t * req)2528 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
2529 {
2530     nxt_unit_request_info_impl_t  *req_impl;
2531 
2532     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2533 
2534     return req_impl->websocket;
2535 }
2536 
2537 
2538 nxt_unit_request_info_t *
nxt_unit_get_request_info_from_data(void * data)2539 nxt_unit_get_request_info_from_data(void *data)
2540 {
2541     nxt_unit_request_info_impl_t  *req_impl;
2542 
2543     req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
2544 
2545     return &req_impl->req;
2546 }
2547 
2548 
2549 int
nxt_unit_buf_send(nxt_unit_buf_t * buf)2550 nxt_unit_buf_send(nxt_unit_buf_t *buf)
2551 {
2552     int                           rc;
2553     nxt_unit_mmap_buf_t           *mmap_buf;
2554     nxt_unit_request_info_t       *req;
2555     nxt_unit_request_info_impl_t  *req_impl;
2556 
2557     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2558 
2559     req = mmap_buf->req;
2560     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2561 
2562     nxt_unit_req_debug(req, "buf_send: %d bytes",
2563                        (int) (buf->free - buf->start));
2564 
2565     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2566         nxt_unit_req_warn(req, "buf_send: response not initialized yet");
2567 
2568         return NXT_UNIT_ERROR;
2569     }
2570 
2571     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
2572         nxt_unit_req_warn(req, "buf_send: headers not sent yet");
2573 
2574         return NXT_UNIT_ERROR;
2575     }
2576 
2577     if (nxt_fast_path(buf->free > buf->start)) {
2578         rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
2579         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2580             return rc;
2581         }
2582     }
2583 
2584     nxt_unit_mmap_buf_free(mmap_buf);
2585 
2586     return NXT_UNIT_OK;
2587 }
2588 
2589 
2590 static void
nxt_unit_buf_send_done(nxt_unit_buf_t * buf)2591 nxt_unit_buf_send_done(nxt_unit_buf_t *buf)
2592 {
2593     int                      rc;
2594     nxt_unit_mmap_buf_t      *mmap_buf;
2595     nxt_unit_request_info_t  *req;
2596 
2597     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2598 
2599     req = mmap_buf->req;
2600 
2601     rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1);
2602     if (nxt_slow_path(rc == NXT_UNIT_OK)) {
2603         nxt_unit_mmap_buf_free(mmap_buf);
2604 
2605         nxt_unit_request_info_release(req);
2606 
2607     } else {
2608         nxt_unit_request_done(req, rc);
2609     }
2610 }
2611 
2612 
2613 static int
nxt_unit_mmap_buf_send(nxt_unit_request_info_t * req,nxt_unit_mmap_buf_t * mmap_buf,int last)2614 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
2615     nxt_unit_mmap_buf_t *mmap_buf, int last)
2616 {
2617     struct {
2618         nxt_port_msg_t       msg;
2619         nxt_port_mmap_msg_t  mmap_msg;
2620     } m;
2621 
2622     int                           rc;
2623     u_char                        *last_used, *first_free;
2624     ssize_t                       res;
2625     nxt_chunk_id_t                first_free_chunk;
2626     nxt_unit_buf_t                *buf;
2627     nxt_unit_impl_t               *lib;
2628     nxt_port_mmap_header_t        *hdr;
2629     nxt_unit_request_info_impl_t  *req_impl;
2630 
2631     lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
2632     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2633 
2634     buf = &mmap_buf->buf;
2635     hdr = mmap_buf->hdr;
2636 
2637     m.mmap_msg.size = buf->free - buf->start;
2638 
2639     m.msg.stream = req_impl->stream;
2640     m.msg.pid = lib->pid;
2641     m.msg.reply_port = 0;
2642     m.msg.type = _NXT_PORT_MSG_DATA;
2643     m.msg.last = last != 0;
2644     m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
2645     m.msg.nf = 0;
2646     m.msg.mf = 0;
2647 
2648     rc = NXT_UNIT_ERROR;
2649 
2650     if (m.msg.mmap) {
2651         m.mmap_msg.mmap_id = hdr->id;
2652         m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr,
2653                                                      (u_char *) buf->start);
2654 
2655         nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
2656                        req_impl->stream,
2657                        (int) m.mmap_msg.mmap_id,
2658                        (int) m.mmap_msg.chunk_id,
2659                        (int) m.mmap_msg.size);
2660 
2661         res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
2662                                  NULL);
2663         if (nxt_slow_path(res != sizeof(m))) {
2664             goto free_buf;
2665         }
2666 
2667         last_used = (u_char *) buf->free - 1;
2668         first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
2669 
2670         if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
2671             first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
2672 
2673             buf->start = (char *) first_free;
2674             buf->free = buf->start;
2675 
2676             if (buf->end < buf->start) {
2677                 buf->end = buf->start;
2678             }
2679 
2680         } else {
2681             buf->start = NULL;
2682             buf->free = NULL;
2683             buf->end = NULL;
2684 
2685             mmap_buf->hdr = NULL;
2686         }
2687 
2688         nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks,
2689                             (int) m.mmap_msg.chunk_id - (int) first_free_chunk);
2690 
2691         nxt_unit_debug(req->ctx, "allocated_chunks %d",
2692                        (int) lib->outgoing.allocated_chunks);
2693 
2694     } else {
2695         if (nxt_slow_path(mmap_buf->plain_ptr == NULL
2696                           || mmap_buf->plain_ptr > buf->start - sizeof(m.msg)))
2697         {
2698             nxt_unit_alert(req->ctx,
2699                            "#%"PRIu32": failed to send plain memory buffer"
2700                            ": no space reserved for message header",
2701                            req_impl->stream);
2702 
2703             goto free_buf;
2704         }
2705 
2706         memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg));
2707 
2708         nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d",
2709                        req_impl->stream,
2710                        (int) (sizeof(m.msg) + m.mmap_msg.size));
2711 
2712         res = nxt_unit_port_send(req->ctx, req->response_port,
2713                                  buf->start - sizeof(m.msg),
2714                                  m.mmap_msg.size + sizeof(m.msg), NULL);
2715 
2716         if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
2717             goto free_buf;
2718         }
2719     }
2720 
2721     rc = NXT_UNIT_OK;
2722 
2723 free_buf:
2724 
2725     nxt_unit_free_outgoing_buf(mmap_buf);
2726 
2727     return rc;
2728 }
2729 
2730 
2731 void
nxt_unit_buf_free(nxt_unit_buf_t * buf)2732 nxt_unit_buf_free(nxt_unit_buf_t *buf)
2733 {
2734     nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
2735 }
2736 
2737 
2738 static void
nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t * mmap_buf)2739 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
2740 {
2741     nxt_unit_free_outgoing_buf(mmap_buf);
2742 
2743     nxt_unit_mmap_buf_release(mmap_buf);
2744 }
2745 
2746 
2747 static void
nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t * mmap_buf)2748 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf)
2749 {
2750     if (mmap_buf->hdr != NULL) {
2751         nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx,
2752                               mmap_buf->hdr, mmap_buf->buf.start,
2753                               mmap_buf->buf.end - mmap_buf->buf.start);
2754 
2755         mmap_buf->hdr = NULL;
2756 
2757         return;
2758     }
2759 
2760     if (mmap_buf->free_ptr != NULL) {
2761         nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr);
2762 
2763         mmap_buf->free_ptr = NULL;
2764     }
2765 }
2766 
2767 
2768 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get(nxt_unit_ctx_t * ctx)2769 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
2770 {
2771     nxt_unit_ctx_impl_t  *ctx_impl;
2772     nxt_unit_read_buf_t  *rbuf;
2773 
2774     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2775 
2776     pthread_mutex_lock(&ctx_impl->mutex);
2777 
2778     rbuf = nxt_unit_read_buf_get_impl(ctx_impl);
2779 
2780     pthread_mutex_unlock(&ctx_impl->mutex);
2781 
2782     rbuf->oob.size = 0;
2783 
2784     return rbuf;
2785 }
2786 
2787 
2788 static nxt_unit_read_buf_t *
nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t * ctx_impl)2789 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl)
2790 {
2791     nxt_queue_link_t     *link;
2792     nxt_unit_read_buf_t  *rbuf;
2793 
2794     if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) {
2795         link = nxt_queue_first(&ctx_impl->free_rbuf);
2796         nxt_queue_remove(link);
2797 
2798         rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link);
2799 
2800         return rbuf;
2801     }
2802 
2803     rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t));
2804 
2805     if (nxt_fast_path(rbuf != NULL)) {
2806         rbuf->ctx_impl = ctx_impl;
2807     }
2808 
2809     return rbuf;
2810 }
2811 
2812 
2813 static void
nxt_unit_read_buf_release(nxt_unit_ctx_t * ctx,nxt_unit_read_buf_t * rbuf)2814 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx,
2815     nxt_unit_read_buf_t *rbuf)
2816 {
2817     nxt_unit_ctx_impl_t  *ctx_impl;
2818 
2819     ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2820 
2821     pthread_mutex_lock(&ctx_impl->mutex);
2822 
2823     nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link);
2824 
2825     pthread_mutex_unlock(&ctx_impl->mutex);
2826 }
2827 
2828 
2829 nxt_unit_buf_t *
nxt_unit_buf_next(nxt_unit_buf_t * buf)2830 nxt_unit_buf_next(nxt_unit_buf_t *buf)
2831 {
2832     nxt_unit_mmap_buf_t  *mmap_buf;
2833 
2834     mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
2835 
2836     if (mmap_buf->next == NULL) {
2837         return NULL;
2838     }
2839 
2840     return &mmap_buf->next->buf;
2841 }
2842 
2843 
2844 uint32_t
nxt_unit_buf_max(void)2845 nxt_unit_buf_max(void)
2846 {
2847     return PORT_MMAP_DATA_SIZE;
2848 }
2849 
2850 
2851 uint32_t
nxt_unit_buf_min(void)2852 nxt_unit_buf_min(void)
2853 {
2854     return PORT_MMAP_CHUNK_SIZE;
2855 }
2856 
2857 
2858 int
nxt_unit_response_write(nxt_unit_request_info_t * req,const void * start,size_t size)2859 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
2860     size_t size)
2861 {
2862     ssize_t  res;
2863 
2864     res = nxt_unit_response_write_nb(req, start, size, size);
2865 
2866     return res < 0 ? -res : NXT_UNIT_OK;
2867 }
2868 
2869 
2870 ssize_t
nxt_unit_response_write_nb(nxt_unit_request_info_t * req,const void * start,size_t size,size_t min_size)2871 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start,
2872     size_t size, size_t min_size)
2873 {
2874     int                           rc;
2875     ssize_t                       sent;
2876     uint32_t                      part_size, min_part_size, buf_size;
2877     const char                    *part_start;
2878     nxt_unit_mmap_buf_t           mmap_buf;
2879     nxt_unit_request_info_impl_t  *req_impl;
2880     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2881 
2882     nxt_unit_req_debug(req, "write: %d", (int) size);
2883 
2884     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2885 
2886     part_start = start;
2887     sent = 0;
2888 
2889     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2890         nxt_unit_req_alert(req, "write: response not initialized yet");
2891 
2892         return -NXT_UNIT_ERROR;
2893     }
2894 
2895     /* Check if response is not send yet. */
2896     if (nxt_slow_path(req->response_buf != NULL)) {
2897         part_size = req->response_buf->end - req->response_buf->free;
2898         part_size = nxt_min(size, part_size);
2899 
2900         rc = nxt_unit_response_add_content(req, part_start, part_size);
2901         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2902             return -rc;
2903         }
2904 
2905         rc = nxt_unit_response_send(req);
2906         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2907             return -rc;
2908         }
2909 
2910         size -= part_size;
2911         part_start += part_size;
2912         sent += part_size;
2913 
2914         min_size -= nxt_min(min_size, part_size);
2915     }
2916 
2917     while (size > 0) {
2918         part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2919         min_part_size = nxt_min(min_size, part_size);
2920         min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE);
2921 
2922         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size,
2923                                        min_part_size, &mmap_buf, local_buf);
2924         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2925             return -rc;
2926         }
2927 
2928         buf_size = mmap_buf.buf.end - mmap_buf.buf.free;
2929         if (nxt_slow_path(buf_size == 0)) {
2930             return sent;
2931         }
2932         part_size = nxt_min(buf_size, part_size);
2933 
2934         mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2935                                        part_start, part_size);
2936 
2937         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
2938         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2939             return -rc;
2940         }
2941 
2942         size -= part_size;
2943         part_start += part_size;
2944         sent += part_size;
2945 
2946         min_size -= nxt_min(min_size, part_size);
2947     }
2948 
2949     return sent;
2950 }
2951 
2952 
2953 int
nxt_unit_response_write_cb(nxt_unit_request_info_t * req,nxt_unit_read_info_t * read_info)2954 nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
2955     nxt_unit_read_info_t *read_info)
2956 {
2957     int                           rc;
2958     ssize_t                       n;
2959     uint32_t                      buf_size;
2960     nxt_unit_buf_t                *buf;
2961     nxt_unit_mmap_buf_t           mmap_buf;
2962     nxt_unit_request_info_impl_t  *req_impl;
2963     char                          local_buf[NXT_UNIT_LOCAL_BUF_SIZE];
2964 
2965     req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
2966 
2967     if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
2968         nxt_unit_req_alert(req, "write: response not initialized yet");
2969 
2970         return NXT_UNIT_ERROR;
2971     }
2972 
2973     /* Check if response is not send yet. */
2974     if (nxt_slow_path(req->response_buf != NULL)) {
2975 
2976         /* Enable content in headers buf. */
2977         rc = nxt_unit_response_add_content(req, "", 0);
2978         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2979             nxt_unit_req_error(req, "Failed to add piggyback content");
2980 
2981             return rc;
2982         }
2983 
2984         buf = req->response_buf;
2985 
2986         while (buf->end - buf->free > 0) {
2987             n = read_info->read(read_info, buf->free, buf->end - buf->free);
2988             if (nxt_slow_path(n < 0)) {
2989                 nxt_unit_req_error(req, "Read error");
2990 
2991                 return NXT_UNIT_ERROR;
2992             }
2993 
2994             /* Manually increase sizes. */
2995             buf->free += n;
2996             req->response->piggyback_content_length += n;
2997 
2998             if (read_info->eof) {
2999                 break;
3000             }
3001         }
3002 
3003         rc = nxt_unit_response_send(req);
3004         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3005             nxt_unit_req_error(req, "Failed to send headers with content");
3006 
3007             return rc;
3008         }
3009 
3010         if (read_info->eof) {
3011             return NXT_UNIT_OK;
3012         }
3013     }
3014 
3015     while (!read_info->eof) {
3016         nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"",
3017                            read_info->buf_size);
3018 
3019         buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE);
3020 
3021         rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
3022                                        buf_size, buf_size,
3023                                        &mmap_buf, local_buf);
3024         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3025             return rc;
3026         }
3027 
3028         buf = &mmap_buf.buf;
3029 
3030         while (!read_info->eof && buf->end > buf->free) {
3031             n = read_info->read(read_info, buf->free, buf->end - buf->free);
3032             if (nxt_slow_path(n < 0)) {
3033                 nxt_unit_req_error(req, "Read error");
3034 
3035                 nxt_unit_free_outgoing_buf(&mmap_buf);
3036 
3037                 return NXT_UNIT_ERROR;
3038             }
3039 
3040             buf->free += n;
3041         }
3042 
3043         rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0);
3044         if (nxt_slow_path(rc != NXT_UNIT_OK)) {
3045             nxt_unit_req_error(req, "Failed to send content");
3046 
3047             return rc;
3048         }
3049     }
3050 
3051     return NXT_UNIT_OK;
3052 }
3053 
3054 
3055 ssize_t
nxt_unit_request_read(nxt_unit_request_info_t * req,void * dst,size_t size)3056 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
3057 {
3058     ssize_t  buf_res, res;
3059 
3060     buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
3061                                 dst, size);
3062 
3063     if (buf_res < (ssize_t) size && req->content_fd != -1) {
3064         res = read(req->content_fd, dst, size);
3065         if (nxt_slow_path(res < 0)) {
3066             nxt_unit_req_alert(req, "failed to read content: %s (%d)",
3067                                strerror(errno), errno);
3068 
3069             return res;
3070         }
3071 
3072         if (res < (ssize_t) size) {
3073             nxt_unit_close(req->content_fd);
3074 
3075             req->content_fd = -1;
3076         }
3077 
3078         req->content_length -= res;
3079 
3080         dst = nxt_pointer_to(dst, res);
3081 
3082     } else {
3083         res = 0;
3084     }
3085 
3086     return buf_res + res;
3087 }
3088 
3089 
3090 ssize_t
nxt_unit_request_readline_size(nxt_unit_request_info_t * req,size_t max_size)3091 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size)
3092 {
3093     char                 *p;
3094     size_t               l_size, b_size;
3095     nxt_unit_buf_t       *b;
3096     nxt_unit_mmap_buf_t  *mmap_buf, *preread_buf;
3097 
3098     if (req->content_length == 0) {
3099         return 0;
3100     }
3101 
3102     l_size = 0;
3103 
3104     b = req->content_buf;
3105 
3106     while (b != NULL) {
3107         b_size = b->end - b->free;
3108         p = memchr(b->free, '\n', b_size);
3109 
3110         if (p != NULL) {
3111             p++;
3112             l_size += p - b->free;
3113             break;
3114         }
3115