nxt_unit.c (1008:84f2370bd642) nxt_unit.c (1131:ec7d924d8dfb)
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"
9#include "nxt_port_memory_int.h"
10
11#include "nxt_unit.h"
12#include "nxt_unit_request.h"
13#include "nxt_unit_response.h"
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"
9#include "nxt_port_memory_int.h"
10
11#include "nxt_unit.h"
12#include "nxt_unit_request.h"
13#include "nxt_unit_response.h"
14#include "nxt_unit_websocket.h"
14
15
16#include "nxt_websocket.h"
17
15#if (NXT_HAVE_MEMFD_CREATE)
16#include <linux/memfd.h>
17#endif
18
18#if (NXT_HAVE_MEMFD_CREATE)
19#include <linux/memfd.h>
20#endif
21
19typedef struct nxt_unit_impl_s nxt_unit_impl_t;
20typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
21typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
22typedef struct nxt_unit_process_s nxt_unit_process_t;
23typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
24typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
25typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
26typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
27typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
22typedef struct nxt_unit_impl_s nxt_unit_impl_t;
23typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
24typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
25typedef struct nxt_unit_process_s nxt_unit_process_t;
26typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
27typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
28typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
29typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
30typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
31typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
28
29static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
30static void nxt_unit_ctx_init(nxt_unit_impl_t *lib,
31 nxt_unit_ctx_impl_t *ctx_impl, void *data);
32
33static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
34static void nxt_unit_ctx_init(nxt_unit_impl_t *lib,
35 nxt_unit_ctx_impl_t *ctx_impl, void *data);
36nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
37 nxt_unit_mmap_buf_t *mmap_buf);
38nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
39 nxt_unit_mmap_buf_t *mmap_buf);
40nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf);
32static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
33 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream);
34static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
35 uint32_t stream);
41static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
42 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream);
43static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
44 uint32_t stream);
45static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
46 nxt_unit_recv_msg_t *recv_msg);
47static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
48 nxt_unit_recv_msg_t *recv_msg);
49static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
50 nxt_unit_recv_msg_t *recv_msg);
36static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
37 nxt_unit_ctx_t *ctx);
38static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
39static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
51static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
52 nxt_unit_ctx_t *ctx);
53static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
54static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
55static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
56 nxt_unit_ctx_t *ctx);
57static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
58static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
40static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
41 nxt_unit_recv_msg_t *recv_msg);
42static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
43static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
44static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
45 nxt_unit_mmap_buf_t *mmap_buf, int last);
59static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
60 nxt_unit_recv_msg_t *recv_msg);
61static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
62static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
63static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
64 nxt_unit_mmap_buf_t *mmap_buf, int last);
65static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
66static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
67 size_t size);
46static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
47 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
48 nxt_chunk_id_t *c, int n);
49static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
50static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
51 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
52static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
53 int fd);

--- 6 unchanged lines hidden (view full) ---

60static void nxt_unit_process_use(nxt_unit_ctx_t *ctx,
61 nxt_unit_process_t *process, int i);
62static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
63static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
64 nxt_unit_process_t *process, uint32_t id);
65static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
66 nxt_unit_recv_msg_t *recv_msg);
67static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
68static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
69 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
70 nxt_chunk_id_t *c, int n);
71static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
72static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
73 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
74static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
75 int fd);

--- 6 unchanged lines hidden (view full) ---

82static void nxt_unit_process_use(nxt_unit_ctx_t *ctx,
83 nxt_unit_process_t *process, int i);
84static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
85static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
86 nxt_unit_process_t *process, uint32_t id);
87static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
88 nxt_unit_recv_msg_t *recv_msg);
89static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
68 nxt_unit_recv_msg_t *recv_msg, nxt_queue_t *incoming_buf);
90 nxt_unit_recv_msg_t *recv_msg);
69static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start,
70 uint32_t size);
71
72static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
73 pid_t pid);
74static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
75 pid_t pid, int remove);
76static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);

--- 16 unchanged lines hidden (view full) ---

93 nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
94 void *oob, size_t oob_size);
95
96static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
97 nxt_unit_port_t *port);
98static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
99 nxt_unit_port_id_t *port_id, int remove);
100
91static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start,
92 uint32_t size);
93
94static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
95 pid_t pid);
96static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
97 pid_t pid, int remove);
98static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);

--- 16 unchanged lines hidden (view full) ---

115 nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
116 void *oob, size_t oob_size);
117
118static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
119 nxt_unit_port_t *port);
120static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
121 nxt_unit_port_id_t *port_id, int remove);
122
123static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
124 nxt_unit_request_info_impl_t *req_impl);
125static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
126 nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);
127
101static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
102
103
104struct nxt_unit_mmap_buf_s {
105 nxt_unit_buf_t buf;
106
128static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
129
130
131struct nxt_unit_mmap_buf_s {
132 nxt_unit_buf_t buf;
133
134 nxt_unit_mmap_buf_t *next;
135 nxt_unit_mmap_buf_t **prev;
136
107 nxt_port_mmap_header_t *hdr;
137 nxt_port_mmap_header_t *hdr;
108 nxt_queue_link_t link;
138// nxt_queue_link_t link;
109 nxt_unit_port_id_t port_id;
110 nxt_unit_request_info_t *req;
111 nxt_unit_ctx_impl_t *ctx_impl;
112};
113
114
115struct nxt_unit_recv_msg_s {
139 nxt_unit_port_id_t port_id;
140 nxt_unit_request_info_t *req;
141 nxt_unit_ctx_impl_t *ctx_impl;
142};
143
144
145struct nxt_unit_recv_msg_s {
116 nxt_port_msg_t port_msg;
146 uint32_t stream;
147 nxt_pid_t pid;
148 nxt_port_id_t reply_port;
117
149
150 uint8_t last; /* 1 bit */
151 uint8_t mmap; /* 1 bit */
152
118 void *start;
119 uint32_t size;
120
153 void *start;
154 uint32_t size;
155
156 int fd;
121 nxt_unit_process_t *process;
157 nxt_unit_process_t *process;
158
159 nxt_unit_mmap_buf_t *incoming_buf;
122};
123
124
125typedef enum {
126 NXT_UNIT_RS_START = 0,
127 NXT_UNIT_RS_RESPONSE_INIT,
128 NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
129 NXT_UNIT_RS_RESPONSE_SENT,
160};
161
162
163typedef enum {
164 NXT_UNIT_RS_START = 0,
165 NXT_UNIT_RS_RESPONSE_INIT,
166 NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
167 NXT_UNIT_RS_RESPONSE_SENT,
130 NXT_UNIT_RS_DONE,
168 NXT_UNIT_RS_RELEASED,
131} nxt_unit_req_state_t;
132
133
134struct nxt_unit_request_info_impl_s {
135 nxt_unit_request_info_t req;
136
169} nxt_unit_req_state_t;
170
171
172struct nxt_unit_request_info_impl_s {
173 nxt_unit_request_info_t req;
174
137 nxt_unit_recv_msg_t recv_msg;
138 nxt_queue_t outgoing_buf; /* of nxt_unit_mmap_buf_t */
139 nxt_queue_t incoming_buf; /* of nxt_unit_mmap_buf_t */
175 uint32_t stream;
140
176
177 nxt_unit_process_t *process;
178
179 nxt_unit_mmap_buf_t *outgoing_buf;
180 nxt_unit_mmap_buf_t *incoming_buf;
181
141 nxt_unit_req_state_t state;
182 nxt_unit_req_state_t state;
183 uint8_t websocket;
142
143 nxt_queue_link_t link;
144
145 char extra_data[];
146};
147
148
184
185 nxt_queue_link_t link;
186
187 char extra_data[];
188};
189
190
191struct nxt_unit_websocket_frame_impl_s {
192 nxt_unit_websocket_frame_t ws;
193
194 nxt_unit_mmap_buf_t *buf;
195
196 nxt_queue_link_t link;
197
198 nxt_unit_ctx_impl_t *ctx_impl;
199
200 void *retain_buf;
201};
202
203
149struct nxt_unit_ctx_impl_s {
150 nxt_unit_ctx_t ctx;
151
152 nxt_unit_port_id_t read_port_id;
153 int read_port_fd;
154
155 nxt_queue_link_t link;
156
204struct nxt_unit_ctx_impl_s {
205 nxt_unit_ctx_t ctx;
206
207 nxt_unit_port_id_t read_port_id;
208 int read_port_fd;
209
210 nxt_queue_link_t link;
211
157 nxt_queue_t free_buf; /* of nxt_unit_mmap_buf_t */
212 nxt_unit_mmap_buf_t *free_buf;
158
159 /* of nxt_unit_request_info_impl_t */
160 nxt_queue_t free_req;
161
213
214 /* of nxt_unit_request_info_impl_t */
215 nxt_queue_t free_req;
216
217 /* of nxt_unit_websocket_frame_impl_t */
218 nxt_queue_t free_ws;
219
162 /* of nxt_unit_request_info_impl_t */
163 nxt_queue_t active_req;
164
220 /* of nxt_unit_request_info_impl_t */
221 nxt_queue_t active_req;
222
223 /* of nxt_unit_request_info_impl_t */
224 nxt_lvlhsh_t requests;
225
165 nxt_unit_mmap_buf_t ctx_buf[2];
166
167 nxt_unit_request_info_impl_t req;
168};
169
170
171struct nxt_unit_impl_s {
172 nxt_unit_t unit;

--- 216 unchanged lines hidden (view full) ---

389nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
390 void *data)
391{
392 ctx_impl->ctx.data = data;
393 ctx_impl->ctx.unit = &lib->unit;
394
395 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
396
226 nxt_unit_mmap_buf_t ctx_buf[2];
227
228 nxt_unit_request_info_impl_t req;
229};
230
231
232struct nxt_unit_impl_s {
233 nxt_unit_t unit;

--- 216 unchanged lines hidden (view full) ---

450nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
451 void *data)
452{
453 ctx_impl->ctx.data = data;
454 ctx_impl->ctx.unit = &lib->unit;
455
456 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
457
397 nxt_queue_init(&ctx_impl->free_buf);
398 nxt_queue_init(&ctx_impl->free_req);
458 nxt_queue_init(&ctx_impl->free_req);
459 nxt_queue_init(&ctx_impl->free_ws);
399 nxt_queue_init(&ctx_impl->active_req);
400
460 nxt_queue_init(&ctx_impl->active_req);
461
401 nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0].link);
402 nxt_queue_insert_tail(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1].link);
462 ctx_impl->free_buf = NULL;
463 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
464 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
465
403 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
404
405 ctx_impl->req.req.ctx = &ctx_impl->ctx;
406 ctx_impl->req.req.unit = &lib->unit;
407
408 ctx_impl->read_port_fd = -1;
466 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
467
468 ctx_impl->req.req.ctx = &ctx_impl->ctx;
469 ctx_impl->req.req.unit = &lib->unit;
470
471 ctx_impl->read_port_fd = -1;
472 ctx_impl->requests.slot = 0;
409}
410
411
473}
474
475
476nxt_inline void
477nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
478 nxt_unit_mmap_buf_t *mmap_buf)
479{
480 mmap_buf->next = *head;
481
482 if (mmap_buf->next != NULL) {
483 mmap_buf->next->prev = &mmap_buf->next;
484 }
485
486 *head = mmap_buf;
487 mmap_buf->prev = head;
488}
489
490
491nxt_inline void
492nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
493 nxt_unit_mmap_buf_t *mmap_buf)
494{
495 while (*prev != NULL) {
496 prev = &(*prev)->next;
497 }
498
499 nxt_unit_mmap_buf_insert(prev, mmap_buf);
500}
501
502
503nxt_inline void
504nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf)
505{
506 nxt_unit_mmap_buf_t **prev;
507
508 prev = mmap_buf->prev;
509
510 if (mmap_buf->next != NULL) {
511 mmap_buf->next->prev = prev;
512 }
513
514 if (prev != NULL) {
515 *prev = mmap_buf->next;
516 }
517}
518
519
412static int
413nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
414 int *log_fd, uint32_t *stream)
415{
416 int rc;
417 int ready_fd, read_fd;
418 char *unit_init, *version_end;
419 long version_length;

--- 84 unchanged lines hidden (view full) ---

504 return NXT_UNIT_OK;
505}
506
507
508int
509nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
510 void *buf, size_t buf_size, void *oob, size_t oob_size)
511{
520static int
521nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
522 int *log_fd, uint32_t *stream)
523{
524 int rc;
525 int ready_fd, read_fd;
526 char *unit_init, *version_end;
527 long version_length;

--- 84 unchanged lines hidden (view full) ---

612 return NXT_UNIT_OK;
613}
614
615
616int
617nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
618 void *buf, size_t buf_size, void *oob, size_t oob_size)
619{
512 int fd, rc, nb;
513 pid_t pid;
514 nxt_queue_t incoming_buf;
515 struct cmsghdr *cm;
516 nxt_port_msg_t *port_msg;
517 nxt_unit_impl_t *lib;
518 nxt_unit_port_t new_port;
519 nxt_queue_link_t *lnk;
520 nxt_unit_request_t *r;
521 nxt_unit_mmap_buf_t *b;
522 nxt_unit_recv_msg_t recv_msg;
523 nxt_unit_callbacks_t *cb;
524 nxt_port_msg_new_port_t *new_port_msg;
525 nxt_unit_request_info_t *req;
526 nxt_unit_request_info_impl_t *req_impl;
620 int rc;
621 pid_t pid;
622 struct cmsghdr *cm;
623 nxt_port_msg_t *port_msg;
624 nxt_unit_impl_t *lib;
625 nxt_unit_recv_msg_t recv_msg;
626 nxt_unit_callbacks_t *cb;
527
528 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
529
530 rc = NXT_UNIT_ERROR;
627
628 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
629
630 rc = NXT_UNIT_ERROR;
531 fd = -1;
631 recv_msg.fd = -1;
532 recv_msg.process = NULL;
533 port_msg = buf;
534 cm = oob;
535
536 if (oob_size >= CMSG_SPACE(sizeof(int))
537 && cm->cmsg_len == CMSG_LEN(sizeof(int))
538 && cm->cmsg_level == SOL_SOCKET
539 && cm->cmsg_type == SCM_RIGHTS)
540 {
632 recv_msg.process = NULL;
633 port_msg = buf;
634 cm = oob;
635
636 if (oob_size >= CMSG_SPACE(sizeof(int))
637 && cm->cmsg_len == CMSG_LEN(sizeof(int))
638 && cm->cmsg_level == SOL_SOCKET
639 && cm->cmsg_type == SCM_RIGHTS)
640 {
541 memcpy(&fd, CMSG_DATA(cm), sizeof(int));
641 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
542 }
543
642 }
643
544 nxt_queue_init(&incoming_buf);
644 recv_msg.incoming_buf = NULL;
545
546 if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
547 nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
548 goto fail;
549 }
550
645
646 if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
647 nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
648 goto fail;
649 }
650
551 recv_msg.port_msg = *port_msg;
651 recv_msg.stream = port_msg->stream;
652 recv_msg.pid = port_msg->pid;
653 recv_msg.reply_port = port_msg->reply_port;
654 recv_msg.last = port_msg->last;
655 recv_msg.mmap = port_msg->mmap;
656
552 recv_msg.start = port_msg + 1;
553 recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
554
555 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
556 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
557 port_msg->stream, (int) port_msg->type);
558 goto fail;
559 }

--- 7 unchanged lines hidden (view full) ---

567 /* Fragmentation is unsupported. */
568 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
569 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
570 port_msg->stream, (int) port_msg->type);
571 goto fail;
572 }
573
574 if (port_msg->mmap) {
657 recv_msg.start = port_msg + 1;
658 recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
659
660 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
661 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
662 port_msg->stream, (int) port_msg->type);
663 goto fail;
664 }

--- 7 unchanged lines hidden (view full) ---

672 /* Fragmentation is unsupported. */
673 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
674 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
675 port_msg->stream, (int) port_msg->type);
676 goto fail;
677 }
678
679 if (port_msg->mmap) {
575 if (nxt_unit_mmap_read(ctx, &recv_msg, &incoming_buf) != NXT_UNIT_OK) {
680 if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
576 goto fail;
577 }
578 }
579
580 cb = &lib->callbacks;
581
582 switch (port_msg->type) {
583
584 case _NXT_PORT_MSG_QUIT:
585 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
586
587 cb->quit(ctx);
588 rc = NXT_UNIT_OK;
589 break;
590
591 case _NXT_PORT_MSG_NEW_PORT:
681 goto fail;
682 }
683 }
684
685 cb = &lib->callbacks;
686
687 switch (port_msg->type) {
688
689 case _NXT_PORT_MSG_QUIT:
690 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
691
692 cb->quit(ctx);
693 rc = NXT_UNIT_OK;
694 break;
695
696 case _NXT_PORT_MSG_NEW_PORT:
592 if (nxt_slow_path(recv_msg.size != sizeof(nxt_port_msg_new_port_t))) {
593 nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
594 "invalid message size (%d)",
595 port_msg->stream, (int) recv_msg.size);
697 rc = nxt_unit_process_new_port(ctx, &recv_msg);
698 break;
596
699
597 goto fail;
598 }
700 case _NXT_PORT_MSG_CHANGE_FILE:
701 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
702 port_msg->stream, recv_msg.fd);
703 break;
599
704
600 if (nxt_slow_path(fd < 0)) {
601 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
602 port_msg->stream, fd);
705 case _NXT_PORT_MSG_MMAP:
706 if (nxt_slow_path(recv_msg.fd < 0)) {
707 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
708 port_msg->stream, recv_msg.fd);
603
604 goto fail;
605 }
606
709
710 goto fail;
711 }
712
607 new_port_msg = recv_msg.start;
713 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
714 break;
608
715
609 nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
610 port_msg->stream, (int) new_port_msg->pid,
611 (int) new_port_msg->id, fd);
716 case _NXT_PORT_MSG_REQ_HEADERS:
717 rc = nxt_unit_process_req_headers(ctx, &recv_msg);
718 break;
612
719
613 nb = 0;
720 case _NXT_PORT_MSG_WEBSOCKET:
721 rc = nxt_unit_process_websocket(ctx, &recv_msg);
722 break;
614
723
615 if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
616 nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
617 "failed: %s (%d)", fd, strerror(errno), errno);
724 case _NXT_PORT_MSG_REMOVE_PID:
725 if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
726 nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
727 "(%d != %d)", port_msg->stream, (int) recv_msg.size,
728 (int) sizeof(pid));
618
619 goto fail;
620 }
621
729
730 goto fail;
731 }
732
622 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
623 new_port_msg->id);
733 memcpy(&pid, recv_msg.start, sizeof(pid));
624
734
625 new_port.in_fd = -1;
626 new_port.out_fd = fd;
627 new_port.data = NULL;
735 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
736 port_msg->stream, (int) pid);
628
737
629 fd = -1;
738 cb->remove_pid(ctx, pid);
630
739
631 rc = cb->add_port(ctx, &new_port);
740 rc = NXT_UNIT_OK;
632 break;
633
741 break;
742
634 case _NXT_PORT_MSG_CHANGE_FILE:
635 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
636 port_msg->stream, fd);
637 break;
743 default:
744 nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
745 port_msg->stream, (int) port_msg->type);
638
746
639 case _NXT_PORT_MSG_MMAP:
640 if (nxt_slow_path(fd < 0)) {
641 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
642 port_msg->stream, fd);
747 goto fail;
748 }
643
749
644 goto fail;
645 }
750fail:
646
751
647 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, fd);
648 break;
752 if (recv_msg.fd != -1) {
753 close(recv_msg.fd);
754 }
649
755
650 case _NXT_PORT_MSG_DATA:
651 if (nxt_slow_path(port_msg->mmap == 0)) {
652 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
653 port_msg->stream);
756 while (recv_msg.incoming_buf != NULL) {
757 nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
758 }
654
759
655 goto fail;
656 }
760 if (recv_msg.process != NULL) {
761 nxt_unit_process_use(ctx, recv_msg.process, -1);
762 }
657
763
658 if (nxt_slow_path(recv_msg.size < sizeof(nxt_unit_request_t))) {
659 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
660 "%d expected", port_msg->stream, (int) recv_msg.size,
661 (int) sizeof(nxt_unit_request_t));
764 return rc;
765}
662
766
663 goto fail;
664 }
665
767
666 req_impl = nxt_unit_request_info_get(ctx);
667 if (nxt_slow_path(req_impl == NULL)) {
668 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
669 port_msg->stream);
768static int
769nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
770{
771 int nb;
772 nxt_unit_impl_t *lib;
773 nxt_unit_port_t new_port;
774 nxt_port_msg_new_port_t *new_port_msg;
670
775
671 goto fail;
672 }
776 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
777 nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
778 "invalid message size (%d)",
779 recv_msg->stream, (int) recv_msg->size);
673
780
674 req = &req_impl->req;
781 return NXT_UNIT_ERROR;
782 }
675
783
676 req->request_port = *port_id;
784 if (nxt_slow_path(recv_msg->fd < 0)) {
785 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
786 recv_msg->stream, recv_msg->fd);
677
787
678 nxt_unit_port_id_init(&req->response_port, port_msg->pid,
679 port_msg->reply_port);
788 return NXT_UNIT_ERROR;
789 }
680
790
681 req->request = recv_msg.start;
791 new_port_msg = recv_msg->start;
682
792
683 lnk = nxt_queue_first(&incoming_buf);
684 b = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
793 nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
794 recv_msg->stream, (int) new_port_msg->pid,
795 (int) new_port_msg->id, recv_msg->fd);
685
796
686 req->request_buf = &b->buf;
687 req->response = NULL;
688 req->response_buf = NULL;
797 nb = 0;
689
798
690 r = req->request;
799 if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
800 nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
801 "failed: %s (%d)", recv_msg->fd, strerror(errno), errno);
691
802
692 req->content_length = r->content_length;
803 return NXT_UNIT_ERROR;
804 }
693
805
694 req->content_buf = req->request_buf;
695 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
806 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
807 new_port_msg->id);
696
808
697 /* Move process to req_impl. */
698 req_impl->recv_msg = recv_msg;
809 new_port.in_fd = -1;
810 new_port.out_fd = recv_msg->fd;
811 new_port.data = NULL;
699
812
700 recv_msg.process = NULL;
813 recv_msg->fd = -1;
701
814
702 nxt_queue_init(&req_impl->outgoing_buf);
703 nxt_queue_init(&req_impl->incoming_buf);
815 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
704
816
705 nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link)
706 {
707 b->req = req;
708 } nxt_queue_loop;
817 return lib->callbacks.add_port(ctx, &new_port);
818}
709
819
710 nxt_queue_add(&req_impl->incoming_buf, &incoming_buf);
711 nxt_queue_init(&incoming_buf);
712
820
713 req->response_max_fields = 0;
714 req_impl->state = NXT_UNIT_RS_START;
821static int
822nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
823{
824 nxt_unit_impl_t *lib;
825 nxt_unit_request_t *r;
826 nxt_unit_mmap_buf_t *b;
827 nxt_unit_request_info_t *req;
828 nxt_unit_request_info_impl_t *req_impl;
715
829
716 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", port_msg->stream,
717 (int) r->method_length, nxt_unit_sptr_get(&r->method),
718 (int) r->target_length, nxt_unit_sptr_get(&r->target),
719 (int) r->content_length);
830 if (nxt_slow_path(recv_msg->mmap == 0)) {
831 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
832 recv_msg->stream);
720
833
721 cb->request_handler(req);
834 return NXT_UNIT_ERROR;
835 }
722
836
723 rc = NXT_UNIT_OK;
724 break;
837 if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
838 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
839 "%d expected", recv_msg->stream, (int) recv_msg->size,
840 (int) sizeof(nxt_unit_request_t));
725
841
726 case _NXT_PORT_MSG_REMOVE_PID:
727 if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
728 nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
729 "(%d != %d)", port_msg->stream, (int) recv_msg.size,
730 (int) sizeof(pid));
842 return NXT_UNIT_ERROR;
843 }
731
844
732 goto fail;
733 }
845 req_impl = nxt_unit_request_info_get(ctx);
846 if (nxt_slow_path(req_impl == NULL)) {
847 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
848 recv_msg->stream);
734
849
735 memcpy(&pid, recv_msg.start, sizeof(pid));
850 return NXT_UNIT_ERROR;
851 }
736
852
737 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
738 port_msg->stream, (int) pid);
853 req = &req_impl->req;
739
854
740 cb->remove_pid(ctx, pid);
855 nxt_unit_port_id_init(&req->response_port, recv_msg->pid,
856 recv_msg->reply_port);
741
857
742 rc = NXT_UNIT_OK;
743 break;
858 req->request = recv_msg->start;
744
859
745 default:
746 nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
747 port_msg->stream, (int) port_msg->type);
860 b = recv_msg->incoming_buf;
748
861
749 goto fail;
862 req->request_buf = &b->buf;
863 req->response = NULL;
864 req->response_buf = NULL;
865
866 r = req->request;
867
868 req->content_length = r->content_length;
869
870 req->content_buf = req->request_buf;
871 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
872
873 /* "Move" process reference to req_impl. */
874 req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
875 if (nxt_slow_path(req_impl->process == NULL)) {
876 return NXT_UNIT_ERROR;
750 }
751
877 }
878
752fail:
879 recv_msg->process = NULL;
753
880
754 if (fd != -1) {
755 close(fd);
881 req_impl->stream = recv_msg->stream;
882
883 req_impl->outgoing_buf = NULL;
884
885 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
886 b->req = req;
756 }
757
887 }
888
758 if (port_msg->mmap) {
759 nxt_queue_each(b, &incoming_buf, nxt_unit_mmap_buf_t, link)
760 {
761 nxt_unit_mmap_release(b->hdr, b->buf.start,
762 b->buf.end - b->buf.start);
889 /* "Move" incoming buffer list to req_impl. */
890 req_impl->incoming_buf = recv_msg->incoming_buf;
891 req_impl->incoming_buf->prev = &req_impl->incoming_buf;
892 recv_msg->incoming_buf = NULL;
763
893
764 nxt_unit_mmap_buf_release(b);
765 } nxt_queue_loop;
894 req->response_max_fields = 0;
895 req_impl->state = NXT_UNIT_RS_START;
896 req_impl->websocket = 0;
897
898 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
899 (int) r->method_length, nxt_unit_sptr_get(&r->method),
900 (int) r->target_length, nxt_unit_sptr_get(&r->target),
901 (int) r->content_length);
902
903 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
904
905 lib->callbacks.request_handler(req);
906
907 return NXT_UNIT_OK;
908}
909
910
911static int
912nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
913{
914 size_t hsize;
915 nxt_unit_impl_t *lib;
916 nxt_unit_mmap_buf_t *b;
917 nxt_unit_ctx_impl_t *ctx_impl;
918 nxt_unit_callbacks_t *cb;
919 nxt_unit_request_info_t *req;
920 nxt_unit_request_info_impl_t *req_impl;
921 nxt_unit_websocket_frame_impl_t *ws_impl;
922
923 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
924
925 req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
926 recv_msg->last);
927 if (req_impl == NULL) {
928 return NXT_UNIT_OK;
766 }
767
929 }
930
768 if (recv_msg.process != NULL) {
769 nxt_unit_process_use(ctx, recv_msg.process, -1);
931 req = &req_impl->req;
932
933 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
934 cb = &lib->callbacks;
935
936 if (cb->websocket_handler && recv_msg->size >= 2) {
937 ws_impl = nxt_unit_websocket_frame_get(ctx);
938 if (nxt_slow_path(ws_impl == NULL)) {
939 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
940 req_impl->stream);
941
942 return NXT_UNIT_ERROR;
943 }
944
945 ws_impl->ws.req = req;
946
947 ws_impl->buf = NULL;
948 ws_impl->retain_buf = NULL;
949
950 if (recv_msg->mmap) {
951 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
952 b->req = req;
953 }
954
955 /* "Move" incoming buffer list to ws_impl. */
956 ws_impl->buf = recv_msg->incoming_buf;
957 ws_impl->buf->prev = &ws_impl->buf;
958 recv_msg->incoming_buf = NULL;
959
960 b = ws_impl->buf;
961
962 } else {
963 b = nxt_unit_mmap_buf_get(ctx);
964 if (nxt_slow_path(b == NULL)) {
965 return NXT_UNIT_ERROR;
966 }
967
968 b->hdr = NULL;
969 b->req = req;
970 b->buf.start = recv_msg->start;
971 b->buf.free = b->buf.start;
972 b->buf.end = b->buf.start + recv_msg->size;
973
974 nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
975 }
976
977 ws_impl->ws.header = (void *) b->buf.start;
978 ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
979 ws_impl->ws.header);
980
981 hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
982
983 if (ws_impl->ws.header->mask) {
984 ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
985
986 } else {
987 ws_impl->ws.mask = NULL;
988 }
989
990 b->buf.free += hsize;
991
992 ws_impl->ws.content_buf = &b->buf;
993 ws_impl->ws.content_length = ws_impl->ws.payload_len;
994
995 nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
996 "payload_len=%"PRIu64,
997 ws_impl->ws.header->opcode,
998 ws_impl->ws.payload_len);
999
1000 cb->websocket_handler(&ws_impl->ws);
770 }
771
1001 }
1002
772 return rc;
1003 if (recv_msg->last) {
1004 req_impl->websocket = 0;
1005
1006 if (cb->close_handler) {
1007 nxt_unit_req_debug(req, "close_handler");
1008
1009 cb->close_handler(req);
1010
1011 } else {
1012 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1013 }
1014 }
1015
1016 return NXT_UNIT_OK;
773}
774
775
776static nxt_unit_request_info_impl_t *
777nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
778{
779 nxt_unit_impl_t *lib;
780 nxt_queue_link_t *lnk;

--- 29 unchanged lines hidden (view full) ---

810
811 return req_impl;
812}
813
814
815static void
816nxt_unit_request_info_release(nxt_unit_request_info_t *req)
817{
1017}
1018
1019
1020static nxt_unit_request_info_impl_t *
1021nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1022{
1023 nxt_unit_impl_t *lib;
1024 nxt_queue_link_t *lnk;

--- 29 unchanged lines hidden (view full) ---

1054
1055 return req_impl;
1056}
1057
1058
1059static void
1060nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1061{
818 nxt_unit_mmap_buf_t *b;
819 nxt_unit_ctx_impl_t *ctx_impl;
1062 nxt_unit_ctx_impl_t *ctx_impl;
820 nxt_unit_recv_msg_t *recv_msg;
821 nxt_unit_request_info_impl_t *req_impl;
822
823 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
824 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
825
826 req->response = NULL;
827 req->response_buf = NULL;
828
1063 nxt_unit_request_info_impl_t *req_impl;
1064
1065 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1066 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1067
1068 req->response = NULL;
1069 req->response_buf = NULL;
1070
829 recv_msg = &req_impl->recv_msg;
1071 if (req_impl->process != NULL) {
1072 nxt_unit_process_use(req->ctx, req_impl->process, -1);
830
1073
831 if (recv_msg->process != NULL) {
832 nxt_unit_process_use(req->ctx, recv_msg->process, -1);
833
834 recv_msg->process = NULL;
1074 req_impl->process = NULL;
835 }
836
1075 }
1076
837 nxt_queue_each(b, &req_impl->outgoing_buf, nxt_unit_mmap_buf_t, link) {
1077 if (req_impl->websocket) {
1078 nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
838
1079
839 nxt_unit_buf_free(&b->buf);
1080 req_impl->websocket = 0;
1081 }
840
1082
841 } nxt_queue_loop;
1083 while (req_impl->outgoing_buf != NULL) {
1084 nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1085 }
842
1086
843 nxt_queue_each(b, &req_impl->incoming_buf, nxt_unit_mmap_buf_t, link) {
1087 while (req_impl->incoming_buf != NULL) {
1088 nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1089 }
844
1090
845 nxt_unit_mmap_release(b->hdr, b->buf.start, b->buf.end - b->buf.start);
846 nxt_unit_mmap_buf_release(b);
847
848 } nxt_queue_loop;
849
850 nxt_queue_remove(&req_impl->link);
851
852 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1091 nxt_queue_remove(&req_impl->link);
1092
1093 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1094
1095 req_impl->state = NXT_UNIT_RS_RELEASED;
853}
854
855
856static void
857nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
858{
859 nxt_unit_ctx_impl_t *ctx_impl;
860
861 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
862
863 nxt_queue_remove(&req_impl->link);
864
865 if (req_impl != &ctx_impl->req) {
866 free(req_impl);
867 }
868}
869
870
1096}
1097
1098
1099static void
1100nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1101{
1102 nxt_unit_ctx_impl_t *ctx_impl;
1103
1104 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1105
1106 nxt_queue_remove(&req_impl->link);
1107
1108 if (req_impl != &ctx_impl->req) {
1109 free(req_impl);
1110 }
1111}
1112
1113
1114static nxt_unit_websocket_frame_impl_t *
1115nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1116{
1117 nxt_queue_link_t *lnk;
1118 nxt_unit_ctx_impl_t *ctx_impl;
1119 nxt_unit_websocket_frame_impl_t *ws_impl;
1120
1121 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1122
1123 if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1124 ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
1125 if (nxt_slow_path(ws_impl == NULL)) {
1126 nxt_unit_warn(ctx, "websocket frame allocation failed");
1127
1128 return NULL;
1129 }
1130
1131 } else {
1132 lnk = nxt_queue_first(&ctx_impl->free_ws);
1133 nxt_queue_remove(lnk);
1134
1135 ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1136 }
1137
1138 ws_impl->ctx_impl = ctx_impl;
1139
1140 return ws_impl;
1141}
1142
1143
1144static void
1145nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1146{
1147 nxt_unit_websocket_frame_impl_t *ws_impl;
1148
1149 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1150
1151 while (ws_impl->buf != NULL) {
1152 nxt_unit_mmap_buf_free(ws_impl->buf);
1153 }
1154
1155 ws->req = NULL;
1156
1157 if (ws_impl->retain_buf != NULL) {
1158 free(ws_impl->retain_buf);
1159
1160 ws_impl->retain_buf = NULL;
1161 }
1162
1163 nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1164}
1165
1166
1167static void
1168nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
1169{
1170 nxt_queue_remove(&ws_impl->link);
1171
1172 free(ws_impl);
1173}
1174
1175
871uint16_t
872nxt_unit_field_hash(const char *name, size_t name_length)
873{
874 u_char ch;
875 uint32_t hash;
876 const char *p, *end;
877
878 hash = 159406; /* Magic value copied from nxt_http_parse.c */

--- 391 unchanged lines hidden (view full) ---

1270 }
1271
1272 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1273 nxt_unit_req_warn(req, "send: response already sent");
1274
1275 return NXT_UNIT_ERROR;
1276 }
1277
1176uint16_t
1177nxt_unit_field_hash(const char *name, size_t name_length)
1178{
1179 u_char ch;
1180 uint32_t hash;
1181 const char *p, *end;
1182
1183 hash = 159406; /* Magic value copied from nxt_http_parse.c */

--- 391 unchanged lines hidden (view full) ---

1575 }
1576
1577 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1578 nxt_unit_req_warn(req, "send: response already sent");
1579
1580 return NXT_UNIT_ERROR;
1581 }
1582
1583 if (req->request->websocket_handshake && req->response->status == 101) {
1584 nxt_unit_response_upgrade(req);
1585 }
1586
1278 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
1279 req->response->fields_count,
1280 (int) (req->response_buf->free
1281 - req->response_buf->start));
1282
1283 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
1284
1587 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
1588 req->response->fields_count,
1589 (int) (req->response_buf->free
1590 - req->response_buf->start));
1591
1592 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
1593
1285 rc = nxt_unit_mmap_buf_send(req->ctx,
1286 req_impl->recv_msg.port_msg.stream,
1287 mmap_buf, 0);
1594 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
1288 if (nxt_fast_path(rc == NXT_UNIT_OK)) {
1289 req->response = NULL;
1290 req->response_buf = NULL;
1291 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
1292
1293 nxt_unit_mmap_buf_release(mmap_buf);
1294 }
1295

--- 11 unchanged lines hidden (view full) ---

1307 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
1308}
1309
1310
1311nxt_unit_buf_t *
1312nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
1313{
1314 int rc;
1595 if (nxt_fast_path(rc == NXT_UNIT_OK)) {
1596 req->response = NULL;
1597 req->response_buf = NULL;
1598 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
1599
1600 nxt_unit_mmap_buf_release(mmap_buf);
1601 }
1602

--- 11 unchanged lines hidden (view full) ---

1614 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
1615}
1616
1617
1618nxt_unit_buf_t *
1619nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
1620{
1621 int rc;
1315 nxt_unit_process_t *process;
1316 nxt_unit_mmap_buf_t *mmap_buf;
1317 nxt_unit_request_info_impl_t *req_impl;
1318
1319 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
1320 nxt_unit_req_warn(req, "response_buf_alloc: "
1321 "requested buffer (%"PRIu32") too big", size);
1322
1323 return NULL;
1324 }
1325
1326 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
1327
1328 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1329
1622 nxt_unit_mmap_buf_t *mmap_buf;
1623 nxt_unit_request_info_impl_t *req_impl;
1624
1625 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
1626 nxt_unit_req_warn(req, "response_buf_alloc: "
1627 "requested buffer (%"PRIu32") too big", size);
1628
1629 return NULL;
1630 }
1631
1632 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
1633
1634 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1635
1330 process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg);
1331 if (nxt_slow_path(process == NULL)) {
1332 return NULL;
1333 }
1334
1335 mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
1336 if (nxt_slow_path(mmap_buf == NULL)) {
1337 return NULL;
1338 }
1339
1340 mmap_buf->req = req;
1341
1636 mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
1637 if (nxt_slow_path(mmap_buf == NULL)) {
1638 return NULL;
1639 }
1640
1641 mmap_buf->req = req;
1642
1342 nxt_queue_insert_tail(&req_impl->outgoing_buf, &mmap_buf->link);
1643 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
1343
1644
1344 rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port,
1345 size, mmap_buf);
1645 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
1646 &req->response_port, size, mmap_buf);
1346 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1347 nxt_unit_mmap_buf_release(mmap_buf);
1348
1349 return NULL;
1350 }
1351
1352 return &mmap_buf->buf;
1353}

--- 7 unchanged lines hidden (view full) ---

1361 if (recv_msg->process != NULL) {
1362 return recv_msg->process;
1363 }
1364
1365 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1366
1367 pthread_mutex_lock(&lib->mutex);
1368
1647 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1648 nxt_unit_mmap_buf_release(mmap_buf);
1649
1650 return NULL;
1651 }
1652
1653 return &mmap_buf->buf;
1654}

--- 7 unchanged lines hidden (view full) ---

1662 if (recv_msg->process != NULL) {
1663 return recv_msg->process;
1664 }
1665
1666 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1667
1668 pthread_mutex_lock(&lib->mutex);
1669
1369 recv_msg->process = nxt_unit_process_find(ctx, recv_msg->port_msg.pid, 0);
1670 recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0);
1370
1371 pthread_mutex_unlock(&lib->mutex);
1372
1373 if (recv_msg->process == NULL) {
1374 nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
1671
1672 pthread_mutex_unlock(&lib->mutex);
1673
1674 if (recv_msg->process == NULL) {
1675 nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
1375 recv_msg->port_msg.stream, (int) recv_msg->port_msg.pid);
1676 recv_msg->stream, (int) recv_msg->pid);
1376 }
1377
1378 return recv_msg->process;
1379}
1380
1381
1382static nxt_unit_mmap_buf_t *
1383nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
1384{
1677 }
1678
1679 return recv_msg->process;
1680}
1681
1682
1683static nxt_unit_mmap_buf_t *
1684nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
1685{
1385 nxt_queue_link_t *lnk;
1386 nxt_unit_mmap_buf_t *mmap_buf;
1387 nxt_unit_ctx_impl_t *ctx_impl;
1388
1389 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1390
1686 nxt_unit_mmap_buf_t *mmap_buf;
1687 nxt_unit_ctx_impl_t *ctx_impl;
1688
1689 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1690
1391 if (nxt_queue_is_empty(&ctx_impl->free_buf)) {
1691 if (ctx_impl->free_buf == NULL) {
1392 mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
1393 if (nxt_slow_path(mmap_buf == NULL)) {
1394 nxt_unit_warn(ctx, "failed to allocate buf");
1395 }
1396
1397 } else {
1692 mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
1693 if (nxt_slow_path(mmap_buf == NULL)) {
1694 nxt_unit_warn(ctx, "failed to allocate buf");
1695 }
1696
1697 } else {
1398 lnk = nxt_queue_first(&ctx_impl->free_buf);
1399 nxt_queue_remove(lnk);
1698 mmap_buf = ctx_impl->free_buf;
1400
1699
1401 mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
1700 nxt_unit_mmap_buf_remove(mmap_buf);
1402 }
1403
1404 mmap_buf->ctx_impl = ctx_impl;
1405
1406 return mmap_buf;
1407}
1408
1409
1410static void
1411nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
1412{
1701 }
1702
1703 mmap_buf->ctx_impl = ctx_impl;
1704
1705 return mmap_buf;
1706}
1707
1708
1709static void
1710nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
1711{
1413 nxt_queue_remove(&mmap_buf->link);
1712 nxt_unit_mmap_buf_remove(mmap_buf);
1414
1713
1415 nxt_queue_insert_tail(&mmap_buf->ctx_impl->free_buf, &mmap_buf->link);
1714 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
1416}
1417
1418
1715}
1716
1717
1718typedef struct {
1719 size_t len;
1720 const char *str;
1721} nxt_unit_str_t;
1722
1723
1724#define nxt_unit_str(str) { nxt_length(str), str }
1725
1726
1419int
1727int
1728nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
1729{
1730 return req->request->websocket_handshake;
1731}
1732
1733
1734int
1735nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
1736{
1737 int rc;
1738 nxt_unit_ctx_impl_t *ctx_impl;
1739 nxt_unit_request_info_impl_t *req_impl;
1740
1741 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1742
1743 if (nxt_slow_path(req_impl->websocket != 0)) {
1744 nxt_unit_req_debug(req, "upgrade: already upgraded");
1745
1746 return NXT_UNIT_OK;
1747 }
1748
1749 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1750 nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
1751
1752 return NXT_UNIT_ERROR;
1753 }
1754
1755 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1756 nxt_unit_req_warn(req, "upgrade: response already sent");
1757
1758 return NXT_UNIT_ERROR;
1759 }
1760
1761 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1762
1763 rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
1764 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1765 nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
1766
1767 return NXT_UNIT_ERROR;
1768 }
1769
1770 req_impl->websocket = 1;
1771
1772 req->response->status = 101;
1773
1774 return NXT_UNIT_OK;
1775}
1776
1777
1778int
1779nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
1780{
1781 nxt_unit_request_info_impl_t *req_impl;
1782
1783 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1784
1785 return req_impl->websocket;
1786}
1787
1788
1789nxt_unit_request_info_t *
1790nxt_unit_get_request_info_from_data(void *data)
1791{
1792 nxt_unit_request_info_impl_t *req_impl;
1793
1794 req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
1795
1796 return &req_impl->req;
1797}
1798
1799
1800int
1420nxt_unit_buf_send(nxt_unit_buf_t *buf)
1421{
1422 int rc;
1423 nxt_unit_mmap_buf_t *mmap_buf;
1424 nxt_unit_request_info_t *req;
1425 nxt_unit_request_info_impl_t *req_impl;
1426
1427 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

--- 12 unchanged lines hidden (view full) ---

1440
1441 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
1442 nxt_unit_req_warn(req, "buf_send: headers not sent yet");
1443
1444 return NXT_UNIT_ERROR;
1445 }
1446
1447 if (nxt_fast_path(buf->free > buf->start)) {
1801nxt_unit_buf_send(nxt_unit_buf_t *buf)
1802{
1803 int rc;
1804 nxt_unit_mmap_buf_t *mmap_buf;
1805 nxt_unit_request_info_t *req;
1806 nxt_unit_request_info_impl_t *req_impl;
1807
1808 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

--- 12 unchanged lines hidden (view full) ---

1821
1822 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
1823 nxt_unit_req_warn(req, "buf_send: headers not sent yet");
1824
1825 return NXT_UNIT_ERROR;
1826 }
1827
1828 if (nxt_fast_path(buf->free > buf->start)) {
1448 rc = nxt_unit_mmap_buf_send(req->ctx,
1449 req_impl->recv_msg.port_msg.stream,
1450 mmap_buf, 0);
1829 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
1451 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1452 return rc;
1453 }
1454 }
1455
1456 nxt_unit_mmap_buf_release(mmap_buf);
1457
1458 return NXT_UNIT_OK;

--- 8 unchanged lines hidden (view full) ---

1467 nxt_unit_request_info_t *req;
1468 nxt_unit_request_info_impl_t *req_impl;
1469
1470 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1471
1472 req = mmap_buf->req;
1473 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1474
1830 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1831 return rc;
1832 }
1833 }
1834
1835 nxt_unit_mmap_buf_release(mmap_buf);
1836
1837 return NXT_UNIT_OK;

--- 8 unchanged lines hidden (view full) ---

1846 nxt_unit_request_info_t *req;
1847 nxt_unit_request_info_impl_t *req_impl;
1848
1849 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1850
1851 req = mmap_buf->req;
1852 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1853
1475 rc = nxt_unit_mmap_buf_send(req->ctx,
1476 req_impl->recv_msg.port_msg.stream,
1477 mmap_buf, 1);
1478
1854 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
1479 if (nxt_slow_path(rc == NXT_UNIT_OK)) {
1480 nxt_unit_mmap_buf_release(mmap_buf);
1481
1482 nxt_unit_request_info_release(req);
1483
1484 } else {
1485 nxt_unit_request_done(req, rc);
1486 }

--- 14 unchanged lines hidden (view full) ---

1501 nxt_chunk_id_t first_free_chunk;
1502 nxt_unit_buf_t *buf;
1503 nxt_unit_impl_t *lib;
1504 nxt_port_mmap_header_t *hdr;
1505
1506 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1507
1508 buf = &mmap_buf->buf;
1855 if (nxt_slow_path(rc == NXT_UNIT_OK)) {
1856 nxt_unit_mmap_buf_release(mmap_buf);
1857
1858 nxt_unit_request_info_release(req);
1859
1860 } else {
1861 nxt_unit_request_done(req, rc);
1862 }

--- 14 unchanged lines hidden (view full) ---

1877 nxt_chunk_id_t first_free_chunk;
1878 nxt_unit_buf_t *buf;
1879 nxt_unit_impl_t *lib;
1880 nxt_port_mmap_header_t *hdr;
1881
1882 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1883
1884 buf = &mmap_buf->buf;
1885 hdr = mmap_buf->hdr;
1509
1510 m.mmap_msg.size = buf->free - buf->start;
1511
1512 m.msg.stream = stream;
1513 m.msg.pid = lib->pid;
1514 m.msg.reply_port = 0;
1515 m.msg.type = _NXT_PORT_MSG_DATA;
1516 m.msg.last = last != 0;
1886
1887 m.mmap_msg.size = buf->free - buf->start;
1888
1889 m.msg.stream = stream;
1890 m.msg.pid = lib->pid;
1891 m.msg.reply_port = 0;
1892 m.msg.type = _NXT_PORT_MSG_DATA;
1893 m.msg.last = last != 0;
1517 m.msg.mmap = m.mmap_msg.size > 0;
1894 m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
1518 m.msg.nf = 0;
1519 m.msg.mf = 0;
1520 m.msg.tracking = 0;
1521
1895 m.msg.nf = 0;
1896 m.msg.mf = 0;
1897 m.msg.tracking = 0;
1898
1522 hdr = mmap_buf->hdr;
1899 if (hdr != NULL) {
1900 m.mmap_msg.mmap_id = hdr->id;
1901 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start);
1902 }
1523
1903
1524 m.mmap_msg.mmap_id = hdr->id;
1525 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start);
1526
1527 nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
1528 stream,
1529 (int) m.mmap_msg.mmap_id,
1530 (int) m.mmap_msg.chunk_id,
1531 (int) m.mmap_msg.size);
1532
1533 res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m,
1904 nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
1905 stream,
1906 (int) m.mmap_msg.mmap_id,
1907 (int) m.mmap_msg.chunk_id,
1908 (int) m.mmap_msg.size);
1909
1910 res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m,
1534 m.mmap_msg.size > 0 ? sizeof(m)
1535 : sizeof(m.msg),
1911 m.msg.mmap ? sizeof(m) : sizeof(m.msg),
1536 NULL, 0);
1537 if (nxt_slow_path(res != sizeof(m))) {
1538 return NXT_UNIT_ERROR;
1539 }
1540
1912 NULL, 0);
1913 if (nxt_slow_path(res != sizeof(m))) {
1914 return NXT_UNIT_ERROR;
1915 }
1916
1541 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) {
1917 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) {
1542 last_used = (u_char *) buf->free - 1;
1543
1544 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
1545 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
1546 end = (u_char *) buf->end;
1547
1548 nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free));
1549
1550 buf->end = (char *) first_free;
1551 }
1552
1553 return NXT_UNIT_OK;
1554}
1555
1556
1557void
1558nxt_unit_buf_free(nxt_unit_buf_t *buf)
1559{
1918 last_used = (u_char *) buf->free - 1;
1919
1920 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
1921 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
1922 end = (u_char *) buf->end;
1923
1924 nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free));
1925
1926 buf->end = (char *) first_free;
1927 }
1928
1929 return NXT_UNIT_OK;
1930}
1931
1932
1933void
1934nxt_unit_buf_free(nxt_unit_buf_t *buf)
1935{
1560 nxt_unit_mmap_buf_t *mmap_buf;
1936 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
1937}
1561
1938
1562 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1563
1939
1564 nxt_unit_mmap_release(mmap_buf->hdr, buf->start, buf->end - buf->start);
1940static void
1941nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
1942{
1943 if (nxt_fast_path(mmap_buf->hdr != NULL)) {
1944 nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start,
1945 mmap_buf->buf.end - mmap_buf->buf.start);
1946 }
1565
1566 nxt_unit_mmap_buf_release(mmap_buf);
1567}
1568
1569
1570nxt_unit_buf_t *
1571nxt_unit_buf_next(nxt_unit_buf_t *buf)
1572{
1947
1948 nxt_unit_mmap_buf_release(mmap_buf);
1949}
1950
1951
1952nxt_unit_buf_t *
1953nxt_unit_buf_next(nxt_unit_buf_t *buf)
1954{
1573 nxt_queue_link_t *lnk;
1574 nxt_unit_mmap_buf_t *mmap_buf;
1575 nxt_unit_request_info_impl_t *req_impl;
1955 nxt_unit_mmap_buf_t *mmap_buf;
1576
1577 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1956
1957 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1578 req_impl = nxt_container_of(mmap_buf->req, nxt_unit_request_info_impl_t,
1579 req);
1580
1958
1581 lnk = &mmap_buf->link;
1582
1583 if (lnk == nxt_queue_last(&req_impl->incoming_buf)
1584 || lnk == nxt_queue_last(&req_impl->outgoing_buf))
1585 {
1959 if (mmap_buf->next == NULL) {
1586 return NULL;
1587 }
1588
1960 return NULL;
1961 }
1962
1589 lnk = nxt_queue_next(lnk);
1590 mmap_buf = nxt_container_of(lnk, nxt_unit_mmap_buf_t, link);
1591
1592 return &mmap_buf->buf;
1963 return &mmap_buf->next->buf;
1593}
1594
1595
1596uint32_t
1597nxt_unit_buf_max(void)
1598{
1599 return PORT_MMAP_DATA_SIZE;
1600}

--- 8 unchanged lines hidden (view full) ---

1609
1610int
1611nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
1612 size_t size)
1613{
1614 int rc;
1615 uint32_t part_size;
1616 const char *part_start;
1964}
1965
1966
1967uint32_t
1968nxt_unit_buf_max(void)
1969{
1970 return PORT_MMAP_DATA_SIZE;
1971}

--- 8 unchanged lines hidden (view full) ---

1980
1981int
1982nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
1983 size_t size)
1984{
1985 int rc;
1986 uint32_t part_size;
1987 const char *part_start;
1617 nxt_unit_process_t *process;
1618 nxt_unit_mmap_buf_t mmap_buf;
1619 nxt_unit_request_info_impl_t *req_impl;
1620
1621 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1622
1623 part_start = start;
1624
1625 /* Check if response is not send yet. */

--- 10 unchanged lines hidden (view full) ---

1636 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1637 return rc;
1638 }
1639
1640 size -= part_size;
1641 part_start += part_size;
1642 }
1643
1988 nxt_unit_mmap_buf_t mmap_buf;
1989 nxt_unit_request_info_impl_t *req_impl;
1990
1991 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1992
1993 part_start = start;
1994
1995 /* Check if response is not send yet. */

--- 10 unchanged lines hidden (view full) ---

2006 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2007 return rc;
2008 }
2009
2010 size -= part_size;
2011 part_start += part_size;
2012 }
2013
1644 process = nxt_unit_msg_get_process(req->ctx, &req_impl->recv_msg);
1645 if (nxt_slow_path(process == NULL)) {
1646 return NXT_UNIT_ERROR;
1647 }
1648
1649 while (size > 0) {
1650 part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
1651
2014 while (size > 0) {
2015 part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2016
1652 rc = nxt_unit_get_outgoing_buf(req->ctx, process, &req->response_port,
1653 part_size, &mmap_buf);
2017 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
2018 &req->response_port, part_size,
2019 &mmap_buf);
1654 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1655 return rc;
1656 }
1657
1658 mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
1659 part_start, part_size);
1660
2020 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2021 return rc;
2022 }
2023
2024 mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2025 part_start, part_size);
2026
1661 rc = nxt_unit_mmap_buf_send(req->ctx,
1662 req_impl->recv_msg.port_msg.stream,
1663 &mmap_buf, 0);
2027 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
1664 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1665 nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
1666 mmap_buf.buf.end - mmap_buf.buf.start);
1667
1668 return rc;
1669 }
1670
1671 size -= part_size;

--- 89 unchanged lines hidden (view full) ---

1761
1762 return NXT_UNIT_OK;
1763}
1764
1765
1766ssize_t
1767nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
1768{
2028 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2029 nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
2030 mmap_buf.buf.end - mmap_buf.buf.start);
2031
2032 return rc;
2033 }
2034
2035 size -= part_size;

--- 89 unchanged lines hidden (view full) ---

2125
2126 return NXT_UNIT_OK;
2127}
2128
2129
2130ssize_t
2131nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
2132{
2133 return nxt_unit_buf_read(&req->content_buf, &req->content_length,
2134 dst, size);
2135}
2136
2137
2138static ssize_t
2139nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
2140{
1769 u_char *p;
1770 size_t rest, copy, read;
1771 nxt_unit_buf_t *buf;
1772
1773 p = dst;
1774 rest = size;
1775
2141 u_char *p;
2142 size_t rest, copy, read;
2143 nxt_unit_buf_t *buf;
2144
2145 p = dst;
2146 rest = size;
2147
1776 buf = req->content_buf;
2148 buf = *b;
1777
1778 while (buf != NULL) {
1779 copy = buf->end - buf->free;
1780 copy = nxt_min(rest, copy);
1781
1782 p = nxt_cpymem(p, buf->free, copy);
1783
1784 buf->free += copy;

--- 5 unchanged lines hidden (view full) ---

1790 }
1791
1792 break;
1793 }
1794
1795 buf = nxt_unit_buf_next(buf);
1796 }
1797
2149
2150 while (buf != NULL) {
2151 copy = buf->end - buf->free;
2152 copy = nxt_min(rest, copy);
2153
2154 p = nxt_cpymem(p, buf->free, copy);
2155
2156 buf->free += copy;

--- 5 unchanged lines hidden (view full) ---

2162 }
2163
2164 break;
2165 }
2166
2167 buf = nxt_unit_buf_next(buf);
2168 }
2169
1798 req->content_buf = buf;
2170 *b = buf;
1799
1800 read = size - rest;
1801
2171
2172 read = size - rest;
2173
1802 req->content_length -= read;
2174 *len -= read;
1803
1804 return read;
1805}
1806
1807
1808void
1809nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
1810{

--- 36 unchanged lines hidden (view full) ---

1847
1848 return;
1849 }
1850
1851skip_response_send:
1852
1853 lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
1854
2175
2176 return read;
2177}
2178
2179
2180void
2181nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
2182{

--- 36 unchanged lines hidden (view full) ---

2219
2220 return;
2221 }
2222
2223skip_response_send:
2224
2225 lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
2226
1855 msg.stream = req_impl->recv_msg.port_msg.stream;
2227 msg.stream = req_impl->stream;
1856 msg.pid = lib->pid;
1857 msg.reply_port = 0;
1858 msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
1859 : _NXT_PORT_MSG_RPC_ERROR;
1860 msg.last = 1;
1861 msg.mmap = 0;
1862 msg.nf = 0;
1863 msg.mf = 0;

--- 5 unchanged lines hidden (view full) ---

1869 nxt_unit_req_alert(req, "last message send failed: %s (%d)",
1870 strerror(errno), errno);
1871 }
1872
1873 nxt_unit_request_info_release(req);
1874}
1875
1876
2228 msg.pid = lib->pid;
2229 msg.reply_port = 0;
2230 msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
2231 : _NXT_PORT_MSG_RPC_ERROR;
2232 msg.last = 1;
2233 msg.mmap = 0;
2234 msg.nf = 0;
2235 msg.mf = 0;

--- 5 unchanged lines hidden (view full) ---

2241 nxt_unit_req_alert(req, "last message send failed: %s (%d)",
2242 strerror(errno), errno);
2243 }
2244
2245 nxt_unit_request_info_release(req);
2246}
2247
2248
2249int
2250nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
2251 uint8_t last, const void *start, size_t size)
2252{
2253 const struct iovec iov = { (void *) start, size };
2254
2255 return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
2256}
2257
2258
2259int
2260nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
2261 uint8_t last, const struct iovec *iov, int iovcnt)
2262{
2263 int i, rc;
2264 size_t l, copy;
2265 uint32_t payload_len, buf_size;
2266 const uint8_t *b;
2267 nxt_unit_buf_t *buf;
2268 nxt_websocket_header_t *wh;
2269
2270 payload_len = 0;
2271
2272 for (i = 0; i < iovcnt; i++) {
2273 payload_len += iov[i].iov_len;
2274 }
2275
2276 buf_size = 10 + payload_len;
2277
2278 buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
2279 PORT_MMAP_DATA_SIZE));
2280 if (nxt_slow_path(buf == NULL)) {
2281 nxt_unit_req_error(req, "Failed to allocate buf for content");
2282
2283 return NXT_UNIT_ERROR;
2284 }
2285
2286 buf->start[0] = 0;
2287 buf->start[1] = 0;
2288
2289 wh = (void *) buf->free;
2290
2291 buf->free = nxt_websocket_frame_init(wh, payload_len);
2292 wh->fin = last;
2293 wh->opcode = opcode;
2294
2295 for (i = 0; i < iovcnt; i++) {
2296 b = iov[i].iov_base;
2297 l = iov[i].iov_len;
2298
2299 while (l > 0) {
2300 copy = buf->end - buf->free;
2301 copy = nxt_min(l, copy);
2302
2303 buf->free = nxt_cpymem(buf->free, b, copy);
2304 b += copy;
2305 l -= copy;
2306
2307 if (l > 0) {
2308 buf_size -= buf->end - buf->start;
2309
2310 rc = nxt_unit_buf_send(buf);
2311 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2312 nxt_unit_req_error(req, "Failed to send content");
2313
2314 return NXT_UNIT_ERROR;
2315 }
2316
2317 buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
2318 PORT_MMAP_DATA_SIZE));
2319 if (nxt_slow_path(buf == NULL)) {
2320 nxt_unit_req_error(req,
2321 "Failed to allocate buf for content");
2322
2323 return NXT_UNIT_ERROR;
2324 }
2325 }
2326 }
2327 }
2328
2329 if (buf->free > buf->start) {
2330 rc = nxt_unit_buf_send(buf);
2331 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2332 nxt_unit_req_error(req, "Failed to send content");
2333 }
2334 }
2335
2336 return rc;
2337}
2338
2339
2340ssize_t
2341nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
2342 size_t size)
2343{
2344 ssize_t res;
2345 uint8_t *b;
2346 uint64_t i, d;
2347
2348 res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
2349 dst, size);
2350
2351 if (ws->mask == NULL) {
2352 return res;
2353 }
2354
2355 b = dst;
2356 d = (ws->payload_len - ws->content_length - res) % 4;
2357
2358 for (i = 0; i < (uint64_t) res; i++) {
2359 b[i] ^= ws->mask[ (i + d) % 4 ];
2360 }
2361
2362 return res;
2363}
2364
2365
2366int
2367nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
2368{
2369 char *b;
2370 size_t size;
2371 nxt_unit_websocket_frame_impl_t *ws_impl;
2372
2373 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
2374
2375 if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) {
2376 return NXT_UNIT_OK;
2377 }
2378
2379 size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
2380
2381 b = malloc(size);
2382 if (nxt_slow_path(b == NULL)) {
2383 return NXT_UNIT_ERROR;
2384 }
2385
2386 memcpy(b, ws_impl->buf->buf.start, size);
2387
2388 ws_impl->buf->buf.start = b;
2389 ws_impl->buf->buf.free = b;
2390 ws_impl->buf->buf.end = b + size;
2391
2392 ws_impl->retain_buf = b;
2393
2394 return NXT_UNIT_OK;
2395}
2396
2397
2398void
2399nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
2400{
2401 nxt_unit_websocket_frame_release(ws);
2402}
2403
2404
1877static nxt_port_mmap_header_t *
1878nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
1879 nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
1880{
1881 int res, nchunks, i;
1882 nxt_unit_mmap_t *mm, *mm_end;
1883 nxt_port_mmap_header_t *hdr;
1884

--- 465 unchanged lines hidden (view full) ---

2350 int rc;
2351 nxt_chunk_id_t c;
2352 nxt_unit_process_t *process;
2353 nxt_port_mmap_header_t *hdr;
2354 nxt_port_mmap_tracking_msg_t *tracking_msg;
2355
2356 if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
2357 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
2405static nxt_port_mmap_header_t *
2406nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2407 nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
2408{
2409 int res, nchunks, i;
2410 nxt_unit_mmap_t *mm, *mm_end;
2411 nxt_port_mmap_header_t *hdr;
2412

--- 465 unchanged lines hidden (view full) ---

2878 int rc;
2879 nxt_chunk_id_t c;
2880 nxt_unit_process_t *process;
2881 nxt_port_mmap_header_t *hdr;
2882 nxt_port_mmap_tracking_msg_t *tracking_msg;
2883
2884 if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
2885 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
2358 recv_msg->port_msg.stream, (int) recv_msg->size);
2886 recv_msg->stream, (int) recv_msg->size);
2359
2360 return 0;
2361 }
2362
2363 tracking_msg = recv_msg->start;
2364
2365 recv_msg->start = tracking_msg + 1;
2366 recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);

--- 6 unchanged lines hidden (view full) ---

2373 pthread_mutex_lock(&process->incoming.mutex);
2374
2375 hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
2376 if (nxt_slow_path(hdr == NULL)) {
2377 pthread_mutex_unlock(&process->incoming.mutex);
2378
2379 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
2380 "invalid mmap id %d,%"PRIu32,
2887
2888 return 0;
2889 }
2890
2891 tracking_msg = recv_msg->start;
2892
2893 recv_msg->start = tracking_msg + 1;
2894 recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);

--- 6 unchanged lines hidden (view full) ---

2901 pthread_mutex_lock(&process->incoming.mutex);
2902
2903 hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
2904 if (nxt_slow_path(hdr == NULL)) {
2905 pthread_mutex_unlock(&process->incoming.mutex);
2906
2907 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
2908 "invalid mmap id %d,%"PRIu32,
2381 recv_msg->port_msg.stream,
2382 (int) process->pid, tracking_msg->mmap_id);
2909 recv_msg->stream, (int) process->pid,
2910 tracking_msg->mmap_id);
2383
2384 return 0;
2385 }
2386
2387 c = tracking_msg->tracking_id;
2911
2912 return 0;
2913 }
2914
2915 c = tracking_msg->tracking_id;
2388 rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->port_msg.stream, 0);
2916 rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
2389
2390 if (rc == 0) {
2391 nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
2917
2918 if (rc == 0) {
2919 nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
2392 recv_msg->port_msg.stream);
2920 recv_msg->stream);
2393
2394 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
2395 }
2396
2397 pthread_mutex_unlock(&process->incoming.mutex);
2398
2399 return rc;
2400}
2401
2402
2403static int
2921
2922 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
2923 }
2924
2925 pthread_mutex_unlock(&process->incoming.mutex);
2926
2927 return rc;
2928}
2929
2930
2931static int
2404nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
2405 nxt_queue_t *incoming_buf)
2932nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
2406{
2407 void *start;
2408 uint32_t size;
2409 nxt_unit_process_t *process;
2933{
2934 void *start;
2935 uint32_t size;
2936 nxt_unit_process_t *process;
2410 nxt_unit_mmap_buf_t *b;
2937 nxt_unit_mmap_buf_t *b, **incoming_tail;
2411 nxt_port_mmap_msg_t *mmap_msg, *end;
2412 nxt_port_mmap_header_t *hdr;
2413
2414 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
2415 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
2938 nxt_port_mmap_msg_t *mmap_msg, *end;
2939 nxt_port_mmap_header_t *hdr;
2940
2941 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
2942 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
2416 recv_msg->port_msg.stream, (int) recv_msg->size);
2943 recv_msg->stream, (int) recv_msg->size);
2417
2418 return NXT_UNIT_ERROR;
2419 }
2420
2421 process = nxt_unit_msg_get_process(ctx, recv_msg);
2422 if (nxt_slow_path(process == NULL)) {
2423 return NXT_UNIT_ERROR;
2424 }
2425
2426 mmap_msg = recv_msg->start;
2427 end = nxt_pointer_to(recv_msg->start, recv_msg->size);
2428
2944
2945 return NXT_UNIT_ERROR;
2946 }
2947
2948 process = nxt_unit_msg_get_process(ctx, recv_msg);
2949 if (nxt_slow_path(process == NULL)) {
2950 return NXT_UNIT_ERROR;
2951 }
2952
2953 mmap_msg = recv_msg->start;
2954 end = nxt_pointer_to(recv_msg->start, recv_msg->size);
2955
2956 incoming_tail = &recv_msg->incoming_buf;
2957
2429 pthread_mutex_lock(&process->incoming.mutex);
2430
2431 for (; mmap_msg < end; mmap_msg++) {
2432 hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
2433 if (nxt_slow_path(hdr == NULL)) {
2434 pthread_mutex_unlock(&process->incoming.mutex);
2435
2436 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
2437 "invalid mmap id %d,%"PRIu32,
2958 pthread_mutex_lock(&process->incoming.mutex);
2959
2960 for (; mmap_msg < end; mmap_msg++) {
2961 hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
2962 if (nxt_slow_path(hdr == NULL)) {
2963 pthread_mutex_unlock(&process->incoming.mutex);
2964
2965 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
2966 "invalid mmap id %d,%"PRIu32,
2438 recv_msg->port_msg.stream,
2439 (int) process->pid, mmap_msg->mmap_id);
2967 recv_msg->stream, (int) process->pid,
2968 mmap_msg->mmap_id);
2440
2441 return NXT_UNIT_ERROR;
2442 }
2443
2444 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
2445 size = mmap_msg->size;
2446
2447 if (recv_msg->start == mmap_msg) {
2448 recv_msg->start = start;
2449 recv_msg->size = size;
2450 }
2451
2452 b = nxt_unit_mmap_buf_get(ctx);
2453 if (nxt_slow_path(b == NULL)) {
2454 pthread_mutex_unlock(&process->incoming.mutex);
2455
2969
2970 return NXT_UNIT_ERROR;
2971 }
2972
2973 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
2974 size = mmap_msg->size;
2975
2976 if (recv_msg->start == mmap_msg) {
2977 recv_msg->start = start;
2978 recv_msg->size = size;
2979 }
2980
2981 b = nxt_unit_mmap_buf_get(ctx);
2982 if (nxt_slow_path(b == NULL)) {
2983 pthread_mutex_unlock(&process->incoming.mutex);
2984
2456 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
2457 "failed to allocate buf",
2458 recv_msg->port_msg.stream);
2985 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
2986 recv_msg->stream);
2459
2460 nxt_unit_mmap_release(hdr, start, size);
2461
2462 return NXT_UNIT_ERROR;
2463 }
2464
2987
2988 nxt_unit_mmap_release(hdr, start, size);
2989
2990 return NXT_UNIT_ERROR;
2991 }
2992
2465 nxt_queue_insert_tail(incoming_buf, &b->link);
2993 nxt_unit_mmap_buf_insert(incoming_tail, b);
2994 incoming_tail = &b->next;
2466
2467 b->buf.start = start;
2468 b->buf.free = start;
2469 b->buf.end = b->buf.start + size;
2470 b->hdr = hdr;
2471
2472 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
2995
2996 b->buf.start = start;
2997 b->buf.free = start;
2998 b->buf.end = b->buf.start + size;
2999 b->hdr = hdr;
3000
3001 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
2473 recv_msg->port_msg.stream,
3002 recv_msg->stream,
2474 start, (int) size,
2475 (int) hdr->src_pid, (int) hdr->dst_pid,
2476 (int) hdr->id, (int) mmap_msg->chunk_id,
2477 (int) mmap_msg->size);
2478 }
2479
2480 pthread_mutex_unlock(&process->incoming.mutex);
2481

--- 198 unchanged lines hidden (view full) ---

2680 rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
2681 buf, sizeof(buf),
2682 oob, sizeof(oob));
2683 }
2684
2685 if (nxt_fast_path(rsize > 0)) {
2686 rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize,
2687 oob, sizeof(oob));
3003 start, (int) size,
3004 (int) hdr->src_pid, (int) hdr->dst_pid,
3005 (int) hdr->id, (int) mmap_msg->chunk_id,
3006 (int) mmap_msg->size);
3007 }
3008
3009 pthread_mutex_unlock(&process->incoming.mutex);
3010

--- 198 unchanged lines hidden (view full) ---

3209 rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
3210 buf, sizeof(buf),
3211 oob, sizeof(oob));
3212 }
3213
3214 if (nxt_fast_path(rsize > 0)) {
3215 rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize,
3216 oob, sizeof(oob));
3217
3218#if (NXT_DEBUG)
3219 memset(buf, 0xAC, rsize);
3220#endif
3221
2688 } else {
2689 rc = NXT_UNIT_ERROR;
2690 }
2691
2692 return rc;
2693}
2694
2695

--- 74 unchanged lines hidden (view full) ---

2770
2771 return &new_ctx->ctx;
2772}
2773
2774
2775void
2776nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
2777{
3222 } else {
3223 rc = NXT_UNIT_ERROR;
3224 }
3225
3226 return rc;
3227}
3228
3229

--- 74 unchanged lines hidden (view full) ---

3304
3305 return &new_ctx->ctx;
3306}
3307
3308
3309void
3310nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
3311{
2778 nxt_unit_impl_t *lib;
2779 nxt_unit_ctx_impl_t *ctx_impl;
2780 nxt_unit_mmap_buf_t *mmap_buf;
2781 nxt_unit_request_info_impl_t *req_impl;
3312 nxt_unit_impl_t *lib;
3313 nxt_unit_ctx_impl_t *ctx_impl;
3314 nxt_unit_mmap_buf_t *mmap_buf;
3315 nxt_unit_request_info_impl_t *req_impl;
3316 nxt_unit_websocket_frame_impl_t *ws_impl;
2782
2783 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
2784 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
2785
2786 nxt_queue_each(req_impl, &ctx_impl->active_req,
2787 nxt_unit_request_info_impl_t, link)
2788 {
2789 nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
2790
2791 nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
2792
2793 } nxt_queue_loop;
2794
3317
3318 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3319 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3320
3321 nxt_queue_each(req_impl, &ctx_impl->active_req,
3322 nxt_unit_request_info_impl_t, link)
3323 {
3324 nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
3325
3326 nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
3327
3328 } nxt_queue_loop;
3329
2795 nxt_queue_remove(&ctx_impl->ctx_buf[0].link);
2796 nxt_queue_remove(&ctx_impl->ctx_buf[1].link);
3330 nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]);
3331 nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]);
2797
3332
2798 nxt_queue_each(mmap_buf, &ctx_impl->free_buf, nxt_unit_mmap_buf_t, link) {
2799
2800 nxt_queue_remove(&mmap_buf->link);
3333 while (ctx_impl->free_buf != NULL) {
3334 mmap_buf = ctx_impl->free_buf;
3335 nxt_unit_mmap_buf_remove(mmap_buf);
2801 free(mmap_buf);
3336 free(mmap_buf);
3337 }
2802
3338
2803 } nxt_queue_loop;
2804
2805 nxt_queue_each(req_impl, &ctx_impl->free_req,
2806 nxt_unit_request_info_impl_t, link)
2807 {
2808 nxt_unit_request_info_free(req_impl);
2809
2810 } nxt_queue_loop;
2811
3339 nxt_queue_each(req_impl, &ctx_impl->free_req,
3340 nxt_unit_request_info_impl_t, link)
3341 {
3342 nxt_unit_request_info_free(req_impl);
3343
3344 } nxt_queue_loop;
3345
3346 nxt_queue_each(ws_impl, &ctx_impl->free_ws,
3347 nxt_unit_websocket_frame_impl_t, link)
3348 {
3349 nxt_unit_websocket_frame_free(ws_impl);
3350
3351 } nxt_queue_loop;
3352
2812 nxt_queue_remove(&ctx_impl->link);
2813
2814 if (ctx_impl != &lib->main_ctx) {
2815 free(ctx_impl);
2816 }
2817}
2818
2819

--- 629 unchanged lines hidden (view full) ---

3449 return lhq.value;
3450
3451 default:
3452 return NULL;
3453 }
3454}
3455
3456
3353 nxt_queue_remove(&ctx_impl->link);
3354
3355 if (ctx_impl != &lib->main_ctx) {
3356 free(ctx_impl);
3357 }
3358}
3359
3360

--- 629 unchanged lines hidden (view full) ---

3990 return lhq.value;
3991
3992 default:
3993 return NULL;
3994 }
3995}
3996
3997
3998static nxt_int_t
3999nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
4000{
4001 return NXT_OK;
4002}
4003
4004
4005static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = {
4006 NXT_LVLHSH_DEFAULT,
4007 nxt_unit_request_hash_test,
4008 nxt_lvlhsh_alloc,
4009 nxt_lvlhsh_free,
4010};
4011
4012
4013static int
4014nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
4015 nxt_unit_request_info_impl_t *req_impl)
4016{
4017 uint32_t *stream;
4018 nxt_int_t res;
4019 nxt_lvlhsh_query_t lhq;
4020
4021 stream = &req_impl->stream;
4022
4023 lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
4024 lhq.key.length = sizeof(*stream);
4025 lhq.key.start = (u_char *) stream;
4026 lhq.proto = &lvlhsh_requests_proto;
4027 lhq.pool = NULL;
4028 lhq.replace = 0;
4029 lhq.value = req_impl;
4030
4031 res = nxt_lvlhsh_insert(request_hash, &lhq);
4032
4033 switch (res) {
4034
4035 case NXT_OK:
4036 return NXT_UNIT_OK;
4037
4038 default:
4039 return NXT_UNIT_ERROR;
4040 }
4041}
4042
4043
4044static nxt_unit_request_info_impl_t *
4045nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
4046 int remove)
4047{
4048 nxt_int_t res;
4049 nxt_lvlhsh_query_t lhq;
4050
4051 lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
4052 lhq.key.length = sizeof(stream);
4053 lhq.key.start = (u_char *) &stream;
4054 lhq.proto = &lvlhsh_requests_proto;
4055 lhq.pool = NULL;
4056
4057 if (remove) {
4058 res = nxt_lvlhsh_delete(request_hash, &lhq);
4059
4060 } else {
4061 res = nxt_lvlhsh_find(request_hash, &lhq);
4062 }
4063
4064 switch (res) {
4065
4066 case NXT_OK:
4067 return lhq.value;
4068
4069 default:
4070 return NULL;
4071 }
4072}
4073
4074
3457void
3458nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
3459{
3460 int log_fd, n;
3461 char msg[NXT_MAX_ERROR_STR], *p, *end;
3462 pid_t pid;
3463 va_list ap;
3464 nxt_unit_impl_t *lib;

--- 56 unchanged lines hidden (view full) ---

3521 p = msg;
3522 end = p + sizeof(msg) - 1;
3523
3524 p = nxt_unit_snprint_prefix(p, end, pid, level);
3525
3526 if (nxt_fast_path(req != NULL)) {
3527 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
3528
4075void
4076nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
4077{
4078 int log_fd, n;
4079 char msg[NXT_MAX_ERROR_STR], *p, *end;
4080 pid_t pid;
4081 va_list ap;
4082 nxt_unit_impl_t *lib;

--- 56 unchanged lines hidden (view full) ---

4139 p = msg;
4140 end = p + sizeof(msg) - 1;
4141
4142 p = nxt_unit_snprint_prefix(p, end, pid, level);
4143
4144 if (nxt_fast_path(req != NULL)) {
4145 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
4146
3529 p += snprintf(p, end - p,
3530 "#%"PRIu32": ", req_impl->recv_msg.port_msg.stream);
4147 p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
3531 }
3532
3533 va_start(ap, fmt);
3534 p += vsnprintf(p, end - p, fmt, ap);
3535 va_end(ap);
3536
3537 if (nxt_slow_path(p > end)) {
3538 memcpy(end - 5, "[...]", 5);

--- 77 unchanged lines hidden ---
4148 }
4149
4150 va_start(ap, fmt);
4151 p += vsnprintf(p, end - p, fmt, ap);
4152 va_end(ap);
4153
4154 if (nxt_slow_path(p > end)) {
4155 memcpy(end - 5, "[...]", 5);

--- 77 unchanged lines hidden ---