Use binary ids for session clients (#1333)

This commit is contained in:
Jonatan Kłosko 2022-08-05 14:43:41 +02:00 committed by GitHub
parent c68df08849
commit 218bce5a63
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 1414 additions and 1323 deletions

View file

@ -32,7 +32,7 @@ class LiveEditor {
this._onChange = [];
this._onBlur = [];
this._onCursorSelectionChange = [];
this._remoteUserByClientPid = {};
this._remoteUserByClientId = {};
const serverAdapter = new HookServerAdapter(hook, cellId, tag);
this.editorClient = new EditorClient(serverAdapter, revision);
@ -170,10 +170,10 @@ class LiveEditor {
updateUserSelection(client, selection) {
this._ensureMounted();
if (this._remoteUserByClientPid[client.pid]) {
this._remoteUserByClientPid[client.pid].update(selection);
if (this._remoteUserByClientId[client.id]) {
this._remoteUserByClientId[client.id].update(selection);
} else {
this._remoteUserByClientPid[client.pid] = new RemoteUser(
this._remoteUserByClientId[client.id] = new RemoteUser(
this.editor,
selection,
client.hex_color,
@ -188,9 +188,9 @@ class LiveEditor {
removeUserSelection(client) {
this._ensureMounted();
if (this._remoteUserByClientPid[client.pid]) {
this._remoteUserByClientPid[client.pid].dispose();
delete this._remoteUserByClientPid[client.pid];
if (this._remoteUserByClientId[client.id]) {
this._remoteUserByClientId[client.id].dispose();
delete this._remoteUserByClientId[client.id];
}
}

View file

@ -70,7 +70,7 @@ const JSView = {
this.initTimeout = setTimeout(() => this.handleInitTimeout(), 2_000);
this.channel = getChannel(this.props.sessionId);
this.channel = getChannel(this.props.sessionId, this.props.clientId);
this.removeIframe = this.createIframe();
@ -169,6 +169,7 @@ const JSView = {
jsPath: getAttributeOrThrow(this.el, "data-js-path"),
sessionToken: getAttributeOrThrow(this.el, "data-session-token"),
sessionId: getAttributeOrThrow(this.el, "data-session-id"),
clientId: getAttributeOrThrow(this.el, "data-client-id"),
iframePort: getAttributeOrThrow(
this.el,
"data-iframe-local-port",

View file

@ -11,10 +11,13 @@ let channel = null;
/**
* Returns channel used for all JS views in the current session.
*/
export function getChannel(sessionId) {
export function getChannel(sessionId, clientId) {
if (!channel) {
socket.connect();
channel = socket.channel("js_view", { session_id: sessionId });
channel = socket.channel("js_view", {
session_id: sessionId,
client_id: clientId,
});
channel.join();
}

View file

@ -61,7 +61,7 @@ import { isDirectlyEditable, isEvaluable } from "../lib/notebook";
* Initially we load basic information about connected clients using
* the `"session_init"` event and then update this information whenever
* clients join/leave/update. This way location reports include only
* client pid, as we already have the necessary hex_color/name locally.
* client id, as we already have the necessary hex_color/name locally.
*/
const Session = {
mounted() {
@ -72,8 +72,8 @@ const Session = {
this.codeZen = false;
this.keyBuffer = new KeyBuffer();
this.clientsMap = {};
this.lastLocationReportByClientPid = {};
this.followedClientPid = null;
this.lastLocationReportByClientId = {};
this.followedClientId = null;
setFavicon(this.faviconForEvaluationStatus(this.props.globalStatus));
@ -150,7 +150,7 @@ const Session = {
this.handleEvent("session_init", ({ clients }) => {
clients.forEach((client) => {
this.clientsMap[client.pid] = client;
this.clientsMap[client.id] = client;
});
});
@ -193,8 +193,8 @@ const Session = {
this.handleClientJoined(client);
});
this.handleEvent("client_left", ({ client_pid }) => {
this.handleClientLeft(client_pid);
this.handleEvent("client_left", ({ client_id }) => {
this.handleClientLeft(client_id);
});
this.handleEvent("clients_updated", ({ clients }) => {
@ -203,13 +203,13 @@ const Session = {
this.handleEvent(
"location_report",
({ client_pid, focusable_id, selection }) => {
({ client_id, focusable_id, selection }) => {
const report = {
focusableId: focusable_id,
selection: this.decodeSelection(selection),
};
this.handleLocationReport(client_pid, report);
this.handleLocationReport(client_id, report);
}
);
@ -557,27 +557,27 @@ const Session = {
const clientListItem = event.target.closest(`[data-el-clients-list-item]`);
if (clientListItem) {
const clientPid = clientListItem.getAttribute("data-client-pid");
const clientId = clientListItem.getAttribute("data-client-id");
const clientLink = event.target.closest(`[data-el-client-link]`);
if (clientLink) {
this.handleClientLinkClick(clientPid);
this.handleClientLinkClick(clientId);
}
const clientFollowToggle = event.target.closest(
`[data-el-client-follow-toggle]`
);
if (clientFollowToggle) {
this.handleClientFollowToggleClick(clientPid, clientListItem);
this.handleClientFollowToggleClick(clientId, clientListItem);
}
}
},
handleClientLinkClick(clientPid) {
this.mirrorClientFocus(clientPid);
handleClientLinkClick(clientId) {
this.mirrorClientFocus(clientId);
},
handleClientFollowToggleClick(clientPid, clientListItem) {
handleClientFollowToggleClick(clientId, clientListItem) {
const followedClientListItem = this.el.querySelector(
`[data-el-clients-list-item][data-js-followed]`
);
@ -586,17 +586,17 @@ const Session = {
followedClientListItem.removeAttribute("data-js-followed");
}
if (clientPid === this.followedClientPid) {
this.followedClientPid = null;
if (clientId === this.followedClientId) {
this.followedClientId = null;
} else {
clientListItem.setAttribute("data-js-followed", "");
this.followedClientPid = clientPid;
this.mirrorClientFocus(clientPid);
this.followedClientId = clientId;
this.mirrorClientFocus(clientId);
}
},
mirrorClientFocus(clientPid) {
const locationReport = this.lastLocationReportByClientPid[clientPid];
mirrorClientFocus(clientId) {
const locationReport = this.lastLocationReportByClientId[clientId];
if (locationReport && locationReport.focusableId) {
this.setFocusedEl(locationReport.focusableId);
@ -998,42 +998,42 @@ const Session = {
},
handleClientJoined(client) {
this.clientsMap[client.pid] = client;
this.clientsMap[client.id] = client;
},
handleClientLeft(clientPid) {
const client = this.clientsMap[clientPid];
handleClientLeft(clientId) {
const client = this.clientsMap[clientId];
if (client) {
delete this.clientsMap[clientPid];
delete this.clientsMap[clientId];
this.broadcastLocationReport(client, {
focusableId: null,
selection: null,
});
if (client.pid === this.followedClientPid) {
this.followedClientPid = null;
if (client.id === this.followedClientId) {
this.followedClientId = null;
}
}
},
handleClientsUpdated(updatedClients) {
updatedClients.forEach((client) => {
this.clientsMap[client.pid] = client;
this.clientsMap[client.id] = client;
});
},
handleLocationReport(clientPid, report) {
const client = this.clientsMap[clientPid];
handleLocationReport(clientId, report) {
const client = this.clientsMap[clientId];
this.lastLocationReportByClientPid[clientPid] = report;
this.lastLocationReportByClientId[clientId] = report;
if (client) {
this.broadcastLocationReport(client, report);
if (
client.pid === this.followedClientPid &&
client.id === this.followedClientId &&
report.focusableId !== this.focusedId
) {
this.setFocusedEl(report.focusableId);
@ -1224,7 +1224,7 @@ const Session = {
*
* @typedef Client
* @type {Object}
* @property {String} pid
* @property {String} id
* @property {String} hex_color
* @property {String} name
*/

View file

@ -57,6 +57,8 @@ defmodule Livebook.Session do
@timeout :infinity
@main_container_ref :main_flow
@client_id "__server__"
@anonymous_client_id "__anonymous__"
@type t :: %__MODULE__{
id: id(),
@ -134,10 +136,13 @@ defmodule Livebook.Session do
The client process is automatically unregistered when it terminates.
Returns the current session data, which the client can than
keep in sync with the server by subscribing to the `sessions:id` topic
and receiving operations to apply.
keep in sync with the server by subscribing to the `sessions:id`
topic and receiving operations to apply.
Also returns a unique client identifier representing the registered
client.
"""
@spec register_client(pid(), pid(), User.t()) :: Data.t()
@spec register_client(pid(), pid(), User.t()) :: {Data.t(), Data.client_id()}
def register_client(pid, client_pid, user) do
GenServer.call(pid, {:register_client, client_pid, user}, @timeout)
end
@ -566,6 +571,7 @@ defmodule Livebook.Session do
state = %{
session_id: id,
data: data,
client_pids_with_id: %{},
created_at: DateTime.utc_now(),
runtime_monitor_ref: nil,
autosave_timer_ref: nil,
@ -632,11 +638,18 @@ defmodule Livebook.Session do
end
def handle_call({:register_client, client_pid, user}, _from, state) do
Process.monitor(client_pid)
{state, client_id} =
if client_id = state.client_pids_with_id[client_pid] do
{state, client_id}
else
Process.monitor(client_pid)
client_id = Utils.random_id()
state = handle_operation(state, {:client_join, client_id, user})
state = put_in(state.client_pids_with_id[client_pid], client_id)
{state, client_id}
end
state = handle_operation(state, {:client_join, client_pid, user})
{:reply, state.data, state}
{:reply, {state.data, client_id}, state}
end
def handle_call(:get_data, _from, state) do
@ -678,12 +691,14 @@ defmodule Livebook.Session do
end
def handle_call({:disconnect_runtime, client_pid}, _from, state) do
client_id = client_id(state, client_pid)
state =
if Runtime.connected?(state.data.runtime) do
{:ok, runtime} = Runtime.disconnect(state.data.runtime)
%{state | runtime_monitor_ref: nil}
|> handle_operation({:set_runtime, client_pid, runtime})
|> handle_operation({:set_runtime, client_id, runtime})
else
state
end
@ -693,66 +708,79 @@ defmodule Livebook.Session do
@impl true
def handle_cast({:set_notebook_attributes, client_pid, attrs}, state) do
operation = {:set_notebook_attributes, client_pid, attrs}
client_id = client_id(state, client_pid)
operation = {:set_notebook_attributes, client_id, attrs}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:insert_section, client_pid, index}, state) do
client_id = client_id(state, client_pid)
# Include new id in the operation, so it's reproducible
operation = {:insert_section, client_pid, index, Utils.random_id()}
operation = {:insert_section, client_id, index, Utils.random_id()}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:insert_section_into, client_pid, section_id, index}, state) do
client_id = client_id(state, client_pid)
# Include new id in the operation, so it's reproducible
operation = {:insert_section_into, client_pid, section_id, index, Utils.random_id()}
operation = {:insert_section_into, client_id, section_id, index, Utils.random_id()}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:set_section_parent, client_pid, section_id, parent_id}, state) do
client_id = client_id(state, client_pid)
# Include new id in the operation, so it's reproducible
operation = {:set_section_parent, client_pid, section_id, parent_id}
operation = {:set_section_parent, client_id, section_id, parent_id}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:unset_section_parent, client_pid, section_id}, state) do
client_id = client_id(state, client_pid)
# Include new id in the operation, so it's reproducible
operation = {:unset_section_parent, client_pid, section_id}
operation = {:unset_section_parent, client_id, section_id}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:insert_cell, client_pid, section_id, index, type, attrs}, state) do
client_id = client_id(state, client_pid)
# Include new id in the operation, so it's reproducible
operation = {:insert_cell, client_pid, section_id, index, type, Utils.random_id(), attrs}
operation = {:insert_cell, client_id, section_id, index, type, Utils.random_id(), attrs}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:delete_section, client_pid, section_id, delete_cells}, state) do
operation = {:delete_section, client_pid, section_id, delete_cells}
client_id = client_id(state, client_pid)
operation = {:delete_section, client_id, section_id, delete_cells}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:delete_cell, client_pid, cell_id}, state) do
operation = {:delete_cell, client_pid, cell_id}
client_id = client_id(state, client_pid)
operation = {:delete_cell, client_id, cell_id}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:restore_cell, client_pid, cell_id}, state) do
operation = {:restore_cell, client_pid, cell_id}
client_id = client_id(state, client_pid)
operation = {:restore_cell, client_id, cell_id}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:move_cell, client_pid, cell_id, offset}, state) do
operation = {:move_cell, client_pid, cell_id, offset}
client_id = client_id(state, client_pid)
operation = {:move_cell, client_id, cell_id, offset}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:move_section, client_pid, section_id, offset}, state) do
operation = {:move_section, client_pid, section_id, offset}
client_id = client_id(state, client_pid)
operation = {:move_section, client_id, section_id, offset}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:convert_smart_cell, client_pid, cell_id}, state) do
client_id = client_id(state, client_pid)
state =
with {:ok, %Cell.Smart{} = cell, section} <-
Notebook.fetch_cell_and_section(state.data.notebook, cell_id) do
@ -761,9 +789,9 @@ defmodule Livebook.Session do
attrs = Map.take(cell, [:source, :outputs])
state
|> handle_operation({:delete_cell, client_pid, cell.id})
|> handle_operation({:delete_cell, client_id, cell.id})
|> handle_operation(
{:insert_cell, client_pid, section.id, index, :code, Utils.random_id(), attrs}
{:insert_cell, client_id, section.id, index, :code, Utils.random_id(), attrs}
)
else
_ -> state
@ -777,15 +805,18 @@ defmodule Livebook.Session do
end
def handle_cast({:queue_cell_evaluation, client_pid, cell_id}, state) do
operation = {:queue_cells_evaluation, client_pid, [cell_id]}
client_id = client_id(state, client_pid)
operation = {:queue_cells_evaluation, client_id, [cell_id]}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:queue_section_evaluation, client_pid, section_id}, state) do
client_id = client_id(state, client_pid)
case Notebook.fetch_section(state.data.notebook, section_id) do
{:ok, section} ->
cell_ids = for cell <- section.cells, Cell.evaluable?(cell), do: cell.id
operation = {:queue_cells_evaluation, client_pid, cell_ids}
operation = {:queue_cells_evaluation, client_id, cell_ids}
{:noreply, handle_operation(state, operation)}
:error ->
@ -794,69 +825,85 @@ defmodule Livebook.Session do
end
def handle_cast({:queue_bound_cells_evaluation, client_pid, input_id}, state) do
client_id = client_id(state, client_pid)
cell_ids =
for {bound_cell, _} <- Data.bound_cells_with_section(state.data, input_id),
do: bound_cell.id
operation = {:queue_cells_evaluation, client_pid, cell_ids}
operation = {:queue_cells_evaluation, client_id, cell_ids}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:queue_full_evaluation, client_pid, forced_cell_ids}, state) do
client_id = client_id(state, client_pid)
cell_ids = Data.cell_ids_for_full_evaluation(state.data, forced_cell_ids)
operation = {:queue_cells_evaluation, client_pid, cell_ids}
operation = {:queue_cells_evaluation, client_id, cell_ids}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:queue_cells_reevaluation, client_pid}, state) do
client_id = client_id(state, client_pid)
cell_ids = Data.cell_ids_for_reevaluation(state.data)
operation = {:queue_cells_evaluation, client_pid, cell_ids}
operation = {:queue_cells_evaluation, client_id, cell_ids}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:cancel_cell_evaluation, client_pid, cell_id}, state) do
operation = {:cancel_cell_evaluation, client_pid, cell_id}
client_id = client_id(state, client_pid)
operation = {:cancel_cell_evaluation, client_id, cell_id}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:erase_outputs, client_pid}, state) do
operation = {:erase_outputs, client_pid}
client_id = client_id(state, client_pid)
operation = {:erase_outputs, client_id}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:set_notebook_name, client_pid, name}, state) do
operation = {:set_notebook_name, client_pid, name}
client_id = client_id(state, client_pid)
operation = {:set_notebook_name, client_id, name}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:set_section_name, client_pid, section_id, name}, state) do
operation = {:set_section_name, client_pid, section_id, name}
client_id = client_id(state, client_pid)
operation = {:set_section_name, client_id, section_id, name}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:apply_cell_delta, client_pid, cell_id, tag, delta, revision}, state) do
operation = {:apply_cell_delta, client_pid, cell_id, tag, delta, revision}
client_id = client_id(state, client_pid)
operation = {:apply_cell_delta, client_id, cell_id, tag, delta, revision}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:report_cell_revision, client_pid, cell_id, tag, revision}, state) do
operation = {:report_cell_revision, client_pid, cell_id, tag, revision}
client_id = client_id(state, client_pid)
operation = {:report_cell_revision, client_id, cell_id, tag, revision}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:set_cell_attributes, client_pid, cell_id, attrs}, state) do
operation = {:set_cell_attributes, client_pid, cell_id, attrs}
client_id = client_id(state, client_pid)
operation = {:set_cell_attributes, client_id, cell_id, attrs}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:set_input_value, client_pid, input_id, value}, state) do
operation = {:set_input_value, client_pid, input_id, value}
client_id = client_id(state, client_pid)
operation = {:set_input_value, client_id, input_id, value}
{:noreply, handle_operation(state, operation)}
end
def handle_cast({:set_runtime, client_pid, runtime}, state) do
client_id = client_id(state, client_pid)
if Runtime.connected?(state.data.runtime) do
{:ok, _} = Runtime.disconnect(state.data.runtime)
end
@ -868,10 +915,12 @@ defmodule Livebook.Session do
state
end
{:noreply, handle_operation(state, {:set_runtime, client_pid, runtime})}
{:noreply, handle_operation(state, {:set_runtime, client_id, runtime})}
end
def handle_cast({:set_file, client_pid, file}, state) do
client_id = client_id(state, client_pid)
if file do
FileGuard.lock(file, self())
else
@ -883,7 +932,7 @@ defmodule Livebook.Session do
FileGuard.unlock(state.data.file)
end
{:noreply, handle_operation(state, {:set_file, client_pid, file})}
{:noreply, handle_operation(state, {:set_file, client_id, file})}
{:error, :already_in_use} ->
broadcast_error(state.session_id, "failed to set new file because it is already in use")
@ -901,13 +950,15 @@ defmodule Livebook.Session do
{:noreply,
%{state | runtime_monitor_ref: nil}
|> handle_operation({:set_runtime, self(), Livebook.Runtime.duplicate(state.data.runtime)})}
|> handle_operation(
{:set_runtime, @client_id, Livebook.Runtime.duplicate(state.data.runtime)}
)}
end
def handle_info({:DOWN, _, :process, pid, _}, state) do
state =
if Map.has_key?(state.data.clients_map, pid) do
handle_operation(state, {:client_leave, pid})
if client_id = state.client_pids_with_id[pid] do
handle_operation(state, {:client_leave, client_id})
else
state
end
@ -916,13 +967,13 @@ defmodule Livebook.Session do
end
def handle_info({:runtime_evaluation_output, cell_id, output}, state) do
operation = {:add_cell_evaluation_output, self(), cell_id, output}
operation = {:add_cell_evaluation_output, @client_id, cell_id, output}
{:noreply, handle_operation(state, operation)}
end
def handle_info({:runtime_evaluation_response, cell_id, response, metadata}, state) do
{memory_usage, metadata} = Map.pop(metadata, :memory_usage)
operation = {:add_cell_evaluation_response, self(), cell_id, response, metadata}
operation = {:add_cell_evaluation_response, @client_id, cell_id, response, metadata}
{:noreply,
state
@ -935,7 +986,7 @@ defmodule Livebook.Session do
{reply, state} =
with {:ok, cell, _section} <- Notebook.fetch_cell_and_section(state.data.notebook, cell_id),
{:ok, value} <- Map.fetch(state.data.input_values, input_id) do
state = handle_operation(state, {:bind_input, self(), cell.id, input_id})
state = handle_operation(state, {:bind_input, @client_id, cell.id, input_id})
{{:ok, value}, state}
else
_ -> {:error, state}
@ -951,8 +1002,8 @@ defmodule Livebook.Session do
operation =
case container_ref do
@main_container_ref -> {:reflect_main_evaluation_failure, self()}
section_id -> {:reflect_evaluation_failure, self(), section_id}
@main_container_ref -> {:reflect_main_evaluation_failure, @client_id}
section_id -> {:reflect_evaluation_failure, @client_id, section_id}
end
{:noreply, handle_operation(state, operation)}
@ -977,7 +1028,7 @@ defmodule Livebook.Session do
end
def handle_info({:runtime_smart_cell_definitions, definitions}, state) do
operation = {:set_smart_cell_definitions, self(), definitions}
operation = {:set_smart_cell_definitions, @client_id, definitions}
{:noreply, handle_operation(state, operation)}
end
@ -985,7 +1036,7 @@ defmodule Livebook.Session do
case Notebook.fetch_cell_and_section(state.data.notebook, id) do
{:ok, cell, _section} ->
delta = Livebook.JSInterop.diff(cell.source, info.source)
operation = {:smart_cell_started, self(), id, delta, info.js_view, info.editor}
operation = {:smart_cell_started, @client_id, id, delta, info.js_view, info.editor}
{:noreply, handle_operation(state, operation)}
:error ->
@ -997,7 +1048,7 @@ defmodule Livebook.Session do
case Notebook.fetch_cell_and_section(state.data.notebook, id) do
{:ok, cell, _section} ->
delta = Livebook.JSInterop.diff(cell.source, source)
operation = {:update_smart_cell, self(), id, attrs, delta, info.reevaluate}
operation = {:update_smart_cell, @client_id, id, attrs, delta, info.reevaluate}
{:noreply, handle_operation(state, operation)}
:error ->
@ -1027,6 +1078,10 @@ defmodule Livebook.Session do
# ---
defp client_id(state, client_pid) do
state.client_pids_with_id[client_pid] || @anonymous_client_id
end
defp self_from_state(state) do
%__MODULE__{
id: state.session_id,
@ -1177,7 +1232,7 @@ defmodule Livebook.Session do
handle_operation(
state,
{:apply_cell_delta, self(), cell.id, :primary, delta, revision}
{:apply_cell_delta, @client_id, cell.id, :primary, delta, revision}
)
{:error, message} ->
@ -1215,11 +1270,11 @@ defmodule Livebook.Session do
end
end
defp after_operation(state, _prev_state, {:set_notebook_name, _pid, _name}) do
defp after_operation(state, _prev_state, {:set_notebook_name, _client_id, _name}) do
notify_update(state)
end
defp after_operation(state, _prev_state, {:set_runtime, _pid, runtime}) do
defp after_operation(state, _prev_state, {:set_runtime, _client_id, runtime}) do
if Runtime.connected?(runtime) do
state
else
@ -1229,7 +1284,7 @@ defmodule Livebook.Session do
end
end
defp after_operation(state, prev_state, {:set_file, _pid, _file}) do
defp after_operation(state, prev_state, {:set_file, _client_id, _file}) do
prev_images_dir = images_dir_from_state(prev_state)
if prev_state.data.file do
@ -1251,14 +1306,14 @@ defmodule Livebook.Session do
defp after_operation(
state,
_prev_state,
{:set_notebook_attributes, _client_pid, %{autosave_interval_s: _}}
{:set_notebook_attributes, _client_id, %{autosave_interval_s: _}}
) do
state
|> unschedule_autosave()
|> schedule_autosave()
end
defp after_operation(state, prev_state, {:client_join, _client_pid, user}) do
defp after_operation(state, prev_state, {:client_join, _client_id, user}) do
unless Map.has_key?(prev_state.data.users_map, user.id) do
Livebook.Users.subscribe(user.id)
end
@ -1266,8 +1321,8 @@ defmodule Livebook.Session do
state
end
defp after_operation(state, prev_state, {:client_leave, client_pid}) do
user_id = prev_state.data.clients_map[client_pid]
defp after_operation(state, prev_state, {:client_leave, client_id}) do
user_id = prev_state.data.clients_map[client_id]
unless Map.has_key?(state.data.users_map, user_id) do
Livebook.Users.unsubscribe(user_id)
@ -1276,7 +1331,7 @@ defmodule Livebook.Session do
state
end
defp after_operation(state, _prev_state, {:delete_cell, _client_pid, cell_id}) do
defp after_operation(state, _prev_state, {:delete_cell, _client_id, cell_id}) do
entry = Enum.find(state.data.bin_entries, fn entry -> entry.cell.id == cell_id end)
# The session LV drops cell's source, so we send them
# the complete bin entry to override
@ -1285,7 +1340,7 @@ defmodule Livebook.Session do
state
end
defp after_operation(state, prev_state, {:delete_section, _client_pid, section_id, true}) do
defp after_operation(state, prev_state, {:delete_section, _client_id, section_id, true}) do
{:ok, section} = Notebook.fetch_section(prev_state.data.notebook, section_id)
cell_ids = Enum.map(section.cells, & &1.id)
entries = Enum.filter(state.data.bin_entries, fn entry -> entry.cell.id in cell_ids end)
@ -1297,7 +1352,7 @@ defmodule Livebook.Session do
defp after_operation(
state,
_prev_state,
{:apply_cell_delta, _client_pid, cell_id, tag, _delta, _revision}
{:apply_cell_delta, _client_id, cell_id, tag, _delta, _revision}
) do
with :secondary <- tag,
{:ok, %Cell.Smart{} = cell, _section} <-
@ -1318,11 +1373,11 @@ defmodule Livebook.Session do
case Runtime.connect(state.data.runtime) do
{:ok, runtime} ->
state = own_runtime(runtime, state)
handle_operation(state, {:set_runtime, self(), runtime})
handle_operation(state, {:set_runtime, @client_id, runtime})
{:error, error} ->
broadcast_error(state.session_id, "failed to connect runtime - #{error}")
handle_operation(state, {:set_runtime, self(), state.data.runtime})
handle_operation(state, {:set_runtime, @client_id, state.data.runtime})
end
end
@ -1417,7 +1472,7 @@ defmodule Livebook.Session do
Runtime.evaluate_code(state.data.runtime, cell.source, locator, base_locator, opts)
evaluation_digest = :erlang.md5(cell.source)
handle_operation(state, {:evaluation_started, self(), cell.id, evaluation_digest})
handle_operation(state, {:evaluation_started, @client_id, cell.id, evaluation_digest})
end
defp broadcast_operation(session_id, operation) do
@ -1540,7 +1595,7 @@ defmodule Livebook.Session do
case result do
:ok ->
handle_operation(state, {:mark_as_not_dirty, self()})
handle_operation(state, {:mark_as_not_dirty, @client_id})
{:error, message} ->
broadcast_error(state.session_id, "failed to save notebook - #{message}")

View file

@ -49,7 +49,7 @@ defmodule Livebook.Session.Data do
bin_entries: list(cell_bin_entry()),
runtime: Runtime.t(),
smart_cell_definitions: list(Runtime.smart_cell_definition()),
clients_map: %{pid() => User.id()},
clients_map: %{client_id() => User.id()},
users_map: %{User.id() => User.t()}
}
@ -80,7 +80,7 @@ defmodule Livebook.Session.Data do
@type cell_source_info :: %{
revision: cell_revision(),
deltas: list(Delta.t()),
revision_by_client_pid: %{pid() => cell_revision()}
revision_by_client_id: %{client_id() => cell_revision()}
}
@type cell_eval_info :: %{
@ -114,7 +114,9 @@ defmodule Livebook.Session.Data do
@type input_id :: String.t()
@type client :: {User.id(), pid()}
@type client :: {User.id(), client_id()}
@type client_id :: Livebook.Utils.id()
@type index :: non_neg_integer()
@ -138,52 +140,53 @@ defmodule Livebook.Session.Data do
@type input_reading :: {input_id(), input_value :: term()}
# Note that all operations carry the pid of whichever process
# Note that all operations carry the id of whichever client
# originated the operation. Some operations like :apply_cell_delta
# and :report_cell_revision require the pid to be a registered
# and :report_cell_revision require the id to be a registered
# client, as in these cases it's necessary for the operation to
# be properly applied. For other operations the pid can represent
# be properly applied. For other operations the id can represent
# an arbitrary process and is passed for informative purposes only.
@type operation ::
{:set_notebook_attributes, pid(), map()}
| {:insert_section, pid(), index(), Section.id()}
| {:insert_section_into, pid(), Section.id(), index(), Section.id()}
| {:set_section_parent, pid(), Section.id(), parent_id :: Section.id()}
| {:unset_section_parent, pid(), Section.id()}
| {:insert_cell, pid(), Section.id(), index(), Cell.type(), Cell.id(), map()}
| {:delete_section, pid(), Section.id(), delete_cells :: boolean()}
| {:delete_cell, pid(), Cell.id()}
| {:restore_cell, pid(), Cell.id()}
| {:move_cell, pid(), Cell.id(), offset :: integer()}
| {:move_section, pid(), Section.id(), offset :: integer()}
| {:queue_cells_evaluation, pid(), list(Cell.id())}
| {:evaluation_started, pid(), Cell.id(), binary()}
| {:add_cell_evaluation_output, pid(), Cell.id(), term()}
| {:add_cell_evaluation_response, pid(), Cell.id(), term(), metadata :: map()}
| {:bind_input, pid(), code_cell_id :: Cell.id(), input_id()}
| {:reflect_main_evaluation_failure, pid()}
| {:reflect_evaluation_failure, pid(), Section.id()}
| {:cancel_cell_evaluation, pid(), Cell.id()}
| {:smart_cell_started, pid(), Cell.id(), Delta.t(), Runtime.js_view(),
{:set_notebook_attributes, client_id(), map()}
| {:insert_section, client_id(), index(), Section.id()}
| {:insert_section_into, client_id(), Section.id(), index(), Section.id()}
| {:set_section_parent, client_id(), Section.id(), parent_id :: Section.id()}
| {:unset_section_parent, client_id(), Section.id()}
| {:insert_cell, client_id(), Section.id(), index(), Cell.type(), Cell.id(), map()}
| {:delete_section, client_id(), Section.id(), delete_cells :: boolean()}
| {:delete_cell, client_id(), Cell.id()}
| {:restore_cell, client_id(), Cell.id()}
| {:move_cell, client_id(), Cell.id(), offset :: integer()}
| {:move_section, client_id(), Section.id(), offset :: integer()}
| {:queue_cells_evaluation, client_id(), list(Cell.id())}
| {:evaluation_started, client_id(), Cell.id(), binary()}
| {:add_cell_evaluation_output, client_id(), Cell.id(), term()}
| {:add_cell_evaluation_response, client_id(), Cell.id(), term(), metadata :: map()}
| {:bind_input, client_id(), code_cell_id :: Cell.id(), input_id()}
| {:reflect_main_evaluation_failure, client_id()}
| {:reflect_evaluation_failure, client_id(), Section.id()}
| {:cancel_cell_evaluation, client_id(), Cell.id()}
| {:smart_cell_started, client_id(), Cell.id(), Delta.t(), Runtime.js_view(),
Cell.Smart.editor() | nil}
| {:update_smart_cell, pid(), Cell.id(), Cell.Smart.attrs(), Delta.t(),
| {:update_smart_cell, client_id(), Cell.id(), Cell.Smart.attrs(), Delta.t(),
reevaluate :: boolean()}
| {:erase_outputs, pid()}
| {:set_notebook_name, pid(), String.t()}
| {:set_section_name, pid(), Section.id(), String.t()}
| {:client_join, pid(), User.t()}
| {:client_leave, pid()}
| {:update_user, pid(), User.t()}
| {:apply_cell_delta, pid(), Cell.id(), cell_source_tag(), Delta.t(), cell_revision()}
| {:report_cell_revision, pid(), Cell.id(), cell_source_tag(), cell_revision()}
| {:set_cell_attributes, pid(), Cell.id(), map()}
| {:set_input_value, pid(), input_id(), value :: term()}
| {:set_runtime, pid(), Runtime.t()}
| {:set_smart_cell_definitions, pid(), list(Runtime.smart_cell_definition())}
| {:set_file, pid(), FileSystem.File.t() | nil}
| {:set_autosave_interval, pid(), non_neg_integer() | nil}
| {:mark_as_not_dirty, pid()}
| {:erase_outputs, client_id()}
| {:set_notebook_name, client_id(), String.t()}
| {:set_section_name, client_id(), Section.id(), String.t()}
| {:client_join, client_id(), User.t()}
| {:client_leave, client_id()}
| {:update_user, client_id(), User.t()}
| {:apply_cell_delta, client_id(), Cell.id(), cell_source_tag(), Delta.t(),
cell_revision()}
| {:report_cell_revision, client_id(), Cell.id(), cell_source_tag(), cell_revision()}
| {:set_cell_attributes, client_id(), Cell.id(), map()}
| {:set_input_value, client_id(), input_id(), value :: term()}
| {:set_runtime, client_id(), Runtime.t()}
| {:set_smart_cell_definitions, client_id(), list(Runtime.smart_cell_definition())}
| {:set_file, client_id(), FileSystem.File.t() | nil}
| {:set_autosave_interval, client_id(), non_neg_integer() | nil}
| {:mark_as_not_dirty, client_id()}
@type action ::
:connect_runtime
@ -192,7 +195,7 @@ defmodule Livebook.Session.Data do
| {:forget_evaluation, Cell.t(), Section.t()}
| {:start_smart_cell, Cell.t(), Section.t()}
| {:set_smart_cell_base, Cell.t(), Section.t(), parent :: {Cell.t(), Section.t()} | nil}
| {:broadcast_delta, pid(), Cell.t(), cell_source_tag(), Delta.t()}
| {:broadcast_delta, client_id(), Cell.t(), cell_source_tag(), Delta.t()}
@doc """
Returns a fresh notebook session state.
@ -273,7 +276,7 @@ defmodule Livebook.Session.Data do
@spec apply_operation(t(), operation()) :: {:ok, t(), list(action())} | :error
def apply_operation(data, operation)
def apply_operation(data, {:set_notebook_attributes, _client_pid, attrs}) do
def apply_operation(data, {:set_notebook_attributes, _client_id, attrs}) do
with true <- valid_attrs_for?(data.notebook, attrs) do
data
|> with_actions()
@ -285,7 +288,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:insert_section, _client_pid, index, id}) do
def apply_operation(data, {:insert_section, _client_id, index, id}) do
section = %{Section.new() | id: id}
data
@ -295,7 +298,7 @@ defmodule Livebook.Session.Data do
|> wrap_ok()
end
def apply_operation(data, {:insert_section_into, _client_pid, section_id, index, id}) do
def apply_operation(data, {:insert_section_into, _client_id, section_id, index, id}) do
with {:ok, _section} <- Notebook.fetch_section(data.notebook, section_id) do
section = %{Section.new() | id: id}
@ -307,7 +310,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:set_section_parent, _client_pid, section_id, parent_id}) do
def apply_operation(data, {:set_section_parent, _client_id, section_id, parent_id}) do
with {:ok, section} <- Notebook.fetch_section(data.notebook, section_id),
{:ok, parent_section} <- Notebook.fetch_section(data.notebook, parent_id),
true <- section.parent_id != parent_id,
@ -325,7 +328,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:unset_section_parent, _client_pid, section_id}) do
def apply_operation(data, {:unset_section_parent, _client_id, section_id}) do
with {:ok, section} <- Notebook.fetch_section(data.notebook, section_id),
true <- section.parent_id != nil do
data
@ -341,7 +344,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:insert_cell, _client_pid, section_id, index, type, id, attrs}) do
def apply_operation(data, {:insert_cell, _client_id, section_id, index, type, id, attrs}) do
with {:ok, _section} <- Notebook.fetch_section(data.notebook, section_id) do
cell = %{Cell.new(type) | id: id} |> Map.merge(attrs)
@ -355,7 +358,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:delete_section, _client_pid, id, delete_cells}) do
def apply_operation(data, {:delete_section, _client_id, id, delete_cells}) do
with {:ok, section} <- Notebook.fetch_section(data.notebook, id),
true <- section != hd(data.notebook.sections) or delete_cells,
[] <- Notebook.child_sections(data.notebook, section.id) do
@ -371,7 +374,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:delete_cell, _client_pid, id}) do
def apply_operation(data, {:delete_cell, _client_id, id}) do
with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id),
false <- Cell.setup?(cell) do
data
@ -386,7 +389,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:restore_cell, _client_pid, id}) do
def apply_operation(data, {:restore_cell, _client_id, id}) do
with {:ok, cell_bin_entry} <- fetch_cell_bin_entry(data, id),
true <- data.notebook.sections != [] do
data
@ -401,7 +404,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:move_cell, _client_pid, id, offset}) do
def apply_operation(data, {:move_cell, _client_id, id, offset}) do
with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id),
false <- Cell.setup?(cell),
true <- offset != 0,
@ -418,7 +421,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:move_section, _client_pid, id, offset}) do
def apply_operation(data, {:move_section, _client_id, id, offset}) do
with {:ok, section} <- Notebook.fetch_section(data.notebook, id),
true <- offset != 0,
true <- Notebook.can_move_section_by?(data.notebook, section, offset) do
@ -434,7 +437,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:queue_cells_evaluation, _client_pid, cell_ids}) do
def apply_operation(data, {:queue_cells_evaluation, _client_id, cell_ids}) do
cells_with_section =
data.notebook
|> Notebook.evaluable_cells_with_section()
@ -459,7 +462,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:evaluation_started, _client_pid, id, evaluation_digest}) do
def apply_operation(data, {:evaluation_started, _client_id, id, evaluation_digest}) do
with {:ok, cell, _section} <- Notebook.fetch_cell_and_section(data.notebook, id),
Cell.evaluable?(cell),
:evaluating <- data.cell_infos[cell.id].eval.status do
@ -472,7 +475,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:add_cell_evaluation_output, _client_pid, id, output}) do
def apply_operation(data, {:add_cell_evaluation_output, _client_id, id, output}) do
with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, id) do
data
|> with_actions()
@ -485,7 +488,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:add_cell_evaluation_response, _client_pid, id, output, metadata}) do
def apply_operation(data, {:add_cell_evaluation_response, _client_id, id, output, metadata}) do
with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id),
:evaluating <- data.cell_infos[cell.id].eval.status do
data
@ -504,7 +507,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:bind_input, _client_pid, cell_id, input_id}) do
def apply_operation(data, {:bind_input, _client_id, cell_id, input_id}) do
with {:ok, cell, _section} <- Notebook.fetch_cell_and_section(data.notebook, cell_id),
Cell.evaluable?(cell),
true <- Map.has_key?(data.input_values, input_id),
@ -518,7 +521,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:reflect_main_evaluation_failure, _client_pid}) do
def apply_operation(data, {:reflect_main_evaluation_failure, _client_id}) do
data
|> with_actions()
|> clear_main_evaluation()
@ -526,7 +529,7 @@ defmodule Livebook.Session.Data do
|> wrap_ok()
end
def apply_operation(data, {:reflect_evaluation_failure, _client_pid, section_id}) do
def apply_operation(data, {:reflect_evaluation_failure, _client_id, section_id}) do
with {:ok, section} <- Notebook.fetch_section(data.notebook, section_id) do
data
|> with_actions()
@ -536,7 +539,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:cancel_cell_evaluation, _client_pid, id}) do
def apply_operation(data, {:cancel_cell_evaluation, _client_id, id}) do
with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id),
true <- data.cell_infos[cell.id].eval.status in [:evaluating, :queued] do
data
@ -549,13 +552,13 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:smart_cell_started, client_pid, id, delta, js_view, editor}) do
def apply_operation(data, {:smart_cell_started, client_id, id, delta, js_view, editor}) do
with {:ok, %Cell.Smart{} = cell, _section} <-
Notebook.fetch_cell_and_section(data.notebook, id),
:starting <- data.cell_infos[cell.id].status do
data
|> with_actions()
|> smart_cell_started(cell, client_pid, delta, js_view, editor)
|> smart_cell_started(cell, client_id, delta, js_view, editor)
|> set_dirty()
|> wrap_ok()
else
@ -563,12 +566,12 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:update_smart_cell, client_pid, id, attrs, delta, reevaluate}) do
def apply_operation(data, {:update_smart_cell, client_id, id, attrs, delta, reevaluate}) do
with {:ok, %Cell.Smart{} = cell, section} <-
Notebook.fetch_cell_and_section(data.notebook, id) do
data
|> with_actions()
|> update_smart_cell(cell, client_pid, attrs, delta)
|> update_smart_cell(cell, client_id, attrs, delta)
|> maybe_queue_updated_smart_cell(cell, section, reevaluate)
|> set_dirty()
|> wrap_ok()
@ -577,7 +580,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:erase_outputs, _client_pid}) do
def apply_operation(data, {:erase_outputs, _client_id}) do
data
|> with_actions()
|> erase_outputs()
@ -586,7 +589,7 @@ defmodule Livebook.Session.Data do
|> wrap_ok()
end
def apply_operation(data, {:set_notebook_name, _client_pid, name}) do
def apply_operation(data, {:set_notebook_name, _client_id, name}) do
data
|> with_actions()
|> set_notebook_name(name)
@ -594,7 +597,7 @@ defmodule Livebook.Session.Data do
|> wrap_ok()
end
def apply_operation(data, {:set_section_name, _client_pid, section_id, name}) do
def apply_operation(data, {:set_section_name, _client_id, section_id, name}) do
with {:ok, section} <- Notebook.fetch_section(data.notebook, section_id) do
data
|> with_actions()
@ -604,29 +607,29 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:client_join, client_pid, user}) do
with false <- Map.has_key?(data.clients_map, client_pid) do
def apply_operation(data, {:client_join, client_id, user}) do
with false <- Map.has_key?(data.clients_map, client_id) do
data
|> with_actions()
|> client_join(client_pid, user)
|> client_join(client_id, user)
|> wrap_ok()
else
_ -> :error
end
end
def apply_operation(data, {:client_leave, client_pid}) do
with true <- Map.has_key?(data.clients_map, client_pid) do
def apply_operation(data, {:client_leave, client_id}) do
with true <- Map.has_key?(data.clients_map, client_id) do
data
|> with_actions()
|> client_leave(client_pid)
|> client_leave(client_id)
|> wrap_ok()
else
_ -> :error
end
end
def apply_operation(data, {:update_user, _client_pid, user}) do
def apply_operation(data, {:update_user, _client_id, user}) do
with true <- Map.has_key?(data.users_map, user.id) do
data
|> with_actions()
@ -637,7 +640,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:apply_cell_delta, client_pid, cell_id, tag, delta, revision}) do
def apply_operation(data, {:apply_cell_delta, client_id, cell_id, tag, delta, revision}) do
with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, cell_id),
source_info <- data.cell_infos[cell_id].sources[tag],
true <- 0 < revision and revision <= source_info.revision + 1,
@ -646,11 +649,11 @@ defmodule Livebook.Session.Data do
# in which case no transformation is necessary. The latter is
# useful when we want to apply changes programatically
true <-
Map.has_key?(data.clients_map, client_pid) or
Map.has_key?(data.clients_map, client_id) or
revision == source_info.revision + 1 do
data
|> with_actions()
|> apply_delta(client_pid, cell, tag, delta, revision)
|> apply_delta(client_id, cell, tag, delta, revision)
|> set_dirty()
|> wrap_ok()
else
@ -658,21 +661,21 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:report_cell_revision, client_pid, cell_id, tag, revision}) do
def apply_operation(data, {:report_cell_revision, client_id, cell_id, tag, revision}) do
with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, cell_id),
source_info <- data.cell_infos[cell_id].sources[tag],
true <- 0 < revision and revision <= source_info.revision,
true <- Map.has_key?(data.clients_map, client_pid) do
true <- Map.has_key?(data.clients_map, client_id) do
data
|> with_actions()
|> report_revision(client_pid, cell, tag, revision)
|> report_revision(client_id, cell, tag, revision)
|> wrap_ok()
else
_ -> :error
end
end
def apply_operation(data, {:set_cell_attributes, _client_pid, cell_id, attrs}) do
def apply_operation(data, {:set_cell_attributes, _client_id, cell_id, attrs}) do
with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, cell_id),
true <- valid_attrs_for?(cell, attrs) do
data
@ -686,7 +689,7 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:set_input_value, _client_pid, input_id, value}) do
def apply_operation(data, {:set_input_value, _client_id, input_id, value}) do
with true <- Map.has_key?(data.input_values, input_id) do
data
|> with_actions()
@ -698,21 +701,21 @@ defmodule Livebook.Session.Data do
end
end
def apply_operation(data, {:set_runtime, _client_pid, runtime}) do
def apply_operation(data, {:set_runtime, _client_id, runtime}) do
data
|> with_actions()
|> set_runtime(data, runtime)
|> wrap_ok()
end
def apply_operation(data, {:set_smart_cell_definitions, _client_pid, definitions}) do
def apply_operation(data, {:set_smart_cell_definitions, _client_id, definitions}) do
data
|> with_actions()
|> set_smart_cell_definitions(definitions)
|> wrap_ok()
end
def apply_operation(data, {:set_file, _client_pid, file}) do
def apply_operation(data, {:set_file, _client_id, file}) do
data
|> with_actions()
|> set!(file: file)
@ -720,7 +723,7 @@ defmodule Livebook.Session.Data do
|> wrap_ok()
end
def apply_operation(data, {:mark_as_not_dirty, _client_pid}) do
def apply_operation(data, {:mark_as_not_dirty, _client_id}) do
data
|> with_actions()
|> set_dirty(false)
@ -1247,7 +1250,7 @@ defmodule Livebook.Session.Data do
end
end
defp smart_cell_started({data, _} = data_actions, cell, client_pid, delta, js_view, editor) do
defp smart_cell_started({data, _} = data_actions, cell, client_id, delta, js_view, editor) do
updated_cell = %{cell | js_view: js_view, editor: editor} |> apply_delta_to_cell(delta)
data_actions
@ -1257,10 +1260,10 @@ defmodule Livebook.Session.Data do
info = %{info | status: :started}
put_in(info.sources.secondary, new_source_info(data.clients_map))
end)
|> add_action({:broadcast_delta, client_pid, updated_cell, :primary, delta})
|> add_action({:broadcast_delta, client_id, updated_cell, :primary, delta})
end
defp update_smart_cell({data, _} = data_actions, cell, client_pid, attrs, delta) do
defp update_smart_cell({data, _} = data_actions, cell, client_id, attrs, delta) do
new_attrs =
case cell.attrs do
:__pruned__ -> :__pruned__
@ -1271,7 +1274,7 @@ defmodule Livebook.Session.Data do
data_actions
|> set!(notebook: Notebook.update_cell(data.notebook, cell.id, fn _ -> updated_cell end))
|> add_action({:broadcast_delta, client_pid, updated_cell, :primary, delta})
|> add_action({:broadcast_delta, client_id, updated_cell, :primary, delta})
end
defp maybe_queue_updated_smart_cell({data, _} = data_actions, cell, section, reevaluate) do
@ -1319,10 +1322,10 @@ defmodule Livebook.Session.Data do
|> set!(notebook: Notebook.update_section(data.notebook, section.id, &%{&1 | name: name}))
end
defp client_join({data, _} = data_actions, client_pid, user) do
defp client_join({data, _} = data_actions, client_id, user) do
data_actions
|> set!(
clients_map: Map.put(data.clients_map, client_pid, user.id),
clients_map: Map.put(data.clients_map, client_id, user.id),
users_map: Map.put(data.users_map, user.id, user)
)
|> update_every_cell_info(fn
@ -1330,7 +1333,7 @@ defmodule Livebook.Session.Data do
update_in(
info.sources,
&Map.new(&1, fn {key, source_info} ->
{key, put_in(source_info.revision_by_client_pid[client_pid], source_info.revision)}
{key, put_in(source_info.revision_by_client_id[client_id], source_info.revision)}
end)
)
@ -1339,8 +1342,8 @@ defmodule Livebook.Session.Data do
end)
end
defp client_leave({data, _} = data_actions, client_pid) do
{user_id, clients_map} = Map.pop(data.clients_map, client_pid)
defp client_leave({data, _} = data_actions, client_id) do
{user_id, clients_map} = Map.pop(data.clients_map, client_id)
users_map =
if user_id in Map.values(clients_map) do
@ -1356,7 +1359,7 @@ defmodule Livebook.Session.Data do
update_in(
info.sources,
&Map.new(&1, fn {key, source_info} ->
{_, source_info} = pop_in(source_info.revision_by_client_pid[client_pid])
{_, source_info} = pop_in(source_info.revision_by_client_id[client_id])
{key, purge_deltas(source_info)}
end)
)
@ -1370,7 +1373,7 @@ defmodule Livebook.Session.Data do
set!(data_actions, users_map: Map.put(data.users_map, user.id, user))
end
defp apply_delta({data, _} = data_actions, client_pid, cell, tag, delta, revision) do
defp apply_delta({data, _} = data_actions, client_id, cell, tag, delta, revision) do
source_info = data.cell_infos[cell.id].sources[tag]
deltas_ahead = Enum.take(source_info.deltas, -(source_info.revision - revision + 1))
@ -1386,11 +1389,11 @@ defmodule Livebook.Session.Data do
|> Map.update!(:revision, &(&1 + 1))
source_info =
if Map.has_key?(source_info.revision_by_client_pid, client_pid) do
if Map.has_key?(source_info.revision_by_client_id, client_id) do
# Before receiving acknowledgement, the client receives all
# the other deltas, so we can assume they are in sync with
# the server and have the same revision.
put_in(source_info.revision_by_client_pid[client_pid], source_info.revision)
put_in(source_info.revision_by_client_id[client_id], source_info.revision)
|> purge_deltas()
else
source_info
@ -1405,7 +1408,7 @@ defmodule Livebook.Session.Data do
data_actions
|> set!(notebook: Notebook.update_cell(data.notebook, cell.id, fn _ -> updated_cell end))
|> update_cell_info!(cell.id, &put_in(&1.sources[tag], source_info))
|> add_action({:broadcast_delta, client_pid, updated_cell, tag, transformed_new_delta})
|> add_action({:broadcast_delta, client_id, updated_cell, tag, transformed_new_delta})
end
defp source_access(%Cell.Smart{}, :secondary), do: [Access.key(:editor), :source]
@ -1418,11 +1421,11 @@ defmodule Livebook.Session.Data do
update_in(cell.source, &JSInterop.apply_delta_to_string(delta, &1))
end
defp report_revision(data_actions, client_pid, cell, tag, revision) do
defp report_revision(data_actions, client_id, cell, tag, revision) do
data_actions
|> update_cell_info!(cell.id, fn info ->
update_in(info.sources[tag], fn source_info ->
put_in(source_info.revision_by_client_pid[client_pid], revision)
put_in(source_info.revision_by_client_id[client_id], revision)
|> purge_deltas()
end)
end)
@ -1512,7 +1515,7 @@ defmodule Livebook.Session.Data do
# as many deltas as we need for them.
min_client_revision =
source_info.revision_by_client_pid
source_info.revision_by_client_id
|> Map.values()
|> Enum.min(fn -> source_info.revision end)
@ -1648,12 +1651,12 @@ defmodule Livebook.Session.Data do
end
defp new_source_info(clients_map) do
client_pids = Map.keys(clients_map)
client_ids = Map.keys(clients_map)
%{
revision: 0,
deltas: [],
revision_by_client_pid: Map.new(client_pids, &{&1, 0})
revision_by_client_id: Map.new(client_ids, &{&1, 0})
}
end

View file

@ -2,8 +2,8 @@ defmodule LivebookWeb.JSViewChannel do
use Phoenix.Channel
@impl true
def join("js_view", %{"session_id" => session_id}, socket) do
{:ok, assign(socket, session_id: session_id, ref_with_info: %{})}
def join("js_view", %{"session_id" => session_id, "client_id" => client_id}, socket) do
{:ok, assign(socket, session_id: session_id, client_id: client_id, ref_with_info: %{})}
end
@impl true
@ -11,7 +11,7 @@ defmodule LivebookWeb.JSViewChannel do
{:ok, data} = Phoenix.Token.verify(LivebookWeb.Endpoint, "js view", session_token)
%{pid: pid} = data
send(pid, {:connect, self(), %{origin: self(), ref: ref}})
send(pid, {:connect, self(), %{origin: socket.assigns.client_id, ref: ref}})
socket =
update_in(socket.assigns.ref_with_info[ref], fn
@ -35,7 +35,7 @@ defmodule LivebookWeb.JSViewChannel do
def handle_in("event", raw, socket) do
{[event, ref], payload} = transport_decode!(raw)
pid = socket.assigns.ref_with_info[ref].pid
send(pid, {:event, event, payload, %{origin: self(), ref: ref}})
send(pid, {:event, event, payload, %{origin: socket.assigns.client_id, ref: ref}})
{:noreply, socket}
end

View file

@ -23,6 +23,7 @@ defmodule LivebookWeb.JSViewComponent do
data-js-path={@js_view.assets.js_path}
data-session-token={session_token(@js_view.pid)}
data-session-id={@session_id}
data-client-id={@client_id}
data-iframe-local-port={LivebookWeb.IframeEndpoint.port()}
data-iframe-url={Livebook.Config.iframe_url()}
data-timeout-message={@timeout_message}

View file

@ -23,7 +23,8 @@ defmodule LivebookWeb.Output do
id: "output-#{idx}",
socket: @socket,
session_id: @session_id,
input_values: @input_values
input_values: @input_values,
client_id: @client_id
}) %>
</div>
<% end %>
@ -69,25 +70,28 @@ defmodule LivebookWeb.Output do
"""
end
defp render_output({:js, js_info}, %{id: id, session_id: session_id}) do
defp render_output({:js, js_info}, %{id: id, session_id: session_id, client_id: client_id}) do
live_component(LivebookWeb.JSViewComponent,
id: id,
js_view: js_info.js_view,
session_id: session_id,
client_id: client_id,
timeout_message: "Output data no longer available, please reevaluate this cell"
)
end
defp render_output({:frame, outputs, _info}, %{
id: id,
session_id: session_id,
input_values: input_values,
session_id: session_id
client_id: client_id
}) do
live_component(Output.FrameComponent,
id: id,
outputs: outputs,
session_id: session_id,
input_values: input_values
input_values: input_values,
client_id: client_id
)
end
@ -211,12 +215,26 @@ defmodule LivebookWeb.Output do
"""
end
defp render_output({:input, attrs}, %{id: id, input_values: input_values}) do
live_component(Output.InputComponent, id: id, attrs: attrs, input_values: input_values)
defp render_output({:input, attrs}, %{id: id, input_values: input_values, client_id: client_id}) do
live_component(Output.InputComponent,
id: id,
attrs: attrs,
input_values: input_values,
client_id: client_id
)
end
defp render_output({:control, attrs}, %{id: id, input_values: input_values}) do
live_component(Output.ControlComponent, id: id, attrs: attrs, input_values: input_values)
defp render_output({:control, attrs}, %{
id: id,
input_values: input_values,
client_id: client_id
}) do
live_component(Output.ControlComponent,
id: id,
attrs: attrs,
input_values: input_values,
client_id: client_id
)
end
defp render_output({:error, formatted}, %{}) do

View file

@ -55,6 +55,7 @@ defmodule LivebookWeb.Output.ControlComponent do
id={@id}
attrs={@attrs}
input_values={@input_values}
client_id={@client_id}
/>
</div>
"""
@ -111,7 +112,7 @@ defmodule LivebookWeb.Output.ControlComponent do
defp report_event(socket, attrs) do
topic = socket.assigns.attrs.ref
event = Map.merge(%{origin: self()}, attrs)
event = Map.merge(%{origin: socket.assigns.client_id}, attrs)
send(socket.assigns.attrs.destination, {:event, topic, event})
end
end

View file

@ -74,7 +74,7 @@ defmodule LivebookWeb.Output.ControlFormComponent do
defp report_event(socket, attrs) do
topic = socket.assigns.attrs.ref
event = Map.merge(%{origin: self()}, attrs)
event = Map.merge(%{origin: socket.assigns.client_id}, attrs)
send(socket.assigns.attrs.destination, {:event, topic, event})
end

View file

@ -85,6 +85,7 @@ defmodule LivebookWeb.Output.FrameComponent do
socket={@socket}
session_id={@session_id}
input_values={@input_values}
client_id={@client_id}
/>
</div>
<% end %>

View file

@ -245,7 +245,7 @@ defmodule LivebookWeb.Output.InputComponent do
defp report_event(socket, value) do
topic = socket.assigns.attrs.ref
event = %{value: value, origin: self(), type: :change}
event = %{value: value, origin: socket.assigns.client_id, type: :change}
send(socket.assigns.attrs.destination, {:event, topic, event})
end
end

View file

@ -15,22 +15,25 @@ defmodule LivebookWeb.SessionLive do
# we talk to the session process exclusively for getting all the information
case Sessions.fetch_session(session_id) do
{:ok, %{pid: session_pid}} ->
data =
{data, client_id} =
if connected?(socket) do
data = Session.register_client(session_pid, self(), socket.assigns.current_user)
{data, client_id} =
Session.register_client(session_pid, self(), socket.assigns.current_user)
Session.subscribe(session_id)
data
{data, client_id}
else
Session.get_data(session_pid)
data = Session.get_data(session_pid)
{data, nil}
end
socket =
if connected?(socket) do
payload = %{
clients:
Enum.map(data.clients_map, fn {client_pid, user_id} ->
client_info(client_pid, data.users_map[user_id])
Enum.map(data.clients_map, fn {client_id, user_id} ->
client_info(client_id, data.users_map[user_id])
end)
}
@ -48,8 +51,8 @@ defmodule LivebookWeb.SessionLive do
|> assign(
self_path: Routes.session_path(socket, :page, session.id),
session: session,
client_id: client_id,
platform: platform,
self: self(),
data_view: data_to_view(data),
autofocus_cell_id: autofocus_cell_id(data.notebook),
page_title: get_page_title(data.notebook.name)
@ -161,7 +164,7 @@ defmodule LivebookWeb.SessionLive do
<.sections_list data_view={@data_view} />
</div>
<div data-el-clients-list>
<.clients_list data_view={@data_view} self={@self} />
<.clients_list data_view={@data_view} client_id={@client_id} />
</div>
<div data-el-runtime-info>
<.runtime_info data_view={@data_view} session={@session} socket={@socket} />
@ -243,6 +246,7 @@ defmodule LivebookWeb.SessionLive do
module={LivebookWeb.SessionLive.CellComponent}
id={@data_view.setup_cell_view.id}
session_id={@session.id}
client_id={@client_id}
runtime={@data_view.runtime}
installing?={@data_view.installing?}
cell_view={@data_view.setup_cell_view}
@ -262,6 +266,7 @@ defmodule LivebookWeb.SessionLive do
id={section_view.id}
index={index}
session_id={@session.id}
client_id={@client_id}
runtime={@data_view.runtime}
smart_cell_definitions={@data_view.smart_cell_definitions}
installing?={@data_view.installing?}
@ -474,22 +479,22 @@ defmodule LivebookWeb.SessionLive do
</span>
</div>
<div class="flex flex-col mt-4 space-y-4">
<%= for {client_pid, user} <- @data_view.clients do %>
<%= for {client_id, user} <- @data_view.clients do %>
<div
class="flex items-center justify-between space-x-2"
id={"clients-list-item-#{inspect(client_pid)}"}
id={"clients-list-item-#{client_id}"}
data-el-clients-list-item
data-client-pid={inspect(client_pid)}
data-client-id={client_id}
>
<button
class="flex items-center space-x-2 text-gray-500 hover:text-gray-900 disabled:pointer-events-none"
disabled={client_pid == @self}
disabled={client_id == @client_id}
data-el-client-link
>
<.user_avatar user={user} class="shrink-0 h-7 w-7" text_class="text-xs" />
<span><%= user.name || "Anonymous" %></span>
</button>
<%= if client_pid != @self do %>
<%= if client_id != @client_id do %>
<span
class="tooltip left"
data-tooltip="Follow this user"
@ -1040,7 +1045,7 @@ defmodule LivebookWeb.SessionLive do
Livebook.PubSub,
self(),
"sessions:#{socket.assigns.session.id}",
{:location_report, self(), report}
{:location_report, socket.assigns.client_id, report}
)
{:noreply, socket}
@ -1099,8 +1104,8 @@ defmodule LivebookWeb.SessionLive do
{:noreply, push_event(socket, "intellisense_response", payload)}
end
def handle_info({:location_report, client_pid, report}, socket) do
report = Map.put(report, :client_pid, inspect(client_pid))
def handle_info({:location_report, client_id, report}, socket) do
report = Map.put(report, :client_id, client_id)
{:noreply, push_event(socket, "location_report", report)}
end
@ -1108,7 +1113,7 @@ defmodule LivebookWeb.SessionLive do
if local do
socket =
Enum.reduce(values, socket, fn {input_id, value}, socket ->
operation = {:set_input_value, self(), input_id, value}
operation = {:set_input_value, socket.assigns.client_id, input_id, value}
handle_operation(socket, operation)
end)
@ -1273,29 +1278,29 @@ defmodule LivebookWeb.SessionLive do
end
end
defp after_operation(socket, _prev_socket, {:client_join, client_pid, user}) do
push_event(socket, "client_joined", %{client: client_info(client_pid, user)})
defp after_operation(socket, _prev_socket, {:client_join, client_id, user}) do
push_event(socket, "client_joined", %{client: client_info(client_id, user)})
end
defp after_operation(socket, _prev_socket, {:client_leave, client_pid}) do
push_event(socket, "client_left", %{client_pid: inspect(client_pid)})
defp after_operation(socket, _prev_socket, {:client_leave, client_id}) do
push_event(socket, "client_left", %{client_id: client_id})
end
defp after_operation(socket, _prev_socket, {:update_user, _client_pid, user}) do
defp after_operation(socket, _prev_socket, {:update_user, _client_id, user}) do
updated_clients =
socket.private.data.clients_map
|> Enum.filter(fn {_client_pid, user_id} -> user_id == user.id end)
|> Enum.map(fn {client_pid, _user_id} -> client_info(client_pid, user) end)
|> Enum.filter(fn {_client_id, user_id} -> user_id == user.id end)
|> Enum.map(fn {client_id, _user_id} -> client_info(client_id, user) end)
push_event(socket, "clients_updated", %{clients: updated_clients})
end
defp after_operation(socket, _prev_socket, {:set_notebook_name, _client_pid, name}) do
defp after_operation(socket, _prev_socket, {:set_notebook_name, _client_id, name}) do
assign(socket, page_title: get_page_title(name))
end
defp after_operation(socket, _prev_socket, {:insert_section, client_pid, _index, section_id}) do
if client_pid == self() do
defp after_operation(socket, _prev_socket, {:insert_section, client_id, _index, section_id}) do
if client_id == socket.assigns.client_id do
push_event(socket, "section_inserted", %{section_id: section_id})
else
socket
@ -1305,9 +1310,9 @@ defmodule LivebookWeb.SessionLive do
defp after_operation(
socket,
_prev_socket,
{:insert_section_into, client_pid, _section_id, _index, section_id}
{:insert_section_into, client_id, _section_id, _index, section_id}
) do
if client_pid == self() do
if client_id == socket.assigns.client_id do
push_event(socket, "section_inserted", %{section_id: section_id})
else
socket
@ -1317,22 +1322,22 @@ defmodule LivebookWeb.SessionLive do
defp after_operation(
socket,
_prev_socket,
{:delete_section, _client_pid, section_id, _delete_cells}
{:delete_section, _client_id, section_id, _delete_cells}
) do
push_event(socket, "section_deleted", %{section_id: section_id})
end
defp after_operation(socket, _prev_socket, {:insert_cell, client_pid, _, _, _, cell_id, _attrs}) do
defp after_operation(socket, _prev_socket, {:insert_cell, client_id, _, _, _, cell_id, _attrs}) do
socket = prune_cell_sources(socket)
if client_pid == self() do
if client_id == socket.assigns.client_id do
push_event(socket, "cell_inserted", %{cell_id: cell_id})
else
socket
end
end
defp after_operation(socket, prev_socket, {:delete_cell, _client_pid, cell_id}) do
defp after_operation(socket, prev_socket, {:delete_cell, _client_id, cell_id}) do
# Find a sibling cell that the client would focus if the deleted cell has focus.
sibling_cell_id =
case Notebook.fetch_cell_sibling(prev_socket.private.data.notebook, cell_id, 1) do
@ -1349,26 +1354,26 @@ defmodule LivebookWeb.SessionLive do
push_event(socket, "cell_deleted", %{cell_id: cell_id, sibling_cell_id: sibling_cell_id})
end
defp after_operation(socket, _prev_socket, {:restore_cell, client_pid, cell_id}) do
defp after_operation(socket, _prev_socket, {:restore_cell, client_id, cell_id}) do
socket = prune_cell_sources(socket)
if client_pid == self() do
if client_id == socket.assigns.client_id do
push_event(socket, "cell_restored", %{cell_id: cell_id})
else
socket
end
end
defp after_operation(socket, _prev_socket, {:move_cell, client_pid, cell_id, _offset}) do
if client_pid == self() do
defp after_operation(socket, _prev_socket, {:move_cell, client_id, cell_id, _offset}) do
if client_id == socket.assigns.client_id do
push_event(socket, "cell_moved", %{cell_id: cell_id})
else
socket
end
end
defp after_operation(socket, _prev_socket, {:move_section, client_pid, section_id, _offset}) do
if client_pid == self() do
defp after_operation(socket, _prev_socket, {:move_section, client_id, section_id, _offset}) do
if client_id == socket.assigns.client_id do
push_event(socket, "section_moved", %{section_id: section_id})
else
socket
@ -1378,7 +1383,7 @@ defmodule LivebookWeb.SessionLive do
defp after_operation(
socket,
_prev_socket,
{:add_cell_evaluation_output, _client_pid, _cell_id, _output}
{:add_cell_evaluation_output, _client_id, _cell_id, _output}
) do
prune_outputs(socket)
end
@ -1386,7 +1391,7 @@ defmodule LivebookWeb.SessionLive do
defp after_operation(
socket,
_prev_socket,
{:add_cell_evaluation_response, _client_pid, cell_id, _output, metadata}
{:add_cell_evaluation_response, _client_id, cell_id, _output, metadata}
) do
socket
|> prune_outputs()
@ -1396,7 +1401,7 @@ defmodule LivebookWeb.SessionLive do
defp after_operation(
socket,
_prev_socket,
{:smart_cell_started, _client_pid, _cell_id, _delta, _js_view, _editor}
{:smart_cell_started, _client_id, _cell_id, _delta, _js_view, _editor}
) do
prune_cell_sources(socket)
end
@ -1407,8 +1412,8 @@ defmodule LivebookWeb.SessionLive do
Enum.reduce(actions, socket, &handle_action(&2, &1))
end
defp handle_action(socket, {:broadcast_delta, client_pid, cell, tag, delta}) do
if client_pid == self() do
defp handle_action(socket, {:broadcast_delta, client_id, cell, tag, delta}) do
if client_id == socket.assigns.client_id do
push_event(socket, "cell_acknowledgement:#{cell.id}:#{tag}", %{})
else
push_event(socket, "cell_delta:#{cell.id}:#{tag}", %{delta: Delta.to_compressed(delta)})
@ -1417,8 +1422,8 @@ defmodule LivebookWeb.SessionLive do
defp handle_action(socket, _action), do: socket
defp client_info(pid, user) do
%{pid: inspect(pid), hex_color: user.hex_color, name: user.name || "Anonymous"}
defp client_info(id, user) do
%{id: id, hex_color: user.hex_color, name: user.name || "Anonymous"}
end
defp normalize_name(name) do
@ -1562,8 +1567,8 @@ defmodule LivebookWeb.SessionLive do
end,
clients:
data.clients_map
|> Enum.map(fn {client_pid, user_id} -> {client_pid, data.users_map[user_id]} end)
|> Enum.sort_by(fn {_client_pid, user} -> user.name end),
|> Enum.map(fn {client_id, user_id} -> {client_id, data.users_map[user_id]} end)
|> Enum.sort_by(fn {_client_id, user} -> user.name end),
installing?: data.cell_infos[Cell.setup_cell_id()].eval.status == :evaluating,
setup_cell_view: %{cell_to_view(hd(data.notebook.setup_section.cells), data) | type: :setup},
section_views: section_views(data.notebook.sections, data),
@ -1717,19 +1722,19 @@ defmodule LivebookWeb.SessionLive do
# most common ones we only update the relevant parts.
defp update_data_view(data_view, prev_data, data, operation) do
case operation do
{:report_cell_revision, _pid, _cell_id, _tag, _revision} ->
{:report_cell_revision, _client_id, _cell_id, _tag, _revision} ->
data_view
{:apply_cell_delta, _pid, _cell_id, _tag, _delta, _revision} ->
{:apply_cell_delta, _client_id, _cell_id, _tag, _delta, _revision} ->
update_dirty_status(data_view, data)
{:update_smart_cell, _pid, _cell_id, _cell_state, _delta, _reevaluate} ->
{:update_smart_cell, _client_id, _cell_id, _cell_state, _delta, _reevaluate} ->
update_dirty_status(data_view, data)
# For outputs that update existing outputs we send the update directly
# to the corresponding component, so the DOM patch is isolated and fast.
# This is important for intensive output updates
{:add_cell_evaluation_output, _client_pid, _cell_id,
{:add_cell_evaluation_output, _client_id, _cell_id,
{:frame, _outputs, %{type: type, ref: ref}}}
when type != :default ->
for {idx, {:frame, frame_outputs, _}} <- Notebook.find_frame_outputs(data.notebook, ref) do
@ -1742,7 +1747,7 @@ defmodule LivebookWeb.SessionLive do
data_view
{:add_cell_evaluation_output, _client_pid, cell_id, {:stdout, text}} ->
{:add_cell_evaluation_output, _client_id, cell_id, {:stdout, text}} ->
# Lookup in previous data to see if the output is already there
case Notebook.fetch_cell_and_section(prev_data.notebook, cell_id) do
{:ok, %{outputs: [{idx, {:stdout, _}} | _]}, _section} ->

View file

@ -95,7 +95,12 @@ defmodule LivebookWeb.SessionLive.CellComponent do
<.cell_status id={@cell_view.id} cell_view={@cell_view} />
</div>
</div>
<.evaluation_outputs cell_view={@cell_view} socket={@socket} session_id={@session_id} />
<.evaluation_outputs
cell_view={@cell_view}
socket={@socket}
session_id={@session_id}
client_id={@client_id}
/>
</.cell_body>
"""
end
@ -138,7 +143,12 @@ defmodule LivebookWeb.SessionLive.CellComponent do
<.cell_status id={"#{@cell_view.id}-2"} cell_view={@cell_view} />
</div>
</div>
<.evaluation_outputs cell_view={@cell_view} socket={@socket} session_id={@session_id} />
<.evaluation_outputs
cell_view={@cell_view}
socket={@socket}
session_id={@session_id}
client_id={@client_id}
/>
</div>
</.cell_body>
"""
@ -226,7 +236,12 @@ defmodule LivebookWeb.SessionLive.CellComponent do
</div>
</div>
</div>
<.evaluation_outputs cell_view={@cell_view} socket={@socket} session_id={@session_id} />
<.evaluation_outputs
cell_view={@cell_view}
socket={@socket}
session_id={@session_id}
client_id={@client_id}
/>
</.cell_body>
"""
end
@ -546,6 +561,7 @@ defmodule LivebookWeb.SessionLive.CellComponent do
dom_id_map={%{}}
socket={@socket}
session_id={@session_id}
client_id={@client_id}
input_values={@cell_view.eval.input_values}
/>
</div>

View file

@ -139,6 +139,7 @@ defmodule LivebookWeb.SessionLive.SectionComponent do
module={LivebookWeb.SessionLive.CellComponent}
id={cell_view.id}
session_id={@session_id}
client_id={@client_id}
runtime={@runtime}
installing?={@installing?}
cell_view={cell_view}

File diff suppressed because it is too large Load diff

View file

@ -42,71 +42,66 @@ defmodule Livebook.SessionTest do
describe "set_notebook_attributes/2" do
test "sends an attributes update to subscribers", %{session: session} do
Session.subscribe(session.id)
pid = self()
attrs = %{set_notebook_attributes: true}
Session.set_notebook_attributes(session.pid, attrs)
assert_receive {:operation, {:set_notebook_attributes, ^pid, ^attrs}}
assert_receive {:operation, {:set_notebook_attributes, _client_id, ^attrs}}
end
end
describe "insert_section/2" do
test "sends an insert operation to subscribers", %{session: session} do
Session.subscribe(session.id)
pid = self()
Session.insert_section(session.pid, 0)
assert_receive {:operation, {:insert_section, ^pid, 0, _id}}
assert_receive {:operation, {:insert_section, _client_id, 0, _id}}
end
end
describe "insert_cell/4" do
test "sends an insert operation to subscribers", %{session: session} do
Session.subscribe(session.id)
pid = self()
Session.insert_section(session.pid, 0)
assert_receive {:operation, {:insert_section, ^pid, 0, section_id}}
assert_receive {:operation, {:insert_section, _client_id, 0, section_id}}
Session.insert_cell(session.pid, section_id, 0, :code)
assert_receive {:operation, {:insert_cell, ^pid, ^section_id, 0, :code, _id, _attrs}}
assert_receive {:operation, {:insert_cell, _client_id, ^section_id, 0, :code, _id, _attrs}}
end
end
describe "delete_section/3" do
test "sends a delete operation to subscribers", %{session: session} do
Session.subscribe(session.id)
pid = self()
{section_id, _cell_id} = insert_section_and_cell(session.pid)
Session.delete_section(session.pid, section_id, false)
assert_receive {:operation, {:delete_section, ^pid, ^section_id, false}}
assert_receive {:operation, {:delete_section, _client_id, ^section_id, false}}
end
end
describe "delete_cell/2" do
test "sends a delete operation to subscribers", %{session: session} do
Session.subscribe(session.id)
pid = self()
{_section_id, cell_id} = insert_section_and_cell(session.pid)
Session.delete_cell(session.pid, cell_id)
assert_receive {:operation, {:delete_cell, ^pid, ^cell_id}}
assert_receive {:operation, {:delete_cell, _client_id, ^cell_id}}
end
end
describe "restore_cell/2" do
test "sends a restore operation to subscribers", %{session: session} do
Session.subscribe(session.id)
pid = self()
{_section_id, cell_id} = insert_section_and_cell(session.pid)
Session.delete_cell(session.pid, cell_id)
Session.restore_cell(session.pid, cell_id)
assert_receive {:operation, {:restore_cell, ^pid, ^cell_id}}
assert_receive {:operation, {:restore_cell, _client_id, ^cell_id}}
end
end
@ -119,17 +114,16 @@ defmodule Livebook.SessionTest do
session = start_session(notebook: notebook)
Session.subscribe(session.id)
pid = self()
Session.convert_smart_cell(session.pid, smart_cell.id)
cell_id = smart_cell.id
section_id = section.id
assert_receive {:operation, {:delete_cell, ^pid, ^cell_id}}
assert_receive {:operation, {:delete_cell, _client_id, ^cell_id}}
assert_receive {:operation,
{:insert_cell, ^pid, ^section_id, 0, :code, _id,
{:insert_cell, _client_id, ^section_id, 0, :code, _id,
%{source: "content", outputs: []}}}
end
end
@ -144,8 +138,7 @@ defmodule Livebook.SessionTest do
Session.add_dependencies(session.pid, [{:jason, "~> 1.3.0"}])
session_pid = session.pid
assert_receive {:operation, {:apply_cell_delta, ^session_pid, "setup", :primary, _delta, 1}}
assert_receive {:operation, {:apply_cell_delta, "__server__", "setup", :primary, _delta, 1}}
assert %{
notebook: %{
@ -183,13 +176,12 @@ defmodule Livebook.SessionTest do
test "triggers evaluation and sends update operation once it finishes",
%{session: session} do
Session.subscribe(session.id)
pid = self()
{_section_id, cell_id} = insert_section_and_cell(session.pid)
Session.queue_cell_evaluation(session.pid, cell_id)
assert_receive {:operation, {:queue_cells_evaluation, ^pid, [^cell_id]}}
assert_receive {:operation, {:queue_cells_evaluation, _client_id, [^cell_id]}}
assert_receive {:operation,
{:add_cell_evaluation_response, _, ^cell_id, _,
@ -200,43 +192,39 @@ defmodule Livebook.SessionTest do
describe "cancel_cell_evaluation/2" do
test "sends a cancel evaluation operation to subscribers", %{session: session} do
Session.subscribe(session.id)
pid = self()
{_section_id, cell_id} = insert_section_and_cell(session.pid)
Session.queue_cell_evaluation(session.pid, cell_id)
Session.cancel_cell_evaluation(session.pid, cell_id)
assert_receive {:operation, {:cancel_cell_evaluation, ^pid, ^cell_id}}
assert_receive {:operation, {:cancel_cell_evaluation, _client_id, ^cell_id}}
end
end
describe "set_notebook_name/2" do
test "sends a notebook name update operation to subscribers", %{session: session} do
Session.subscribe(session.id)
pid = self()
Session.set_notebook_name(session.pid, "Cat's guide to life")
assert_receive {:operation, {:set_notebook_name, ^pid, "Cat's guide to life"}}
assert_receive {:operation, {:set_notebook_name, _client_id, "Cat's guide to life"}}
end
end
describe "set_section_name/3" do
test "sends a section name update operation to subscribers", %{session: session} do
Session.subscribe(session.id)
pid = self()
{section_id, _cell_id} = insert_section_and_cell(session.pid)
Session.set_section_name(session.pid, section_id, "Chapter 1")
assert_receive {:operation, {:set_section_name, ^pid, ^section_id, "Chapter 1"}}
assert_receive {:operation, {:set_section_name, _client_id, ^section_id, "Chapter 1"}}
end
end
describe "apply_cell_delta/4" do
test "sends a cell delta operation to subscribers", %{session: session} do
Session.subscribe(session.id)
pid = self()
{_section_id, cell_id} = insert_section_and_cell(session.pid)
@ -246,62 +234,60 @@ defmodule Livebook.SessionTest do
Session.apply_cell_delta(session.pid, cell_id, :primary, delta, revision)
assert_receive {:operation,
{:apply_cell_delta, ^pid, ^cell_id, :primary, ^delta, ^revision}}
{:apply_cell_delta, _client_id, ^cell_id, :primary, ^delta, ^revision}}
end
end
describe "report_cell_revision/3" do
test "sends a revision report operation to subscribers", %{session: session} do
Session.subscribe(session.id)
pid = self()
{_section_id, cell_id} = insert_section_and_cell(session.pid)
revision = 1
Session.report_cell_revision(session.pid, cell_id, :primary, revision)
assert_receive {:operation, {:report_cell_revision, ^pid, ^cell_id, :primary, ^revision}}
assert_receive {:operation,
{:report_cell_revision, _client_id, ^cell_id, :primary, ^revision}}
end
end
describe "set_cell_attributes/3" do
test "sends an attributes update operation to subscribers", %{session: session} do
Session.subscribe(session.id)
pid = self()
{_section_id, cell_id} = insert_section_and_cell(session.pid)
attrs = %{disable_formatting: true}
Session.set_cell_attributes(session.pid, cell_id, attrs)
assert_receive {:operation, {:set_cell_attributes, ^pid, ^cell_id, ^attrs}}
assert_receive {:operation, {:set_cell_attributes, _client_id, ^cell_id, ^attrs}}
end
end
describe "connect_runtime/2" do
test "sends a runtime update operation to subscribers", %{session: session} do
Session.subscribe(session.id)
pid = self()
runtime = connected_noop_runtime()
Session.set_runtime(session.pid, runtime)
assert_receive {:operation, {:set_runtime, ^pid, ^runtime}}
assert_receive {:operation, {:set_runtime, _client_id, ^runtime}}
end
end
describe "disconnect_runtime/1" do
test "sends a runtime update operation to subscribers", %{session: session} do
Session.subscribe(session.id)
pid = self()
runtime = connected_noop_runtime()
Session.set_runtime(session.pid, runtime)
assert_receive {:operation, {:set_runtime, ^pid, _}}
assert_receive {:operation, {:set_runtime, _client_id, _}}
# Calling twice can happen in a race, make sure it doesn't crash
Session.disconnect_runtime(session.pid)
Session.disconnect_runtime([session.pid])
assert_receive {:operation, {:set_runtime, ^pid, runtime}}
assert_receive {:operation, {:set_runtime, _client_id, runtime}}
refute Runtime.connected?(runtime)
end
end
@ -311,13 +297,12 @@ defmodule Livebook.SessionTest do
test "sends a file update operation to subscribers",
%{session: session, tmp_dir: tmp_dir} do
Session.subscribe(session.id)
pid = self()
tmp_dir = FileSystem.File.local(tmp_dir <> "/")
file = FileSystem.File.resolve(tmp_dir, "notebook.livemd")
Session.set_file(session.pid, file)
assert_receive {:operation, {:set_file, ^pid, ^file}}
assert_receive {:operation, {:set_file, _client_id, ^file}}
end
@tag :tmp_dir
@ -540,7 +525,7 @@ defmodule Livebook.SessionTest do
updated_user = %{user | name: "Jake Peralta"}
Livebook.Users.broadcast_change(updated_user)
assert_receive {:operation, {:update_user, _pid, ^updated_user}}
assert_receive {:operation, {:update_user, _client_id, ^updated_user}}
end
# Integration tests concerning input communication
@ -718,7 +703,7 @@ defmodule Livebook.SessionTest do
cell_id = smart_cell.id
new_digest = :erlang.md5("2")
assert_receive {:operation, {:evaluation_started, ^session_pid, ^cell_id, ^new_digest}}
assert_receive {:operation, {:evaluation_started, "__server__", ^cell_id, ^new_digest}}
end
end

View file

@ -8,7 +8,8 @@ defmodule LivebookWeb.JSViewChannelTest do
LivebookWeb.Socket
|> socket()
|> subscribe_and_join(LivebookWeb.JSViewChannel, "js_view", %{
"session_id" => session_id
"session_id" => session_id,
"client_id" => Livebook.Utils.random_id()
})
%{socket: socket}

View file

@ -581,13 +581,13 @@ defmodule LivebookWeb.SessionLiveTest do
client_pid =
spawn_link(fn ->
Session.register_client(session.pid, self(), user1)
receive do
:stop -> :ok
end
end)
Session.register_client(session.pid, client_pid, user1)
{:ok, view, _} = live(conn, "/sessions/#{session.id}")
assert render(view) =~ "Jake Peralta"
@ -605,18 +605,18 @@ defmodule LivebookWeb.SessionLiveTest do
client_pid =
spawn_link(fn ->
Session.register_client(session.pid, self(), user1)
receive do
:stop -> :ok
end
end)
assert_receive {:operation, {:client_join, ^client_pid, _user}}
{_, client_id} = Session.register_client(session.pid, client_pid, user1)
assert_receive {:operation, {:client_join, ^client_id, _user}}
assert render(view) =~ "Jake Peralta"
send(client_pid, :stop)
assert_receive {:operation, {:client_leave, ^client_pid}}
assert_receive {:operation, {:client_leave, ^client_id}}
refute render(view) =~ "Jake Peralta"
end
@ -626,13 +626,13 @@ defmodule LivebookWeb.SessionLiveTest do
client_pid =
spawn_link(fn ->
Session.register_client(session.pid, self(), user1)
receive do
:stop -> :ok
end
end)
Session.register_client(session.pid, client_pid, user1)
{:ok, view, _} = live(conn, "/sessions/#{session.id}")
Session.subscribe(session.id)
@ -640,7 +640,7 @@ defmodule LivebookWeb.SessionLiveTest do
assert render(view) =~ "Jake Peralta"
Users.broadcast_change(%{user1 | name: "Raymond Holt"})
assert_receive {:operation, {:update_user, _pid, _user}}
assert_receive {:operation, {:update_user, _client_id, _user}}
refute render(view) =~ "Jake Peralta"
assert render(view) =~ "Raymond Holt"