EventBroker (Anoma v0.29.0)
I am the EventBroker Application Module.
I start the PubSub system as its own OTP application. I also provide the full API needed to use the system, exposing all public functionality of the Broker and the Registry.
Public API
I have the following public functionality:
Summary
Types
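filter_spec_list()
I am a filter dependency specification: a list of filter specs, given in the order in which the filter agents implementing those specs should be subscribed to one another.
Functions
event(event, broker \\ EventBroker.Broker)
I am the Event Broker event function.
subscribe(pid, filter_spec_list, registry \\ EventBroker.Registry)
I am the subscription function.
subscribe_me(filter_spec_list, registry \\ EventBroker.Registry)
I am a subscription function specifically for self().
unsubscribe(pid, filter_spec_list, registry \\ EventBroker.Registry)
I am the unsubscription function.
unsubscribe_me(filter_spec_list, registry \\ EventBroker.Registry)
I am the unsubscription function specifically for self().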
Types
filter_spec_list()
@type filter_spec_list() :: [struct()]
I am a filter dependency specification: a list of filter specs, given in the order in which the filter agents implementing those specs should be subscribed to one another.
Functions
event(event, broker \\ EventBroker.Broker)
@spec event(EventBroker.Event.t(), atom()) :: :ok
I am the Event Broker event function.
I process any incoming event by sending it to all of the Broker's subscribers using send/2.
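The fan-out described above can be sketched in a few lines. This is a minimal illustration, not the Broker's actual implementation: the module `BroadcastSketch` and its `MapSet` of subscribers are hypothetical names, and only the use of `send/2` comes from the description.

```elixir
defmodule BroadcastSketch do
  # Hypothetical stand-in for the Broker: forwards one event to every subscriber.
  @spec broadcast(MapSet.t(pid()), term()) :: :ok
  def broadcast(subscribers, event) do
    # send/2 is asynchronous, so the sender never waits on a subscriber.
    Enum.each(subscribers, fn pid -> send(pid, event) end)
    :ok
  end
end

# Usage: treat the current process as the only subscriber.
subscribers = MapSet.new([self()])
:ok = BroadcastSketch.broadcast(subscribers, {:event, "hello"})

received =
  receive do
    {:event, body} -> body
  after
    100 -> :timeout
  end
```

Because delivery is a plain message send, a subscriber sees the event in its mailbox and decides for itself when to handle it.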
subscribe(pid, filter_spec_list, registry \\ EventBroker.Registry)
@spec subscribe(pid(), filter_spec_list(), atom()) :: :ok | String.t()
I am the subscription function.
Given a PID and a list of filter specs, I ensure that the actors corresponding to the specs in the list are launched and subscribed to each other in the appropriate order. For example, given [spec1, spec2], I ensure that agents implementing spec1 and spec2 are launched, that the former is subscribed to the top broker, and that the latter is subscribed to the former.
I also do this in a minimal fashion: if some initial subchain of the filter spec dependency has already been registered, I launch only the remaining part of the chain needed to build the full dependency.
Note that each filter actor is hence determined not only by its filtering functionality but also by the chain of dependencies it is spawned with. Hence [spec1, spec2] corresponds to an agent different from [spec2].
Afterwards, I subscribe the given PID to the final filter agent in the dependency chain and register all new agents in the map, storing each filter agent's PID as the value under its dependency filter specification.
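The chain-building behaviour can be modelled with a plain map keyed by chain prefixes. The sketch below is an assumption-laden illustration rather than the Registry's code: `ChainSketch`, `ensure_chain/3`, the `:top_broker` atom, and the `launch` callback are all hypothetical, and a tagged tuple stands in for a real agent PID.

```elixir
defmodule ChainSketch do
  # All non-empty prefixes of a spec list, shortest first.
  def prefixes(spec_list) do
    Enum.map(1..length(spec_list)//1, fn n -> Enum.take(spec_list, n) end)
  end

  # Launches only the prefixes missing from the registry.
  # Returns {updated_registry, newly_launched_prefixes}.
  def ensure_chain(registry, spec_list, launch) do
    missing = Enum.reject(prefixes(spec_list), &Map.has_key?(registry, &1))

    registry =
      Enum.reduce(missing, registry, fn prefix, acc ->
        parent = parent_of(acc, prefix)
        Map.put(acc, prefix, launch.(List.last(prefix), parent))
      end)

    {registry, missing}
  end

  # The first agent in a chain subscribes to the top broker,
  # every later agent subscribes to the agent of the preceding prefix.
  defp parent_of(registry, prefix) do
    case Enum.drop(prefix, -1) do
      [] -> :top_broker
      parent_chain -> Map.fetch!(registry, parent_chain)
    end
  end
end

# Fake launcher: returns a tagged tuple instead of spawning a process.
launch = fn spec, parent -> {:agent, spec, parent} end

{reg1, new1} = ChainSketch.ensure_chain(%{}, [:spec1, :spec2], launch)
{reg2, new2} = ChainSketch.ensure_chain(reg1, [:spec1, :spec2, :spec3], launch)
```

In this model the second call launches only the agent for `[:spec1, :spec2, :spec3]`, and no entry exists for `[:spec2]` alone, which mirrors the point that an agent is identified by its whole dependency chain.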
If I notice that any of the filter structures do not have an appropriate public filter functionality exposed, I return the list of such modules back to the user and do nothing with respect to agent-spawning or registration.
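One way such a check could look is sketched below. The callback name and arity (`filter/2`) are assumptions made for illustration, as are the modules `GoodFilter`, `BadFilter`, and `ValidationSketch`; the documentation above does not state which function a filter module must expose.

```elixir
defmodule GoodFilter do
  defstruct []
  # Hypothetical filter callback; the real name and arity are assumptions.
  def filter(_event, _params), do: true
end

defmodule BadFilter do
  # A struct with no public filter function.
  defstruct []
end

defmodule ValidationSketch do
  # Returns the modules of all specs that lack a public filter/2.
  def invalid_modules(spec_list) do
    spec_list
    |> Enum.map(& &1.__struct__)
    |> Enum.reject(fn mod ->
      Code.ensure_loaded?(mod) and function_exported?(mod, :filter, 2)
    end)
  end
end

invalid = ValidationSketch.invalid_modules([struct(GoodFilter), struct(BadFilter)])
```

Running the check before any agent is spawned keeps the operation all-or-nothing: a non-empty result means nothing is launched or registered.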
Filter Agent spawning is handled via DynamicSupervisor.
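As a rough illustration of spawning under a DynamicSupervisor, the sketch below starts a stand-in child on demand. The child here is a plain `Agent` holding a hypothetical state map, and the `:temporary` restart setting is an assumption; the real filter agent module and its child spec are not shown in this documentation.

```elixir
# Start an unnamed DynamicSupervisor for the sketch.
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

# Stand-in filter agent: an Agent holding its spec and an empty subscriber set.
child_spec = %{
  id: :filter_agent,
  start: {Agent, :start_link, [fn -> %{spec: :spec1, subscribers: MapSet.new()} end]},
  # Assumption: an agent that shuts down on purpose should not be restarted.
  restart: :temporary
}

{:ok, agent} = DynamicSupervisor.start_child(sup, child_spec)
%{active: active} = DynamicSupervisor.count_children(sup)
```

A DynamicSupervisor suits this case because the set of filter agents is not known at boot and grows as new chains are requested.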
subscribe_me(filter_spec_list, registry \\ EventBroker.Registry)
@spec subscribe_me(filter_spec_list(), atom()) :: :ok | String.t()
I am a subscription function specifically for self().
I call subscribe/2 where the first argument is self().
unsubscribe(pid, filter_spec_list, registry \\ EventBroker.Registry)
@spec unsubscribe(pid(), filter_spec_list(), atom()) :: :ok
I am the unsubscription function.
Given a PID and a list of filter specs, I get the PID of the related filter agent by looking in my state, then ask it to unsubscribe the given PID from it. In case the agent has other subscribers, I return the base state.
Otherwise, it shuts down, prompting me to recursively send unsubscription requests to its parent filters and to process their termination appropriately in a synchronous manner.
After all the requests have been sent and the terminations recorded, I remove all agents which have shut down from my registry map and return :ok.
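The cascading cleanup can be modelled as a pure function over a registry map. This is a simplified sketch under stated assumptions, not the Registry's implementation: `UnsubscribeSketch` is a hypothetical module, atoms stand in for PIDs, each chain maps to its set of subscribers, and a child agent is recorded in its parent's set as `{:agent, child_chain}`.

```elixir
defmodule UnsubscribeSketch do
  # registry :: %{chain => MapSet of subscribers}
  def unsubscribe(registry, subscriber, chain) do
    case Map.fetch(registry, chain) do
      :error ->
        # Unknown chain: nothing to do.
        registry

      {:ok, subs} ->
        remaining = MapSet.delete(subs, subscriber)

        if MapSet.size(remaining) > 0 do
          # Other subscribers remain, so the agent stays alive.
          Map.put(registry, chain, remaining)
        else
          # The agent shuts down: drop it, then unsubscribe it from its parent.
          registry = Map.delete(registry, chain)

          case Enum.drop(chain, -1) do
            [] -> registry
            parent -> unsubscribe(registry, {:agent, chain}, parent)
          end
        end
    end
  end
end

registry = %{
  [:spec1] => MapSet.new([{:agent, [:spec1, :spec2]}, :other_pid]),
  [:spec1, :spec2] => MapSet.new([:my_pid])
}

after_unsub = UnsubscribeSketch.unsubscribe(registry, :my_pid, [:spec1, :spec2])
```

Here the agent for `[:spec1, :spec2]` loses its last subscriber and is removed, while the agent for `[:spec1]` survives because `:other_pid` is still subscribed to it.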
unsubscribe_me(filter_spec_list, registry \\ EventBroker.Registry)
@spec unsubscribe_me(filter_spec_list(), atom()) :: :ok
I am the unsubscription function specifically for self().
I call unsubscribe/2 where the first argument is self().