1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include <nxt_unit.h> 7 #include <nxt_unit_request.h> 8 #include <nxt_clang.h> 9 #include <pthread.h> 10 #include <string.h> 11 #include <stdlib.h> 12 13 14 #define CONTENT_TYPE "Content-Type" 15 #define TEXT_PLAIN "text/plain" 16 #define HELLO_WORLD "Hello world!\n" 17 18 #define NEW_LINE "\n" 19 20 #define REQUEST_DATA "Request data:\n" 21 #define METHOD " Method: " 22 #define PROTOCOL " Protocol: " 23 #define REMOTE_ADDR " Remote addr: " 24 #define LOCAL_ADDR " Local addr: " 25 #define TARGET " Target: " 26 #define PATH " Path: " 27 #define QUERY " Query: " 28 #define FIELDS " Fields:\n" 29 #define FIELD_PAD " " 30 #define FIELD_SEP ": " 31 #define BODY " Body:\n" 32 33 34 static int ready_handler(nxt_unit_ctx_t *ctx); 35 static void *worker(void *main_ctx); 36 static void greeting_app_request_handler(nxt_unit_request_info_t *req); 37 static inline char *copy(char *p, const void *src, uint32_t len); 38 39 40 static int thread_count; 41 static pthread_t *threads; 42 43 44 int 45 main(int argc, char **argv) 46 { 47 int i, err; 48 nxt_unit_ctx_t *ctx; 49 nxt_unit_init_t init; 50 51 if (argc == 3 && strcmp(argv[1], "-t") == 0) { 52 thread_count = atoi(argv[2]); 53 } 54 55 memset(&init, 0, sizeof(nxt_unit_init_t)); 56 57 init.callbacks.request_handler = greeting_app_request_handler; 58 init.callbacks.ready_handler = ready_handler; 59 60 ctx = nxt_unit_init(&init); 61 if (ctx == NULL) { 62 return 1; 63 } 64 65 err = nxt_unit_run(ctx); 66 67 nxt_unit_debug(ctx, "main worker finished with %d code", err); 68 69 if (thread_count > 1) { 70 for (i = 0; i < thread_count - 1; i++) { 71 err = pthread_join(threads[i], NULL); 72 73 if (nxt_fast_path(err == 0)) { 74 nxt_unit_debug(ctx, "join thread #%d", i); 75 76 } else { 77 nxt_unit_alert(ctx, "pthread_join(#%d) failed: %s (%d)", 78 i, strerror(err), err); 79 } 80 } 81 82 nxt_unit_free(ctx, threads); 83 } 84 85 nxt_unit_done(ctx); 86 87 nxt_unit_debug(NULL, "main worker done"); 88 89 return 0; 90 } 91 92 93 static int 94 ready_handler(nxt_unit_ctx_t *ctx) 95 { 96 int i, err; 97 98 nxt_unit_debug(ctx, "ready"); 99 100 if (!nxt_unit_is_main_ctx(ctx) || thread_count <= 1) { 101 return NXT_UNIT_OK; 102 } 103 104 threads = nxt_unit_malloc(ctx, sizeof(pthread_t) * (thread_count - 1)); 105 if (threads == NULL) { 106 return NXT_UNIT_ERROR; 107 } 108 109 for (i = 0; i < thread_count - 1; i++) { 110 err = pthread_create(&threads[i], NULL, worker, ctx); 111 if (err != 0) { 112 return NXT_UNIT_ERROR; 113 } 114 } 115 116 return NXT_UNIT_OK; 117 } 118 119 120 static void * 121 worker(void *main_ctx) 122 { 123 int rc; 124 nxt_unit_ctx_t *ctx; 125 126 ctx = nxt_unit_ctx_alloc(main_ctx, NULL); 127 if (ctx == NULL) { 128 return NULL; 129 } 130 131 nxt_unit_debug(ctx, "start worker"); 132 133 rc = nxt_unit_run(ctx); 134 135 nxt_unit_debug(ctx, "worker finished with %d code", rc); 136 137 nxt_unit_done(ctx); 138 139 return (void *) (intptr_t) rc; 140 } 141 142 143 static void 144 greeting_app_request_handler(nxt_unit_request_info_t *req) 145 { 146 int rc; 147 char *p; 148 ssize_t res; 149 uint32_t i; 150 nxt_unit_buf_t *buf; 151 nxt_unit_field_t *f; 152 nxt_unit_request_t *r; 153 154 rc = nxt_unit_response_init(req, 200 /* Status code. */, 155 1 /* Number of response headers. */, 156 nxt_length(CONTENT_TYPE) 157 + nxt_length(TEXT_PLAIN) 158 + nxt_length(HELLO_WORLD)); 159 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 160 goto fail; 161 } 162 163 rc = nxt_unit_response_add_field(req, 164 CONTENT_TYPE, nxt_length(CONTENT_TYPE), 165 TEXT_PLAIN, nxt_length(TEXT_PLAIN)); 166 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 167 goto fail; 168 } 169 170 rc = nxt_unit_response_add_content(req, HELLO_WORLD, 171 nxt_length(HELLO_WORLD)); 172 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 173 goto fail; 174 } 175 176 rc = nxt_unit_response_send(req); 177 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 178 goto fail; 179 } 180 181 r = req->request; 182 183 buf = nxt_unit_response_buf_alloc(req, (req->request_buf->end 184 - req->request_buf->start) 185 + nxt_length(REQUEST_DATA) 186 + nxt_length(METHOD) 187 + nxt_length(NEW_LINE) 188 + nxt_length(PROTOCOL) 189 + nxt_length(NEW_LINE) 190 + nxt_length(REMOTE_ADDR) 191 + nxt_length(NEW_LINE) 192 + nxt_length(LOCAL_ADDR) 193 + nxt_length(NEW_LINE) 194 + nxt_length(TARGET) 195 + nxt_length(NEW_LINE) 196 + nxt_length(PATH) 197 + nxt_length(NEW_LINE) 198 + nxt_length(QUERY) 199 + nxt_length(NEW_LINE) 200 + nxt_length(FIELDS) 201 + r->fields_count * ( 202 nxt_length(FIELD_PAD) 203 + nxt_length(FIELD_SEP)) 204 + nxt_length(BODY)); 205 if (nxt_slow_path(buf == NULL)) { 206 rc = NXT_UNIT_ERROR; 207 208 goto fail; 209 } 210 211 p = buf->free; 212 213 p = copy(p, REQUEST_DATA, nxt_length(REQUEST_DATA)); 214 215 p = copy(p, METHOD, nxt_length(METHOD)); 216 p = copy(p, nxt_unit_sptr_get(&r->method), r->method_length); 217 *p++ = '\n'; 218 219 p = copy(p, PROTOCOL, nxt_length(PROTOCOL)); 220 p = copy(p, nxt_unit_sptr_get(&r->version), r->version_length); 221 *p++ = '\n'; 222 223 p = copy(p, REMOTE_ADDR, nxt_length(REMOTE_ADDR)); 224 p = copy(p, nxt_unit_sptr_get(&r->remote), r->remote_length); 225 *p++ = '\n'; 226 227 p = copy(p, LOCAL_ADDR, nxt_length(LOCAL_ADDR)); 228 p = copy(p, nxt_unit_sptr_get(&r->local), r->local_length); 229 *p++ = '\n'; 230 231 p = copy(p, TARGET, nxt_length(TARGET)); 232 p = copy(p, nxt_unit_sptr_get(&r->target), r->target_length); 233 *p++ = '\n'; 234 235 p = copy(p, PATH, nxt_length(PATH)); 236 p = copy(p, nxt_unit_sptr_get(&r->path), r->path_length); 237 *p++ = '\n'; 238 239 if (r->query.offset) { 240 p = copy(p, QUERY, nxt_length(QUERY)); 241 p = copy(p, nxt_unit_sptr_get(&r->query), r->query_length); 242 *p++ = '\n'; 243 } 244 245 p = copy(p, FIELDS, nxt_length(FIELDS)); 246 247 for (i = 0; i < r->fields_count; i++) { 248 f = r->fields + i; 249 250 p = copy(p, FIELD_PAD, nxt_length(FIELD_PAD)); 251 p = copy(p, nxt_unit_sptr_get(&f->name), f->name_length); 252 p = copy(p, FIELD_SEP, nxt_length(FIELD_SEP)); 253 p = copy(p, nxt_unit_sptr_get(&f->value), f->value_length); 254 *p++ = '\n'; 255 } 256 257 if (r->content_length > 0) { 258 p = copy(p, BODY, nxt_length(BODY)); 259 260 res = nxt_unit_request_read(req, buf->free, buf->end - buf->free); 261 buf->free += res; 262 263 } 264 265 buf->free = p; 266 267 rc = nxt_unit_buf_send(buf); 268 269 fail: 270 271 nxt_unit_request_done(req, rc); 272 } 273 274 275 static inline char * 276 copy(char *p, const void *src, uint32_t len) 277 { 278 memcpy(p, src, len); 279 280 return p + len; 281 } 282