EventBroker (Anoma v0.31.0)

I am the EventBroker Application Module.

I start up the PubSub system as its own OTP application. Moreover, I provide the full API needed to use the system: I contain all public functionality provided by the Broker and the Registry.

Public API

I have the following public functionality:

Overview

The EventBroker manages subscriptions for processes that are interested in a particular type of message.

Filters

Subscriptions are expressed as "filter specs", which define the patterns an event must match in order to be sent to the subscriber. The filter below matches all events that are maps with a key :type whose value is :error.

deffilter MyFilter do
  %{type: :error} -> true
  _ -> false
end

Filters can be composed by creating a "filter spec list". If a process is interested in MyFilter events, but only those whose :message field is nil, a second filter can be used to refine the first.

deffilter MyFilterNil do
  %{message: nil} -> true
  _ -> false
end
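The effect of composing the two filters above can be sketched in plain Elixir. This is only an illustrative model, not the deffilter macro or the Broker itself: the modules SketchErrorFilter, SketchNilMessageFilter and SketchChain are hypothetical names, and the chain of filter agents is approximated by checking that every predicate in the list accepts the event.

```elixir
# Hypothetical stand-ins for filter specs: plain modules exposing filter/1.
# They are NOT produced by deffilter; they only mimic the two filters above.
defmodule SketchErrorFilter do
  def filter(%{type: :error}), do: true
  def filter(_), do: false
end

defmodule SketchNilMessageFilter do
  def filter(%{message: nil}), do: true
  def filter(_), do: false
end

defmodule SketchChain do
  # An event passes a filter list only if every filter in it accepts the event.
  def passes?(event, filters) do
    Enum.all?(filters, fn filter -> filter.filter(event) end)
  end
end

chain = [SketchErrorFilter, SketchNilMessageFilter]

# Accepted by both filters.
SketchChain.passes?(%{type: :error, message: nil}, chain)

# Accepted by the first filter, rejected by the second.
SketchChain.passes?(%{type: :error, message: "boom"}, chain)
```

In this model the order of the list does not change the result; in the EventBroker itself the order still matters, because it determines which filter agents are spawned and how they subscribe to one another.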

Subscribing to events

To subscribe to all messages that match MyFilter, the subscribing process must call &EventBroker.subscribe/1 with the filter spec list as argument, e.g., EventBroker.subscribe([MyFilter]). To subscribe only to events that also have nil for the :message value, the filter spec list should include the MyFilterNil filter, e.g., EventBroker.subscribe([MyFilter, MyFilterNil]).

If a particular filter is no longer of interest, EventBroker.unsubscribe/1 can be used to unsubscribe from it.

Sending events

Any process can generate an event using the &EventBroker.event/1 function.

EventBroker.event(%{type: :error, message: "something happened"})

Summary

Types

I am a filter dependency specification: a list of filter specs, given in the order in which the filter agents implementing those specs should be subscribed to one another.

Functions

I am the Event Broker event function.

I return the filter specs for the current process, as a list of filter spec lists.

I am the subscription function specifically for self().

I return the filter specs for the given process id, as a list of filter spec lists.

I am the unsubscription function specifically for self().

Types

filter_spec_list()

@type filter_spec_list() :: [struct()]

I am a filter dependency specification: a list of filter specs, given in the order in which the filter agents implementing those specs should be subscribed to one another.

Functions

event(event, broker \\ EventBroker.Broker)

@spec event(EventBroker.Event.t(), atom()) :: :ok

I am the Event Broker event function.

I process any incoming events by sending them to all of the Broker's subscribers using the send/2 functionality.

my_subscriptions()

@spec my_subscriptions() :: [filter_spec_list()]

I return the filter specs for the current process, as a list of filter spec lists.

subscribe(pid, filter_spec_list, registry \\ EventBroker.Registry)

@spec subscribe(pid(), filter_spec_list(), atom()) :: :ok | String.t()

I am the subscription function.

Given a PID and a list of filter specs, I ensure that all the actors corresponding to the specs in the list are launched and subscribed to each other in the appropriate order - e.g. given [spec1, spec2], I ensure that agents implementing spec1 and spec2 are launched and that the former is subscribed to the top broker, while the latter is subscribed to the former.

I also do this in a minimal fashion: if some initial subchain of the filter spec dependency has already been registered, I launch only the remaining agents needed to build the full dependency chain.

Note that each filter actor is hence determined not only by the filtering functionality it has but also by the chain of dependencies it is spawned with. Hence [spec1, spec2] corresponds to an agent different from [spec2].

Afterwards, I subscribe the given PID to the final filter agent in the dependency chain and register all the new agents in the map by putting the filter agent PIDs as values to their dependency filter specification.

If I notice that any of the filter structures do not have an appropriate public filter functionality exposed, I return the list of such modules back to the user and do nothing with respect to agent-spawning or registration.

Filter Agent spawning is handled via DynamicSupervisor.
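The "minimal" launching behaviour described above can be modelled with an ordinary map keyed by filter spec list. The following is a rough sketch under stated assumptions rather than the Registry's actual code: SketchRegistry, prefixes/1 and ensure_chain/2 are hypothetical names, atoms stand in for filter spec structs, and make_ref/0 stands in for the PID of an agent spawned via DynamicSupervisor.

```elixir
defmodule SketchRegistry do
  # All non-empty prefixes of a filter spec list, shortest first.
  # [a, b, c] -> [[a], [a, b], [a, b, c]]
  def prefixes(spec_list) do
    Enum.map(1..length(spec_list)//1, fn n -> Enum.take(spec_list, n) end)
  end

  # Registers every prefix that is not yet in the registry.
  # Returns {updated_registry, newly_launched_prefixes}.
  # make_ref/0 is a placeholder for the PID of a freshly spawned filter agent.
  def ensure_chain(registry, spec_list) do
    Enum.reduce(prefixes(spec_list), {registry, []}, fn prefix, {reg, launched} ->
      if Map.has_key?(reg, prefix) do
        {reg, launched}
      else
        {Map.put(reg, prefix, make_ref()), launched ++ [prefix]}
      end
    end)
  end
end

# First subscription launches the agent for [:spec1].
{reg, launched1} = SketchRegistry.ensure_chain(%{}, [:spec1])

# [:spec1] is already registered, so only [:spec1, :spec2] is launched.
{reg, launched2} = SketchRegistry.ensure_chain(reg, [:spec1, :spec2])

# [:spec2] on its own is a different chain, so it gets its own agent.
{_reg, launched3} = SketchRegistry.ensure_chain(reg, [:spec2])
```

Keying the map by the whole prefix, rather than by the last spec alone, is what makes [spec1, spec2] and [spec2] resolve to different agents in this model.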

subscribe_me(filter_spec_list, registry \\ EventBroker.Registry)

@spec subscribe_me(filter_spec_list(), atom()) :: :ok | String.t()

I am the subscription function specifically for self().

I call subscribe/2 with self() as the first argument.

subscriptions(pid)

@spec subscriptions(pid()) :: [filter_spec_list()]

I return the filter specs for the given process id, as a list of filter spec lists.

unsubscribe(pid, filter_spec_list, registry \\ EventBroker.Registry)

@spec unsubscribe(pid(), filter_spec_list(), atom()) :: :ok

I am the unsubscription function.

Given a PID and a list of filter specs, I get the PID of the related filter agent by looking in my state, then ask it to unsubscribe the given PID from it. In case the agent has other subscribers, I return the base state.

Otherwise, it will shut down, prompting me to recursively send unsubscription requests to its parent filters and to process their termination appropriately in a synchronous manner.

After all the requests have been sent and terminations recorded, I remove all agents which have shut down from my registry map and return :ok.
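The recursive teardown described above can be illustrated with a small pure-data model. This is a sketch under stated assumptions, not the Registry's implementation: SketchTeardown is a hypothetical module, the registry is modelled as a map from filter spec list to a set of subscribers, atoms such as :pid_a stand in for PIDs, and a child filter agent is represented in its parent's subscriber set as {:agent, child_spec_list}.

```elixir
defmodule SketchTeardown do
  # The empty spec list stands for the top broker, which is never torn down here.
  def unsubscribe(registry, _subscriber, []), do: registry

  def unsubscribe(registry, subscriber, spec_list) do
    case Map.fetch(registry, spec_list) do
      :error ->
        # No agent registered for this chain: nothing to do.
        registry

      {:ok, subscribers} ->
        remaining = MapSet.delete(subscribers, subscriber)

        if MapSet.size(remaining) > 0 do
          # The agent still has subscribers, so it stays alive.
          Map.put(registry, spec_list, remaining)
        else
          # The agent "shuts down": drop it and unsubscribe it from its parent.
          parent = Enum.drop(spec_list, -1)

          registry
          |> Map.delete(spec_list)
          |> unsubscribe({:agent, spec_list}, parent)
        end
    end
  end
end

registry = %{
  [:spec1] => MapSet.new([:pid_a, {:agent, [:spec1, :spec2]}]),
  [:spec1, :spec2] => MapSet.new([:pid_b])
}

# Removing the only subscriber of [:spec1, :spec2] shuts that agent down,
# but [:spec1] survives because :pid_a is still subscribed to it.
after_b = SketchTeardown.unsubscribe(registry, :pid_b, [:spec1, :spec2])

# Removing :pid_a leaves [:spec1] without subscribers, so it is dropped too.
after_a = SketchTeardown.unsubscribe(after_b, :pid_a, [:spec1])
```

The model is sequential by construction, which mirrors the synchronous processing of terminations described above; how the real agents signal their shutdown is not modelled here.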

unsubscribe_me(filter_spec_list, registry \\ EventBroker.Registry)

@spec unsubscribe_me(filter_spec_list(), atom()) :: :ok

I am the unsubscription function specifically for self().

I call unsubscribe/2 with self() as the first argument.