xref: /unit/src/nxt_http_websocket.c (revision 1546:06017e6e3a5f)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <nxt_main.h>
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_router_request.h>
10 #include <nxt_port_memory_int.h>
11 #include <nxt_websocket.h>
12 #include <nxt_websocket_header.h>
13 
14 
15 static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data);
16 static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj,
17     void *data);
18 
19 
20 const nxt_http_request_state_t  nxt_http_websocket
21     nxt_aligned(64) =
22 {
23     .ready_handler = nxt_http_websocket_client,
24     .error_handler = nxt_http_websocket_error_handler,
25 };
26 
27 
28 static void
29 nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
30 {
31     size_t                  frame_size, used_size, copy_size, buf_free_size;
32     size_t                  chunk_copy_size;
33     nxt_buf_t               *out, *buf, **out_tail, *b, *next;
34     nxt_int_t               res;
35     nxt_http_request_t      *r;
36     nxt_request_app_link_t  *req_app_link;
37     nxt_request_rpc_data_t  *req_rpc_data;
38     nxt_websocket_header_t  *wsh;
39 
40     r = obj;
41 
42     if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL
43          || (req_app_link = req_rpc_data->req_app_link) == NULL))
44     {
45         nxt_debug(task, "websocket client frame for destroyed request");
46 
47         return;
48     }
49 
50     nxt_debug(task, "http websocket client frame");
51 
52     wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
53 
54     frame_size = nxt_websocket_frame_header_size(wsh)
55                   + nxt_websocket_frame_payload_len(wsh);
56 
57     buf = NULL;
58     buf_free_size = 0;
59     out = NULL;
60     out_tail = &out;
61 
62     b = r->ws_frame;
63 
64     while (b != NULL && frame_size > 0) {
65         used_size = nxt_buf_mem_used_size(&b->mem);
66         copy_size = nxt_min(used_size, frame_size);
67 
68         while (copy_size > 0) {
69             if (buf == NULL || buf_free_size == 0) {
70                 buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
71 
72                 buf = nxt_port_mmap_get_buf(task,
73                                      &req_app_link->app_port->process->outgoing,
74                                             buf_free_size);
75 
76                 *out_tail = buf;
77                 out_tail = &buf->next;
78             }
79 
80             chunk_copy_size = nxt_min(buf_free_size, copy_size);
81 
82             buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos,
83                                        chunk_copy_size);
84 
85             copy_size -= chunk_copy_size;
86             b->mem.pos += chunk_copy_size;
87             buf_free_size -= chunk_copy_size;
88         }
89 
90         frame_size -= copy_size;
91         next = b->next;
92         b->next = NULL;
93 
94         if (nxt_buf_mem_used_size(&b->mem) == 0) {
95             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
96                                b->completion_handler, task, b, b->parent);
97 
98             r->ws_frame = next;
99         }
100 
101         b = next;
102     }
103 
104     res = nxt_port_socket_twrite(task, req_app_link->app_port,
105                                  NXT_PORT_MSG_WEBSOCKET, -1,
106                                  req_app_link->stream,
107                                  req_app_link->reply_port->id, out, NULL);
108     if (nxt_slow_path(res != NXT_OK)) {
109         // TODO: handle
110     }
111 
112     b = r->ws_frame;
113 
114     if (b != NULL) {
115         used_size = nxt_buf_mem_used_size(&b->mem);
116 
117         if (used_size > 0) {
118             nxt_memmove(b->mem.start, b->mem.pos, used_size);
119 
120             b->mem.pos = b->mem.start;
121             b->mem.free = b->mem.start + used_size;
122         }
123     }
124 
125     nxt_http_request_ws_frame_start(task, r, r->ws_frame);
126 }
127 
128 
129 static void
130 nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
131 {
132     nxt_http_request_t      *r;
133     nxt_request_app_link_t  *req_app_link;
134     nxt_request_rpc_data_t  *req_rpc_data;
135 
136     nxt_debug(task, "http websocket error handler");
137 
138     r = obj;
139 
140     if ((req_rpc_data = r->req_rpc_data) == NULL) {
141         nxt_debug(task, "  req_rpc_data is NULL");
142         goto close_handler;
143     }
144 
145     if ((req_app_link = req_rpc_data->req_app_link) == NULL) {
146         nxt_debug(task, "  req_app_link is NULL");
147         goto close_handler;
148     }
149 
150     if (req_app_link->app_port == NULL) {
151         nxt_debug(task, "  app_port is NULL");
152         goto close_handler;
153     }
154 
155     (void) nxt_port_socket_twrite(task, req_app_link->app_port,
156                                   NXT_PORT_MSG_WEBSOCKET_LAST,
157                                   -1, req_app_link->stream,
158                                   req_app_link->reply_port->id, NULL, NULL);
159 
160 close_handler:
161 
162     nxt_http_request_close_handler(task, obj, data);
163 }
164