nxt_http_websocket.c (1546:06017e6e3a5f) nxt_http_websocket.c (1547:cbcd76704c90)
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <nxt_main.h>
7#include <nxt_router.h>
8#include <nxt_http.h>

--- 19 unchanged lines hidden (view full) ---

28static void
29nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
30{
31 size_t frame_size, used_size, copy_size, buf_free_size;
32 size_t chunk_copy_size;
33 nxt_buf_t *out, *buf, **out_tail, *b, *next;
34 nxt_int_t res;
35 nxt_http_request_t *r;
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include <nxt_main.h>
7#include <nxt_router.h>
8#include <nxt_http.h>

--- 19 unchanged lines hidden (view full) ---

28static void
29nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
30{
31 size_t frame_size, used_size, copy_size, buf_free_size;
32 size_t chunk_copy_size;
33 nxt_buf_t *out, *buf, **out_tail, *b, *next;
34 nxt_int_t res;
35 nxt_http_request_t *r;
36 nxt_request_app_link_t *req_app_link;
37 nxt_request_rpc_data_t *req_rpc_data;
38 nxt_websocket_header_t *wsh;
39
40 r = obj;
36 nxt_request_rpc_data_t *req_rpc_data;
37 nxt_websocket_header_t *wsh;
38
39 r = obj;
40 req_rpc_data = r->req_rpc_data;
41
41
42 if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL
43 || (req_app_link = req_rpc_data->req_app_link) == NULL))
44 {
42 if (nxt_slow_path(req_rpc_data == NULL)) {
45 nxt_debug(task, "websocket client frame for destroyed request");
46
47 return;
48 }
49
50 nxt_debug(task, "http websocket client frame");
51
52 wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;

--- 11 unchanged lines hidden (view full) ---

64 while (b != NULL && frame_size > 0) {
65 used_size = nxt_buf_mem_used_size(&b->mem);
66 copy_size = nxt_min(used_size, frame_size);
67
68 while (copy_size > 0) {
69 if (buf == NULL || buf_free_size == 0) {
70 buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
71
43 nxt_debug(task, "websocket client frame for destroyed request");
44
45 return;
46 }
47
48 nxt_debug(task, "http websocket client frame");
49
50 wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;

--- 11 unchanged lines hidden (view full) ---

62 while (b != NULL && frame_size > 0) {
63 used_size = nxt_buf_mem_used_size(&b->mem);
64 copy_size = nxt_min(used_size, frame_size);
65
66 while (copy_size > 0) {
67 if (buf == NULL || buf_free_size == 0) {
68 buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
69
72 buf = nxt_port_mmap_get_buf(task,
73 &req_app_link->app_port->process->outgoing,
70 buf = nxt_port_mmap_get_buf(task, &req_rpc_data->app->outgoing,
74 buf_free_size);
75
76 *out_tail = buf;
77 out_tail = &buf->next;
78 }
79
80 chunk_copy_size = nxt_min(buf_free_size, copy_size);
81

--- 14 unchanged lines hidden (view full) ---

96 b->completion_handler, task, b, b->parent);
97
98 r->ws_frame = next;
99 }
100
101 b = next;
102 }
103
71 buf_free_size);
72
73 *out_tail = buf;
74 out_tail = &buf->next;
75 }
76
77 chunk_copy_size = nxt_min(buf_free_size, copy_size);
78

--- 14 unchanged lines hidden (view full) ---

93 b->completion_handler, task, b, b->parent);
94
95 r->ws_frame = next;
96 }
97
98 b = next;
99 }
100
104 res = nxt_port_socket_twrite(task, req_app_link->app_port,
101 res = nxt_port_socket_twrite(task, req_rpc_data->app_port,
105 NXT_PORT_MSG_WEBSOCKET, -1,
102 NXT_PORT_MSG_WEBSOCKET, -1,
106 req_app_link->stream,
107 req_app_link->reply_port->id, out, NULL);
103 req_rpc_data->stream,
104 task->thread->engine->port->id, out, NULL);
108 if (nxt_slow_path(res != NXT_OK)) {
109 // TODO: handle
110 }
111
112 b = r->ws_frame;
113
114 if (b != NULL) {
115 used_size = nxt_buf_mem_used_size(&b->mem);

--- 9 unchanged lines hidden (view full) ---

125 nxt_http_request_ws_frame_start(task, r, r->ws_frame);
126}
127
128
129static void
130nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
131{
132 nxt_http_request_t *r;
105 if (nxt_slow_path(res != NXT_OK)) {
106 // TODO: handle
107 }
108
109 b = r->ws_frame;
110
111 if (b != NULL) {
112 used_size = nxt_buf_mem_used_size(&b->mem);

--- 9 unchanged lines hidden (view full) ---

122 nxt_http_request_ws_frame_start(task, r, r->ws_frame);
123}
124
125
126static void
127nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
128{
129 nxt_http_request_t *r;
133 nxt_request_app_link_t *req_app_link;
134 nxt_request_rpc_data_t *req_rpc_data;
135
136 nxt_debug(task, "http websocket error handler");
137
138 r = obj;
130 nxt_request_rpc_data_t *req_rpc_data;
131
132 nxt_debug(task, "http websocket error handler");
133
134 r = obj;
135 req_rpc_data = r->req_rpc_data;
139
136
140 if ((req_rpc_data = r->req_rpc_data) == NULL) {
137 if (req_rpc_data == NULL) {
141 nxt_debug(task, " req_rpc_data is NULL");
142 goto close_handler;
143 }
144
138 nxt_debug(task, " req_rpc_data is NULL");
139 goto close_handler;
140 }
141
145 if ((req_app_link = req_rpc_data->req_app_link) == NULL) {
146 nxt_debug(task, " req_app_link is NULL");
147 goto close_handler;
148 }
149
150 if (req_app_link->app_port == NULL) {
142 if (req_rpc_data->app_port == NULL) {
151 nxt_debug(task, " app_port is NULL");
152 goto close_handler;
153 }
154
143 nxt_debug(task, " app_port is NULL");
144 goto close_handler;
145 }
146
155 (void) nxt_port_socket_twrite(task, req_app_link->app_port,
147 (void) nxt_port_socket_twrite(task, req_rpc_data->app_port,
156 NXT_PORT_MSG_WEBSOCKET_LAST,
148 NXT_PORT_MSG_WEBSOCKET_LAST,
157 -1, req_app_link->stream,
158 req_app_link->reply_port->id, NULL, NULL);
149 -1, req_rpc_data->stream,
150 task->thread->engine->port->id, NULL, NULL);
159
160close_handler:
161
162 nxt_http_request_close_handler(task, obj, data);
163}
151
152close_handler:
153
154 nxt_http_request_close_handler(task, obj, data);
155}