Deleted
Added
nxt_http_websocket.c (1546:06017e6e3a5f) | nxt_http_websocket.c (1547:cbcd76704c90) |
---|---|
1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include <nxt_main.h> 7#include <nxt_router.h> 8#include <nxt_http.h> --- 19 unchanged lines hidden (view full) --- 28static void 29nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) 30{ 31 size_t frame_size, used_size, copy_size, buf_free_size; 32 size_t chunk_copy_size; 33 nxt_buf_t *out, *buf, **out_tail, *b, *next; 34 nxt_int_t res; 35 nxt_http_request_t *r; | 1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include <nxt_main.h> 7#include <nxt_router.h> 8#include <nxt_http.h> --- 19 unchanged lines hidden (view full) --- 28static void 29nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data) 30{ 31 size_t frame_size, used_size, copy_size, buf_free_size; 32 size_t chunk_copy_size; 33 nxt_buf_t *out, *buf, **out_tail, *b, *next; 34 nxt_int_t res; 35 nxt_http_request_t *r; |
36 nxt_request_app_link_t *req_app_link; | |
37 nxt_request_rpc_data_t *req_rpc_data; 38 nxt_websocket_header_t *wsh; 39 40 r = obj; | 36 nxt_request_rpc_data_t *req_rpc_data; 37 nxt_websocket_header_t *wsh; 38 39 r = obj; |
40 req_rpc_data = r->req_rpc_data; |
|
41 | 41 |
42 if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL 43 || (req_app_link = req_rpc_data->req_app_link) == NULL)) 44 { | 42 if (nxt_slow_path(req_rpc_data == NULL)) { |
45 nxt_debug(task, "websocket client frame for destroyed request"); 46 47 return; 48 } 49 50 nxt_debug(task, "http websocket client frame"); 51 52 wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos; --- 11 unchanged lines hidden (view full) --- 64 while (b != NULL && frame_size > 0) { 65 used_size = nxt_buf_mem_used_size(&b->mem); 66 copy_size = nxt_min(used_size, frame_size); 67 68 while (copy_size > 0) { 69 if (buf == NULL || buf_free_size == 0) { 70 buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE); 71 | 43 nxt_debug(task, "websocket client frame for destroyed request"); 44 45 return; 46 } 47 48 nxt_debug(task, "http websocket client frame"); 49 50 wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos; --- 11 unchanged lines hidden (view full) --- 62 while (b != NULL && frame_size > 0) { 63 used_size = nxt_buf_mem_used_size(&b->mem); 64 copy_size = nxt_min(used_size, frame_size); 65 66 while (copy_size > 0) { 67 if (buf == NULL || buf_free_size == 0) { 68 buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE); 69 |
72 buf = nxt_port_mmap_get_buf(task, 73 &req_app_link->app_port->process->outgoing, | 70 buf = nxt_port_mmap_get_buf(task, &req_rpc_data->app->outgoing, |
74 buf_free_size); 75 76 *out_tail = buf; 77 out_tail = &buf->next; 78 } 79 80 chunk_copy_size = nxt_min(buf_free_size, copy_size); 81 --- 14 unchanged lines hidden (view full) --- 96 b->completion_handler, task, b, b->parent); 97 98 r->ws_frame = next; 99 } 100 101 b = next; 102 } 103 | 71 buf_free_size); 72 73 *out_tail = buf; 74 out_tail = &buf->next; 75 } 76 77 chunk_copy_size = nxt_min(buf_free_size, copy_size); 78 --- 14 unchanged lines hidden (view full) --- 93 b->completion_handler, task, b, b->parent); 94 95 r->ws_frame = next; 96 } 97 98 b = next; 99 } 100 |
104 res = nxt_port_socket_twrite(task, req_app_link->app_port, | 101 res = nxt_port_socket_twrite(task, req_rpc_data->app_port, |
105 NXT_PORT_MSG_WEBSOCKET, -1, | 102 NXT_PORT_MSG_WEBSOCKET, -1, |
106 req_app_link->stream, 107 req_app_link->reply_port->id, out, NULL); | 103 req_rpc_data->stream, 104 task->thread->engine->port->id, out, NULL); |
108 if (nxt_slow_path(res != NXT_OK)) { 109 // TODO: handle 110 } 111 112 b = r->ws_frame; 113 114 if (b != NULL) { 115 used_size = nxt_buf_mem_used_size(&b->mem); --- 9 unchanged lines hidden (view full) --- 125 nxt_http_request_ws_frame_start(task, r, r->ws_frame); 126} 127 128 129static void 130nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data) 131{ 132 nxt_http_request_t *r; | 105 if (nxt_slow_path(res != NXT_OK)) { 106 // TODO: handle 107 } 108 109 b = r->ws_frame; 110 111 if (b != NULL) { 112 used_size = nxt_buf_mem_used_size(&b->mem); --- 9 unchanged lines hidden (view full) --- 122 nxt_http_request_ws_frame_start(task, r, r->ws_frame); 123} 124 125 126static void 127nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data) 128{ 129 nxt_http_request_t *r; |
133 nxt_request_app_link_t *req_app_link; | |
134 nxt_request_rpc_data_t *req_rpc_data; 135 136 nxt_debug(task, "http websocket error handler"); 137 138 r = obj; | 130 nxt_request_rpc_data_t *req_rpc_data; 131 132 nxt_debug(task, "http websocket error handler"); 133 134 r = obj; |
135 req_rpc_data = r->req_rpc_data; |
|
139 | 136 |
140 if ((req_rpc_data = r->req_rpc_data) == NULL) { | 137 if (req_rpc_data == NULL) { |
141 nxt_debug(task, " req_rpc_data is NULL"); 142 goto close_handler; 143 } 144 | 138 nxt_debug(task, " req_rpc_data is NULL"); 139 goto close_handler; 140 } 141 |
145 if ((req_app_link = req_rpc_data->req_app_link) == NULL) { 146 nxt_debug(task, " req_app_link is NULL"); 147 goto close_handler; 148 } 149 150 if (req_app_link->app_port == NULL) { | 142 if (req_rpc_data->app_port == NULL) { |
151 nxt_debug(task, " app_port is NULL"); 152 goto close_handler; 153 } 154 | 143 nxt_debug(task, " app_port is NULL"); 144 goto close_handler; 145 } 146 |
155 (void) nxt_port_socket_twrite(task, req_app_link->app_port, | 147 (void) nxt_port_socket_twrite(task, req_rpc_data->app_port, |
156 NXT_PORT_MSG_WEBSOCKET_LAST, | 148 NXT_PORT_MSG_WEBSOCKET_LAST, |
157 -1, req_app_link->stream, 158 req_app_link->reply_port->id, NULL, NULL); | 149 -1, req_rpc_data->stream, 150 task->thread->engine->port->id, NULL, NULL); |
159 160close_handler: 161 162 nxt_http_request_close_handler(task, obj, data); 163} | 151 152close_handler: 153 154 nxt_http_request_close_handler(task, obj, data); 155} |