xref: /unit/src/nodejs/unit-http/http_server.js (revision 1865:3c551b9721df)
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6'use strict';
7
8const EventEmitter = require('events');
9const http = require('http');
10const util = require('util');
11const unit_lib = require('./build/Release/unit-http');
12const Socket = require('./socket');
13const WebSocketFrame = require('./websocket_frame');
14const Readable = require('stream').Readable;
15
16
/*
 * Response object.  It is tied to the request it answers: the request is
 * kept in _request, the request gets a back reference in _response, and the
 * server, socket and connection references are copied from the request.
 */
function ServerResponse(req) {
    EventEmitter.call(this);

    const { server, socket, connection } = req;

    this.headers = {};

    this.server = server;
    this._request = req;
    req._response = this;
    this.socket = socket;
    this.connection = connection;
    this.writable = true;
}

util.inherits(ServerResponse, EventEmitter);

/* Defaults shared through the prototype until an instance overrides them. */
Object.assign(ServerResponse.prototype, {
    statusCode: 200,
    statusMessage: undefined,
    headers_len: 0,
    headers_count: 0,
    headersSent: false,
    finished: false,
});
37
/*
 * Marks the response as finished and resets the header bookkeeping:
 * the stored headers, their accumulated byte length and their count.
 */
ServerResponse.prototype._finish = function _finish() {
    Object.assign(this, {
        headers: {},
        headers_len: 0,
        headers_count: 0,
        finished: true,
    });
};
44
/*
 * The four methods below have empty bodies: they accept their arguments
 * and return undefined without doing anything.
 */

/* No-op; the socket argument is ignored. */
ServerResponse.prototype.assignSocket = function assignSocket(socket) {
    return undefined;
};

/* No-op; the socket argument is ignored. */
ServerResponse.prototype.detachSocket = function detachSocket(socket) {
    return undefined;
};

/* No-op; nothing is sent and cb is never invoked. */
ServerResponse.prototype.writeContinue = function writeContinue(cb) {
    return undefined;
};

/* No-op; nothing is sent and cb is never invoked. */
ServerResponse.prototype.writeProcessing = function writeProcessing(cb) {
    return undefined;
};
56
/*
 * Stores a response header, replacing any header previously stored under
 * the same case-insensitive name.
 *
 * name  - header name; must be a string, otherwise TypeError is thrown.
 * value - a single value, or an array of values (one header line each).
 *
 * headers_len and headers_count are kept in sync with the stored headers;
 * _sendHeaders() later hands both to _send_headers().  Lengths are measured
 * in latin1 bytes of the stringified name and values.
 */
ServerResponse.prototype.setHeader = function setHeader(name, value) {
    if (typeof name !== 'string') {
        throw new TypeError('Name argument must be a string');
    }

    let value_len = 0;
    let count = 0;

    if (Array.isArray(value)) {
        count = value.length;

        value.forEach((val) => {
            value_len += Buffer.byteLength(val + "", 'latin1');
        });

    } else {
        count = 1;
        value_len = Buffer.byteLength(value + "", 'latin1');
    }

    const lc_name = name.toLowerCase();

    /*
     * Own-property check: the "in" operator would also match names inherited
     * from Object.prototype (e.g. "constructor"), and _removeHeader() would
     * then subtract lengths for a header that was never stored.
     */
    if (Object.prototype.hasOwnProperty.call(this.headers, lc_name)) {
        this._removeHeader(lc_name);
    }

    const name_len = Buffer.byteLength(name, 'latin1');

    this.headers[lc_name] = [name, value];
    /* The name is repeated once per value. */
    this.headers_len += value_len + (name_len * count);
    this.headers_count += count;
};
89
/*
 * Returns the value stored for the header, looked up by lower-cased name.
 * Entries are [original name, value] pairs; when nothing is stored the
 * falsy lookup result itself (undefined) is returned.
 */
ServerResponse.prototype.getHeader = function getHeader(name) {
    const stored = this.headers[name.toLowerCase()];

    if (!stored) {
        return stored;
    }

    return stored[1];
};
95
/* Returns the keys of the header store, i.e. the lower-cased header names. */
ServerResponse.prototype.getHeaderNames = function getHeaderNames() {
    const { headers } = this;

    return Object.keys(headers);
};
99
/*
 * Returns a fresh prototype-less object mapping each lower-cased header
 * name to its stored value.  The result is empty when no header store
 * exists.
 */
ServerResponse.prototype.getHeaders = function getHeaders() {
    const result = Object.create(null);
    const stored = this.headers;

    if (!stored) {
        return result;
    }

    for (const key of Object.keys(stored)) {
        /* Entries are [original name, value]; expose the value only. */
        result[key] = stored[key][1];
    }

    return result;
};
115
/*
 * Reports whether a header with the given (case-insensitive) name has been
 * stored.
 *
 * Only own properties of the header store count: the "in" operator would
 * also report names inherited from Object.prototype, such as "constructor"
 * or "toString", as present.
 */
ServerResponse.prototype.hasHeader = function hasHeader(name) {
    return Object.prototype.hasOwnProperty.call(this.headers,
                                                name.toLowerCase());
};
119
/*
 * Removes the header stored under the given (case-insensitive) name, if any.
 * Throws TypeError when name is not a string.
 */
ServerResponse.prototype.removeHeader = function removeHeader(name) {
    if (typeof name !== 'string') {
        throw new TypeError('Name argument must be a string');
    }

    const lc_name = name.toLowerCase();

    /*
     * Own-property check: with the "in" operator a name inherited from
     * Object.prototype (e.g. "constructor") would reach _removeHeader() and
     * decrement headers_count / headers_len for a header never stored.
     */
    if (Object.prototype.hasOwnProperty.call(this.headers, lc_name)) {
        this._removeHeader(lc_name);
    }
};
131
/*
 * Deletes the entry stored under lc_name and subtracts its contribution
 * from headers_count and headers_len, mirroring the accounting done by
 * setHeader().  The caller must ensure the entry exists.
 */
ServerResponse.prototype._removeHeader = function _removeHeader(lc_name) {
    const entry = this.headers[lc_name];
    const name_len = Buffer.byteLength(entry[0] + "", 'latin1');
    const value = entry[1];

    delete this.headers[lc_name];

    if (Array.isArray(value)) {
        this.headers_count -= value.length;
        this.headers_len -= value.length * name_len;

        /*
         * Arrow function: a plain function callback has "this" undefined
         * in strict mode, which made this subtraction throw a TypeError.
         */
        value.forEach((val) => {
            this.headers_len -= Buffer.byteLength(val + "", 'latin1');
        });

        return;
    }

    this.headers_count--;
    this.headers_len -= name_len + Buffer.byteLength(value + "", 'latin1');
};
153
/* Always throws: calling sendDate() is reported as unsupported. */
ServerResponse.prototype.sendDate = function sendDate() {
    const unsupported = new Error("Not supported");

    throw unsupported;
};
157
/*
 * Records msecs in this.timeout and, when a callback is given, subscribes
 * it to the 'timeout' event.  Returns the response for chaining.
 */
ServerResponse.prototype.setTimeout = function setTimeout(msecs, callback) {
    this.timeout = msecs;

    if (!callback) {
        return this;
    }

    this.on('timeout', callback);

    return this;
};
167
/*
 * writeHead is a function declaration, so it is hoisted and may be
 * referenced here.  writeHeader is an alias for the same function.
 */
ServerResponse.prototype.writeHead = writeHead;
ServerResponse.prototype.writeHeader = writeHead;
170
/*
 * Sets the status code, status message and, optionally, headers.
 *
 * statusCode - coerced to a 32-bit integer; must be within 100..999.
 * reason     - status message string; when it is not a string it is taken
 *              as the headers object instead, and the status message falls
 *              back to the one already set, then to http.STATUS_CODES,
 *              then to 'unknown'.
 * obj        - object whose own enumerable keys are passed to setHeader().
 *
 * Returns this.  Throws a RangeError carrying the code
 * 'ERR_HTTP_INVALID_STATUS_CODE' for an out-of-range status code; the
 * identifier ERR_HTTP_INVALID_STATUS_CODE used previously is not defined in
 * this file, so that path raised a ReferenceError instead.
 */
function writeHead(statusCode, reason, obj) {
    const originalStatusCode = statusCode;

    statusCode |= 0;

    if (statusCode < 100 || statusCode > 999) {
        const err = new RangeError('Invalid status code: '
                                   + String(originalStatusCode));
        err.code = 'ERR_HTTP_INVALID_STATUS_CODE';

        throw err;
    }

    if (typeof reason === 'string') {
        this.statusMessage = reason;

    } else {
        if (!this.statusMessage) {
            this.statusMessage = http.STATUS_CODES[statusCode] || 'unknown';
        }

        /* Two-argument form: writeHead(statusCode, headers). */
        obj = reason;
    }

    this.statusCode = statusCode;

    if (obj) {
        for (const k of Object.keys(obj)) {
            /* Empty header names are skipped. */
            if (k) {
                this.setHeader(k, obj[k]);
            }
        }
    }

    return this;
}
208
/*
 * This function is undocumented, yet some Node.js packages are known to
 * call it, notably the "compression" middleware.  It invokes writeHead()
 * with the current status code.
 */
ServerResponse.prototype._implicitHeader = function _implicitHeader() {
    const { statusCode } = this;

    this.writeHead(statusCode);
};
216
/* Routine provided by the unit-http native module. */
ServerResponse.prototype._send_headers = unit_lib.response_send_headers;

/*
 * Passes the status code, the stored headers, their count and their
 * accumulated length to _send_headers().  This happens at most once per
 * response: headersSent guards against repeated calls.
 */
ServerResponse.prototype._sendHeaders = function _sendHeaders() {
    if (this.headersSent) {
        return;
    }

    const { statusCode, headers, headers_count, headers_len } = this;

    this._send_headers(statusCode, headers, headers_count, headers_len);

    this.headersSent = true;
};
227
/* Routine provided by the unit-http native module. */
ServerResponse.prototype._write = unit_lib.response_write;

/*
 * Sends the headers if necessary, then writes chunk (a string or Buffer).
 *
 * Accepted forms: (chunk, encoding, callback), (chunk, callback) and
 * (callback).  Returns true when the chunk, if any, was handed to _write()
 * in full; the callback is then scheduled with process.nextTick().
 * Returns false when the chunk was queued on server._output, either
 * because output is already pending, the socket is marked not writable, or
 * _write() reported fewer bytes than requested.  A queued callback is
 * stored with the chunk instead of being scheduled here.
 *
 * Throws TypeError for a truthy chunk that is neither string nor Buffer.
 */
ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
    this._sendHeaders();

    if (typeof chunk === 'function') {
        callback = chunk;
        chunk = null;

    } else if (typeof encoding === 'function') {
        callback = encoding;
        encoding = null;
    }

    /* Queues the rest of the chunk, starting at offset, for emit_drain(). */
    const enqueue = (offset) => {
        const pending = new BufferedOutput(this, offset, chunk, encoding,
                                           callback);

        this.server._output.push(pending);

        return false;
    };

    if (chunk) {
        const is_string = typeof chunk === 'string';

        if (!is_string && !(chunk instanceof Buffer)) {
            throw new TypeError('First argument must be a string or Buffer');
        }

        let length;

        if (is_string) {
            length = Buffer.byteLength(chunk, encoding);

            /* Strings longer than buf_min are converted to a Buffer. */
            if (length > unit_lib.buf_min) {
                chunk = Buffer.from(chunk, encoding);

                length = chunk.length;
            }

        } else {
            length = chunk.length;
        }

        /* Keep ordering: never write ahead of output already queued. */
        if (this.server._output.length > 0 || !this.socket.writable) {
            return enqueue(0);
        }

        const written = this._write(chunk, 0, length);

        if (written < length) {
            this.socket.writable = false;
            this.writable = false;

            return enqueue(written);
        }
    }

    if (typeof callback === 'function') {
        /*
         * The callback must run only after the response.write() caller
         * has completed, so it is postponed with process.nextTick().
         *
         * process.nextTick() is not technically part of the event loop:
         * the nextTickQueue is processed after the current operation
         * completes, regardless of the current phase of the event loop,
         * and all its callbacks are resolved before the loop continues.
         */
        process.nextTick(callback);
    }

    return true;
};
298
/*
 * Writes a chunk of the response body; see _writeBody() for the accepted
 * argument forms and the meaning of the return value.
 *
 * Once the response is finished nothing is written: a "Write after end"
 * error is emitted as an 'error' event on the next tick and passed to the
 * callback, if one was given, and true is returned.  Previously the call
 * fell through to _writeBody() as well, so the data was still written and
 * the callback ran a second time without an error.
 */
ServerResponse.prototype.write = function write(chunk, encoding, callback) {
    if (this.finished) {
        if (typeof encoding === 'function') {
            callback = encoding;
            encoding = null;
        }

        const err = new Error("Write after end");

        process.nextTick(() => {
            this.emit('error', err);

            if (typeof callback === 'function') {
                callback(err);
            }
        });

        /* Nothing was queued, so there is no 'drain' to wait for. */
        return true;
    }

    return this._writeBody(chunk, encoding, callback);
};
318
/* Routine provided by the unit-http native module. */
ServerResponse.prototype._end = unit_lib.response_end;

/*
 * Writes the optional final chunk and finishes the response.
 *
 * Accepted forms: (chunk, encoding, callback), (chunk, callback),
 * (callback) and ().  When the final write completes, _end() is called,
 * then the callback, then 'finish' is emitted.  Calls made after the
 * response is finished are ignored.  Returns this.
 */
ServerResponse.prototype.end = function end(chunk, encoding, callback) {
    if (!this.finished) {
        if (typeof chunk === 'function') {
            /*
             * end(callback): without this shift the function would reach
             * _writeBody() as the chunk, be taken for the callback there,
             * and the completion closure below would be dropped, so
             * _end() would never run.
             */
            callback = chunk;
            chunk = null;
            encoding = null;

        } else if (typeof encoding === 'function') {
            callback = encoding;
            encoding = null;
        }

        this._writeBody(chunk, encoding, () => {
            this._end();

            if (typeof callback === 'function') {
                callback();
            }

            this.emit("finish");
        });

        this.finished = true;
    }

    return this;
};
343
/*
 * Request object: a Readable stream that remembers its server and socket.
 * The socket is also exposed as "connection".  _pushed_eofchunk records
 * whether _read() has already pushed the end-of-stream marker.
 */
function ServerRequest(server, socket) {
    Readable.call(this);

    Object.assign(this, {
        server,
        socket,
        connection: socket,
        _pushed_eofchunk: false,
    });
}

util.inherits(ServerRequest, Readable);
353
/*
 * Records msecs in this.timeout and, when a callback is given, subscribes
 * it to the 'timeout' event.  Returns the request for chaining.
 */
ServerRequest.prototype.setTimeout = function setTimeout(msecs, callback) {
    this.timeout = msecs;

    if (!callback) {
        return this;
    }

    this.on('timeout', callback);

    return this;
};
363
/* Returns undefined. */
ServerRequest.prototype.statusCode = function statusCode() {
    /* Only valid for response obtained from http.ClientRequest. */
    return undefined;
};

/* Returns undefined. */
ServerRequest.prototype.statusMessage = function statusMessage() {
    /* Only valid for response obtained from http.ClientRequest. */
    return undefined;
};

/* Always throws: trailers are reported as unsupported. */
ServerRequest.prototype.trailers = function trailers() {
    const unsupported = new Error("Not supported");

    throw unsupported;
};

/* Returns the METHODS value of the Node.js http module. */
ServerRequest.prototype.METHODS = function METHODS() {
    const { METHODS: methods } = http;

    return methods;
};

/* Returns the STATUS_CODES value of the Node.js http module. */
ServerRequest.prototype.STATUS_CODES = function STATUS_CODES() {
    const { STATUS_CODES: codes } = http;

    return codes;
};
383
/* Routine provided by the unit-http native module. */
ServerRequest.prototype._request_read = unit_lib.request_read;

/*
 * Readable implementation hook.  Asks _request_read() for up to n bytes
 * and pushes whatever it returns.  A null/undefined result, or one shorter
 * than n, is treated as the end of the body: the end-of-stream marker is
 * then pushed, at most once per request.
 */
ServerRequest.prototype._read = function _read(n) {
    const data = this._request_read(n);
    const has_data = data != null;

    if (has_data) {
        this.push(data);
    }

    if (this._pushed_eofchunk) {
        return;
    }

    if (!has_data || data.length < n) {
        this._pushed_eofchunk = true;
        this.push(null);
    }
};
398
399
/*
 * Server object.  Creates the native Unit instance, gives it a back
 * reference to this server and publishes on the server the constructors
 * Socket, ServerRequest, ServerResponse and WebSocketFrame.
 *
 * requestListener - optional listener registered for 'request' events.
 *
 * _upgradeListenerCount tracks the number of 'upgrade' listeners,
 * _output is the queue of pending BufferedOutput entries, and _drain_resp
 * collects responses that emit_drain() has to notify.
 */
function Server(requestListener) {
    EventEmitter.call(this);

    this.unit = new unit_lib.Unit();
    this.unit.server = this;

    this.unit.createServer();

    this.Socket = Socket;
    this.ServerRequest = ServerRequest;
    this.ServerResponse = ServerResponse;
    this.WebSocketFrame = WebSocketFrame;

    if (requestListener) {
        this.on('request', requestListener);
    }

    this._upgradeListenerCount = 0;

    /* Builds a listener adjusting the counter for 'upgrade' only. */
    const track_upgrade = (delta) => (ev) => {
        if (ev === 'upgrade') {
            this._upgradeListenerCount += delta;
        }
    };

    this.on('newListener', track_upgrade(1));
    this.on('removeListener', track_upgrade(-1));

    this._output = [];
    this._drain_resp = new Set();
}

util.inherits(Server, EventEmitter);
433
/*
 * Records msecs in this.timeout and, when a callback is given, subscribes
 * it to the 'timeout' event.  Returns the server for chaining.
 */
Server.prototype.setTimeout = function setTimeout(msecs, callback) {
    this.timeout = msecs;

    if (!callback) {
        return this;
    }

    this.on('timeout', callback);

    return this;
};
443
/*
 * Starts listening through the Unit instance.  All arguments are ignored,
 * except that a function passed as the last argument is registered as a
 * one-time 'listening' listener.  Returns the server.
 */
Server.prototype.listen = function (...args) {
    this.unit.listen();

    const last = args[args.length - 1];

    if (typeof last === 'function') {
        this.once('listening', last);
    }

    /*
     * Some express.js apps use the returned server object inside the
     * listening callback, so the 'listening' event is deferred until after
     * this function has returned.
     */
    setImmediate(() => {
        this.emit('listening');
    });

    return this;
};
462
/*
 * Returns a fixed address description; the values are hard-coded and a
 * new object is created on every call.
 */
Server.prototype.address = function () {
    const fixed = {
        family: "IPv4",
        address: "127.0.0.1",
        port: 80
    };

    return fixed;
};
470
/*
 * Dispatches an incoming request.  A request flagged as a WebSocket
 * handshake is emitted as 'upgrade' (with the request and its socket) when
 * at least one 'upgrade' listener is registered; everything else is
 * emitted as 'request' (with the request and the response).
 */
Server.prototype.emit_request = function (req, res) {
    const is_upgrade = req._websocket_handshake
                       && this._upgradeListenerCount > 0;

    if (is_upgrade) {
        this.emit('upgrade', req, req.socket);

        return;
    }

    this.emit("request", req, res);
};
479
/* Emits the 'close' event on the server; the event carries no arguments. */
Server.prototype.emit_close = function () {
    const event = 'close';

    this.emit(event);
};
483
/*
 * Flushes queued output in FIFO order.
 *
 * Each queued entry is written from its current offset.  When _write()
 * takes less than the remainder, the function returns and the entry stays
 * at the head of the queue.  Once an entry is written in full, its
 * callback is scheduled with process.nextTick() and its response is
 * remembered in _drain_resp.
 *
 * After the queue has been emptied, every remembered response whose socket
 * is marked not writable is marked writable again and gets a 'drain' event
 * on the next tick.  Nothing happens when the queue is empty on entry.
 */
Server.prototype.emit_drain = function () {
    if (this._output.length <= 0) {
        return;
    }

    while (this._output.length > 0) {
        const o = this._output[0];

        const l = typeof o.chunk === 'string'
                  ? Buffer.byteLength(o.chunk, o.encoding)
                  : o.chunk.length;

        const res = o.resp._write(o.chunk, o.offset, l);

        o.offset += res;
        if (o.offset < l) {
            return;
        }

        this._drain_resp.add(o.resp);

        if (typeof o.callback === 'function') {
            process.nextTick(o.callback);
        }

        this._output.shift();
    }

    /*
     * "const" gives every iteration its own binding.  With "var" the
     * deferred closures all shared one variable, so every 'drain' event
     * was emitted on the last response of the set.
     */
    for (const resp of this._drain_resp) {

        if (resp.socket.writable) {
            continue;
        }

        resp.socket.writable = true;
        resp.writable = true;

        process.nextTick(() => {
            resp.emit("drain");
        });
    }

    this._drain_resp.clear();
};
533
/*
 * Record of output that could not be written immediately: the response it
 * belongs to, the offset already written, the chunk with its encoding and
 * the callback to run once the chunk has been written completely.
 */
function BufferedOutput(resp, offset, chunk, encoding, callback) {
    Object.assign(this, { resp, offset, chunk, encoding, callback });
}
541
/*
 * Placeholder exported as _connectionListener.  It ignores the socket and
 * returns undefined.
 */
function connectionListener(socket) {
    return undefined;
}
544
545module.exports = {
546    Server,
547    ServerResponse,
548    ServerRequest,
549    _connectionListener: connectionListener
550};
551