xref: /unit/src/nxt_http_websocket.c (revision 1131:ec7d924d8dfb)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <nxt_main.h>
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_router_request.h>
10 #include <nxt_port_memory_int.h>
11 #include <nxt_websocket.h>
12 #include <nxt_websocket_header.h>
13 
14 
15 static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data);
16 static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj,
17     void *data);
18 
19 
20 const nxt_http_request_state_t  nxt_http_websocket
21     nxt_aligned(64) =
22 {
23     .ready_handler = nxt_http_websocket_client,
24     .error_handler = nxt_http_websocket_error_handler,
25 };
26 
27 
28 static void
29 nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
30 {
31     size_t                  frame_size, used_size, copy_size, buf_free_size;
32     size_t                  chunk_copy_size;
33     nxt_buf_t               *out, *buf, **out_tail, *b, *next;
34     nxt_int_t               res;
35     nxt_http_request_t      *r;
36     nxt_request_app_link_t  *req_app_link;
37     nxt_request_rpc_data_t  *req_rpc_data;
38     nxt_websocket_header_t  *wsh;
39 
40     r = obj;
41 
42     if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL
43          || (req_app_link = req_rpc_data->req_app_link) == NULL))
44     {
45         nxt_debug(task, "websocket client frame for destroyed request");
46 
47         return;
48     }
49 
50     nxt_debug(task, "http websocket client frame");
51 
52     wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
53 
54     frame_size = nxt_websocket_frame_header_size(wsh)
55                   + nxt_websocket_frame_payload_len(wsh);
56 
57     buf = NULL;
58     buf_free_size = 0;
59     out = NULL;
60     out_tail = &out;
61 
62     b = r->ws_frame;
63 
64     while (b != NULL && frame_size > 0) {
65         used_size = nxt_buf_mem_used_size(&b->mem);
66         copy_size = nxt_min(used_size, frame_size);
67 
68         while (copy_size > 0) {
69             if (buf == NULL || buf_free_size == 0) {
70                 buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
71 
72                 buf = nxt_port_mmap_get_buf(task, req_app_link->app_port,
73                                             buf_free_size);
74 
75                 *out_tail = buf;
76                 out_tail = &buf->next;
77             }
78 
79             chunk_copy_size = nxt_min(buf_free_size, copy_size);
80 
81             buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos,
82                                        chunk_copy_size);
83 
84             copy_size -= chunk_copy_size;
85             b->mem.pos += chunk_copy_size;
86             buf_free_size -= chunk_copy_size;
87         }
88 
89         frame_size -= copy_size;
90         next = b->next;
91 
92         if (nxt_buf_mem_used_size(&b->mem) == 0) {
93             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
94                                b->completion_handler, task, b, b->parent);
95 
96             r->ws_frame = next;
97         }
98 
99         b = next;
100     }
101 
102     res = nxt_port_socket_twrite(task, req_app_link->app_port,
103                                  NXT_PORT_MSG_WEBSOCKET, -1,
104                                  req_app_link->stream,
105                                  req_app_link->reply_port->id, out, NULL);
106     if (nxt_slow_path(res != NXT_OK)) {
107         // TODO: handle
108     }
109 
110     b = r->ws_frame;
111 
112     if (b != NULL) {
113         used_size = nxt_buf_mem_used_size(&b->mem);
114 
115         if (used_size > 0) {
116             nxt_memmove(b->mem.start, b->mem.pos, used_size);
117 
118             b->mem.pos = b->mem.start;
119             b->mem.free = b->mem.start + used_size;
120         }
121     }
122 
123     nxt_http_request_ws_frame_start(task, r, r->ws_frame);
124 }
125 
126 
127 static void
128 nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
129 {
130     nxt_http_request_t      *r;
131     nxt_request_app_link_t  *req_app_link;
132     nxt_request_rpc_data_t  *req_rpc_data;
133 
134     nxt_debug(task, "http websocket error handler");
135 
136     r = obj;
137 
138     if ((req_rpc_data = r->req_rpc_data) == NULL) {
139         nxt_debug(task, "  req_rpc_data is NULL");
140         goto close_handler;
141     }
142 
143     if ((req_app_link = req_rpc_data->req_app_link) == NULL) {
144         nxt_debug(task, "  req_app_link is NULL");
145         goto close_handler;
146     }
147 
148     if (req_app_link->app_port == NULL) {
149         nxt_debug(task, "  app_port is NULL");
150         goto close_handler;
151     }
152 
153     (void) nxt_port_socket_twrite(task, req_app_link->app_port,
154                                   NXT_PORT_MSG_WEBSOCKET_LAST,
155                                   -1, req_app_link->stream,
156                                   req_app_link->reply_port->id, NULL, NULL);
157 
158 close_handler:
159 
160     nxt_http_request_close_handler(task, obj, data);
161 }
162