From efd58466f29370c8a6f3853f0a3a84c1aa468161 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20K=C5=82osko?= Date: Tue, 23 Feb 2021 21:20:46 +0100 Subject: [PATCH] Purge old deltas (#57) * Keep track of connected clients in session data * Add API for confirming and purging deltas * Send delta confirmation from clients once received * Update naming * Fix and extend Data tests * Update naming --- assets/js/cell/live_editor/editor_client.js | 34 ++- .../cell/live_editor/hook_server_adapter.js | 14 + lib/live_book/session.ex | 39 ++- lib/live_book/session/data.ex | 136 ++++++++-- lib/live_book_web/live/session_live.ex | 10 + test/live_book/session/data_test.exs | 252 +++++++++++++++++- 6 files changed, 447 insertions(+), 38 deletions(-) diff --git a/assets/js/cell/live_editor/editor_client.js b/assets/js/cell/live_editor/editor_client.js index c0f67986e..b55260c4f 100644 --- a/assets/js/cell/live_editor/editor_client.js +++ b/assets/js/cell/live_editor/editor_client.js @@ -80,6 +80,10 @@ export default class EditorClient { sendDelta(delta) { this.serverAdapter.sendDelta(delta, this.revision + 1); } + + reportCurrentRevision() { + this.serverAdapter.reportRevision(this.revision); + } } /** @@ -87,17 +91,37 @@ export default class EditorClient { * (the client is fully in sync with the server). */ class Synchronized { - constructor(client) { + constructor(client, reportRevisionTimeout = 5000) { this.client = client; + this.reportRevisionTimeoutId = null; + this.reportRevisionTimeout = reportRevisionTimeout; } onClientDelta(delta) { + // Cancel the report request if scheduled, + // as the client is about to send the revision + // along with own delta. + if (this.reportRevisionTimeoutId !== null) { + clearTimeout(this.reportRevisionTimeoutId); + this.reportRevisionTimeoutId = null; + } + this.client.sendDelta(delta); - return new AwaitingConfirm(this.client, delta); + return new AwaitingAcknowledgement(this.client, delta); } onServerDelta(delta) { this.client.applyDelta(delta); + + // The client received a new delta, so let's schedule + // a request to report the new revision. + if (this.reportRevisionTimeoutId === null) { + this.reportRevisionTimeoutId = setTimeout(() => { + this.client.reportCurrentRevision(); + this.reportRevisionTimeoutId = null; + }, this.reportRevisionTimeout); + } + return this; } @@ -110,7 +134,7 @@ class Synchronized { * Client is in this state when the client sent one delta and waits * for an acknowledgement, while there are no other deltas in a buffer. */ -class AwaitingConfirm { +class AwaitingAcknowledgement { constructor(client, awaitedDelta) { this.client = client; this.awaitedDelta = awaitedDelta; @@ -126,7 +150,7 @@ class AwaitingConfirm { const deltaPrime = this.awaitedDelta.transform(delta, "right"); this.client.applyDelta(deltaPrime); const awaitedDeltaPrime = delta.transform(this.awaitedDelta, "left"); - return new AwaitingConfirm(this.client, awaitedDeltaPrime); + return new AwaitingAcknowledgement(this.client, awaitedDeltaPrime); } onServerAcknowledgement() { @@ -169,6 +193,6 @@ class AwaitingWithBuffer { onServerAcknowledgement() { this.client.sendDelta(this.buffer); - return new AwaitingConfirm(this.client, this.buffer); + return new AwaitingAcknowledgement(this.client, this.buffer); } } diff --git a/assets/js/cell/live_editor/hook_server_adapter.js b/assets/js/cell/live_editor/hook_server_adapter.js index ef8269e67..a6ab200e0 100644 --- a/assets/js/cell/live_editor/hook_server_adapter.js +++ b/assets/js/cell/live_editor/hook_server_adapter.js @@ -45,4 +45,18 @@ export default class HookServerAdapter { revision, }); } + + /** + * Sends an information to the server that the client + * is at the specified revision. + * + * This should be invoked if the client received updates, + * but is not itself sending any delta at the moment. + */ + reportRevision(revision) { + this.hook.pushEvent("report_cell_revision", { + cell_id: this.cellId, + revision, + }); + } } diff --git a/lib/live_book/session.ex b/lib/live_book/session.ex index b414a508c..c36998c4c 100644 --- a/lib/live_book/session.ex +++ b/lib/live_book/session.ex @@ -21,7 +21,6 @@ defmodule LiveBook.Session do @type state :: %{ session_id: id(), data: Data.t(), - client_pids: list(pid()), runtime_monitor_ref: reference() } @@ -169,8 +168,18 @@ defmodule LiveBook.Session do Asynchronously sends a cell delta to apply to the server. """ @spec apply_cell_delta(id(), pid(), Cell.id(), Delta.t(), Data.cell_revision()) :: :ok - def apply_cell_delta(session_id, from, cell_id, delta, revision) do - GenServer.cast(name(session_id), {:apply_cell_delta, from, cell_id, delta, revision}) + def apply_cell_delta(session_id, client_pid, cell_id, delta, revision) do + GenServer.cast(name(session_id), {:apply_cell_delta, client_pid, cell_id, delta, revision}) + end + + @doc """ + Asynchronously informs at what revision the given client is. + + This helps to remove old deltas that are no longer necessary. + """ + @spec report_cell_revision(id(), pid(), Cell.id(), Data.cell_revision()) :: :ok + def report_cell_revision(session_id, client_pid, cell_id, revision) do + GenServer.cast(name(session_id), {:report_cell_revision, client_pid, cell_id, revision}) end @doc """ @@ -239,7 +248,6 @@ defmodule LiveBook.Session do %{ session_id: id, data: data, - client_pids: [], runtime_monitor_ref: nil }} @@ -270,7 +278,10 @@ defmodule LiveBook.Session do @impl true def handle_call({:register_client, pid}, _from, state) do Process.monitor(pid) - {:reply, state.data, %{state | client_pids: [pid | state.client_pids]}} + + state = handle_operation(state, {:client_join, pid}) + + {:reply, state.data, state} end def handle_call(:get_data, _from, state) do @@ -331,8 +342,13 @@ defmodule LiveBook.Session do {:noreply, handle_operation(state, operation)} end - def handle_cast({:apply_cell_delta, from, cell_id, delta, revision}, state) do - operation = {:apply_cell_delta, from, cell_id, delta, revision} + def handle_cast({:apply_cell_delta, client_pid, cell_id, delta, revision}, state) do + operation = {:apply_cell_delta, client_pid, cell_id, delta, revision} + {:noreply, handle_operation(state, operation)} + end + + def handle_cast({:report_cell_revision, client_pid, cell_id, revision}, state) do + operation = {:report_cell_revision, client_pid, cell_id, revision} {:noreply, handle_operation(state, operation)} end @@ -397,7 +413,14 @@ defmodule LiveBook.Session do end def handle_info({:DOWN, _, :process, pid, _}, state) do - {:noreply, %{state | client_pids: List.delete(state.client_pids, pid)}} + state = + if pid in state.data.client_pids do + handle_operation(state, {:client_leave, pid}) + else + state + end + + {:noreply, state} end def handle_info({:evaluation_stdout, cell_id, string}, state) do diff --git a/lib/live_book/session/data.ex b/lib/live_book/session/data.ex index f4e08cab9..718561261 100644 --- a/lib/live_book/session/data.ex +++ b/lib/live_book/session/data.ex @@ -22,7 +22,8 @@ defmodule LiveBook.Session.Data do :cell_infos, :deleted_sections, :deleted_cells, - :runtime + :runtime, + :client_pids ] alias LiveBook.{Notebook, Evaluator, Delta, Runtime, JSInterop} @@ -36,7 +37,8 @@ defmodule LiveBook.Session.Data do cell_infos: %{Cell.id() => cell_info()}, deleted_sections: list(Section.t()), deleted_cells: list(Cell.t()), - runtime: Runtime.t() | nil + runtime: Runtime.t() | nil, + client_pids: list(pid()) } @type section_info :: %{ @@ -49,6 +51,7 @@ defmodule LiveBook.Session.Data do evaluation_status: cell_evaluation_status(), revision: cell_revision(), deltas: list(Delta.t()), + revision_by_client_pid: %{pid() => cell_revision()}, evaluated_at: DateTime.t() } @@ -70,7 +73,10 @@ defmodule LiveBook.Session.Data do | {:cancel_cell_evaluation, Cell.id()} | {:set_notebook_name, String.t()} | {:set_section_name, Section.id(), String.t()} + | {:client_join, pid()} + | {:client_leave, pid()} | {:apply_cell_delta, pid(), Cell.id(), Delta.t(), cell_revision()} + | {:report_cell_revision, pid(), Cell.id(), cell_revision()} | {:set_runtime, Runtime.t() | nil} | {:set_path, String.t() | nil} | :mark_as_not_dirty @@ -94,7 +100,8 @@ defmodule LiveBook.Session.Data do cell_infos: initial_cell_infos(notebook), deleted_sections: [], deleted_cells: [], - runtime: nil + runtime: nil, + client_pids: [] } end @@ -108,7 +115,7 @@ defmodule LiveBook.Session.Data do for section <- notebook.sections, cell <- section.cells, into: %{}, - do: {cell.id, new_cell_info()} + do: {cell.id, new_cell_info([])} end @doc """ @@ -281,13 +288,36 @@ defmodule LiveBook.Session.Data do end end - def apply_operation(data, {:apply_cell_delta, from, cell_id, delta, revision}) do - with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, cell_id), - cell_info <- data.cell_infos[cell.id], - true <- 0 < revision and revision <= cell_info.revision + 1 do + def apply_operation(data, {:client_join, pid}) do + with false <- pid in data.client_pids do data |> with_actions() - |> apply_delta(from, cell, delta, revision) + |> client_join(pid) + |> wrap_ok() + else + _ -> :error + end + end + + def apply_operation(data, {:client_leave, pid}) do + with true <- pid in data.client_pids do + data + |> with_actions() + |> client_leave(pid) + |> wrap_ok() + else + _ -> :error + end + end + + def apply_operation(data, {:apply_cell_delta, client_pid, cell_id, delta, revision}) do + with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, cell_id), + cell_info <- data.cell_infos[cell.id], + true <- 0 < revision and revision <= cell_info.revision + 1, + true <- client_pid in data.client_pids do + data + |> with_actions() + |> apply_delta(client_pid, cell, delta, revision) |> set_dirty() |> wrap_ok() else @@ -295,6 +325,20 @@ defmodule LiveBook.Session.Data do end end + def apply_operation(data, {:report_cell_revision, client_pid, cell_id, revision}) do + with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, cell_id), + cell_info <- data.cell_infos[cell.id], + true <- 0 < revision and revision <= cell_info.revision, + true <- client_pid in data.client_pids do + data + |> with_actions() + |> report_revision(client_pid, cell, revision) + |> wrap_ok() + else + _ -> :error + end + end + def apply_operation(data, {:set_runtime, runtime}) do data |> with_actions() @@ -336,7 +380,7 @@ defmodule LiveBook.Session.Data do data_actions |> set!( notebook: Notebook.insert_cell(data.notebook, section_id, index, cell), - cell_infos: Map.put(data.cell_infos, cell.id, new_cell_info()) + cell_infos: Map.put(data.cell_infos, cell.id, new_cell_info(data.client_pids)) ) end @@ -494,7 +538,24 @@ defmodule LiveBook.Session.Data do |> set!(notebook: Notebook.update_section(data.notebook, section.id, &%{&1 | name: name})) end - defp apply_delta({data, _} = data_actions, from, cell, delta, revision) do + defp client_join({data, _} = data_actions, pid) do + data_actions + |> set!(client_pids: [pid | data.client_pids]) + |> update_every_cell_info(fn info -> + put_in(info.revision_by_client_pid[pid], info.revision) + end) + end + + defp client_leave({data, _} = data_actions, pid) do + data_actions + |> set!(client_pids: List.delete(data.client_pids, pid)) + |> update_every_cell_info(fn info -> + {_, info} = pop_in(info.revision_by_client_pid[pid]) + purge_deltas(info) + end) + end + + defp apply_delta({data, _} = data_actions, client_pid, cell, delta, revision) do info = data.cell_infos[cell.id] deltas_ahead = Enum.take(info.deltas, -(info.revision - revision + 1)) @@ -508,11 +569,42 @@ defmodule LiveBook.Session.Data do data_actions |> set!(notebook: Notebook.update_cell(data.notebook, cell.id, &%{&1 | source: new_source})) - |> set_cell_info!(cell.id, - deltas: info.deltas ++ [transformed_new_delta], - revision: info.revision + 1 - ) - |> add_action({:broadcast_delta, from, %{cell | source: new_source}, transformed_new_delta}) + |> update_cell_info!(cell.id, fn info -> + info = %{info | deltas: info.deltas ++ [transformed_new_delta], revision: info.revision + 1} + # Before receiving acknowledgement, the client receives all the other deltas, + # so we can assume they are in sync with the server and have the same revision. + info = put_in(info.revision_by_client_pid[client_pid], info.revision) + purge_deltas(info) + end) + |> add_action({:broadcast_delta, client_pid, %{cell | source: new_source}, transformed_new_delta}) + end + + defp report_revision(data_actions, client_pid, cell, revision) do + data_actions + |> update_cell_info!(cell.id, fn info -> + info = put_in(info.revision_by_client_pid[client_pid], revision) + purge_deltas(info) + end) + end + + defp purge_deltas(cell_info) do + # Given client at revision X and upstream revision Y, + # we need Y - X last deltas that the client is not aware of, + # so that later we can use them to transform whatever + # the client sends us as an update. + # + # We find the client that is the most behind and keep + # as many deltas as we need for them. + + min_client_revision = + cell_info.revision_by_client_pid + |> Map.values() + |> Enum.min(fn -> cell_info.revision end) + + necessary_deltas = cell_info.revision - min_client_revision + deltas = Enum.take(cell_info.deltas, -necessary_deltas) + + %{cell_info | deltas: deltas} end defp add_action({data, actions}, action) do @@ -526,10 +618,11 @@ defmodule LiveBook.Session.Data do } end - defp new_cell_info() do + defp new_cell_info(client_pids) do %{ revision: 0, deltas: [], + revision_by_client_pid: Map.new(client_pids, &{&1, 0}), validity_status: :fresh, evaluation_status: :ready, evaluated_at: nil @@ -556,6 +649,15 @@ defmodule LiveBook.Session.Data do set!(data_actions, cell_infos: cell_infos) end + defp update_every_cell_info({data, _} = data_actions, fun) do + cell_infos = + Map.new(data.cell_infos, fn {cell_id, info} -> + {cell_id, fun.(info)} + end) + + set!(data_actions, cell_infos: cell_infos) + end + defp set_section_info!(data_actions, section_id, changes) do update_section_info!(data_actions, section_id, fn info -> Enum.reduce(changes, info, fn {key, value}, info -> diff --git a/lib/live_book_web/live/session_live.ex b/lib/live_book_web/live/session_live.ex index 892632e70..9203ac1c9 100644 --- a/lib/live_book_web/live/session_live.ex +++ b/lib/live_book_web/live/session_live.ex @@ -266,6 +266,16 @@ defmodule LiveBookWeb.SessionLive do {:noreply, socket} end + def handle_event( + "report_cell_revision", + %{"cell_id" => cell_id, "revision" => revision}, + socket + ) do + Session.report_cell_revision(socket.assigns.session_id, self(), cell_id, revision) + + {:noreply, socket} + end + def handle_event("focus_cell", %{"cell_id" => nil}, socket) do {:noreply, focus_cell(socket, nil)} end diff --git a/test/live_book/session/data_test.exs b/test/live_book/session/data_test.exs index 46a4634d4..1dba76f95 100644 --- a/test/live_book/session/data_test.exs +++ b/test/live_book/session/data_test.exs @@ -48,7 +48,7 @@ defmodule LiveBook.Session.DataTest do assert :error = Data.apply_operation(data, operation) end - test "insert_cell adds new cell to notebook and session info" do + test "insert_cell adds new cell to notebook and cell info" do data = data_after_operations!([ {:insert_section, 0, "s1"} @@ -66,6 +66,23 @@ defmodule LiveBook.Session.DataTest do cell_infos: %{"c1" => _} }, []} = Data.apply_operation(data, operation) end + + test "initializes client-revision map" do + client_pid = self() + + data = + data_after_operations!([ + {:client_join, client_pid}, + {:insert_section, 0, "s1"} + ]) + + operation = {:insert_cell, "s1", 0, :elixir, "c1"} + + assert {:ok, + %{ + cell_infos: %{"c1" => %{revision_by_client_pid: %{^client_pid => 0}}} + }, []} = Data.apply_operation(data, operation) + end end describe "apply_operation/2 given :delete_section" do @@ -559,16 +576,120 @@ defmodule LiveBook.Session.DataTest do end end + describe "apply_operation/2 given :client_join" do + test "returns an error if the given process is already a client" do + data = + data_after_operations!([ + {:client_join, self()} + ]) + + operation = {:client_join, self()} + assert :error = Data.apply_operation(data, operation) + end + + test "adds the given process to the client list" do + client_pid = self() + data = Data.new() + + operation = {:client_join, client_pid} + assert {:ok, %{client_pids: [^client_pid]}, []} = Data.apply_operation(data, operation) + end + + test "adds new entry to the cell revisions map for the client with the latest revision" do + client1_pid = IEx.Helpers.pid(0, 0, 0) + delta1 = Delta.new() |> Delta.insert("cats") + + data = + data_after_operations!([ + {:client_join, client1_pid}, + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:apply_cell_delta, client1_pid, "c1", delta1, 1} + ]) + + client2_pid = IEx.Helpers.pid(0, 0, 1) + operation = {:client_join, client2_pid} + + assert {:ok, + %{ + cell_infos: %{"c1" => %{revision_by_client_pid: %{^client2_pid => 1}}} + }, _} = Data.apply_operation(data, operation) + end + end + + describe "apply_operation/2 given :client_leave" do + test "returns an error if the given process is not a client" do + data = Data.new() + + operation = {:client_leave, self()} + assert :error = Data.apply_operation(data, operation) + end + + test "removes the given process from the client list" do + data = + data_after_operations!([ + {:client_join, self()} + ]) + + operation = {:client_leave, self()} + assert {:ok, %{client_pids: []}, []} = Data.apply_operation(data, operation) + end + + test "removes an entry in the the cell revisions map for the client and purges deltas" do + client1_pid = IEx.Helpers.pid(0, 0, 0) + client2_pid = IEx.Helpers.pid(0, 0, 1) + + delta1 = Delta.new() |> Delta.insert("cats") + + data = + data_after_operations!([ + {:client_join, client1_pid}, + {:client_join, client2_pid}, + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:apply_cell_delta, client1_pid, "c1", delta1, 1} + ]) + + operation = {:client_leave, client2_pid} + + assert {:ok, + %{ + cell_infos: %{ + "c1" => %{deltas: [], revision_by_client_pid: revision_by_client_pid} + } + }, _} = Data.apply_operation(data, operation) + + assert revision_by_client_pid == %{client1_pid => 1} + end + end + describe "apply_operation/2 given :apply_cell_delta" do test "returns an error given invalid cell id" do - data = Data.new() + data = + data_after_operations!([ + {:client_join, self()} + ]) + operation = {:apply_cell_delta, self(), "nonexistent", Delta.new(), 1} assert :error = Data.apply_operation(data, operation) end + test "returns an error given non-joined client pid" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"} + ]) + + delta = Delta.new() |> Delta.insert("cats") + operation = {:apply_cell_delta, self(), "c1", delta, 1} + assert :error = Data.apply_operation(data, operation) + end + test "returns an error given invalid revision" do data = data_after_operations!([ + {:client_join, self()}, {:insert_section, 0, "s1"}, {:insert_cell, "s1", 0, :elixir, "c1"} ]) @@ -582,6 +703,7 @@ defmodule LiveBook.Session.DataTest do test "updates cell source according to the given delta" do data = data_after_operations!([ + {:client_join, self()}, {:insert_section, 0, "s1"}, {:insert_cell, "s1", 0, :elixir, "c1"} ]) @@ -601,17 +723,22 @@ defmodule LiveBook.Session.DataTest do end test "transforms the delta if the revision is not the most recent" do + client1_pid = IEx.Helpers.pid(0, 0, 0) + client2_pid = IEx.Helpers.pid(0, 0, 1) + delta1 = Delta.new() |> Delta.insert("cats") data = data_after_operations!([ + {:client_join, client1_pid}, + {:client_join, client2_pid}, {:insert_section, 0, "s1"}, {:insert_cell, "s1", 0, :elixir, "c1"}, - {:apply_cell_delta, self(), "c1", delta1, 1} + {:apply_cell_delta, client1_pid, "c1", delta1, 1} ]) delta2 = Delta.new() |> Delta.insert("tea") - operation = {:apply_cell_delta, self(), "c1", delta2, 1} + operation = {:apply_cell_delta, client2_pid, "c1", delta2, 1} assert {:ok, %{ @@ -625,24 +752,133 @@ defmodule LiveBook.Session.DataTest do end test "returns broadcast delta action with the transformed delta" do + client1_pid = IEx.Helpers.pid(0, 0, 0) + client2_pid = IEx.Helpers.pid(0, 0, 1) + delta1 = Delta.new() |> Delta.insert("cats") data = data_after_operations!([ + {:client_join, client1_pid}, + {:client_join, client2_pid}, {:insert_section, 0, "s1"}, {:insert_cell, "s1", 0, :elixir, "c1"}, - {:apply_cell_delta, self(), "c1", delta1, 1} + {:apply_cell_delta, client1_pid, "c1", delta1, 1} ]) delta2 = Delta.new() |> Delta.insert("tea") - operation = {:apply_cell_delta, self(), "c1", delta2, 1} + operation = {:apply_cell_delta, client2_pid, "c1", delta2, 1} - from = self() transformed_delta2 = Delta.new() |> Delta.retain(4) |> Delta.insert("tea") - assert {:ok, _data, [{:broadcast_delta, ^from, _cell, ^transformed_delta2}]} = + assert {:ok, _data, [{:broadcast_delta, ^client2_pid, _cell, ^transformed_delta2}]} = Data.apply_operation(data, operation) end + + test "given single client, does not keep deltas" do + client_pid = self() + + data = + data_after_operations!([ + {:client_join, client_pid}, + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"} + ]) + + delta = Delta.new() |> Delta.insert("cats") + operation = {:apply_cell_delta, client_pid, "c1", delta, 1} + + assert {:ok, + %{ + cell_infos: %{"c1" => %{deltas: []}} + }, _} = Data.apply_operation(data, operation) + end + + test "given multiple client, keeps the delta" do + client1_pid = IEx.Helpers.pid(0, 0, 0) + client2_pid = IEx.Helpers.pid(0, 0, 1) + + data = + data_after_operations!([ + {:client_join, client1_pid}, + {:client_join, client2_pid}, + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"} + ]) + + delta = Delta.new() |> Delta.insert("cats") + operation = {:apply_cell_delta, client1_pid, "c1", delta, 1} + + assert {:ok, + %{ + cell_infos: %{"c1" => %{deltas: [^delta]}} + }, _} = Data.apply_operation(data, operation) + end + end + + describe "apply_operation/2 given :report_cell_revision" do + test "returns an error given invalid cell id" do + data = + data_after_operations!([ + {:client_join, self()} + ]) + + operation = {:report_cell_revision, self(), "nonexistent", 1} + assert :error = Data.apply_operation(data, operation) + end + + test "returns an error given non-joined client pid" do + client1_pid = IEx.Helpers.pid(0, 0, 0) + client2_pid = IEx.Helpers.pid(0, 0, 1) + + data = + data_after_operations!([ + {:client_join, client1_pid}, + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:apply_cell_delta, client1_pid, "c1", Delta.new(insert: "cats"), 1} + ]) + + operation = {:report_cell_revision, client2_pid, "c1", 1} + assert :error = Data.apply_operation(data, operation) + end + + test "returns an error given invalid revision" do + data = + data_after_operations!([ + {:client_join, self()}, + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"} + ]) + + operation = {:report_cell_revision, self(), "c1", 1} + assert :error = Data.apply_operation(data, operation) + end + + test "updates client entry in the revisions map and purges unnecessary deltas" do + client1_pid = IEx.Helpers.pid(0, 0, 0) + client2_pid = IEx.Helpers.pid(0, 0, 1) + + delta1 = Delta.new() |> Delta.insert("cats") + + data = + data_after_operations!([ + {:client_join, client1_pid}, + {:client_join, client2_pid}, + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:apply_cell_delta, client1_pid, "c1", delta1, 1} + ]) + + operation = {:report_cell_revision, client2_pid, "c1", 1} + + assert {:ok, + %{ + cell_infos: %{ + "c1" => %{deltas: [], revision_by_client_pid: %{^client1_pid => 1, ^client2_pid => 1}} + } + }, _} = Data.apply_operation(data, operation) + end end describe "apply_operation/2 given :set_runtime" do