EventBroker benchmark
Mix.install([
{:vega_lite, "~> 0.1.6"},
{:kino_vega_lite, "~> 0.1.11"}
])Introduction
What's in the Livebook
This Livebook serves as a tool to measure the performance of the EventBroker.
It has three sections, each answering one question.
- Subscribe latency: measure how long it takes on average for a subscription to be made, while varying the number of existing subscriptions.
- Unsubscribe latency: measure how long it takes on average for a subscriber to unsubscribe, while varying the number of existing subscriptions.
- Subscribes per second: measure how long it takes to subscribe a given amount of subscribers, and determine the average subscriptions per second off of this.
At the bottom of the file is a cell to do some profiling. This is meant as a note on how to do actually do profiling.
Running the benchmarks
To run the experiments, a version of the Anoma application must be running on your local machine. Start the application in distributed mode as follows.
iex --name node@127.0.0.1 --cookie monster -S mix
🚨 If you use another node name or another cookie, change the values in the cell below.
Then run each cell in this livebook against that node.
# these have to be atoms!
node_name = :"node@127.0.0.1"
node_cookie = :monsterFilters
require Kino.RPC
node = node_name
Node.set_cookie(node, node_cookie)
Kino.RPC.eval_string(
node,
~S"""
defmodule Filters do
alias EventBroker.Event
use EventBroker.DefFilter
deffilter FilterA, value: integer() do
%EventBroker.Event{body: %Event{body: %{value_a: ^value}}} -> true
_ -> false
end
deffilter FilterB, value: integer() do
%EventBroker.Event{body: %Event{body: %{value_b: ^value}}} -> true
_ -> false
end
deffilter FilterC, value: integer() do
%EventBroker.Event{body: %Event{body: %{value_b: ^value}}} -> true
_ -> false
end
end
""",
file: __ENV__.file
)Helpers
require Kino.RPC
node = node_name
Node.set_cookie(node, node_cookie)
Kino.RPC.eval_string(
node,
~S"""
defmodule Helpers do
# return the time in microseconds (μs)
def measure(proc) do
:timer.tc(proc)
end
def average_measure(proc, n) do
{time, res} =
for _ <- 1..n do
measure(proc)
end
|> Enum.reduce({0, []}, fn {time, res}, {acc_time, acc_res} ->
{time + acc_time, [res | acc_res]}
end)
{time / n, res}
end
def do_n_times(proc, n) do
for _ <- 1..n do
proc.()
end
end
end
""",
file: __ENV__.file
)require Kino.RPC
node = node_name
Node.set_cookie(node, node_cookie)
Kino.RPC.eval_string(
node,
~S"""
defmodule Examples do
def subscriber(filters \\ []) do
receive do
{{:subscribe, filters}, from, ref} ->
EventBroker.subscribe_me(filters)
send(from, {:ok, ref})
subscriber(filters)
{:unsubscribe, from, ref} ->
EventBroker.unsubscribe_me([])
send(from, {:ok, ref})
subscriber(filters)
{:exit, from, ref} ->
send(from, {:ok, ref})
:ok
end
end
def create_subscriber() do
spawn_link(fn ->
subscriber()
end)
end
def sync_send(subscriber, message) do
ref = make_ref()
send(subscriber, {message, self(), ref})
receive do
{:ok, ^ref} ->
:ok
end
end
def async_send(subscriber, message) do
ref = make_ref()
send(subscriber, {message, self(), ref})
end
def subscribe(subscriber, filters \\ []) do
sync_send(subscriber, {:subscribe, filters})
subscriber
end
def unsubscribe(subscriber) do
sync_send(subscriber, :unsubscribe)
subscriber
end
def unsubscribe_async(subscriber) do
async_send(subscriber, :unsubscribe)
subscriber
end
def terminate_subscriber(subscriber) do
sync_send(subscriber, :exit)
subscriber
end
end
""",
file: __ENV__.file
)Subscribe latency
The test below creates a number of subscribers, and measures how long it takes for a subscriber to become subscribed.
require Kino.RPC
node = node_name
Node.set_cookie(node, node_cookie)
subscribes =
Kino.RPC.eval_string(
node,
~S"""
import Helpers
import Examples
iterations = 10
step_size = 100
# the test to run for a given subscriber count and a given list of filters.
test_subscribers = fn subscriber_count, filters ->
{μs, results} =
average_measure(
fn ->
create_subscriber()
|> subscribe(filters)
end,
subscriber_count
)
# delete the subscribers again
Enum.each(results, &terminate_subscriber/1)
%{subscribers: subscriber_count, time: μs}
end
# test with no filters
no_filter =
for iteration <- 1..iterations do
subscriber_count = iteration * step_size
filters = []
test_subscribers.(subscriber_count, filters)
end
# test with 1 filter
one_filter =
for iteration <- 1..iterations do
subscriber_count = iteration * step_size
filters = [%Filters.FilterA{value: 1}]
test_subscribers.(subscriber_count, filters)
end
# test with 2 filters
two_filter =
for iteration <- 1..iterations do
subscriber_count = iteration * step_size
filters = [%Filters.FilterA{value: 1}, %Filters.FilterB{value: 1}]
test_subscribers.(subscriber_count, filters)
end
# test with 3 filters
three_filter =
for iteration <- 1..iterations do
subscriber_count = iteration * step_size
filters = [%Filters.FilterA{value: 1}, %Filters.FilterB{value: 1}, %Filters.FilterC{value: 1}]
test_subscribers.(subscriber_count, filters)
end
{no_filter, one_filter, two_filter, three_filter}
""",
file: __ENV__.file
)plots = fn data ->
VegaLite.new()
|> VegaLite.data_from_values(data, only: ["subscribers", "time"])
|> VegaLite.mark(:line, point: [fill: "red"])
|> VegaLite.encode_field(:x, "subscribers", type: :quantitative)
|> VegaLite.encode_field(:y, "time", type: :quantitative, title: "Time (μs)")
end
# the first measurement makes the chart unreadable, so its dropped
no_filters = plots.(Enum.drop(elem(subscribes, 0), 1))
one_filters = plots.(Enum.drop(elem(subscribes, 1), 1))
two_filters = plots.(Enum.drop(elem(subscribes, 2), 1))
three_filters = plots.(Enum.drop(elem(subscribes, 3), 1))
VegaLite.new(width: 700)
|> VegaLite.layers([
no_filters,
one_filters,
two_filters,
three_filters
])Unsubscribe Latency
The test below creates a number of subscribers, and measures how long it takes for a subscriber to become unsubscribed.
require Kino.RPC
node = node_name
Node.set_cookie(node, node_cookie)
subscribes =
Kino.RPC.eval_string(
node,
~S"""
import Helpers
import Examples
iterations = 10
step_size = 100
# the test to run for a given subscriber count and a given list of filters.
test_subscribers = fn subscriber_count, filters ->
# create the subscribers
subscribers = do_n_times(fn -> create_subscriber() |> subscribe(filters) end, subscriber_count)
# unsubscribe all of them and measure
{μs, results} = measure(fn -> Enum.map(subscribers, &unsubscribe/1) end)
# delete the subscribers again
Enum.each(results, &terminate_subscriber/1)
%{subscribers: subscriber_count, time: μs}
end
# test with no filters
no_filter =
for iteration <- 1..iterations do
subscriber_count = iteration * step_size
filters = []
test_subscribers.(subscriber_count, filters)
end
# test with 1 filter
one_filter =
for iteration <- 1..iterations do
subscriber_count = iteration * step_size
filters = [%Filters.FilterA{value: 1}]
test_subscribers.(subscriber_count, filters)
end
# test with 2 filters
two_filter =
for iteration <- 1..iterations do
subscriber_count = iteration * step_size
filters = [%Filters.FilterA{value: 1}, %Filters.FilterB{value: 1}]
test_subscribers.(subscriber_count, filters)
end
# test with 3 filters
three_filter =
for iteration <- 1..iterations do
subscriber_count = iteration * step_size
filters = [%Filters.FilterA{value: 1}, %Filters.FilterB{value: 1}, %Filters.FilterC{value: 1}]
test_subscribers.(subscriber_count, filters)
end
{no_filter, one_filter, two_filter, three_filter}
""",
file: __ENV__.file
)Interpretation
The chart below shows that the time to unsubscribe increases exponentially with the number of nodes.
The red line is the actual execution time for the unsubscribe operation. The green line is the same line, but logarithmized.
The log line is flat, hence the execution time for unsubscribe is exponential.
plots = fn data ->
VegaLite.new()
|> VegaLite.data_from_values(data, only: ["subscribers", "time"])
|> VegaLite.mark(:line, point: [fill: "red"])
|> VegaLite.encode_field(:x, "subscribers", type: :quantitative)
|> VegaLite.encode_field(:y, "time", type: :quantitative, title: "Time (μs)")
end
# the first measurement makes the chart unreadable, so its dropped
no_filters = plots.(Enum.drop(elem(subscribes, 0), 1))
one_filters = plots.(Enum.drop(elem(subscribes, 1), 1))
two_filters = plots.(Enum.drop(elem(subscribes, 2), 1))
three_filters = plots.(Enum.drop(elem(subscribes, 3), 1))
VegaLite.new(width: 700)
|> VegaLite.layers([
no_filters,
one_filters,
two_filters,
three_filters
])Subscribes per second
require Kino.RPC
node = node_name
Node.set_cookie(node, node_cookie)
subscribes_per_second =
Kino.RPC.eval_string(
node,
~S"""
import Helpers
import Examples
iterations = 10
step_size = 100
make_n_subscriptions = fn count ->
{time, subscribers} =
measure(fn ->
for _ <- 1..count do
create_subscriber()
|> subscribe()
end
end)
# delete the subscribers again
Enum.each(subscribers, &terminate_subscriber/1)
# return the total time
time
end
# warmup
make_n_subscriptions.(100)
# measure
for i <- 1..iterations do
subscription_count = i * step_size
# time is in μs
time = make_n_subscriptions.(subscription_count)
# if `subscription_count` subscriptions takes `time` μs
# then on average a single subscription takes `time / subscription_count` μs.
average_time = time / subscription_count
# if the average time for a subscription is `time / subscription_count`
# then the `EventBroker` can handle `1_000_000 / (time / subscription_count)`
# subscriptions per second.
rate = 1_000_000 / average_time
%{
total_time: time,
subscription_count: subscription_count,
average_time: average_time,
rate: rate
}
end
""",
file: __ENV__.file
)# chart plotting the total exeuction time for the amount of subscribers
total_time =
VegaLite.new()
|> VegaLite.mark(:line, point: [fill: "red"], stroke: "red")
|> VegaLite.encode_field(:y, "total_time",
type: :quantitative,
title: "Total Time (μs)",
axis: [title_color: "red"]
)
# chart plotting the average time it takes 1 subscription
average_time =
VegaLite.new()
|> VegaLite.mark(:line, point: [fill: "green"], stroke: "green", axis: [title_color: "green"])
|> VegaLite.encode_field(:y, "average_time",
type: :quantitative,
title: "Average per subscription (μs)",
axis: [title_color: "green"]
)
# create plots
VegaLite.new(width: 700)
|> VegaLite.encode_field(:x, "subscription_count", type: :quantitative)
|> VegaLite.data_from_values(subscribes_per_second)
|> VegaLite.layers([average_time, total_time])
|> VegaLite.resolve(:scale, y: :independent)# chart plotting the average time it takes 1 subscription
average_time =
VegaLite.new()
|> VegaLite.mark(:line, point: [fill: "green"], stroke: "green", axis: [title_color: "green"])
|> VegaLite.encode_field(:y, "rate",
type: :quantitative,
title: "Subscriptions per second",
axis: [title_color: "green"]
)
# create plots
VegaLite.new(width: 700)
|> VegaLite.encode_field(:x, "subscription_count", type: :quantitative)
|> VegaLite.data_from_values(subscribes_per_second)
|> VegaLite.layers([average_time])
|> VegaLite.resolve(:scale, y: :independent)Profiling
require Kino.RPC
node = node_name
Node.set_cookie(node, node_cookie)
Kino.RPC.eval_string(
node,
~S"""
import Helpers
import Examples
subscriber_count = 50000
# create the subscribers
subscribers = do_n_times(fn -> create_subscriber() |> subscribe() end, subscriber_count)
# profile the unsubscribe
:eprof.start_profiling([Process.whereis(EventBroker.Registry)])
# unsubscribe all of them and measure
{μs, results} = measure(fn -> Enum.map(subscribers, &unsubscribe/1) end)
# stop profiling
:eprof.stop_profiling()
# delete the subscribers again
Enum.each(results, &terminate_subscriber/1)
:eprof.analyze()
μs
""",
file: __ENV__.file
)