Anoma.Node.Transaction.Storage (Anoma v0.29.0)

I am the Storage Engine.

I represent a timestamped table-backed key-value store. I provide the API to read and write at specific heights and keep track of what the last timestamp is.

Any time before the next timestamp, or at the timestamp itself, is considered the past. Anything else is considered the future.

The semantics for reads and writes are as follows:

Writing

If an actor wants to write at time T which is strictly larger than the last timestamp I keep track of, they need to wait until my latest timestamp becomes T-1. In other words, you only write when your time comes.

If an actor writes at time T when my timestamp is T-1, then they can freely write whatever they need and set my latest timestamp to T.
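
The write rule above can be read as a small decision function. The following is a minimal sketch, not the engine's code: the module `WriteGate` and the function `status/2` are hypothetical names. This section does not say what happens when T is at or below the latest timestamp, so the sketch only labels that case `:past` and makes no claim about how the engine treats it.

```elixir
defmodule WriteGate do
  # Hypothetical helper, not part of Anoma: classifies a write request at
  # height `t` against the store's latest timestamp `last`.
  @spec status(non_neg_integer(), non_neg_integer()) :: :write_now | :wait | :past
  # The latest timestamp is T-1, so the caller may write and move it to T.
  def status(last, t) when t == last + 1, do: :write_now
  # T is further in the future, so the caller has to wait for its turn.
  def status(last, t) when t > last + 1, do: :wait
  # Assumption: writes at or below the latest timestamp are not described above.
  def status(_last, _t), do: :past
end

IO.inspect(WriteGate.status(4, 5))
IO.inspect(WriteGate.status(4, 7))
```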

Reading

If an actor reads at time T which is strictly larger than the last timestamp I have, they need to wait until my latest timestamp becomes T. In other words, to read in the future, you need the future to come.

If an actor reads key K at time T which is less than or equal to my last timestamp, then there are two cases.

  • K was never written to have any value.
  • K was written to have some values at times T1 < T2 < ... < Tn ≤ T

In the former case, I return :absent. In the latter case, I return the value of K at time Tn.
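
The two read cases can be modelled for a single key as a lookup over its write history. This is a sketch under stated assumptions, not the engine's implementation: `ReadModel` and the `history` map (height to value) are hypothetical, T is assumed to be at or below the latest timestamp so no blocking is needed, and a key written only after T is treated as `:absent`.

```elixir
defmodule ReadModel do
  # Hypothetical model: `history` maps a height to the value written for
  # one key at that height.
  @spec read(%{non_neg_integer() => term()}, non_neg_integer()) :: term()
  def read(history, t) do
    history
    |> Map.keys()
    # Only writes at or before T are visible to a read at T.
    |> Enum.filter(&(&1 <= t))
    |> case do
      [] -> :absent
      heights -> Map.fetch!(history, Enum.max(heights))
    end
  end
end

history = %{1 => "a", 3 => "b"}
IO.inspect(ReadModel.read(history, 2))
IO.inspect(ReadModel.read(%{}, 2))
```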

Public API

I provide the following public functionality:

Types

bare_key()

@type bare_key() :: [String.t()]

I am the type of the key to be stored.

qualified_key()

@type qualified_key() :: {integer(), bare_key()}

I am the type of the key at a specific timestamp.
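
As an illustration of the two key types, the snippet below builds a bare key and pairs it with a height. The key contents are made up for the example.

```elixir
# Illustrative values only; the key segments are hypothetical.
bare_key = ["accounts", "alice", "balance"]

# A qualified key pairs a height (timestamp) with a bare key.
qualified_key = {5, bare_key}

{height, key} = qualified_key
IO.inspect(height)
IO.inspect(key)
```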

t()

@type t() :: %Anoma.Node.Transaction.Storage{
  node_id: String.t(),
  uncommitted: %{required(qualified_key()) => term()},
  uncommitted_height: non_neg_integer(),
  uncommitted_updates: %{required(bare_key()) => [integer()]}
}

I am the type of a Storage Engine.

I store all in-progress information for matching keys and values to specific timestamps.

Fields

  • :node_id - The ID of the Node to which a Storage instantiation is bound.
  • :uncommitted - The map of keys at a specific height to its value. Default: %{}
  • :uncommitted_height - The latest timestamp of Storage. Default: 0
  • :uncommitted_updates - The map mapping a key to a list of all timestamps at which it was updated. Reverse ordered. Default: %{}

write_opts()

@type write_opts() :: :append | :write | :add

I am a type of writing options.

Functions

add(node_id, args)

@spec add(
  String.t(),
  {non_neg_integer(),
   %{write: [{bare_key(), any()}], append: [{bare_key(), MapSet.t()}]}}
) :: term()

I am the Storage add function.

I provide functionality to write and append at the same height. I am provided a map of things to write and things to append. I write the list of key-values and append the list of key-sets provided all at the same height.

To check my semantics, see write/2 and append/2.
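
The shape of the second argument can be seen by building one by hand. This sketch follows the @spec above; the keys, values and height are invented for illustration, and the call itself is left as a comment because it needs a running node whose ID (here a hypothetical `node_id`) is not part of this example.

```elixir
# Height at which both the writes and the appends should land.
height = 3

args =
  {height,
   %{
     # Plain key-value pairs, handled like write/2.
     write: [{["config", "name"], "node-a"}],
     # Key-set pairs, handled like append/2.
     append: [{["peers"], MapSet.new(["peer-1"])}]
   }}

# With a running node, the call would look like this (node_id is hypothetical):
# Anoma.Node.Transaction.Storage.add(node_id, args)

{^height, %{write: writes, append: appends}} = args
IO.inspect(length(writes) + length(appends))
```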

append(node_id, arg)

@spec append(
  String.t(),
  {non_neg_integer(), [{bare_key(), MapSet.t()}]}
) :: :ok

I am the Storage append function.

I am a Storage-level abstraction for rewriting set values by appending new values to them.

I am given a node ID alongside a height T and a list of key-value pairs where all values are expected to be sets.

If T is larger than the uncommitted height plus one, I block the caller until T is exactly one above the current height in the storage. That is, I block the caller until it is their time to write.

When that time comes, I first read the most recent values of the keys supplied. If a key has no value yet, I use an empty set for it. Afterwards, I go through the list of key-values, add the height to the list which records all heights at which those keys have been updated, then map the tuple {T, key} to the union of the key's most recent value and the new value, thereby appending to the set.

Once that is done, I send a write event specifying what has been written and at what height, while setting the uncommitted height to T.
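
The value stored for a key by an append can be sketched as a set union with an empty-set default. `AppendModel` and `merged/2` are hypothetical names used only for this illustration. A missing previous value is represented as `nil`, which is an assumption of the sketch rather than something the engine is documented to do.

```elixir
defmodule AppendModel do
  # Hypothetical helper: computes the set stored at {T, key}.
  # `latest` is the most recent value of the key, or nil when none exists.
  @spec merged(MapSet.t() | nil, MapSet.t()) :: MapSet.t()
  def merged(latest, new) do
    # A key with no previous value starts from the empty set.
    MapSet.union(latest || MapSet.new(), new)
  end
end

previous = MapSet.new(["peer-1"])
result = AppendModel.merged(previous, MapSet.new(["peer-2"]))
IO.inspect(MapSet.to_list(result))
```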

blocks_table(node_id)

@spec blocks_table(String.t()) :: atom()

I am a block table name function.

Given a Node ID, I produce the name of the appropriate block table connected to it.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

commit(node_id, block_round, writes)

@spec commit(
  String.t(),
  non_neg_integer(),
  [Anoma.Node.Transaction.Mempool.Tx.t()] | nil
) :: :ok

I am the Storage commit function.

I am called when an actor with Mempool functionality decides that a block round has been completed, prompting the commitment of the in-progress storage to the appropriate table and the creation of a block with the supplied content.

height_filter(height)

I am the height filter.

Given a height, I provide a filter for messages of a particular height.

init/1

I am the initialization function for the Storage Engine.

From the specified arguments, I get the node ID, the uncommitted height and the option to make the tables that Storage uses rocks-backed.

If the rocks flag is set, I launch rocks-backed tables; otherwise, I launch the usual mnesia tables.

Afterwards, I launch the Storage engine with the given arguments.

read(node_id, arg)

@spec read(
  String.t(),
  {non_neg_integer(), bare_key()}
) :: :absent | any()

I am the Storage read function.

I provide functionality to read a specific key at a specific height T.

If the height provided is higher than the height of the Storage, I block the actor using me and wait for the Storage height to reach T, then read the key at T.

To do that, I see whether the key has been updated in the state. If so, I get the value of that key at the most recent height it was updated, i.e. at the update height closest to T.

If not, then it might have been committed in the past, and I repeat the same procedure in the corresponding mnesia tables.

If nothing is found, I return :absent.
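
The fallback from in-progress state to committed data can be sketched as a two-tier lookup. Everything here is an assumption made for illustration: `TieredRead` is a hypothetical module, both tiers are plain maps with an `updates` map (key to heights, newest first) and a `values` map ({height, key} to value), and the committed tier stands in for the mnesia tables. Blocking on future heights is omitted.

```elixir
defmodule TieredRead do
  # Hypothetical model: try the uncommitted tier first, then the committed one.
  def read(uncommitted, committed, {t, key}) do
    # `with` falls through to the committed tier only on :absent.
    with :absent <- lookup(uncommitted, t, key) do
      lookup(committed, t, key)
    end
  end

  defp lookup(%{updates: updates, values: values}, t, key) do
    updates
    |> Map.get(key, [])
    # Heights are newest first, so the first one at or below T is the closest.
    |> Enum.find(&(&1 <= t))
    |> case do
      nil -> :absent
      height -> Map.fetch!(values, {height, key})
    end
  end
end

committed = %{updates: %{["a"] => [1]}, values: %{{1, ["a"]} => "old"}}
uncommitted = %{updates: %{["a"] => [3]}, values: %{{3, ["a"]} => "new"}}

IO.inspect(TieredRead.read(uncommitted, committed, {2, ["a"]}))
IO.inspect(TieredRead.read(uncommitted, committed, {3, ["a"]}))
```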

start_link(args \\ [])

@spec start_link([startup_options()]) :: GenServer.on_start()

I am the start_link function of the Storage Engine.

I register the engine with the node ID provided by the arguments and check that the uncommitted height has been supplied.

updates_table(node_id)

@spec updates_table(String.t()) :: atom()

I am an updates table name function.

Given a Node ID, I produce the name of the appropriate updates table connected to it.

values_table(node_id)

@spec values_table(String.t()) :: atom()

I am a values table name function.

Given a Node ID, I produce the name of the appropriate values table connected to it.

write(node_id, arg)

@spec write(
  String.t(),
  {non_neg_integer(), [{bare_key(), any()}]}
) :: :ok

I am the Storage write function.

I am given a node ID alongside a height T and a list of key-value pairs.

If T is larger than the uncommitted height plus one, I block the caller until T is exactly one above the current height in the storage. That is, I block the caller until it is their time to write.

When that time comes, I go through the list of key-values, add the height to the list which records all heights at which those keys have been updated, then map the tuple {T, key} to the new value and store it in the state.

Once that is done, I send a write event specifying what has been written and at what height, while setting the uncommitted height to T.
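
The state update performed by a write can be sketched on a plain map with the same field names as the Storage struct. `WriteModel` is a hypothetical module written for this illustration. The real engine also blocks early callers and emits a write event, and both are left out here.

```elixir
defmodule WriteModel do
  # Hypothetical model of the in-progress state, using the documented defaults.
  def new, do: %{uncommitted: %{}, uncommitted_updates: %{}, uncommitted_height: 0}

  # Only accepts a write at exactly one above the current height.
  def write(%{uncommitted_height: h} = state, {t, kvs}) when t == h + 1 do
    Enum.reduce(kvs, %{state | uncommitted_height: t}, fn {key, value}, acc ->
      %{
        acc
        | # Store the value under the qualified key {T, key}.
          uncommitted: Map.put(acc.uncommitted, {t, key}, value),
          # Record T at the front, keeping the heights reverse ordered.
          uncommitted_updates: Map.update(acc.uncommitted_updates, key, [t], &[t | &1])
      }
    end)
  end
end

state =
  WriteModel.new()
  |> WriteModel.write({1, [{["a"], 1}]})
  |> WriteModel.write({2, [{["a"], 2}, {["b"], 10}]})

IO.inspect(state.uncommitted_updates)
```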