abstract class Onyx::EDA::Channel

Overview

An abstract event channel. It implements basic logic used in other channels.

Direct Known Subclasses

Defined in:

onyx-eda/channel/subscription/inactive_error.cr
onyx-eda/channel/subscription.cr
onyx-eda/channel/duplicate_consumer_error.cr
onyx-eda/channel.cr

Instance Method Summary

Instance Method Detail

def await(event : T.class, **filter, &block : T -> U) : U forall T, U #

Wait for an event to happen, returning the block execution result. An event can be filtered by its getters.

It is a blocking method.

record MyEvent, payload : String do
  include Onyx::EDA::Event
end

# Will block the execution unless MyEvent is received with "foo" payload
payload = channel.await(MyEvent, payload: "foo") do |event|
  event.payload
end

# In another fiber...
channel.emit(MyEvent.new("foo"))

This method can be used within the select block. It works better with the timer.cr shard.

select
when payload = channel.await(MyEvent, &.payload)
  puts payload
when Timer.new(30.seconds)
  raise "Timeout!"
end

[View source]
def await(event, **filter) #

The same as block-version, but returns an event instance itself.

event = channel.await(MyEvent)

This method can be used within the select block. It works better with the timer.cr shard:

select
when event = channel.await(MyEvent)
  puts event.payload
when Timer.new(30.seconds)
  raise "Timeout!"
end

[View source]
abstract def emit(events : Enumerable) : Enumerable #

Emit events returning themselves. This method usually blocks until all events are delivered, but the subscription block calls happen asynchronously.


[View source]
abstract def emit(event : T) : T forall T #

Emit event returning itself. This method usually blocks until the event is delivered, but the subscription block calls happen asynchronously.


[View source]
abstract def emit(*events) : Enumerable #

Emit events returning themselves. This method usually blocks until all events are delivered, but the subscription block calls happen asynchronously.


[View source]
abstract def subscribe(event : T.class, consumer_id : String, &block : T -> UNDERSCORE) : Onyx::EDA::Channel::Subscription(T) forall T #

Begin consuming an event. Consumption differs from subscription in a way that only a single consuming subscription instance with certain consumer_id among all this channel subscribers would be notified about an event after it successfully acquires a lock. The lock implementation differs in channels.

Returns a Subscription instance. May raise DuplicateConsumerError if a duplicate consumer ID found for this event in this very process.

This is a non-blocking method, as it spawns a subscription fiber.

record MyEvent, payload : String do
  include Onyx::EDA::Event
end

channel = Onyx::EDA::Channel::Redis.new

sub = channel.subscribe(MyEvent, "MyConsumer") do |event|
  puts event.payload
end

Launch two subscribing processes, then emit an event in another process:

# Only one consumer of the two above will be notified
channel.emit(MyEvent.new("foo"))

See Consumer for an includable consumption module.


[View source]
abstract def subscribe(event : T.class, **filter, &block : T -> UNDERSCORE) : Onyx::EDA::Channel::Subscription(T) forall T #

Subscribe to an event. Returns a Subscription instance, which can be cancelled. Every subscription instance gets notified about an #emitted event.

This is a non-blocking method, as it spawns a subscription fiber.

record MyEvent, payload : String do
  include Onyx::EDA::Event
end

sub = channel.subscribe(MyEvent) do |event|
  puts event.payload
end

channel.emit(MyEvent.new("foo"))

# Need to yield the control
sleep(0.1)

# Can cancel afterwards
sub.unsubscribe

You can filter the events by their getters, for example:

channel.subscribe(MyEvent, payload: "bar") do |event|
  puts event.payload # Would only output events with "bar" payload
end

channel.emit(MyEvent.new("foo")) # Would not trigger the above subscription

See Subscriber for an includable subscribing module.


[View source]
abstract def unsubscribe(subscription : Subscription) : Bool #

Cancel a subscription. Returns a boolean value indicating whether was it successufully cancelled or not (for instance, it may be already cancelled, returning false).


[View source]