Purge old deltas (#57)

* Keep track of connected clients in session data

* Add API for confirming and purging deltas

* Send delta confirmation from clients once received

* Update naming

* Fix and extend Data tests

* Update naming
This commit is contained in:
Jonatan Kłosko 2021-02-23 21:20:46 +01:00 committed by GitHub
parent 780ca84500
commit efd58466f2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 447 additions and 38 deletions

View file

@ -80,6 +80,10 @@ export default class EditorClient {
sendDelta(delta) {
this.serverAdapter.sendDelta(delta, this.revision + 1);
}
reportCurrentRevision() {
this.serverAdapter.reportRevision(this.revision);
}
}
/**
@ -87,17 +91,37 @@ export default class EditorClient {
* (the client is fully in sync with the server).
*/
class Synchronized {
constructor(client) {
constructor(client, reportRevisionTimeout = 5000) {
this.client = client;
this.reportRevisionTimeoutId = null;
this.reportRevisionTimeout = reportRevisionTimeout;
}
onClientDelta(delta) {
// Cancel the report request if scheduled,
// as the client is about to send the revision
// along with own delta.
if (this.reportRevisionTimeoutId !== null) {
clearTimeout(this.reportRevisionTimeoutId);
this.reportRevisionTimeoutId = null;
}
this.client.sendDelta(delta);
return new AwaitingConfirm(this.client, delta);
return new AwaitingAcknowledgement(this.client, delta);
}
onServerDelta(delta) {
this.client.applyDelta(delta);
// The client received a new delta, so let's schedule
// a request to report the new revision.
if (this.reportRevisionTimeoutId === null) {
this.reportRevisionTimeoutId = setTimeout(() => {
this.client.reportCurrentRevision();
this.reportRevisionTimeoutId = null;
}, this.reportRevisionTimeout);
}
return this;
}
@ -110,7 +134,7 @@ class Synchronized {
* Client is in this state when the client sent one delta and waits
* for an acknowledgement, while there are no other deltas in a buffer.
*/
class AwaitingConfirm {
class AwaitingAcknowledgement {
constructor(client, awaitedDelta) {
this.client = client;
this.awaitedDelta = awaitedDelta;
@ -126,7 +150,7 @@ class AwaitingConfirm {
const deltaPrime = this.awaitedDelta.transform(delta, "right");
this.client.applyDelta(deltaPrime);
const awaitedDeltaPrime = delta.transform(this.awaitedDelta, "left");
return new AwaitingConfirm(this.client, awaitedDeltaPrime);
return new AwaitingAcknowledgement(this.client, awaitedDeltaPrime);
}
onServerAcknowledgement() {
@ -169,6 +193,6 @@ class AwaitingWithBuffer {
onServerAcknowledgement() {
this.client.sendDelta(this.buffer);
return new AwaitingConfirm(this.client, this.buffer);
return new AwaitingAcknowledgement(this.client, this.buffer);
}
}

View file

@ -45,4 +45,18 @@ export default class HookServerAdapter {
revision,
});
}
/**
* Sends an information to the server that the client
* is at the specified revision.
*
* This should be invoked if the client received updates,
* but is not itself sending any delta at the moment.
*/
reportRevision(revision) {
this.hook.pushEvent("report_cell_revision", {
cell_id: this.cellId,
revision,
});
}
}

View file

@ -21,7 +21,6 @@ defmodule LiveBook.Session do
@type state :: %{
session_id: id(),
data: Data.t(),
client_pids: list(pid()),
runtime_monitor_ref: reference()
}
@ -169,8 +168,18 @@ defmodule LiveBook.Session do
Asynchronously sends a cell delta to apply to the server.
"""
@spec apply_cell_delta(id(), pid(), Cell.id(), Delta.t(), Data.cell_revision()) :: :ok
def apply_cell_delta(session_id, from, cell_id, delta, revision) do
GenServer.cast(name(session_id), {:apply_cell_delta, from, cell_id, delta, revision})
def apply_cell_delta(session_id, client_pid, cell_id, delta, revision) do
GenServer.cast(name(session_id), {:apply_cell_delta, client_pid, cell_id, delta, revision})
end
@doc """
Asynchronously informs at what revision the given client is.
This helps to remove old deltas that are no longer necessary.
"""
@spec report_cell_revision(id(), pid(), Cell.id(), Data.cell_revision()) :: :ok
def report_cell_revision(session_id, client_pid, cell_id, revision) do
GenServer.cast(name(session_id), {:report_cell_revision, client_pid, cell_id, revision})
end
@doc """
@ -239,7 +248,6 @@ defmodule LiveBook.Session do
%{
session_id: id,
data: data,
client_pids: [],
runtime_monitor_ref: nil
}}
@ -270,7 +278,10 @@ defmodule LiveBook.Session do
@impl true
def handle_call({:register_client, pid}, _from, state) do
Process.monitor(pid)
{:reply, state.data, %{state | client_pids: [pid | state.client_pids]}}
state = handle_operation(state, {:client_join, pid})
{:reply, state.data, state}
end
def handle_call(:get_data, _from, state) do
@ -331,8 +342,13 @@ defmodule LiveBook.Session do
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:apply_cell_delta, from, cell_id, delta, revision}, state) do
operation = {:apply_cell_delta, from, cell_id, delta, revision}
def handle_cast({:apply_cell_delta, client_pid, cell_id, delta, revision}, state) do
operation = {:apply_cell_delta, client_pid, cell_id, delta, revision}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:report_cell_revision, client_pid, cell_id, revision}, state) do
operation = {:report_cell_revision, client_pid, cell_id, revision}
{:noreply, handle_operation(state, operation)}
end
@ -397,7 +413,14 @@ defmodule LiveBook.Session do
end
def handle_info({:DOWN, _, :process, pid, _}, state) do
{:noreply, %{state | client_pids: List.delete(state.client_pids, pid)}}
state =
if pid in state.data.client_pids do
handle_operation(state, {:client_leave, pid})
else
state
end
{:noreply, state}
end
def handle_info({:evaluation_stdout, cell_id, string}, state) do

View file

@ -22,7 +22,8 @@ defmodule LiveBook.Session.Data do
:cell_infos,
:deleted_sections,
:deleted_cells,
:runtime
:runtime,
:client_pids
]
alias LiveBook.{Notebook, Evaluator, Delta, Runtime, JSInterop}
@ -36,7 +37,8 @@ defmodule LiveBook.Session.Data do
cell_infos: %{Cell.id() => cell_info()},
deleted_sections: list(Section.t()),
deleted_cells: list(Cell.t()),
runtime: Runtime.t() | nil
runtime: Runtime.t() | nil,
client_pids: list(pid())
}
@type section_info :: %{
@ -49,6 +51,7 @@ defmodule LiveBook.Session.Data do
evaluation_status: cell_evaluation_status(),
revision: cell_revision(),
deltas: list(Delta.t()),
revision_by_client_pid: %{pid() => cell_revision()},
evaluated_at: DateTime.t()
}
@ -70,7 +73,10 @@ defmodule LiveBook.Session.Data do
| {:cancel_cell_evaluation, Cell.id()}
| {:set_notebook_name, String.t()}
| {:set_section_name, Section.id(), String.t()}
| {:client_join, pid()}
| {:client_leave, pid()}
| {:apply_cell_delta, pid(), Cell.id(), Delta.t(), cell_revision()}
| {:report_cell_revision, pid(), Cell.id(), cell_revision()}
| {:set_runtime, Runtime.t() | nil}
| {:set_path, String.t() | nil}
| :mark_as_not_dirty
@ -94,7 +100,8 @@ defmodule LiveBook.Session.Data do
cell_infos: initial_cell_infos(notebook),
deleted_sections: [],
deleted_cells: [],
runtime: nil
runtime: nil,
client_pids: []
}
end
@ -108,7 +115,7 @@ defmodule LiveBook.Session.Data do
for section <- notebook.sections,
cell <- section.cells,
into: %{},
do: {cell.id, new_cell_info()}
do: {cell.id, new_cell_info([])}
end
@doc """
@ -281,13 +288,36 @@ defmodule LiveBook.Session.Data do
end
end
def apply_operation(data, {:apply_cell_delta, from, cell_id, delta, revision}) do
with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, cell_id),
cell_info <- data.cell_infos[cell.id],
true <- 0 < revision and revision <= cell_info.revision + 1 do
def apply_operation(data, {:client_join, pid}) do
with false <- pid in data.client_pids do
data
|> with_actions()
|> apply_delta(from, cell, delta, revision)
|> client_join(pid)
|> wrap_ok()
else
_ -> :error
end
end
def apply_operation(data, {:client_leave, pid}) do
with true <- pid in data.client_pids do
data
|> with_actions()
|> client_leave(pid)
|> wrap_ok()
else
_ -> :error
end
end
def apply_operation(data, {:apply_cell_delta, client_pid, cell_id, delta, revision}) do
with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, cell_id),
cell_info <- data.cell_infos[cell.id],
true <- 0 < revision and revision <= cell_info.revision + 1,
true <- client_pid in data.client_pids do
data
|> with_actions()
|> apply_delta(client_pid, cell, delta, revision)
|> set_dirty()
|> wrap_ok()
else
@ -295,6 +325,20 @@ defmodule LiveBook.Session.Data do
end
end
def apply_operation(data, {:report_cell_revision, client_pid, cell_id, revision}) do
with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, cell_id),
cell_info <- data.cell_infos[cell.id],
true <- 0 < revision and revision <= cell_info.revision,
true <- client_pid in data.client_pids do
data
|> with_actions()
|> report_revision(client_pid, cell, revision)
|> wrap_ok()
else
_ -> :error
end
end
def apply_operation(data, {:set_runtime, runtime}) do
data
|> with_actions()
@ -336,7 +380,7 @@ defmodule LiveBook.Session.Data do
data_actions
|> set!(
notebook: Notebook.insert_cell(data.notebook, section_id, index, cell),
cell_infos: Map.put(data.cell_infos, cell.id, new_cell_info())
cell_infos: Map.put(data.cell_infos, cell.id, new_cell_info(data.client_pids))
)
end
@ -494,7 +538,24 @@ defmodule LiveBook.Session.Data do
|> set!(notebook: Notebook.update_section(data.notebook, section.id, &%{&1 | name: name}))
end
defp apply_delta({data, _} = data_actions, from, cell, delta, revision) do
defp client_join({data, _} = data_actions, pid) do
data_actions
|> set!(client_pids: [pid | data.client_pids])
|> update_every_cell_info(fn info ->
put_in(info.revision_by_client_pid[pid], info.revision)
end)
end
defp client_leave({data, _} = data_actions, pid) do
data_actions
|> set!(client_pids: List.delete(data.client_pids, pid))
|> update_every_cell_info(fn info ->
{_, info} = pop_in(info.revision_by_client_pid[pid])
purge_deltas(info)
end)
end
defp apply_delta({data, _} = data_actions, client_pid, cell, delta, revision) do
info = data.cell_infos[cell.id]
deltas_ahead = Enum.take(info.deltas, -(info.revision - revision + 1))
@ -508,11 +569,42 @@ defmodule LiveBook.Session.Data do
data_actions
|> set!(notebook: Notebook.update_cell(data.notebook, cell.id, &%{&1 | source: new_source}))
|> set_cell_info!(cell.id,
deltas: info.deltas ++ [transformed_new_delta],
revision: info.revision + 1
)
|> add_action({:broadcast_delta, from, %{cell | source: new_source}, transformed_new_delta})
|> update_cell_info!(cell.id, fn info ->
info = %{info | deltas: info.deltas ++ [transformed_new_delta], revision: info.revision + 1}
# Before receiving acknowledgement, the client receives all the other deltas,
# so we can assume they are in sync with the server and have the same revision.
info = put_in(info.revision_by_client_pid[client_pid], info.revision)
purge_deltas(info)
end)
|> add_action({:broadcast_delta, client_pid, %{cell | source: new_source}, transformed_new_delta})
end
defp report_revision(data_actions, client_pid, cell, revision) do
data_actions
|> update_cell_info!(cell.id, fn info ->
info = put_in(info.revision_by_client_pid[client_pid], revision)
purge_deltas(info)
end)
end
defp purge_deltas(cell_info) do
# Given client at revision X and upstream revision Y,
# we need Y - X last deltas that the client is not aware of,
# so that later we can use them to transform whatever
# the client sends us as an update.
#
# We find the client that is the most behind and keep
# as many deltas as we need for them.
min_client_revision =
cell_info.revision_by_client_pid
|> Map.values()
|> Enum.min(fn -> cell_info.revision end)
necessary_deltas = cell_info.revision - min_client_revision
deltas = Enum.take(cell_info.deltas, -necessary_deltas)
%{cell_info | deltas: deltas}
end
defp add_action({data, actions}, action) do
@ -526,10 +618,11 @@ defmodule LiveBook.Session.Data do
}
end
defp new_cell_info() do
defp new_cell_info(client_pids) do
%{
revision: 0,
deltas: [],
revision_by_client_pid: Map.new(client_pids, &{&1, 0}),
validity_status: :fresh,
evaluation_status: :ready,
evaluated_at: nil
@ -556,6 +649,15 @@ defmodule LiveBook.Session.Data do
set!(data_actions, cell_infos: cell_infos)
end
defp update_every_cell_info({data, _} = data_actions, fun) do
cell_infos =
Map.new(data.cell_infos, fn {cell_id, info} ->
{cell_id, fun.(info)}
end)
set!(data_actions, cell_infos: cell_infos)
end
defp set_section_info!(data_actions, section_id, changes) do
update_section_info!(data_actions, section_id, fn info ->
Enum.reduce(changes, info, fn {key, value}, info ->

View file

@ -266,6 +266,16 @@ defmodule LiveBookWeb.SessionLive do
{:noreply, socket}
end
def handle_event(
"report_cell_revision",
%{"cell_id" => cell_id, "revision" => revision},
socket
) do
Session.report_cell_revision(socket.assigns.session_id, self(), cell_id, revision)
{:noreply, socket}
end
def handle_event("focus_cell", %{"cell_id" => nil}, socket) do
{:noreply, focus_cell(socket, nil)}
end

View file

@ -48,7 +48,7 @@ defmodule LiveBook.Session.DataTest do
assert :error = Data.apply_operation(data, operation)
end
test "insert_cell adds new cell to notebook and session info" do
test "insert_cell adds new cell to notebook and cell info" do
data =
data_after_operations!([
{:insert_section, 0, "s1"}
@ -66,6 +66,23 @@ defmodule LiveBook.Session.DataTest do
cell_infos: %{"c1" => _}
}, []} = Data.apply_operation(data, operation)
end
test "initializes client-revision map" do
client_pid = self()
data =
data_after_operations!([
{:client_join, client_pid},
{:insert_section, 0, "s1"}
])
operation = {:insert_cell, "s1", 0, :elixir, "c1"}
assert {:ok,
%{
cell_infos: %{"c1" => %{revision_by_client_pid: %{^client_pid => 0}}}
}, []} = Data.apply_operation(data, operation)
end
end
describe "apply_operation/2 given :delete_section" do
@ -559,16 +576,120 @@ defmodule LiveBook.Session.DataTest do
end
end
describe "apply_operation/2 given :client_join" do
test "returns an error if the given process is already a client" do
data =
data_after_operations!([
{:client_join, self()}
])
operation = {:client_join, self()}
assert :error = Data.apply_operation(data, operation)
end
test "adds the given process to the client list" do
client_pid = self()
data = Data.new()
operation = {:client_join, client_pid}
assert {:ok, %{client_pids: [^client_pid]}, []} = Data.apply_operation(data, operation)
end
test "adds new entry to the cell revisions map for the client with the latest revision" do
client1_pid = IEx.Helpers.pid(0, 0, 0)
delta1 = Delta.new() |> Delta.insert("cats")
data =
data_after_operations!([
{:client_join, client1_pid},
{:insert_section, 0, "s1"},
{:insert_cell, "s1", 0, :elixir, "c1"},
{:apply_cell_delta, client1_pid, "c1", delta1, 1}
])
client2_pid = IEx.Helpers.pid(0, 0, 1)
operation = {:client_join, client2_pid}
assert {:ok,
%{
cell_infos: %{"c1" => %{revision_by_client_pid: %{^client2_pid => 1}}}
}, _} = Data.apply_operation(data, operation)
end
end
describe "apply_operation/2 given :client_leave" do
test "returns an error if the given process is not a client" do
data = Data.new()
operation = {:client_leave, self()}
assert :error = Data.apply_operation(data, operation)
end
test "removes the given process from the client list" do
data =
data_after_operations!([
{:client_join, self()}
])
operation = {:client_leave, self()}
assert {:ok, %{client_pids: []}, []} = Data.apply_operation(data, operation)
end
test "removes an entry in the the cell revisions map for the client and purges deltas" do
client1_pid = IEx.Helpers.pid(0, 0, 0)
client2_pid = IEx.Helpers.pid(0, 0, 1)
delta1 = Delta.new() |> Delta.insert("cats")
data =
data_after_operations!([
{:client_join, client1_pid},
{:client_join, client2_pid},
{:insert_section, 0, "s1"},
{:insert_cell, "s1", 0, :elixir, "c1"},
{:apply_cell_delta, client1_pid, "c1", delta1, 1}
])
operation = {:client_leave, client2_pid}
assert {:ok,
%{
cell_infos: %{
"c1" => %{deltas: [], revision_by_client_pid: revision_by_client_pid}
}
}, _} = Data.apply_operation(data, operation)
assert revision_by_client_pid == %{client1_pid => 1}
end
end
describe "apply_operation/2 given :apply_cell_delta" do
test "returns an error given invalid cell id" do
data = Data.new()
data =
data_after_operations!([
{:client_join, self()}
])
operation = {:apply_cell_delta, self(), "nonexistent", Delta.new(), 1}
assert :error = Data.apply_operation(data, operation)
end
test "returns an error given non-joined client pid" do
data =
data_after_operations!([
{:insert_section, 0, "s1"},
{:insert_cell, "s1", 0, :elixir, "c1"}
])
delta = Delta.new() |> Delta.insert("cats")
operation = {:apply_cell_delta, self(), "c1", delta, 1}
assert :error = Data.apply_operation(data, operation)
end
test "returns an error given invalid revision" do
data =
data_after_operations!([
{:client_join, self()},
{:insert_section, 0, "s1"},
{:insert_cell, "s1", 0, :elixir, "c1"}
])
@ -582,6 +703,7 @@ defmodule LiveBook.Session.DataTest do
test "updates cell source according to the given delta" do
data =
data_after_operations!([
{:client_join, self()},
{:insert_section, 0, "s1"},
{:insert_cell, "s1", 0, :elixir, "c1"}
])
@ -601,17 +723,22 @@ defmodule LiveBook.Session.DataTest do
end
test "transforms the delta if the revision is not the most recent" do
client1_pid = IEx.Helpers.pid(0, 0, 0)
client2_pid = IEx.Helpers.pid(0, 0, 1)
delta1 = Delta.new() |> Delta.insert("cats")
data =
data_after_operations!([
{:client_join, client1_pid},
{:client_join, client2_pid},
{:insert_section, 0, "s1"},
{:insert_cell, "s1", 0, :elixir, "c1"},
{:apply_cell_delta, self(), "c1", delta1, 1}
{:apply_cell_delta, client1_pid, "c1", delta1, 1}
])
delta2 = Delta.new() |> Delta.insert("tea")
operation = {:apply_cell_delta, self(), "c1", delta2, 1}
operation = {:apply_cell_delta, client2_pid, "c1", delta2, 1}
assert {:ok,
%{
@ -625,24 +752,133 @@ defmodule LiveBook.Session.DataTest do
end
test "returns broadcast delta action with the transformed delta" do
client1_pid = IEx.Helpers.pid(0, 0, 0)
client2_pid = IEx.Helpers.pid(0, 0, 1)
delta1 = Delta.new() |> Delta.insert("cats")
data =
data_after_operations!([
{:client_join, client1_pid},
{:client_join, client2_pid},
{:insert_section, 0, "s1"},
{:insert_cell, "s1", 0, :elixir, "c1"},
{:apply_cell_delta, self(), "c1", delta1, 1}
{:apply_cell_delta, client1_pid, "c1", delta1, 1}
])
delta2 = Delta.new() |> Delta.insert("tea")
operation = {:apply_cell_delta, self(), "c1", delta2, 1}
operation = {:apply_cell_delta, client2_pid, "c1", delta2, 1}
from = self()
transformed_delta2 = Delta.new() |> Delta.retain(4) |> Delta.insert("tea")
assert {:ok, _data, [{:broadcast_delta, ^from, _cell, ^transformed_delta2}]} =
assert {:ok, _data, [{:broadcast_delta, ^client2_pid, _cell, ^transformed_delta2}]} =
Data.apply_operation(data, operation)
end
test "given single client, does not keep deltas" do
client_pid = self()
data =
data_after_operations!([
{:client_join, client_pid},
{:insert_section, 0, "s1"},
{:insert_cell, "s1", 0, :elixir, "c1"}
])
delta = Delta.new() |> Delta.insert("cats")
operation = {:apply_cell_delta, client_pid, "c1", delta, 1}
assert {:ok,
%{
cell_infos: %{"c1" => %{deltas: []}}
}, _} = Data.apply_operation(data, operation)
end
test "given multiple client, keeps the delta" do
client1_pid = IEx.Helpers.pid(0, 0, 0)
client2_pid = IEx.Helpers.pid(0, 0, 1)
data =
data_after_operations!([
{:client_join, client1_pid},
{:client_join, client2_pid},
{:insert_section, 0, "s1"},
{:insert_cell, "s1", 0, :elixir, "c1"}
])
delta = Delta.new() |> Delta.insert("cats")
operation = {:apply_cell_delta, client1_pid, "c1", delta, 1}
assert {:ok,
%{
cell_infos: %{"c1" => %{deltas: [^delta]}}
}, _} = Data.apply_operation(data, operation)
end
end
describe "apply_operation/2 given :report_cell_revision" do
test "returns an error given invalid cell id" do
data =
data_after_operations!([
{:client_join, self()}
])
operation = {:report_cell_revision, self(), "nonexistent", 1}
assert :error = Data.apply_operation(data, operation)
end
test "returns an error given non-joined client pid" do
client1_pid = IEx.Helpers.pid(0, 0, 0)
client2_pid = IEx.Helpers.pid(0, 0, 1)
data =
data_after_operations!([
{:client_join, client1_pid},
{:insert_section, 0, "s1"},
{:insert_cell, "s1", 0, :elixir, "c1"},
{:apply_cell_delta, client1_pid, "c1", Delta.new(insert: "cats"), 1}
])
operation = {:report_cell_revision, client2_pid, "c1", 1}
assert :error = Data.apply_operation(data, operation)
end
test "returns an error given invalid revision" do
data =
data_after_operations!([
{:client_join, self()},
{:insert_section, 0, "s1"},
{:insert_cell, "s1", 0, :elixir, "c1"}
])
operation = {:report_cell_revision, self(), "c1", 1}
assert :error = Data.apply_operation(data, operation)
end
test "updates client entry in the revisions map and purges unnecessary deltas" do
client1_pid = IEx.Helpers.pid(0, 0, 0)
client2_pid = IEx.Helpers.pid(0, 0, 1)
delta1 = Delta.new() |> Delta.insert("cats")
data =
data_after_operations!([
{:client_join, client1_pid},
{:client_join, client2_pid},
{:insert_section, 0, "s1"},
{:insert_cell, "s1", 0, :elixir, "c1"},
{:apply_cell_delta, client1_pid, "c1", delta1, 1}
])
operation = {:report_cell_revision, client2_pid, "c1", 1}
assert {:ok,
%{
cell_infos: %{
"c1" => %{deltas: [], revision_by_client_pid: %{^client1_pid => 1, ^client2_pid => 1}}
}
}, _} = Data.apply_operation(data, operation)
end
end
describe "apply_operation/2 given :set_runtime" do