Started work on the export/import feature

This commit is contained in:
Andris Reinman 2022-09-20 15:35:10 +03:00
parent ef6fb48b3f
commit c1acc970d1
No known key found for this signature in database
GPG key ID: DC6C83F4D584D364
2 changed files with 244 additions and 0 deletions

View file

@ -16,6 +16,14 @@
"read:any": ["*"]
},
"export": {
"create:any": ["*"]
},
"import": {
"create:any": ["*"]
},
"userlisting": {
"read:any": ["*"]
},
@ -135,6 +143,14 @@
"read:any": ["*"]
},
"export": {
"create:any": ["*"]
},
"import": {
"create:any": ["*"]
},
"userlisting": {
"read:any": ["*"]
},

View file

@ -15,6 +15,94 @@ const pwnedpasswords = require('pwnedpasswords');
const { nextPageCursorSchema, previousPageCursorSchema, pageNrSchema, sessSchema, sessIPSchema, booleanSchema, metaDataSchema } = require('../schemas');
const TaskHandler = require('../task-handler');
const { publish, FORWARD_ADDED } = require('../events');
const { Transform } = require('stream');
const { createBrotliCompress } = require('zlib');
/**
 * Transform stream that serializes export records into a binary container.
 *
 * Wire format:
 *   magic bytes: 0x09 0x06 0x82
 *   record:      type(1) | contentLength(4, LE) | content
 *     TYPE_HEADER (0x01): content is a JSON blob describing the export
 *     TYPE_CONTENT (0x02): content is collLen(1) | collectionName | dataLen(4, LE) | data
 *
 * Writable side is in object mode ({ collection, entry }), readable side emits Buffers.
 */
class ExportStream extends Transform {
    /**
     * @param {Object} [meta] - Metadata serialized into the header record (not mutated)
     */
    constructor(meta) {
        super({ readableObjectMode: false, writableObjectMode: true });
        this.headerSent = false;
        this.meta = meta || {};
        this.TYPE_HEADER = 0x01;
        this.TYPE_CONTENT = 0x02;
    }

    // Emits the magic bytes and the header record exactly once
    sendHeader() {
        if (this.headerSent) {
            return;
        }
        this.headerSent = true;

        // magic
        this.push(Buffer.from([0x09, 0x06, 0x82]));

        this.writeRecord(this.TYPE_HEADER, {
            entry: Buffer.from(
                JSON.stringify(
                    // FIX: copy into a new object — Object.assign(this.meta, ...) mutated
                    // the caller-supplied meta object as a side effect
                    Object.assign({}, this.meta, {
                        created: new Date().toISOString()
                    })
                )
            )
        });
    }

    /**
     * Pushes a single framed record to the readable side.
     *
     * @param {Number} recordType - TYPE_HEADER or TYPE_CONTENT
     * @param {Object} recordData - { entry: Buffer } for headers, { collection: String, entry: Buffer } for content
     * @throws {Error} on an unknown record type
     */
    writeRecord(recordType, recordData) {
        let recordBuf = Buffer.allocUnsafe(1);
        recordBuf.writeUInt8(recordType, 0);

        let content;
        switch (recordType) {
            case this.TYPE_HEADER:
                {
                    content = recordData.entry;
                }
                break;

            case this.TYPE_CONTENT:
                {
                    let collBuf = Buffer.from(recordData.collection);
                    if (collBuf.length > 255) {
                        // collection name would not fit in the single length byte, skip the record
                        return;
                    }
                    let collLen = Buffer.allocUnsafe(1);
                    collLen.writeUInt8(collBuf.length, 0);

                    let dataLen = Buffer.allocUnsafe(4);
                    dataLen.writeUInt32LE(recordData.entry.length, 0);

                    content = Buffer.concat([collLen, collBuf, dataLen, recordData.entry]);
                }
                break;

            default:
                throw new Error('Invalid input');
        }

        let contentLen = Buffer.allocUnsafe(4);
        contentLen.writeUInt32LE(content.length, 0);

        // FIX: recordBuf (the record-type byte) was previously allocated and written
        // but never pushed, so header and content records were indistinguishable on the wire
        this.push(Buffer.concat([recordBuf, contentLen, content]));
    }

    _transform(data, encoding, done) {
        this.sendHeader();
        try {
            this.writeRecord(this.TYPE_CONTENT, data);
        } catch (err) {
            return done(err);
        }
        done();
    }

    _flush(done) {
        // FIX: emit the header even if no records were written, so an empty
        // export still produces a valid file (magic + header record)
        this.sendHeader();
        done();
    }
}
module.exports = (db, server, userHandler, settingsHandler) => {
const taskHandler = new TaskHandler({ database: db.database });
@ -1408,6 +1496,146 @@ module.exports = (db, server, userHandler, settingsHandler) => {
})
);
server.post(
    '/export/users',
    tools.asyncifyJson(async (req, res, next) => {
        res.charSet('utf-8');

        const schema = Joi.object().keys({
            users: Joi.array().single().items(Joi.string().hex().lowercase().length(24).required()),
            tags: Joi.array().single().items(Joi.string().trim().empty('').max(1024)),
            sess: sessSchema,
            ip: sessIPSchema
        });

        const result = schema.validate(req.params, {
            abortEarly: false,
            convert: true
        });

        if (result.error) {
            res.status(400);
            res.json({
                error: result.error.message,
                code: 'InputValidationError',
                details: tools.validationErrors(result)
            });
            return next();
        }

        // permissions check
        req.validate(roles.can(req.role).createAny('export'));

        let exporter = new ExportStream({
            type: 'wildduck_data_export',
            // FIX: was `result.value.user` (always undefined) — the validated field is `users`
            users: result.value.users,
            tags: result.value.tags
        });

        // Streams the user document plus all related per-user collections into the exporter
        const runUserExport = async (user, exporter) => {
            log.info('Export', `Processing user ${user}`);

            // Step 1. User record
            let rawUserData = await db.users.collection('users').findOne(
                {
                    _id: user
                },
                {
                    projection: {
                        // skip the following fields
                        storageUsed: false
                    },
                    // raw BSON buffers pass straight through to the export stream
                    raw: true
                }
            );

            exporter.write({ collection: 'users', entry: rawUserData });

            // Copies every row matching { user } from one collection into the exporter
            const processCollection = async (client, collection, user) => {
                let cursor = await db[client].collection(collection).find(
                    {
                        user
                    },
                    {
                        raw: true
                    }
                );

                let entry;
                let rowcount = 0;
                try {
                    while ((entry = await cursor.next())) {
                        exporter.write({ client, collection, entry });
                        rowcount++;
                    }
                } finally {
                    // close the cursor even if a write throws mid-iteration
                    await cursor.close();
                }

                log.info('Export', `Exported ${rowcount} rows from ${client}.${collection} for user ${user}`);
            };

            await processCollection('users', 'addresses', user);
            await processCollection('users', 'asps', user);
            await processCollection('database', 'addressregister', user);
            await processCollection('database', 'autoreplies', user);
            // NOTE(review): verify collection name — filters are commonly stored in
            // a 'filters' collection; confirm 'filter' matches the import side
            await processCollection('database', 'filter', user);
            await processCollection('database', 'mailboxes', user);
        };

        // Resolves the user selection (ids and/or tags) and exports each match
        const runExport = async (query = {}, exporter) => {
            let filter = {};

            if (query.users) {
                filter._id = { $in: query.users.map(user => new ObjectId(user)) };
            }

            // dedupe tags case-insensitively, preserving first-seen order
            let tagSeen = new Set();
            let requiredTags = (query.tags || [])
                .map(tag => tag.toLowerCase().trim())
                .filter(tag => {
                    if (tag && !tagSeen.has(tag)) {
                        tagSeen.add(tag);
                        return true;
                    }
                    return false;
                });

            if (requiredTags.length) {
                filter.tagsview = { $all: requiredTags };
            }

            let userIds = await db.users
                .collection('users')
                .find(filter, { projection: { _id: true } })
                .toArray();

            for (let { _id: user } of userIds) {
                await runUserExport(user, exporter);
            }

            exporter.end();
        };

        res.writeHead(200, {
            'Content-Type': 'application/octet-stream'
        });

        // brotli-compress the binary export on the fly
        const packer = createBrotliCompress();
        exporter.pipe(packer).pipe(res);

        try {
            await new Promise((resolve, reject) => {
                exporter.on('error', err => {
                    reject(err);
                });
                runExport(result.value, exporter).then(resolve).catch(reject);
            });
            log.info('API', `Export completed`);
        } catch (err) {
            log.error('API', `Export failed: ${err.stack}`);
            // headers are already sent, so best effort: append a plain-text error
            // marker to the (compressed) response and terminate the download
            res.write(`\nExport failed\n${err.message}\n${err.code || 'Error'}\n`);
            res.end();
        }
    })
);
server.post(
'/users/:user/password/reset',
tools.asyncifyJson(async (req, res, next) => {