/* eslint-disable complexity,max-statements */
|
/**
|
* file.js: Transport for outputting to a local log file.
|
*
|
* (C) 2010 Charlie Robbins
|
* MIT LICENCE
|
*/
|
|
'use strict';
|
|
const fs = require('fs');
|
const path = require('path');
|
const asyncSeries = require('async/series');
|
const zlib = require('zlib');
|
const { MESSAGE } = require('triple-beam');
|
const { Stream, PassThrough } = require('readable-stream');
|
const TransportStream = require('winston-transport');
|
const debug = require('@dabh/diagnostics')('winston:file');
|
const os = require('os');
|
const tailFile = require('../tail-file');
|
|
/**
|
* Transport for outputting to a local log file.
|
* @type {File}
|
* @extends {TransportStream}
|
*/
|
module.exports = class File extends TransportStream {
|
/**
|
* Constructor function for the File transport object responsible for
|
* persisting log messages and metadata to one or more files.
|
* @param {Object} options - Options for this instance.
|
*/
|
constructor(options = {}) {
|
super(options);
|
|
// Expose the name of this Transport on the prototype.
|
this.name = options.name || 'file';
|
|
// Helper function which throws an `Error` in the event that any of the
|
// rest of the arguments is present in `options`.
|
function throwIf(target, ...args) {
|
args.slice(1).forEach(name => {
|
if (options[name]) {
|
throw new Error(`Cannot set ${name} and ${target} together`);
|
}
|
});
|
}
|
|
// Setup the base stream that always gets piped to to handle buffering.
|
this._stream = new PassThrough();
|
this._stream.setMaxListeners(30);
|
|
// Bind this context for listener methods.
|
this._onError = this._onError.bind(this);
|
|
if (options.filename || options.dirname) {
|
throwIf('filename or dirname', 'stream');
|
this._basename = this.filename = options.filename
|
? path.basename(options.filename)
|
: 'winston.log';
|
|
this.dirname = options.dirname || path.dirname(options.filename);
|
this.options = options.options || { flags: 'a' };
|
} else if (options.stream) {
|
// eslint-disable-next-line no-console
|
console.warn('options.stream will be removed in winston@4. Use winston.transports.Stream');
|
throwIf('stream', 'filename', 'maxsize');
|
this._dest = this._stream.pipe(this._setupStream(options.stream));
|
this.dirname = path.dirname(this._dest.path);
|
// We need to listen for drain events when write() returns false. This
|
// can make node mad at times.
|
} else {
|
throw new Error('Cannot log to file without filename or stream.');
|
}
|
|
this.maxsize = options.maxsize || null;
|
this.rotationFormat = options.rotationFormat || false;
|
this.zippedArchive = options.zippedArchive || false;
|
this.maxFiles = options.maxFiles || null;
|
this.eol = (typeof options.eol === 'string') ? options.eol : os.EOL;
|
this.tailable = options.tailable || false;
|
this.lazy = options.lazy || false;
|
|
// Internal state variables representing the number of files this instance
|
// has created and the current size (in bytes) of the current logfile.
|
this._size = 0;
|
this._pendingSize = 0;
|
this._created = 0;
|
this._drain = false;
|
this._opening = false;
|
this._ending = false;
|
this._fileExist = false;
|
|
if (this.dirname) this._createLogDirIfNotExist(this.dirname);
|
if (!this.lazy) this.open();
|
}
|
|
finishIfEnding() {
|
if (this._ending) {
|
if (this._opening) {
|
this.once('open', () => {
|
this._stream.once('finish', () => this.emit('finish'));
|
setImmediate(() => this._stream.end());
|
});
|
} else {
|
this._stream.once('finish', () => this.emit('finish'));
|
setImmediate(() => this._stream.end());
|
}
|
}
|
}
|
|
/**
|
* Core logging method exposed to Winston. Metadata is optional.
|
* @param {Object} info - TODO: add param description.
|
* @param {Function} callback - TODO: add param description.
|
* @returns {undefined}
|
*/
|
log(info, callback = () => { }) {
|
// Remark: (jcrugzz) What is necessary about this callback(null, true) now
|
// when thinking about 3.x? Should silent be handled in the base
|
// TransportStream _write method?
|
if (this.silent) {
|
callback();
|
return true;
|
}
|
|
|
// Output stream buffer is full and has asked us to wait for the drain event
|
if (this._drain) {
|
this._stream.once('drain', () => {
|
this._drain = false;
|
this.log(info, callback);
|
});
|
return;
|
}
|
if (this._rotate) {
|
this._stream.once('rotate', () => {
|
this._rotate = false;
|
this.log(info, callback);
|
});
|
return;
|
}
|
if (this.lazy) {
|
if (!this._fileExist) {
|
if (!this._opening) {
|
this.open();
|
}
|
this.once('open', () => {
|
this._fileExist = true;
|
this.log(info, callback);
|
return;
|
});
|
return;
|
}
|
if (this._needsNewFile(this._pendingSize)) {
|
this._dest.once('close', () => {
|
if (!this._opening) {
|
this.open();
|
}
|
this.once('open', () => {
|
this.log(info, callback);
|
return;
|
});
|
return;
|
});
|
return;
|
}
|
}
|
|
// Grab the raw string and append the expected EOL.
|
const output = `${info[MESSAGE]}${this.eol}`;
|
const bytes = Buffer.byteLength(output);
|
|
// After we have written to the PassThrough check to see if we need
|
// to rotate to the next file.
|
//
|
// Remark: This gets called too early and does not depict when data
|
// has been actually flushed to disk.
|
function logged() {
|
this._size += bytes;
|
this._pendingSize -= bytes;
|
|
debug('logged %s %s', this._size, output);
|
this.emit('logged', info);
|
|
// Do not attempt to rotate files while rotating
|
if (this._rotate) {
|
return;
|
}
|
|
// Do not attempt to rotate files while opening
|
if (this._opening) {
|
return;
|
}
|
|
// Check to see if we need to end the stream and create a new one.
|
if (!this._needsNewFile()) {
|
return;
|
}
|
if (this.lazy) {
|
this._endStream(() => {this.emit('fileclosed');});
|
return;
|
}
|
|
// End the current stream, ensure it flushes and create a new one.
|
// This could potentially be optimized to not run a stat call but its
|
// the safest way since we are supporting `maxFiles`.
|
this._rotate = true;
|
this._endStream(() => this._rotateFile());
|
}
|
|
// Keep track of the pending bytes being written while files are opening
|
// in order to properly rotate the PassThrough this._stream when the file
|
// eventually does open.
|
this._pendingSize += bytes;
|
if (this._opening
|
&& !this.rotatedWhileOpening
|
&& this._needsNewFile(this._size + this._pendingSize)) {
|
this.rotatedWhileOpening = true;
|
}
|
|
const written = this._stream.write(output, logged.bind(this));
|
if (!written) {
|
this._drain = true;
|
this._stream.once('drain', () => {
|
this._drain = false;
|
callback();
|
});
|
} else {
|
callback(); // eslint-disable-line callback-return
|
}
|
|
debug('written', written, this._drain);
|
|
this.finishIfEnding();
|
|
return written;
|
}
|
|
/**
|
* Query the transport. Options object is optional.
|
* @param {Object} options - Loggly-like query options for this instance.
|
* @param {function} callback - Continuation to respond to when complete.
|
* TODO: Refactor me.
|
*/
|
query(options, callback) {
|
if (typeof options === 'function') {
|
callback = options;
|
options = {};
|
}
|
|
options = normalizeQuery(options);
|
const file = path.join(this.dirname, this.filename);
|
let buff = '';
|
let results = [];
|
let row = 0;
|
|
const stream = fs.createReadStream(file, {
|
encoding: 'utf8'
|
});
|
|
stream.on('error', err => {
|
if (stream.readable) {
|
stream.destroy();
|
}
|
if (!callback) {
|
return;
|
}
|
|
return err.code !== 'ENOENT' ? callback(err) : callback(null, results);
|
});
|
|
stream.on('data', data => {
|
data = (buff + data).split(/\n+/);
|
const l = data.length - 1;
|
let i = 0;
|
|
for (; i < l; i++) {
|
if (!options.start || row >= options.start) {
|
add(data[i]);
|
}
|
row++;
|
}
|
|
buff = data[l];
|
});
|
|
stream.on('close', () => {
|
if (buff) {
|
add(buff, true);
|
}
|
if (options.order === 'desc') {
|
results = results.reverse();
|
}
|
|
// eslint-disable-next-line callback-return
|
if (callback) callback(null, results);
|
});
|
|
function add(buff, attempt) {
|
try {
|
const log = JSON.parse(buff);
|
if (check(log)) {
|
push(log);
|
}
|
} catch (e) {
|
if (!attempt) {
|
stream.emit('error', e);
|
}
|
}
|
}
|
|
function push(log) {
|
if (
|
options.rows &&
|
results.length >= options.rows &&
|
options.order !== 'desc'
|
) {
|
if (stream.readable) {
|
stream.destroy();
|
}
|
return;
|
}
|
|
if (options.fields) {
|
log = options.fields.reduce((obj, key) => {
|
obj[key] = log[key];
|
return obj;
|
}, {});
|
}
|
|
if (options.order === 'desc') {
|
if (results.length >= options.rows) {
|
results.shift();
|
}
|
}
|
results.push(log);
|
}
|
|
function check(log) {
|
if (!log) {
|
return;
|
}
|
|
if (typeof log !== 'object') {
|
return;
|
}
|
|
const time = new Date(log.timestamp);
|
if (
|
(options.from && time < options.from) ||
|
(options.until && time > options.until) ||
|
(options.level && options.level !== log.level)
|
) {
|
return;
|
}
|
|
return true;
|
}
|
|
function normalizeQuery(options) {
|
options = options || {};
|
|
// limit
|
options.rows = options.rows || options.limit || 10;
|
|
// starting row offset
|
options.start = options.start || 0;
|
|
// now
|
options.until = options.until || new Date();
|
if (typeof options.until !== 'object') {
|
options.until = new Date(options.until);
|
}
|
|
// now - 24
|
options.from = options.from || (options.until - (24 * 60 * 60 * 1000));
|
if (typeof options.from !== 'object') {
|
options.from = new Date(options.from);
|
}
|
|
// 'asc' or 'desc'
|
options.order = options.order || 'desc';
|
|
return options;
|
}
|
}
|
|
/**
|
* Returns a log stream for this transport. Options object is optional.
|
* @param {Object} options - Stream options for this instance.
|
* @returns {Stream} - TODO: add return description.
|
* TODO: Refactor me.
|
*/
|
stream(options = {}) {
|
const file = path.join(this.dirname, this.filename);
|
const stream = new Stream();
|
const tail = {
|
file,
|
start: options.start
|
};
|
|
stream.destroy = tailFile(tail, (err, line) => {
|
if (err) {
|
return stream.emit('error', err);
|
}
|
|
try {
|
stream.emit('data', line);
|
line = JSON.parse(line);
|
stream.emit('log', line);
|
} catch (e) {
|
stream.emit('error', e);
|
}
|
});
|
|
return stream;
|
}
|
|
/**
|
* Checks to see the filesize of.
|
* @returns {undefined}
|
*/
|
open() {
|
// If we do not have a filename then we were passed a stream and
|
// don't need to keep track of size.
|
if (!this.filename) return;
|
if (this._opening) return;
|
|
this._opening = true;
|
|
// Stat the target file to get the size and create the stream.
|
this.stat((err, size) => {
|
if (err) {
|
return this.emit('error', err);
|
}
|
debug('stat done: %s { size: %s }', this.filename, size);
|
this._size = size;
|
this._dest = this._createStream(this._stream);
|
this._opening = false;
|
this.once('open', () => {
|
if (!this._stream.emit('rotate')) {
|
this._rotate = false;
|
}
|
});
|
});
|
}
|
|
/**
|
* Stat the file and assess information in order to create the proper stream.
|
* @param {function} callback - TODO: add param description.
|
* @returns {undefined}
|
*/
|
stat(callback) {
|
const target = this._getFile();
|
const fullpath = path.join(this.dirname, target);
|
|
fs.stat(fullpath, (err, stat) => {
|
if (err && err.code === 'ENOENT') {
|
debug('ENOENT ok', fullpath);
|
// Update internally tracked filename with the new target name.
|
this.filename = target;
|
return callback(null, 0);
|
}
|
|
if (err) {
|
debug(`err ${err.code} ${fullpath}`);
|
return callback(err);
|
}
|
|
if (!stat || this._needsNewFile(stat.size)) {
|
// If `stats.size` is greater than the `maxsize` for this
|
// instance then try again.
|
return this._incFile(() => this.stat(callback));
|
}
|
|
// Once we have figured out what the filename is, set it
|
// and return the size.
|
this.filename = target;
|
callback(null, stat.size);
|
});
|
}
|
|
/**
|
* Closes the stream associated with this instance.
|
* @param {function} cb - TODO: add param description.
|
* @returns {undefined}
|
*/
|
close(cb) {
|
if (!this._stream) {
|
return;
|
}
|
|
this._stream.end(() => {
|
if (cb) {
|
cb(); // eslint-disable-line callback-return
|
}
|
this.emit('flush');
|
this.emit('closed');
|
});
|
}
|
|
/**
|
* TODO: add method description.
|
* @param {number} size - TODO: add param description.
|
* @returns {undefined}
|
*/
|
_needsNewFile(size) {
|
size = size || this._size;
|
return this.maxsize && size >= this.maxsize;
|
}
|
|
/**
|
* TODO: add method description.
|
* @param {Error} err - TODO: add param description.
|
* @returns {undefined}
|
*/
|
_onError(err) {
|
this.emit('error', err);
|
}
|
|
/**
|
* TODO: add method description.
|
* @param {Stream} stream - TODO: add param description.
|
* @returns {mixed} - TODO: add return description.
|
*/
|
_setupStream(stream) {
|
stream.on('error', this._onError);
|
|
return stream;
|
}
|
|
/**
|
* TODO: add method description.
|
* @param {Stream} stream - TODO: add param description.
|
* @returns {mixed} - TODO: add return description.
|
*/
|
_cleanupStream(stream) {
|
stream.removeListener('error', this._onError);
|
stream.destroy();
|
return stream;
|
}
|
|
/**
|
* TODO: add method description.
|
*/
|
_rotateFile() {
|
this._incFile(() => this.open());
|
}
|
|
/**
|
* Unpipe from the stream that has been marked as full and end it so it
|
* flushes to disk.
|
*
|
* @param {function} callback - Callback for when the current file has closed.
|
* @private
|
*/
|
_endStream(callback = () => { }) {
|
if (this._dest) {
|
this._stream.unpipe(this._dest);
|
this._dest.end(() => {
|
this._cleanupStream(this._dest);
|
callback();
|
});
|
} else {
|
callback(); // eslint-disable-line callback-return
|
}
|
}
|
|
/**
|
* Returns the WritableStream for the active file on this instance. If we
|
* should gzip the file then a zlib stream is returned.
|
*
|
* @param {ReadableStream} source –PassThrough to pipe to the file when open.
|
* @returns {WritableStream} Stream that writes to disk for the active file.
|
*/
|
_createStream(source) {
|
const fullpath = path.join(this.dirname, this.filename);
|
|
debug('create stream start', fullpath, this.options);
|
const dest = fs.createWriteStream(fullpath, this.options)
|
// TODO: What should we do with errors here?
|
.on('error', err => debug(err))
|
.on('close', () => debug('close', dest.path, dest.bytesWritten))
|
.on('open', () => {
|
debug('file open ok', fullpath);
|
this.emit('open', fullpath);
|
source.pipe(dest);
|
|
// If rotation occured during the open operation then we immediately
|
// start writing to a new PassThrough, begin opening the next file
|
// and cleanup the previous source and dest once the source has drained.
|
if (this.rotatedWhileOpening) {
|
this._stream = new PassThrough();
|
this._stream.setMaxListeners(30);
|
this._rotateFile();
|
this.rotatedWhileOpening = false;
|
this._cleanupStream(dest);
|
source.end();
|
}
|
});
|
|
debug('create stream ok', fullpath);
|
return dest;
|
}
|
|
/**
|
* TODO: add method description.
|
* @param {function} callback - TODO: add param description.
|
* @returns {undefined}
|
*/
|
_incFile(callback) {
|
debug('_incFile', this.filename);
|
const ext = path.extname(this._basename);
|
const basename = path.basename(this._basename, ext);
|
const tasks = [];
|
|
if (this.zippedArchive) {
|
tasks.push(
|
function (cb) {
|
const num = this._created > 0 && !this.tailable ? this._created : '';
|
this._compressFile(
|
path.join(this.dirname, `${basename}${num}${ext}`),
|
path.join(this.dirname, `${basename}${num}${ext}.gz`),
|
cb
|
);
|
}.bind(this)
|
);
|
}
|
|
tasks.push(
|
function (cb) {
|
if (!this.tailable) {
|
this._created += 1;
|
this._checkMaxFilesIncrementing(ext, basename, cb);
|
} else {
|
this._checkMaxFilesTailable(ext, basename, cb);
|
}
|
}.bind(this)
|
);
|
|
asyncSeries(tasks, callback);
|
}
|
|
/**
|
* Gets the next filename to use for this instance in the case that log
|
* filesizes are being capped.
|
* @returns {string} - TODO: add return description.
|
* @private
|
*/
|
_getFile() {
|
const ext = path.extname(this._basename);
|
const basename = path.basename(this._basename, ext);
|
const isRotation = this.rotationFormat
|
? this.rotationFormat()
|
: this._created;
|
|
// Caveat emptor (indexzero): rotationFormat() was broken by design When
|
// combined with max files because the set of files to unlink is never
|
// stored.
|
return !this.tailable && this._created
|
? `${basename}${isRotation}${ext}`
|
: `${basename}${ext}`;
|
}
|
|
/**
|
* Increment the number of files created or checked by this instance.
|
* @param {mixed} ext - TODO: add param description.
|
* @param {mixed} basename - TODO: add param description.
|
* @param {mixed} callback - TODO: add param description.
|
* @returns {undefined}
|
* @private
|
*/
|
_checkMaxFilesIncrementing(ext, basename, callback) {
|
// Check for maxFiles option and delete file.
|
if (!this.maxFiles || this._created < this.maxFiles) {
|
return setImmediate(callback);
|
}
|
|
const oldest = this._created - this.maxFiles;
|
const isOldest = oldest !== 0 ? oldest : '';
|
const isZipped = this.zippedArchive ? '.gz' : '';
|
const filePath = `${basename}${isOldest}${ext}${isZipped}`;
|
const target = path.join(this.dirname, filePath);
|
|
fs.unlink(target, callback);
|
}
|
|
/**
|
* Roll files forward based on integer, up to maxFiles. e.g. if base if
|
* file.log and it becomes oversized, roll to file1.log, and allow file.log
|
* to be re-used. If file is oversized again, roll file1.log to file2.log,
|
* roll file.log to file1.log, and so on.
|
* @param {mixed} ext - TODO: add param description.
|
* @param {mixed} basename - TODO: add param description.
|
* @param {mixed} callback - TODO: add param description.
|
* @returns {undefined}
|
* @private
|
*/
|
_checkMaxFilesTailable(ext, basename, callback) {
|
const tasks = [];
|
if (!this.maxFiles) {
|
return;
|
}
|
|
// const isZipped = this.zippedArchive ? '.gz' : '';
|
const isZipped = this.zippedArchive ? '.gz' : '';
|
for (let x = this.maxFiles - 1; x > 1; x--) {
|
tasks.push(function (i, cb) {
|
let fileName = `${basename}${(i - 1)}${ext}${isZipped}`;
|
const tmppath = path.join(this.dirname, fileName);
|
|
fs.exists(tmppath, exists => {
|
if (!exists) {
|
return cb(null);
|
}
|
|
fileName = `${basename}${i}${ext}${isZipped}`;
|
fs.rename(tmppath, path.join(this.dirname, fileName), cb);
|
});
|
}.bind(this, x));
|
}
|
|
asyncSeries(tasks, () => {
|
fs.rename(
|
path.join(this.dirname, `${basename}${ext}${isZipped}`),
|
path.join(this.dirname, `${basename}1${ext}${isZipped}`),
|
callback
|
);
|
});
|
}
|
|
/**
|
* Compresses src to dest with gzip and unlinks src
|
* @param {string} src - path to source file.
|
* @param {string} dest - path to zipped destination file.
|
* @param {Function} callback - callback called after file has been compressed.
|
* @returns {undefined}
|
* @private
|
*/
|
_compressFile(src, dest, callback) {
|
fs.access(src, fs.F_OK, (err) => {
|
if (err) {
|
return callback();
|
}
|
var gzip = zlib.createGzip();
|
var inp = fs.createReadStream(src);
|
var out = fs.createWriteStream(dest);
|
out.on('finish', () => {
|
fs.unlink(src, callback);
|
});
|
inp.pipe(gzip).pipe(out);
|
});
|
}
|
|
_createLogDirIfNotExist(dirPath) {
|
/* eslint-disable no-sync */
|
if (!fs.existsSync(dirPath)) {
|
fs.mkdirSync(dirPath, { recursive: true });
|
}
|
/* eslint-enable no-sync */
|
}
|
};
|