xref: /unit/src/nxt_http_websocket.c (revision 1555:1d84b9e4b459)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <nxt_main.h>
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_router_request.h>
10 #include <nxt_port_memory_int.h>
11 #include <nxt_websocket.h>
12 #include <nxt_websocket_header.h>
13 
14 
15 static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data);
16 static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj,
17     void *data);
18 
19 
20 const nxt_http_request_state_t  nxt_http_websocket
21     nxt_aligned(64) =
22 {
23     .ready_handler = nxt_http_websocket_client,
24     .error_handler = nxt_http_websocket_error_handler,
25 };
26 
27 
28 static void
nxt_http_websocket_client(nxt_task_t * task,void * obj,void * data)29 nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
30 {
31     size_t                  frame_size, used_size, copy_size, buf_free_size;
32     size_t                  chunk_copy_size;
33     nxt_buf_t               *out, *buf, **out_tail, *b, *next;
34     nxt_int_t               res;
35     nxt_http_request_t      *r;
36     nxt_request_rpc_data_t  *req_rpc_data;
37     nxt_websocket_header_t  *wsh;
38 
39     r = obj;
40     req_rpc_data = r->req_rpc_data;
41 
42     if (nxt_slow_path(req_rpc_data == NULL)) {
43         nxt_debug(task, "websocket client frame for destroyed request");
44 
45         return;
46     }
47 
48     nxt_debug(task, "http websocket client frame");
49 
50     wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
51 
52     frame_size = nxt_websocket_frame_header_size(wsh)
53                   + nxt_websocket_frame_payload_len(wsh);
54 
55     buf = NULL;
56     buf_free_size = 0;
57     out = NULL;
58     out_tail = &out;
59 
60     b = r->ws_frame;
61 
62     while (b != NULL && frame_size > 0) {
63         used_size = nxt_buf_mem_used_size(&b->mem);
64         copy_size = nxt_min(used_size, frame_size);
65 
66         while (copy_size > 0) {
67             if (buf == NULL || buf_free_size == 0) {
68                 buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
69 
70                 buf = nxt_port_mmap_get_buf(task, &req_rpc_data->app->outgoing,
71                                             buf_free_size);
72 
73                 *out_tail = buf;
74                 out_tail = &buf->next;
75             }
76 
77             chunk_copy_size = nxt_min(buf_free_size, copy_size);
78 
79             buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos,
80                                        chunk_copy_size);
81 
82             copy_size -= chunk_copy_size;
83             b->mem.pos += chunk_copy_size;
84             buf_free_size -= chunk_copy_size;
85         }
86 
87         frame_size -= copy_size;
88         next = b->next;
89         b->next = NULL;
90 
91         if (nxt_buf_mem_used_size(&b->mem) == 0) {
92             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
93                                b->completion_handler, task, b, b->parent);
94 
95             r->ws_frame = next;
96         }
97 
98         b = next;
99     }
100 
101     res = nxt_port_socket_write(task, req_rpc_data->app_port,
102                                 NXT_PORT_MSG_WEBSOCKET, -1,
103                                 req_rpc_data->stream,
104                                 task->thread->engine->port->id, out);
105     if (nxt_slow_path(res != NXT_OK)) {
106         // TODO: handle
107     }
108 
109     b = r->ws_frame;
110 
111     if (b != NULL) {
112         used_size = nxt_buf_mem_used_size(&b->mem);
113 
114         if (used_size > 0) {
115             nxt_memmove(b->mem.start, b->mem.pos, used_size);
116 
117             b->mem.pos = b->mem.start;
118             b->mem.free = b->mem.start + used_size;
119         }
120     }
121 
122     nxt_http_request_ws_frame_start(task, r, r->ws_frame);
123 }
124 
125 
126 static void
nxt_http_websocket_error_handler(nxt_task_t * task,void * obj,void * data)127 nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
128 {
129     nxt_http_request_t      *r;
130     nxt_request_rpc_data_t  *req_rpc_data;
131 
132     nxt_debug(task, "http websocket error handler");
133 
134     r = obj;
135     req_rpc_data = r->req_rpc_data;
136 
137     if (req_rpc_data == NULL) {
138         nxt_debug(task, "  req_rpc_data is NULL");
139         goto close_handler;
140     }
141 
142     if (req_rpc_data->app_port == NULL) {
143         nxt_debug(task, "  app_port is NULL");
144         goto close_handler;
145     }
146 
147     (void) nxt_port_socket_write(task, req_rpc_data->app_port,
148                                  NXT_PORT_MSG_WEBSOCKET_LAST,
149                                  -1, req_rpc_data->stream,
150                                  task->thread->engine->port->id, NULL);
151 
152 close_handler:
153 
154     nxt_http_request_close_handler(task, obj, data);
155 }
156