xref: /unit/src/nxt_http_websocket.c (revision 1269:41331471eee7)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <nxt_main.h>
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_router_request.h>
10 #include <nxt_port_memory_int.h>
11 #include <nxt_websocket.h>
12 #include <nxt_websocket_header.h>
13 
14 
15 static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data);
16 static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj,
17     void *data);
18 
19 
20 const nxt_http_request_state_t  nxt_http_websocket
21     nxt_aligned(64) =
22 {
23     .ready_handler = nxt_http_websocket_client,
24     .error_handler = nxt_http_websocket_error_handler,
25 };
26 
27 
28 static void
29 nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
30 {
31     size_t                  frame_size, used_size, copy_size, buf_free_size;
32     size_t                  chunk_copy_size;
33     nxt_buf_t               *out, *buf, **out_tail, *b, *next;
34     nxt_int_t               res;
35     nxt_http_request_t      *r;
36     nxt_request_app_link_t  *req_app_link;
37     nxt_request_rpc_data_t  *req_rpc_data;
38     nxt_websocket_header_t  *wsh;
39 
40     r = obj;
41 
42     if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL
43          || (req_app_link = req_rpc_data->req_app_link) == NULL))
44     {
45         nxt_debug(task, "websocket client frame for destroyed request");
46 
47         return;
48     }
49 
50     nxt_debug(task, "http websocket client frame");
51 
52     wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
53 
54     frame_size = nxt_websocket_frame_header_size(wsh)
55                   + nxt_websocket_frame_payload_len(wsh);
56 
57     buf = NULL;
58     buf_free_size = 0;
59     out = NULL;
60     out_tail = &out;
61 
62     b = r->ws_frame;
63 
64     while (b != NULL && frame_size > 0) {
65         used_size = nxt_buf_mem_used_size(&b->mem);
66         copy_size = nxt_min(used_size, frame_size);
67 
68         while (copy_size > 0) {
69             if (buf == NULL || buf_free_size == 0) {
70                 buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
71 
72                 buf = nxt_port_mmap_get_buf(task, req_app_link->app_port,
73                                             buf_free_size);
74 
75                 *out_tail = buf;
76                 out_tail = &buf->next;
77             }
78 
79             chunk_copy_size = nxt_min(buf_free_size, copy_size);
80 
81             buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos,
82                                        chunk_copy_size);
83 
84             copy_size -= chunk_copy_size;
85             b->mem.pos += chunk_copy_size;
86             buf_free_size -= chunk_copy_size;
87         }
88 
89         frame_size -= copy_size;
90         next = b->next;
91         b->next = NULL;
92 
93         if (nxt_buf_mem_used_size(&b->mem) == 0) {
94             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
95                                b->completion_handler, task, b, b->parent);
96 
97             r->ws_frame = next;
98         }
99 
100         b = next;
101     }
102 
103     res = nxt_port_socket_twrite(task, req_app_link->app_port,
104                                  NXT_PORT_MSG_WEBSOCKET, -1,
105                                  req_app_link->stream,
106                                  req_app_link->reply_port->id, out, NULL);
107     if (nxt_slow_path(res != NXT_OK)) {
108         // TODO: handle
109     }
110 
111     b = r->ws_frame;
112 
113     if (b != NULL) {
114         used_size = nxt_buf_mem_used_size(&b->mem);
115 
116         if (used_size > 0) {
117             nxt_memmove(b->mem.start, b->mem.pos, used_size);
118 
119             b->mem.pos = b->mem.start;
120             b->mem.free = b->mem.start + used_size;
121         }
122     }
123 
124     nxt_http_request_ws_frame_start(task, r, r->ws_frame);
125 }
126 
127 
128 static void
129 nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
130 {
131     nxt_http_request_t      *r;
132     nxt_request_app_link_t  *req_app_link;
133     nxt_request_rpc_data_t  *req_rpc_data;
134 
135     nxt_debug(task, "http websocket error handler");
136 
137     r = obj;
138 
139     if ((req_rpc_data = r->req_rpc_data) == NULL) {
140         nxt_debug(task, "  req_rpc_data is NULL");
141         goto close_handler;
142     }
143 
144     if ((req_app_link = req_rpc_data->req_app_link) == NULL) {
145         nxt_debug(task, "  req_app_link is NULL");
146         goto close_handler;
147     }
148 
149     if (req_app_link->app_port == NULL) {
150         nxt_debug(task, "  app_port is NULL");
151         goto close_handler;
152     }
153 
154     (void) nxt_port_socket_twrite(task, req_app_link->app_port,
155                                   NXT_PORT_MSG_WEBSOCKET_LAST,
156                                   -1, req_app_link->stream,
157                                   req_app_link->reply_port->id, NULL, NULL);
158 
159 close_handler:
160 
161     nxt_http_request_close_handler(task, obj, data);
162 }
163