mirror of
https://github.com/livebook-dev/livebook.git
synced 2025-10-29 23:05:59 +08:00
Add support for sending frame updates to a specific client (#1662)
This commit is contained in:
parent
238b20b925
commit
86bf972c0d
4 changed files with 54 additions and 0 deletions
|
|
@ -245,6 +245,12 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
|
|||
{:ok, state}
|
||||
end
|
||||
|
||||
defp io_request({:livebook_put_output_to, client_id, output}, state) do
|
||||
state = flush_buffer(state)
|
||||
send(state.send_to, {:runtime_evaluation_output_to, client_id, state.ref, output})
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
defp io_request({:livebook_get_input_value, input_id}, state) do
|
||||
input_cache =
|
||||
Map.put_new_lazy(state.input_cache, input_id, fn ->
|
||||
|
|
|
|||
|
|
@ -1054,6 +1054,20 @@ defmodule Livebook.Session do
|
|||
{:noreply, handle_operation(state, operation)}
|
||||
end
|
||||
|
||||
def handle_info({:runtime_evaluation_output_to, client_id, cell_id, output}, state) do
|
||||
client_pid =
|
||||
Enum.find_value(state.client_pids_with_id, fn {pid, id} ->
|
||||
id == client_id && pid
|
||||
end)
|
||||
|
||||
if client_pid do
|
||||
operation = {:add_cell_evaluation_output, @client_id, cell_id, output}
|
||||
send(client_pid, {:operation, operation})
|
||||
end
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_info({:runtime_evaluation_response, cell_id, response, metadata}, state) do
|
||||
{memory_usage, metadata} = Map.pop(metadata, :memory_usage)
|
||||
operation = {:add_cell_evaluation_response, @client_id, cell_id, response, metadata}
|
||||
|
|
|
|||
|
|
@ -104,6 +104,12 @@ defmodule Livebook.Runtime.Evaluator.IOProxyTest do
|
|||
assert_received {:runtime_evaluation_output, :ref, {:text, "[1, 2, 3]"}}
|
||||
end
|
||||
|
||||
test "supports direct livebook output forwarding for a specific client", %{io: io} do
|
||||
livebook_put_output_to(io, "client1", {:text, "[1, 2, 3]"})
|
||||
|
||||
assert_received {:runtime_evaluation_output_to, "client1", :ref, {:text, "[1, 2, 3]"}}
|
||||
end
|
||||
|
||||
describe "token requests" do
|
||||
test "returns different tokens for subsequent calls", %{io: io} do
|
||||
IOProxy.configure(io, :ref1, "cell1")
|
||||
|
|
@ -155,6 +161,10 @@ defmodule Livebook.Runtime.Evaluator.IOProxyTest do
|
|||
io_request(io, {:livebook_put_output, output})
|
||||
end
|
||||
|
||||
defp livebook_put_output_to(io, client_id, output) do
|
||||
io_request(io, {:livebook_put_output_to, client_id, output})
|
||||
end
|
||||
|
||||
defp livebook_get_input_value(io, input_id) do
|
||||
io_request(io, {:livebook_get_input_value, input_id})
|
||||
end
|
||||
|
|
|
|||
|
|
@ -476,6 +476,30 @@ defmodule LivebookWeb.SessionLiveTest do
|
|||
assert content =~ "Updated frame"
|
||||
refute content =~ "In frame"
|
||||
end
|
||||
|
||||
test "client specific output is sent only to one target", %{conn: conn, session: session} do
|
||||
user1 = build(:user, name: "Jake Peralta")
|
||||
{_, client_id} = Session.register_client(session.pid, self(), user1)
|
||||
|
||||
Session.subscribe(session.id)
|
||||
evaluate_setup(session.pid)
|
||||
|
||||
section_id = insert_section(session.pid)
|
||||
cell_id = insert_text_cell(session.pid, section_id, :code)
|
||||
|
||||
Session.queue_cell_evaluation(session.pid, cell_id)
|
||||
|
||||
send(
|
||||
session.pid,
|
||||
{:runtime_evaluation_output_to, client_id, cell_id, {:stdout, "line 1\n"}}
|
||||
)
|
||||
|
||||
assert_receive {:operation,
|
||||
{:add_cell_evaluation_output, _, ^cell_id, {:stdout, "line 1\n"}}}
|
||||
|
||||
{:ok, view, _} = live(conn, "/sessions/#{session.id}")
|
||||
refute render(view) =~ "line 1"
|
||||
end
|
||||
end
|
||||
|
||||
describe "smart cells" do
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue