fix(database): add DatabaseStore.atomically to handle read/write issues

Summary:
Fixes T3566
Fixes T3567

Test Plan: new tests

Reviewers: dillon, bengotow

Reviewed By: bengotow

Maniphest Tasks: T3566, T3567

Differential Revision: https://phab.nylas.com/D2025
This commit is contained in:
Evan Morikawa 2015-09-15 20:27:52 -04:00
parent fc770a85cd
commit 61eec6b21a
10 changed files with 127 additions and 61 deletions

View file

@ -77,7 +77,7 @@ class DeveloperBar extends React.Component
else if @state.section == 'long-polling'
itemDivs = @state.longPollHistory.filter(matchingFilter).map (item) ->
<DeveloperBarLongPollItem item={item} key={item.cursor}/>
<DeveloperBarLongPollItem item={item} key={"#{item.cursor}-#{item.timestamp}"}/>
expandedDiv = <div className="expanded-section long-polling">{itemDivs}</div>
else if @state.section == 'queue'

View file

@ -6,6 +6,9 @@ Thread = require '../src/flux/models/thread'
DatabaseStore = require '../src/flux/stores/database-store'
describe "NylasAPI", ->
beforeEach ->
spyOn(DatabaseStore, "atomically").andCallFake (fn) -> fn()
describe "handleModel404", ->
it "should unpersist the model from the cache that was requested", ->
model = new Thread(id: 'threadidhere')

View file

@ -70,7 +70,7 @@ describe "DatabaseStore", ->
console.log err
it "should call through to _writeModels", ->
spyOn(DatabaseStore, '_writeModels')
spyOn(DatabaseStore, '_writeModels').andReturn Promise.resolve()
DatabaseStore.persistModel(testModelInstance)
expect(DatabaseStore._writeModels.callCount).toBe(1)
@ -87,7 +87,7 @@ describe "DatabaseStore", ->
type:'persist'
it "should call through to _writeModels after checking them", ->
spyOn(DatabaseStore, '_writeModels')
spyOn(DatabaseStore, '_writeModels').andReturn Promise.resolve()
DatabaseStore.persistModels([testModelInstanceA, testModelInstanceB])
expect(DatabaseStore._writeModels.callCount).toBe(1)
@ -98,9 +98,9 @@ describe "DatabaseStore", ->
describe "unpersistModel", ->
it "should delete the model by Id", -> waitsForPromise =>
DatabaseStore.unpersistModel(testModelInstance).then =>
expect(@performed.length).toBe(3)
expect(@performed[1].query).toBe("DELETE FROM `TestModel` WHERE `id` = ?")
expect(@performed[1].values[0]).toBe('1234')
expect(@performed.length).toBe(1)
expect(@performed[0].query).toBe("DELETE FROM `TestModel` WHERE `id` = ?")
expect(@performed[0].values[0]).toBe('1234')
it "should cause the DatabaseStore to trigger() with a change that contains the model", ->
waitsForPromise ->
@ -129,18 +129,18 @@ describe "DatabaseStore", ->
TestModel.configureWithCollectionAttribute()
waitsForPromise =>
DatabaseStore.unpersistModel(testModelInstance).then =>
expect(@performed.length).toBe(4)
expect(@performed[2].query).toBe("DELETE FROM `TestModel-Label` WHERE `id` = ?")
expect(@performed[2].values[0]).toBe('1234')
expect(@performed.length).toBe(2)
expect(@performed[1].query).toBe("DELETE FROM `TestModel-Label` WHERE `id` = ?")
expect(@performed[1].values[0]).toBe('1234')
describe "when the model has joined data attributes", ->
it "should delete the element in the joined data table", ->
TestModel.configureWithJoinedDataAttribute()
waitsForPromise =>
DatabaseStore.unpersistModel(testModelInstance).then =>
expect(@performed.length).toBe(4)
expect(@performed[2].query).toBe("DELETE FROM `TestModelBody` WHERE `id` = ?")
expect(@performed[2].values[0]).toBe('1234')
expect(@performed.length).toBe(2)
expect(@performed[1].query).toBe("DELETE FROM `TestModelBody` WHERE `id` = ?")
expect(@performed[1].values[0]).toBe('1234')
describe "_writeModels", ->
it "should compose a REPLACE INTO query to save the model", ->
@ -263,4 +263,55 @@ describe "DatabaseStore", ->
@m = new TestModel(id: 'local-6806434c-b0cd', body: 'hello world')
expect( => DatabaseStore._writeModels([@m])).not.toThrow()
describe "atomically", ->
beforeEach ->
DatabaseStore._atomicPromise = null
it "sets up an exclusive transaction", ->
waitsForPromise =>
DatabaseStore.atomically( =>
DatabaseStore._query("TEST")
).then =>
expect(@performed.length).toBe 3
expect(@performed[0].query).toBe "BEGIN EXCLUSIVE TRANSACTION"
expect(@performed[1].query).toBe "TEST"
expect(@performed[2].query).toBe "COMMIT"
it "resolves, but doesn't fire a commit on failure", ->
waitsForPromise =>
DatabaseStore.atomically( =>
throw new Error("BOOO")
).catch =>
expect(@performed.length).toBe 1
expect(@performed[0].query).toBe "BEGIN EXCLUSIVE TRANSACTION"
it "can be called multiple times and get queued", ->
waitsForPromise =>
Promise.all([
DatabaseStore.atomically( -> )
DatabaseStore.atomically( -> )
DatabaseStore.atomically( -> )
]).then =>
expect(@performed.length).toBe 6
expect(@performed[0].query).toBe "BEGIN EXCLUSIVE TRANSACTION"
expect(@performed[1].query).toBe "COMMIT"
expect(@performed[2].query).toBe "BEGIN EXCLUSIVE TRANSACTION"
expect(@performed[3].query).toBe "COMMIT"
expect(@performed[4].query).toBe "BEGIN EXCLUSIVE TRANSACTION"
expect(@performed[5].query).toBe "COMMIT"
it "can be called multiple times and get queued", ->
waitsForPromise =>
DatabaseStore.atomically( -> )
.then -> DatabaseStore.atomically( -> )
.then -> DatabaseStore.atomically( -> )
.then =>
expect(@performed.length).toBe 6
expect(@performed[0].query).toBe "BEGIN EXCLUSIVE TRANSACTION"
expect(@performed[1].query).toBe "COMMIT"
expect(@performed[2].query).toBe "BEGIN EXCLUSIVE TRANSACTION"
expect(@performed[3].query).toBe "COMMIT"
expect(@performed[4].query).toBe "BEGIN EXCLUSIVE TRANSACTION"
expect(@performed[5].query).toBe "COMMIT"
describe "DatabaseStore::_triggerSoon", ->

View file

@ -43,6 +43,10 @@ describe "SyncbackDraftTask", ->
spyOn(DatabaseStore, "persistModel").andCallFake ->
Promise.resolve()
spyOn(DatabaseStore, "_atomically").andCallFake (fn) ->
fn()
return Promise.resolve()
describe "performRemote", ->
beforeEach ->
spyOn(NylasAPI, 'makeRequest').andCallFake (opts) ->

View file

@ -267,31 +267,36 @@ class NylasAPI
# Step 3: Retrieve any existing models from the database for the given IDs.
ids = _.pluck(unlockedJSONs, 'id')
DatabaseStore = require './stores/database-store'
DatabaseStore.findAll(klass).where(klass.attributes.id.in(ids)).then (models) ->
existingModels = {}
existingModels[model.id] = model for model in models
DatabaseStore.atomically =>
DatabaseStore.findAll(klass).where(klass.attributes.id.in(ids)).then (models) ->
existingModels = {}
existingModels[model.id] = model for model in models
responseModels = []
changedModels = []
responseModels = []
changedModels = []
# Step 4: Merge the response data into the existing data for each model,
# skipping changes when we already have the given version
unlockedJSONs.forEach (json) =>
model = existingModels[json.id]
# Step 4: Merge the response data into the existing data for each model,
# skipping changes when we already have the given version
unlockedJSONs.forEach (json) =>
model = existingModels[json.id]
isSameOrNewerVersion = model and model.version? and json.version? and model.version >= json.version
isAlreadySent = model and model.draft is false and json.draft is true
isSameOrNewerVersion = model and model.version? and json.version? and model.version >= json.version
isAlreadySent = model and model.draft is false and json.draft is true
unless isSameOrNewerVersion or isAlreadySent
model ?= new klass()
model.fromJSON(json)
changedModels.push(model)
responseModels.push(model)
if isSameOrNewerVersion
json._delta?.ignoredBecause = "JSON v#{json.version} <= model v#{model.version}"
else if isAlreadySent
json._delta?.ignoredBecause = "Model #{model.id} is already sent!"
else
model ?= new klass()
model.fromJSON(json)
changedModels.push(model)
responseModels.push(model)
# Step 5: Save models that have changed, and then return all of the models
# that were in the response body.
DatabaseStore.persistModels(changedModels).then ->
return Promise.resolve(responseModels)
# Step 5: Save models that have changed, and then return all of the models
# that were in the response body.
DatabaseStore.persistModels(changedModels).then ->
return Promise.resolve(responseModels)
_apiObjectToClassMap:
"file": require('./models/file')

View file

@ -236,7 +236,7 @@ class DatabaseStore extends NylasStore
if DEBUG_MISSING_ACCOUNT_ID and query.indexOf("`account_id`") is -1
@_prettyConsoleLog("QUERY does not specify accountId: #{query}")
if DEBUG_QUERY_PLANS
@_db.all "EXPLAIN QUERY PLAN #{query}", values, (err, results) =>
@_db.all "EXPLAIN QUERY PLAN #{query}", values, (err, results=[]) =>
str = results.map((row) -> row.detail).join('\n') + " for " + query
@_prettyConsoleLog(str) if str.indexOf("SCAN") isnt -1
@ -249,7 +249,7 @@ class DatabaseStore extends NylasStore
# We don't exit serial execution mode until the last pending transaction has
# finished executing.
if query is BEGIN_TRANSACTION
if query.indexOf "BEGIN" is 0
@_db.serialize() if @_inflightTransactions is 0
@_inflightTransactions += 1
@ -407,11 +407,7 @@ class DatabaseStore extends NylasStore
# - rejects if any databse query fails or one of the triggering
# callbacks failed
persistModel: (model) =>
Promise.all([
@_query(BEGIN_TRANSACTION)
@_writeModels([model])
@_query(COMMIT)
]).then =>
@_writeModels([model]).then =>
@_triggerSoon({objectClass: model.constructor.name, objects: [model], type: 'persist'})
# Public: Asynchronously writes `models` to the cache and triggers a single change
@ -435,11 +431,7 @@ class DatabaseStore extends NylasStore
throw new Error("DatabaseStore::persistModels - You must pass an array of models with different ids. ID #{model.id} is in the set multiple times.")
ids[model.id] = true
Promise.all([
@_query(BEGIN_TRANSACTION)
@_writeModels(models)
@_query(COMMIT)
]).then =>
@_writeModels(models).then =>
@_triggerSoon({objectClass: models[0].constructor.name, objects: models, type: 'persist'})
# Public: Asynchronously removes `model` from the cache and triggers a change event.
@ -452,19 +444,13 @@ class DatabaseStore extends NylasStore
# - rejects if any databse query fails or one of the triggering
# callbacks failed
unpersistModel: (model) =>
Promise.all([
@_query(BEGIN_TRANSACTION)
@_deleteModel(model)
@_query(COMMIT)
]).then =>
@_deleteModel(model).then =>
@_triggerSoon({objectClass: model.constructor.name, objects: [model], type: 'unpersist'})
persistJSONObject: (key, json) ->
jsonString = serializeRegisteredObjects(json)
@_query(BEGIN_TRANSACTION)
@_query("REPLACE INTO `JSONObject` (`key`,`data`) VALUES (?,?)", [key, jsonString])
@_query(COMMIT)
@trigger({objectClass: 'JSONObject', objects: [{key: key, json: json}], type: 'persist'})
@_query("REPLACE INTO `JSONObject` (`key`,`data`) VALUES (?,?)", [key, jsonString]).then =>
@trigger({objectClass: 'JSONObject', objects: [{key: key, json: json}], type: 'persist'})
findJSONObject: (key) ->
@_query("SELECT `data` FROM `JSONObject` WHERE key = ? LIMIT 1", [key]).then (results) =>
@ -472,6 +458,18 @@ class DatabaseStore extends NylasStore
data = deserializeRegisteredObjects(results[0].data)
Promise.resolve(data)
atomically: (fn) =>
@_atomicPromise ?= Promise.resolve()
@_atomicPromise = @_atomicPromise.finally =>
@_atomically(fn)
return @_atomicPromise
_atomically: (fn) ->
@_query("BEGIN EXCLUSIVE TRANSACTION")
.then => fn()
.then => @_query("COMMIT")
.then => Promise.resolve()
########################################################################
########################### PRIVATE METHODS ############################
########################################################################

View file

@ -158,9 +158,11 @@ class DraftStoreProxy
if @_destroyed or not @_draft
return Promise.resolve(true)
updated = @changes.applyToModel(@_draft)
return DatabaseStore.persistModel(updated).then =>
Actions.queueTask(new SyncbackDraftTask(@draftClientId))
DatabaseStore.atomically =>
DatabaseStore.findBy(Message, clientId: @_draft.clientId).then (draft) =>
updatedDraft = @changes.applyToModel(draft)
return DatabaseStore.persistModel(updatedDraft).then =>
Actions.queueTask(new SyncbackDraftTask(@draftClientId))

View file

@ -60,7 +60,7 @@ class DestroyDraftTask extends Task
inboxMsg = err.body?.message ? ""
# Draft has already been deleted, this is not really an error
if err.statusCode is 404
if err.statusCode in [404, 409]
return Promise.resolve(Task.Status.Finished)
# Draft has been sent, and can't be deleted. Not much we can do but finish

View file

@ -103,6 +103,8 @@ class FileUploadTask extends Task
DraftStore = require '../stores/draft-store'
# We have a `DatabaseStore.atomically` block surrounding the object
# right before we persist changes
DraftStore.sessionForClientId(@messageClientId).then (session) =>
files = _.clone(session.draft().files) ? []
files.push(file)

View file

@ -70,10 +70,11 @@ class SyncbackDraftTask extends Task
# below. We currently have no way of locking between processes. Maybe a
# log-style data structure would be better suited for drafts.
#
@getLatestLocalDraft().then (draft) ->
draft.version = json.version
draft.serverId = json.id
DatabaseStore.persistModel(draft)
DatabaseStore.atomically =>
@getLatestLocalDraft().then (draft) ->
draft.version = json.version
draft.serverId = json.id
DatabaseStore.persistModel(draft)
.then =>
return Promise.resolve(Task.Status.Finished)