Deleted Added
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <stdlib.h>
7
8#include "nxt_main.h"
9#include "nxt_port_memory_int.h"
10
11#include "nxt_unit.h"
12#include "nxt_unit_request.h"
13#include "nxt_unit_response.h"
14#include "nxt_unit_websocket.h"
15
16#include "nxt_websocket.h"
17
18#if (NXT_HAVE_MEMFD_CREATE)
19#include <linux/memfd.h>
20#endif
21
22typedef struct nxt_unit_impl_s nxt_unit_impl_t;
23typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
24typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
25typedef struct nxt_unit_process_s nxt_unit_process_t;
26typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t;
27typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t;
28typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t;
29typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t;
30typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t;
31typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t;
32
33static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init);
34static void nxt_unit_ctx_init(nxt_unit_impl_t *lib,
35 nxt_unit_ctx_impl_t *ctx_impl, void *data);
36nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
37 nxt_unit_mmap_buf_t *mmap_buf);
38nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
39 nxt_unit_mmap_buf_t *mmap_buf);
40nxt_inline void nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf);
41static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
42 nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream);
43static int nxt_unit_ready(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
44 uint32_t stream);
45static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
46 nxt_unit_recv_msg_t *recv_msg);
47static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
48 nxt_unit_recv_msg_t *recv_msg);
49static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
50 nxt_unit_recv_msg_t *recv_msg);
51static nxt_unit_request_info_impl_t *nxt_unit_request_info_get(
52 nxt_unit_ctx_t *ctx);
53static void nxt_unit_request_info_release(nxt_unit_request_info_t *req);
54static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req);
55static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get(
56 nxt_unit_ctx_t *ctx);
57static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws);
58static void nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws);
59static nxt_unit_process_t *nxt_unit_msg_get_process(nxt_unit_ctx_t *ctx,
60 nxt_unit_recv_msg_t *recv_msg);
61static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx);
62static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf);
63static int nxt_unit_mmap_buf_send(nxt_unit_ctx_t *ctx, uint32_t stream,
64 nxt_unit_mmap_buf_t *mmap_buf, int last);
65static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf);
66static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst,
67 size_t size);
68static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx,
69 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id,
70 nxt_chunk_id_t *c, int n);
71static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i);
72static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx,
73 nxt_unit_process_t *process, nxt_unit_port_id_t *port_id, int n);
74static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
75 int fd);

--- 6 unchanged lines hidden (view full) ---

82static void nxt_unit_process_use(nxt_unit_ctx_t *ctx,
83 nxt_unit_process_t *process, int i);
84static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps);
85static nxt_port_mmap_header_t *nxt_unit_get_incoming_mmap(nxt_unit_ctx_t *ctx,
86 nxt_unit_process_t *process, uint32_t id);
87static int nxt_unit_tracking_read(nxt_unit_ctx_t *ctx,
88 nxt_unit_recv_msg_t *recv_msg);
89static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx,
90 nxt_unit_recv_msg_t *recv_msg);
91static int nxt_unit_mmap_release(nxt_port_mmap_header_t *hdr, void *start,
92 uint32_t size);
93
94static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
95 pid_t pid);
96static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
97 pid_t pid, int remove);
98static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);

--- 16 unchanged lines hidden (view full) ---

115 nxt_unit_port_id_t *port_id, void *buf, size_t buf_size,
116 void *oob, size_t oob_size);
117
118static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
119 nxt_unit_port_t *port);
120static nxt_unit_port_impl_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash,
121 nxt_unit_port_id_t *port_id, int remove);
122
123static int nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
124 nxt_unit_request_info_impl_t *req_impl);
125static nxt_unit_request_info_impl_t *nxt_unit_request_hash_find(
126 nxt_lvlhsh_t *request_hash, uint32_t stream, int remove);
127
128static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level);
129
130
131struct nxt_unit_mmap_buf_s {
132 nxt_unit_buf_t buf;
133
134 nxt_unit_mmap_buf_t *next;
135 nxt_unit_mmap_buf_t **prev;
136
137 nxt_port_mmap_header_t *hdr;
138// nxt_queue_link_t link;
139 nxt_unit_port_id_t port_id;
140 nxt_unit_request_info_t *req;
141 nxt_unit_ctx_impl_t *ctx_impl;
142};
143
144
145struct nxt_unit_recv_msg_s {
146 uint32_t stream;
147 nxt_pid_t pid;
148 nxt_port_id_t reply_port;
149
150 uint8_t last; /* 1 bit */
151 uint8_t mmap; /* 1 bit */
152
153 void *start;
154 uint32_t size;
155
156 int fd;
157 nxt_unit_process_t *process;
158
159 nxt_unit_mmap_buf_t *incoming_buf;
160};
161
162
163typedef enum {
164 NXT_UNIT_RS_START = 0,
165 NXT_UNIT_RS_RESPONSE_INIT,
166 NXT_UNIT_RS_RESPONSE_HAS_CONTENT,
167 NXT_UNIT_RS_RESPONSE_SENT,
168 NXT_UNIT_RS_RELEASED,
169} nxt_unit_req_state_t;
170
171
172struct nxt_unit_request_info_impl_s {
173 nxt_unit_request_info_t req;
174
175 uint32_t stream;
176
177 nxt_unit_process_t *process;
178
179 nxt_unit_mmap_buf_t *outgoing_buf;
180 nxt_unit_mmap_buf_t *incoming_buf;
181
182 nxt_unit_req_state_t state;
183 uint8_t websocket;
184
185 nxt_queue_link_t link;
186
187 char extra_data[];
188};
189
190
191struct nxt_unit_websocket_frame_impl_s {
192 nxt_unit_websocket_frame_t ws;
193
194 nxt_unit_mmap_buf_t *buf;
195
196 nxt_queue_link_t link;
197
198 nxt_unit_ctx_impl_t *ctx_impl;
199
200 void *retain_buf;
201};
202
203
204struct nxt_unit_ctx_impl_s {
205 nxt_unit_ctx_t ctx;
206
207 nxt_unit_port_id_t read_port_id;
208 int read_port_fd;
209
210 nxt_queue_link_t link;
211
212 nxt_unit_mmap_buf_t *free_buf;
213
214 /* of nxt_unit_request_info_impl_t */
215 nxt_queue_t free_req;
216
217 /* of nxt_unit_websocket_frame_impl_t */
218 nxt_queue_t free_ws;
219
220 /* of nxt_unit_request_info_impl_t */
221 nxt_queue_t active_req;
222
223 /* of nxt_unit_request_info_impl_t */
224 nxt_lvlhsh_t requests;
225
226 nxt_unit_mmap_buf_t ctx_buf[2];
227
228 nxt_unit_request_info_impl_t req;
229};
230
231
232struct nxt_unit_impl_s {
233 nxt_unit_t unit;

--- 216 unchanged lines hidden (view full) ---

450nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
451 void *data)
452{
453 ctx_impl->ctx.data = data;
454 ctx_impl->ctx.unit = &lib->unit;
455
456 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
457
458 nxt_queue_init(&ctx_impl->free_req);
459 nxt_queue_init(&ctx_impl->free_ws);
460 nxt_queue_init(&ctx_impl->active_req);
461
462 ctx_impl->free_buf = NULL;
463 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
464 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]);
465
466 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link);
467
468 ctx_impl->req.req.ctx = &ctx_impl->ctx;
469 ctx_impl->req.req.unit = &lib->unit;
470
471 ctx_impl->read_port_fd = -1;
472 ctx_impl->requests.slot = 0;
473}
474
475
476nxt_inline void
477nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head,
478 nxt_unit_mmap_buf_t *mmap_buf)
479{
480 mmap_buf->next = *head;
481
482 if (mmap_buf->next != NULL) {
483 mmap_buf->next->prev = &mmap_buf->next;
484 }
485
486 *head = mmap_buf;
487 mmap_buf->prev = head;
488}
489
490
491nxt_inline void
492nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
493 nxt_unit_mmap_buf_t *mmap_buf)
494{
495 while (*prev != NULL) {
496 prev = &(*prev)->next;
497 }
498
499 nxt_unit_mmap_buf_insert(prev, mmap_buf);
500}
501
502
503nxt_inline void
504nxt_unit_mmap_buf_remove(nxt_unit_mmap_buf_t *mmap_buf)
505{
506 nxt_unit_mmap_buf_t **prev;
507
508 prev = mmap_buf->prev;
509
510 if (mmap_buf->next != NULL) {
511 mmap_buf->next->prev = prev;
512 }
513
514 if (prev != NULL) {
515 *prev = mmap_buf->next;
516 }
517}
518
519
520static int
521nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *read_port,
522 int *log_fd, uint32_t *stream)
523{
524 int rc;
525 int ready_fd, read_fd;
526 char *unit_init, *version_end;
527 long version_length;

--- 84 unchanged lines hidden (view full) ---

612 return NXT_UNIT_OK;
613}
614
615
616int
617nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id,
618 void *buf, size_t buf_size, void *oob, size_t oob_size)
619{
620 int rc;
621 pid_t pid;
622 struct cmsghdr *cm;
623 nxt_port_msg_t *port_msg;
624 nxt_unit_impl_t *lib;
625 nxt_unit_recv_msg_t recv_msg;
626 nxt_unit_callbacks_t *cb;
627
628 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
629
630 rc = NXT_UNIT_ERROR;
631 recv_msg.fd = -1;
632 recv_msg.process = NULL;
633 port_msg = buf;
634 cm = oob;
635
636 if (oob_size >= CMSG_SPACE(sizeof(int))
637 && cm->cmsg_len == CMSG_LEN(sizeof(int))
638 && cm->cmsg_level == SOL_SOCKET
639 && cm->cmsg_type == SCM_RIGHTS)
640 {
641 memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int));
642 }
643
644 recv_msg.incoming_buf = NULL;
645
646 if (nxt_slow_path(buf_size < sizeof(nxt_port_msg_t))) {
647 nxt_unit_warn(ctx, "message too small (%d bytes)", (int) buf_size);
648 goto fail;
649 }
650
651 recv_msg.stream = port_msg->stream;
652 recv_msg.pid = port_msg->pid;
653 recv_msg.reply_port = port_msg->reply_port;
654 recv_msg.last = port_msg->last;
655 recv_msg.mmap = port_msg->mmap;
656
657 recv_msg.start = port_msg + 1;
658 recv_msg.size = buf_size - sizeof(nxt_port_msg_t);
659
660 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) {
661 nxt_unit_warn(ctx, "#%"PRIu32": unknown message type (%d)",
662 port_msg->stream, (int) port_msg->type);
663 goto fail;
664 }

--- 7 unchanged lines hidden (view full) ---

672 /* Fragmentation is unsupported. */
673 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) {
674 nxt_unit_warn(ctx, "#%"PRIu32": fragmented message type (%d)",
675 port_msg->stream, (int) port_msg->type);
676 goto fail;
677 }
678
679 if (port_msg->mmap) {
680 if (nxt_unit_mmap_read(ctx, &recv_msg) != NXT_UNIT_OK) {
681 goto fail;
682 }
683 }
684
685 cb = &lib->callbacks;
686
687 switch (port_msg->type) {
688
689 case _NXT_PORT_MSG_QUIT:
690 nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
691
692 cb->quit(ctx);
693 rc = NXT_UNIT_OK;
694 break;
695
696 case _NXT_PORT_MSG_NEW_PORT:
697 rc = nxt_unit_process_new_port(ctx, &recv_msg);
698 break;
699
700 case _NXT_PORT_MSG_CHANGE_FILE:
701 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
702 port_msg->stream, recv_msg.fd);
703 break;
704
705 case _NXT_PORT_MSG_MMAP:
706 if (nxt_slow_path(recv_msg.fd < 0)) {
707 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
708 port_msg->stream, recv_msg.fd);
709
710 goto fail;
711 }
712
713 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd);
714 break;
715
716 case _NXT_PORT_MSG_REQ_HEADERS:
717 rc = nxt_unit_process_req_headers(ctx, &recv_msg);
718 break;
719
720 case _NXT_PORT_MSG_WEBSOCKET:
721 rc = nxt_unit_process_websocket(ctx, &recv_msg);
722 break;
723
724 case _NXT_PORT_MSG_REMOVE_PID:
725 if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
726 nxt_unit_warn(ctx, "#%"PRIu32": remove_pid: invalid message size "
727 "(%d != %d)", port_msg->stream, (int) recv_msg.size,
728 (int) sizeof(pid));
729
730 goto fail;
731 }
732
733 memcpy(&pid, recv_msg.start, sizeof(pid));
734
735 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
736 port_msg->stream, (int) pid);
737
738 cb->remove_pid(ctx, pid);
739
740 rc = NXT_UNIT_OK;
741 break;
742
743 default:
744 nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
745 port_msg->stream, (int) port_msg->type);
746
747 goto fail;
748 }
749
750fail:
751
752 if (recv_msg.fd != -1) {
753 close(recv_msg.fd);
754 }
755
756 while (recv_msg.incoming_buf != NULL) {
757 nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
758 }
759
760 if (recv_msg.process != NULL) {
761 nxt_unit_process_use(ctx, recv_msg.process, -1);
762 }
763
764 return rc;
765}
766
767
768static int
769nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
770{
771 int nb;
772 nxt_unit_impl_t *lib;
773 nxt_unit_port_t new_port;
774 nxt_port_msg_new_port_t *new_port_msg;
775
776 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
777 nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
778 "invalid message size (%d)",
779 recv_msg->stream, (int) recv_msg->size);
780
781 return NXT_UNIT_ERROR;
782 }
783
784 if (nxt_slow_path(recv_msg->fd < 0)) {
785 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
786 recv_msg->stream, recv_msg->fd);
787
788 return NXT_UNIT_ERROR;
789 }
790
791 new_port_msg = recv_msg->start;
792
793 nxt_unit_debug(ctx, "#%"PRIu32": new_port: %d,%d fd %d",
794 recv_msg->stream, (int) new_port_msg->pid,
795 (int) new_port_msg->id, recv_msg->fd);
796
797 nb = 0;
798
799 if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
800 nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
801 "failed: %s (%d)", recv_msg->fd, strerror(errno), errno);
802
803 return NXT_UNIT_ERROR;
804 }
805
806 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
807 new_port_msg->id);
808
809 new_port.in_fd = -1;
810 new_port.out_fd = recv_msg->fd;
811 new_port.data = NULL;
812
813 recv_msg->fd = -1;
814
815 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
816
817 return lib->callbacks.add_port(ctx, &new_port);
818}
819
820
821static int
822nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
823{
824 nxt_unit_impl_t *lib;
825 nxt_unit_request_t *r;
826 nxt_unit_mmap_buf_t *b;
827 nxt_unit_request_info_t *req;
828 nxt_unit_request_info_impl_t *req_impl;
829
830 if (nxt_slow_path(recv_msg->mmap == 0)) {
831 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
832 recv_msg->stream);
833
834 return NXT_UNIT_ERROR;
835 }
836
837 if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
838 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
839 "%d expected", recv_msg->stream, (int) recv_msg->size,
840 (int) sizeof(nxt_unit_request_t));
841
842 return NXT_UNIT_ERROR;
843 }
844
845 req_impl = nxt_unit_request_info_get(ctx);
846 if (nxt_slow_path(req_impl == NULL)) {
847 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
848 recv_msg->stream);
849
850 return NXT_UNIT_ERROR;
851 }
852
853 req = &req_impl->req;
854
855 nxt_unit_port_id_init(&req->response_port, recv_msg->pid,
856 recv_msg->reply_port);
857
858 req->request = recv_msg->start;
859
860 b = recv_msg->incoming_buf;
861
862 req->request_buf = &b->buf;
863 req->response = NULL;
864 req->response_buf = NULL;
865
866 r = req->request;
867
868 req->content_length = r->content_length;
869
870 req->content_buf = req->request_buf;
871 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content);
872
873 /* "Move" process reference to req_impl. */
874 req_impl->process = nxt_unit_msg_get_process(ctx, recv_msg);
875 if (nxt_slow_path(req_impl->process == NULL)) {
876 return NXT_UNIT_ERROR;
877 }
878
879 recv_msg->process = NULL;
880
881 req_impl->stream = recv_msg->stream;
882
883 req_impl->outgoing_buf = NULL;
884
885 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
886 b->req = req;
887 }
888
889 /* "Move" incoming buffer list to req_impl. */
890 req_impl->incoming_buf = recv_msg->incoming_buf;
891 req_impl->incoming_buf->prev = &req_impl->incoming_buf;
892 recv_msg->incoming_buf = NULL;
893
894 req->response_max_fields = 0;
895 req_impl->state = NXT_UNIT_RS_START;
896 req_impl->websocket = 0;
897
898 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
899 (int) r->method_length, nxt_unit_sptr_get(&r->method),
900 (int) r->target_length, nxt_unit_sptr_get(&r->target),
901 (int) r->content_length);
902
903 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
904
905 lib->callbacks.request_handler(req);
906
907 return NXT_UNIT_OK;
908}
909
910
911static int
912nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
913{
914 size_t hsize;
915 nxt_unit_impl_t *lib;
916 nxt_unit_mmap_buf_t *b;
917 nxt_unit_ctx_impl_t *ctx_impl;
918 nxt_unit_callbacks_t *cb;
919 nxt_unit_request_info_t *req;
920 nxt_unit_request_info_impl_t *req_impl;
921 nxt_unit_websocket_frame_impl_t *ws_impl;
922
923 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
924
925 req_impl = nxt_unit_request_hash_find(&ctx_impl->requests, recv_msg->stream,
926 recv_msg->last);
927 if (req_impl == NULL) {
928 return NXT_UNIT_OK;
929 }
930
931 req = &req_impl->req;
932
933 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
934 cb = &lib->callbacks;
935
936 if (cb->websocket_handler && recv_msg->size >= 2) {
937 ws_impl = nxt_unit_websocket_frame_get(ctx);
938 if (nxt_slow_path(ws_impl == NULL)) {
939 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
940 req_impl->stream);
941
942 return NXT_UNIT_ERROR;
943 }
944
945 ws_impl->ws.req = req;
946
947 ws_impl->buf = NULL;
948 ws_impl->retain_buf = NULL;
949
950 if (recv_msg->mmap) {
951 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
952 b->req = req;
953 }
954
955 /* "Move" incoming buffer list to ws_impl. */
956 ws_impl->buf = recv_msg->incoming_buf;
957 ws_impl->buf->prev = &ws_impl->buf;
958 recv_msg->incoming_buf = NULL;
959
960 b = ws_impl->buf;
961
962 } else {
963 b = nxt_unit_mmap_buf_get(ctx);
964 if (nxt_slow_path(b == NULL)) {
965 return NXT_UNIT_ERROR;
966 }
967
968 b->hdr = NULL;
969 b->req = req;
970 b->buf.start = recv_msg->start;
971 b->buf.free = b->buf.start;
972 b->buf.end = b->buf.start + recv_msg->size;
973
974 nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
975 }
976
977 ws_impl->ws.header = (void *) b->buf.start;
978 ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
979 ws_impl->ws.header);
980
981 hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);
982
983 if (ws_impl->ws.header->mask) {
984 ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;
985
986 } else {
987 ws_impl->ws.mask = NULL;
988 }
989
990 b->buf.free += hsize;
991
992 ws_impl->ws.content_buf = &b->buf;
993 ws_impl->ws.content_length = ws_impl->ws.payload_len;
994
995 nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
996 "payload_len=%"PRIu64,
997 ws_impl->ws.header->opcode,
998 ws_impl->ws.payload_len);
999
1000 cb->websocket_handler(&ws_impl->ws);
1001 }
1002
1003 if (recv_msg->last) {
1004 req_impl->websocket = 0;
1005
1006 if (cb->close_handler) {
1007 nxt_unit_req_debug(req, "close_handler");
1008
1009 cb->close_handler(req);
1010
1011 } else {
1012 nxt_unit_request_done(req, NXT_UNIT_ERROR);
1013 }
1014 }
1015
1016 return NXT_UNIT_OK;
1017}
1018
1019
1020static nxt_unit_request_info_impl_t *
1021nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
1022{
1023 nxt_unit_impl_t *lib;
1024 nxt_queue_link_t *lnk;

--- 29 unchanged lines hidden (view full) ---

1054
1055 return req_impl;
1056}
1057
1058
1059static void
1060nxt_unit_request_info_release(nxt_unit_request_info_t *req)
1061{
1062 nxt_unit_ctx_impl_t *ctx_impl;
1063 nxt_unit_request_info_impl_t *req_impl;
1064
1065 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1066 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1067
1068 req->response = NULL;
1069 req->response_buf = NULL;
1070
1071 if (req_impl->process != NULL) {
1072 nxt_unit_process_use(req->ctx, req_impl->process, -1);
1073
1074 req_impl->process = NULL;
1075 }
1076
1077 if (req_impl->websocket) {
1078 nxt_unit_request_hash_find(&ctx_impl->requests, req_impl->stream, 1);
1079
1080 req_impl->websocket = 0;
1081 }
1082
1083 while (req_impl->outgoing_buf != NULL) {
1084 nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
1085 }
1086
1087 while (req_impl->incoming_buf != NULL) {
1088 nxt_unit_mmap_buf_free(req_impl->incoming_buf);
1089 }
1090
1091 nxt_queue_remove(&req_impl->link);
1092
1093 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
1094
1095 req_impl->state = NXT_UNIT_RS_RELEASED;
1096}
1097
1098
1099static void
1100nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
1101{
1102 nxt_unit_ctx_impl_t *ctx_impl;
1103
1104 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);
1105
1106 nxt_queue_remove(&req_impl->link);
1107
1108 if (req_impl != &ctx_impl->req) {
1109 free(req_impl);
1110 }
1111}
1112
1113
1114static nxt_unit_websocket_frame_impl_t *
1115nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
1116{
1117 nxt_queue_link_t *lnk;
1118 nxt_unit_ctx_impl_t *ctx_impl;
1119 nxt_unit_websocket_frame_impl_t *ws_impl;
1120
1121 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1122
1123 if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
1124 ws_impl = malloc(sizeof(nxt_unit_websocket_frame_impl_t));
1125 if (nxt_slow_path(ws_impl == NULL)) {
1126 nxt_unit_warn(ctx, "websocket frame allocation failed");
1127
1128 return NULL;
1129 }
1130
1131 } else {
1132 lnk = nxt_queue_first(&ctx_impl->free_ws);
1133 nxt_queue_remove(lnk);
1134
1135 ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
1136 }
1137
1138 ws_impl->ctx_impl = ctx_impl;
1139
1140 return ws_impl;
1141}
1142
1143
1144static void
1145nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
1146{
1147 nxt_unit_websocket_frame_impl_t *ws_impl;
1148
1149 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
1150
1151 while (ws_impl->buf != NULL) {
1152 nxt_unit_mmap_buf_free(ws_impl->buf);
1153 }
1154
1155 ws->req = NULL;
1156
1157 if (ws_impl->retain_buf != NULL) {
1158 free(ws_impl->retain_buf);
1159
1160 ws_impl->retain_buf = NULL;
1161 }
1162
1163 nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);
1164}
1165
1166
1167static void
1168nxt_unit_websocket_frame_free(nxt_unit_websocket_frame_impl_t *ws_impl)
1169{
1170 nxt_queue_remove(&ws_impl->link);
1171
1172 free(ws_impl);
1173}
1174
1175
1176uint16_t
1177nxt_unit_field_hash(const char *name, size_t name_length)
1178{
1179 u_char ch;
1180 uint32_t hash;
1181 const char *p, *end;
1182
1183 hash = 159406; /* Magic value copied from nxt_http_parse.c */

--- 391 unchanged lines hidden (view full) ---

1575 }
1576
1577 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1578 nxt_unit_req_warn(req, "send: response already sent");
1579
1580 return NXT_UNIT_ERROR;
1581 }
1582
1583 if (req->request->websocket_handshake && req->response->status == 101) {
1584 nxt_unit_response_upgrade(req);
1585 }
1586
1587 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
1588 req->response->fields_count,
1589 (int) (req->response_buf->free
1590 - req->response_buf->start));
1591
1592 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);
1593
1594 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
1595 if (nxt_fast_path(rc == NXT_UNIT_OK)) {
1596 req->response = NULL;
1597 req->response_buf = NULL;
1598 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;
1599
1600 nxt_unit_mmap_buf_release(mmap_buf);
1601 }
1602

--- 11 unchanged lines hidden (view full) ---

1614 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
1615}
1616
1617
1618nxt_unit_buf_t *
1619nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
1620{
1621 int rc;
1622 nxt_unit_mmap_buf_t *mmap_buf;
1623 nxt_unit_request_info_impl_t *req_impl;
1624
1625 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
1626 nxt_unit_req_warn(req, "response_buf_alloc: "
1627 "requested buffer (%"PRIu32") too big", size);
1628
1629 return NULL;
1630 }
1631
1632 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);
1633
1634 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1635
1636 mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
1637 if (nxt_slow_path(mmap_buf == NULL)) {
1638 return NULL;
1639 }
1640
1641 mmap_buf->req = req;
1642
1643 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);
1644
1645 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
1646 &req->response_port, size, mmap_buf);
1647 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1648 nxt_unit_mmap_buf_release(mmap_buf);
1649
1650 return NULL;
1651 }
1652
1653 return &mmap_buf->buf;
1654}

--- 7 unchanged lines hidden (view full) ---

1662 if (recv_msg->process != NULL) {
1663 return recv_msg->process;
1664 }
1665
1666 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1667
1668 pthread_mutex_lock(&lib->mutex);
1669
1670 recv_msg->process = nxt_unit_process_find(ctx, recv_msg->pid, 0);
1671
1672 pthread_mutex_unlock(&lib->mutex);
1673
1674 if (recv_msg->process == NULL) {
1675 nxt_unit_warn(ctx, "#%"PRIu32": process %d not found",
1676 recv_msg->stream, (int) recv_msg->pid);
1677 }
1678
1679 return recv_msg->process;
1680}
1681
1682
1683static nxt_unit_mmap_buf_t *
1684nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
1685{
1686 nxt_unit_mmap_buf_t *mmap_buf;
1687 nxt_unit_ctx_impl_t *ctx_impl;
1688
1689 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
1690
1691 if (ctx_impl->free_buf == NULL) {
1692 mmap_buf = malloc(sizeof(nxt_unit_mmap_buf_t));
1693 if (nxt_slow_path(mmap_buf == NULL)) {
1694 nxt_unit_warn(ctx, "failed to allocate buf");
1695 }
1696
1697 } else {
1698 mmap_buf = ctx_impl->free_buf;
1699
1700 nxt_unit_mmap_buf_remove(mmap_buf);
1701 }
1702
1703 mmap_buf->ctx_impl = ctx_impl;
1704
1705 return mmap_buf;
1706}
1707
1708
1709static void
1710nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
1711{
1712 nxt_unit_mmap_buf_remove(mmap_buf);
1713
1714 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);
1715}
1716
1717
1718typedef struct {
1719 size_t len;
1720 const char *str;
1721} nxt_unit_str_t;
1722
1723
1724#define nxt_unit_str(str) { nxt_length(str), str }
1725
1726
1727int
1728nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req)
1729{
1730 return req->request->websocket_handshake;
1731}
1732
1733
1734int
1735nxt_unit_response_upgrade(nxt_unit_request_info_t *req)
1736{
1737 int rc;
1738 nxt_unit_ctx_impl_t *ctx_impl;
1739 nxt_unit_request_info_impl_t *req_impl;
1740
1741 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1742
1743 if (nxt_slow_path(req_impl->websocket != 0)) {
1744 nxt_unit_req_debug(req, "upgrade: already upgraded");
1745
1746 return NXT_UNIT_OK;
1747 }
1748
1749 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
1750 nxt_unit_req_warn(req, "upgrade: response is not initialized yet");
1751
1752 return NXT_UNIT_ERROR;
1753 }
1754
1755 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
1756 nxt_unit_req_warn(req, "upgrade: response already sent");
1757
1758 return NXT_UNIT_ERROR;
1759 }
1760
1761 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
1762
1763 rc = nxt_unit_request_hash_add(&ctx_impl->requests, req_impl);
1764 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1765 nxt_unit_req_warn(req, "upgrade: failed to add request to hash");
1766
1767 return NXT_UNIT_ERROR;
1768 }
1769
1770 req_impl->websocket = 1;
1771
1772 req->response->status = 101;
1773
1774 return NXT_UNIT_OK;
1775}
1776
1777
1778int
1779nxt_unit_response_is_websocket(nxt_unit_request_info_t *req)
1780{
1781 nxt_unit_request_info_impl_t *req_impl;
1782
1783 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1784
1785 return req_impl->websocket;
1786}
1787
1788
1789nxt_unit_request_info_t *
1790nxt_unit_get_request_info_from_data(void *data)
1791{
1792 nxt_unit_request_info_impl_t *req_impl;
1793
1794 req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data);
1795
1796 return &req_impl->req;
1797}
1798
1799
1800int
1801nxt_unit_buf_send(nxt_unit_buf_t *buf)
1802{
1803 int rc;
1804 nxt_unit_mmap_buf_t *mmap_buf;
1805 nxt_unit_request_info_t *req;
1806 nxt_unit_request_info_impl_t *req_impl;
1807
1808 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);

--- 12 unchanged lines hidden (view full) ---

1821
1822 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) {
1823 nxt_unit_req_warn(req, "buf_send: headers not sent yet");
1824
1825 return NXT_UNIT_ERROR;
1826 }
1827
1828 if (nxt_fast_path(buf->free > buf->start)) {
1829 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 0);
1830 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
1831 return rc;
1832 }
1833 }
1834
1835 nxt_unit_mmap_buf_release(mmap_buf);
1836
1837 return NXT_UNIT_OK;

--- 8 unchanged lines hidden (view full) ---

1846 nxt_unit_request_info_t *req;
1847 nxt_unit_request_info_impl_t *req_impl;
1848
1849 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1850
1851 req = mmap_buf->req;
1852 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1853
1854 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, mmap_buf, 1);
1855 if (nxt_slow_path(rc == NXT_UNIT_OK)) {
1856 nxt_unit_mmap_buf_release(mmap_buf);
1857
1858 nxt_unit_request_info_release(req);
1859
1860 } else {
1861 nxt_unit_request_done(req, rc);
1862 }

--- 14 unchanged lines hidden (view full) ---

1877 nxt_chunk_id_t first_free_chunk;
1878 nxt_unit_buf_t *buf;
1879 nxt_unit_impl_t *lib;
1880 nxt_port_mmap_header_t *hdr;
1881
1882 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
1883
1884 buf = &mmap_buf->buf;
1885 hdr = mmap_buf->hdr;
1886
1887 m.mmap_msg.size = buf->free - buf->start;
1888
1889 m.msg.stream = stream;
1890 m.msg.pid = lib->pid;
1891 m.msg.reply_port = 0;
1892 m.msg.type = _NXT_PORT_MSG_DATA;
1893 m.msg.last = last != 0;
1894 m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0;
1895 m.msg.nf = 0;
1896 m.msg.mf = 0;
1897 m.msg.tracking = 0;
1898
1899 if (hdr != NULL) {
1900 m.mmap_msg.mmap_id = hdr->id;
1901 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, (u_char *) buf->start);
1902 }
1903
1904 nxt_unit_debug(ctx, "#%"PRIu32": send mmap: (%d,%d,%d)",
1905 stream,
1906 (int) m.mmap_msg.mmap_id,
1907 (int) m.mmap_msg.chunk_id,
1908 (int) m.mmap_msg.size);
1909
1910 res = lib->callbacks.port_send(ctx, &mmap_buf->port_id, &m,
1911 m.msg.mmap ? sizeof(m) : sizeof(m.msg),
1912 NULL, 0);
1913 if (nxt_slow_path(res != sizeof(m))) {
1914 return NXT_UNIT_ERROR;
1915 }
1916
1917 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE && hdr != NULL) {
1918 last_used = (u_char *) buf->free - 1;
1919
1920 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1;
1921 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk);
1922 end = (u_char *) buf->end;
1923
1924 nxt_unit_mmap_release(hdr, first_free, (uint32_t) (end - first_free));
1925
1926 buf->end = (char *) first_free;
1927 }
1928
1929 return NXT_UNIT_OK;
1930}
1931
1932
1933void
1934nxt_unit_buf_free(nxt_unit_buf_t *buf)
1935{
1936 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf));
1937}
1938
1939
1940static void
1941nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf)
1942{
1943 if (nxt_fast_path(mmap_buf->hdr != NULL)) {
1944 nxt_unit_mmap_release(mmap_buf->hdr, mmap_buf->buf.start,
1945 mmap_buf->buf.end - mmap_buf->buf.start);
1946 }
1947
1948 nxt_unit_mmap_buf_release(mmap_buf);
1949}
1950
1951
1952nxt_unit_buf_t *
1953nxt_unit_buf_next(nxt_unit_buf_t *buf)
1954{
1955 nxt_unit_mmap_buf_t *mmap_buf;
1956
1957 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf);
1958
1959 if (mmap_buf->next == NULL) {
1960 return NULL;
1961 }
1962
1963 return &mmap_buf->next->buf;
1964}
1965
1966
1967uint32_t
1968nxt_unit_buf_max(void)
1969{
1970 return PORT_MMAP_DATA_SIZE;
1971}

--- 8 unchanged lines hidden (view full) ---

1980
1981int
1982nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
1983 size_t size)
1984{
1985 int rc;
1986 uint32_t part_size;
1987 const char *part_start;
1988 nxt_unit_mmap_buf_t mmap_buf;
1989 nxt_unit_request_info_impl_t *req_impl;
1990
1991 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
1992
1993 part_start = start;
1994
1995 /* Check if response is not send yet. */

--- 10 unchanged lines hidden (view full) ---

2006 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2007 return rc;
2008 }
2009
2010 size -= part_size;
2011 part_start += part_size;
2012 }
2013
2014 while (size > 0) {
2015 part_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
2016
2017 rc = nxt_unit_get_outgoing_buf(req->ctx, req_impl->process,
2018 &req->response_port, part_size,
2019 &mmap_buf);
2020 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2021 return rc;
2022 }
2023
2024 mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free,
2025 part_start, part_size);
2026
2027 rc = nxt_unit_mmap_buf_send(req->ctx, req_impl->stream, &mmap_buf, 0);
2028 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2029 nxt_unit_mmap_release(mmap_buf.hdr, mmap_buf.buf.start,
2030 mmap_buf.buf.end - mmap_buf.buf.start);
2031
2032 return rc;
2033 }
2034
2035 size -= part_size;

--- 89 unchanged lines hidden (view full) ---

2125
2126 return NXT_UNIT_OK;
2127}
2128
2129
2130ssize_t
2131nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
2132{
2133 return nxt_unit_buf_read(&req->content_buf, &req->content_length,
2134 dst, size);
2135}
2136
2137
2138static ssize_t
2139nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, size_t size)
2140{
2141 u_char *p;
2142 size_t rest, copy, read;
2143 nxt_unit_buf_t *buf;
2144
2145 p = dst;
2146 rest = size;
2147
2148 buf = *b;
2149
2150 while (buf != NULL) {
2151 copy = buf->end - buf->free;
2152 copy = nxt_min(rest, copy);
2153
2154 p = nxt_cpymem(p, buf->free, copy);
2155
2156 buf->free += copy;

--- 5 unchanged lines hidden (view full) ---

2162 }
2163
2164 break;
2165 }
2166
2167 buf = nxt_unit_buf_next(buf);
2168 }
2169
2170 *b = buf;
2171
2172 read = size - rest;
2173
2174 *len -= read;
2175
2176 return read;
2177}
2178
2179
2180void
2181nxt_unit_request_done(nxt_unit_request_info_t *req, int rc)
2182{

--- 36 unchanged lines hidden (view full) ---

2219
2220 return;
2221 }
2222
2223skip_response_send:
2224
2225 lib = nxt_container_of(req->unit, nxt_unit_impl_t, unit);
2226
2227 msg.stream = req_impl->stream;
2228 msg.pid = lib->pid;
2229 msg.reply_port = 0;
2230 msg.type = (rc == NXT_UNIT_OK) ? _NXT_PORT_MSG_DATA
2231 : _NXT_PORT_MSG_RPC_ERROR;
2232 msg.last = 1;
2233 msg.mmap = 0;
2234 msg.nf = 0;
2235 msg.mf = 0;

--- 5 unchanged lines hidden (view full) ---

2241 nxt_unit_req_alert(req, "last message send failed: %s (%d)",
2242 strerror(errno), errno);
2243 }
2244
2245 nxt_unit_request_info_release(req);
2246}
2247
2248
2249int
2250nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
2251 uint8_t last, const void *start, size_t size)
2252{
2253 const struct iovec iov = { (void *) start, size };
2254
2255 return nxt_unit_websocket_sendv(req, opcode, last, &iov, 1);
2256}
2257
2258
2259int
2260nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
2261 uint8_t last, const struct iovec *iov, int iovcnt)
2262{
2263 int i, rc;
2264 size_t l, copy;
2265 uint32_t payload_len, buf_size;
2266 const uint8_t *b;
2267 nxt_unit_buf_t *buf;
2268 nxt_websocket_header_t *wh;
2269
2270 payload_len = 0;
2271
2272 for (i = 0; i < iovcnt; i++) {
2273 payload_len += iov[i].iov_len;
2274 }
2275
2276 buf_size = 10 + payload_len;
2277
2278 buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
2279 PORT_MMAP_DATA_SIZE));
2280 if (nxt_slow_path(buf == NULL)) {
2281 nxt_unit_req_error(req, "Failed to allocate buf for content");
2282
2283 return NXT_UNIT_ERROR;
2284 }
2285
2286 buf->start[0] = 0;
2287 buf->start[1] = 0;
2288
2289 wh = (void *) buf->free;
2290
2291 buf->free = nxt_websocket_frame_init(wh, payload_len);
2292 wh->fin = last;
2293 wh->opcode = opcode;
2294
2295 for (i = 0; i < iovcnt; i++) {
2296 b = iov[i].iov_base;
2297 l = iov[i].iov_len;
2298
2299 while (l > 0) {
2300 copy = buf->end - buf->free;
2301 copy = nxt_min(l, copy);
2302
2303 buf->free = nxt_cpymem(buf->free, b, copy);
2304 b += copy;
2305 l -= copy;
2306
2307 if (l > 0) {
2308 buf_size -= buf->end - buf->start;
2309
2310 rc = nxt_unit_buf_send(buf);
2311 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2312 nxt_unit_req_error(req, "Failed to send content");
2313
2314 return NXT_UNIT_ERROR;
2315 }
2316
2317 buf = nxt_unit_response_buf_alloc(req, nxt_min(buf_size,
2318 PORT_MMAP_DATA_SIZE));
2319 if (nxt_slow_path(buf == NULL)) {
2320 nxt_unit_req_error(req,
2321 "Failed to allocate buf for content");
2322
2323 return NXT_UNIT_ERROR;
2324 }
2325 }
2326 }
2327 }
2328
2329 if (buf->free > buf->start) {
2330 rc = nxt_unit_buf_send(buf);
2331 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
2332 nxt_unit_req_error(req, "Failed to send content");
2333 }
2334 }
2335
2336 return rc;
2337}
2338
2339
2340ssize_t
2341nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
2342 size_t size)
2343{
2344 ssize_t res;
2345 uint8_t *b;
2346 uint64_t i, d;
2347
2348 res = nxt_unit_buf_read(&ws->content_buf, &ws->content_length,
2349 dst, size);
2350
2351 if (ws->mask == NULL) {
2352 return res;
2353 }
2354
2355 b = dst;
2356 d = (ws->payload_len - ws->content_length - res) % 4;
2357
2358 for (i = 0; i < (uint64_t) res; i++) {
2359 b[i] ^= ws->mask[ (i + d) % 4 ];
2360 }
2361
2362 return res;
2363}
2364
2365
2366int
2367nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws)
2368{
2369 char *b;
2370 size_t size;
2371 nxt_unit_websocket_frame_impl_t *ws_impl;
2372
2373 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);
2374
2375 if (ws_impl->retain_buf != NULL || ws_impl->buf->hdr != NULL) {
2376 return NXT_UNIT_OK;
2377 }
2378
2379 size = ws_impl->buf->buf.end - ws_impl->buf->buf.start;
2380
2381 b = malloc(size);
2382 if (nxt_slow_path(b == NULL)) {
2383 return NXT_UNIT_ERROR;
2384 }
2385
2386 memcpy(b, ws_impl->buf->buf.start, size);
2387
2388 ws_impl->buf->buf.start = b;
2389 ws_impl->buf->buf.free = b;
2390 ws_impl->buf->buf.end = b + size;
2391
2392 ws_impl->retain_buf = b;
2393
2394 return NXT_UNIT_OK;
2395}
2396
2397
2398void
2399nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws)
2400{
2401 nxt_unit_websocket_frame_release(ws);
2402}
2403
2404
2405static nxt_port_mmap_header_t *
2406nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, nxt_unit_process_t *process,
2407 nxt_unit_port_id_t *port_id, nxt_chunk_id_t *c, int n)
2408{
2409 int res, nchunks, i;
2410 nxt_unit_mmap_t *mm, *mm_end;
2411 nxt_port_mmap_header_t *hdr;
2412

--- 465 unchanged lines hidden (view full) ---

2878 int rc;
2879 nxt_chunk_id_t c;
2880 nxt_unit_process_t *process;
2881 nxt_port_mmap_header_t *hdr;
2882 nxt_port_mmap_tracking_msg_t *tracking_msg;
2883
2884 if (recv_msg->size < (int) sizeof(nxt_port_mmap_tracking_msg_t)) {
2885 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: too small message (%d)",
2886 recv_msg->stream, (int) recv_msg->size);
2887
2888 return 0;
2889 }
2890
2891 tracking_msg = recv_msg->start;
2892
2893 recv_msg->start = tracking_msg + 1;
2894 recv_msg->size -= sizeof(nxt_port_mmap_tracking_msg_t);

--- 6 unchanged lines hidden (view full) ---

2901 pthread_mutex_lock(&process->incoming.mutex);
2902
2903 hdr = nxt_unit_get_incoming_mmap(ctx, process, tracking_msg->mmap_id);
2904 if (nxt_slow_path(hdr == NULL)) {
2905 pthread_mutex_unlock(&process->incoming.mutex);
2906
2907 nxt_unit_warn(ctx, "#%"PRIu32": tracking_read: "
2908 "invalid mmap id %d,%"PRIu32,
2909 recv_msg->stream, (int) process->pid,
2910 tracking_msg->mmap_id);
2911
2912 return 0;
2913 }
2914
2915 c = tracking_msg->tracking_id;
2916 rc = nxt_atomic_cmp_set(hdr->tracking + c, recv_msg->stream, 0);
2917
2918 if (rc == 0) {
2919 nxt_unit_debug(ctx, "#%"PRIu32": tracking cancelled",
2920 recv_msg->stream);
2921
2922 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c);
2923 }
2924
2925 pthread_mutex_unlock(&process->incoming.mutex);
2926
2927 return rc;
2928}
2929
2930
2931static int
2932nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
2933{
2934 void *start;
2935 uint32_t size;
2936 nxt_unit_process_t *process;
2937 nxt_unit_mmap_buf_t *b, **incoming_tail;
2938 nxt_port_mmap_msg_t *mmap_msg, *end;
2939 nxt_port_mmap_header_t *hdr;
2940
2941 if (nxt_slow_path(recv_msg->size < sizeof(nxt_port_mmap_msg_t))) {
2942 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: too small message (%d)",
2943 recv_msg->stream, (int) recv_msg->size);
2944
2945 return NXT_UNIT_ERROR;
2946 }
2947
2948 process = nxt_unit_msg_get_process(ctx, recv_msg);
2949 if (nxt_slow_path(process == NULL)) {
2950 return NXT_UNIT_ERROR;
2951 }
2952
2953 mmap_msg = recv_msg->start;
2954 end = nxt_pointer_to(recv_msg->start, recv_msg->size);
2955
2956 incoming_tail = &recv_msg->incoming_buf;
2957
2958 pthread_mutex_lock(&process->incoming.mutex);
2959
2960 for (; mmap_msg < end; mmap_msg++) {
2961 hdr = nxt_unit_get_incoming_mmap(ctx, process, mmap_msg->mmap_id);
2962 if (nxt_slow_path(hdr == NULL)) {
2963 pthread_mutex_unlock(&process->incoming.mutex);
2964
2965 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: "
2966 "invalid mmap id %d,%"PRIu32,
2967 recv_msg->stream, (int) process->pid,
2968 mmap_msg->mmap_id);
2969
2970 return NXT_UNIT_ERROR;
2971 }
2972
2973 start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
2974 size = mmap_msg->size;
2975
2976 if (recv_msg->start == mmap_msg) {
2977 recv_msg->start = start;
2978 recv_msg->size = size;
2979 }
2980
2981 b = nxt_unit_mmap_buf_get(ctx);
2982 if (nxt_slow_path(b == NULL)) {
2983 pthread_mutex_unlock(&process->incoming.mutex);
2984
2985 nxt_unit_warn(ctx, "#%"PRIu32": mmap_read: failed to allocate buf",
2986 recv_msg->stream);
2987
2988 nxt_unit_mmap_release(hdr, start, size);
2989
2990 return NXT_UNIT_ERROR;
2991 }
2992
2993 nxt_unit_mmap_buf_insert(incoming_tail, b);
2994 incoming_tail = &b->next;
2995
2996 b->buf.start = start;
2997 b->buf.free = start;
2998 b->buf.end = b->buf.start + size;
2999 b->hdr = hdr;
3000
3001 nxt_unit_debug(ctx, "#%"PRIu32": mmap_read: [%p,%d] %d->%d,(%d,%d,%d)",
3002 recv_msg->stream,
3003 start, (int) size,
3004 (int) hdr->src_pid, (int) hdr->dst_pid,
3005 (int) hdr->id, (int) mmap_msg->chunk_id,
3006 (int) mmap_msg->size);
3007 }
3008
3009 pthread_mutex_unlock(&process->incoming.mutex);
3010

--- 198 unchanged lines hidden (view full) ---

3209 rsize = lib->callbacks.port_recv(ctx, &ctx_impl->read_port_id,
3210 buf, sizeof(buf),
3211 oob, sizeof(oob));
3212 }
3213
3214 if (nxt_fast_path(rsize > 0)) {
3215 rc = nxt_unit_process_msg(ctx, &ctx_impl->read_port_id, buf, rsize,
3216 oob, sizeof(oob));
3217
3218#if (NXT_DEBUG)
3219 memset(buf, 0xAC, rsize);
3220#endif
3221
3222 } else {
3223 rc = NXT_UNIT_ERROR;
3224 }
3225
3226 return rc;
3227}
3228
3229

--- 74 unchanged lines hidden (view full) ---

3304
3305 return &new_ctx->ctx;
3306}
3307
3308
3309void
3310nxt_unit_ctx_free(nxt_unit_ctx_t *ctx)
3311{
3312 nxt_unit_impl_t *lib;
3313 nxt_unit_ctx_impl_t *ctx_impl;
3314 nxt_unit_mmap_buf_t *mmap_buf;
3315 nxt_unit_request_info_impl_t *req_impl;
3316 nxt_unit_websocket_frame_impl_t *ws_impl;
3317
3318 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
3319 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
3320
3321 nxt_queue_each(req_impl, &ctx_impl->active_req,
3322 nxt_unit_request_info_impl_t, link)
3323 {
3324 nxt_unit_req_warn(&req_impl->req, "active request on ctx free");
3325
3326 nxt_unit_request_done(&req_impl->req, NXT_UNIT_ERROR);
3327
3328 } nxt_queue_loop;
3329
3330 nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[0]);
3331 nxt_unit_mmap_buf_remove(&ctx_impl->ctx_buf[1]);
3332
3333 while (ctx_impl->free_buf != NULL) {
3334 mmap_buf = ctx_impl->free_buf;
3335 nxt_unit_mmap_buf_remove(mmap_buf);
3336 free(mmap_buf);
3337 }
3338
3339 nxt_queue_each(req_impl, &ctx_impl->free_req,
3340 nxt_unit_request_info_impl_t, link)
3341 {
3342 nxt_unit_request_info_free(req_impl);
3343
3344 } nxt_queue_loop;
3345
3346 nxt_queue_each(ws_impl, &ctx_impl->free_ws,
3347 nxt_unit_websocket_frame_impl_t, link)
3348 {
3349 nxt_unit_websocket_frame_free(ws_impl);
3350
3351 } nxt_queue_loop;
3352
3353 nxt_queue_remove(&ctx_impl->link);
3354
3355 if (ctx_impl != &lib->main_ctx) {
3356 free(ctx_impl);
3357 }
3358}
3359
3360

--- 629 unchanged lines hidden (view full) ---

3990 return lhq.value;
3991
3992 default:
3993 return NULL;
3994 }
3995}
3996
3997
3998static nxt_int_t
3999nxt_unit_request_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
4000{
4001 return NXT_OK;
4002}
4003
4004
4005static const nxt_lvlhsh_proto_t lvlhsh_requests_proto nxt_aligned(64) = {
4006 NXT_LVLHSH_DEFAULT,
4007 nxt_unit_request_hash_test,
4008 nxt_lvlhsh_alloc,
4009 nxt_lvlhsh_free,
4010};
4011
4012
4013static int
4014nxt_unit_request_hash_add(nxt_lvlhsh_t *request_hash,
4015 nxt_unit_request_info_impl_t *req_impl)
4016{
4017 uint32_t *stream;
4018 nxt_int_t res;
4019 nxt_lvlhsh_query_t lhq;
4020
4021 stream = &req_impl->stream;
4022
4023 lhq.key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
4024 lhq.key.length = sizeof(*stream);
4025 lhq.key.start = (u_char *) stream;
4026 lhq.proto = &lvlhsh_requests_proto;
4027 lhq.pool = NULL;
4028 lhq.replace = 0;
4029 lhq.value = req_impl;
4030
4031 res = nxt_lvlhsh_insert(request_hash, &lhq);
4032
4033 switch (res) {
4034
4035 case NXT_OK:
4036 return NXT_UNIT_OK;
4037
4038 default:
4039 return NXT_UNIT_ERROR;
4040 }
4041}
4042
4043
4044static nxt_unit_request_info_impl_t *
4045nxt_unit_request_hash_find(nxt_lvlhsh_t *request_hash, uint32_t stream,
4046 int remove)
4047{
4048 nxt_int_t res;
4049 nxt_lvlhsh_query_t lhq;
4050
4051 lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(stream));
4052 lhq.key.length = sizeof(stream);
4053 lhq.key.start = (u_char *) &stream;
4054 lhq.proto = &lvlhsh_requests_proto;
4055 lhq.pool = NULL;
4056
4057 if (remove) {
4058 res = nxt_lvlhsh_delete(request_hash, &lhq);
4059
4060 } else {
4061 res = nxt_lvlhsh_find(request_hash, &lhq);
4062 }
4063
4064 switch (res) {
4065
4066 case NXT_OK:
4067 return lhq.value;
4068
4069 default:
4070 return NULL;
4071 }
4072}
4073
4074
4075void
4076nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char *fmt, ...)
4077{
4078 int log_fd, n;
4079 char msg[NXT_MAX_ERROR_STR], *p, *end;
4080 pid_t pid;
4081 va_list ap;
4082 nxt_unit_impl_t *lib;

--- 56 unchanged lines hidden (view full) ---

4139 p = msg;
4140 end = p + sizeof(msg) - 1;
4141
4142 p = nxt_unit_snprint_prefix(p, end, pid, level);
4143
4144 if (nxt_fast_path(req != NULL)) {
4145 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
4146
4147 p += snprintf(p, end - p, "#%"PRIu32": ", req_impl->stream);
4148 }
4149
4150 va_start(ap, fmt);
4151 p += vsnprintf(p, end - p, fmt, ap);
4152 va_end(ap);
4153
4154 if (nxt_slow_path(p > end)) {
4155 memcpy(end - 5, "[...]", 5);

--- 77 unchanged lines hidden ---