nxt_router.c (229:dbceed548653) nxt_router.c (240:36bafba970b5)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 178 unchanged lines hidden (view full) ---

187
188 return NXT_OK;
189}
190
191
192static nxt_start_worker_t *
193nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
194{
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 178 unchanged lines hidden (view full) ---

187
188 return NXT_OK;
189}
190
191
192static nxt_start_worker_t *
193nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
194{
195 nxt_port_t *master_port;
195 nxt_port_t *main_port;
196 nxt_runtime_t *rt;
197 nxt_start_worker_t *sw;
198
199 sw = nxt_zalloc(sizeof(nxt_start_worker_t));
200
201 if (nxt_slow_path(sw == NULL)) {
202 return NULL;
203 }
204
205 sw->app = app;
206 sw->ra = ra;
207
208 nxt_debug(task, "sw %p create, request #%uxD, app '%V' %p", sw,
209 ra->req_id, &app->name, app);
210
211 rt = task->thread->runtime;
196 nxt_runtime_t *rt;
197 nxt_start_worker_t *sw;
198
199 sw = nxt_zalloc(sizeof(nxt_start_worker_t));
200
201 if (nxt_slow_path(sw == NULL)) {
202 return NULL;
203 }
204
205 sw->app = app;
206 sw->ra = ra;
207
208 nxt_debug(task, "sw %p create, request #%uxD, app '%V' %p", sw,
209 ra->req_id, &app->name, app);
210
211 rt = task->thread->runtime;
212 master_port = rt->port_by_type[NXT_PROCESS_MASTER];
212 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
213
214 sw->work.handler = nxt_router_send_sw_request;
213
214 sw->work.handler = nxt_router_send_sw_request;
215 sw->work.task = &master_port->engine->task;
215 sw->work.task = &main_port->engine->task;
216 sw->work.obj = sw;
217 sw->work.data = task->thread->engine;
218 sw->work.next = NULL;
219
216 sw->work.obj = sw;
217 sw->work.data = task->thread->engine;
218 sw->work.next = NULL;
219
220 if (task->thread->engine != master_port->engine) {
221 nxt_debug(task, "sw %p post send to master engine %p", sw,
222 master_port->engine);
220 if (task->thread->engine != main_port->engine) {
221 nxt_debug(task, "sw %p post send to main engine %p", sw,
222 main_port->engine);
223
223
224 nxt_event_engine_post(master_port->engine, &sw->work);
224 nxt_event_engine_post(main_port->engine, &sw->work);
225
226 } else {
227 nxt_router_send_sw_request(task, sw, sw->work.data);
228 }
229
230 return sw;
231}
232

--- 775 unchanged lines hidden (view full) ---

1008 if (b == NULL) {
1009 goto fail;
1010 }
1011
1012 b->mem.free = nxt_cpymem(b->mem.free, skcf->sockaddr,
1013 skcf->sockaddr->sockaddr_size);
1014
1015 rt = task->thread->runtime;
225
226 } else {
227 nxt_router_send_sw_request(task, sw, sw->work.data);
228 }
229
230 return sw;
231}
232

--- 775 unchanged lines hidden (view full) ---

1008 if (b == NULL) {
1009 goto fail;
1010 }
1011
1012 b->mem.free = nxt_cpymem(b->mem.free, skcf->sockaddr,
1013 skcf->sockaddr->sockaddr_size);
1014
1015 rt = task->thread->runtime;
1016 main_port = rt->port_by_type[NXT_PROCESS_MASTER];
1016 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1017 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1018
1019 stream = nxt_port_rpc_register_handler(task, router_port,
1020 nxt_router_listen_socket_ready,
1021 nxt_router_listen_socket_error,
1022 main_port->pid, rpc);
1023 if (stream == 0) {
1024 goto fail;

--- 1103 unchanged lines hidden (view full) ---

2128
2129static void
2130nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
2131{
2132 size_t size;
2133 uint32_t stream;
2134 nxt_buf_t *b;
2135 nxt_app_t *app;
1017 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1018
1019 stream = nxt_port_rpc_register_handler(task, router_port,
1020 nxt_router_listen_socket_ready,
1021 nxt_router_listen_socket_error,
1022 main_port->pid, rpc);
1023 if (stream == 0) {
1024 goto fail;

--- 1103 unchanged lines hidden (view full) ---

2128
2129static void
2130nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
2131{
2132 size_t size;
2133 uint32_t stream;
2134 nxt_buf_t *b;
2135 nxt_app_t *app;
2136 nxt_port_t *master_port, *router_port;
2136 nxt_port_t *main_port, *router_port;
2137 nxt_runtime_t *rt;
2138 nxt_start_worker_t *sw;
2139
2140 sw = obj;
2141 app = sw->app;
2142
2143 nxt_queue_insert_tail(&app->requests, &sw->ra->link);
2144

--- 8 unchanged lines hidden (view full) ---

2153 return;
2154 }
2155
2156 app->pending_workers++;
2157
2158 nxt_debug(task, "sw %p send", sw);
2159
2160 rt = task->thread->runtime;
2137 nxt_runtime_t *rt;
2138 nxt_start_worker_t *sw;
2139
2140 sw = obj;
2141 app = sw->app;
2142
2143 nxt_queue_insert_tail(&app->requests, &sw->ra->link);
2144

--- 8 unchanged lines hidden (view full) ---

2153 return;
2154 }
2155
2156 app->pending_workers++;
2157
2158 nxt_debug(task, "sw %p send", sw);
2159
2160 rt = task->thread->runtime;
2161 master_port = rt->port_by_type[NXT_PROCESS_MASTER];
2161 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2162 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2163
2164 size = app->name.length + 1 + app->conf.length;
2165
2162 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2163
2164 size = app->name.length + 1 + app->conf.length;
2165
2166 b = nxt_buf_mem_alloc(master_port->mem_pool, size, 0);
2166 b = nxt_buf_mem_alloc(main_port->mem_pool, size, 0);
2167
2168 nxt_buf_cpystr(b, &app->name);
2169 *b->mem.free++ = '\0';
2170 nxt_buf_cpystr(b, &app->conf);
2171
2172 stream = nxt_port_rpc_register_handler(task, router_port,
2173 nxt_router_sw_ready,
2174 nxt_router_sw_error,
2167
2168 nxt_buf_cpystr(b, &app->name);
2169 *b->mem.free++ = '\0';
2170 nxt_buf_cpystr(b, &app->conf);
2171
2172 stream = nxt_port_rpc_register_handler(task, router_port,
2173 nxt_router_sw_ready,
2174 nxt_router_sw_error,
2175 master_port->pid, sw);
2175 main_port->pid, sw);
2176
2176
2177 nxt_port_socket_write(task, master_port, NXT_PORT_MSG_START_WORKER, -1,
2177 nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
2178 stream, router_port->id, b);
2179}
2180
2181
2182static nxt_bool_t
2183nxt_router_app_free(nxt_task_t *task, nxt_app_t *app)
2184{
2185 nxt_queue_link_t *lnk;

--- 957 unchanged lines hidden ---
2178 stream, router_port->id, b);
2179}
2180
2181
2182static nxt_bool_t
2183nxt_router_app_free(nxt_task_t *task, nxt_app_t *app)
2184{
2185 nxt_queue_link_t *lnk;

--- 957 unchanged lines hidden ---