// Mirror of https://github.com/nodemailer/wildduck.git
// Synced 2024-12-26 18:01:01 +08:00
// 190 lines, 5.1 KiB, JavaScript
|
'use strict';
|
||
|
|
||
|
const { Transform } = require('stream');
|
||
|
|
||
|
const msgpack = require('msgpack5')();
|
||
|
|
||
|
// Three-byte signature written at the very start of an export stream and
// verified by ImportStream before any record is parsed.
const MAGIC = Buffer.from([0x09, 0x06, 0x82]);

// Record type tags. Each record is encoded as `[recordType, recordData]`.
const TYPE_HEADER = 0x01; // header record (export metadata)
const TYPE_CONTENT = 0x02; // content record (one exported object)

// ImportStream parser states.
const STATE_MAGIC = 0x03; // collecting the MAGIC signature
const STATE_LEN = 0x04; // collecting the 4-byte record length prefix
const STATE_VAL = 0x05; // collecting the record body
|
||
|
|
||
|
/**
 * Transform stream that serializes written objects into the binary export format.
 *
 * The writable side is in object mode, the readable side emits binary chunks.
 * Output layout:
 *   1. the MAGIC signature
 *   2. one header record holding `meta` plus a `created` timestamp
 *   3. one content record for every object written to the stream
 *
 * Every record is a 4-byte little-endian length prefix followed by the msgpack
 * encoding of `[recordType, recordData]`.
 */
class ExportStream extends Transform {
    /**
     * @param {Object} [meta] Fields to store in the header record
     */
    constructor(meta) {
        super({
            readableObjectMode: false,
            writableObjectMode: true
        });

        this.headerSent = false;
        this.meta = meta || {};
    }

    /**
     * Emits the MAGIC signature and the header record. Only the first call
     * has an effect; later calls return immediately.
     */
    sendHeader() {
        if (this.headerSent) {
            return;
        }
        this.headerSent = true;

        this.push(MAGIC);

        // Merge into a fresh object so the caller-supplied `meta` is not modified
        this.writeRecord(TYPE_HEADER, Object.assign({}, this.meta, { created: new Date() }));
    }

    /**
     * Encodes a single record and pushes it to the readable side.
     *
     * @param {Number} recordType TYPE_HEADER or TYPE_CONTENT
     * @param {*} recordData Value to encode with msgpack
     */
    writeRecord(recordType, recordData) {
        const content = msgpack.encode([recordType, recordData]);

        const contentLen = Buffer.allocUnsafe(4);
        contentLen.writeUInt32LE(content.length, 0);

        this.push(Buffer.concat([contentLen, content]));
    }

    /**
     * Writes one content record per incoming object. The header is emitted
     * before the first record.
     */
    _transform(data, encoding, done) {
        try {
            // inside the try block so that a header encoding failure is
            // reported through the callback instead of being thrown
            this.sendHeader();
            this.writeRecord(TYPE_CONTENT, data);
        } catch (err) {
            return done(err);
        }

        done();
    }

    /**
     * Makes sure an export without any records still starts with the
     * signature and the header record.
     */
    _flush(done) {
        try {
            this.sendHeader();
        } catch (err) {
            return done(err);
        }

        done();
    }
}
|
||
|
|
||
|
/**
 * Transform stream that parses the binary export format back into objects.
 *
 * The writable side takes binary chunks, the readable side is in object mode.
 * Input must start with the MAGIC signature, followed by records made of a
 * 4-byte little-endian length prefix and a msgpack encoded
 * `[recordType, recordData]` pair. Chunk boundaries may fall anywhere.
 *
 * Header records are reported through the 'header' event, truthy content
 * records are pushed to the readable side, other record types are ignored.
 */
class ImportStream extends Transform {
    constructor() {
        super({
            readableObjectMode: true,
            writableObjectMode: false
        });

        // Bytes collected for the element being parsed: an array of byte
        // values while reading the signature or a length prefix, `false` or
        // a Buffer while collecting a record body
        this.buffer = [];

        // Body length of the current record, set once the prefix is read
        this.expectedRecordLength = false;

        this.curReadState = STATE_MAGIC;
    }

    /**
     * Consumes signature bytes from the start of the chunk while the parser
     * is still waiting for the signature.
     *
     * @param {Buffer} data Incoming chunk
     * @returns {Buffer} The part of the chunk that follows the consumed bytes
     * @throws {Error} With code INVALID_SEQUENCE if the signature does not match
     */
    readMagick(data) {
        if (this.curReadState !== STATE_MAGIC) {
            return data;
        }

        let consumed = 0;
        for (let i = 0; i < data.length; i++) {
            this.buffer.push(data[i]);
            consumed++;

            if (this.buffer.length === MAGIC.length) {
                if (Buffer.compare(Buffer.from(this.buffer), MAGIC) !== 0) {
                    const error = new Error('Invalid content sequence');
                    error.code = 'INVALID_SEQUENCE';
                    throw error;
                }

                // seems like a correct file
                this.buffer = [];
                this.curReadState = STATE_LEN;
                break;
            }
        }

        return consumed ? data.subarray(consumed) : data;
    }

    /**
     * Parses as many records as the chunk allows and keeps the unfinished
     * remainder in `this.buffer` for the next call.
     *
     * NOTE: the record length is taken from the input and the whole record
     * body is held in memory before it is decoded.
     *
     * @param {Buffer} data Chunk with the signature bytes already removed
     * @returns {Promise<void>} Rejects with whatever is thrown while decoding
     */
    async readRecords(data) {
        let pos = 0;

        while (pos < data.length) {
            switch (this.curReadState) {
                case STATE_LEN: {
                    this.buffer.push(data[pos++]);

                    if (this.buffer.length === 4) {
                        this.expectedRecordLength = Buffer.from(this.buffer).readUInt32LE(0);
                        this.buffer = false;
                        this.curReadState = STATE_VAL;
                    }
                    break;
                }

                case STATE_VAL: {
                    const buffered = this.buffer ? this.buffer.length : 0;
                    const missing = this.expectedRecordLength - buffered;

                    if (pos + missing > data.length) {
                        // record continues in the next chunk, store what we have
                        const rest = data.subarray(pos);
                        pos = data.length;
                        this.buffer = this.buffer ? Buffer.concat([this.buffer, rest]) : rest;
                        break;
                    }

                    // entire record available
                    const slice = data.subarray(pos, pos + missing);
                    pos += slice.length;

                    const value = this.buffer ? Buffer.concat([this.buffer, slice]) : slice;
                    const [recordType, recordData] = msgpack.decode(value);

                    switch (recordType) {
                        case TYPE_HEADER:
                            this.emit('header', recordData);
                            break;

                        case TYPE_CONTENT:
                            if (recordData) {
                                this.push(recordData);
                            }
                            break;
                    }

                    this.buffer = [];
                    this.curReadState = STATE_LEN;
                    break;
                }

                default:
                    // no state consumes input here, looping on would never end
                    throw new Error('Invalid parser state');
            }
        }
    }

    _transform(data, encoding, done) {
        let remaining;
        try {
            remaining = this.readMagick(data);
        } catch (err) {
            return done(err);
        }

        // two-argument form: `done` must not be called a second time if the
        // success callback itself throws
        this.readRecords(remaining).then(
            () => done(),
            err => done(err)
        );
    }

    /**
     * Fails the stream if the input ended in the middle of the signature,
     * a length prefix or a record body. Empty input is accepted.
     */
    _flush(done) {
        const pendingBytes = this.buffer ? this.buffer.length : 0;

        if (pendingBytes || this.curReadState === STATE_VAL) {
            const error = new Error('Unexpected end of content');
            error.code = 'UNEXPECTED_END';
            return done(error);
        }

        done();
    }
}
|
||
|
|
||
|
// Public API: ExportStream writes the binary export format, ImportStream parses it
module.exports = { ExportStream, ImportStream };
|