class Onyx::EDA::Channel::Redis

Overview

A Redis channel. All subscribers connected to the same Redis instance are notified about events emitted within this channel, which makes it easy to distribute events across processes.

NOTE It relies on the Redis streams feature, which requires Redis version 5 or later.

In Onyx::EDA, events are delivered unreliably and in real time, which means that fresh subscribers do not have access to past events, only to future ones. That's why consumption is implemented with locks instead of consumer groups.

All events are serialized with MessagePack.

# Process #1
require "onyx-eda/channel/redis"

record MyEvent, payload : String do
  include Onyx::EDA::Event
end

channel = Onyx::EDA::Channel::Redis.new("redis://localhost:6379")
channel.emit(MyEvent.new("foo"))

# Process #2
require "onyx-eda/channel/redis"

record MyEvent, payload : String do
  include Onyx::EDA::Event
end

channel = Onyx::EDA::Channel::Redis.new("redis://localhost:6379")
channel.subscribe(MyEvent) do |event|
  puts event.payload
  exit
end

sleep

Defined in:

onyx-eda/channel/redis.cr

Constructors

Instance Method Summary

Instance methods inherited from class Onyx::EDA::Channel

await(event : T.class, **filter, &block : T -> U) : U forall T, U
await(event, **filter)
emit(events : Enumerable) : Enumerable
emit(event : T) : T forall T
emit(*events) : Enumerable
subscribe(event : T.class, consumer_id : String, &block : T -> _) : Onyx::EDA::Channel::Subscription(T) forall T
subscribe(event : T.class, **filter, &block : T -> _) : Onyx::EDA::Channel::Subscription(T) forall T
unsubscribe(subscription : Subscription) : Bool

Constructor Detail

def self.new(redis : MiniRedis, sidekick : MiniRedis, namespace : String = "onyx-eda")

Explicitly initialize with two MiniRedis instances (one performs blocking reads while the other issues commands) and a Redis namespace.
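A minimal sketch of this constructor, assuming that `MiniRedis.new` called without arguments connects to a Redis server on localhost at the default port, and using a hypothetical `"my-app"` namespace:

```crystal
require "mini_redis"
require "onyx-eda/channel/redis"

# Assumption: with no arguments, MiniRedis connects to redis://localhost:6379.
redis = MiniRedis.new    # passed first: the client that block-reads
sidekick = MiniRedis.new # passed second: the client that issues commands

# "my-app" is a hypothetical namespace; the default is "onyx-eda".
channel = Onyx::EDA::Channel::Redis.new(redis, sidekick, namespace: "my-app")
```

The two clients are kept separate because one of them is occupied with blocking reads, as the description above states.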


def self.new(uri : URI, namespace : String = "onyx-eda", *args, **nargs)

Initialize with a Redis URI and a Redis namespace. args and nargs are passed directly to a MiniRedis instance.


def self.new(uri : String, namespace : String = "onyx-eda", *args, **nargs)

Initialize with a Redis URI string and a Redis namespace. args and nargs are passed directly to a MiniRedis instance.



Instance Method Detail

def emit(events : Enumerable(T), redis : MiniRedis = @sidekick) : Enumerable(T) forall T

Emit events, sending each to its appropriate stream. See Channel#emit. The underlying XADD command is issued with the MAXLEN ~ 1000 option, so each stream is trimmed to approximately 1000 entries.

This method blocks until all subscribers to this event read it from the stream.

TODO Allow changing MAXLEN.


def emit(event : T) : T forall T

def emit(*events : *T) : Enumerable forall T

Emit events, sending each to its appropriate stream. See Channel#emit. The underlying XADD command is issued with the MAXLEN ~ 1000 option, so each stream is trimmed to approximately 1000 entries.

This method blocks until all subscribers to this event read it from the stream.

TODO Allow changing MAXLEN.
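The three emit overloads can be sketched as follows. The `MyEvent` record mirrors the one in the overview; the payload values are arbitrary, and a Redis server at `redis://localhost:6379` is assumed:

```crystal
require "onyx-eda/channel/redis"

record MyEvent, payload : String do
  include Onyx::EDA::Event
end

channel = Onyx::EDA::Channel::Redis.new("redis://localhost:6379")

# Single-event overload: the return type is the event type itself.
single = channel.emit(MyEvent.new("foo"))

# Splat overload: several events passed as separate arguments.
channel.emit(MyEvent.new("bar"), MyEvent.new("baz"))

# Enumerable overload: events passed as a collection.
batch = channel.emit([MyEvent.new("qux"), MyEvent.new("quux")])
```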


def subscribe(event : T.class, consumer_id : String, &block : T -> _) : Subscription forall T

Begin consuming an event, reading from its stream. It is guaranteed that only a single consuming subscription with a given ID across the whole application is notified about an event.

However, such notifications are not reliable: a consumer could crash while handling an event, in which case the event would never be handled properly. If you need reliability, use background job processing instead, for example, Worcr.

See Channel#subscribe(event, consumer_id, &block).
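A minimal sketch of a consuming subscription. The consumer ID `"mailer"` is a hypothetical name, and the one-second sleep only stands in for a long-running process; a real consumer would typically call `sleep` without arguments, as in the overview example:

```crystal
require "onyx-eda/channel/redis"

record MyEvent, payload : String do
  include Onyx::EDA::Event
end

channel = Onyx::EDA::Channel::Redis.new("redis://localhost:6379")

# Every process that subscribes with the same consumer ID competes for
# each MyEvent; only one of them has its block called for a given event.
subscription = channel.subscribe(MyEvent, "mailer") do |event|
  puts "handling #{event.payload}"
end

# Yield control so that the subscription can read from the stream.
sleep 1.second

unsubscribed = channel.unsubscribe(subscription)
```

Running this same program in several processes at once is the intended use: an event emitted elsewhere is then handled by a single one of them.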


def subscribe(event : T.class, **filter, &block : T -> _) : Subscription forall T

Subscribe to an event, reading from its stream. Reading only starts once the current fiber yields control (for example, with sleep or Fiber.yield). See Channel#subscribe(event, **filter, &block).
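A minimal sketch of a filtered subscription. It assumes that filter keys are matched against the event's getters, so `payload: "foo"` would select only events whose payload equals `"foo"`; see Channel#subscribe for the authoritative filter semantics:

```crystal
require "onyx-eda/channel/redis"

record MyEvent, payload : String do
  include Onyx::EDA::Event
end

channel = Onyx::EDA::Channel::Redis.new("redis://localhost:6379")

# Assumption: the filter compares `event.payload` with "foo".
subscription = channel.subscribe(MyEvent, payload: "foo") do |event|
  puts event.payload
end

# Nothing is read until this fiber yields control.
sleep 1.second

unsubscribed = channel.unsubscribe(subscription)
```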


def unsubscribe(subscription : Subscription) : Bool