use cursor for deleting messages

This commit is contained in:
Andris Reinman 2021-01-15 10:40:03 +02:00
parent c29e5c867f
commit 71363e369f
4 changed files with 52 additions and 52 deletions

View file

@ -9,7 +9,6 @@ const consts = require('../consts');
const { nextPageCursorSchema, previousPageCursorSchema, pageNrSchema, sessSchema, sessIPSchema } = require('../schemas');
module.exports = (db, server, storageHandler) => {
server.post(
'/users/:user/storage',
tools.asyncifyJson(async (req, res, next) => {

View file

@ -27,7 +27,7 @@ class StorageHandler {
contentType = 'application/octet-stream';
} else if (!contentType) {
contentType = libmime.detectMimeType(filename) || 'application/octet-stream';
} else {
} else if (!filename) {
filename = filebase + '.' + libmime.detectExtension(contentType);
}

View file

@ -9,64 +9,64 @@ const { publish, USER_DELETE_COMPLETED } = require('../events');
const BATCH_SIZE = 500;
const deleteMessages = async taskData => {
let rdate = new Date(Date.now() + consts.DELETED_USER_MESSAGE_RETENTION).getTime();
let lastId;
let cursor = await db.database.collection('messages').find(
{
user: taskData.user
},
{
projection: {
_id: true
}
}
);
let rdate = new Date(Date.now() + consts.DELETED_USER_MESSAGE_RETENTION).getTime();
let messageData;
let updateEntries = [];
let markedAsDeleted = 0;
let executeBatchUpdate = async () => {
let bulkResult = await db.database.collection('messages').bulkWrite(updateEntries, {
ordered: false,
w: 1
});
log.verbose('Tasks', 'task=user-delete id=%s user=%s message=%s', taskData._id, taskData.user, `Marked ${updateEntries.length} messages for deletion`);
updateEntries = [];
markedAsDeleted += (bulkResult && bulkResult.modifiedCount) || 0;
};
try {
let done = false;
while (!done) {
let query = {
user: taskData.user,
userDeleted: { $ne: true }
};
if (lastId) {
query._id = { $gt: lastId };
}
let messages = await db.database
.collection('messages')
.find(query, {
sort: { _id: 1 },
projection: {
_id: true
while ((messageData = await cursor.next())) {
updateEntries.push({
updateOne: {
filter: {
_id: messageData._id
},
limit: BATCH_SIZE
})
.toArray();
if (!messages.length) {
// all done
done = true;
break;
}
messages = messages.map(messageData => messageData._id);
lastId = messages[messages.length - 1];
let updateEntries = [];
messages.forEach(message => {
updateEntries.push({
updateOne: {
filter: {
_id: message
},
update: {
$set: {
exp: true,
rdate,
userDeleted: true
}
update: {
$set: {
exp: true,
rdate,
userDeleted: true
}
}
});
});
let bulkResult = await db.database.collection('messages').bulkWrite(updateEntries, {
ordered: false,
w: 1
}
});
markedAsDeleted += (bulkResult && bulkResult.modifiedCount) || 0;
if (updateEntries.length >= BATCH_SIZE) {
try {
await executeBatchUpdate();
} catch (err) {
await cursor.close();
throw err;
}
}
}
await cursor.close();
if (updateEntries.length) {
await executeBatchUpdate();
}
} catch (err) {
err.markedAsDeleted = markedAsDeleted;
@ -74,6 +74,7 @@ const deleteMessages = async taskData => {
} finally {
log.verbose('Tasks', 'task=user-delete id=%s user=%s message=%s', taskData._id, taskData.user, `Marked ${markedAsDeleted} messages for deletion`);
}
return markedAsDeleted;
};

View file

@ -43,7 +43,7 @@
"bcryptjs": "2.4.3",
"bull": "3.20.0",
"gelf": "2.0.1",
"generate-password": "1.5.1",
"generate-password": "1.6.0",
"he": "1.2.0",
"html-to-text": "6.0.0",
"humanname": "0.2.2",