mirror of
https://github.com/livebook-dev/livebook.git
synced 2025-10-25 21:06:08 +08:00
Add support for widget binary payloads (#982)
* Add support for widget binary payloads * Migrate LV auth to a separate hook * Properly set user buffer when encoding
This commit is contained in:
parent
494ab40ac8
commit
ccc64876a8
8 changed files with 187 additions and 52 deletions
|
|
@ -1,3 +1,4 @@
|
||||||
|
import { Socket } from "phoenix";
|
||||||
import { getAttributeOrThrow } from "../lib/attribute";
|
import { getAttributeOrThrow } from "../lib/attribute";
|
||||||
import { randomToken, sha256Base64 } from "../lib/utils";
|
import { randomToken, sha256Base64 } from "../lib/utils";
|
||||||
|
|
||||||
|
|
@ -41,10 +42,7 @@ const JSOutput = {
|
||||||
errorContainer: null,
|
errorContainer: null,
|
||||||
};
|
};
|
||||||
|
|
||||||
const channel = getChannel(
|
const channel = getChannel(this.props.sessionId);
|
||||||
this.__liveSocket.getSocket(),
|
|
||||||
this.props.sessionId
|
|
||||||
);
|
|
||||||
|
|
||||||
// When cells/sections are reordered, morphdom detaches and attaches
|
// When cells/sections are reordered, morphdom detaches and attaches
|
||||||
// the relevant elements in the DOM. Consequently the output element
|
// the relevant elements in the DOM. Consequently the output element
|
||||||
|
|
@ -118,7 +116,8 @@ const JSOutput = {
|
||||||
this.el.dispatchEvent(event);
|
this.el.dispatchEvent(event);
|
||||||
} else if (message.type === "event") {
|
} else if (message.type === "event") {
|
||||||
const { event, payload } = message;
|
const { event, payload } = message;
|
||||||
channel.push("event", { event, payload, ref: this.props.ref });
|
const raw = transportEncode([event, this.props.ref], payload);
|
||||||
|
channel.push("event", raw);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -147,20 +146,18 @@ const JSOutput = {
|
||||||
ref: this.props.ref,
|
ref: this.props.ref,
|
||||||
});
|
});
|
||||||
|
|
||||||
const initRef = channel.on(`init:${this.props.ref}`, ({ data }) => {
|
const initRef = channel.on(`init:${this.props.ref}`, (raw) => {
|
||||||
|
const [, payload] = transportDecode(raw);
|
||||||
|
|
||||||
this.state.childReadyPromise.then(() => {
|
this.state.childReadyPromise.then(() => {
|
||||||
postMessage({ type: "init", data });
|
postMessage({ type: "init", data: payload });
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
const eventRef = channel.on(
|
const eventRef = channel.on(`event:${this.props.ref}`, (raw) => {
|
||||||
`event:${this.props.ref}`,
|
const [[event], payload] = transportDecode(raw);
|
||||||
({ event, payload }) => {
|
postMessage({ type: "event", event, payload });
|
||||||
this.state.childReadyPromise.then(() => {
|
});
|
||||||
postMessage({ type: "event", event, payload });
|
|
||||||
});
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
const errorRef = channel.on(`error:${this.props.ref}`, ({ message }) => {
|
const errorRef = channel.on(`error:${this.props.ref}`, ({ message }) => {
|
||||||
if (!this.state.errorContainer) {
|
if (!this.state.errorContainer) {
|
||||||
|
|
@ -188,13 +185,7 @@ const JSOutput = {
|
||||||
this.disconnectObservers();
|
this.disconnectObservers();
|
||||||
this.state.iframe.remove();
|
this.state.iframe.remove();
|
||||||
|
|
||||||
const channel = getChannel(
|
const channel = getChannel(this.props.sessionId, { create: false });
|
||||||
this.__liveSocket.getSocket(),
|
|
||||||
this.props.sessionId,
|
|
||||||
{
|
|
||||||
create: false,
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
if (channel) {
|
if (channel) {
|
||||||
this.state.channelUnsubscribe();
|
this.state.channelUnsubscribe();
|
||||||
|
|
@ -213,13 +204,19 @@ function getProps(hook) {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const csrfToken = document
|
||||||
|
.querySelector("meta[name='csrf-token']")
|
||||||
|
.getAttribute("content");
|
||||||
|
const socket = new Socket("/socket", { params: { _csrf_token: csrfToken } });
|
||||||
|
|
||||||
let channel = null;
|
let channel = null;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns channel used for all JS outputs in the current session.
|
* Returns channel used for all JS outputs in the current session.
|
||||||
*/
|
*/
|
||||||
function getChannel(socket, sessionId, { create = true } = {}) {
|
function getChannel(sessionId, { create = true } = {}) {
|
||||||
if (!channel && create) {
|
if (!channel && create) {
|
||||||
|
socket.connect();
|
||||||
channel = socket.channel("js_output", { session_id: sessionId });
|
channel = socket.channel("js_output", { session_id: sessionId });
|
||||||
channel.join();
|
channel.join();
|
||||||
}
|
}
|
||||||
|
|
@ -231,10 +228,8 @@ function getChannel(socket, sessionId, { create = true } = {}) {
|
||||||
* Leaves the JS outputs channel tied to the current session.
|
* Leaves the JS outputs channel tied to the current session.
|
||||||
*/
|
*/
|
||||||
export function leaveChannel() {
|
export function leaveChannel() {
|
||||||
if (channel) {
|
socket.disconnect();
|
||||||
channel.leave();
|
channel = null;
|
||||||
channel = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -330,4 +325,62 @@ function verifyIframeSource() {
|
||||||
return iframeVerificationPromise;
|
return iframeVerificationPromise;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function transportEncode(meta, payload) {
|
||||||
|
if (
|
||||||
|
Array.isArray(payload) &&
|
||||||
|
payload[1] &&
|
||||||
|
payload[1].constructor === ArrayBuffer
|
||||||
|
) {
|
||||||
|
const [info, buffer] = payload;
|
||||||
|
return encode([meta, info], buffer);
|
||||||
|
} else {
|
||||||
|
return { root: [meta, payload] };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function transportDecode(raw) {
|
||||||
|
if (raw.constructor === ArrayBuffer) {
|
||||||
|
const [[meta, info], buffer] = decode(raw);
|
||||||
|
return [meta, [info, buffer]];
|
||||||
|
} else {
|
||||||
|
const {
|
||||||
|
root: [meta, payload],
|
||||||
|
} = raw;
|
||||||
|
return [meta, payload];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const HEADER_LENGTH = 4;
|
||||||
|
|
||||||
|
function encode(meta, buffer) {
|
||||||
|
const encoder = new TextEncoder();
|
||||||
|
const metaArray = encoder.encode(JSON.stringify(meta));
|
||||||
|
|
||||||
|
const raw = new ArrayBuffer(
|
||||||
|
HEADER_LENGTH + metaArray.byteLength + buffer.byteLength
|
||||||
|
);
|
||||||
|
const view = new DataView(raw);
|
||||||
|
|
||||||
|
view.setUint32(0, metaArray.byteLength);
|
||||||
|
new Uint8Array(raw, HEADER_LENGTH, metaArray.byteLength).set(metaArray);
|
||||||
|
new Uint8Array(raw, HEADER_LENGTH + metaArray.byteLength).set(
|
||||||
|
new Uint8Array(buffer)
|
||||||
|
);
|
||||||
|
|
||||||
|
return raw;
|
||||||
|
}
|
||||||
|
|
||||||
|
function decode(raw) {
|
||||||
|
const view = new DataView(raw);
|
||||||
|
const metaArrayLength = view.getUint32(0);
|
||||||
|
|
||||||
|
const metaArray = new Uint8Array(raw, HEADER_LENGTH, metaArrayLength);
|
||||||
|
const buffer = raw.slice(HEADER_LENGTH + metaArrayLength);
|
||||||
|
|
||||||
|
const decoder = new TextDecoder();
|
||||||
|
const meta = JSON.parse(decoder.decode(metaArray));
|
||||||
|
|
||||||
|
return [meta, buffer];
|
||||||
|
}
|
||||||
|
|
||||||
export default JSOutput;
|
export default JSOutput;
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,8 @@ defmodule LivebookWeb.JSOutputChannel do
|
||||||
{:noreply, socket}
|
{:noreply, socket}
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_in("event", %{"event" => event, "payload" => payload, "ref" => ref}, socket) do
|
def handle_in("event", raw, socket) do
|
||||||
|
{[event, ref], payload} = transport_decode!(raw)
|
||||||
pid = socket.assigns.ref_with_pid[ref]
|
pid = socket.assigns.ref_with_pid[ref]
|
||||||
send(pid, {:event, event, payload, %{origin: self(), ref: ref}})
|
send(pid, {:event, event, payload, %{origin: self(), ref: ref}})
|
||||||
{:noreply, socket}
|
{:noreply, socket}
|
||||||
|
|
@ -52,7 +53,7 @@ defmodule LivebookWeb.JSOutputChannel do
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def handle_info({:connect_reply, data, %{ref: ref}}, socket) do
|
def handle_info({:connect_reply, data, %{ref: ref}}, socket) do
|
||||||
with {:error, error} <- try_push(socket, "init:#{ref}", %{"data" => data}) do
|
with {:error, error} <- try_push(socket, "init:#{ref}", nil, data) do
|
||||||
message = "Failed to serialize initial widget data, " <> error
|
message = "Failed to serialize initial widget data, " <> error
|
||||||
push(socket, "error:#{ref}", %{"message" => message})
|
push(socket, "error:#{ref}", %{"message" => message})
|
||||||
end
|
end
|
||||||
|
|
@ -61,8 +62,7 @@ defmodule LivebookWeb.JSOutputChannel do
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_info({:event, event, payload, %{ref: ref}}, socket) do
|
def handle_info({:event, event, payload, %{ref: ref}}, socket) do
|
||||||
with {:error, error} <-
|
with {:error, error} <- try_push(socket, "event:#{ref}", [event], payload) do
|
||||||
try_push(socket, "event:#{ref}", %{"event" => event, "payload" => payload}) do
|
|
||||||
message = "Failed to serialize event payload, " <> error
|
message = "Failed to serialize event payload, " <> error
|
||||||
push(socket, "error:#{ref}", %{"message" => message})
|
push(socket, "error:#{ref}", %{"message" => message})
|
||||||
end
|
end
|
||||||
|
|
@ -71,8 +71,9 @@ defmodule LivebookWeb.JSOutputChannel do
|
||||||
end
|
end
|
||||||
|
|
||||||
# In case the payload fails to encode we catch the error
|
# In case the payload fails to encode we catch the error
|
||||||
defp try_push(socket, event, payload) do
|
defp try_push(socket, event, meta, payload) do
|
||||||
try do
|
try do
|
||||||
|
payload = transport_encode!(meta, payload)
|
||||||
push(socket, event, payload)
|
push(socket, event, payload)
|
||||||
catch
|
catch
|
||||||
:error, %Protocol.UndefinedError{protocol: Jason.Encoder, value: value} ->
|
:error, %Protocol.UndefinedError{protocol: Jason.Encoder, value: value} ->
|
||||||
|
|
@ -82,4 +83,40 @@ defmodule LivebookWeb.JSOutputChannel do
|
||||||
{:error, Exception.message(error)}
|
{:error, Exception.message(error)}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# A user payload can be either a JSON-serializable term
|
||||||
|
# or a {:binary, info, binary} tuple, where info is a
|
||||||
|
# JSON-serializable term. The channel allows for sending
|
||||||
|
# either maps or binaries, so we need to translare the
|
||||||
|
# payload accordingly
|
||||||
|
|
||||||
|
defp transport_encode!(meta, {:binary, info, binary}) do
|
||||||
|
{:binary, encode!([meta, info], binary)}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp transport_encode!(meta, payload) do
|
||||||
|
%{"root" => [meta, payload]}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp transport_decode!({:binary, raw}) do
|
||||||
|
{[meta, info], binary} = decode!(raw)
|
||||||
|
{meta, {:binary, info, binary}}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp transport_decode!(raw) do
|
||||||
|
%{"root" => [meta, payload]} = raw
|
||||||
|
{meta, payload}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp encode!(meta, binary) do
|
||||||
|
meta = Jason.encode!(meta)
|
||||||
|
meta_size = byte_size(meta)
|
||||||
|
<<meta_size::size(32), meta::binary, binary::binary>>
|
||||||
|
end
|
||||||
|
|
||||||
|
defp decode!(raw) do
|
||||||
|
<<meta_size::size(32), meta::binary-size(meta_size), binary::binary>> = raw
|
||||||
|
meta = Jason.decode!(meta)
|
||||||
|
{meta, binary}
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -1,26 +1,19 @@
|
||||||
defmodule LivebookWeb.Socket do
|
defmodule LivebookWeb.Socket do
|
||||||
use Phoenix.Socket
|
use Phoenix.Socket
|
||||||
|
|
||||||
# App channels
|
|
||||||
channel "js_output", LivebookWeb.JSOutputChannel
|
channel "js_output", LivebookWeb.JSOutputChannel
|
||||||
|
|
||||||
# LiveView channels
|
|
||||||
channel "lvu:*", Phoenix.LiveView.UploadChannel
|
|
||||||
channel "lv:*", Phoenix.LiveView.Channel
|
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def connect(params, socket, info) do
|
def connect(_params, socket, info) do
|
||||||
auth_mode = Livebook.Config.auth_mode()
|
auth_mode = Livebook.Config.auth_mode()
|
||||||
|
|
||||||
if LivebookWeb.AuthPlug.authenticated?(info.session || %{}, info.uri.port, auth_mode) do
|
if LivebookWeb.AuthPlug.authenticated?(info.session || %{}, info.uri.port, auth_mode) do
|
||||||
Phoenix.LiveView.Socket.connect(params, socket, info)
|
{:ok, socket}
|
||||||
else
|
else
|
||||||
:error
|
:error
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def id(socket) do
|
def id(_socket), do: nil
|
||||||
Phoenix.LiveView.Socket.id(socket)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -11,11 +11,16 @@ defmodule LivebookWeb.Endpoint do
|
||||||
same_site: "Lax"
|
same_site: "Lax"
|
||||||
]
|
]
|
||||||
|
|
||||||
socket "/live", LivebookWeb.Socket,
|
# Don't check the origin as we don't know how the web app is gonna be accessed.
|
||||||
# Don't check the origin as we don't know how the web app is gonna be accessed.
|
# It runs locally, but may be exposed via IP or domain name. The WebSocket
|
||||||
# It runs locally, but may be exposed via IP or domain name.
|
# connection is already protected from CSWSH by using CSRF token.
|
||||||
# The WebSocket connection is already protected from CSWSH by using CSRF token.
|
@websocket_options [
|
||||||
websocket: [check_origin: false, connect_info: [:user_agent, :uri, session: @session_options]]
|
check_origin: false,
|
||||||
|
connect_info: [:user_agent, :uri, session: @session_options]
|
||||||
|
]
|
||||||
|
|
||||||
|
socket "/live", Phoenix.LiveView.Socket, websocket: @websocket_options
|
||||||
|
socket "/socket", LivebookWeb.Socket, websocket: @websocket_options
|
||||||
|
|
||||||
# We use Escript for distributing Livebook, so we don't have access to the static
|
# We use Escript for distributing Livebook, so we don't have access to the static
|
||||||
# files at runtime in the prod environment. To overcome this we load contents of
|
# files at runtime in the prod environment. To overcome this we load contents of
|
||||||
|
|
|
||||||
14
lib/livebook_web/live/hooks/auth_hook.ex
Normal file
14
lib/livebook_web/live/hooks/auth_hook.ex
Normal file
|
|
@ -0,0 +1,14 @@
|
||||||
|
defmodule LivebookWeb.AuthHook do
|
||||||
|
import Phoenix.LiveView
|
||||||
|
|
||||||
|
def on_mount(:default, _params, session, socket) do
|
||||||
|
uri = get_connect_info(socket, :uri)
|
||||||
|
auth_mode = Livebook.Config.auth_mode()
|
||||||
|
|
||||||
|
if LivebookWeb.AuthPlug.authenticated?(session || %{}, uri.port, auth_mode) do
|
||||||
|
{:cont, socket}
|
||||||
|
else
|
||||||
|
{:halt, socket}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
defmodule LivebookWeb.CurrentUserHook do
|
defmodule LivebookWeb.UserHook do
|
||||||
import Phoenix.LiveView
|
import Phoenix.LiveView
|
||||||
|
|
||||||
alias Livebook.Users.User
|
alias Livebook.Users.User
|
||||||
|
|
@ -39,7 +39,7 @@ defmodule LivebookWeb.Router do
|
||||||
get "/sessions/:id/assets/:hash/*file_parts", SessionController, :show_asset
|
get "/sessions/:id/assets/:hash/*file_parts", SessionController, :show_asset
|
||||||
end
|
end
|
||||||
|
|
||||||
live_session :default, on_mount: LivebookWeb.CurrentUserHook do
|
live_session :default, on_mount: [LivebookWeb.AuthHook, LivebookWeb.UserHook] do
|
||||||
scope "/", LivebookWeb do
|
scope "/", LivebookWeb do
|
||||||
pipe_through [:browser, :auth]
|
pipe_through [:browser, :auth]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,13 +20,13 @@ defmodule LivebookWeb.JSOutputChannelTest do
|
||||||
assert_receive {:connect, from, %{}}
|
assert_receive {:connect, from, %{}}
|
||||||
send(from, {:connect_reply, [1, 2, 3], %{ref: "1"}})
|
send(from, {:connect_reply, [1, 2, 3], %{ref: "1"}})
|
||||||
|
|
||||||
assert_push "init:1", %{"data" => [1, 2, 3]}
|
assert_push "init:1", %{"root" => [nil, [1, 2, 3]]}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "sends events received from widget server to the client", %{socket: socket} do
|
test "sends events received from widget server to the client", %{socket: socket} do
|
||||||
send(socket.channel_pid, {:event, "ping", [1, 2, 3], %{ref: "1"}})
|
send(socket.channel_pid, {:event, "ping", [1, 2, 3], %{ref: "1"}})
|
||||||
|
|
||||||
assert_push "event:1", %{"event" => "ping", "payload" => [1, 2, 3]}
|
assert_push "event:1", %{"root" => [["ping"], [1, 2, 3]]}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "sends client events to the corresponding widget server", %{socket: socket} do
|
test "sends client events to the corresponding widget server", %{socket: socket} do
|
||||||
|
|
@ -35,11 +35,44 @@ defmodule LivebookWeb.JSOutputChannelTest do
|
||||||
assert_receive {:connect, from, %{}}
|
assert_receive {:connect, from, %{}}
|
||||||
send(from, {:connect_reply, [1, 2, 3], %{ref: "1"}})
|
send(from, {:connect_reply, [1, 2, 3], %{ref: "1"}})
|
||||||
|
|
||||||
push(socket, "event", %{"event" => "ping", "payload" => [1, 2, 3], "ref" => "1"})
|
push(socket, "event", %{"root" => [["ping", "1"], [1, 2, 3]]})
|
||||||
|
|
||||||
assert_receive {:event, "ping", [1, 2, 3], %{origin: _origin}}
|
assert_receive {:event, "ping", [1, 2, 3], %{origin: _origin}}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe "binary payload" do
|
||||||
|
test "initial data", %{socket: socket} do
|
||||||
|
push(socket, "connect", %{"session_token" => session_token(), "ref" => "1"})
|
||||||
|
|
||||||
|
assert_receive {:connect, from, %{}}
|
||||||
|
payload = {:binary, %{message: "hey"}, <<1, 2, 3>>}
|
||||||
|
send(from, {:connect_reply, payload, %{ref: "1"}})
|
||||||
|
|
||||||
|
assert_push "init:1", {:binary, <<24::size(32), "[null,{\"message\":\"hey\"}]", 1, 2, 3>>}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "from server to client", %{socket: socket} do
|
||||||
|
payload = {:binary, %{message: "hey"}, <<1, 2, 3>>}
|
||||||
|
send(socket.channel_pid, {:event, "ping", payload, %{ref: "1"}})
|
||||||
|
|
||||||
|
assert_push "event:1",
|
||||||
|
{:binary, <<28::size(32), "[[\"ping\"],{\"message\":\"hey\"}]", 1, 2, 3>>}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "form client to server", %{socket: socket} do
|
||||||
|
push(socket, "connect", %{"session_token" => session_token(), "ref" => "1"})
|
||||||
|
|
||||||
|
assert_receive {:connect, from, %{}}
|
||||||
|
send(from, {:connect_reply, [1, 2, 3], %{ref: "1"}})
|
||||||
|
|
||||||
|
raw = {:binary, <<32::size(32), "[[\"ping\",\"1\"],{\"message\":\"hey\"}]", 1, 2, 3>>}
|
||||||
|
push(socket, "event", raw)
|
||||||
|
|
||||||
|
payload = {:binary, %{"message" => "hey"}, <<1, 2, 3>>}
|
||||||
|
assert_receive {:event, "ping", ^payload, %{origin: _origin}}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
defp session_token() do
|
defp session_token() do
|
||||||
Phoenix.Token.sign(LivebookWeb.Endpoint, "js output", %{pid: self()})
|
Phoenix.Token.sign(LivebookWeb.Endpoint, "js output", %{pid: self()})
|
||||||
end
|
end
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue