1
2 /*
3 * Copyright (C) NGINX, Inc.
4 */
5
6 #include <nxt_main.h>
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_router_request.h>
10 #include <nxt_port_memory_int.h>
11 #include <nxt_websocket.h>
12 #include <nxt_websocket_header.h>
13
14
15 static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data);
16 static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj,
17 void *data);
18
19
20 const nxt_http_request_state_t nxt_http_websocket
21 nxt_aligned(64) =
22 {
23 .ready_handler = nxt_http_websocket_client,
24 .error_handler = nxt_http_websocket_error_handler,
25 };
26
27
28 static void
nxt_http_websocket_client(nxt_task_t * task,void * obj,void * data)29 nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
30 {
31 size_t frame_size, used_size, copy_size, buf_free_size;
32 size_t chunk_copy_size;
33 nxt_buf_t *out, *buf, **out_tail, *b, *next;
34 nxt_int_t res;
35 nxt_http_request_t *r;
36 nxt_request_rpc_data_t *req_rpc_data;
37 nxt_websocket_header_t *wsh;
38
39 r = obj;
40 req_rpc_data = r->req_rpc_data;
41
42 if (nxt_slow_path(req_rpc_data == NULL)) {
43 nxt_debug(task, "websocket client frame for destroyed request");
44
45 return;
46 }
47
48 nxt_debug(task, "http websocket client frame");
49
50 wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
51
52 frame_size = nxt_websocket_frame_header_size(wsh)
53 + nxt_websocket_frame_payload_len(wsh);
54
55 buf = NULL;
56 buf_free_size = 0;
57 out = NULL;
58 out_tail = &out;
59
60 b = r->ws_frame;
61
62 while (b != NULL && frame_size > 0) {
63 used_size = nxt_buf_mem_used_size(&b->mem);
64 copy_size = nxt_min(used_size, frame_size);
65
66 while (copy_size > 0) {
67 if (buf == NULL || buf_free_size == 0) {
68 buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
69
70 buf = nxt_port_mmap_get_buf(task, &req_rpc_data->app->outgoing,
71 buf_free_size);
72
73 *out_tail = buf;
74 out_tail = &buf->next;
75 }
76
77 chunk_copy_size = nxt_min(buf_free_size, copy_size);
78
79 buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos,
80 chunk_copy_size);
81
82 copy_size -= chunk_copy_size;
83 b->mem.pos += chunk_copy_size;
84 buf_free_size -= chunk_copy_size;
85 }
86
87 frame_size -= copy_size;
88 next = b->next;
89 b->next = NULL;
90
91 if (nxt_buf_mem_used_size(&b->mem) == 0) {
92 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
93 b->completion_handler, task, b, b->parent);
94
95 r->ws_frame = next;
96 }
97
98 b = next;
99 }
100
101 res = nxt_port_socket_write(task, req_rpc_data->app_port,
102 NXT_PORT_MSG_WEBSOCKET, -1,
103 req_rpc_data->stream,
104 task->thread->engine->port->id, out);
105 if (nxt_slow_path(res != NXT_OK)) {
106 // TODO: handle
107 }
108
109 b = r->ws_frame;
110
111 if (b != NULL) {
112 used_size = nxt_buf_mem_used_size(&b->mem);
113
114 if (used_size > 0) {
115 nxt_memmove(b->mem.start, b->mem.pos, used_size);
116
117 b->mem.pos = b->mem.start;
118 b->mem.free = b->mem.start + used_size;
119 }
120 }
121
122 nxt_http_request_ws_frame_start(task, r, r->ws_frame);
123 }
124
125
126 static void
nxt_http_websocket_error_handler(nxt_task_t * task,void * obj,void * data)127 nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
128 {
129 nxt_http_request_t *r;
130 nxt_request_rpc_data_t *req_rpc_data;
131
132 nxt_debug(task, "http websocket error handler");
133
134 r = obj;
135 req_rpc_data = r->req_rpc_data;
136
137 if (req_rpc_data == NULL) {
138 nxt_debug(task, " req_rpc_data is NULL");
139 goto close_handler;
140 }
141
142 if (req_rpc_data->app_port == NULL) {
143 nxt_debug(task, " app_port is NULL");
144 goto close_handler;
145 }
146
147 (void) nxt_port_socket_write(task, req_rpc_data->app_port,
148 NXT_PORT_MSG_WEBSOCKET_LAST,
149 -1, req_rpc_data->stream,
150 task->thread->engine->port->id, NULL);
151
152 close_handler:
153
154 nxt_http_request_close_handler(task, obj, data);
155 }
156