1

/*
 * Copyright (C) NGINX, Inc.
 */
5
6 #include <nxt_unit.h>
7 #include <nxt_unit_request.h>
8 #include <nxt_clang.h>
9 #include <pthread.h>
10 #include <string.h>
11 #include <stdlib.h>
12
13
/* Response header name/value and the fixed content sent for every request. */
#define CONTENT_TYPE  "Content-Type"
#define TEXT_PLAIN    "text/plain"
#define HELLO_WORLD   "Hello world!\n"

#define NEW_LINE      "\n"

/* Labels written in front of each item when the request is echoed back. */
#define REQUEST_DATA  "Request data:\n"
#define METHOD        " Method: "
#define PROTOCOL      " Protocol: "
#define REMOTE_ADDR   " Remote addr: "
#define LOCAL_ADDR    " Local addr: "
#define TARGET        " Target: "
#define PATH          " Path: "
#define QUERY         " Query: "
#define FIELDS        " Fields:\n"
#define FIELD_PAD     " "
#define FIELD_SEP     ": "
#define BODY          " Body:\n"
32
33
34 static int ready_handler(nxt_unit_ctx_t *ctx);
35 static void *worker(void *main_ctx);
36 static void greeting_app_request_handler(nxt_unit_request_info_t *req);
37 static inline char *copy(char *p, const void *src, uint32_t len);
38
39
40 static int thread_count;
41 static pthread_t *threads;
42
43
44 int
main(int argc,char ** argv)45 main(int argc, char **argv)
46 {
47 int i, err;
48 nxt_unit_ctx_t *ctx;
49 nxt_unit_init_t init;
50
51 if (argc == 3 && strcmp(argv[1], "-t") == 0) {
52 thread_count = atoi(argv[2]);
53 }
54
55 memset(&init, 0, sizeof(nxt_unit_init_t));
56
57 init.callbacks.request_handler = greeting_app_request_handler;
58 init.callbacks.ready_handler = ready_handler;
59
60 ctx = nxt_unit_init(&init);
61 if (ctx == NULL) {
62 return 1;
63 }
64
65 err = nxt_unit_run(ctx);
66
67 nxt_unit_debug(ctx, "main worker finished with %d code", err);
68
69 if (thread_count > 1) {
70 for (i = 0; i < thread_count - 1; i++) {
71 err = pthread_join(threads[i], NULL);
72
73 if (nxt_fast_path(err == 0)) {
74 nxt_unit_debug(ctx, "join thread #%d", i);
75
76 } else {
77 nxt_unit_alert(ctx, "pthread_join(#%d) failed: %s (%d)",
78 i, strerror(err), err);
79 }
80 }
81
82 nxt_unit_free(ctx, threads);
83 }
84
85 nxt_unit_done(ctx);
86
87 nxt_unit_debug(NULL, "main worker done");
88
89 return 0;
90 }
91
92
93 static int
ready_handler(nxt_unit_ctx_t * ctx)94 ready_handler(nxt_unit_ctx_t *ctx)
95 {
96 int i, err;
97
98 nxt_unit_debug(ctx, "ready");
99
100 if (thread_count <= 1) {
101 return NXT_UNIT_OK;
102 }
103
104 threads = nxt_unit_malloc(ctx, sizeof(pthread_t) * (thread_count - 1));
105 if (threads == NULL) {
106 return NXT_UNIT_ERROR;
107 }
108
109 for (i = 0; i < thread_count - 1; i++) {
110 err = pthread_create(&threads[i], NULL, worker, ctx);
111 if (err != 0) {
112 return NXT_UNIT_ERROR;
113 }
114 }
115
116 return NXT_UNIT_OK;
117 }
118
119
120 static void *
worker(void * main_ctx)121 worker(void *main_ctx)
122 {
123 int rc;
124 nxt_unit_ctx_t *ctx;
125
126 ctx = nxt_unit_ctx_alloc(main_ctx, NULL);
127 if (ctx == NULL) {
128 return NULL;
129 }
130
131 nxt_unit_debug(ctx, "start worker");
132
133 rc = nxt_unit_run(ctx);
134
135 nxt_unit_debug(ctx, "worker finished with %d code", rc);
136
137 nxt_unit_done(ctx);
138
139 return (void *) (intptr_t) rc;
140 }
141
142
143 static void
greeting_app_request_handler(nxt_unit_request_info_t * req)144 greeting_app_request_handler(nxt_unit_request_info_t *req)
145 {
146 int rc;
147 char *p;
148 ssize_t res;
149 uint32_t i;
150 nxt_unit_buf_t *buf;
151 nxt_unit_field_t *f;
152 nxt_unit_request_t *r;
153
154 rc = nxt_unit_response_init(req, 200 /* Status code. */,
155 1 /* Number of response headers. */,
156 nxt_length(CONTENT_TYPE)
157 + nxt_length(TEXT_PLAIN)
158 + nxt_length(HELLO_WORLD));
159 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
160 goto fail;
161 }
162
163 rc = nxt_unit_response_add_field(req,
164 CONTENT_TYPE, nxt_length(CONTENT_TYPE),
165 TEXT_PLAIN, nxt_length(TEXT_PLAIN));
166 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
167 goto fail;
168 }
169
170 rc = nxt_unit_response_add_content(req, HELLO_WORLD,
171 nxt_length(HELLO_WORLD));
172 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
173 goto fail;
174 }
175
176 rc = nxt_unit_response_send(req);
177 if (nxt_slow_path(rc != NXT_UNIT_OK)) {
178 goto fail;
179 }
180
181 r = req->request;
182
183 buf = nxt_unit_response_buf_alloc(req, (req->request_buf->end
184 - req->request_buf->start)
185 + nxt_length(REQUEST_DATA)
186 + nxt_length(METHOD)
187 + nxt_length(NEW_LINE)
188 + nxt_length(PROTOCOL)
189 + nxt_length(NEW_LINE)
190 + nxt_length(REMOTE_ADDR)
191 + nxt_length(NEW_LINE)
192 + nxt_length(LOCAL_ADDR)
193 + nxt_length(NEW_LINE)
194 + nxt_length(TARGET)
195 + nxt_length(NEW_LINE)
196 + nxt_length(PATH)
197 + nxt_length(NEW_LINE)
198 + nxt_length(QUERY)
199 + nxt_length(NEW_LINE)
200 + nxt_length(FIELDS)
201 + r->fields_count * (
202 nxt_length(FIELD_PAD)
203 + nxt_length(FIELD_SEP))
204 + nxt_length(BODY));
205 if (nxt_slow_path(buf == NULL)) {
206 rc = NXT_UNIT_ERROR;
207
208 goto fail;
209 }
210
211 p = buf->free;
212
213 p = copy(p, REQUEST_DATA, nxt_length(REQUEST_DATA));
214
215 p = copy(p, METHOD, nxt_length(METHOD));
216 p = copy(p, nxt_unit_sptr_get(&r->method), r->method_length);
217 *p++ = '\n';
218
219 p = copy(p, PROTOCOL, nxt_length(PROTOCOL));
220 p = copy(p, nxt_unit_sptr_get(&r->version), r->version_length);
221 *p++ = '\n';
222
223 p = copy(p, REMOTE_ADDR, nxt_length(REMOTE_ADDR));
224 p = copy(p, nxt_unit_sptr_get(&r->remote), r->remote_length);
225 *p++ = '\n';
226
227 p = copy(p, LOCAL_ADDR, nxt_length(LOCAL_ADDR));
228 p = copy(p, nxt_unit_sptr_get(&r->local_addr), r->local_addr_length);
229 *p++ = '\n';
230
231 p = copy(p, TARGET, nxt_length(TARGET));
232 p = copy(p, nxt_unit_sptr_get(&r->target), r->target_length);
233 *p++ = '\n';
234
235 p = copy(p, PATH, nxt_length(PATH));
236 p = copy(p, nxt_unit_sptr_get(&r->path), r->path_length);
237 *p++ = '\n';
238
239 if (r->query.offset) {
240 p = copy(p, QUERY, nxt_length(QUERY));
241 p = copy(p, nxt_unit_sptr_get(&r->query), r->query_length);
242 *p++ = '\n';
243 }
244
245 p = copy(p, FIELDS, nxt_length(FIELDS));
246
247 for (i = 0; i < r->fields_count; i++) {
248 f = r->fields + i;
249
250 p = copy(p, FIELD_PAD, nxt_length(FIELD_PAD));
251 p = copy(p, nxt_unit_sptr_get(&f->name), f->name_length);
252 p = copy(p, FIELD_SEP, nxt_length(FIELD_SEP));
253 p = copy(p, nxt_unit_sptr_get(&f->value), f->value_length);
254 *p++ = '\n';
255 }
256
257 if (r->content_length > 0) {
258 p = copy(p, BODY, nxt_length(BODY));
259
260 res = nxt_unit_request_read(req, p, buf->end - p);
261 p += res;
262
263 }
264
265 buf->free = p;
266
267 rc = nxt_unit_buf_send(buf);
268
269 fail:
270
271 nxt_unit_request_done(req, rc);
272 }
273
274
/*
 * Copies len bytes from src to p and returns the position just past the
 * copied bytes, so that consecutive calls can be chained to append to a
 * buffer.  No terminator is written and no bounds are checked; the caller
 * must guarantee that p has room for len bytes.
 */
static inline char *
copy(char *p, const void *src, uint32_t len)
{
    memcpy(p, src, len);

    return p + len;
}
282