mirror of
https://github.com/livebook-dev/livebook.git
synced 2025-10-09 21:16:26 +08:00
Buffer IO evaluation output (#156)
* Buffer IO evaluation output * Respect CR in IOProxy * Handle CR when adding cell output rather than during each render * Optimise CR handling in buffer
This commit is contained in:
parent
6cbf4d1fb0
commit
c6f9c54f31
7 changed files with 130 additions and 43 deletions
|
@ -133,6 +133,8 @@ defmodule Livebook.Evaluator do
|
||||||
{context, response}
|
{context, response}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
Evaluator.IOProxy.flush(state.io_proxy)
|
||||||
|
|
||||||
send_evaluation_response(send_to, ref, response, state.formatter)
|
send_evaluation_response(send_to, ref, response, state.formatter)
|
||||||
|
|
||||||
new_state = put_in(state.contexts[ref], result_context)
|
new_state = put_in(state.contexts[ref], result_context)
|
||||||
|
|
|
@ -46,11 +46,19 @@ defmodule Livebook.Evaluator.IOProxy do
|
||||||
GenServer.cast(pid, {:configure, target, ref})
|
GenServer.cast(pid, {:configure, target, ref})
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Synchronously sends all buffer contents to the configured target process.
|
||||||
|
"""
|
||||||
|
@spec flush(pid()) :: :ok
|
||||||
|
def flush(pid) do
|
||||||
|
GenServer.call(pid, :flush)
|
||||||
|
end
|
||||||
|
|
||||||
## Callbacks
|
## Callbacks
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def init(_opts) do
|
def init(_opts) do
|
||||||
{:ok, %{encoding: :unicode, target: nil, ref: nil}}
|
{:ok, %{encoding: :unicode, target: nil, ref: nil, buffer: []}}
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
|
@ -58,6 +66,11 @@ defmodule Livebook.Evaluator.IOProxy do
|
||||||
{:noreply, %{state | target: target, ref: ref}}
|
{:noreply, %{state | target: target, ref: ref}}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
|
def handle_call(:flush, _from, state) do
|
||||||
|
{:reply, :ok, flush_buffer(state)}
|
||||||
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def handle_info({:io_request, from, reply_as, req}, state) do
|
def handle_info({:io_request, from, reply_as, req}, state) do
|
||||||
{reply, state} = io_request(req, state)
|
{reply, state} = io_request(req, state)
|
||||||
|
@ -65,6 +78,10 @@ defmodule Livebook.Evaluator.IOProxy do
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def handle_info(:flush, state) do
|
||||||
|
{:noreply, flush_buffer(state)}
|
||||||
|
end
|
||||||
|
|
||||||
defp io_request({:put_chars, chars} = req, state) do
|
defp io_request({:put_chars, chars} = req, state) do
|
||||||
put_chars(:latin1, chars, req, state)
|
put_chars(:latin1, chars, req, state)
|
||||||
end
|
end
|
||||||
|
@ -148,11 +165,11 @@ defmodule Livebook.Evaluator.IOProxy do
|
||||||
defp put_chars(encoding, chars, req, state) do
|
defp put_chars(encoding, chars, req, state) do
|
||||||
case :unicode.characters_to_binary(chars, encoding, state.encoding) do
|
case :unicode.characters_to_binary(chars, encoding, state.encoding) do
|
||||||
string when is_binary(string) ->
|
string when is_binary(string) ->
|
||||||
if state.target do
|
if state.buffer == [] do
|
||||||
send(state.target, {:evaluation_stdout, state.ref, string})
|
Process.send_after(self(), :flush, 50)
|
||||||
end
|
end
|
||||||
|
|
||||||
{:ok, state}
|
{:ok, update_in(state.buffer, &buffer_append(&1, string))}
|
||||||
|
|
||||||
{_, _, _} ->
|
{_, _, _} ->
|
||||||
{{:error, req}, state}
|
{{:error, req}, state}
|
||||||
|
@ -164,4 +181,36 @@ defmodule Livebook.Evaluator.IOProxy do
|
||||||
defp io_reply(from, reply_as, reply) do
|
defp io_reply(from, reply_as, reply) do
|
||||||
send(from, {:io_reply, reply_as, reply})
|
send(from, {:io_reply, reply_as, reply})
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def flush_buffer(state) do
|
||||||
|
string = state.buffer |> Enum.reverse() |> Enum.join()
|
||||||
|
|
||||||
|
if state.target != nil and string != "" do
|
||||||
|
send(state.target, {:evaluation_stdout, state.ref, string})
|
||||||
|
end
|
||||||
|
|
||||||
|
%{state | buffer: []}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp buffer_append(buffer, text) do
|
||||||
|
# Sometimes there are intensive outputs that use \r
|
||||||
|
# to dynamically refresh the printd text.
|
||||||
|
# Since we buffer the messages anyway, it makes
|
||||||
|
# sense to send only the latest of these outputs.
|
||||||
|
# Note that \r works per-line, so if there are newlines
|
||||||
|
# we keep the buffer, but for \r-intensive operations
|
||||||
|
# there are usually no newlines involved, so this optimisation works fine.
|
||||||
|
if has_rewind?(text) and not has_newline?(text) and not Enum.any?(buffer, &has_newline?/1) do
|
||||||
|
[text]
|
||||||
|
else
|
||||||
|
[text | buffer]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# Checks for [\r][not \r] sequence in the given string.
|
||||||
|
defp has_rewind?(<<>>), do: false
|
||||||
|
defp has_rewind?(<<?\r, next, _rest::binary>>) when next != ?\r, do: true
|
||||||
|
defp has_rewind?(<<_head, rest::binary>>), do: has_rewind?(rest)
|
||||||
|
|
||||||
|
defp has_newline?(text), do: String.contains?(text, "\n")
|
||||||
end
|
end
|
||||||
|
|
|
@ -512,11 +512,22 @@ defmodule Livebook.Session.Data do
|
||||||
|
|
||||||
defp add_output([head | tail], output) when is_binary(head) and is_binary(output) do
|
defp add_output([head | tail], output) when is_binary(head) and is_binary(output) do
|
||||||
# Merge consecutive string outputs
|
# Merge consecutive string outputs
|
||||||
[head <> output | tail]
|
[apply_rewind(head <> output) | tail]
|
||||||
end
|
end
|
||||||
|
|
||||||
defp add_output(outputs, output), do: [output | outputs]
|
defp add_output(outputs, output), do: [output | outputs]
|
||||||
|
|
||||||
|
# Respect \r indicating a line should be cleared,
|
||||||
|
# so we ignore unnecessary text fragments
|
||||||
|
defp apply_rewind(text) do
|
||||||
|
text
|
||||||
|
|> String.split("\n")
|
||||||
|
|> Enum.map(fn line ->
|
||||||
|
String.replace(line, ~r/^.*\r([^\r].*)$/, "\\1")
|
||||||
|
end)
|
||||||
|
|> Enum.join("\n")
|
||||||
|
end
|
||||||
|
|
||||||
defp finish_cell_evaluation(data_actions, cell, section) do
|
defp finish_cell_evaluation(data_actions, cell, section) do
|
||||||
data_actions
|
data_actions
|
||||||
|> set_cell_info!(cell.id,
|
|> set_cell_info!(cell.id,
|
||||||
|
|
|
@ -60,7 +60,6 @@ defmodule LivebookWeb.Helpers do
|
||||||
content
|
content
|
||||||
|> IO.iodata_to_binary()
|
|> IO.iodata_to_binary()
|
||||||
|> String.split("\n")
|
|> String.split("\n")
|
||||||
|> Enum.map(&apply_rewind/1)
|
|
||||||
|> Enum.map(&LivebookWeb.ANSI.default_renderer(style, &1))
|
|> Enum.map(&LivebookWeb.ANSI.default_renderer(style, &1))
|
||||||
|> Enum.intersperse("\n")
|
|> Enum.intersperse("\n")
|
||||||
end
|
end
|
||||||
|
@ -69,12 +68,4 @@ defmodule LivebookWeb.Helpers do
|
||||||
|> String.split("\n")
|
|> String.split("\n")
|
||||||
|> Enum.map(&Phoenix.HTML.raw/1)
|
|> Enum.map(&Phoenix.HTML.raw/1)
|
||||||
end
|
end
|
||||||
|
|
||||||
# Respect \r indicating the line should be cleared
|
|
||||||
defp apply_rewind(line) do
|
|
||||||
line
|
|
||||||
|> String.split("\r")
|
|
||||||
|> Enum.reverse()
|
|
||||||
|> Enum.find("", &(&1 != ""))
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -9,28 +9,46 @@ defmodule Livebook.Evaluator.IOProxyTest do
|
||||||
%{io: io}
|
%{io: io}
|
||||||
end
|
end
|
||||||
|
|
||||||
# Test the basic ways users interact with :stdio
|
describe ":stdio interoperability" do
|
||||||
|
test "IO.puts", %{io: io} do
|
||||||
|
IO.puts(io, "hey")
|
||||||
|
assert_receive {:evaluation_stdout, :ref, "hey\n"}
|
||||||
|
end
|
||||||
|
|
||||||
test "IO.puts", %{io: io} do
|
test "IO.write", %{io: io} do
|
||||||
|
IO.write(io, "hey")
|
||||||
|
assert_receive {:evaluation_stdout, :ref, "hey"}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "IO.inspect", %{io: io} do
|
||||||
|
IO.inspect(io, %{}, [])
|
||||||
|
assert_receive {:evaluation_stdout, :ref, "%{}\n"}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "IO.read", %{io: io} do
|
||||||
|
assert IO.read(io, :all) == {:error, :enotsup}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "IO.gets", %{io: io} do
|
||||||
|
assert IO.gets(io, "> ") == {:error, :enotsup}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
test "buffers rapid output", %{io: io} do
|
||||||
IO.puts(io, "hey")
|
IO.puts(io, "hey")
|
||||||
|
IO.puts(io, "hey")
|
||||||
|
assert_receive {:evaluation_stdout, :ref, "hey\nhey\n"}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "respects CR as line cleaner", %{io: io} do
|
||||||
|
IO.write(io, "hey")
|
||||||
|
IO.write(io, "\roverride\r")
|
||||||
|
assert_receive {:evaluation_stdout, :ref, "\roverride\r"}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "flush/1 synchronously sends buffer contents", %{io: io} do
|
||||||
|
IO.puts(io, "hey")
|
||||||
|
IOProxy.flush(io)
|
||||||
assert_received {:evaluation_stdout, :ref, "hey\n"}
|
assert_received {:evaluation_stdout, :ref, "hey\n"}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "IO.write", %{io: io} do
|
|
||||||
IO.write(io, "hey")
|
|
||||||
assert_received {:evaluation_stdout, :ref, "hey"}
|
|
||||||
end
|
|
||||||
|
|
||||||
test "IO.inspect", %{io: io} do
|
|
||||||
IO.inspect(io, %{}, [])
|
|
||||||
assert_received {:evaluation_stdout, :ref, "%{}\n"}
|
|
||||||
end
|
|
||||||
|
|
||||||
test "IO.read", %{io: io} do
|
|
||||||
assert IO.read(io, :all) == {:error, :enotsup}
|
|
||||||
end
|
|
||||||
|
|
||||||
test "IO.gets", %{io: io} do
|
|
||||||
assert IO.gets(io, "> ") == {:error, :enotsup}
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -606,7 +606,7 @@ defmodule Livebook.Session.DataTest do
|
||||||
{:insert_section, self(), 0, "s1"},
|
{:insert_section, self(), 0, "s1"},
|
||||||
{:insert_cell, self(), "s1", 0, :elixir, "c1"},
|
{:insert_cell, self(), "s1", 0, :elixir, "c1"},
|
||||||
{:queue_cell_evaluation, self(), "c1"},
|
{:queue_cell_evaluation, self(), "c1"},
|
||||||
{:add_cell_evaluation_stdout, self(), "c1", "Hello"}
|
{:add_cell_evaluation_stdout, self(), "c1", "Hola"}
|
||||||
])
|
])
|
||||||
|
|
||||||
operation = {:add_cell_evaluation_stdout, self(), "c1", " amigo!"}
|
operation = {:add_cell_evaluation_stdout, self(), "c1", " amigo!"}
|
||||||
|
@ -616,7 +616,30 @@ defmodule Livebook.Session.DataTest do
|
||||||
notebook: %{
|
notebook: %{
|
||||||
sections: [
|
sections: [
|
||||||
%{
|
%{
|
||||||
cells: [%{outputs: ["Hello amigo!"]}]
|
cells: [%{outputs: ["Hola amigo!"]}]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}, []} = Data.apply_operation(data, operation)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "normalizes consecutive stdout results to respect CR" do
|
||||||
|
data =
|
||||||
|
data_after_operations!([
|
||||||
|
{:insert_section, self(), 0, "s1"},
|
||||||
|
{:insert_cell, self(), "s1", 0, :elixir, "c1"},
|
||||||
|
{:queue_cell_evaluation, self(), "c1"},
|
||||||
|
{:add_cell_evaluation_stdout, self(), "c1", "Hola"}
|
||||||
|
])
|
||||||
|
|
||||||
|
operation = {:add_cell_evaluation_stdout, self(), "c1", "\ramigo!\r"}
|
||||||
|
|
||||||
|
assert {:ok,
|
||||||
|
%{
|
||||||
|
notebook: %{
|
||||||
|
sections: [
|
||||||
|
%{
|
||||||
|
cells: [%{outputs: ["amigo!\r"]}]
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,12 +11,5 @@ defmodule LivebookWeb.HelpersTest do
|
||||||
] ==
|
] ==
|
||||||
Helpers.ansi_to_html_lines("\e[34msmiley\ncat\e[0m")
|
Helpers.ansi_to_html_lines("\e[34msmiley\ncat\e[0m")
|
||||||
end
|
end
|
||||||
|
|
||||||
test "respects CR as line cleaner" do
|
|
||||||
assert [
|
|
||||||
{:safe, ~s{<span style="color: var(--ansi-color-blue);">cat</span>}}
|
|
||||||
] ==
|
|
||||||
Helpers.ansi_to_html_lines("\e[34msmiley\rcat\r\e[0m")
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Reference in a new issue