nxt_port_socket.c (1002:ba4c745528cf) nxt_port_socket.c (1004:306ceaf8927d)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 181 unchanged lines hidden (view full) ---

190 msg->work.obj = msg;
191 msg->work.data = task->thread->engine;
192
193 return msg;
194}
195
196
197static nxt_port_send_msg_t *
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 181 unchanged lines hidden (view full) ---

190 msg->work.obj = msg;
191 msg->work.data = task->thread->engine;
192
193 return msg;
194}
195
196
197static nxt_port_send_msg_t *
198nxt_port_msg_push(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
198nxt_port_msg_insert_head(nxt_task_t *task, nxt_port_t *port,
199 nxt_port_send_msg_t *msg)
199{
200 if (msg->work.data == NULL) {
201 msg = nxt_port_msg_create(task, msg);
202 }
203
204 if (msg != NULL) {
200{
201 if (msg->work.data == NULL) {
202 msg = nxt_port_msg_create(task, msg);
203 }
204
205 if (msg != NULL) {
206 nxt_queue_insert_head(&port->messages, &msg->link);
207 }
208
209 return msg;
210}
211
212
213static nxt_port_send_msg_t *
214nxt_port_msg_insert_tail(nxt_task_t *task, nxt_port_t *port,
215 nxt_port_send_msg_t *msg)
216{
217 if (msg->work.data == NULL) {
218 msg = nxt_port_msg_create(task, msg);
219 }
220
221 if (msg != NULL) {
205 nxt_queue_insert_tail(&port->messages, &msg->link);
206 }
207
208 return msg;
209}
210
211
212static nxt_port_send_msg_t *

--- 42 unchanged lines hidden (view full) ---

255
256 msg.work.data = NULL;
257
258 if (port->socket.write_ready) {
259 nxt_port_write_handler(task, &port->socket, &msg);
260 } else {
261 nxt_thread_mutex_lock(&port->write_mutex);
262
222 nxt_queue_insert_tail(&port->messages, &msg->link);
223 }
224
225 return msg;
226}
227
228
229static nxt_port_send_msg_t *

--- 42 unchanged lines hidden (view full) ---

272
273 msg.work.data = NULL;
274
275 if (port->socket.write_ready) {
276 nxt_port_write_handler(task, &port->socket, &msg);
277 } else {
278 nxt_thread_mutex_lock(&port->write_mutex);
279
263 res = nxt_port_msg_push(task, port, &msg);
280 res = nxt_port_msg_insert_tail(task, port, &msg);
264
265 nxt_thread_mutex_unlock(&port->write_mutex);
266
267 if (res == NULL) {
268 return NXT_ERROR;
269 }
270
271 nxt_port_use(task, port, 1);

--- 132 unchanged lines hidden (view full) ---

404 msg->share = 0;
405
406 if (msg->link.next != NULL) {
407 nxt_queue_remove(&msg->link);
408 use_delta--;
409 }
410 data = NULL;
411
281
282 nxt_thread_mutex_unlock(&port->write_mutex);
283
284 if (res == NULL) {
285 return NXT_ERROR;
286 }
287
288 nxt_port_use(task, port, 1);

--- 132 unchanged lines hidden (view full) ---

421 msg->share = 0;
422
423 if (msg->link.next != NULL) {
424 nxt_queue_remove(&msg->link);
425 use_delta--;
426 }
427 data = NULL;
428
412 if (nxt_port_msg_push(task, port, msg) != NULL) {
429 if (nxt_port_msg_insert_tail(task, port, msg) != NULL) {
413 use_delta++;
414 }
415 }
416
417 } else {
418 if (msg->link.next != NULL) {
419 nxt_queue_remove(&msg->link);
420 use_delta--;
421 nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
422 msg->work.data);
423 }
424 data = NULL;
425 }
426
430 use_delta++;
431 }
432 }
433
434 } else {
435 if (msg->link.next != NULL) {
436 nxt_queue_remove(&msg->link);
437 use_delta--;
438 nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
439 msg->work.data);
440 }
441 data = NULL;
442 }
443
427 } else if (nxt_slow_path(n == NXT_ERROR)) {
444 } else {
428 if (msg->link.next == NULL) {
445 if (msg->link.next == NULL) {
429 if (nxt_port_msg_push(task, port, msg) != NULL) {
446 if (nxt_port_msg_insert_head(task, port, msg) != NULL) {
430 use_delta++;
431 }
432 }
447 use_delta++;
448 }
449 }
433 goto fail;
450
451 if (nxt_slow_path(n == NXT_ERROR)) {
452 goto fail;
453 }
434 }
435
454 }
455
436 /* n == NXT_AGAIN */
437
438 } while (port->socket.write_ready);
439
440 if (nxt_fd_event_is_disabled(port->socket.write)) {
441 enable_write = 1;
442 }
443
444 goto unlock_mutex;
445

--- 515 unchanged lines hidden ---
456 } while (port->socket.write_ready);
457
458 if (nxt_fd_event_is_disabled(port->socket.write)) {
459 enable_write = 1;
460 }
461
462 goto unlock_mutex;
463

--- 515 unchanged lines hidden ---