xref: /unit/src/nxt_http_websocket.c (revision 1131)
1*1131Smax.romanov@nginx.com 
2*1131Smax.romanov@nginx.com /*
3*1131Smax.romanov@nginx.com  * Copyright (C) NGINX, Inc.
4*1131Smax.romanov@nginx.com  */
5*1131Smax.romanov@nginx.com 
6*1131Smax.romanov@nginx.com #include <nxt_main.h>
7*1131Smax.romanov@nginx.com #include <nxt_router.h>
8*1131Smax.romanov@nginx.com #include <nxt_http.h>
9*1131Smax.romanov@nginx.com #include <nxt_router_request.h>
10*1131Smax.romanov@nginx.com #include <nxt_port_memory_int.h>
11*1131Smax.romanov@nginx.com #include <nxt_websocket.h>
12*1131Smax.romanov@nginx.com #include <nxt_websocket_header.h>
13*1131Smax.romanov@nginx.com 
14*1131Smax.romanov@nginx.com 
15*1131Smax.romanov@nginx.com static void nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data);
16*1131Smax.romanov@nginx.com static void nxt_http_websocket_error_handler(nxt_task_t *task, void *obj,
17*1131Smax.romanov@nginx.com     void *data);
18*1131Smax.romanov@nginx.com 
19*1131Smax.romanov@nginx.com 
20*1131Smax.romanov@nginx.com const nxt_http_request_state_t  nxt_http_websocket
21*1131Smax.romanov@nginx.com     nxt_aligned(64) =
22*1131Smax.romanov@nginx.com {
23*1131Smax.romanov@nginx.com     .ready_handler = nxt_http_websocket_client,
24*1131Smax.romanov@nginx.com     .error_handler = nxt_http_websocket_error_handler,
25*1131Smax.romanov@nginx.com };
26*1131Smax.romanov@nginx.com 
27*1131Smax.romanov@nginx.com 
28*1131Smax.romanov@nginx.com static void
29*1131Smax.romanov@nginx.com nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
30*1131Smax.romanov@nginx.com {
31*1131Smax.romanov@nginx.com     size_t                  frame_size, used_size, copy_size, buf_free_size;
32*1131Smax.romanov@nginx.com     size_t                  chunk_copy_size;
33*1131Smax.romanov@nginx.com     nxt_buf_t               *out, *buf, **out_tail, *b, *next;
34*1131Smax.romanov@nginx.com     nxt_int_t               res;
35*1131Smax.romanov@nginx.com     nxt_http_request_t      *r;
36*1131Smax.romanov@nginx.com     nxt_request_app_link_t  *req_app_link;
37*1131Smax.romanov@nginx.com     nxt_request_rpc_data_t  *req_rpc_data;
38*1131Smax.romanov@nginx.com     nxt_websocket_header_t  *wsh;
39*1131Smax.romanov@nginx.com 
40*1131Smax.romanov@nginx.com     r = obj;
41*1131Smax.romanov@nginx.com 
42*1131Smax.romanov@nginx.com     if (nxt_slow_path((req_rpc_data = r->req_rpc_data) == NULL
43*1131Smax.romanov@nginx.com          || (req_app_link = req_rpc_data->req_app_link) == NULL))
44*1131Smax.romanov@nginx.com     {
45*1131Smax.romanov@nginx.com         nxt_debug(task, "websocket client frame for destroyed request");
46*1131Smax.romanov@nginx.com 
47*1131Smax.romanov@nginx.com         return;
48*1131Smax.romanov@nginx.com     }
49*1131Smax.romanov@nginx.com 
50*1131Smax.romanov@nginx.com     nxt_debug(task, "http websocket client frame");
51*1131Smax.romanov@nginx.com 
52*1131Smax.romanov@nginx.com     wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
53*1131Smax.romanov@nginx.com 
54*1131Smax.romanov@nginx.com     frame_size = nxt_websocket_frame_header_size(wsh)
55*1131Smax.romanov@nginx.com                   + nxt_websocket_frame_payload_len(wsh);
56*1131Smax.romanov@nginx.com 
57*1131Smax.romanov@nginx.com     buf = NULL;
58*1131Smax.romanov@nginx.com     buf_free_size = 0;
59*1131Smax.romanov@nginx.com     out = NULL;
60*1131Smax.romanov@nginx.com     out_tail = &out;
61*1131Smax.romanov@nginx.com 
62*1131Smax.romanov@nginx.com     b = r->ws_frame;
63*1131Smax.romanov@nginx.com 
64*1131Smax.romanov@nginx.com     while (b != NULL && frame_size > 0) {
65*1131Smax.romanov@nginx.com         used_size = nxt_buf_mem_used_size(&b->mem);
66*1131Smax.romanov@nginx.com         copy_size = nxt_min(used_size, frame_size);
67*1131Smax.romanov@nginx.com 
68*1131Smax.romanov@nginx.com         while (copy_size > 0) {
69*1131Smax.romanov@nginx.com             if (buf == NULL || buf_free_size == 0) {
70*1131Smax.romanov@nginx.com                 buf_free_size = nxt_min(frame_size, PORT_MMAP_DATA_SIZE);
71*1131Smax.romanov@nginx.com 
72*1131Smax.romanov@nginx.com                 buf = nxt_port_mmap_get_buf(task, req_app_link->app_port,
73*1131Smax.romanov@nginx.com                                             buf_free_size);
74*1131Smax.romanov@nginx.com 
75*1131Smax.romanov@nginx.com                 *out_tail = buf;
76*1131Smax.romanov@nginx.com                 out_tail = &buf->next;
77*1131Smax.romanov@nginx.com             }
78*1131Smax.romanov@nginx.com 
79*1131Smax.romanov@nginx.com             chunk_copy_size = nxt_min(buf_free_size, copy_size);
80*1131Smax.romanov@nginx.com 
81*1131Smax.romanov@nginx.com             buf->mem.free = nxt_cpymem(buf->mem.free, b->mem.pos,
82*1131Smax.romanov@nginx.com                                        chunk_copy_size);
83*1131Smax.romanov@nginx.com 
84*1131Smax.romanov@nginx.com             copy_size -= chunk_copy_size;
85*1131Smax.romanov@nginx.com             b->mem.pos += chunk_copy_size;
86*1131Smax.romanov@nginx.com             buf_free_size -= chunk_copy_size;
87*1131Smax.romanov@nginx.com         }
88*1131Smax.romanov@nginx.com 
89*1131Smax.romanov@nginx.com         frame_size -= copy_size;
90*1131Smax.romanov@nginx.com         next = b->next;
91*1131Smax.romanov@nginx.com 
92*1131Smax.romanov@nginx.com         if (nxt_buf_mem_used_size(&b->mem) == 0) {
93*1131Smax.romanov@nginx.com             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
94*1131Smax.romanov@nginx.com                                b->completion_handler, task, b, b->parent);
95*1131Smax.romanov@nginx.com 
96*1131Smax.romanov@nginx.com             r->ws_frame = next;
97*1131Smax.romanov@nginx.com         }
98*1131Smax.romanov@nginx.com 
99*1131Smax.romanov@nginx.com         b = next;
100*1131Smax.romanov@nginx.com     }
101*1131Smax.romanov@nginx.com 
102*1131Smax.romanov@nginx.com     res = nxt_port_socket_twrite(task, req_app_link->app_port,
103*1131Smax.romanov@nginx.com                                  NXT_PORT_MSG_WEBSOCKET, -1,
104*1131Smax.romanov@nginx.com                                  req_app_link->stream,
105*1131Smax.romanov@nginx.com                                  req_app_link->reply_port->id, out, NULL);
106*1131Smax.romanov@nginx.com     if (nxt_slow_path(res != NXT_OK)) {
107*1131Smax.romanov@nginx.com         // TODO: handle
108*1131Smax.romanov@nginx.com     }
109*1131Smax.romanov@nginx.com 
110*1131Smax.romanov@nginx.com     b = r->ws_frame;
111*1131Smax.romanov@nginx.com 
112*1131Smax.romanov@nginx.com     if (b != NULL) {
113*1131Smax.romanov@nginx.com         used_size = nxt_buf_mem_used_size(&b->mem);
114*1131Smax.romanov@nginx.com 
115*1131Smax.romanov@nginx.com         if (used_size > 0) {
116*1131Smax.romanov@nginx.com             nxt_memmove(b->mem.start, b->mem.pos, used_size);
117*1131Smax.romanov@nginx.com 
118*1131Smax.romanov@nginx.com             b->mem.pos = b->mem.start;
119*1131Smax.romanov@nginx.com             b->mem.free = b->mem.start + used_size;
120*1131Smax.romanov@nginx.com         }
121*1131Smax.romanov@nginx.com     }
122*1131Smax.romanov@nginx.com 
123*1131Smax.romanov@nginx.com     nxt_http_request_ws_frame_start(task, r, r->ws_frame);
124*1131Smax.romanov@nginx.com }
125*1131Smax.romanov@nginx.com 
126*1131Smax.romanov@nginx.com 
127*1131Smax.romanov@nginx.com static void
128*1131Smax.romanov@nginx.com nxt_http_websocket_error_handler(nxt_task_t *task, void *obj, void *data)
129*1131Smax.romanov@nginx.com {
130*1131Smax.romanov@nginx.com     nxt_http_request_t      *r;
131*1131Smax.romanov@nginx.com     nxt_request_app_link_t  *req_app_link;
132*1131Smax.romanov@nginx.com     nxt_request_rpc_data_t  *req_rpc_data;
133*1131Smax.romanov@nginx.com 
134*1131Smax.romanov@nginx.com     nxt_debug(task, "http websocket error handler");
135*1131Smax.romanov@nginx.com 
136*1131Smax.romanov@nginx.com     r = obj;
137*1131Smax.romanov@nginx.com 
138*1131Smax.romanov@nginx.com     if ((req_rpc_data = r->req_rpc_data) == NULL) {
139*1131Smax.romanov@nginx.com         nxt_debug(task, "  req_rpc_data is NULL");
140*1131Smax.romanov@nginx.com         goto close_handler;
141*1131Smax.romanov@nginx.com     }
142*1131Smax.romanov@nginx.com 
143*1131Smax.romanov@nginx.com     if ((req_app_link = req_rpc_data->req_app_link) == NULL) {
144*1131Smax.romanov@nginx.com         nxt_debug(task, "  req_app_link is NULL");
145*1131Smax.romanov@nginx.com         goto close_handler;
146*1131Smax.romanov@nginx.com     }
147*1131Smax.romanov@nginx.com 
148*1131Smax.romanov@nginx.com     if (req_app_link->app_port == NULL) {
149*1131Smax.romanov@nginx.com         nxt_debug(task, "  app_port is NULL");
150*1131Smax.romanov@nginx.com         goto close_handler;
151*1131Smax.romanov@nginx.com     }
152*1131Smax.romanov@nginx.com 
153*1131Smax.romanov@nginx.com     (void) nxt_port_socket_twrite(task, req_app_link->app_port,
154*1131Smax.romanov@nginx.com                                   NXT_PORT_MSG_WEBSOCKET_LAST,
155*1131Smax.romanov@nginx.com                                   -1, req_app_link->stream,
156*1131Smax.romanov@nginx.com                                   req_app_link->reply_port->id, NULL, NULL);
157*1131Smax.romanov@nginx.com 
158*1131Smax.romanov@nginx.com close_handler:
159*1131Smax.romanov@nginx.com 
160*1131Smax.romanov@nginx.com     nxt_http_request_close_handler(task, obj, data);
161*1131Smax.romanov@nginx.com }
162